# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  10. ALTER SYSTEM SET enable_envelope_materialize = true
  11. $ set schema=[
  12. {
  13. "type": "array",
  14. "items": {
  15. "type": "record",
  16. "name": "update",
  17. "namespace": "com.materialize.cdc",
  18. "fields": [
  19. {
  20. "name": "data",
  21. "type": {
  22. "type": "record",
  23. "name": "data",
  24. "fields": [
  25. {"name": "a", "type": "long"},
  26. {"name": "b", "type": "long"}
  27. ]
  28. }
  29. },
  30. {
  31. "name": "time",
  32. "type": "long"
  33. },
  34. {
  35. "name": "diff",
  36. "type": "long"
  37. }
  38. ]
  39. }
  40. },
  41. {
  42. "type": "record",
  43. "name": "progress",
  44. "namespace": "com.materialize.cdc",
  45. "fields": [
  46. {
  47. "name": "lower",
  48. "type": {
  49. "type": "array",
  50. "items": "long"
  51. }
  52. },
  53. {
  54. "name": "upper",
  55. "type": {
  56. "type": "array",
  57. "items": "long"
  58. }
  59. },
  60. {
  61. "name": "counts",
  62. "type": {
  63. "type": "array",
  64. "items": {
  65. "type": "record",
  66. "name": "counts",
  67. "fields": [
  68. {
  69. "name": "time",
  70. "type": "long"
  71. },
  72. {
  73. "name": "count",
  74. "type": "long"
  75. }
  76. ]
  77. }
  78. }
  79. }
  80. ]
  81. }
  82. ]
  83. $ kafka-create-topic topic=input
  84. > CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  85. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  86. URL '${testdrive.schema-registry-url}'
  87. );
  88. > CREATE SOURCE input
  89. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-input-${testdrive.seed}')
  90. > CREATE TABLE input_tbl FROM SOURCE input (REFERENCE "testdrive-input-${testdrive.seed}")
  91. FORMAT AVRO USING SCHEMA '${schema}' ENVELOPE MATERIALIZE
  92. > CREATE SINK output
  93. IN CLUSTER quickstart
  94. FROM input_tbl
  95. INTO KAFKA CONNECTION kafka_conn (TOPIC 'output-${testdrive.seed}')
  96. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  97. ENVELOPE DEBEZIUM
  98. $ kafka-ingest format=avro topic=input schema=${schema}
  99. {"array":[{"data":{"a":1,"b":1},"time":1,"diff":1}]}
  100. {"array":[{"data":{"a":2,"b":1},"time":1,"diff":1}]}
  101. {"array":[{"data":{"a":3,"b":1},"time":1,"diff":1}]}
  102. {"array":[{"data":{"a":1,"b":2},"time":1,"diff":1}]}
  103. {"array":[{"data":{"a":11,"b":11},"time":2,"diff":1}]}
  104. {"array":[{"data":{"a":22,"b":11},"time":2,"diff":1}]}
  105. {"array":[{"data":{"a":3,"b":4},"time":3,"diff":1}]}
  106. {"array":[{"data":{"a":5,"b":6},"time":3,"diff":1}]}
  107. {"com.materialize.cdc.progress":{"lower":[0],"upper":[4],"counts":[{"time":1,"count":4},{"time":2,"count":2},{"time":3,"count":2}]}}
  108. $ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.output sort-messages=true
  109. 1 {"before": null, "after": {"row": {"a": 1, "b": 1}}}
  110. 1 {"before": null, "after": {"row": {"a": 1, "b": 2}}}
  111. 1 {"before": null, "after": {"row": {"a": 2, "b": 1}}}
  112. 1 {"before": null, "after": {"row": {"a": 3, "b": 1}}}
  113. $ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.output sort-messages=true
  114. 2 {"before": null, "after": {"row": {"a": 11, "b": 11}}}
  115. 2 {"before": null, "after": {"row": {"a": 22, "b": 11}}}
  116. $ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.output sort-messages=true
  117. 3 {"before": null, "after": {"row": {"a": 3, "b": 4}}}
  118. 3 {"before": null, "after": {"row": {"a": 5, "b": 6}}}
  119. # Wait a bit to allow timestamp compaction to happen. We need to ensure that we
  120. # get correct results even with compaction, which re-timestamps earlier data
  121. # at later timestamps upon restarting.
  122. $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="5s"
  123. <null>