mzcompose.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Test toxiproxy disruptions in the persist pubsub connection.
  11. """
  12. from collections.abc import Callable
  13. from dataclasses import dataclass
  14. from textwrap import dedent
  15. from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
  16. from materialize.mzcompose.services.materialized import Materialized
  17. from materialize.mzcompose.services.redpanda import Redpanda
  18. from materialize.mzcompose.services.testdrive import Testdrive
  19. from materialize.mzcompose.services.toxiproxy import Toxiproxy
  20. from materialize.util import selected_by_name
  21. SERVICES = [
  22. Materialized(options=["--persist-pubsub-url=http://toxiproxy:6879"]),
  23. Redpanda(),
  24. Toxiproxy(),
  25. Testdrive(no_reset=True, seed=1, default_timeout="60s"),
  26. ]
  27. SCHEMA = dedent(
  28. """
  29. $ set keyschema={
  30. "type" : "record",
  31. "name" : "test",
  32. "fields" : [
  33. {"name":"f1", "type":"long"}
  34. ]
  35. }
  36. $ set schema={
  37. "type" : "record",
  38. "name" : "test",
  39. "fields" : [
  40. {"name":"f2", "type":"long"}
  41. ]
  42. }
  43. """
  44. )
  45. @dataclass
  46. class Disruption:
  47. name: str
  48. breakage: Callable
  49. fixage: Callable
  50. disruptions = [
  51. Disruption(
  52. name="kill-pubsub",
  53. breakage=lambda c: c.kill("toxiproxy"),
  54. fixage=lambda c: toxiproxy_start(c),
  55. ),
  56. # docker compose pause has become unreliable recently
  57. Disruption(
  58. name="sigstop-pubsub",
  59. breakage=lambda c: c.kill("toxiproxy", signal="SIGSTOP", wait=False),
  60. fixage=lambda c: c.kill("toxiproxy", signal="SIGCONT", wait=False),
  61. ),
  62. ]
  63. def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
  64. """Test that the system is able to make progress in the face of PubSub disruptions."""
  65. parser.add_argument("disruptions", nargs="*", default=[d.name for d in disruptions])
  66. args = parser.parse_args()
  67. for disruption in selected_by_name(args.disruptions, disruptions):
  68. c.down(destroy_volumes=True)
  69. c.up("redpanda", "materialized", {"name": "testdrive", "persistent": True})
  70. toxiproxy_start(c)
  71. c.testdrive(
  72. input=SCHEMA
  73. + dedent(
  74. """
  75. > CREATE TABLE t1 (f1 INTEGER, f2 INTEGER);
  76. $ kafka-create-topic topic=pubsub-disruption partitions=4
  77. > CREATE CONNECTION IF NOT EXISTS csr_conn
  78. TO CONFLUENT SCHEMA REGISTRY (URL '${testdrive.schema-registry-url}');
  79. > CREATE CONNECTION IF NOT EXISTS kafka_conn
  80. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  81. > INSERT INTO t1 SELECT generate_series, 1 FROM generate_series(1,1000000);
  82. $ kafka-ingest format=avro key-format=avro topic=pubsub-disruption schema=${schema} key-schema=${keyschema} start-iteration=1 repeat=1000000
  83. {"f1": ${kafka-ingest.iteration}} {"f2": 1}
  84. > CREATE SOURCE s1
  85. FROM KAFKA CONNECTION kafka_conn
  86. (TOPIC 'testdrive-pubsub-disruption-${testdrive.seed}')
  87. > CREATE TABLE s1_tbl FROM SOURCE s1 (REFERENCE "testdrive-pubsub-disruption-${testdrive.seed}")
  88. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  89. ENVELOPE UPSERT
  90. > CREATE MATERIALIZED VIEW v1 AS
  91. SELECT COUNT(*) AS c1, COUNT(DISTINCT f1) AS c2, COUNT(DISTINCT f2) AS c3,
  92. MIN(f1) AS min1, MIN(f2) AS min2, MAX(f1) AS max1, MAX(f2) AS max2
  93. FROM t1;
  94. > CREATE MATERIALIZED VIEW v2 AS
  95. SELECT COUNT(*) AS c1, COUNT(DISTINCT f1) AS c2, COUNT(DISTINCT f2) AS c3,
  96. MIN(f1) AS min1, MIN(f2) AS min2, MAX(f1) AS max1, MAX(f2) AS max2
  97. FROM s1_tbl;
  98. > UPDATE t1 SET f2 = 2;
  99. $ kafka-ingest format=avro key-format=avro topic=pubsub-disruption schema=${schema} key-schema=${keyschema} start-iteration=1 repeat=1000000
  100. {"f1": ${kafka-ingest.iteration}} {"f2": 2}
  101. """
  102. )
  103. )
  104. disruption.breakage(c)
  105. c.testdrive(
  106. input=SCHEMA
  107. + dedent(
  108. """
  109. > UPDATE t1 SET f2 = 3;
  110. $ kafka-ingest format=avro key-format=avro topic=pubsub-disruption schema=${schema} key-schema=${keyschema} start-iteration=1 repeat=1000000
  111. {"f1": ${kafka-ingest.iteration}} {"f2": 3}
  112. > SELECT * FROM v1
  113. 1000000 1000000 1 1 3 1000000 3
  114. > SELECT * FROM v2
  115. 1000000 1000000 1 1 3 1000000 3
  116. # Create more views during the disruption
  117. > CREATE MATERIALIZED VIEW v3 AS
  118. SELECT COUNT(*) AS c1, COUNT(DISTINCT f1) AS c2, COUNT(DISTINCT f2) AS c3,
  119. MIN(f1) AS min1, MIN(f2) AS min2, MAX(f1) AS max1, MAX(f2) AS max2
  120. FROM t1;
  121. > CREATE MATERIALIZED VIEW v4 AS
  122. SELECT COUNT(*) AS c1, COUNT(DISTINCT f1) AS c2, COUNT(DISTINCT f2) AS c3,
  123. MIN(f1) AS min1, MIN(f2) AS min2, MAX(f1) AS max1, MAX(f2) AS max2
  124. FROM s1_tbl;
  125. """
  126. )
  127. )
  128. disruption.fixage(c)
  129. c.testdrive(
  130. input=SCHEMA
  131. + dedent(
  132. """
  133. > UPDATE t1 SET f2 = 4;
  134. $ kafka-ingest format=avro key-format=avro topic=pubsub-disruption schema=${schema} key-schema=${keyschema} start-iteration=1 repeat=1000000
  135. {"f1": ${kafka-ingest.iteration}} {"f2": 4}
  136. > SELECT * FROM v1
  137. 1000000 1000000 1 1 4 1000000 4
  138. > SELECT * FROM v2
  139. 1000000 1000000 1 1 4 1000000 4
  140. > SELECT * FROM v3
  141. 1000000 1000000 1 1 4 1000000 4
  142. > SELECT * FROM v4
  143. 1000000 1000000 1 1 4 1000000 4
  144. """
  145. )
  146. )
  147. def toxiproxy_start(c: Composition) -> None:
  148. c.up("toxiproxy")
  149. c.testdrive(
  150. input=dedent(
  151. """
  152. $ http-request method=POST url=http://toxiproxy:8474/proxies content-type=application/json
  153. {
  154. "name": "pubsub",
  155. "listen": "0.0.0.0:6879",
  156. "upstream": "materialized:6879",
  157. "enabled": true
  158. }
  159. """
  160. )
  161. )