kafka-upsert-sources-named.td 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ set-arg-default single-replica-cluster=quickstart
  10. # This testdrive file uses the deprecated syntax, but is otherwise identical to upsert-kafka-new.td
  11. #
  12. # This file can be deleted when/if we finish the deprecation and perform the removal of the old syntax.
  13. $ set keyschema={
  14. "type": "record",
  15. "name": "Key",
  16. "fields": [
  17. {"name": "key", "type": "string"},
  18. {"name": "key2", "type": "string"}
  19. ]
  20. }
  21. $ set schema={
  22. "type" : "record",
  23. "name" : "test",
  24. "fields" : [
  25. {"name":"f1", "type":"string"},
  26. {"name":"f2", "type":"long"}
  27. ]
  28. }
  29. $ kafka-create-topic topic=avroavro
  30. $ kafka-ingest format=avro topic=avroavro key-format=avro key-schema=${keyschema} schema=${schema}
  31. {"key": "fish", "key2": "key2" } {"f1": "fishval", "f2": 1000}
  32. > CREATE CONNECTION kafka_conn
  33. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  34. > CREATE SOURCE avroavro
  35. IN CLUSTER ${arg.single-replica-cluster}
  36. FROM KAFKA CONNECTION kafka_conn (TOPIC
  37. 'testdrive-avroavro-${testdrive.seed}')
  38. KEY FORMAT AVRO USING SCHEMA '${keyschema}'
  39. VALUE FORMAT AVRO USING SCHEMA '${schema}'
  40. INCLUDE KEY AS named
  41. ENVELOPE UPSERT
  42. > SELECT (named).key, (named).key2, f1, f2 from avroavro
  43. key key2 f1 f2
  44. ---------------------------
  45. fish key2 fishval 1000
  46. $ set keyschemasingle={
  47. "type": "record",
  48. "name": "Key",
  49. "fields": [
  50. {"name": "key", "type": "string"}
  51. ]
  52. }
  53. $ kafka-create-topic topic=single
  54. $ kafka-ingest format=avro topic=single key-format=avro key-schema=${keyschemasingle} schema=${schema}
  55. {"key": "fish" } {"f1": "fishval", "f2": 1000}
  56. > CREATE SOURCE single
  57. IN CLUSTER ${arg.single-replica-cluster}
  58. FROM KAFKA CONNECTION kafka_conn (TOPIC
  59. 'testdrive-single-${testdrive.seed}')
  60. KEY FORMAT AVRO USING SCHEMA '${keyschemasingle}'
  61. VALUE FORMAT AVRO USING SCHEMA '${schema}'
  62. INCLUDE KEY AS named
  63. ENVELOPE UPSERT
  64. > SELECT named, f1, f2 from single
  65. named f1 f2
  66. ---------------------------
  67. fish fishval 1000