01-init.td 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. #
  10. # This test initializes a source and pushes some data through kafka into it
  11. #
  12. $ set schema={"type": "record", "name": "schema", "fields": [ {"name": "f1", "type": "string" } ] }
  13. $ kafka-create-topic topic=kafka-multi-broker replication-factor=2
  14. $ kafka-ingest format=avro topic=kafka-multi-broker schema=${schema} timestamp=1
  15. {"f1": "01-01" }
  16. $ kafka-ingest format=avro topic=kafka-multi-broker schema=${schema} timestamp=2
  17. {"f1": "01-02" }
  18. $ kafka-ingest format=avro topic=kafka-multi-broker schema=${schema} timestamp=3
  19. {"f1": "01-03" }
  20. > CREATE CONNECTION kafka_conn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
  21. > CREATE SOURCE kafka_multi_broker
  22. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-multi-broker-${testdrive.seed}')
  23. > CREATE TABLE kafka_multi_broker_tbl FROM SOURCE kafka_multi_broker (REFERENCE "testdrive-kafka-multi-broker-${testdrive.seed}")
  24. FORMAT AVRO USING SCHEMA '${schema}'
  25. ENVELOPE NONE
  26. $ kafka-ingest format=avro topic=kafka-multi-broker schema=${schema} timestamp=4
  27. {"f1": "02-01" }
  28. $ kafka-ingest format=avro topic=kafka-multi-broker schema=${schema} timestamp=5
  29. {"f1": "02-02" }
  30. $ kafka-ingest format=avro topic=kafka-multi-broker schema=${schema} timestamp=6
  31. {"f1": "02-03" }
  32. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  33. URL '${testdrive.schema-registry-url}'
  34. );
  35. > CREATE SINK multi_broker_sink
  36. IN CLUSTER quickstart
  37. FROM kafka_multi_broker_tbl
  38. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-multi-broker-sink-${testdrive.seed}')
  39. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  40. ENVELOPE DEBEZIUM