kafka-debezium-sources-no-before.td 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  # Give the `default-storage-size` argument a default of 1. It is read further
  # down as ${arg.default-storage-size} when the source's cluster is created.
  9. $ set-arg-default default-storage-size=1
  10. # Verify that we can (and continue to be able to) ingest Debezium topics that
  11. # don't have a `before` field in their schema.
  12. # The fields of the key schema must be a subset of the fields in the rows.
  # Avro schema for the message key: a record named "Key" with one `long` field,
  # `id`. Stored as ${keyschema} and passed as key-schema= to every kafka-ingest.
  13. $ set keyschema={
  14. "type": "record",
  15. "name": "Key",
  16. "fields": [
  17. {"name": "id", "type": "long"}
  18. ]
  19. }
  # Avro schema for the message value, stored as ${schema}: an "envelope" record
  # with three fields:
  #   - `op`:     string (the test uses "c", "u" and "d"),
  #   - `after`:  union of a `row` record (id: long, creature: string) and null,
  #   - `source`: a `Source` record (file, pos, row, snapshot).
  # There is no `before` field, which is the case this file exercises.
  20. $ set schema={
  21. "type" : "record",
  22. "name" : "envelope",
  23. "fields" : [
  24. { "name": "op", "type": "string" },
  25. {
  26. "name": "after",
  27. "type": [
  28. {
  29. "name": "row",
  30. "type": "record",
  31. "fields": [
  32. {
  33. "name": "id",
  34. "type": "long"
  35. },
  36. {
  37. "name": "creature",
  38. "type": "string"
  39. }]
  40. },
  41. "null"
  42. ]
  43. },
  44. {
  45. "name": "source",
  46. "type": {
  47. "type": "record",
  48. "name": "Source",
  49. "namespace": "io.debezium.connector.mysql",
  50. "fields": [
  51. {
  52. "name": "file",
  53. "type": "string"
  54. },
  55. {
  56. "name": "pos",
  57. "type": "long"
  58. },
  59. {
  60. "name": "row",
  61. "type": "int"
  62. },
  63. {
  64. "name": "snapshot",
  65. "type": [
  66. {
  67. "type": "boolean",
  68. "connect.default": false
  69. },
  70. "null"
  71. ],
  72. "default": false
  73. }
  74. ],
  75. "connect.name": "io.debezium.connector.mysql.Source"
  76. }
  77. }
  78. ]
  79. }
  # Connections used by the source below: a PLAINTEXT Kafka broker connection
  # and a Confluent Schema Registry connection. Both addresses come from
  # testdrive variables.
  80. > CREATE CONNECTION kafka_conn
  81. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  82. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  83. URL '${testdrive.schema-registry-url}'
  84. );
  # Create the single-partition topic that all ingests below write to. The
  # source refers to it as 'testdrive-dbz-no-before-${testdrive.seed}', so
  # testdrive presumably decorates the name given here (confirm against the
  # testdrive docs).
  85. $ kafka-create-topic topic=dbz-no-before partitions=1
  86. # Note: we ignore the `op` field, so it can be "u" or "c".
  # Publish three successive values for key id=1 before the source exists. The
  # first query against the source expects only the last one ("lizard").
  87. $ kafka-ingest format=avro topic=dbz-no-before key-format=avro key-schema=${keyschema} schema=${schema} timestamp=1
  88. {"id": 1} {"after": {"row": {"id": 1, "creature": "mudskipper"}}, "op": "c", "source": {"file": "binlog1", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  89. {"id": 1} {"after": {"row": {"id": 1, "creature": "salamander"}}, "op": "c", "source": {"file": "binlog2", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  90. {"id": 1} {"after": {"row": {"id": 1, "creature": "lizard"}}, "op": "c", "source": {"file": "binlog3", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  # Dedicated cluster for the source, sized from the default-storage-size
  # argument.
  91. > CREATE CLUSTER dbz_no_before_cluster SIZE '${arg.default-storage-size}';
  # Source over the topic created above: Avro-decoded through the schema
  # registry connection and interpreted with the Debezium envelope.
  92. > CREATE SOURCE dbz_no_before
  93. IN CLUSTER dbz_no_before_cluster
  94. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-no-before-${testdrive.seed}')
  95. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  96. ENVELOPE DEBEZIUM
  # Of the three values written for id=1, only the most recent is expected.
  97. > SELECT * FROM dbz_no_before
  98. id creature
  99. -----------
  100. 1 lizard
  # An "u" record for the existing key id=1: the row's value is expected to be
  # replaced by the new `after` contents, without any `before` being supplied.
  101. $ kafka-ingest format=avro topic=dbz-no-before key-format=avro key-schema=${keyschema} schema=${schema} timestamp=2
  102. {"id": 1} {"after": {"row": {"id": 1, "creature": "dino"}}, "op": "u", "source": {"file": "binlog4", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  103. > SELECT * FROM dbz_no_before
  104. id creature
  105. -----------
  106. 1 dino
  # A "c" followed by a "u" for a new key (id=2) in the same ingest: only the
  # final value is expected for id=2, and the id=1 row is unchanged.
  107. $ kafka-ingest format=avro topic=dbz-no-before key-format=avro key-schema=${keyschema} schema=${schema} timestamp=3
  108. {"id": 2} {"after": {"row": {"id": 2, "creature": "archeopteryx"}}, "op": "c", "source": {"file": "binlog5", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  109. {"id": 2} {"after": {"row": {"id": 2, "creature": "velociraptor"}}, "op": "u", "source": {"file": "binlog6", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  110. > SELECT * FROM dbz_no_before ORDER BY creature
  111. id creature
  112. ------------
  113. 1 dino
  114. 2 velociraptor
  115. # Test duplicates: the same row ingested twice for one key.
  # Both records carry the same key and the same `after` row (only source.file
  # differs). Exactly one row for id=3 is expected.
  116. $ kafka-ingest format=avro topic=dbz-no-before key-format=avro key-schema=${keyschema} schema=${schema} timestamp=4
  117. {"id": 3} {"after": {"row": {"id": 3, "creature": "triceratops"}}, "op": "u", "source": {"file": "binlog7", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  118. {"id": 3} {"after": {"row": {"id": 3, "creature": "triceratops"}}, "op": "u", "source": {"file": "binlog8", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  119. > SELECT * FROM dbz_no_before WHERE id = 3
  120. id creature
  121. -----------
  122. 3 triceratops
  123. # test removal and reinsertion
  # Step 1: insert a row for the new key id=4 and confirm it is visible.
  124. $ kafka-ingest format=avro topic=dbz-no-before key-format=avro key-schema=${keyschema} schema=${schema} timestamp=5
  125. {"id": 4} {"after": {"row": {"id": 4, "creature": "moros"}}, "op": "c", "source": {"file": "binlog9", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  126. > SELECT creature FROM dbz_no_before WHERE id = 4
  127. creature
  128. --------
  129. moros
  # Step 2: a record whose `after` is null (op "d") for id=4. The query below
  # expects no rows, i.e. the row has been removed.
  130. $ kafka-ingest format=avro topic=dbz-no-before key-format=avro key-schema=${keyschema} schema=${schema} timestamp=6
  131. {"id": 4} {"after": null, "op": "d", "source": {"file": "binlog10", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  132. > SELECT creature FROM dbz_no_before WHERE id = 4
  133. creature
  134. --------
  # Step 3: write id=4 again after the delete. The row is expected to reappear
  # with the new value.
  135. $ kafka-ingest format=avro topic=dbz-no-before key-format=avro key-schema=${keyschema} schema=${schema} timestamp=7
  136. {"id": 4} {"after": {"row": {"id": 4, "creature": "chicken"}}, "op": "u", "source": {"file": "binlog11", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
  137. > SELECT creature FROM dbz_no_before WHERE id = 4
  138. creature
  139. --------
  140. chicken
  # The id=3 row written earlier must still be present and unchanged after the
  # insert/delete/reinsert of id=4.
  141. > SELECT * FROM dbz_no_before WHERE id = 3
  142. id creature
  143. -----------
  144. 3 triceratops