kafka-duplicate-topic.td 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ set-arg-default default-storage-size=1
  10. $ set schema={"type" : "record", "name" : "test", "fields": [ { "name": "f2", "type": "long" } ] }
  11. $ kafka-create-topic topic=topic0
  12. $ kafka-create-topic topic=topic1
  13. $ kafka-ingest format=avro topic=topic0 schema=${schema} repeat=1
  14. {"f2": 1}
  15. $ kafka-ingest format=avro topic=topic1 schema=${schema} repeat=1
  16. {"f2": 7}
  17. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  18. URL '${testdrive.schema-registry-url}'
  19. );
  20. > CREATE CONNECTION kafka_conn
  21. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  22. > CREATE CLUSTER source0_cluster SIZE '${arg.default-storage-size}';
  23. > CREATE SOURCE source0
  24. IN CLUSTER source0_cluster
  25. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-topic0-${testdrive.seed}')
  26. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  27. ENVELOPE NONE
  28. > CREATE CLUSTER source1_cluster SIZE '${arg.default-storage-size}';
  29. > CREATE SOURCE source1
  30. IN CLUSTER source1_cluster
  31. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-topic1-${testdrive.seed}')
  32. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  33. ENVELOPE NONE
  34. > CREATE CLUSTER sink0_cluster SIZE '${arg.default-storage-size}';
  35. > CREATE SINK sink0
  36. IN CLUSTER sink0_cluster
  37. FROM source0
  38. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-output-${testdrive.seed}')
  39. KEY (f2)
  40. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  41. ENVELOPE DEBEZIUM
  42. > CREATE CLUSTER sink1_cluster SIZE '${arg.default-storage-size}';
  43. > CREATE SINK sink1
  44. IN CLUSTER sink1_cluster
  45. FROM source1
  46. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-output-${testdrive.seed}')
  47. KEY (f2)
  48. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  49. ENVELOPE DEBEZIUM
  50. $ kafka-verify-data format=avro sort-messages=true sink=materialize.public.sink1
  51. {"f2": 1} {"before": null, "after": {"row": {"f2": 1}}}
  52. {"f2": 7} {"before": null, "after": {"row": {"f2": 7}}}