kafka-commit.td 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ set-arg-default default-storage-size=1
  10. # Test that the source ingestion pipeline commits offsets back to Kafka with
  11. # the expected group ID.
  12. # Initial setup.
  13. $ kafka-create-topic topic=topic partitions=1
  14. $ kafka-ingest format=bytes topic=topic
  15. one
  16. two
  17. three
  18. > CREATE CONNECTION conn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
  19. # Test that the default consumer group ID is
  20. # `materialize-$ENVIRONMENTID-$CONNECTIONID-$SOURCEID`.
  21. > CREATE CLUSTER topic_cluster SIZE '${arg.default-storage-size}';
  22. > CREATE SOURCE topic
  23. IN CLUSTER topic_cluster
  24. FROM KAFKA CONNECTION conn (
  25. TOPIC 'testdrive-topic-${testdrive.seed}'
  26. )
  27. FORMAT BYTES
  28. > SELECT * from topic
  29. one
  30. two
  31. three
  32. $ set-from-sql var=consumer-group-id
  33. SELECT
  34. ks.group_id_prefix
  35. FROM mz_sources s
  36. JOIN mz_catalog.mz_kafka_sources ks ON s.id = ks.id
  37. WHERE s.name = 'topic'
  38. $ kafka-verify-commit consumer-group-id=${consumer-group-id} topic=topic partition=0
  39. 3
  40. > DROP SOURCE topic CASCADE
  41. # Test than an arbitrary prefix can be prepended to the consumer group.
  42. > CREATE SOURCE topic
  43. IN CLUSTER topic_cluster
  44. FROM KAFKA CONNECTION conn (
  45. TOPIC 'testdrive-topic-${testdrive.seed}',
  46. GROUP ID PREFIX 'OVERRIDE-'
  47. )
  48. FORMAT BYTES
  49. > SELECT * from topic
  50. one
  51. two
  52. three
  53. $ set-from-sql var=consumer-group-id
  54. SELECT
  55. ks.group_id_prefix
  56. FROM mz_sources s
  57. JOIN mz_catalog.mz_kafka_sources ks ON s.id = ks.id
  58. WHERE s.name = 'topic'
  59. $ kafka-verify-commit consumer-group-id=${consumer-group-id} topic=topic partition=0
  60. 3