# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default single-replica-cluster=quickstart

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_envelope_materialize = true
ALTER SYSTEM SET enable_index_options = true
ALTER SYSTEM SET enable_logical_compaction_window = true
ALTER SYSTEM SET enable_unlimited_retain_history = true

# Test consolidation and compaction behavior.
#
# The tests in this file use a single Debezium-formatted Kafka topic, `nums`,
# whose messages are either data updates containing one bigint field or
# progress messages that mimic Debezium transactional metadata and group the
# updates into transactions.
#
# Using transactional metadata like this allows us to tightly control the
# timestamp at which data is ingested into Materialize. Data from the first
# transaction is assigned timestamp 1, data from the second is assigned
# timestamp 2, and so on.
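
# For reference, a data update in this format wraps one or more
# (data, time, diff) triples, e.g.:
#
#     {"array": [{"data": {"num": 3}, "time": 1, "diff": 1}]}
#
# while a progress message declares a range of timestamps closed and states
# how many updates to expect at each, e.g.:
#
#     {"com.materialize.cdc.progress": {"lower": [0], "upper": [4], "counts": [{"time": 1, "count": 1}]}}
#
# Both shapes appear in the `kafka-ingest` steps below.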

$ set nums-schema=[
  {
    "type": "array",
    "items": {
      "type": "record",
      "name": "update",
      "namespace": "com.materialize.cdc",
      "fields": [
        {
          "name": "data",
          "type": {
            "type": "record",
            "name": "data",
            "fields": [{"name": "num", "type": "long"}]
          }
        },
        {
          "name": "time",
          "type": "long"
        },
        {
          "name": "diff",
          "type": "long"
        }
      ]
    }
  },
  {
    "type": "record",
    "name": "progress",
    "namespace": "com.materialize.cdc",
    "fields": [
      {
        "name": "lower",
        "type": {
          "type": "array",
          "items": "long"
        }
      },
      {
        "name": "upper",
        "type": {
          "type": "array",
          "items": "long"
        }
      },
      {
        "name": "counts",
        "type": {
          "type": "array",
          "items": {
            "type": "record",
            "name": "counts",
            "fields": [
              {
                "name": "time",
                "type": "long"
              },
              {
                "name": "count",
                "type": "long"
              }
            ]
          }
        }
      }
    ]
  }
]

$ kafka-create-topic topic=nums

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE SOURCE nums
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-nums-${testdrive.seed}')

> CREATE TABLE nums_tbl FROM SOURCE nums (REFERENCE "testdrive-nums-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${nums-schema}'
  ENVELOPE MATERIALIZE

> CREATE DEFAULT INDEX ON nums_tbl

# Disable logical compaction, to ensure we can view historical detail.
> ALTER INDEX materialize.public.nums_tbl_primary_idx
  SET (RETAIN HISTORY = FOR 0)

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

# Create a sink before we ingest any data, to ensure the sink starts AS OF 0.
> CREATE SINK nums_sink
  IN CLUSTER ${arg.single-replica-cluster}
  FROM nums_tbl
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'nums-sink-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM
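
# The sink emits Debezium-style records whose `before`/`after` fields carry the
# retracted and inserted row values; the `kafka-verify-data` checks below also
# compare the `materialize-timestamp` Kafka header, which appears as the
# leading number on each expected line, e.g.:
#
#     1 {"before": null, "after": {"row": {"num": 3}}}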

# ==> Test consolidation.

# Ingest several updates that consolidate.
$ kafka-ingest format=avro topic=nums schema=${nums-schema}
{"array":[{"data":{"num":3},"time":1,"diff":1}]}
{"array":[{"data":{"num":3},"time":2,"diff":-1}]}
{"array":[{"data":{"num":4},"time":2,"diff":1}]}
{"array":[{"data":{"num":4},"time":3,"diff":-1}]}
{"array":[{"data":{"num":5},"time":3,"diff":1}]}
{"com.materialize.cdc.progress":{"lower":[0],"upper":[4],"counts":[{"time":1,"count":1},{"time":2,"count":2},{"time":3,"count":2}]}}

# Test that updates that occurred at distinct times are not consolidated.
#
# We know that transactions (timestamps) are emitted in order, but the order
# of emitted records with the same timestamp is not deterministic. We therefore
# verify each transaction separately and sort within each transaction to get
# deterministic results.

$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.nums_sink
1 {"before": null, "after": {"row": {"num": 3}}}

$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.nums_sink sort-messages=true
2 {"before": null, "after": {"row": {"num": 4}}}
2 {"before": {"row": {"num": 3}}, "after": null}

$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.nums_sink sort-messages=true
3 {"before": null, "after": {"row": {"num": 5}}}
3 {"before": {"row": {"num": 4}}, "after": null}

# TODO(benesch): re-enable when we support `CREATE SINK ... AS OF`.
# # Test that a Debezium sink created `AS OF 3` (the latest completed timestamp)
# # is fully consolidated.
# > CREATE SINK nums_sink
#   IN CLUSTER ${arg.single-replica-cluster}
#   FROM nums_tbl
#   INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-nums-sink-${testdrive.seed}')
#   FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
#   AS OF 3
#
# $ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.nums_sink
# 6 {"before": null, "after": {"row": {"num": 5}}}

# Validate that `SUBSCRIBE` is similarly consolidated.
# This protects against regression of database-issues#1675.
> BEGIN
> DECLARE cur CURSOR FOR SUBSCRIBE nums_tbl AS OF 3
> FETCH ALL cur
mz_timestamp mz_diff num
--------------------------
3 1 5
> COMMIT
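
# (Roughly the same data could be pulled without an explicit cursor, e.g. with
# `COPY (SUBSCRIBE nums_tbl AS OF 3) TO STDOUT`, though SUBSCRIBE streams
# indefinitely unless bounded with `UP TO` or cancelled.)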

# ==> Test compaction.

# Each transaction ingested so far should be separately visible (i.e., not
# compacted away).
> SELECT * FROM nums_tbl AS OF 1
3
> SELECT * FROM nums_tbl AS OF 2
4
> SELECT * FROM nums_tbl AS OF 3
5
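
# (For debugging, `EXPLAIN TIMESTAMP FOR SELECT * FROM nums_tbl` shows the read
# frontier of each input, i.e. the oldest timestamp that is still valid for an
# AS OF query.)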

# Decrease the compaction window and ingest some new data in transaction 4.
> ALTER INDEX materialize.public.nums_tbl_primary_idx
  SET (RETAIN HISTORY = FOR '1ms')

$ kafka-ingest format=avro topic=nums schema=${nums-schema}
{"array":[{"data":{"num":5},"time":4,"diff":-1}]}
{"array":[{"data":{"num":6},"time":4,"diff":1}]}
{"com.materialize.cdc.progress":{"lower":[4],"upper":[5],"counts":[{"time":4,"count":2}]}}

# Data from older transactions should be immediately compacted to the timestamp
# of the latest transaction (i.e., 4).
! SELECT * FROM nums_tbl AS OF 2
contains:Timestamp (2) is not valid for all inputs
! SELECT * FROM nums_tbl AS OF 3
contains:Timestamp (3) is not valid for all inputs
> SELECT * FROM nums_tbl AS OF 4
6

# Set the compaction window back to off and advance the number in transactions
# 5 and 6.
> ALTER INDEX materialize.public.nums_tbl_primary_idx
  SET (RETAIN HISTORY = FOR 0)

# But also create an index that compacts frequently.
> CREATE VIEW nums_compacted AS SELECT * FROM nums_tbl
> CREATE DEFAULT INDEX ON nums_compacted WITH (RETAIN HISTORY = FOR '1ms')
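
# Queries on `nums_tbl` read from its primary index, which again retains full
# history, while queries on `nums_compacted` read from the new index, whose
# 1ms retention window keeps only very recent history. The same underlying
# data therefore accepts different AS OF timestamps depending on which object
# is queried, as the checks below demonstrate.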

$ kafka-ingest format=avro topic=nums schema=${nums-schema}
{"array":[{"data":{"num":6},"time":5,"diff":-1}]}
{"array":[{"data":{"num":7},"time":5,"diff":1}]}
{"array":[{"data":{"num":7},"time":6,"diff":-1}]}
{"array":[{"data":{"num":8},"time":6,"diff":1}]}
{"com.materialize.cdc.progress":{"lower":[5],"upper":[7],"counts":[{"time":5,"count":2},{"time":6,"count":2}]}}

# Timestamps 4, 5, and 6 should all be available due to the longer compaction
# window.
> SELECT * FROM nums_tbl AS OF 4
6
> SELECT * FROM nums_tbl AS OF 5
7
> SELECT * FROM nums_tbl AS OF 6
8

! SELECT * FROM nums_compacted AS OF 4
contains:Timestamp (4) is not valid for all inputs
! SELECT * FROM nums_compacted AS OF 5
contains:Timestamp (5) is not valid for all inputs
> SELECT * FROM nums_compacted AS OF 6
8