# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default default-replica-size=1
$ set-arg-default single-replica-cluster=quickstart

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_envelope_materialize = true
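
# ENVELOPE MATERIALIZE is gated behind the enable_envelope_materialize feature
# flag, so it is switched on above before the source is created below.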

# Test support for Avro sources without using the Confluent Schema Registry.

$ set schema=[
  {
    "type": "array",
    "items": {
      "type": "record",
      "name": "update",
      "namespace": "com.materialize.cdc",
      "fields": [
        {
          "name": "data",
          "type": {
            "type": "record",
            "name": "data",
            "fields": [
              {"name": "id", "type": "long"},
              {"name": "price", "type": ["null", "int"]}
            ]
          }
        },
        {"name": "time", "type": "long"},
        {"name": "diff", "type": "long"}
      ]
    }
  },
  {
    "type": "record",
    "name": "progress",
    "namespace": "com.materialize.cdc",
    "fields": [
      {"name": "lower", "type": {"type": "array", "items": "long"}},
      {"name": "upper", "type": {"type": "array", "items": "long"}},
      {
        "name": "counts",
        "type": {
          "type": "array",
          "items": {
            "type": "record",
            "name": "counts",
            "fields": [
              {"name": "time", "type": "long"},
              {"name": "count", "type": "long"}
            ]
          }
        }
      }
    ]
  }
]
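
# The schema is a union of two variants: an array of `update` records, each
# carrying a data row together with a logical `time` and a `diff` (+1 for an
# insertion, -1 for a retraction), and a `progress` record declaring how many
# updates to expect at each time in the half-open interval [lower, upper).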

$ kafka-create-topic topic=data

$ kafka-ingest format=avro topic=data schema=${schema}
{"array":[{"data":{"id":5,"price":{"int":10}},"time":5,"diff":1}]}
{"array":[{"data":{"id":5,"price":{"int":12}},"time":4,"diff":1}]}
{"array":[{"data":{"id":5,"price":{"int":12}},"time":5,"diff":-1}]}
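
# No progress statement covers times 4 and 5 yet, so once the source below
# ingests these updates they are not yet reflected in query results.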

# Create a source using an inline schema.

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE SOURCE data_schema_inline
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-${testdrive.seed}')
  FORMAT AVRO USING SCHEMA '${schema}'
  ENVELOPE MATERIALIZE

$ kafka-ingest format=avro topic=data schema=${schema}
{"com.materialize.cdc.progress":{"lower":[0],"upper":[3],"counts":[]}}
{"com.materialize.cdc.progress":{"lower":[3],"upper":[10],"counts":[{"time":4,"count":1},{"time":5,"count":2},{"time":6,"count":1}]}}
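
# The first statement closes the (empty) interval [0, 3). The second promises
# one update at time 4, two at time 5, and one at time 6. All promised updates
# for times 4 and 5 have arrived, so those times are complete; time 6 still
# awaits its one update.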

> SELECT * FROM data_schema_inline
id price
--------
5 10
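
# Consolidating: time 4 inserts (5, 12); time 5 inserts (5, 10) and retracts
# (5, 12), leaving only (5, 10).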

$ kafka-ingest format=avro topic=data schema=${schema}
{"array":[{"data":{"id":5,"price":{"int":10}},"time":6,"diff":-1}]}
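
# This retraction is the single update promised at time 6, completing the
# interval [3, 10). Consolidating leaves nothing, so the SELECT below expects
# zero rows.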

> SELECT * FROM data_schema_inline

# Inject "junk" with a previous timestamp, which could simulate a materialized
# process that restarted and re-emits previously emitted data at a compacted
# timestamp.

$ kafka-ingest format=avro topic=data schema=${schema}
{"array":[{"data":{"id":5,"price":{"int":10}},"time":5,"diff":1}]}
{"array":[{"data":{"id":5,"price":{"int":12}},"time":4,"diff":1}]}
{"array":[{"data":{"id":5,"price":{"int":12}},"time":5,"diff":-1}]}
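
# Times 4 and 5 are already complete, so these duplicate updates must be
# ignored rather than double-counted.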

$ kafka-ingest format=avro topic=data schema=${schema}
{"com.materialize.cdc.progress":{"lower":[3],"upper":[6],"counts":[{"time":4,"count":1},{"time":5,"count":2}]}}
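
# Likewise, this progress statement restates counts for an already-completed
# interval and must not change the result: the table stays empty.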

> SELECT * FROM data_schema_inline

# and now, new data again

$ kafka-ingest format=avro topic=data schema=${schema}
{"array":[{"data":{"id":6,"price":{"int":10}},"time":10,"diff":1}]}

$ kafka-ingest format=avro topic=data schema=${schema} timestamp=5
{"com.materialize.cdc.progress":{"lower":[10],"upper":[15],"counts":[{"time":10,"count":1}]}}
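
# The progress statement completes [10, 15), making the update at time 10
# visible. (The timestamp=5 argument presumably sets the Kafka message
# timestamp, which is independent of the CDC `time` field.)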

> SELECT * FROM data_schema_inline
id price
--------
6 10

# Test that subscribes report progress messages even without new data.
# The output of SUBSCRIBE depends on the replica size.
$ skip-if
SELECT '${arg.default-replica-size}' != '4-4';

> BEGIN

> DECLARE c CURSOR FOR SUBSCRIBE data_schema_inline WITH (SNAPSHOT = FALSE, PROGRESS = TRUE);
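
# With PROGRESS = TRUE, each output row is (mz_timestamp, mz_progressed,
# mz_diff, id, price); progress rows carry mz_progressed = true and nulls in
# the remaining columns.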

> FETCH 2 FROM c WITH (timeout = '60s')
14 true <null> <null> <null>
15 true <null> <null> <null>

$ kafka-ingest format=avro topic=data schema=${schema} timestamp=6
{"com.materialize.cdc.progress":{"lower":[15],"upper":[20],"counts":[]}}
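
# This progress statement carries no data at all; it only advances the upper
# frontier to 20, which the subscribe observes below.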

> FETCH 1 FROM c WITH (timeout = '60s')
20 true <null> <null> <null>