kafka-sink-multi-partition.td

# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# We create a topic with 100 partitions and two sinks that together publish a
# single record twice, each time with a different Avro schema ID due to changed
# column comments.
#
# Records are expected to be routed to the same partition, regardless of comment.

$ kafka-create-topic topic=v1 partitions=100

$ set-arg-default default-storage-size=1
$ set-arg-default single-replica-cluster=quickstart

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

# This is the row that will be published.
> CREATE TABLE data (key text, value text);
> INSERT INTO data VALUES ('v1', NULL);

# Execution 1
> COMMENT ON COLUMN data.key IS 'v11';

> CREATE SINK v11
  IN CLUSTER ${arg.single-replica-cluster}
  FROM data
  INTO KAFKA CONNECTION kafka_conn (
    TOPIC 'testdrive-v1-${testdrive.seed}',
    TOPIC METADATA REFRESH INTERVAL '2s'
  )
  KEY (key) NOT ENFORCED
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT;
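
# TOPIC METADATA REFRESH INTERVAL '2s' makes the sink re-fetch the topic's
# partition metadata every two seconds; the 5s sleep further below assumes
# two refresh periods are enough for the sinks to observe new partitions.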

# Execution 2
> COMMENT ON COLUMN data.key IS 'v12';

> CREATE SINK v12
  IN CLUSTER ${arg.single-replica-cluster}
  FROM data
  INTO KAFKA CONNECTION kafka_conn (
    TOPIC 'testdrive-v1-${testdrive.seed}',
    TOPIC METADATA REFRESH INTERVAL '2s'
  )
  KEY (key) NOT ENFORCED
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT;
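
# Because the column comment changed between the two executions, each sink
# registers a slightly different Avro schema (and therefore gets a different
# schema ID), but both encode the same key, so their records should land on
# the same partition.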

$ kafka-verify-data format=avro sink=materialize.public.v11
{"key": {"string": "v1"}} {"key": {"string": "v1"}, "value": null} partition=75
{"key": {"string": "v1"}} {"key": {"string": "v1"}, "value": null} partition=75

# Test that Kafka sinks discover new partitions in a timely fashion and start
# routing data to the new partitions.
$ kafka-add-partitions topic=v1 total-partitions=200

# Wait out twice the topic metadata refresh duration to virtually guarantee that
# the Kafka sinks have received the updated partition information.
$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=5s

> INSERT INTO data VALUES ('v1')

# Even though the key is the same as before, the data is sent to a new
# partition.
$ kafka-verify-data format=avro sink=materialize.public.v11
{"key": {"string": "v1"}} {"key": {"string": "v1"}, "value": null} partition=175
{"key": {"string": "v1"}} {"key": {"string": "v1"}, "value": null} partition=175