kafka-debezium-sources-no-before.td 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ set-arg-default default-storage-size=1
  10. # Verify that we can ingest (and continue to be able to ingest) Debezium topics that don't
  11. # have a `before` field in their schema.
  12. # Key schema: its fields must be a subset of the fields in the rows.
  13. $ set keyschema={
  14. "type": "record",
  15. "name": "Key",
  16. "fields": [
  17. {"name": "id", "type": "long"}
  18. ]
  19. }
  # Value schema: an envelope record with only `op`, `after`, and `source` fields.
  # There is deliberately no `before` field; `after` is a union of the row record and null.
  20. $ set schema={
  21. "type" : "record",
  22. "name" : "envelope",
  23. "fields" : [
  24. { "name": "op", "type": "string" },
  25. {
  26. "name": "after",
  27. "type": [
  28. {
  29. "name": "row",
  30. "type": "record",
  31. "fields": [
  32. {
  33. "name": "id",
  34. "type": "long"
  35. },
  36. {
  37. "name": "creature",
  38. "type": "string"
  39. }]
  40. },
  41. "null"
  42. ]
  43. },
  44. {
  45. "name": "source",
  46. "type": {
  47. "type": "record",
  48. "name": "Source",
  49. "namespace": "io.debezium.connector.mysql",
  50. "fields": [
  51. {
  52. "name": "file",
  53. "type": "string"
  54. },
  55. {
  56. "name": "pos",
  57. "type": "long"
  58. },
  59. {
  60. "name": "row",
  61. "type": "int"
  62. },
  63. {
  64. "name": "snapshot",
  65. "type": [
  66. {
  67. "type": "boolean",
  68. "connect.default": false
  69. },
  70. "null"
  71. ],
  72. "default": false
  73. }
  74. ],
  75. "connect.name": "io.debezium.connector.mysql.Source"
  76. }
  77. }
  78. ]
  79. }
  # Connections to the Kafka broker and the Confluent Schema Registry, using the
  # addresses that testdrive supplies via its `testdrive.*` variables.
  80. > CREATE CONNECTION kafka_conn
  81. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  82. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  83. URL '${testdrive.schema-registry-url}'
  84. );
  85. $ kafka-create-topic topic=dbz-no-before partitions=1
  86. # Note: the `op` field is ignored, so it can be either "u" or "c". All three records below share key id=1.
  87. $ kafka-ingest format=avro topic=dbz-no-before key-format=avro key-schema=${keyschema} schema=${schema} timestamp=1
  88. {"id": 1} {"after": {"row": {"id": 1, "creature": "mudskipper"}}, "op": "c", "source": {"file": "binlog1", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  89. {"id": 1} {"after": {"row": {"id": 1, "creature": "salamander"}}, "op": "c", "source": {"file": "binlog2", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  90. {"id": 1} {"after": {"row": {"id": 1, "creature": "lizard"}}, "op": "c", "source": {"file": "binlog3", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  # Create a dedicated cluster, a Kafka source on the topic, and a table that decodes
  # the topic as Avro with ENVELOPE DEBEZIUM.
  91. > CREATE CLUSTER dbz_no_before_cluster SIZE '${arg.default-storage-size}';
  92. > CREATE SOURCE dbz_no_before
  93. IN CLUSTER dbz_no_before_cluster
  94. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-no-before-${testdrive.seed}')
  95. > CREATE TABLE dbz_no_before_tbl FROM SOURCE dbz_no_before (REFERENCE "testdrive-dbz-no-before-${testdrive.seed}")
  96. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  97. ENVELOPE DEBEZIUM
  # Every record ingested so far has key id=1, so only the last value (lizard) is expected.
  98. > SELECT * FROM dbz_no_before_tbl
  99. id creature
  100. -----------
  101. 1 lizard
  # A later record for the existing key id=1 replaces that key's row.
  102. $ kafka-ingest format=avro topic=dbz-no-before key-format=avro key-schema=${keyschema} schema=${schema} timestamp=2
  103. {"id": 1} {"after": {"row": {"id": 1, "creature": "dino"}}, "op": "u", "source": {"file": "binlog4", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  104. > SELECT * FROM dbz_no_before_tbl
  105. id creature
  106. -----------
  107. 1 dino
  # Two records for a new key id=2 in one ingest; only the later value is expected,
  # alongside the unchanged row for id=1.
  108. $ kafka-ingest format=avro topic=dbz-no-before key-format=avro key-schema=${keyschema} schema=${schema} timestamp=3
  109. {"id": 2} {"after": {"row": {"id": 2, "creature": "archeopteryx"}}, "op": "c", "source": {"file": "binlog5", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  110. {"id": 2} {"after": {"row": {"id": 2, "creature": "velociraptor"}}, "op": "u", "source": {"file": "binlog6", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  111. > SELECT * FROM dbz_no_before_tbl ORDER BY creature
  112. id creature
  113. ------------
  114. 1 dino
  115. 2 velociraptor
  116. # Test duplicates: two records with the same key and identical row contents are expected to yield a single row.
  117. $ kafka-ingest format=avro topic=dbz-no-before key-format=avro key-schema=${keyschema} schema=${schema} timestamp=4
  118. {"id": 3} {"after": {"row": {"id": 3, "creature": "triceratops"}}, "op": "u", "source": {"file": "binlog7", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  119. {"id": 3} {"after": {"row": {"id": 3, "creature": "triceratops"}}, "op": "u", "source": {"file": "binlog8", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  120. > SELECT * FROM dbz_no_before_tbl WHERE id = 3
  121. id creature
  122. -----------
  123. 3 triceratops
  124. # Test removal and reinsertion of a key: first insert id=4.
  125. $ kafka-ingest format=avro topic=dbz-no-before key-format=avro key-schema=${keyschema} schema=${schema} timestamp=5
  126. {"id": 4} {"after": {"row": {"id": 4, "creature": "moros"}}, "op": "c", "source": {"file": "binlog9", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  127. > SELECT creature FROM dbz_no_before_tbl WHERE id = 4
  128. creature
  129. --------
  130. moros
  # A record whose `after` is null removes the row for id=4; the query below expects no rows.
  131. $ kafka-ingest format=avro topic=dbz-no-before key-format=avro key-schema=${keyschema} schema=${schema} timestamp=6
  132. {"id": 4} {"after": null, "op": "d", "source": {"file": "binlog10", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  133. > SELECT creature FROM dbz_no_before_tbl WHERE id = 4
  134. creature
  135. --------
  # Ingesting id=4 again after the removal brings the row back with the new value.
  136. $ kafka-ingest format=avro topic=dbz-no-before key-format=avro key-schema=${keyschema} schema=${schema} timestamp=7
  137. {"id": 4} {"after": {"row": {"id": 4, "creature": "chicken"}}, "op": "u", "source": {"file": "binlog11", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  138. > SELECT creature FROM dbz_no_before_tbl WHERE id = 4
  139. creature
  140. --------
  141. chicken
  # The row for id=3 is expected to be unaffected by the operations on id=4.
  142. > SELECT * FROM dbz_no_before_tbl WHERE id = 3
  143. id creature
  144. -----------
  145. 3 triceratops