kafka-upsert-debezium-sources-unordered.td 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  # Default for the single-replica-cluster argument; it is read further down as
  # ${arg.single-replica-cluster} in the IN CLUSTER clause of CREATE SOURCE.
  9. $ set-arg-default single-replica-cluster=quickstart
  # Avro schema for the Kafka message key: a record named "Key" with a single
  # long field "id". Passed to kafka-ingest below as key-schema=${keyschema}.
  10. # must be a subset of the keys in the rows
  11. $ set keyschema={
  12. "type": "record",
  13. "name": "Key",
  14. "fields": [
  15. {"name": "id", "type": "long"}
  16. ]
  17. }
  # Avro schema for the message value: an "envelope" record with two fields,
  # "before" and "after". "before" declares the "row" record inline as a union
  # with "null"; "after" reuses that type by name ("row").
  # In "row" the key field "id" is declared AFTER "creature", so the key column
  # is not the first column of the row.
  # NOTE(review): this is presumably the "unordered" case the file name refers
  # to -- confirm.
  18. $ set schema={
  19. "type" : "record",
  20. "name" : "envelope",
  21. "fields" : [
  22. {
  23. "name": "before",
  24. "type": [
  25. {
  26. "name": "row",
  27. "type": "record",
  28. "fields": [
  29. {
  30. "name": "creature",
  31. "type": "string"
  32. },
  33. {
  34. "name": "id",
  35. "type": "long"
  36. }]
  37. },
  38. "null"
  39. ]
  40. },
  41. {
  42. "name": "after",
  43. "type": ["row", "null"]
  44. }
  45. ]
  46. }
  # Create the topic and publish three Avro records, all with key {"id": 1}.
  # Each record is a before/after pair that changes only "creature":
  # fish -> mudskipper -> salamander -> lizard.
  # The record literals list the row fields in differing orders ("creature"
  # first in "before", "id" first in "after").
  47. $ kafka-create-topic topic=dbzupsert
  48. $ kafka-ingest format=avro topic=dbzupsert key-format=avro key-schema=${keyschema} schema=${schema} timestamp=1
  49. {"id": 1} {"before": {"row": {"creature": "fish", "id": 1}}, "after": {"row": {"id": 1, "creature": "mudskipper"}}}
  50. {"id": 1} {"before": {"row": {"creature": "mudskipper", "id": 1}}, "after": {"row": {"id": 1, "creature": "salamander"}}}
  51. {"id": 1} {"before": {"row": {"creature": "salamander", "id": 1}}, "after": {"row": {"id": 1, "creature": "lizard"}}}
  # Connections used by the source below: a Confluent Schema Registry
  # connection (csr_conn) and a plaintext Kafka broker connection (kafka_conn),
  # both addressed through ${testdrive.*} variables.
  # Only csr_conn is created with IF NOT EXISTS; kafka_conn is created
  # unconditionally.
  52. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  53. URL '${testdrive.schema-registry-url}'
  54. );
  55. > CREATE CONNECTION kafka_conn
  56. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  # Create the source under test: reads the topic through kafka_conn, decodes
  # Avro using schemas from csr_conn, and interprets records with
  # ENVELOPE DEBEZIUM. It runs in the cluster named by the
  # single-replica-cluster argument.
  # NOTE(review): the topic was created as "dbzupsert"; the "testdrive-" prefix
  # and "-${testdrive.seed}" suffix are presumably added by testdrive when it
  # creates topics -- confirm.
  57. > CREATE SOURCE doin_upsert
  58. IN CLUSTER ${arg.single-replica-cluster}
  59. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbzupsert-${testdrive.seed}')
  60. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  61. ENVELOPE DEBEZIUM
  # Expect exactly one row for id=1, matching the "after" state of the last
  # ingested record (creature = lizard). Columns appear in the order the "row"
  # schema declares them: creature, then id.
  62. > SELECT * FROM doin_upsert
  63. creature id
  64. -----------
  65. lizard 1