# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default default-replica-size=1
$ set-arg-default single-replica-cluster=quickstart

# ENVELOPE MATERIALIZE (used below) is feature-gated; enable it for this test.
$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_envelope_materialize = true

# Test support for Avro sources without using the Confluent Schema Registry.

# CDCv2 schema: a union of an "update" batch (records carrying data + time +
# diff) and a "progress" message (lower/upper frontiers plus per-time counts).
$ set schema=[
  {
    "type": "array",
    "items": {
      "type": "record",
      "name": "update",
      "namespace": "com.materialize.cdc",
      "fields": [
        {
          "name": "data",
          "type": {
            "type": "record",
            "name": "data",
            "fields": [
              {"name": "id", "type": "long"},
              {"name": "price", "type": ["null", "int"]}
            ]
          }
        },
        {"name": "time", "type": "long"},
        {"name": "diff", "type": "long"}
      ]
    }
  },
  {
    "type": "record",
    "name": "progress",
    "namespace": "com.materialize.cdc",
    "fields": [
      {"name": "lower", "type": {"type": "array", "items": "long"}},
      {"name": "upper", "type": {"type": "array", "items": "long"}},
      {
        "name": "counts",
        "type": {
          "type": "array",
          "items": {
            "type": "record",
            "name": "counts",
            "fields": [
              {"name": "time", "type": "long"},
              {"name": "count", "type": "long"}
            ]
          }
        }
      }
    ]
  }
]

$ kafka-create-topic topic=data

$ kafka-ingest format=avro topic=data schema=${schema}
{"array":[{"data":{"id":5,"price":{"int":10}},"time":5,"diff":1}]}
{"array":[{"data":{"id":5,"price":{"int":12}},"time":4,"diff":1}]}
{"array":[{"data":{"id":5,"price":{"int":12}},"time":5,"diff":-1}]}

# Create a source using an inline schema.

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE SOURCE data_schema_inline
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-${testdrive.seed}')

> CREATE TABLE data_schema_inline_tbl FROM SOURCE data_schema_inline (REFERENCE "testdrive-data-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${schema}'
  ENVELOPE MATERIALIZE

# Progress to upper=[10]: the updates at times 4 and 5 become queryable.
$ kafka-ingest format=avro topic=data schema=${schema}
{"com.materialize.cdc.progress":{"lower":[0],"upper":[3],"counts":[]}}
{"com.materialize.cdc.progress":{"lower":[3],"upper":[10],"counts":[{"time":4,"count":1},{"time":5,"count":2}, {"time": 6, "count": 1}]}}

> SELECT * FROM data_schema_inline_tbl
id price
--------
5 10

# The retraction at time 6 empties the collection again.
$ kafka-ingest format=avro topic=data schema=${schema}
{"array":[{"data":{"id":5,"price":{"int":10}},"time":6,"diff":-1}]}

> SELECT * FROM data_schema_inline_tbl

# Inject "junk" with a previous timestamp, which could simulate a materialized
# that restarted and emits previously emitted data at a compacted timestamp

$ kafka-ingest format=avro topic=data schema=${schema}
{"array":[{"data":{"id":5,"price":{"int":10}},"time":5,"diff":1}]}
{"array":[{"data":{"id":5,"price":{"int":12}},"time":4,"diff":1}]}
{"array":[{"data":{"id":5,"price":{"int":12}},"time":5,"diff":-1}]}

$ kafka-ingest format=avro topic=data schema=${schema}
{"com.materialize.cdc.progress":{"lower":[3],"upper":[6],"counts":[{"time":4,"count":1},{"time":5,"count":2}]}}

> SELECT * FROM data_schema_inline_tbl

# and now, new data again

$ kafka-ingest format=avro topic=data schema=${schema}
{"array":[{"data":{"id":6,"price":{"int":10}},"time":10,"diff":1}]}

$ kafka-ingest format=avro topic=data schema=${schema} timestamp=5
{"com.materialize.cdc.progress":{"lower":[10],"upper":[15],"counts":[{"time":10,"count":1}]}}

> SELECT * FROM data_schema_inline_tbl
id price
--------
6 10

# Test that tails report progress messages even without new data
# The output of SUBSCRIBE is dependent on the replica size
$ skip-if
SELECT '${arg.default-replica-size}' != '4-4';

> BEGIN

> DECLARE c CURSOR FOR SUBSCRIBE data_schema_inline_tbl WITH (SNAPSHOT = FALSE, PROGRESS = TRUE);

> FETCH 2 FROM c WITH (timeout = '60s')
14 true <null> <null> <null>
15 true <null> <null> <null>

$ kafka-ingest format=avro topic=data schema=${schema} timestamp=6
{"com.materialize.cdc.progress":{"lower":[15],"upper":[20],"counts":[]}}

> FETCH 1 FROM c WITH (timeout = '60s')
20 true <null> <null> <null>