setup.td 1.3 KB

12345678910111213141516171819202122232425262728293031323334
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  10. ALTER SYSTEM SET kafka_transaction_timeout = '10min';
  11. > CREATE TABLE t (c1 TEXT, c2 INT);
  12. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  13. URL '${testdrive.schema-registry-url}'
  14. );
  15. > INSERT INTO t VALUES ('A', 1);
  16. > CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  17. > CREATE SINK output
  18. IN CLUSTER quickstart
  19. FROM t
  20. INTO KAFKA CONNECTION kafka_conn (TOPIC 'table-sink-${testdrive.seed}')
  21. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  22. ENVELOPE DEBEZIUM
  23. > INSERT INTO t VALUES ('B', 2);
  24. $ kafka-verify-data format=avro sink=materialize.public.output sort-messages=true
  25. {"before": null, "after": {"row": {"c1": {"string": "A"}, "c2": {"int": 1}}}}
  26. {"before": null, "after": {"row": {"c1": {"string": "B"}, "c2": {"int": 2}}}}