123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- """
- Test toxiproxy disruptions in the persist pubsub connection.
- """
- from collections.abc import Callable
- from dataclasses import dataclass
- from textwrap import dedent
- from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
- from materialize.mzcompose.services.materialized import Materialized
- from materialize.mzcompose.services.redpanda import Redpanda
- from materialize.mzcompose.services.testdrive import Testdrive
- from materialize.mzcompose.services.toxiproxy import Toxiproxy
- from materialize.util import selected_by_name
# Services used by the workflows below. Materialize's persist pubsub traffic
# is pointed at Toxiproxy (port 6879) so the connection can be disrupted on
# demand; Toxiproxy forwards it back to materialized (see toxiproxy_start).
SERVICES = [
    Materialized(options=["--persist-pubsub-url=http://toxiproxy:6879"]),
    Redpanda(),
    Toxiproxy(),
    # no_reset=True so testdrive state survives across the multiple
    # c.testdrive() invocations within a single disruption scenario.
    Testdrive(no_reset=True, seed=1, default_timeout="60s"),
]
# Avro key/value schemas shared by every testdrive fragment below: the key
# carries a single long field f1, the value a single long field f2.
SCHEMA = dedent(
    """
    $ set keyschema={
        "type" : "record",
        "name" : "test",
        "fields" : [
            {"name":"f1", "type":"long"}
        ]
    }
    $ set schema={
        "type" : "record",
        "name" : "test",
        "fields" : [
            {"name":"f2", "type":"long"}
        ]
    }
    """
)
@dataclass
class Disruption:
    """A named way to break and subsequently repair the pubsub connection."""

    # Identifier; also used to select disruptions from the command line.
    name: str
    # Invoked with the Composition to introduce the disruption.
    breakage: Callable[[Composition], None]
    # Invoked with the Composition to undo the disruption.
    fixage: Callable[[Composition], None]
