# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default single-replica-cluster=quickstart

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_envelope_materialize = true
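
# The MATERIALIZE envelope used below is gated behind the
# enable_envelope_materialize feature flag, hence the ALTER SYSTEM above.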

#
# This test creates a single timestamp with multiple events in it
#

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

$ set schema=[
  {
    "type": "array",
    "items": {
      "type": "record",
      "name": "update",
      "namespace": "com.materialize.cdc",
      "fields": [
        {
          "name": "data",
          "type": {
            "type": "record",
            "name": "data",
            "fields": [
              {"name": "a", "type": "long"},
              {"name": "b", "type": "long"}
            ]
          }
        },
        {
          "name": "time",
          "type": "long"
        },
        {
          "name": "diff",
          "type": "long"
        }
      ]
    }
  },
  {
    "type": "record",
    "name": "progress",
    "namespace": "com.materialize.cdc",
    "fields": [
      {
        "name": "lower",
        "type": {
          "type": "array",
          "items": "long"
        }
      },
      {
        "name": "upper",
        "type": {
          "type": "array",
          "items": "long"
        }
      },
      {
        "name": "counts",
        "type": {
          "type": "array",
          "items": {
            "type": "record",
            "name": "counts",
            "fields": [
              {
                "name": "time",
                "type": "long"
              },
              {
                "name": "count",
                "type": "long"
              }
            ]
          }
        }
      }
    ]
  }
  ]
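
# The union schema above defines the two kinds of messages the MATERIALIZE
# envelope consumes: an array of updates, each carrying a (data, time, diff)
# triple, and a progress record whose [lower, upper) frontier and per-time
# counts tell the source when a timestamp is complete.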

#
# Insert some rows, then delete, upsert, and insert more rows, all within a
# single later timestamp
#

$ kafka-create-topic topic=topic1

> CREATE SOURCE source1
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-topic1-${testdrive.seed}')

> CREATE TABLE source1_tbl FROM SOURCE source1 (REFERENCE "testdrive-topic1-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${schema}' ENVELOPE MATERIALIZE

> CREATE SINK sink1
  IN CLUSTER ${arg.single-replica-cluster}
  FROM source1_tbl
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}') KEY (a) NOT ENFORCED
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT
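
# The UPSERT sink consolidates all updates within a timestamp, so each key
# should surface at most one message per timestamp: the latest value for
# inserts and upserts, and a null-value tombstone for deletes.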

# We start with 10 records
$ kafka-ingest format=avro topic=topic1 schema=${schema}
{"array":[{"data":{"a":10,"b":1},"time":1,"diff":1}]}
{"array":[{"data":{"a":9,"b":1},"time":1,"diff":1}]}
{"array":[{"data":{"a":8,"b":1},"time":1,"diff":1}]}
{"array":[{"data":{"a":7,"b":1},"time":1,"diff":1}]}
{"array":[{"data":{"a":6,"b":1},"time":1,"diff":1}]}
{"array":[{"data":{"a":5,"b":1},"time":1,"diff":1}]}
{"array":[{"data":{"a":4,"b":1},"time":1,"diff":1}]}
{"array":[{"data":{"a":2,"b":1},"time":1,"diff":1}]}
{"array":[{"data":{"a":3,"b":1},"time":1,"diff":1}]}
{"array":[{"data":{"a":1,"b":1},"time":1,"diff":1}]}

# All of the below happens in a single timestamp "time":2

# Delete 2 records
$ kafka-ingest format=avro topic=topic1 schema=${schema}
{"array":[{"data":{"a":7,"b":1},"time":2,"diff":-1}]}
{"array":[{"data":{"a":3,"b":1},"time":2,"diff":-1}]}

# Upsert 2 records (retract the old value, insert the new one)
$ kafka-ingest format=avro topic=topic1 schema=${schema}
{"array":[{"data":{"a":8,"b":1},"time":2,"diff":-1}]}
{"array":[{"data":{"a":2,"b":1},"time":2,"diff":-1}]}
{"array":[{"data":{"a":8,"b":8},"time":2,"diff":1}]}
{"array":[{"data":{"a":2,"b":2},"time":2,"diff":1}]}

# Insert 2 records
$ kafka-ingest format=avro topic=topic1 schema=${schema}
{"array":[{"data":{"a":0,"b":0},"time":2,"diff":1}]}
{"array":[{"data":{"a":15,"b":15},"time":2,"diff":1}]}

# Emit the progress message that closes timestamps 1 and 2
$ kafka-ingest format=avro topic=topic1 schema=${schema}
{"com.materialize.cdc.progress":{"lower":[0],"upper":[10],"counts":[{"time":1,"count":10}, {"time":2,"count":8}]}}

$ kafka-verify-data headers=materialize-timestamp format=avro topic=testdrive-sink1-${testdrive.seed} sort-messages=true
1 {"a": 1} {"a": 1, "b": 1}
1 {"a": 10} {"a": 10, "b": 1}
1 {"a": 2} {"a": 2, "b": 1}
1 {"a": 3} {"a": 3, "b": 1}
1 {"a": 4} {"a": 4, "b": 1}
1 {"a": 5} {"a": 5, "b": 1}
1 {"a": 6} {"a": 6, "b": 1}
1 {"a": 7} {"a": 7, "b": 1}
1 {"a": 8} {"a": 8, "b": 1}
1 {"a": 9} {"a": 9, "b": 1}

$ kafka-verify-data headers=materialize-timestamp format=avro topic=testdrive-sink1-${testdrive.seed} sort-messages=true
2 {"a": 0} {"a": 0, "b": 0}
2 {"a": 15} {"a": 15, "b": 15}
2 {"a": 2} {"a": 2, "b": 2}
2 {"a": 3}
2 {"a": 7}
2 {"a": 8} {"a": 8, "b": 8}