mzcompose.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
Use SQLsmith (https://github.com/MaterializeInc/sqlsmith) to generate random
queries (AST/code based) and run them against Materialize. The queries can be
complex, but we can't verify correctness or performance.
  13. """
import json
import random
import time
from datetime import datetime
from threading import Event, Lock, Thread
from typing import Any

from materialize.mzcompose.composition import (
    Composition,
    Service,
    WorkflowArgumentParser,
)
from materialize.mzcompose.services.materialized import Materialized
from materialize.sqlsmith import known_errors
  27. TOTAL_MEMORY = 8
  28. NUM_SERVERS = 2
  29. MZ_SERVERS = [f"mz_{i + 1}" for i in range(NUM_SERVERS)]
  30. SERVICES = [
  31. # Auto-restart so we can keep testing even after we ran into a panic
  32. # Limit memory to prevent long hangs on out of memory
  33. # Don't use default volumes so we can run multiple instances at once
  34. Materialized(
  35. name=mz_server,
  36. restart="on-failure",
  37. memory=f"{TOTAL_MEMORY / len(MZ_SERVERS)}GB",
  38. use_default_volumes=False,
  39. default_replication_factor=2,
  40. )
  41. for mz_server in MZ_SERVERS
  42. ] + [
  43. Service(
  44. "sqlsmith",
  45. {
  46. "mzbuild": "sqlsmith",
  47. },
  48. ),
  49. ]
  50. def is_known_error(e: str) -> bool:
  51. for known_error in known_errors:
  52. if known_error in e:
  53. return True
  54. return False
  55. def run_sqlsmith(c: Composition, cmd: str, aggregate: dict[str, Any]) -> None:
  56. result = c.run(
  57. *cmd,
  58. capture=True,
  59. check=False, # We still get back parsable json on failure, so keep going
  60. )
  61. if result.returncode not in (0, 1):
  62. if result.returncode == 137:
  63. raise Exception("[SQLsmith] OOMed (return code 137)")
  64. raise Exception(
  65. f"[SQLsmith] Unexpected return code in SQLsmith: {result.returncode}\n{result.stdout}"
  66. )
  67. data = json.loads(result.stdout)
  68. aggregate["version"] = data["version"]
  69. aggregate["queries"] += data["queries"]
  70. aggregate["errors"].extend(data["errors"])
  71. def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
  72. parser.add_argument("--num-sqlsmith", default=len(MZ_SERVERS), type=int)
  73. # parser.add_argument("--queries", default=10000, type=int)
  74. parser.add_argument("--runtime", default=600, type=int)
  75. parser.add_argument("--max-joins", default=5, type=int)
  76. parser.add_argument("--explain-only", action="store_true")
  77. parser.add_argument("--exclude-catalog", default=False, type=bool)
  78. parser.add_argument("--seed", default=None, type=int)
  79. args = parser.parse_args()
  80. c.up(*MZ_SERVERS)
  81. for mz_server in MZ_SERVERS:
  82. # Very simple data for our workload
  83. c.sql(
  84. """
  85. CREATE TABLE t1 (a int2, b int4, c int8, d uint2, e uint4, f uint8, g text);
  86. INSERT INTO t1 VALUES (1, 2, 3, 4, 5, 6, '7'), (3, 4, 5, 6, 7, 8, '9'), (5, 6, 7, 8, 9, 10, '11'), (7, 8, 9, 10, 11, 12, '13'), (9, 10, 11, 12, 13, 14, '15'), (11, 12, 13, 14, 15, 16, '17'), (13, 14, 15, 16, 17, 18, '19'), (15, 16, 17, 18, 19, 20, '21');
  87. CREATE MATERIALIZED VIEW mv AS SELECT a + b AS col1, c + d AS col2, e + f AS col3, g AS col4 FROM t1;
  88. CREATE MATERIALIZED VIEW mv2 AS SELECT count(*) FROM mv;
  89. CREATE DEFAULT INDEX ON mv;
  90. CREATE TABLE t2 (
  91. a_bool BOOL,
  92. b_float4 FLOAT4,
  93. c_float8 FLOAT8,
  94. d_numeric NUMERIC,
  95. e_char CHAR(5),
  96. f_varchar VARCHAR(10),
  97. g_bytes BYTES,
  98. h_date DATE,
  99. i_time TIME,
  100. k_timestamp TIMESTAMP,
  101. l_timestamptz TIMESTAMPTZ,
  102. m_interval INTERVAL,
  103. n_jsonb JSONB,
  104. o_uuid UUID
  105. );
  106. INSERT INTO t2 VALUES
  107. (
  108. TRUE,
  109. 1.23,
  110. 4.56,
  111. 7.89,
  112. 'abc',
  113. 'hello',
  114. '\x68656c6c6f',
  115. DATE '2023-01-01',
  116. TIME '12:34:56',
  117. TIMESTAMP '2023-01-01 12:34:56',
  118. TIMESTAMPTZ '2023-01-01 12:34:56+00',
  119. INTERVAL '1 day 2 hours',
  120. '{"key": "value"}',
  121. '550e8400-e29b-41d4-a716-446655440000'
  122. );
  123. """,
  124. service=mz_server,
  125. )
  126. c.sql(
  127. """
  128. ALTER SYSTEM SET TRANSACTION_ISOLATION TO 'SERIALIZABLE';
  129. ALTER SYSTEM SET CLUSTER_REPLICA TO 'r1';
  130. """,
  131. service=mz_server,
  132. port=6877,
  133. user="mz_system",
  134. )
  135. seed = args.seed or random.randint(0, 2**31 - args.num_sqlsmith)
  136. def kill_sqlsmith_with_delay() -> None:
  137. time.sleep(args.runtime)
  138. c.kill("sqlsmith", signal="SIGINT")
  139. killer = Thread(target=kill_sqlsmith_with_delay)
  140. killer.start()
  141. threads: list[Thread] = []
  142. aggregate: dict[str, Any] = {"errors": [], "version": "", "queries": 0}
  143. for i in range(args.num_sqlsmith):
  144. cmd = [
  145. "sqlsmith",
  146. # f"--max-queries={args.queries}",
  147. f"--max-joins={args.max_joins}",
  148. f"--seed={seed + i}",
  149. "--log-json",
  150. # we use mz_system to have access to all tables, including ones with restricted permissions
  151. f"--target=host={MZ_SERVERS[i % len(MZ_SERVERS)]} port=6877 dbname=materialize user=mz_system",
  152. ]
  153. if args.exclude_catalog:
  154. cmd.append("--exclude-catalog")
  155. if args.explain_only:
  156. cmd.append("--explain-only")
  157. thread = Thread(target=run_sqlsmith, args=[c, cmd, aggregate])
  158. thread.start()
  159. threads.append(thread)
  160. for thread in threads:
  161. thread.join()
  162. new_errors: dict[frozenset[tuple[str, Any]], list[dict[str, Any]]] = {}
  163. for error in aggregate["errors"]:
  164. if not is_known_error(error["message"]):
  165. frozen_key = frozenset(
  166. {x: error[x] for x in ["type", "sqlstate", "message"]}.items()
  167. )
  168. if frozen_key not in new_errors:
  169. new_errors[frozen_key] = []
  170. new_errors[frozen_key].append({x: error[x] for x in ["timestamp", "query"]})
  171. assert aggregate["queries"] > 0, "No queries were executed"
  172. print(
  173. f"SQLsmith: {aggregate['version']} seed: {seed} queries: {aggregate['queries']}"
  174. )
  175. for frozen_key, errors in new_errors.items():
  176. key = dict(frozen_key)
  177. occurrences = f" ({len(errors)} occurrences)" if len(errors) > 1 else ""
  178. # Print out crashes differently so that we don't get notified twice in ci_logged_errors_detect
  179. if "server closed the connection unexpectedly" in key["message"]:
  180. print(f"--- Server crash, check panics and segfaults {occurrences}")
  181. else:
  182. print(
  183. f"--- [SQLsmith] {key['type']} {key['sqlstate']}: {key['message']}{occurrences}"
  184. )
  185. if len(errors) > 1:
  186. from_time = datetime.fromtimestamp(errors[0]["timestamp"]).strftime(
  187. "%H:%M:%S"
  188. )
  189. to_time = datetime.fromtimestamp(errors[-1]["timestamp"]).strftime(
  190. "%H:%M:%S"
  191. )
  192. print(f"From {from_time} until {to_time}")
  193. # The error message indicates a panic, if we happen to get multiple
  194. # distinct panics we want to have all the responsible queries instead
  195. # of just one:
  196. if "server closed the connection unexpectedly" in key["message"]:
  197. for i, error in enumerate(errors, start=1):
  198. print(f"Query {i}: {error['query']}")
  199. else:
  200. shortest_query = min([error["query"] for error in errors], key=len)
  201. print(f"Query: {shortest_query}")