mzcompose.py 14 KB


  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Functional test for the native (non-Debezium) MySQL sources.
  11. """
  12. import glob
  13. import threading
  14. from textwrap import dedent
  15. from materialize import MZ_ROOT, buildkite
  16. from materialize.mysql_util import (
  17. retrieve_invalid_ssl_context_for_mysql,
  18. retrieve_ssl_context_for_mysql,
  19. )
  20. from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
  21. from materialize.mzcompose.services.materialized import Materialized
  22. from materialize.mzcompose.services.mysql import MySql
  23. from materialize.mzcompose.services.mz import Mz
  24. from materialize.mzcompose.services.test_certs import TestCerts
  25. from materialize.mzcompose.services.testdrive import Testdrive
  26. from materialize.mzcompose.services.toxiproxy import Toxiproxy
  27. def create_mysql(mysql_version: str) -> MySql:
  28. return MySql(version=mysql_version)
  29. def create_mysql_replica(mysql_version: str) -> MySql:
  30. return MySql(
  31. name="mysql-replica",
  32. version=mysql_version,
  33. additional_args=[
  34. "--gtid_mode=ON",
  35. "--enforce_gtid_consistency=ON",
  36. "--skip-replica-start",
  37. "--server-id=2",
  38. ],
  39. )
# Services declared for this composition. Individual workflows replace some of
# them (e.g. the MySQL services, Materialized) via `c.override(...)`.
SERVICES = [
    Mz(app_password=""),
    Materialized(
        # Trace-level logging for the MySQL source module, info for the rest.
        additional_system_parameter_defaults={
            "log_filter": "mz_storage::source::mysql=trace,info"
        },
        default_replication_factor=2,
    ),
    # Default-version MySQL services; workflows override these with the
    # version selected through `--mysql-version`.
    create_mysql(MySql.DEFAULT_VERSION),
    create_mysql_replica(MySql.DEFAULT_VERSION),
    TestCerts(),
    Toxiproxy(),
    Testdrive(default_timeout="60s"),
]
  54. def get_targeted_mysql_version(parser: WorkflowArgumentParser) -> str:
  55. parser.add_argument(
  56. "--mysql-version",
  57. default=MySql.DEFAULT_VERSION,
  58. type=str,
  59. )
  60. args, _ = parser.parse_known_args()
  61. print(f"Running with MySQL version {args.mysql_version}")
  62. return args.mysql_version
  63. def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
  64. def process(name: str) -> None:
  65. if name in ("default", "large-scale"):
  66. return
  67. with c.test_case(name):
  68. c.workflow(name, *parser.args)
  69. workflows_with_internal_sharding = ["cdc"]
  70. sharded_workflows = workflows_with_internal_sharding + buildkite.shard_list(
  71. [w for w in c.workflows if w not in workflows_with_internal_sharding],
  72. lambda w: w,
  73. )
  74. print(
  75. f"Workflows in shard with index {buildkite.get_parallelism_index()}: {sharded_workflows}"
  76. )
  77. c.test_parts(sharded_workflows, process)
  78. def workflow_cdc(c: Composition, parser: WorkflowArgumentParser) -> None:
  79. mysql_version = get_targeted_mysql_version(parser)
  80. parser.add_argument(
  81. "filter",
  82. nargs="*",
  83. default=["*.td"],
  84. help="limit to only the files matching filter",
  85. )
  86. args = parser.parse_args()
  87. matching_files = []
  88. for filter in args.filter:
  89. matching_files.extend(
  90. glob.glob(filter, root_dir=MZ_ROOT / "test" / "mysql-cdc")
  91. )
  92. sharded_files: list[str] = buildkite.shard_list(
  93. sorted(matching_files), lambda file: file
  94. )
  95. print(f"Files: {sharded_files}")
  96. with c.override(create_mysql(mysql_version)):
  97. c.up("materialized", "mysql")
  98. valid_ssl_context = retrieve_ssl_context_for_mysql(c)
  99. wrong_ssl_context = retrieve_invalid_ssl_context_for_mysql(c)
  100. c.sources_and_sinks_ignored_from_validation.add("drop_table")
  101. c.test_parts(
  102. sharded_files,
  103. lambda file: c.run_testdrive_files(
  104. f"--var=ssl-ca={valid_ssl_context.ca}",
  105. f"--var=ssl-client-cert={valid_ssl_context.client_cert}",
  106. f"--var=ssl-client-key={valid_ssl_context.client_key}",
  107. f"--var=ssl-wrong-ca={wrong_ssl_context.ca}",
  108. f"--var=ssl-wrong-client-cert={wrong_ssl_context.client_cert}",
  109. f"--var=ssl-wrong-client-key={wrong_ssl_context.client_key}",
  110. f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
  111. "--var=mysql-user-password=us3rp4ssw0rd",
  112. f"--var=default-replica-size={Materialized.Size.DEFAULT_SIZE}-{Materialized.Size.DEFAULT_SIZE}",
  113. f"--var=default-storage-size={Materialized.Size.DEFAULT_SIZE}-1",
  114. file,
  115. ),
  116. )
  117. def workflow_replica_connection(c: Composition, parser: WorkflowArgumentParser) -> None:
  118. mysql_version = get_targeted_mysql_version(parser)
  119. with c.override(create_mysql(mysql_version), create_mysql_replica(mysql_version)):
  120. c.up("materialized", "mysql", "mysql-replica")
  121. c.run_testdrive_files(
  122. f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
  123. "override/10-replica-connection.td",
  124. )
  125. def workflow_schema_change_restart(
  126. c: Composition, parser: WorkflowArgumentParser
  127. ) -> None:
  128. """
  129. Validates that a schema change done to a table after the MySQL source is created
  130. but before the snapshot is completed is detected after a restart.
  131. """
  132. mysql_version = get_targeted_mysql_version(parser)
  133. with c.override(create_mysql(mysql_version)):
  134. c.up("materialized", "mysql")
  135. c.run_testdrive_files(
  136. f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
  137. "schema-restart/before-restart.td",
  138. )
  139. with c.override(Testdrive(no_reset=True), create_mysql(mysql_version)):
  140. # Restart mz
  141. c.kill("materialized")
  142. c.up("materialized")
  143. c.run_testdrive_files(
  144. f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
  145. "schema-restart/after-restart.td",
  146. )
  147. def _make_inserts(*, txns: int, txn_size: int) -> tuple[str, int]:
  148. sql = "\n".join(
  149. [
  150. f"""
  151. SET @i:=0;
  152. INSERT INTO many_inserts (f2) SELECT @i:=@i+1 FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {txn_size};
  153. """
  154. for i in range(0, txns)
  155. ]
  156. )
  157. records = txns * txn_size
  158. return (sql, records)
  159. def workflow_many_inserts(c: Composition, parser: WorkflowArgumentParser) -> None:
  160. """
  161. Tests a scenario that caused a consistency issue in the past. We insert a
  162. large number of rows into a table, then create a source for that table while
  163. simultaneously inserting many more rows into the table in a background
  164. thread, then finally verify that the correct count of rows is captured by
  165. the source.
  166. In earlier incarnations of the MySQL source, the source accidentally failed
  167. to snapshot inside of a repeatable read transaction.
  168. """
  169. mysql_version = get_targeted_mysql_version(parser)
  170. with c.override(create_mysql(mysql_version)):
  171. c.up("materialized", "mysql", {"name": "testdrive", "persistent": True})
  172. # Records to before creating the source.
  173. (initial_sql, initial_records) = _make_inserts(txns=1, txn_size=1_000_000)
  174. # Records to insert concurrently with creating the source.
  175. (concurrent_sql, concurrent_records) = _make_inserts(txns=1000, txn_size=100)
  176. # Set up the MySQL server with the initial records, set up the connection to
  177. # the MySQL server in Materialize.
  178. c.testdrive(
  179. dedent(
  180. f"""
  181. $ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
  182. ALTER SYSTEM SET max_mysql_connections = 100
  183. $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
  184. > CREATE SECRET IF NOT EXISTS mysqlpass AS '{MySql.DEFAULT_ROOT_PASSWORD}'
  185. > CREATE CONNECTION IF NOT EXISTS mysql_conn TO MYSQL (HOST mysql, USER root, PASSWORD SECRET mysqlpass)
  186. $ mysql-execute name=mysql
  187. DROP DATABASE IF EXISTS public;
  188. CREATE DATABASE public;
  189. USE public;
  190. DROP TABLE IF EXISTS many_inserts;
  191. CREATE TABLE many_inserts (pk SERIAL PRIMARY KEY, f2 BIGINT);
  192. """
  193. )
  194. + dedent(initial_sql)
  195. + dedent(
  196. """
  197. > DROP SOURCE IF EXISTS s1 CASCADE;
  198. """
  199. )
  200. )
  201. # Start inserting in the background.
  202. def do_inserts(c: Composition):
  203. x = dedent(
  204. f"""
  205. $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
  206. $ mysql-execute name=mysql
  207. USE public;
  208. {concurrent_sql}
  209. """
  210. )
  211. c.testdrive(args=["--no-reset"], input=x)
  212. insert_thread = threading.Thread(target=do_inserts, args=(c,))
  213. print("--- Start many concurrent inserts")
  214. insert_thread.start()
  215. # Create the source.
  216. c.testdrive(
  217. args=["--no-reset"],
  218. input=dedent(
  219. """
  220. > CREATE SOURCE s1
  221. FROM MYSQL CONNECTION mysql_conn;
  222. > CREATE TABLE many_inserts FROM SOURCE s1 (REFERENCE public.many_inserts);
  223. """
  224. ),
  225. )
  226. # Ensure the source eventually sees the right number of records.
  227. insert_thread.join()
  228. print("--- Validate concurrent inserts")
  229. c.testdrive(
  230. args=["--no-reset"],
  231. input=dedent(
  232. f"""
  233. > SELECT count(*) FROM many_inserts
  234. {initial_records + concurrent_records}
  235. """
  236. ),
  237. )
  238. def workflow_large_scale(c: Composition, parser: WorkflowArgumentParser) -> None:
  239. """
  240. The goal is to test a large scale MySQL instance and to make sure that we can successfully ingest data from it quickly.
  241. """
  242. mysql_version = get_targeted_mysql_version(parser)
  243. with c.override(create_mysql(mysql_version)):
  244. c.up("materialized", "mysql", {"name": "testdrive", "persistent": True})
  245. # Set up the MySQL server with the initial records, set up the connection to
  246. # the MySQL server in Materialize.
  247. c.testdrive(
  248. dedent(
  249. f"""
  250. $ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
  251. ALTER SYSTEM SET max_mysql_connections = 100
  252. $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
  253. > CREATE SECRET IF NOT EXISTS mysqlpass AS '{MySql.DEFAULT_ROOT_PASSWORD}'
  254. > CREATE CONNECTION IF NOT EXISTS mysql_conn TO MYSQL (HOST mysql, USER root, PASSWORD SECRET mysqlpass)
  255. $ mysql-execute name=mysql
  256. DROP DATABASE IF EXISTS public;
  257. CREATE DATABASE public;
  258. USE public;
  259. DROP TABLE IF EXISTS products;
  260. CREATE TABLE products (id int NOT NULL, name varchar(255) DEFAULT NULL, merchant_id int NOT NULL, price int DEFAULT NULL, status int DEFAULT NULL, created_at timestamp NULL DEFAULT CURRENT_TIMESTAMP(), recordSizePayload longtext, PRIMARY KEY (id)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
  261. ALTER TABLE products DISABLE KEYS;
  262. > DROP SOURCE IF EXISTS s1 CASCADE;
  263. """
  264. )
  265. )
  266. def make_inserts(c: Composition, start: int, batch_num: int):
  267. c.testdrive(
  268. args=["--no-reset"],
  269. input=dedent(
  270. f"""
  271. $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
  272. $ mysql-execute name=mysql
  273. SET foreign_key_checks = 0;
  274. USE public;
  275. SET @i:={start};
  276. INSERT INTO products (id, name, merchant_id, price, status, created_at, recordSizePayload) SELECT @i:=@i+1, CONCAT("name", @i), @i % 1000, @i % 1000, @i % 10, '2024-12-12', repeat('x', 1000000) FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {batch_num};
  277. """
  278. ),
  279. )
  280. num_rows = 100_000 # out of disk with 200_000 rows
  281. batch_size = 100
  282. for i in range(0, num_rows, batch_size):
  283. batch_num = min(batch_size, num_rows - i)
  284. make_inserts(c, i, batch_num)
  285. c.testdrive(
  286. args=["--no-reset"],
  287. input=dedent(
  288. f"""
  289. > CREATE SOURCE s1
  290. FROM MYSQL CONNECTION mysql_conn;
  291. > CREATE TABLE products FROM SOURCE s1 (REFERENCE public.products);
  292. > SELECT COUNT(*) FROM products;
  293. {num_rows}
  294. """
  295. ),
  296. )
  297. make_inserts(c, num_rows, 1)
  298. c.testdrive(
  299. args=["--no-reset"],
  300. input=dedent(
  301. f"""
  302. > SELECT COUNT(*) FROM products;
  303. {num_rows + 1}
  304. """
  305. ),
  306. )
  307. def workflow_source_timeouts(c: Composition, parser: WorkflowArgumentParser) -> None:
  308. """
  309. Test source connect timeout using toxiproxy to drop network traffic.
  310. """
  311. mysql_version = get_targeted_mysql_version(parser)
  312. with c.override(
  313. Materialized(
  314. sanity_restart=False,
  315. additional_system_parameter_defaults={
  316. "log_filter": "mz_storage::source::mysql=trace,info"
  317. },
  318. default_replication_factor=2,
  319. ),
  320. Toxiproxy(),
  321. create_mysql(mysql_version),
  322. ):
  323. c.up("materialized", "mysql", "toxiproxy")
  324. c.run_testdrive_files(
  325. f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
  326. "proxied/*.td",
  327. )