# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. $ set-arg-default single-replica-cluster=quickstart
  10. # Test that debezium deduplication works correctly in the presence of multiple partitions
  11. $ set key-schema={"type": "record", "name": "row", "fields": [{"name": "a", "type": "long"}]}
  12. $ set schema={
  13. "type": "record",
  14. "name": "envelope",
  15. "fields": [
  16. {
  17. "name": "before",
  18. "type": [
  19. {
  20. "name": "row",
  21. "type": "record",
  22. "fields": [
  23. {"name": "a", "type": "long"},
  24. {"name": "b", "type": "long"}
  25. ]
  26. },
  27. "null"
  28. ]
  29. },
  30. { "name": "after", "type": ["row", "null"] },
  31. { "name": "op", "type": "string" },
  32. {
  33. "name": "source",
  34. "type": {
  35. "type": "record",
  36. "name": "Source",
  37. "namespace": "io.debezium.connector.mysql",
  38. "fields": [
  39. {
  40. "name": "file",
  41. "type": "string"
  42. },
  43. {
  44. "name": "pos",
  45. "type": "long"
  46. },
  47. {
  48. "name": "row",
  49. "type": "int"
  50. },
  51. {
  52. "name": "snapshot",
  53. "type": [
  54. {
  55. "type": "boolean",
  56. "connect.default": false
  57. },
  58. "null"
  59. ],
  60. "default": false
  61. }
  62. ],
  63. "connect.name": "io.debezium.connector.mysql.Source"
  64. }
  65. }
  66. ]
  67. }
  68. $ kafka-create-topic topic=data partitions=3
  69. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  70. URL '${testdrive.schema-registry-url}'
  71. );
  72. > CREATE CONNECTION kafka_conn
  73. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  74. # Ingest the data in the reverse order but in separate partitions
  75. $ kafka-ingest format=avro key-format=avro topic=data schema=${schema} key-schema=${key-schema} partition=2
  76. {"a":3} {"before":null,"after":{"row":{"a":3,"b":1}},"source":{"file":"binlog","pos":3,"row":0,"snapshot":{"boolean":false}}, "op": "c"}
  77. $ kafka-ingest format=avro key-format=avro topic=data schema=${schema} key-schema=${key-schema} partition=1
  78. {"a":2} {"before":null,"after":{"row":{"a":2,"b":1}},"source":{"file":"binlog","pos":2,"row":0,"snapshot":{"boolean":false}}, "op": "c"}
  79. $ kafka-ingest format=avro key-format=avro topic=data schema=${schema} key-schema=${key-schema} partition=0
  80. {"a":1} {"before":null,"after":{"row":{"a":1,"b":1}},"source":{"file":"binlog","pos":1,"row":0,"snapshot":{"boolean":false}}, "op": "c"}
  81. > CREATE SOURCE multipartition
  82. IN CLUSTER ${arg.single-replica-cluster}
  83. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-${testdrive.seed}')
  84. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  85. ENVELOPE DEBEZIUM
  86. > SELECT a, b FROM multipartition
  87. a b
  88. ----
  89. 1 1
  90. 2 1
  91. 3 1