kafka-duplicate-topic.td 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ set-arg-default default-storage-size=1
  10. $ set schema={"type" : "record", "name" : "test", "fields": [ { "name": "f2", "type": "long" } ] }
  11. $ kafka-create-topic topic=topic0
  12. $ kafka-create-topic topic=topic1
  13. $ kafka-ingest format=avro topic=topic0 schema=${schema} repeat=1
  14. {"f2": 1}
  15. $ kafka-ingest format=avro topic=topic1 schema=${schema} repeat=1
  16. {"f2": 7}
  17. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  18. URL '${testdrive.schema-registry-url}'
  19. );
  20. > CREATE CONNECTION kafka_conn
  21. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  22. > CREATE CLUSTER source0_cluster SIZE '${arg.default-storage-size}';
  23. > CREATE SOURCE source0
  24. IN CLUSTER source0_cluster
  25. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-topic0-${testdrive.seed}')
  26. > CREATE TABLE source0_tbl FROM SOURCE source0 (REFERENCE "testdrive-topic0-${testdrive.seed}")
  27. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  28. ENVELOPE NONE
  29. > CREATE CLUSTER source1_cluster SIZE '${arg.default-storage-size}';
  30. > CREATE SOURCE source1
  31. IN CLUSTER source1_cluster
  32. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-topic1-${testdrive.seed}')
  33. > CREATE TABLE source1_tbl FROM SOURCE source1 (REFERENCE "testdrive-topic1-${testdrive.seed}")
  34. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  35. ENVELOPE NONE
  36. > CREATE CLUSTER sink0_cluster SIZE '${arg.default-storage-size}';
  37. > CREATE SINK sink0
  38. IN CLUSTER sink0_cluster
  39. FROM source0_tbl
  40. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-output-${testdrive.seed}')
  41. KEY (f2)
  42. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  43. ENVELOPE DEBEZIUM
  44. > CREATE CLUSTER sink1_cluster SIZE '${arg.default-storage-size}';
  45. > CREATE SINK sink1
  46. IN CLUSTER sink1_cluster
  47. FROM source1_tbl
  48. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-output-${testdrive.seed}')
  49. KEY (f2)
  50. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  51. ENVELOPE DEBEZIUM
  52. $ kafka-verify-data format=avro sort-messages=true sink=materialize.public.sink1
  53. {"f2": 1} {"before": null, "after": {"row": {"f2": 1}}}
  54. {"f2": 7} {"before": null, "after": {"row": {"f2": 7}}}