
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# TODO: Reenable when https://github.com/MaterializeInc/database-issues/issues/8636 is fixed
$ skip-if
SELECT true
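
# `skip-if` evaluates the query and skips the rest of the file when it returns
# true; with `SELECT true` this file is unconditionally skipped until the
# linked issue is fixed.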

$ set-arg-default single-replica-cluster=quickstart

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

> CREATE TABLE pre_alter (pre_name string NOT NULL);
> INSERT INTO pre_alter VALUES ('fish');

> CREATE TABLE post_alter (post_name string, post_value int);

# This value should be ignored by the sink because the alter will happen after
# this record has been inserted and we don't re-emit a snapshot of the new
# collection when it changes.
> INSERT INTO post_alter VALUES ('ignored', 0);
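
# Illustrative check (not load-bearing for the test): the row is present in
# the table itself; it is only the sink that will never emit it.
> SELECT post_name, post_value FROM post_alter;
ignored 0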

! CREATE SINK sink
  IN CLUSTER ${arg.single-replica-cluster}
  FROM mz_tables
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-alter-sink-${testdrive.seed}')
  FORMAT JSON
  ENVELOPE DEBEZIUM;
contains: creating a sink directly on a catalog object not yet supported
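
# A possible workaround, sketched here but not exercised: wrap the catalog
# object in a materialized view and sink the view instead. The object names
# below are hypothetical.
# > CREATE MATERIALIZED VIEW mz_tables_mv IN CLUSTER ${arg.single-replica-cluster} AS
#   SELECT name FROM mz_tables;
# > CREATE SINK catalog_sink
#   IN CLUSTER ${arg.single-replica-cluster}
#   FROM mz_tables_mv
#   INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-alter-sink-wrapped-${testdrive.seed}')
#   FORMAT JSON
#   ENVELOPE DEBEZIUM;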

> CREATE SINK sink
  IN CLUSTER ${arg.single-replica-cluster}
  FROM pre_alter
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-alter-sink-${testdrive.seed}')
  FORMAT JSON
  ENVELOPE DEBEZIUM;

! ALTER SINK sink SET FROM mz_tables;
contains: creating a sink directly on a catalog object not yet supported

$ kafka-verify-data format=json sink=materialize.public.sink key=false
{"before": null, "after": {"pre_name": "fish"}}

> ALTER SINK sink SET FROM post_alter;

# The sink will start sinking updates from `post_alter` at the timestamp that
# the previous dataflow happens to stop. This happens pretty quickly but we
# wait a few seconds more for good measure to avoid flaking.
$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=4s

> INSERT INTO post_alter VALUES ('chips', 42);

$ kafka-verify-data format=json sink=materialize.public.sink key=false
{"before": null, "after": {"post_name": "chips", "post_value": 42}}

# Test that backward-incompatible schema changes lead to an error.
> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

> CREATE TABLE post_alter_incompatible (post_value int NOT NULL);

> CREATE SINK incompatible_sink
  IN CLUSTER ${arg.single-replica-cluster}
  FROM pre_alter
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-alter-sink-incompatible-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM;

$ kafka-verify-data format=avro sink=materialize.public.incompatible_sink sort-messages=true
{"before": null, "after": {"row": {"pre_name": "fish"}}}

> ALTER SINK incompatible_sink SET FROM post_alter_incompatible;

> SELECT st.error LIKE '%schema being registered is incompatible with an earlier schema%'
  FROM mz_sinks s JOIN mz_internal.mz_sink_statuses st ON s.id = st.id
  WHERE s.name = 'incompatible_sink';
true
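
# A stalled status is the likely companion to that error; sketched here but
# commented out since the exact status value may differ across versions.
# > SELECT st.status
#   FROM mz_sinks s JOIN mz_internal.mz_sink_statuses st ON s.id = st.id
#   WHERE s.name = 'incompatible_sink';
# stalled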

# Create a cluster with no replicas so sources in it can't make progress. This
# ensures that the `ALTER SINK` below hangs until we cancel it.
> CREATE CLUSTER no_replicas REPLICAS ()

> CREATE SOURCE counter
  IN CLUSTER no_replicas
  FROM LOAD GENERATOR COUNTER (UP TO 100);
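
# Sanity check: the cluster really has no replicas, so the source above can
# never produce data.
> SELECT count(*)
  FROM mz_cluster_replicas cr JOIN mz_clusters c ON cr.cluster_id = c.id
  WHERE c.name = 'no_replicas';
0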

> CREATE SINK wedged_sink
  IN CLUSTER ${arg.single-replica-cluster}
  FROM counter
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-alter-sink-${testdrive.seed}')
  FORMAT JSON
  ENVELOPE DEBEZIUM;

$ set-from-sql var=backend-pid
SELECT CAST(pg_backend_pid() AS text);
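
# In the background, wait a few seconds and then cancel our own session, which
# by then will be blocked in the `ALTER SINK` below.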
$ postgres-execute background=true connection=postgres://materialize:materialize@${testdrive.materialize-sql-addr}
SELECT mz_unsafe.mz_sleep(3);
SELECT pg_cancel_backend(CAST(${backend-pid} AS int4));

! ALTER SINK wedged_sink SET FROM post_alter;
contains:canceling statement due to user request
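
# A plausible follow-up, sketched but not exercised here: give the cluster a
# replica so the source can make progress, after which the same ALTER SINK
# would be expected to succeed. The replica name and size are hypothetical.
# > CREATE CLUSTER REPLICA no_replicas.r1 SIZE '1';
# > ALTER SINK wedged_sink SET FROM post_alter;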

# There is a meaningful difference in having an object created after the sink
# already exists; see incident-131.
> CREATE TABLE created_post_alter (created_post_name string, created_post_value int);

# These values should be ignored by the sink because the alter will happen
# after these records have been inserted and we don't re-emit a snapshot of
# the new collection when it changes.
> INSERT INTO created_post_alter VALUES ('ignored', 0);
> INSERT INTO created_post_alter VALUES ('ignored2', 1);

> ALTER SINK sink SET FROM created_post_alter;

# The sink will start sinking updates from `created_post_alter` at the
# timestamp that the previous dataflow happens to stop. This happens pretty
# quickly but we wait a few seconds more for good measure to avoid flaking.
$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=10s

> INSERT INTO created_post_alter VALUES ('hundred', 99);

$ kafka-verify-data format=json sink=materialize.public.sink key=false
{"before": null, "after": {"created_post_name": "hundred", "created_post_value": 99}}