# All known disruptions; workflow_default runs every one of them unless a
# subset is selected on the command line.
disruptions = [
    Disruption(
        name="kill-pubsub",
        breakage=lambda c: c.kill("toxiproxy"),
        # NOTE: toxiproxy_start is defined further down in this module, so the
        # lambda is required to defer the name lookup until fixage is called.
        fixage=lambda c: toxiproxy_start(c),
    ),
    # docker compose pause has become unreliable recently
    Disruption(
        name="sigstop-pubsub",
        breakage=lambda c: c.kill("toxiproxy", signal="SIGSTOP", wait=False),
        fixage=lambda c: c.kill("toxiproxy", signal="SIGCONT", wait=False),
    ),
]
def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Test that the system is able to make progress in the face of PubSub disruptions.

    For each selected disruption: start from a clean slate, ingest 1M rows
    into a table and an UPSERT Kafka source, apply the disruption, verify
    that updates still propagate to materialized views (and that new views
    can be created) while pubsub is broken, then repair the connection and
    verify correctness once more.
    """
    # With no positional arguments, run every known disruption.
    parser.add_argument("disruptions", nargs="*", default=[d.name for d in disruptions])
    args = parser.parse_args()
    for disruption in selected_by_name(args.disruptions, disruptions):
        # Each scenario starts from a clean slate.
        c.down(destroy_volumes=True)
        c.up("redpanda", "materialized", {"name": "testdrive", "persistent": True})
        toxiproxy_start(c)
        # Phase 1 (pubsub healthy): set up sources, tables, and views v1/v2,
        # then push a first round of updates (f2 = 2).
        c.testdrive(
            input=SCHEMA
            + dedent(
                """
                > CREATE TABLE t1 (f1 INTEGER, f2 INTEGER);
                $ kafka-create-topic topic=pubsub-disruption partitions=4
                > CREATE CONNECTION IF NOT EXISTS csr_conn
                  TO CONFLUENT SCHEMA REGISTRY (URL '${testdrive.schema-registry-url}');
                > CREATE CONNECTION IF NOT EXISTS kafka_conn
                  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
                > INSERT INTO t1 SELECT generate_series, 1 FROM generate_series(1,1000000);
                $ kafka-ingest format=avro key-format=avro topic=pubsub-disruption schema=${schema} key-schema=${keyschema} start-iteration=1 repeat=1000000
                {"f1": ${kafka-ingest.iteration}} {"f2": 1}
                > CREATE SOURCE s1
                  FROM KAFKA CONNECTION kafka_conn
                  (TOPIC 'testdrive-pubsub-disruption-${testdrive.seed}')
                > CREATE TABLE s1_tbl FROM SOURCE s1 (REFERENCE "testdrive-pubsub-disruption-${testdrive.seed}")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE UPSERT
                > CREATE MATERIALIZED VIEW v1 AS
                  SELECT COUNT(*) AS c1, COUNT(DISTINCT f1) AS c2, COUNT(DISTINCT f2) AS c3,
                  MIN(f1) AS min1, MIN(f2) AS min2, MAX(f1) AS max1, MAX(f2) AS max2
                  FROM t1;
                > CREATE MATERIALIZED VIEW v2 AS
                  SELECT COUNT(*) AS c1, COUNT(DISTINCT f1) AS c2, COUNT(DISTINCT f2) AS c3,
                  MIN(f1) AS min1, MIN(f2) AS min2, MAX(f1) AS max1, MAX(f2) AS max2
                  FROM s1_tbl;
                > UPDATE t1 SET f2 = 2;
                $ kafka-ingest format=avro key-format=avro topic=pubsub-disruption schema=${schema} key-schema=${keyschema} start-iteration=1 repeat=1000000
                {"f1": ${kafka-ingest.iteration}} {"f2": 2}
                """
            )
        )
        # Phase 2 (pubsub broken): updates must still propagate and new views
        # (v3/v4) must still be creatable without the pubsub connection.
        disruption.breakage(c)
        c.testdrive(
            input=SCHEMA
            + dedent(
                """
                > UPDATE t1 SET f2 = 3;
                $ kafka-ingest format=avro key-format=avro topic=pubsub-disruption schema=${schema} key-schema=${keyschema} start-iteration=1 repeat=1000000
                {"f1": ${kafka-ingest.iteration}} {"f2": 3}
                > SELECT * FROM v1
                1000000 1000000 1 1 3 1000000 3
                > SELECT * FROM v2
                1000000 1000000 1 1 3 1000000 3
                # Create more views during the disruption
                > CREATE MATERIALIZED VIEW v3 AS
                  SELECT COUNT(*) AS c1, COUNT(DISTINCT f1) AS c2, COUNT(DISTINCT f2) AS c3,
                  MIN(f1) AS min1, MIN(f2) AS min2, MAX(f1) AS max1, MAX(f2) AS max2
                  FROM t1;
                > CREATE MATERIALIZED VIEW v4 AS
                  SELECT COUNT(*) AS c1, COUNT(DISTINCT f1) AS c2, COUNT(DISTINCT f2) AS c3,
                  MIN(f1) AS min1, MIN(f2) AS min2, MAX(f1) AS max1, MAX(f2) AS max2
                  FROM s1_tbl;
                """
            )
        )
        # Phase 3 (pubsub restored): one more round of updates; all four
        # views must converge to the same final aggregates.
        disruption.fixage(c)
        c.testdrive(
            input=SCHEMA
            + dedent(
                """
                > UPDATE t1 SET f2 = 4;
                $ kafka-ingest format=avro key-format=avro topic=pubsub-disruption schema=${schema} key-schema=${keyschema} start-iteration=1 repeat=1000000
                {"f1": ${kafka-ingest.iteration}} {"f2": 4}
                > SELECT * FROM v1
                1000000 1000000 1 1 4 1000000 4
                > SELECT * FROM v2
                1000000 1000000 1 1 4 1000000 4
                > SELECT * FROM v3
                1000000 1000000 1 1 4 1000000 4
                > SELECT * FROM v4
                1000000 1000000 1 1 4 1000000 4
                """
            )
        )
def toxiproxy_start(c: Composition) -> None:
    """Bring up Toxiproxy and configure its "pubsub" proxy.

    The proxy listens on 0.0.0.0:6879 and forwards traffic back to the
    persist pubsub endpoint on materialized:6879.
    """
    c.up("toxiproxy")
    create_proxy = dedent(
        """
        $ http-request method=POST url=http://toxiproxy:8474/proxies content-type=application/json
        {
          "name": "pubsub",
          "listen": "0.0.0.0:6879",
          "upstream": "materialized:6879",
          "enabled": true
        }
        """
    )
    c.testdrive(input=create_proxy)
|