kafka-commit.td 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ set-arg-default default-storage-size=1
  # The default above supplies ${arg.default-storage-size}, which is used as the
  # SIZE of the cluster created further down in this file.
  10. # Test that the source ingestion pipeline commits offsets back to Kafka with
  11. # the expected group ID.
  12. # Initial setup: create a single-partition topic, ingest three records as
  # raw bytes (`one`, `two`, `three`), and define a PLAINTEXT connection to the
  # broker address provided by testdrive in ${testdrive.kafka-addr}.
  13. $ kafka-create-topic topic=topic partitions=1
  14. $ kafka-ingest format=bytes topic=topic
  15. one
  16. two
  17. three
  18. > CREATE CONNECTION conn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
  19. # Test that the default consumer group ID is
  20. # `materialize-$ENVIRONMENTID-$CONNECTIONID-$SOURCEID`.
  # Flow: create a cluster and a Kafka source on it, expose the topic as a
  # FORMAT BYTES table, check that all three ingested records are visible, then
  # read the source's `group_id_prefix` from mz_kafka_sources and verify the
  # commit recorded for partition 0 under that group.
  # NOTE(review): the value is read from `group_id_prefix` but passed on as the
  # full consumer-group-id; presumably the two coincide for this source -- confirm.
  # NOTE(review): the expected `3` is presumably the committed offset after the
  # three ingested records -- confirm against kafka-verify-commit's semantics.
  # The final DROP SOURCE ... CASCADE frees the name `topic`, which the next
  # section reuses; it presumably also removes `topic_tbl`, which is recreated
  # there without an explicit drop.
  21. > CREATE CLUSTER topic_cluster SIZE '${arg.default-storage-size}';
  22. > CREATE SOURCE topic
  23. IN CLUSTER topic_cluster
  24. FROM KAFKA CONNECTION conn (
  25. TOPIC 'testdrive-topic-${testdrive.seed}'
  26. )
  27. > CREATE TABLE topic_tbl FROM SOURCE topic (REFERENCE "testdrive-topic-${testdrive.seed}")
  28. FORMAT BYTES
  29. > SELECT * from topic_tbl
  30. one
  31. two
  32. three
  33. $ set-from-sql var=consumer-group-id
  34. SELECT
  35. ks.group_id_prefix
  36. FROM mz_sources s
  37. JOIN mz_catalog.mz_kafka_sources ks ON s.id = ks.id
  38. WHERE s.name = 'topic'
  39. $ kafka-verify-commit consumer-group-id=${consumer-group-id} topic=topic partition=0
  40. 3
  41. > DROP SOURCE topic CASCADE
  42. # Test that an arbitrary prefix can be prepended to the consumer group.
  # Same flow as the default-group-ID test earlier in this file, except that the
  # source is created with GROUP ID PREFIX 'OVERRIDE-'. The group ID is again
  # read back from mz_kafka_sources.group_id_prefix rather than hard-coded, and
  # the commit for partition 0 under that group is expected to be 3.
  # NOTE(review): nothing here asserts that the value read back actually starts
  # with 'OVERRIDE-'; presumably that is implied by the commit check -- confirm.
  43. > CREATE SOURCE topic
  44. IN CLUSTER topic_cluster
  45. FROM KAFKA CONNECTION conn (
  46. TOPIC 'testdrive-topic-${testdrive.seed}',
  47. GROUP ID PREFIX 'OVERRIDE-'
  48. )
  49. > CREATE TABLE topic_tbl FROM SOURCE topic (REFERENCE "testdrive-topic-${testdrive.seed}")
  50. FORMAT BYTES
  51. > SELECT * from topic_tbl
  52. one
  53. two
  54. three
  55. $ set-from-sql var=consumer-group-id
  56. SELECT
  57. ks.group_id_prefix
  58. FROM mz_sources s
  59. JOIN mz_catalog.mz_kafka_sources ks ON s.id = ks.id
  60. WHERE s.name = 'topic'
  61. $ kafka-verify-commit consumer-group-id=${consumer-group-id} topic=topic partition=0
  62. 3