34-replica-identity-default.td 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. #
  10. # A cursory check for REPLICA IDENTITY DEFAULT. It is not a supported
  11. # configuration but we should not panic.
  12. #
  13. $ postgres-execute connection=postgres://postgres:postgres@postgres
  14. CREATE TABLE replica_identity_default (f1 INTEGER, f2 INTEGER, f3 INTEGER, PRIMARY KEY (f1));
  15. ALTER TABLE replica_identity_default REPLICA IDENTITY DEFAULT;
  16. INSERT INTO replica_identity_default VALUES (1,1,1), (2,2,2), (3,3,3), (4,4,4);
      # Presumably blocks until the topic's schema is registered, so the source
      # below can resolve it -- TODO confirm against the testdrive docs.
  17. $ schema-registry-wait topic=postgres.public.replica_identity_default
  18. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  19. URL '${testdrive.schema-registry-url}'
  20. );
  21. > CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  22. > CREATE SOURCE replica_identity_default
  23. FROM KAFKA CONNECTION kafka_conn (TOPIC 'postgres.public.replica_identity_default');
  24. > CREATE TABLE replica_identity_default_tbl FROM SOURCE replica_identity_default (REFERENCE "postgres.public.replica_identity_default")
  25. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  26. ENVELOPE DEBEZIUM;
      # Initial state: the four rows inserted upstream must all be visible.
  27. > SELECT * FROM replica_identity_default_tbl
  28. 1 1 1
  29. 2 2 2
  30. 3 3 3
  31. 4 4 4
      # Mutate the upstream table: two updates to non-key columns (f2, f3), one
      # delete, and one update that changes the primary key itself (f1: 4 -> 5).
  32. $ postgres-execute connection=postgres://postgres:postgres@postgres
  33. UPDATE replica_identity_default SET f2 = 5 WHERE f1 = 1;
  34. UPDATE replica_identity_default SET f3 = 5 WHERE f1 = 2;
  35. DELETE FROM replica_identity_default WHERE f1 = 3;
  36. UPDATE replica_identity_default SET f1 = 5 WHERE f1 = 4;
  37. # [btv] This succeeds now, due to upsert semantics. The two commented-out lines below are the old expected-failure assertion, kept for reference.
  38. # ! SELECT * FROM replica_identity_default_tbl;
  39. # contains:Invalid data in source, saw retractions
      # Final state: rows 1 and 2 carry their updated values, row 3 is gone, and
      # the former row 4 now appears under its new key f1 = 5.
  40. > SELECT * FROM replica_identity_default_tbl
  41. 1 5 1
  42. 2 2 5
  43. 5 4 4