# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""
Test that the Kafka <-> Materialize connection (source + sink) can survive
network problems and interruptions using Toxiproxy.
"""
import argparse
import random

from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.services.clusterd import Clusterd
from materialize.mzcompose.services.kafka import Kafka
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.redpanda import Redpanda
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.testdrive import Testdrive
from materialize.mzcompose.services.toxiproxy import Toxiproxy
from materialize.mzcompose.services.zookeeper import Zookeeper
# All services this composition may start. Each workflow brings up only the
# subset it needs; the Kafka implementation (Confluent stack vs. Redpanda) is
# selected per run via the --redpanda flag (see get_kafka_services).
SERVICES = [
    Zookeeper(),
    Kafka(),
    SchemaRegistry(),
    Redpanda(),
    Mz(app_password=""),
    Materialized(default_replication_factor=2),
    Clusterd(),
    Toxiproxy(),
    Testdrive(default_timeout="120s"),
]
  36. def parse_args(parser: WorkflowArgumentParser) -> argparse.Namespace:
  37. parser.add_argument(
  38. "--redpanda",
  39. action="store_true",
  40. help="run against Redpanda instead of the Confluent Platform",
  41. )
  42. return parser.parse_args()
  43. def get_kafka_services(redpanda: bool) -> list[str]:
  44. return ["redpanda"] if redpanda else ["zookeeper", "kafka", "schema-registry"]
  45. def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
  46. def process(name: str) -> None:
  47. if name == "default":
  48. return
  49. with c.test_case(name):
  50. c.workflow(name, *parser.args)
  51. c.test_parts(list(c.workflows.keys()), process)
  52. #
  53. # Test the kafka sink resumption logic in the presence of networking problems
  54. #
  55. def workflow_sink_networking(c: Composition, parser: WorkflowArgumentParser) -> None:
  56. args = parse_args(parser)
  57. c.up(*(["materialized", "toxiproxy"] + get_kafka_services(args.redpanda)))
  58. seed = random.getrandbits(16)
  59. for i, failure_mode in enumerate(
  60. [
  61. "toxiproxy-close-connection.td",
  62. "toxiproxy-limit-connection.td",
  63. "toxiproxy-timeout.td",
  64. "toxiproxy-timeout-hold.td",
  65. ]
  66. ):
  67. c.up("toxiproxy")
  68. c.run_testdrive_files(
  69. "--no-reset",
  70. "--max-errors=1",
  71. f"--seed={seed}{i}",
  72. f"--temp-dir=/share/tmp/kafka-resumption-{seed}{i}",
  73. "sink-networking/setup.td",
  74. f"sink-networking/{failure_mode}",
  75. "sink-networking/during.td",
  76. "sink-networking/sleep.td",
  77. "sink-networking/toxiproxy-restore-connection.td",
  78. "sink-networking/verify-success.td",
  79. "sink-networking/cleanup.td",
  80. )
  81. c.kill("toxiproxy")
  82. def workflow_sink_kafka_restart(c: Composition, parser: WorkflowArgumentParser) -> None:
  83. args = parse_args(parser)
  84. # Sleeping for 5s before the transaction commits virtually guarantees that
  85. # there will be an open transaction when the Kafka/Redpanda service is
  86. # killed, which lets us tests whether open transactions for the same
  87. # producer ID are properly aborted after a broker restart.
  88. with c.override(
  89. Materialized(
  90. environment_extra=["FAILPOINTS=kafka_sink_commit_transaction=sleep(5000)"],
  91. default_replication_factor=2,
  92. )
  93. ):
  94. c.up(*(["materialized"] + get_kafka_services(args.redpanda)))
  95. seed = random.getrandbits(16)
  96. c.run_testdrive_files(
  97. "--no-reset",
  98. "--max-errors=1",
  99. f"--seed={seed}",
  100. f"--temp-dir=/share/tmp/kafka-resumption-{seed}",
  101. "--default-timeout=40s",
  102. "sink-kafka-restart/setup.td",
  103. )
  104. c.kill("redpanda" if args.redpanda else "kafka")
  105. c.up("redpanda" if args.redpanda else "kafka")
  106. c.run_testdrive_files(
  107. "--no-reset",
  108. "--max-errors=1",
  109. f"--seed={seed}",
  110. f"--temp-dir=/share/tmp/kafka-resumption-{seed}",
  111. "--default-timeout=40s",
  112. "sink-kafka-restart/verify.td",
  113. "sink-kafka-restart/cleanup.td",
  114. )
def workflow_source_resumption(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Test creating sources in a remote clusterd process."""
    args = parse_args(parser)
    with c.override(
        # no_reset/consistent_seed keep state and seeds stable across the
        # multiple testdrive invocations below.
        Testdrive(no_reset=True, consistent_seed=True),
        Clusterd(workers=4),
    ):
        c.up(*(["materialized", "clusterd"] + get_kafka_services(args.redpanda)))
        c.run_testdrive_files("source-resumption/setup.td")
        c.run_testdrive_files("source-resumption/verify.td")
        # Restart the clusterd that hosts the source.
        c.kill("clusterd")
        c.up("clusterd")
        # Give clusterd time to come back up before re-checking.
        c.sleep(10)

        # Verify the same data is query-able, and that we can make forward progress
        c.run_testdrive_files("source-resumption/verify.td")
        c.run_testdrive_files("source-resumption/re-ingest-and-verify.td")
  131. def workflow_sink_queue_full(c: Composition, parser: WorkflowArgumentParser) -> None:
  132. """Similar to the sink-networking workflow, but with 11 million rows (more then the 11 million defined as queue.buffering.max.messages) and only creating the sink after these rows are ingested into Mz. Triggers database-issues#7442"""
  133. args = parse_args(parser)
  134. seed = random.getrandbits(16)
  135. c.up(*(["materialized", "toxiproxy"] + get_kafka_services(args.redpanda)))
  136. c.run_testdrive_files(
  137. "--no-reset",
  138. "--max-errors=1",
  139. f"--seed={seed}",
  140. f"--temp-dir=/share/tmp/kafka-resumption-{seed}",
  141. "sink-queue-full/setup.td",
  142. "sink-queue-full/during.td",
  143. "sink-queue-full/verify-success.td",
  144. "sink-queue-full/cleanup.td",
  145. )