# 65-concurrent-delete-multi.td
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
#
# Concurrent deletes over different tables: two interleaved transactions
# each delete one row from both tables.
  11. $ postgres-execute connection=postgres://postgres:postgres@postgres
  12. CREATE TABLE concurrent_delete1 (f1 INTEGER PRIMARY KEY);
  13. ALTER TABLE concurrent_delete1 REPLICA IDENTITY FULL;
  14. CREATE TABLE concurrent_delete2 (f1 INTEGER PRIMARY KEY);
  15. ALTER TABLE concurrent_delete2 REPLICA IDENTITY FULL;
  16. INSERT INTO concurrent_delete1 VALUES (1);
  17. INSERT INTO concurrent_delete1 VALUES (2);
  18. INSERT INTO concurrent_delete1 VALUES (3);
  19. INSERT INTO concurrent_delete1 VALUES (4);
  20. INSERT INTO concurrent_delete1 VALUES (5);
  21. INSERT INTO concurrent_delete2 VALUES (1);
  22. INSERT INTO concurrent_delete2 VALUES (2);
  23. INSERT INTO concurrent_delete2 VALUES (3);
  24. INSERT INTO concurrent_delete2 VALUES (4);
  25. INSERT INTO concurrent_delete2 VALUES (5);
  26. $ schema-registry-wait topic=postgres.public.concurrent_delete1
  27. $ schema-registry-wait topic=postgres.public.concurrent_delete2
  28. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  29. URL '${testdrive.schema-registry-url}'
  30. );
  31. > CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  32. > CREATE SOURCE concurrent_delete1
  33. FROM KAFKA CONNECTION kafka_conn (TOPIC 'postgres.public.concurrent_delete1');
  34. > CREATE TABLE concurrent_delete1_tbl FROM SOURCE concurrent_delete1 (REFERENCE "postgres.public.concurrent_delete1")
  35. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  36. ENVELOPE DEBEZIUM;
  37. > CREATE SOURCE concurrent_delete2
  38. FROM KAFKA CONNECTION kafka_conn (TOPIC 'postgres.public.concurrent_delete2');
  39. > CREATE TABLE concurrent_delete2_tbl FROM SOURCE concurrent_delete2 (REFERENCE "postgres.public.concurrent_delete2")
  40. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  41. ENVELOPE DEBEZIUM;
  42. $ postgres-connect name=conn1 url=postgres://postgres:postgres@postgres
  43. $ postgres-connect name=conn2 url=postgres://postgres:postgres@postgres
  44. $ postgres-execute connection=conn1
  45. BEGIN;
  46. DELETE FROM concurrent_delete1 WHERE f1 = 2;
  47. $ postgres-execute connection=conn2
  48. BEGIN;
  49. DELETE FROM concurrent_delete2 WHERE f1 = 3;
  50. $ postgres-execute connection=conn1
  51. DELETE FROM concurrent_delete2 WHERE f1 = 4;
  52. COMMIT;
  53. $ postgres-execute connection=conn2
  54. DELETE FROM concurrent_delete1 WHERE f1 = 5;
  55. COMMIT;
  56. > SELECT * FROM concurrent_delete1_tbl;
  57. 1
  58. 3
  59. 4
  60. > SELECT * FROM concurrent_delete2_tbl;
  61. 1
  62. 2
  63. 5