avro-registry-schema-selection.td 3.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ set-arg-default default-storage-size=1
  10. # Tests that precisely manipulate which schema to select from the
  11. # Confluent Schema Registry.
  12. $ kafka-create-topic topic=schema-strategy-test
  13. $ set first-writer-schema={"type": "record", "name": "row", "fields": [{"name": "a", "type": "long"}, {"name": "b", "type": "long"}]}
  14. $ set second-writer-schema={"type": "record", "name": "row", "fields": [{"name": "a", "type": "long"}, {"name": "b", "type": "long"}, {"name": "c", "type": ["null", "long"], "default": null}]}
  15. $ set reader-schema={"type": "record", "name": "row", "fields": [{"name": "a", "type": "long"}]}
  16. $ kafka-ingest format=avro topic=schema-strategy-test schema=${first-writer-schema} set-schema-id-var=id1
  17. {"a": 0, "b": 1}
  18. $ kafka-ingest format=avro topic=schema-strategy-test schema=${second-writer-schema} set-schema-id-var=id2
  19. {"a": 2, "b": 3, "c": {"long": 4}}
  20. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  21. URL '${testdrive.schema-registry-url}'
  22. );
  23. > CREATE CONNECTION kafka_conn
  24. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  25. > CREATE CLUSTER schema_strategy_test_inline_cluster SIZE '${arg.default-storage-size}';
  26. > CREATE SOURCE schema_strategy_test_inline
  27. IN CLUSTER schema_strategy_test_inline_cluster
  28. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-schema-strategy-test-${testdrive.seed}')
  29. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  30. VALUE STRATEGY INLINE '${reader-schema}'
  31. ENVELOPE NONE
  32. > SELECT * FROM schema_strategy_test_inline
  33. a
  34. ---
  35. 0
  36. 2
  37. > CREATE CLUSTER schema_strategy_test_id_cluster SIZE '${arg.default-storage-size}';
  38. > CREATE SOURCE schema_strategy_test_id
  39. IN CLUSTER schema_strategy_test_id_cluster
  40. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-schema-strategy-test-${testdrive.seed}')
  41. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  42. VALUE STRATEGY ID ${id1}
  43. ENVELOPE NONE
  44. > SELECT * FROM schema_strategy_test_id
  45. a b
  46. ---
  47. 0 1
  48. 2 3
  49. > CREATE CLUSTER schema_strategy_test_id2_cluster SIZE '${arg.default-storage-size}';
  50. > CREATE SOURCE schema_strategy_test_id2
  51. IN CLUSTER schema_strategy_test_id2_cluster
  52. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-schema-strategy-test-${testdrive.seed}')
  53. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  54. VALUE STRATEGY ID ${id2}
  55. ENVELOPE NONE
  56. > SELECT * FROM schema_strategy_test_id2
  57. a b c
  58. -----
  59. 0 1 <null>
  60. 2 3 4
  61. > CREATE CLUSTER schema_strategy_test_latest_cluster SIZE '${arg.default-storage-size}';
  62. > CREATE SOURCE schema_strategy_test_latest
  63. IN CLUSTER schema_strategy_test_latest_cluster
  64. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-schema-strategy-test-${testdrive.seed}')
  65. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  66. VALUE STRATEGY LATEST
  67. ENVELOPE NONE
  68. > SELECT * FROM schema_strategy_test_latest
  69. a b c
  70. -----
  71. 0 1 <null>
  72. 2 3 4