# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default single-replica-cluster=quickstart

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_envelope_materialize = true
ALTER SYSTEM SET enable_index_options = true
ALTER SYSTEM SET enable_logical_compaction_window = true
ALTER SYSTEM SET enable_unlimited_retain_history = true
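
# These feature flags enable what the test relies on: the MATERIALIZE source
# envelope, per-index options, configurable logical compaction windows, and
# unlimited history retention (RETAIN HISTORY = FOR 0).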

# Test consolidation and compaction behavior.
#
# The tests in this file use a single Kafka topic, `nums`, that contains one
# bigint field and is ingested with ENVELOPE MATERIALIZE. Instead of a
# separate Debezium-style transactional metadata topic, progress messages
# embedded in `nums` group its updates into transactions.
#
# Writing the transaction metadata ourselves allows us to tightly control the
# timestamp at which data is ingested into Materialize. Data from the first
# transaction is assigned timestamp 1, data from the second is assigned
# timestamp 2, and so on.
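#
# Concretely, each message is either an array of `update` records, each
# carrying a row (`data`), a logical `time`, and a `diff` (+1 insert, -1
# retraction), or a `progress` record stating how many updates exist at each
# time in the interval [lower, upper), which lets Materialize know when a
# timestamp is complete.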

$ set nums-schema=[
  {
    "type": "array",
    "items": {
      "type": "record",
      "name": "update",
      "namespace": "com.materialize.cdc",
      "fields": [
        {
          "name": "data",
          "type": {
            "type": "record",
            "name": "data",
            "fields": [{"name": "num", "type": "long"}]
          }
        },
        {
          "name": "time",
          "type": "long"
        },
        {
          "name": "diff",
          "type": "long"
        }
      ]
    }
  },
  {
    "type": "record",
    "name": "progress",
    "namespace": "com.materialize.cdc",
    "fields": [
      {
        "name": "lower",
        "type": {
          "type": "array",
          "items": "long"
        }
      },
      {
        "name": "upper",
        "type": {
          "type": "array",
          "items": "long"
        }
      },
      {
        "name": "counts",
        "type": {
          "type": "array",
          "items": {
            "type": "record",
            "name": "counts",
            "fields": [
              {
                "name": "time",
                "type": "long"
              },
              {
                "name": "count",
                "type": "long"
              }
            ]
          }
        }
      }
    ]
  }
  ]

$ kafka-create-topic topic=nums

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE SOURCE nums
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-nums-${testdrive.seed}')
  FORMAT AVRO USING SCHEMA '${nums-schema}'
  ENVELOPE MATERIALIZE

> CREATE DEFAULT INDEX ON nums

# Disable logical compaction, to ensure we can view historical detail.
> ALTER INDEX materialize.public.nums_primary_idx
  SET (RETAIN HISTORY = FOR 0)

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  URL '${testdrive.schema-registry-url}'
  );

# Create a sink before we ingest any data, to ensure the sink starts AS OF 0.
> CREATE SINK nums_sink
  IN CLUSTER ${arg.single-replica-cluster}
  FROM nums
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'nums-sink-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM
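
# Because the sink starts AS OF 0, it should emit each transaction's updates
# at that transaction's timestamp (verified below) rather than a single
# consolidated snapshot.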

# ==> Test consolidation.

# Ingest several updates that consolidate.
$ kafka-ingest format=avro topic=nums schema=${nums-schema}
{"array":[{"data":{"num":3},"time":1,"diff":1}]}
{"array":[{"data":{"num":3},"time":2,"diff":-1}]}
{"array":[{"data":{"num":4},"time":2,"diff":1}]}
{"array":[{"data":{"num":4},"time":3,"diff":-1}]}
{"array":[{"data":{"num":5},"time":3,"diff":1}]}
{"com.materialize.cdc.progress":{"lower":[0],"upper":[4],"counts":[{"time":1,"count":1},{"time":2,"count":2},{"time":3,"count":2}]}}

# Test that updates that occurred at distinct times are not consolidated.
# We know that transactions (timestamps) are emitted in order, but the order
# of emitted records with the same timestamp is not deterministic. We therefore
# verify each transaction separately and sort within each transaction to get
# deterministic results.
$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.nums_sink
1 {"before": null, "after": {"row": {"num": 3}}}

$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.nums_sink sort-messages=true
2 {"before": null, "after": {"row": {"num": 4}}}
2 {"before": {"row": {"num": 3}}, "after": null}

$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.nums_sink sort-messages=true
3 {"before": null, "after": {"row": {"num": 5}}}
3 {"before": {"row": {"num": 4}}, "after": null}

# TODO(benesch): re-enable when we support `CREATE SINK ... AS OF`.
# # Test that a Debezium sink created `AS OF 3` (the latest completed timestamp)
# # is fully consolidated.
# > CREATE SINK nums_sink
#   IN CLUSTER ${arg.single-replica-cluster}
#   FROM nums
#   INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-nums-sink-${testdrive.seed}')
#   FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
#   AS OF 3
#
# $ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.nums_sink
# 6 {"before": null, "after": {"row": {"num": 5}}}

# Validate that `SUBSCRIBE` is similarly consolidated.
# This protects against regression of database-issues#1675.
> BEGIN

> DECLARE cur CURSOR FOR SUBSCRIBE nums AS OF 3

> FETCH ALL cur
mz_timestamp mz_diff num
--------------------------
3            1       5

> COMMIT
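
# Because the subscribe starts AS OF 3, all earlier insertions and retractions
# cancel out, and only the accumulated contents at timestamp 3 (the single row
# 5) are emitted.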

# ==> Test compaction.

# Each transaction ingested so far should be separately visible
# (i.e., not compacted away).
> SELECT * FROM nums AS OF 1
3

> SELECT * FROM nums AS OF 2
4

> SELECT * FROM nums AS OF 3
5
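
# Each AS OF query returns the collection as it had accumulated up to that
# timestamp, which is why exactly one number is visible at each time.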

# Decrease the compaction window and ingest some new data in transaction 4.
> ALTER INDEX materialize.public.nums_primary_idx
  SET (RETAIN HISTORY = FOR '1ms')

$ kafka-ingest format=avro topic=nums schema=${nums-schema}
{"array":[{"data":{"num":5},"time":4,"diff":-1}]}
{"array":[{"data":{"num":6},"time":4,"diff":1}]}
{"com.materialize.cdc.progress":{"lower":[4],"upper":[5],"counts":[{"time":4,"count":2}]}}

# Data from older transactions should be immediately compacted to the timestamp
# of the latest transaction (i.e., 4).
! SELECT * FROM nums AS OF 2
contains:Timestamp (2) is not valid for all inputs

! SELECT * FROM nums AS OF 3
contains:Timestamp (3) is not valid for all inputs

> SELECT * FROM nums AS OF 4
6
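
# With a 1ms retention window the index's compaction frontier advances up to
# the latest complete timestamp, so reads at 2 and 3 are rejected while a read
# at 4 still succeeds.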

# Set the compaction window back to off and advance the number in transactions
# 5 and 6.
> ALTER INDEX materialize.public.nums_primary_idx
  SET (RETAIN HISTORY = FOR 0)

# But also create an index that compacts frequently.
> CREATE VIEW nums_compacted AS SELECT * FROM nums

> CREATE DEFAULT INDEX ON nums_compacted WITH (RETAIN HISTORY = FOR '1ms')
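
# `nums` now retains full history again while `nums_compacted` compacts
# aggressively, so the same AS OF timestamps should behave differently for the
# two relations below.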

$ kafka-ingest format=avro topic=nums schema=${nums-schema}
{"array":[{"data":{"num":6},"time":5,"diff":-1}]}
{"array":[{"data":{"num":7},"time":5,"diff":1}]}
{"array":[{"data":{"num":7},"time":6,"diff":-1}]}
{"array":[{"data":{"num":8},"time":6,"diff":1}]}
{"com.materialize.cdc.progress":{"lower":[5],"upper":[7],"counts":[{"time":5,"count":2},{"time":6,"count":2}]}}

# Timestamps 4, 5, and 6 should all be available due to the longer compaction
# window.
> SELECT * FROM nums AS OF 4
6

> SELECT * FROM nums AS OF 5
7

> SELECT * FROM nums AS OF 6
8

! SELECT * FROM nums_compacted AS OF 4
contains:Timestamp (4) is not valid for all inputs

! SELECT * FROM nums_compacted AS OF 5
contains:Timestamp (5) is not valid for all inputs

> SELECT * FROM nums_compacted AS OF 6
8