mzcompose.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Verify that data from Kafka is only ingested exactly once, there should be no
  11. duplicates, even after restarting Materialize.
  12. """
  13. from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
  14. from materialize.mzcompose.services.kafka import Kafka
  15. from materialize.mzcompose.services.materialized import Materialized
  16. from materialize.mzcompose.services.schema_registry import SchemaRegistry
  17. from materialize.mzcompose.services.testdrive import Testdrive
  18. from materialize.mzcompose.services.zookeeper import Zookeeper
  19. SERVICES = [
  20. Zookeeper(),
  21. Kafka(),
  22. SchemaRegistry(),
  23. Materialized(default_replication_factor=2),
  24. Testdrive(),
  25. ]
  26. def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
  27. parser.add_argument(
  28. "--seed",
  29. help="an alternate seed to use to avoid clashing with existing topics",
  30. type=int,
  31. default=1,
  32. )
  33. args = parser.parse_args()
  34. c.up("zookeeper", "kafka", "schema-registry", "materialized")
  35. c.run_testdrive_files(
  36. f"--seed={args.seed}",
  37. "--kafka-option=group.id=group1",
  38. "--no-reset",
  39. "before-restart.td",
  40. )
  41. c.kill("materialized")
  42. c.up("materialized")
  43. c.run_testdrive_files(
  44. f"--seed={args.seed}",
  45. "--no-reset",
  46. "--kafka-option=group.id=group2",
  47. "after-restart.td",
  48. )