mzcompose.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Tries to find race conditions in Materialize, mostly DDLs. Can find panics and wrong results.
  11. """
  12. import datetime
  13. import random
  14. import time
  15. from textwrap import dedent
  16. from uuid import uuid4
  17. from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
  18. from materialize.mzcompose.services.kafka import Kafka
  19. from materialize.mzcompose.services.materialized import Materialized
  20. from materialize.mzcompose.services.minio import Mc, Minio
  21. from materialize.mzcompose.services.mysql import MySql
  22. from materialize.mzcompose.services.postgres import Postgres
  23. from materialize.mzcompose.services.schema_registry import SchemaRegistry
  24. from materialize.mzcompose.services.testdrive import Testdrive
  25. from materialize.mzcompose.services.zookeeper import Zookeeper
  26. from materialize.util import PropagatingThread, all_subclasses
  27. SERVICES = [
  28. Postgres(),
  29. MySql(),
  30. Zookeeper(),
  31. Kafka(
  32. auto_create_topics=False,
  33. ports=["30123:30123"],
  34. allow_host_ports=True,
  35. environment_extra=[
  36. "KAFKA_ADVERTISED_LISTENERS=HOST://localhost:30123,PLAINTEXT://kafka:9092",
  37. "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=HOST:PLAINTEXT,PLAINTEXT:PLAINTEXT",
  38. ],
  39. ),
  40. SchemaRegistry(),
  41. Minio(setup_materialize=True, additional_directories=["copytos3"]),
  42. Testdrive(no_reset=True, consistent_seed=True, default_timeout="600s"),
  43. Mc(),
  44. Materialized(default_replication_factor=2),
  45. ]
  46. SERVICE_NAMES = [
  47. "postgres",
  48. "mysql",
  49. "zookeeper",
  50. "kafka",
  51. "schema-registry",
  52. # Still required for backups/s3 testing even when we use Azurite as blob store
  53. "minio",
  54. "materialized",
  55. ]
  56. class Object:
  57. name: str
  58. references: "Object | None"
  59. can_refer: bool = True
  60. enabled: bool = True
  61. def __init__(self, name: str, references: "Object | None", rng: random.Random):
  62. self.name = name
  63. self.references = references
  64. def prepare(self) -> str:
  65. return ""
  66. def create(self) -> str:
  67. raise NotImplementedError
  68. def destroy(self) -> str:
  69. raise NotImplementedError
  70. def manipulate(self, kind: int) -> str:
  71. manipulations = [
  72. lambda: "",
  73. ]
  74. return manipulations[kind % len(manipulations)]()
  75. def verify(self) -> str:
  76. raise NotImplementedError
  77. class UpsertSource(Object):
  78. def prepare(self) -> str:
  79. return dedent(
  80. f"""
  81. $ set keyschema={{
  82. "type": "record",
  83. "name": "Key",
  84. "fields": [
  85. {{"name": "b", "type": "string"}},
  86. {{"name": "a", "type": "long"}}
  87. ]
  88. }}
  89. $ set schema={{
  90. "type" : "record",
  91. "name" : "envelope",
  92. "fields" : [
  93. {{
  94. "name": "before",
  95. "type": [
  96. {{
  97. "name": "row",
  98. "type": "record",
  99. "fields": [
  100. {{
  101. "name": "a",
  102. "type": "long"
  103. }},
  104. {{
  105. "name": "data",
  106. "type": "string"
  107. }},
  108. {{
  109. "name": "b",
  110. "type": "string"
  111. }}]
  112. }},
  113. "null"
  114. ]
  115. }},
  116. {{
  117. "name": "after",
  118. "type": ["row", "null"]
  119. }}
  120. ]
  121. }}
  122. $ kafka-create-topic topic={self.name} partitions=1
  123. $ kafka-ingest format=avro topic={self.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat=1000000
  124. {{"b": "bdata", "a": ${{kafka-ingest.iteration}}}} {{"before": {{"row": {{"a": ${{kafka-ingest.iteration}}, "data": "fish", "b": "bdata"}}}}, "after": {{"row": {{"a": ${{kafka-ingest.iteration}}, "data": "fish2", "b": "bdata"}}}}}}
  125. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  126. URL '${{testdrive.schema-registry-url}}')
  127. > CREATE CONNECTION IF NOT EXISTS kafka_conn
  128. TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT)
  129. > DROP SOURCE IF EXISTS {self.name}_source CASCADE
  130. > CREATE SOURCE {self.name}_source
  131. IN CLUSTER quickstart
  132. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-{self.name}-${{testdrive.seed}}')"""
  133. )
  134. def create(self) -> str:
  135. return dedent(
  136. f"""
  137. > CREATE TABLE {self.name} FROM SOURCE {self.name}_source (REFERENCE "testdrive-{self.name}-${{testdrive.seed}}")
  138. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  139. ENVELOPE DEBEZIUM"""
  140. )
  141. def destroy(self) -> str:
  142. return f"> DROP TABLE {self.name} CASCADE"
  143. def manipulate(self, kind: int) -> str:
  144. manipulations = [
  145. lambda: "",
  146. ]
  147. return manipulations[kind % len(manipulations)]()
  148. def verify(self) -> str:
  149. raise NotImplementedError
  150. class Table(Object):
  151. def create(self) -> str:
  152. return f"> CREATE TABLE {self.name} (a TEXT, b TEXT)"
  153. def destroy(self) -> str:
  154. return f"> DROP TABLE {self.name} CASCADE"
  155. def manipulate(self, kind: int) -> str:
  156. manipulations = [
  157. lambda: "",
  158. ]
  159. return manipulations[kind % len(manipulations)]()
  160. def verify(self) -> str:
  161. raise NotImplementedError
  162. # TODO: How to handle things like clusters, replicas?
  163. # TODO: Add more manipulations: inserts, updates, deletes, ALTER RENAME (twice)
  164. class PostgresSource(Object):
  165. def prepare(self) -> str:
  166. return dedent(
  167. f"""
  168. $ postgres-execute connection=postgres://postgres:postgres@postgres
  169. DROP USER IF EXISTS {self.name}_role;
  170. CREATE USER {self.name}_role WITH SUPERUSER PASSWORD 'postgres';
  171. ALTER USER {self.name}_role WITH replication;
  172. DROP PUBLICATION IF EXISTS {self.name}_source;
  173. DROP TABLE IF EXISTS {self.name}_table;
  174. CREATE TABLE {self.name}_table (a TEXT, b TEXT);
  175. ALTER TABLE {self.name}_table REPLICA IDENTITY FULL;
  176. CREATE PUBLICATION {self.name}_source FOR TABLE {self.name}_table;
  177. INSERT INTO {self.name}_table VALUES ('foo', 'bar');
  178. > DROP SECRET IF EXISTS {self.name}_pass CASCADE
  179. > CREATE SECRET {self.name}_pass AS 'postgres'
  180. > DROP CONNECTION IF EXISTS {self.name}_conn CASCADE
  181. > CREATE CONNECTION {self.name}_conn FOR POSTGRES
  182. HOST 'postgres',
  183. DATABASE postgres,
  184. USER {self.name}_role,
  185. PASSWORD SECRET {self.name}_pass
  186. > DROP SOURCE IF EXISTS {self.name}_source
  187. > CREATE SOURCE {self.name}_source
  188. IN CLUSTER quickstart
  189. FROM POSTGRES CONNECTION {self.name}_conn
  190. (PUBLICATION '{self.name}_source')"""
  191. )
  192. def create(self) -> str:
  193. return f"> CREATE TABLE {self.name} FROM SOURCE {self.name}_source (REFERENCE {self.name}_table)"
  194. def destroy(self) -> str:
  195. return f"> DROP TABLE {self.name} CASCADE"
  196. def manipulate(self, kind: int) -> str:
  197. manipulations = [
  198. lambda: "",
  199. lambda: dedent(
  200. f"""
  201. $ postgres-execute connection=postgres://postgres:postgres@postgres
  202. INSERT INTO {self.name}_table VALUES ('foo', 'bar');"""
  203. ),
  204. lambda: dedent(
  205. f"""
  206. $ postgres-execute connection=postgres://postgres:postgres@postgres
  207. UPDATE {self.name}_table SET b = b || 'bar' WHERE true;"""
  208. ),
  209. lambda: dedent(
  210. f"""
  211. $ postgres-execute connection=postgres://postgres:postgres@postgres
  212. DELETE FROM {self.name}_table WHERE LENGTH(b) > 12;"""
  213. ),
  214. lambda: dedent(
  215. f"""
  216. > DROP TABLE IF EXISTS {self.name}_tmp_table
  217. > ALTER TABLE {self.name} RENAME TO {self.name}_tmp_table
  218. > ALTER TABLE {self.name}_tmp_table RENAME TO {self.name}
  219. """
  220. ),
  221. ]
  222. return manipulations[kind % len(manipulations)]()
  223. def verify(self) -> str:
  224. raise NotImplementedError
  225. # TODO: Can't set up with an empty table in mysql? ERROR: reference to public.o_0_table not found in source
  226. class MySqlSource(Object):
  227. def prepare(self) -> str:
  228. return dedent(
  229. f"""
  230. $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
  231. $ mysql-execute name=mysql
  232. # create the database if it does not exist yet but do not drop it
  233. CREATE DATABASE IF NOT EXISTS public;
  234. USE public;
  235. CREATE USER IF NOT EXISTS {self.name}_role IDENTIFIED BY 'mysql';
  236. GRANT REPLICATION SLAVE ON *.* TO {self.name}_role;
  237. GRANT ALL ON public.* TO {self.name}_role;
  238. DROP TABLE IF EXISTS {self.name}_table;
  239. CREATE TABLE {self.name}_table (a TEXT, b TEXT);
  240. INSERT INTO {self.name}_table VALUES ('foo', 'bar');
  241. > DROP SECRET IF EXISTS {self.name}_pass CASCADE
  242. > CREATE SECRET {self.name}_pass AS 'mysql'
  243. > DROP CONNECTION IF EXISTS {self.name}_conn CASCADE
  244. > CREATE CONNECTION {self.name}_conn TO MYSQL (
  245. HOST 'mysql',
  246. USER {self.name}_role,
  247. PASSWORD SECRET {self.name}_pass
  248. )
  249. > DROP SOURCE IF EXISTS {self.name}_source
  250. > CREATE SOURCE {self.name}_source
  251. IN CLUSTER quickstart
  252. FROM MYSQL CONNECTION {self.name}_conn;"""
  253. )
  254. def create(self) -> str:
  255. return f"> CREATE TABLE {self.name} FROM SOURCE {self.name}_source (REFERENCE public.{self.name}_table)"
  256. def destroy(self) -> str:
  257. return f"> DROP TABLE {self.name} CASCADE"
  258. def manipulate(self, kind: int) -> str:
  259. manipulations = [
  260. lambda: "",
  261. lambda: dedent(
  262. f"""
  263. $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
  264. $ mysql-execute name=mysql
  265. USE public;
  266. INSERT INTO {self.name}_table VALUES ('foo', 'bar');"""
  267. ),
  268. lambda: dedent(
  269. f"""
  270. $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
  271. $ mysql-execute name=mysql
  272. USE public;
  273. UPDATE {self.name}_table SET b = CONCAT(b, 'bar') WHERE true;"""
  274. ),
  275. lambda: dedent(
  276. f"""
  277. $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
  278. $ mysql-execute name=mysql
  279. USE public;
  280. DELETE FROM {self.name}_table WHERE LENGTH(b) > 12;"""
  281. ),
  282. lambda: dedent(
  283. f"""
  284. > DROP TABLE IF EXISTS {self.name}_tmp_table
  285. > ALTER TABLE {self.name} RENAME TO {self.name}_tmp_table
  286. > ALTER TABLE {self.name}_tmp_table RENAME TO {self.name}
  287. """
  288. ),
  289. ]
  290. return manipulations[kind % len(manipulations)]()
  291. def verify(self) -> str:
  292. raise NotImplementedError
  293. class LoadGeneratorSource(Object):
  294. def __init__(self, name: str, references: "Object | None", rng: random.Random):
  295. super().__init__(name, references, rng)
  296. self.tick_interval = rng.choice(["1ms", "10ms", "100ms", "1s", "10s"])
  297. def create(self) -> str:
  298. return f"> CREATE SOURCE {self.name} IN CLUSTER quickstart FROM LOAD GENERATOR COUNTER (TICK INTERVAL '{self.tick_interval}')"
  299. def destroy(self) -> str:
  300. return f"> DROP SOURCE {self.name} CASCADE"
  301. def manipulate(self, kind: int) -> str:
  302. manipulations = [
  303. lambda: "",
  304. lambda: dedent(
  305. f"""
  306. > DROP SOURCE IF EXISTS {self.name}_tmp_source
  307. > ALTER SOURCE {self.name} RENAME TO {self.name}_tmp_source
  308. > ALTER SOURCE {self.name}_tmp_source RENAME TO {self.name}
  309. """
  310. ),
  311. ]
  312. return manipulations[kind % len(manipulations)]()
  313. def verify(self) -> str:
  314. raise NotImplementedError
  315. class WebhookSource(Object):
  316. def __init__(self, name: str, references: "Object | None", rng: random.Random):
  317. super().__init__(name, references, rng)
  318. self.body_format = rng.choice(["TEXT", "JSON", "JSON ARRAY", "BYTES"])
  319. def create(self) -> str:
  320. return dedent(
  321. f"""
  322. > DROP CLUSTER IF EXISTS {self.name}_cluster
  323. > CREATE CLUSTER {self.name}_cluster SIZE '1', REPLICATION FACTOR 1
  324. > CREATE SOURCE {self.name} IN CLUSTER {self.name}_cluster FROM WEBHOOK BODY FORMAT {self.body_format}
  325. """
  326. )
  327. def destroy(self) -> str:
  328. return dedent(
  329. f"""
  330. > DROP CLUSTER {self.name}_cluster CASCADE
  331. """
  332. )
  333. def manipulate(self, kind: int) -> str:
  334. manipulations = [
  335. lambda: "",
  336. lambda: dedent(
  337. f"""
  338. > DROP SOURCE IF EXISTS {self.name}_tmp_source
  339. > ALTER SOURCE {self.name} RENAME TO {self.name}_tmp_source
  340. > ALTER SOURCE {self.name}_tmp_source RENAME TO {self.name}
  341. """
  342. ),
  343. ]
  344. return manipulations[kind % len(manipulations)]()
  345. def verify(self) -> str:
  346. raise NotImplementedError
  347. class KafkaSink(Object):
  348. can_refer: bool = False
  349. def __init__(self, name: str, references: Object | None, rng: random.Random):
  350. super().__init__(name, references, rng)
  351. self.format = rng.choice(
  352. [
  353. "JSON",
  354. "AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn",
  355. ]
  356. )
  357. def create(self) -> str:
  358. self.references_str = (
  359. self.references.name if self.references else f"{self.name}_view"
  360. )
  361. cmds = []
  362. if not self.references:
  363. cmds.append(
  364. f"> CREATE MATERIALIZED VIEW IF NOT EXISTS {self.references_str} AS SELECT 'foo' AS a, 'bar' AS b"
  365. )
  366. elif isinstance(self.references, View):
  367. self.references_str = f"{self.name}_mv"
  368. cmds.append(
  369. f"> CREATE MATERIALIZED VIEW IF NOT EXISTS {self.references_str} AS SELECT * FROM {self.references.name}"
  370. )
  371. # See database-issues#9048, topic has to be unique
  372. topic = f"{self.name}-{uuid4()}"
  373. cmds.append(
  374. dedent(
  375. f"""
  376. > CREATE CONNECTION IF NOT EXISTS kafka_conn
  377. TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT)
  378. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  379. URL '${{testdrive.schema-registry-url}}'
  380. )
  381. > CREATE SINK {self.name}
  382. IN CLUSTER quickstart
  383. FROM {self.references_str}
  384. INTO KAFKA CONNECTION kafka_conn (TOPIC '{topic}')
  385. FORMAT {self.format}
  386. ENVELOPE DEBEZIUM"""
  387. )
  388. )
  389. return "\n".join(cmds)
  390. def destroy(self) -> str:
  391. return f"> DROP SINK {self.name} CASCADE"
  392. def manipulate(self, kind: int) -> str:
  393. manipulations = [
  394. lambda: "",
  395. lambda: dedent(
  396. f"""
  397. > DROP MATERIALIZED VIEW IF EXISTS {self.name}_tmp_mv
  398. > CREATE MATERIALIZED VIEW {self.name}_tmp_mv AS SELECT * FROM {self.references_str}
  399. > ALTER SINK {self.name} SET FROM {self.name}_tmp_mv
  400. > ALTER SINK {self.name} SET FROM {self.references_str}
  401. > DROP MATERIALIZED VIEW {self.name}_tmp_mv
  402. """
  403. ),
  404. lambda: dedent(
  405. f"""
  406. > DROP SINK IF EXISTS {self.name}_tmp_sink
  407. > ALTER SINK {self.name} RENAME TO {self.name}_tmp_sink
  408. > ALTER SINK {self.name}_tmp_sink RENAME TO {self.name}
  409. """
  410. ),
  411. ]
  412. return manipulations[kind % len(manipulations)]()
  413. def verify(self) -> str:
  414. raise NotImplementedError
  415. class View(Object):
  416. def create(self) -> str:
  417. return f'> CREATE VIEW {self.name} AS SELECT {"* FROM " + self.references.name if self.references else "'foo' AS a, 'bar' AS b"}'
  418. def destroy(self) -> str:
  419. return f"> DROP VIEW {self.name} CASCADE"
  420. def manipulate(self, kind: int) -> str:
  421. manipulations = [
  422. lambda: "",
  423. lambda: dedent(
  424. f"""
  425. > DROP VIEW IF EXISTS {self.name}_tmp_view
  426. > ALTER VIEW {self.name} RENAME TO {self.name}_tmp_view
  427. > ALTER VIEW {self.name}_tmp_view RENAME TO {self.name}
  428. """
  429. ),
  430. ]
  431. return manipulations[kind % len(manipulations)]()
  432. def verify(self) -> str:
  433. raise NotImplementedError
  434. class MaterializedView(Object):
  435. def create(self) -> str:
  436. return f'> CREATE MATERIALIZED VIEW {self.name} AS SELECT {"* FROM " + self.references.name if self.references else "'foo' AS a, 'bar' AS b"}'
  437. def destroy(self) -> str:
  438. return f"> DROP MATERIALIZED VIEW {self.name} CASCADE"
  439. def manipulate(self, kind: int) -> str:
  440. manipulations = [
  441. lambda: "",
  442. lambda: dedent(
  443. f"""
  444. > DROP MATERIALIZED VIEW IF EXISTS {self.name}_tmp_mv
  445. > ALTER MATERIALIZED VIEW {self.name} RENAME TO {self.name}_tmp_mv
  446. > ALTER MATERIALIZED VIEW {self.name}_tmp_mv RENAME TO {self.name}
  447. """
  448. ),
  449. ]
  450. return manipulations[kind % len(manipulations)]()
  451. def verify(self) -> str:
  452. raise NotImplementedError
  453. class DefaultIndex(Object):
  454. can_refer: bool = False
  455. def create(self) -> str:
  456. return (
  457. f"> CREATE DEFAULT INDEX ON {self.references.name}"
  458. if self.references
  459. else ""
  460. )
  461. def destroy(self) -> str:
  462. return (
  463. f"> DROP INDEX {self.references.name}_primary_idx"
  464. if self.references
  465. else ""
  466. )
  467. def manipulate(self, kind: int) -> str:
  468. manipulations = [
  469. lambda: "",
  470. ]
  471. return manipulations[kind % len(manipulations)]()
  472. def verify(self) -> str:
  473. raise NotImplementedError
  474. class Executor:
  475. def execute(self, td: str) -> None:
  476. raise NotImplementedError
  477. class Scenario:
  478. def __init__(self, c: Composition, rng: random.Random, num_objects: int):
  479. self.c = c
  480. self.rng = rng
  481. self.num_objects = num_objects
  482. def _impl(self, num_executions: int) -> str:
  483. raise NotImplementedError
  484. def print(self) -> None:
  485. print(self._impl(1))
  486. def run_fragment(self, text: str, tries: int = 1) -> None:
  487. if not text:
  488. return
  489. for i in range(tries):
  490. try:
  491. self.c.testdrive(text, quiet=True)
  492. return
  493. except Exception as e:
  494. print(e)
  495. if i == tries - 1:
  496. print("Failed to run fragment, giving up")
  497. raise
  498. print(f"Failed to run fragment, retrying ({i+1}/{tries})")
  499. def run(self, num_executions: int) -> None:
  500. self.run_fragment(self._impl(num_executions))
  501. class Concurrent(Scenario):
  502. def __init__(self, c: Composition, rng: random.Random, num_objects: int):
  503. super().__init__(c, rng, num_objects)
  504. objects = [o for o in list(all_subclasses(Object)) if o.enabled]
  505. self.objs = [
  506. rng.choice([o for o in objects if o.can_refer])("o_base", None, rng)
  507. ]
  508. self.manipulators = []
  509. for i in range(num_objects):
  510. self.objs.append(rng.choice(objects)(f"o_{i}", self.objs[0], rng))
  511. self.manipulators.append(self.rng.randrange(100))
  512. def print(self) -> None:
  513. pass # TODO: print
  514. def run(self, num_executions: int) -> None:
  515. for i in range(num_executions):
  516. # Clean up old state
  517. self.c.down(destroy_volumes=True)
  518. self.c.up(*SERVICE_NAMES, {"name": "testdrive", "persistent": True})
  519. for obj in self.objs:
  520. self.run_fragment(obj.prepare())
  521. self.run_fragment(self.objs[0].create())
  522. def run(o: Object, m: int) -> None:
  523. try:
  524. self.run_fragment(o.create(), tries=100)
  525. self.run_fragment(o.manipulate(m), tries=100)
  526. finally:
  527. try:
  528. self.run_fragment(o.destroy(), tries=100)
  529. except:
  530. # Might be in a half-finished state, ignore
  531. pass
  532. threads = [
  533. PropagatingThread(target=lambda: run(obj, manipulator))
  534. for obj, manipulator in zip(self.objs[1:], self.manipulators)
  535. ]
  536. for thread in threads:
  537. thread.start()
  538. for thread in threads:
  539. thread.join()
  540. self.run_fragment(self.objs[0].destroy())
  541. class Subsequent(Scenario):
  542. def __init__(self, c: Composition, rng: random.Random, num_objects: int):
  543. super().__init__(c, rng, num_objects)
  544. objects = list(all_subclasses(Object))
  545. self.objs = [
  546. rng.choice([o for o in objects if o.can_refer])("o_base", None, rng)
  547. ]
  548. self.manipulators = []
  549. for i in range(num_objects):
  550. self.objs.append(rng.choice(objects)(f"o_{i}", self.objs[0], rng))
  551. self.manipulators.append(self.rng.randrange(100))
  552. def _impl(self, num_executions: int) -> str:
  553. result = ""
  554. for i in range(num_executions):
  555. if i == 0:
  556. for obj in self.objs:
  557. result += obj.prepare() + "\n"
  558. result += self.objs[0].create() + "\n"
  559. for obj, manipulator in zip(self.objs[1:], self.manipulators):
  560. result += obj.create() + "\n"
  561. result += obj.manipulate(manipulator) + "\n"
  562. result += obj.destroy() + "\n"
  563. result += self.objs[0].destroy() + "\n"
  564. return result
  565. class SubsequentChain(Scenario):
  566. def __init__(self, c: Composition, rng: random.Random, num_objects: int):
  567. super().__init__(c, rng, num_objects)
  568. objects = list(all_subclasses(Object))
  569. self.objs = [
  570. rng.choice([o for o in objects if o.can_refer])("o_base", None, rng)
  571. ]
  572. self.manipulators = []
  573. for i in range(num_objects):
  574. self.objs.append(
  575. rng.choice([o for o in objects if o.can_refer])(
  576. f"o_{i}", self.objs[-1], rng
  577. )
  578. )
  579. self.manipulators.append(self.rng.randrange(100))
  580. def _impl(self, num_executions: int) -> str:
  581. result = ""
  582. for i in range(num_executions):
  583. if i == 0:
  584. for obj in self.objs:
  585. result += obj.prepare() + "\n"
  586. for obj in self.objs:
  587. result += obj.create() + "\n"
  588. for obj, manipulator in zip(self.objs[1:], self.manipulators):
  589. result += obj.manipulate(manipulator) + "\n"
  590. for obj in reversed(self.objs):
  591. result += obj.destroy() + "\n"
  592. return result
  593. def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
  594. parser.add_argument("--seed", type=str, default=random.randrange(1000000))
  595. parser.add_argument("--runtime", default=600, type=int, help="Runtime in seconds")
  596. parser.add_argument(
  597. "--repetitions", default=100, type=int, help="Repeatitions per scenario"
  598. )
  599. parser.add_argument(
  600. "--scenario",
  601. default="subsequent",
  602. type=str,
  603. choices=["subsequent", "subsequent-chain", "concurrent"],
  604. )
  605. parser.add_argument(
  606. "--num-objects",
  607. default=5,
  608. type=int,
  609. )
  610. args = parser.parse_args()
  611. print(f"--- Random seed is {args.seed}")
  612. end_time = (
  613. datetime.datetime.now() + datetime.timedelta(seconds=args.runtime)
  614. ).timestamp()
  615. c.up(*SERVICE_NAMES, {"name": "testdrive", "persistent": True})
  616. seed = args.seed
  617. while time.time() < end_time:
  618. rng = random.Random(seed)
  619. if args.scenario == "subsequent":
  620. scenario = Subsequent(c, rng, args.num_objects)
  621. elif args.scenario == "subsequent-chain":
  622. scenario = Subsequent(c, rng, args.num_objects)
  623. elif args.scenario == "concurrent":
  624. scenario = Concurrent(c, rng, args.num_objects)
  625. else:
  626. raise ValueError(f"Unknown scenario {args.scenario}")
  627. print(f"--- Scenario to run (--seed={seed})")
  628. scenario.print()
  629. print(f"--- Running scenario {args.repetitions} times")
  630. scenario.run(args.repetitions)
  631. seed = rng.randrange(1000000)