# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

  9. #
  10. # Concurrent modifications of a PK
  11. #
  12. $ postgres-execute connection=postgres://postgres:postgres@postgres
  13. CREATE TABLE concurrent_update_pk (f1 INTEGER, f2 INTEGER, PRIMARY KEY (f1));
  14. ALTER TABLE concurrent_update_pk REPLICA IDENTITY FULL;
  15. INSERT INTO concurrent_update_pk VALUES (1, 10);
  16. INSERT INTO concurrent_update_pk VALUES (2, 20);
  17. $ schema-registry-wait topic=postgres.public.concurrent_update_pk
  18. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  19. URL '${testdrive.schema-registry-url}'
  20. );
  21. > CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  22. > CREATE SOURCE concurrent_update_pk
  23. FROM KAFKA CONNECTION kafka_conn (TOPIC 'postgres.public.concurrent_update_pk');
  24. > CREATE TABLE concurrent_update_pk_tbl FROM SOURCE concurrent_update_pk (REFERENCE "postgres.public.concurrent_update_pk")
  25. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  26. ENVELOPE DEBEZIUM;
  27. $ postgres-connect name=conn1 url=postgres://postgres:postgres@postgres
  28. $ postgres-connect name=conn2 url=postgres://postgres:postgres@postgres
  29. $ postgres-execute connection=conn1
  30. BEGIN;
  31. UPDATE concurrent_update_pk SET f1 = f1 + 10 , f2 = f2 + 10 WHERE f1 = 1;
  32. $ postgres-execute connection=conn2
  33. BEGIN;
  34. UPDATE concurrent_update_pk SET f1 = f1 + 10 , f2 = f2 + 10 WHERE f1 = 2;
  35. $ postgres-execute connection=conn1
  36. INSERT INTO concurrent_update_pk VALUES (4, 40);
  37. COMMIT;
  38. $ postgres-execute connection=conn2
  39. INSERT INTO concurrent_update_pk VALUES (5, 50);
  40. COMMIT;
  41. > SELECT * FROM concurrent_update_pk_tbl;
  42. 4 40
  43. 5 50
  44. 11 20
  45. 12 30