1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- """
- Functional test against Kafka with 3 brokers, most of our other tests only use
- a single Broker.
- """
- import time
- from materialize.mzcompose.composition import Composition
- from materialize.mzcompose.services.kafka import Kafka
- from materialize.mzcompose.services.materialized import Materialized
- from materialize.mzcompose.services.schema_registry import SchemaRegistry
- from materialize.mzcompose.services.testdrive import Testdrive
- from materialize.mzcompose.services.zookeeper import Zookeeper
- SERVICES = [
- Zookeeper(),
- Kafka(name="kafka1", broker_id=1, offsets_topic_replication_factor=2),
- Kafka(name="kafka2", broker_id=2, offsets_topic_replication_factor=2),
- Kafka(name="kafka3", broker_id=3, offsets_topic_replication_factor=2),
- SchemaRegistry(
- kafka_servers=[("kafka1", "9092"), ("kafka2", "9092"), ("kafka3", "9092")]
- ),
- Materialized(default_replication_factor=2),
- Testdrive(
- entrypoint_extra=[
- "--kafka-option=acks=all",
- ],
- seed=1,
- ),
- ]
- def workflow_default(c: Composition) -> None:
- c.up("zookeeper", "kafka1", "kafka2", "kafka3", "schema-registry", "materialized")
- c.run_testdrive_files("--kafka-addr=kafka2", "01-init.td")
- time.sleep(10)
- c.kill("kafka1")
- time.sleep(10)
- c.run_testdrive_files(
- "--kafka-addr=kafka2,kafka3", "--no-reset", "02-after-leave.td"
- )
- c.up("kafka1")
- time.sleep(10)
- c.run_testdrive_files("--kafka-addr=kafka1", "--no-reset", "03-after-join.td")
|