kafka-upsert-sources-named.td 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ set-arg-default single-replica-cluster=quickstart
  10. # This testdrive file uses the deprecated syntax, but is otherwise identical to upsert-kafka-new.td
  11. #
  12. # This file can be deleted when/if we finish the deprecation and perform the removal of the old syntax.
  13. $ set keyschema={
  14. "type": "record",
  15. "name": "Key",
  16. "fields": [
  17. {"name": "key", "type": "string"},
  18. {"name": "key2", "type": "string"}
  19. ]
  20. }
  21. $ set schema={
  22. "type" : "record",
  23. "name" : "test",
  24. "fields" : [
  25. {"name":"f1", "type":"string"},
  26. {"name":"f2", "type":"long"}
  27. ]
  28. }
  29. $ kafka-create-topic topic=avroavro
  30. $ kafka-ingest format=avro topic=avroavro key-format=avro key-schema=${keyschema} schema=${schema}
  31. {"key": "fish", "key2": "key2" } {"f1": "fishval", "f2": 1000}
  32. > CREATE CONNECTION kafka_conn
  33. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  34. > CREATE SOURCE avroavro
  35. IN CLUSTER ${arg.single-replica-cluster}
  36. FROM KAFKA CONNECTION kafka_conn (TOPIC
  37. 'testdrive-avroavro-${testdrive.seed}')
  38. > CREATE TABLE avroavro_tbl FROM SOURCE avroavro (REFERENCE "testdrive-avroavro-${testdrive.seed}")
  39. KEY FORMAT AVRO USING SCHEMA '${keyschema}'
  40. VALUE FORMAT AVRO USING SCHEMA '${schema}'
  41. INCLUDE KEY AS named
  42. ENVELOPE UPSERT
  43. > SELECT (named).key, (named).key2, f1, f2 from avroavro_tbl
  44. key key2 f1 f2
  45. ---------------------------
  46. fish key2 fishval 1000
  47. $ set keyschemasingle={
  48. "type": "record",
  49. "name": "Key",
  50. "fields": [
  51. {"name": "key", "type": "string"}
  52. ]
  53. }
  54. $ kafka-create-topic topic=single
  55. $ kafka-ingest format=avro topic=single key-format=avro key-schema=${keyschemasingle} schema=${schema}
  56. {"key": "fish" } {"f1": "fishval", "f2": 1000}
  57. > CREATE SOURCE single
  58. IN CLUSTER ${arg.single-replica-cluster}
  59. FROM KAFKA CONNECTION kafka_conn (TOPIC
  60. 'testdrive-single-${testdrive.seed}')
  61. > CREATE TABLE single_tbl FROM SOURCE single (REFERENCE "testdrive-single-${testdrive.seed}")
  62. KEY FORMAT AVRO USING SCHEMA '${keyschemasingle}'
  63. VALUE FORMAT AVRO USING SCHEMA '${schema}'
  64. INCLUDE KEY AS named
  65. ENVELOPE UPSERT
  66. > SELECT named, f1, f2 from single_tbl
  67. named f1 f2
  68. ---------------------------
  69. fish fishval 1000