mzcompose.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Test Materialize with the Random Query Generator (grammar-based):
  11. https://github.com/MaterializeInc/RQG/ Can find query errors and panics, but
  12. not correctness.
  13. """
  14. import argparse
  15. from dataclasses import dataclass
  16. from enum import Enum
  17. from materialize.mzcompose.composition import (
  18. Composition,
  19. Service,
  20. WorkflowArgumentParser,
  21. )
  22. from materialize.mzcompose.services.materialized import Materialized
  23. from materialize.mzcompose.services.postgres import Postgres
  24. from materialize.mzcompose.services.rqg import RQG
  25. from materialize.version_ancestor_overrides import (
  26. ANCESTOR_OVERRIDES_FOR_CORRECTNESS_REGRESSIONS,
  27. )
  28. from materialize.version_list import resolve_ancestor_image_tag
  29. SERVICES = [
  30. RQG(),
  31. Materialized(name="mz_this", default_replication_factor=2),
  32. Materialized(name="mz_other", default_replication_factor=2),
  33. Postgres(),
  34. ]
  35. class Dataset(Enum):
  36. SIMPLE = 1
  37. DBT3 = 2
  38. STAR_SCHEMA = 3
  39. def files(self) -> list[str]:
  40. match self:
  41. case Dataset.SIMPLE:
  42. return ["conf/mz/simple.sql"]
  43. case Dataset.DBT3:
  44. # With Postgres, CREATE MATERIALZIED VIEW from dbt3-ddl.sql will produce
  45. # a view thats is empty unless REFRESH MATERIALIZED VIEW from dbt3-ddl-refresh-mvs.sql
  46. # is also run after the data has been loaded by dbt3-s0.0001.dump
  47. return [
  48. "conf/mz/dbt3-ddl.sql",
  49. "conf/mz/dbt3-s0.0001.dump",
  50. "conf/mz/dbt3-ddl-refresh-mvs.sql",
  51. ]
  52. case Dataset.STAR_SCHEMA:
  53. return ["/workdir/datasets/star_schema.sql"]
  54. case _:
  55. raise RuntimeError(f"Not handled: {self}")
  56. class ReferenceImplementation(Enum):
  57. MATERIALIZE = 1
  58. POSTGRES = 2
  59. def dsn(self) -> str:
  60. match self:
  61. case ReferenceImplementation.MATERIALIZE:
  62. return "dbname=materialize;host=mz_other;user=materialize;port=6875"
  63. case ReferenceImplementation.POSTGRES:
  64. return "dbname=postgres;host=postgres;user=postgres;password=postgres"
  65. case _:
  66. raise RuntimeError("Unsupported case")
  67. @dataclass
  68. class Workload:
  69. name: str
  70. # All paths are relative to the CWD of the rqg container, which is /RQG and contains
  71. # a checked-out copy of the MaterializeInc/RQG repository
  72. # Use /workdir/file-name-goes-here.yy for files located in test/rqg
  73. grammar: str
  74. reference_implementation: ReferenceImplementation | None
  75. dataset: Dataset | None = None
  76. duration: int = 30 * 60
  77. queries: int = 100000000
  78. disabled: bool = False
  79. threads: int = 4
  80. validator: str | None = None
  81. def dataset_files(self) -> list[str]:
  82. return self.dataset.files() if self.dataset is not None else []
  83. WORKLOADS = [
  84. Workload(
  85. name="simple-aggregates",
  86. dataset=Dataset.SIMPLE,
  87. grammar="conf/mz/simple-aggregates.yy",
  88. reference_implementation=ReferenceImplementation.POSTGRES,
  89. validator="ResultsetComparatorSimplify",
  90. ),
  91. Workload(
  92. name="lateral-joins",
  93. dataset=Dataset.SIMPLE,
  94. grammar="conf/mz/lateral-joins.yy",
  95. reference_implementation=ReferenceImplementation.POSTGRES,
  96. validator="ResultsetComparatorSimplify",
  97. ),
  98. Workload(
  99. name="dbt3-joins",
  100. dataset=Dataset.DBT3,
  101. grammar="conf/mz/dbt3-joins.yy",
  102. reference_implementation=ReferenceImplementation.POSTGRES,
  103. validator="ResultsetComparatorSimplify",
  104. ),
  105. Workload(
  106. name="subqueries",
  107. dataset=Dataset.SIMPLE,
  108. grammar="conf/mz/subqueries.yy",
  109. reference_implementation=ReferenceImplementation.POSTGRES,
  110. validator="ResultsetComparatorSimplify",
  111. ),
  112. Workload(
  113. name="window-functions",
  114. dataset=Dataset.SIMPLE,
  115. grammar="conf/mz/window-functions.yy",
  116. reference_implementation=ReferenceImplementation.POSTGRES,
  117. validator="ResultsetComparatorSimplify",
  118. ),
  119. Workload(
  120. name="wmr",
  121. grammar="conf/mz/with-mutually-recursive.yy",
  122. # Postgres does not support WMR, so our only hope for a comparison
  123. # test is to use a previous Mz version via --other-tag=...
  124. reference_implementation=ReferenceImplementation.MATERIALIZE,
  125. validator="ResultsetComparatorSimplify",
  126. # See https://github.com/MaterializeInc/database-issues/issues/9439
  127. threads=1,
  128. ),
  129. Workload(
  130. # A workload that performs DML that preserve the dataset's invariants
  131. # and also checks that those invariants are not violated
  132. name="banking",
  133. grammar="conf/mz/banking.yy",
  134. reference_implementation=None,
  135. validator="QueryProperties,RepeatableRead",
  136. ),
  137. # Added as part of MaterializeInc/database-issues#7561.
  138. Workload(
  139. name="left-join-stacks",
  140. dataset=Dataset.STAR_SCHEMA,
  141. grammar="/workdir/grammars/left_join_stacks.yy",
  142. reference_implementation=ReferenceImplementation.POSTGRES,
  143. validator="ResultsetComparatorSimplify",
  144. queries=1000, # Reduced no. of queries because the grammar is quite focused.
  145. ),
  146. ]
  147. def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
  148. parser.add_argument(
  149. "--this-tag",
  150. help="Run Materialize with this git tag on port 16875",
  151. )
  152. parser.add_argument(
  153. "--other-tag",
  154. action=StoreOtherTag,
  155. help="Run Materialize with this git tag on port 26875 (for workloads that compare two MZ instances)",
  156. )
  157. parser.add_argument(
  158. "--grammar",
  159. type=str,
  160. help="Override the default grammar of the workload",
  161. )
  162. parser.add_argument(
  163. "--dataset",
  164. type=str,
  165. action="append",
  166. help="Override the dataset files for the workload",
  167. )
  168. parser.add_argument(
  169. "--starting-rule",
  170. type=str,
  171. help="Override the default starting-rule for the workload",
  172. )
  173. parser.add_argument(
  174. "--duration",
  175. type=int,
  176. help="Run the Workload for the specified time in seconds",
  177. )
  178. parser.add_argument(
  179. "--queries",
  180. type=int,
  181. help="Run the Workload for the specified number of queries",
  182. )
  183. parser.add_argument(
  184. "--threads",
  185. type=int,
  186. help="Run the Workload with the specified number of concurrent threads",
  187. )
  188. parser.add_argument(
  189. "--sqltrace", action="store_true", help="Print all generated SQL statements"
  190. )
  191. parser.add_argument(
  192. "--skip-recursive-rules",
  193. action="store_true",
  194. help="Generate simpler queries by avoiding recursive productions",
  195. )
  196. parser.add_argument(
  197. "--seed",
  198. metavar="SEED",
  199. type=str,
  200. help="Random seed to use.",
  201. )
  202. parser.add_argument(
  203. "workloads", nargs="*", default=None, help="Run specified workloads"
  204. )
  205. args = parser.parse_args()
  206. c.up({"name": "rqg", "persistent": True})
  207. if args.workloads is not None and len(args.workloads) > 0:
  208. workloads_to_run = [w for w in WORKLOADS if w.name in args.workloads]
  209. else:
  210. workloads_to_run = [w for w in WORKLOADS if not w.disabled]
  211. assert (
  212. len(workloads_to_run) > 0
  213. ), f"No matching workloads found (args was {args.workloads})"
  214. for workload in workloads_to_run:
  215. print(f"--- Running workload {workload.name}: {workload} ...")
  216. run_workload(c, args, workload)
  217. def run_workload(c: Composition, args: argparse.Namespace, workload: Workload) -> None:
  218. def materialize_image(tag: str | None) -> str | None:
  219. return f"materialize/materialized:{tag}" if tag else None
  220. # A list of psql-compatible services participating in the test
  221. participants: list[Service] = [
  222. Materialized(
  223. name="mz_this",
  224. ports=["16875:6875", "16876:6876", "16877:6877", "16878:6878"],
  225. image=materialize_image(args.this_tag),
  226. use_default_volumes=False,
  227. default_replication_factor=2,
  228. ),
  229. ]
  230. # A list of psql URLs for dataset initialization
  231. psql_urls = ["postgresql://materialize@mz_this:6875/materialize"]
  232. # If we have --other-tag, assume we want to run a comparison test against Materialize
  233. reference_impl = (
  234. ReferenceImplementation.MATERIALIZE
  235. if args.other_tag and workload.reference_implementation is not None
  236. else workload.reference_implementation
  237. )
  238. match reference_impl:
  239. case ReferenceImplementation.MATERIALIZE:
  240. participants.append(
  241. Materialized(
  242. name="mz_other",
  243. image=materialize_image(args.other_tag),
  244. ports=["26875:6875", "26876:6876", "26877:6877", "26878:6878"],
  245. use_default_volumes=False,
  246. default_replication_factor=2,
  247. )
  248. )
  249. psql_urls.append("postgresql://materialize@mz_other:6875/materialize")
  250. case ReferenceImplementation.POSTGRES:
  251. participants.append(Postgres(ports=["15432:5432"]))
  252. psql_urls.append("postgresql://postgres:postgres@postgres/postgres")
  253. case None:
  254. pass
  255. case _:
  256. raise RuntimeError(
  257. f"Unsupported reference implementation: {reference_impl}"
  258. )
  259. dsn1 = "dbi:Pg:dbname=materialize;host=mz_this;user=materialize;port=6875"
  260. dsn2 = f"dbi:Pg:{reference_impl.dsn()}" if reference_impl else None
  261. dataset = args.dataset if args.dataset is not None else workload.dataset_files()
  262. grammar = str(args.grammar) if args.grammar is not None else workload.grammar
  263. queries = int(args.queries) if args.queries is not None else workload.queries
  264. threads = int(args.threads) if args.threads is not None else workload.threads
  265. duration = int(args.duration) if args.duration is not None else workload.duration
  266. with c.override(*participants):
  267. try:
  268. c.up(*[p.name for p in participants])
  269. for file in dataset:
  270. for psql_url in psql_urls:
  271. print(f"--- Populating {psql_url} with {file} ...")
  272. c.exec("rqg", "bash", "-c", f"psql -f {file} {psql_url}")
  273. c.exec(
  274. "rqg",
  275. "perl",
  276. "gentest.pl",
  277. f"--seed={args.seed}",
  278. f"--dsn1={dsn1}",
  279. f"--dsn2={dsn2}" if dsn2 else "",
  280. f"--grammar={grammar}",
  281. f"--queries={queries}",
  282. f"--threads={threads}",
  283. f"--duration={duration}",
  284. f"--validator={workload.validator}" if workload.validator else "",
  285. f"--starting-rule={args.starting_rule}" if args.starting_rule else "",
  286. "--sqltrace" if args.sqltrace else "",
  287. "--skip-recursive-rules" if args.skip_recursive_rules else "",
  288. )
  289. finally:
  290. c.capture_logs()
  291. class StoreOtherTag(argparse.Action):
  292. """Resolve common ancestor during argument parsing"""
  293. def __call__(self, parser, namespace, values, option_string=None):
  294. if values == "common-ancestor":
  295. tag = resolve_ancestor_image_tag(
  296. ANCESTOR_OVERRIDES_FOR_CORRECTNESS_REGRESSIONS
  297. )
  298. print(f"Resolving --other-tag to {tag}")
  299. else:
  300. tag = str(values)
  301. setattr(namespace, self.dest, tag)