# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""
Tries to find race conditions in Materialize, mostly around DDLs. Can surface
panics and wrong results.
"""

import datetime
import random
import time
from textwrap import dedent
from uuid import uuid4

from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.services.kafka import Kafka
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.minio import Mc, Minio
from materialize.mzcompose.services.mysql import MySql
from materialize.mzcompose.services.postgres import Postgres
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.testdrive import Testdrive
from materialize.mzcompose.services.zookeeper import Zookeeper
from materialize.util import PropagatingThread, all_subclasses
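
# A minimal invocation sketch (assuming the usual mzcompose wrapper script in
# this composition's directory; the flags map to the arguments registered in
# workflow_default below):
#
#     ./mzcompose run default --scenario=concurrent --runtime=600 --num-objects=5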

SERVICES = [
    Postgres(),
    MySql(),
    Zookeeper(),
    Kafka(
        auto_create_topics=False,
        ports=["30123:30123"],
        allow_host_ports=True,
        environment_extra=[
            "KAFKA_ADVERTISED_LISTENERS=HOST://localhost:30123,PLAINTEXT://kafka:9092",
            "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=HOST:PLAINTEXT,PLAINTEXT:PLAINTEXT",
        ],
    ),
    SchemaRegistry(),
    Minio(setup_materialize=True, additional_directories=["copytos3"]),
    Testdrive(no_reset=True, consistent_seed=True, default_timeout="600s"),
    Mc(),
    Materialized(default_replication_factor=2),
]

SERVICE_NAMES = [
    "postgres",
    "mysql",
    "zookeeper",
    "kafka",
    "schema-registry",
    # Still required for backups/s3 testing even when we use Azurite as blob store
    "minio",
    "materialized",
]


class Object:
    name: str
    references: "Object | None"
    can_refer: bool = True
    enabled: bool = True

    def __init__(self, name: str, references: "Object | None", rng: random.Random):
        self.name = name
        self.references = references

    def prepare(self) -> str:
        return ""

    def create(self) -> str:
        raise NotImplementedError

    def destroy(self) -> str:
        raise NotImplementedError

    def manipulate(self, kind: int) -> str:
        manipulations = [
            lambda: "",
        ]
        return manipulations[kind % len(manipulations)]()

    def verify(self) -> str:
        raise NotImplementedError
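

# Every Object subclass below emits testdrive fragments for the same lifecycle:
#   prepare()      one-time external setup (topics, upstream tables, connections)
#   create()       DDL that creates the object in Materialize
#   manipulate(k)  one of the subclass's manipulations, chosen by k modulo their count
#   destroy()      DDL that drops the object again
#   verify()       placeholder; no subclass implements it yet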


class UpsertSource(Object):
    def prepare(self) -> str:
        return dedent(
            f"""
            $ set keyschema={{
                "type": "record",
                "name": "Key",
                "fields": [
                    {{"name": "b", "type": "string"}},
                    {{"name": "a", "type": "long"}}
                ]
              }}

            $ set schema={{
                "type" : "record",
                "name" : "envelope",
                "fields" : [
                    {{
                        "name": "before",
                        "type": [
                            {{
                                "name": "row",
                                "type": "record",
                                "fields": [
                                    {{
                                        "name": "a",
                                        "type": "long"
                                    }},
                                    {{
                                        "name": "data",
                                        "type": "string"
                                    }},
                                    {{
                                        "name": "b",
                                        "type": "string"
                                    }}]
                            }},
                            "null"
                        ]
                    }},
                    {{
                        "name": "after",
                        "type": ["row", "null"]
                    }}
                ]
              }}

            $ kafka-create-topic topic={self.name} partitions=1

            $ kafka-ingest format=avro topic={self.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat=1000000
            {{"b": "bdata", "a": ${{kafka-ingest.iteration}}}} {{"before": {{"row": {{"a": ${{kafka-ingest.iteration}}, "data": "fish", "b": "bdata"}}}}, "after": {{"row": {{"a": ${{kafka-ingest.iteration}}, "data": "fish2", "b": "bdata"}}}}}}

            > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
              URL '${{testdrive.schema-registry-url}}')
            > CREATE CONNECTION IF NOT EXISTS kafka_conn
              TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT)
            > DROP SOURCE IF EXISTS {self.name}_source CASCADE
            > CREATE SOURCE {self.name}_source
              IN CLUSTER quickstart
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-{self.name}-${{testdrive.seed}}')"""
        )

    def create(self) -> str:
        return dedent(
            f"""
            > CREATE TABLE {self.name} FROM SOURCE {self.name}_source (REFERENCE "testdrive-{self.name}-${{testdrive.seed}}")
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE DEBEZIUM"""
        )

    def destroy(self) -> str:
        return f"> DROP TABLE {self.name} CASCADE"

    def manipulate(self, kind: int) -> str:
        manipulations = [
            lambda: "",
        ]
        return manipulations[kind % len(manipulations)]()

    def verify(self) -> str:
        raise NotImplementedError


class Table(Object):
    def create(self) -> str:
        return f"> CREATE TABLE {self.name} (a TEXT, b TEXT)"

    def destroy(self) -> str:
        return f"> DROP TABLE {self.name} CASCADE"

    def manipulate(self, kind: int) -> str:
        manipulations = [
            lambda: "",
        ]
        return manipulations[kind % len(manipulations)]()

    def verify(self) -> str:
        raise NotImplementedError


# TODO: How to handle things like clusters, replicas?
# TODO: Add more manipulations: inserts, updates, deletes, ALTER RENAME (twice)


class PostgresSource(Object):
    def prepare(self) -> str:
        return dedent(
            f"""
            $ postgres-execute connection=postgres://postgres:postgres@postgres
            DROP USER IF EXISTS {self.name}_role;
            CREATE USER {self.name}_role WITH SUPERUSER PASSWORD 'postgres';
            ALTER USER {self.name}_role WITH replication;
            DROP PUBLICATION IF EXISTS {self.name}_source;
            DROP TABLE IF EXISTS {self.name}_table;
            CREATE TABLE {self.name}_table (a TEXT, b TEXT);
            ALTER TABLE {self.name}_table REPLICA IDENTITY FULL;
            CREATE PUBLICATION {self.name}_source FOR TABLE {self.name}_table;
            INSERT INTO {self.name}_table VALUES ('foo', 'bar');

            > DROP SECRET IF EXISTS {self.name}_pass CASCADE
            > CREATE SECRET {self.name}_pass AS 'postgres'
            > DROP CONNECTION IF EXISTS {self.name}_conn CASCADE
            > CREATE CONNECTION {self.name}_conn FOR POSTGRES
              HOST 'postgres',
              DATABASE postgres,
              USER {self.name}_role,
              PASSWORD SECRET {self.name}_pass
            > DROP SOURCE IF EXISTS {self.name}_source
            > CREATE SOURCE {self.name}_source
              IN CLUSTER quickstart
              FROM POSTGRES CONNECTION {self.name}_conn
              (PUBLICATION '{self.name}_source')"""
        )

    def create(self) -> str:
        return f"> CREATE TABLE {self.name} FROM SOURCE {self.name}_source (REFERENCE {self.name}_table)"

    def destroy(self) -> str:
        return f"> DROP TABLE {self.name} CASCADE"

    def manipulate(self, kind: int) -> str:
        manipulations = [
            lambda: "",
            lambda: dedent(
                f"""
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO {self.name}_table VALUES ('foo', 'bar');"""
            ),
            lambda: dedent(
                f"""
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                UPDATE {self.name}_table SET b = b || 'bar' WHERE true;"""
            ),
            lambda: dedent(
                f"""
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                DELETE FROM {self.name}_table WHERE LENGTH(b) > 12;"""
            ),
            lambda: dedent(
                f"""
                > DROP TABLE IF EXISTS {self.name}_tmp_table
                > ALTER TABLE {self.name} RENAME TO {self.name}_tmp_table
                > ALTER TABLE {self.name}_tmp_table RENAME TO {self.name}
                """
            ),
        ]
        return manipulations[kind % len(manipulations)]()

    def verify(self) -> str:
        raise NotImplementedError


# TODO: Can't set up with an empty table in mysql? ERROR: reference to public.o_0_table not found in source
class MySqlSource(Object):
    def prepare(self) -> str:
        return dedent(
            f"""
            $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
            $ mysql-execute name=mysql
            # create the database if it does not exist yet but do not drop it
            CREATE DATABASE IF NOT EXISTS public;
            USE public;
            CREATE USER IF NOT EXISTS {self.name}_role IDENTIFIED BY 'mysql';
            GRANT REPLICATION SLAVE ON *.* TO {self.name}_role;
            GRANT ALL ON public.* TO {self.name}_role;
            DROP TABLE IF EXISTS {self.name}_table;
            CREATE TABLE {self.name}_table (a TEXT, b TEXT);
            INSERT INTO {self.name}_table VALUES ('foo', 'bar');

            > DROP SECRET IF EXISTS {self.name}_pass CASCADE
            > CREATE SECRET {self.name}_pass AS 'mysql'
            > DROP CONNECTION IF EXISTS {self.name}_conn CASCADE
            > CREATE CONNECTION {self.name}_conn TO MYSQL (
              HOST 'mysql',
              USER {self.name}_role,
              PASSWORD SECRET {self.name}_pass
              )
            > DROP SOURCE IF EXISTS {self.name}_source
            > CREATE SOURCE {self.name}_source
              IN CLUSTER quickstart
              FROM MYSQL CONNECTION {self.name}_conn;"""
        )

    def create(self) -> str:
        return f"> CREATE TABLE {self.name} FROM SOURCE {self.name}_source (REFERENCE public.{self.name}_table)"

    def destroy(self) -> str:
        return f"> DROP TABLE {self.name} CASCADE"

    def manipulate(self, kind: int) -> str:
        manipulations = [
            lambda: "",
            lambda: dedent(
                f"""
                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
                $ mysql-execute name=mysql
                USE public;
                INSERT INTO {self.name}_table VALUES ('foo', 'bar');"""
            ),
            lambda: dedent(
                f"""
                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
                $ mysql-execute name=mysql
                USE public;
                UPDATE {self.name}_table SET b = CONCAT(b, 'bar') WHERE true;"""
            ),
            lambda: dedent(
                f"""
                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
                $ mysql-execute name=mysql
                USE public;
                DELETE FROM {self.name}_table WHERE LENGTH(b) > 12;"""
            ),
            lambda: dedent(
                f"""
                > DROP TABLE IF EXISTS {self.name}_tmp_table
                > ALTER TABLE {self.name} RENAME TO {self.name}_tmp_table
                > ALTER TABLE {self.name}_tmp_table RENAME TO {self.name}
                """
            ),
        ]
        return manipulations[kind % len(manipulations)]()

    def verify(self) -> str:
        raise NotImplementedError


class LoadGeneratorSource(Object):
    def __init__(self, name: str, references: "Object | None", rng: random.Random):
        super().__init__(name, references, rng)
        self.tick_interval = rng.choice(["1ms", "10ms", "100ms", "1s", "10s"])

    def create(self) -> str:
        return f"> CREATE SOURCE {self.name} IN CLUSTER quickstart FROM LOAD GENERATOR COUNTER (TICK INTERVAL '{self.tick_interval}')"

    def destroy(self) -> str:
        return f"> DROP SOURCE {self.name} CASCADE"

    def manipulate(self, kind: int) -> str:
        manipulations = [
            lambda: "",
            lambda: dedent(
                f"""
                > DROP SOURCE IF EXISTS {self.name}_tmp_source
                > ALTER SOURCE {self.name} RENAME TO {self.name}_tmp_source
                > ALTER SOURCE {self.name}_tmp_source RENAME TO {self.name}
                """
            ),
        ]
        return manipulations[kind % len(manipulations)]()

    def verify(self) -> str:
        raise NotImplementedError


class WebhookSource(Object):
    def __init__(self, name: str, references: "Object | None", rng: random.Random):
        super().__init__(name, references, rng)
        self.body_format = rng.choice(["TEXT", "JSON", "JSON ARRAY", "BYTES"])

    def create(self) -> str:
        return dedent(
            f"""
            > DROP CLUSTER IF EXISTS {self.name}_cluster
            > CREATE CLUSTER {self.name}_cluster SIZE '1', REPLICATION FACTOR 1
            > CREATE SOURCE {self.name} IN CLUSTER {self.name}_cluster FROM WEBHOOK BODY FORMAT {self.body_format}
            """
        )

    def destroy(self) -> str:
        return dedent(
            f"""
            > DROP CLUSTER {self.name}_cluster CASCADE
            """
        )

    def manipulate(self, kind: int) -> str:
        manipulations = [
            lambda: "",
            lambda: dedent(
                f"""
                > DROP SOURCE IF EXISTS {self.name}_tmp_source
                > ALTER SOURCE {self.name} RENAME TO {self.name}_tmp_source
                > ALTER SOURCE {self.name}_tmp_source RENAME TO {self.name}
                """
            ),
        ]
        return manipulations[kind % len(manipulations)]()

    def verify(self) -> str:
        raise NotImplementedError


class KafkaSink(Object):
    can_refer: bool = False

    def __init__(self, name: str, references: Object | None, rng: random.Random):
        super().__init__(name, references, rng)
        self.format = rng.choice(
            [
                "JSON",
                "AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn",
            ]
        )

    def create(self) -> str:
        self.references_str = (
            self.references.name if self.references else f"{self.name}_view"
        )
        cmds = []
        if not self.references:
            cmds.append(
                f"> CREATE MATERIALIZED VIEW IF NOT EXISTS {self.references_str} AS SELECT 'foo' AS a, 'bar' AS b"
            )
        elif isinstance(self.references, View):
            self.references_str = f"{self.name}_mv"
            cmds.append(
                f"> CREATE MATERIALIZED VIEW IF NOT EXISTS {self.references_str} AS SELECT * FROM {self.references.name}"
            )
        # See database-issues#9048, topic has to be unique
        topic = f"{self.name}-{uuid4()}"
        cmds.append(
            dedent(
                f"""
                > CREATE CONNECTION IF NOT EXISTS kafka_conn
                  TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT)
                > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
                  URL '${{testdrive.schema-registry-url}}'
                  )
                > CREATE SINK {self.name}
                  IN CLUSTER quickstart
                  FROM {self.references_str}
                  INTO KAFKA CONNECTION kafka_conn (TOPIC '{topic}')
                  FORMAT {self.format}
                  ENVELOPE DEBEZIUM"""
            )
        )
        return "\n".join(cmds)

    def destroy(self) -> str:
        return f"> DROP SINK {self.name} CASCADE"

    def manipulate(self, kind: int) -> str:
        manipulations = [
            lambda: "",
            lambda: dedent(
                f"""
                > DROP MATERIALIZED VIEW IF EXISTS {self.name}_tmp_mv
                > CREATE MATERIALIZED VIEW {self.name}_tmp_mv AS SELECT * FROM {self.references_str}
                > ALTER SINK {self.name} SET FROM {self.name}_tmp_mv
                > ALTER SINK {self.name} SET FROM {self.references_str}
                > DROP MATERIALIZED VIEW {self.name}_tmp_mv
                """
            ),
            lambda: dedent(
                f"""
                > DROP SINK IF EXISTS {self.name}_tmp_sink
                > ALTER SINK {self.name} RENAME TO {self.name}_tmp_sink
                > ALTER SINK {self.name}_tmp_sink RENAME TO {self.name}
                """
            ),
        ]
        return manipulations[kind % len(manipulations)]()

    def verify(self) -> str:
        raise NotImplementedError


class View(Object):
    def create(self) -> str:
        if self.references:
            return f"> CREATE VIEW {self.name} AS SELECT * FROM {self.references.name}"
        return f"> CREATE VIEW {self.name} AS SELECT 'foo' AS a, 'bar' AS b"

    def destroy(self) -> str:
        return f"> DROP VIEW {self.name} CASCADE"

    def manipulate(self, kind: int) -> str:
        manipulations = [
            lambda: "",
            lambda: dedent(
                f"""
                > DROP VIEW IF EXISTS {self.name}_tmp_view
                > ALTER VIEW {self.name} RENAME TO {self.name}_tmp_view
                > ALTER VIEW {self.name}_tmp_view RENAME TO {self.name}
                """
            ),
        ]
        return manipulations[kind % len(manipulations)]()

    def verify(self) -> str:
        raise NotImplementedError


class MaterializedView(Object):
    def create(self) -> str:
        if self.references:
            return f"> CREATE MATERIALIZED VIEW {self.name} AS SELECT * FROM {self.references.name}"
        return f"> CREATE MATERIALIZED VIEW {self.name} AS SELECT 'foo' AS a, 'bar' AS b"

    def destroy(self) -> str:
        return f"> DROP MATERIALIZED VIEW {self.name} CASCADE"

    def manipulate(self, kind: int) -> str:
        manipulations = [
            lambda: "",
            lambda: dedent(
                f"""
                > DROP MATERIALIZED VIEW IF EXISTS {self.name}_tmp_mv
                > ALTER MATERIALIZED VIEW {self.name} RENAME TO {self.name}_tmp_mv
                > ALTER MATERIALIZED VIEW {self.name}_tmp_mv RENAME TO {self.name}
                """
            ),
        ]
        return manipulations[kind % len(manipulations)]()

    def verify(self) -> str:
        raise NotImplementedError


class DefaultIndex(Object):
    can_refer: bool = False

    def create(self) -> str:
        return (
            f"> CREATE DEFAULT INDEX ON {self.references.name}"
            if self.references
            else ""
        )

    def destroy(self) -> str:
        return (
            f"> DROP INDEX {self.references.name}_primary_idx"
            if self.references
            else ""
        )

    def manipulate(self, kind: int) -> str:
        manipulations = [
            lambda: "",
        ]
        return manipulations[kind % len(manipulations)]()

    def verify(self) -> str:
        raise NotImplementedError


class Executor:
    def execute(self, td: str) -> None:
        raise NotImplementedError


class Scenario:
    def __init__(self, c: Composition, rng: random.Random, num_objects: int):
        self.c = c
        self.rng = rng
        self.num_objects = num_objects

    def _impl(self, num_executions: int) -> str:
        raise NotImplementedError

    def print(self) -> None:
        print(self._impl(1))

    def run_fragment(self, text: str, tries: int = 1) -> None:
        if not text:
            return
        for i in range(tries):
            try:
                self.c.testdrive(text, quiet=True)
                return
            except Exception as e:
                print(e)
                if i == tries - 1:
                    print("Failed to run fragment, giving up")
                    raise
                print(f"Failed to run fragment, retrying ({i+1}/{tries})")

    def run(self, num_executions: int) -> None:
        self.run_fragment(self._impl(num_executions))
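

# Three scenario flavors exercise the objects above:
#   Subsequent      - create, manipulate and destroy each object one after another
#   SubsequentChain - build a chain in which every object references the previous one
#   Concurrent      - create/manipulate/destroy all objects in parallel threads against
#                     a shared base object, restarting the services for every execution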


class Concurrent(Scenario):
    def __init__(self, c: Composition, rng: random.Random, num_objects: int):
        super().__init__(c, rng, num_objects)
        objects = [o for o in list(all_subclasses(Object)) if o.enabled]
        self.objs = [
            rng.choice([o for o in objects if o.can_refer])("o_base", None, rng)
        ]
        self.manipulators = []
        for i in range(num_objects):
            self.objs.append(rng.choice(objects)(f"o_{i}", self.objs[0], rng))
            self.manipulators.append(self.rng.randrange(100))

    def print(self) -> None:
        pass  # TODO: print

    def run(self, num_executions: int) -> None:
        for i in range(num_executions):
            # Clean up old state
            self.c.down(destroy_volumes=True)
            self.c.up(*SERVICE_NAMES, {"name": "testdrive", "persistent": True})
            for obj in self.objs:
                self.run_fragment(obj.prepare())
            self.run_fragment(self.objs[0].create())

            def run(o: Object, m: int) -> None:
                try:
                    self.run_fragment(o.create(), tries=100)
                    self.run_fragment(o.manipulate(m), tries=100)
                finally:
                    try:
                        self.run_fragment(o.destroy(), tries=100)
                    except Exception:
                        # Might be in a half-finished state, ignore
                        pass

            # Bind obj/manipulator eagerly via default arguments so that each
            # thread works on its own object rather than the last one in the list.
            threads = [
                PropagatingThread(target=lambda o=obj, m=manipulator: run(o, m))
                for obj, manipulator in zip(self.objs[1:], self.manipulators)
            ]
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()
            self.run_fragment(self.objs[0].destroy())


class Subsequent(Scenario):
    def __init__(self, c: Composition, rng: random.Random, num_objects: int):
        super().__init__(c, rng, num_objects)
        objects = list(all_subclasses(Object))
        self.objs = [
            rng.choice([o for o in objects if o.can_refer])("o_base", None, rng)
        ]
        self.manipulators = []
        for i in range(num_objects):
            self.objs.append(rng.choice(objects)(f"o_{i}", self.objs[0], rng))
            self.manipulators.append(self.rng.randrange(100))

    def _impl(self, num_executions: int) -> str:
        result = ""
        for i in range(num_executions):
            if i == 0:
                for obj in self.objs:
                    result += obj.prepare() + "\n"
            result += self.objs[0].create() + "\n"
            for obj, manipulator in zip(self.objs[1:], self.manipulators):
                result += obj.create() + "\n"
                result += obj.manipulate(manipulator) + "\n"
                result += obj.destroy() + "\n"
            result += self.objs[0].destroy() + "\n"
        return result


class SubsequentChain(Scenario):
    def __init__(self, c: Composition, rng: random.Random, num_objects: int):
        super().__init__(c, rng, num_objects)
        objects = list(all_subclasses(Object))
        self.objs = [
            rng.choice([o for o in objects if o.can_refer])("o_base", None, rng)
        ]
        self.manipulators = []
        for i in range(num_objects):
            self.objs.append(
                rng.choice([o for o in objects if o.can_refer])(
                    f"o_{i}", self.objs[-1], rng
                )
            )
            self.manipulators.append(self.rng.randrange(100))

    def _impl(self, num_executions: int) -> str:
        result = ""
        for i in range(num_executions):
            if i == 0:
                for obj in self.objs:
                    result += obj.prepare() + "\n"
            for obj in self.objs:
                result += obj.create() + "\n"
            for obj, manipulator in zip(self.objs[1:], self.manipulators):
                result += obj.manipulate(manipulator) + "\n"
            for obj in reversed(self.objs):
                result += obj.destroy() + "\n"
        return result


def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    parser.add_argument("--seed", type=str, default=random.randrange(1000000))
    parser.add_argument("--runtime", default=600, type=int, help="Runtime in seconds")
    parser.add_argument(
        "--repetitions", default=100, type=int, help="Repetitions per scenario"
    )
    parser.add_argument(
        "--scenario",
        default="subsequent",
        type=str,
        choices=["subsequent", "subsequent-chain", "concurrent"],
    )
    parser.add_argument(
        "--num-objects",
        default=5,
        type=int,
    )
    args = parser.parse_args()
    print(f"--- Random seed is {args.seed}")
    end_time = (
        datetime.datetime.now() + datetime.timedelta(seconds=args.runtime)
    ).timestamp()

    c.up(*SERVICE_NAMES, {"name": "testdrive", "persistent": True})

    seed = args.seed
    while time.time() < end_time:
        rng = random.Random(seed)
        if args.scenario == "subsequent":
            scenario = Subsequent(c, rng, args.num_objects)
        elif args.scenario == "subsequent-chain":
            scenario = SubsequentChain(c, rng, args.num_objects)
        elif args.scenario == "concurrent":
            scenario = Concurrent(c, rng, args.num_objects)
        else:
            raise ValueError(f"Unknown scenario {args.scenario}")
        print(f"--- Scenario to run (--seed={seed})")
        scenario.print()
        print(f"--- Running scenario {args.repetitions} times")
        scenario.run(args.repetitions)
        seed = rng.randrange(1000000)
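

# Note: each loop iteration prints the seed it uses and then derives the next seed
# from the current RNG, so a specific scenario can be replayed by passing
# --seed=<printed value>. Only the object/manipulation selection is reproduced;
# concurrent timing naturally is not.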