# kafka-sink-multi-partition.td
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. # We will create topics with 100 partitions and we create two sinks that
  10. # publishes a single record twice, each time with a different Avro schema ID due
  11. # to changed comments.
  12. #
  13. # Records are expected to be routed to the same partition, regardless of comment.
  14. $ skip-if
  15. SELECT mz_version_num() < 13500;
  16. $ kafka-create-topic topic=v1 partitions=100
  17. $ set-arg-default default-storage-size=1
  18. $ set-arg-default single-replica-cluster=quickstart
  19. > CREATE CONNECTION kafka_conn
  20. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  21. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  22. URL '${testdrive.schema-registry-url}'
  23. );
  24. # This is the row that will be published
  25. > CREATE TABLE data (key text, value text);
  26. > INSERT INTO data VALUES ('v1', NULL);
  27. # Execution 1
  28. > COMMENT ON COLUMN data.key IS 'v11';
  29. > CREATE SINK v11
  30. IN CLUSTER ${arg.single-replica-cluster}
  31. FROM data
  32. INTO KAFKA CONNECTION kafka_conn (
  33. TOPIC 'testdrive-v1-${testdrive.seed}',
  34. TOPIC METADATA REFRESH INTERVAL '2s'
  35. )
  36. KEY (key) NOT ENFORCED
  37. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  38. ENVELOPE UPSERT;
  39. # Execution 2
  40. > COMMENT ON COLUMN data.key IS 'v12';
  41. > CREATE SINK v12
  42. IN CLUSTER ${arg.single-replica-cluster}
  43. FROM data
  44. INTO KAFKA CONNECTION kafka_conn (
  45. TOPIC 'testdrive-v1-${testdrive.seed}',
  46. TOPIC METADATA REFRESH INTERVAL '2s'
  47. )
  48. KEY (key) NOT ENFORCED
  49. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  50. ENVELOPE UPSERT;
  51. $ kafka-verify-data format=avro sink=materialize.public.v11
  52. {"key": {"string": "v1"}} {"key": {"string": "v1"}, "value": null} partition=75
  53. {"key": {"string": "v1"}} {"key": {"string": "v1"}, "value": null} partition=75
  54. # Test that Kafka sinks discover new partitions in a timely fashion and start
  55. # routing data to the new partitions.
  56. $ kafka-add-partitions topic=v1 total-partitions=200
  57. # Wait out twice the topic metadata refresh duration to virtually guarantee that
  58. # the Kafka sinks have received the updated partition information.
  59. $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=5s
  60. > INSERT INTO data VALUES ('v1')
  61. # Even though the key is the same as before, the data is sent to a new
  62. # partition.
  63. $ kafka-verify-data format=avro sink=materialize.public.v11
  64. {"key": {"string": "v1"}} {"key": {"string": "v1"}, "value": null} partition=175
  65. {"key": {"string": "v1"}} {"key": {"string": "v1"}, "value": null} partition=175