# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

#
# Multiple updates within the same transaction should be replicated
# correctly.
#

# Upstream fixture: a single-column table whose only column is the primary
# key, seeded with the rows 1 and 10. REPLICA IDENTITY FULL asks Postgres to
# log the complete old row for every UPDATE/DELETE.
$ postgres-execute connection=postgres://postgres:postgres@postgres
CREATE TABLE update_update (f1 INTEGER, PRIMARY KEY (f1));
ALTER TABLE update_update REPLICA IDENTITY FULL;
INSERT INTO update_update VALUES (1);
INSERT INTO update_update VALUES (10);

# Do not proceed until the schema for the topic is available in the schema
# registry. NOTE(review): the topic is presumably produced by a Debezium
# connector configured outside this file -- confirm against the test setup.
$ schema-registry-wait topic=postgres.public.update_update

# Connections are created idempotently (IF NOT EXISTS); their addresses come
# from testdrive-provided variables.
> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

> CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

# Ingest the topic and decode it as Avro with the Debezium envelope.
> CREATE SOURCE update_update
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'postgres.public.update_update');

> CREATE TABLE update_update_tbl FROM SOURCE update_update (REFERENCE "postgres.public.update_update")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM;

# Scenario under test: each row is updated twice inside ONE transaction, and
# every update rewrites the primary key:
#   1  -> 51  -> 510
#   10 -> 200 -> 350
# The third and fourth statements match on the values written by the first
# two, so they only take effect if the earlier updates are visible within
# the same transaction.
$ postgres-execute connection=postgres://postgres:postgres@postgres
BEGIN;
UPDATE update_update SET f1 = f1 + 50 WHERE f1 = 1;
UPDATE update_update SET f1 = f1 * 20 WHERE f1 = 10;
UPDATE update_update SET f1 = f1 * 10 WHERE f1 = 51;
UPDATE update_update SET f1 = f1 + 150 WHERE f1 = 200;
COMMIT;

# Only the final value of each row may be visible; no intermediate value
# (1, 10, 51, 200) should remain.
> SELECT * FROM update_update_tbl;
510
350