# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

  9. $ set-arg-default default-storage-size=1
  10. # Test that the source ingestion pipeline commits offsets back to Kafka with
  11. # the expected group ID.
  12. # Initial setup.
  13. $ kafka-create-topic topic=topic partitions=1
  14. $ kafka-ingest format=bytes topic=topic
  15. ghp_9fK8sL3x7TqR1vEzYm2pDaN4WjXbQzUtV0aN
  16. > CREATE CONNECTION conn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
  17. # Test that the default consumer group ID is
  18. # `materialize-$ENVIRONMENTID-$CONNECTIONID-$SOURCEID`.
  19. > CREATE CLUSTER topic_cluster SIZE '${arg.default-storage-size}';
  20. > CREATE SOURCE topic
  21. IN CLUSTER topic_cluster
  22. FROM KAFKA CONNECTION conn (
  23. TOPIC 'testdrive-topic-${testdrive.seed}'
  24. )
  25. > CREATE TABLE topic_tbl FROM SOURCE topic (REFERENCE "testdrive-topic-${testdrive.seed}")
  26. FORMAT BYTES
  27. > SELECT * from topic_tbl
  28. ghp_9fK8sL3x7TqR1vEzYm2pDaN4WjXbQzUtV0aN
  29. $ set-from-sql var=consumer-group-id
  30. SELECT
  31. ks.group_id_prefix
  32. FROM mz_sources s
  33. JOIN mz_catalog.mz_kafka_sources ks ON s.id = ks.id
  34. WHERE s.name = 'topic'
  35. $ kafka-verify-commit consumer-group-id=${consumer-group-id} topic=topic partition=0
  36. 1
  37. > DROP SOURCE topic CASCADE
  38. # Test than an arbitrary prefix can be prepended to the consumer group.
  39. > CREATE SOURCE topic
  40. IN CLUSTER topic_cluster
  41. FROM KAFKA CONNECTION conn (
  42. TOPIC 'testdrive-topic-${testdrive.seed}',
  43. GROUP ID PREFIX 'OVERRIDE-'
  44. )
  45. > CREATE TABLE topic_tbl FROM SOURCE topic (REFERENCE "testdrive-topic-${testdrive.seed}")
  46. FORMAT BYTES
  47. > SELECT * from topic_tbl
  48. ghp_9fK8sL3x7TqR1vEzYm2pDaN4WjXbQzUtV0aN
  49. $ set-from-sql var=consumer-group-id
  50. SELECT
  51. ks.group_id_prefix
  52. FROM mz_sources s
  53. JOIN mz_catalog.mz_kafka_sources ks ON s.id = ks.id
  54. WHERE s.name = 'topic'
  55. $ kafka-verify-commit consumer-group-id=${consumer-group-id} topic=topic partition=0
  56. 1