mzcompose.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Functional test against Kafka with 3 brokers. Most of our other tests use only
  11. a single broker.
  12. """
  13. import time
  14. from materialize.mzcompose.composition import Composition
  15. from materialize.mzcompose.services.kafka import Kafka
  16. from materialize.mzcompose.services.materialized import Materialized
  17. from materialize.mzcompose.services.schema_registry import SchemaRegistry
  18. from materialize.mzcompose.services.testdrive import Testdrive
  19. from materialize.mzcompose.services.zookeeper import Zookeeper
  20. SERVICES = [
  21. Zookeeper(),
  22. Kafka(name="kafka1", broker_id=1, offsets_topic_replication_factor=2),
  23. Kafka(name="kafka2", broker_id=2, offsets_topic_replication_factor=2),
  24. Kafka(name="kafka3", broker_id=3, offsets_topic_replication_factor=2),
  25. SchemaRegistry(
  26. kafka_servers=[("kafka1", "9092"), ("kafka2", "9092"), ("kafka3", "9092")]
  27. ),
  28. Materialized(default_replication_factor=2),
  29. Testdrive(
  30. entrypoint_extra=[
  31. "--kafka-option=acks=all",
  32. ],
  33. seed=1,
  34. ),
  35. ]
  36. def workflow_default(c: Composition) -> None:
  37. c.up("zookeeper", "kafka1", "kafka2", "kafka3", "schema-registry", "materialized")
  38. c.run_testdrive_files("--kafka-addr=kafka2", "01-init.td")
  39. time.sleep(10)
  40. c.kill("kafka1")
  41. time.sleep(10)
  42. c.run_testdrive_files(
  43. "--kafka-addr=kafka2,kafka3", "--no-reset", "02-after-leave.td"
  44. )
  45. c.up("kafka1")
  46. time.sleep(10)
  47. c.run_testdrive_files("--kafka-addr=kafka1", "--no-reset", "03-after-join.td")