mzcompose.py

# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""
Functional test for the native (non-Debezium) MySQL sources.
"""

import glob
import threading
from textwrap import dedent

from materialize import MZ_ROOT, buildkite
from materialize.mysql_util import (
    retrieve_invalid_ssl_context_for_mysql,
    retrieve_ssl_context_for_mysql,
)
from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.minio import Minio
from materialize.mzcompose.services.mysql import MySql
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.postgres import (
    METADATA_STORE,
    CockroachOrPostgresMetadata,
)
from materialize.mzcompose.services.test_certs import TestCerts
from materialize.mzcompose.services.testdrive import Testdrive
from materialize.source_table_migration import (
    get_new_image_for_source_table_migration_test,
    get_old_image_for_source_table_migration_test,
    verify_sources_after_source_table_migration,
)


def create_mysql(mysql_version: str) -> MySql:
    return MySql(version=mysql_version)


def create_mysql_replica(mysql_version: str) -> MySql:
    # A second MySQL instance configured for GTID-based replication; replication
    # is deliberately not auto-started (--skip-replica-start) so that tests can
    # attach it to the primary themselves.
    return MySql(
        name="mysql-replica",
        version=mysql_version,
        additional_args=[
            "--gtid_mode=ON",
            "--enforce_gtid_consistency=ON",
            "--skip-replica-start",
            "--server-id=2",
        ],
    )


SERVICES = [
    Mz(app_password=""),
    Materialized(
        external_blob_store=True,
        additional_system_parameter_defaults={
            "log_filter": "mz_storage::source::mysql=trace,info"
        },
        default_replication_factor=2,
    ),
    create_mysql(MySql.DEFAULT_VERSION),
    create_mysql_replica(MySql.DEFAULT_VERSION),
    TestCerts(),
    CockroachOrPostgresMetadata(),
    Minio(setup_materialize=True),
    Testdrive(default_timeout="60s"),
]


def get_targeted_mysql_version(parser: WorkflowArgumentParser) -> str:
    parser.add_argument(
        "--mysql-version",
        default=MySql.DEFAULT_VERSION,
        type=str,
    )

    args, _ = parser.parse_known_args()
    print(f"Running with MySQL version {args.mysql_version}")

    return args.mysql_version
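
# For illustration only (a hypothetical invocation; the accepted version strings
# depend on the MySQL images available to the composition), the flag is passed
# on the command line when running a workflow, e.g.:
#
#     ./mzcompose run cdc --mysql-version=8.0.36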


def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    def process(name: str) -> None:
        if name in ("default", "migration"):
            return
        with c.test_case(name):
            c.workflow(name, *parser.args)

    # Workflows that shard their own files run in every Buildkite shard; the
    # remaining workflows are distributed across the shards.
    workflows_with_internal_sharding = ["cdc"]
    sharded_workflows = workflows_with_internal_sharding + buildkite.shard_list(
        [w for w in c.workflows if w not in workflows_with_internal_sharding],
        lambda w: w,
    )
    print(
        f"Workflows in shard with index {buildkite.get_parallelism_index()}: {sharded_workflows}"
    )
    c.test_parts(sharded_workflows, process)


def workflow_cdc(c: Composition, parser: WorkflowArgumentParser) -> None:
    mysql_version = get_targeted_mysql_version(parser)

    parser.add_argument(
        "filter",
        nargs="*",
        default=["*.td"],
        help="limit to only the files matching filter",
    )
    args = parser.parse_args()

    matching_files = []
    for filter in args.filter:
        matching_files.extend(
            glob.glob(filter, root_dir=MZ_ROOT / "test" / "mysql-cdc-old-syntax")
        )
    sharded_files: list[str] = buildkite.shard_list(
        sorted(matching_files), lambda file: file
    )
    print(f"Files: {sharded_files}")

    with c.override(create_mysql(mysql_version)):
        c.up("materialized", "mysql")

        valid_ssl_context = retrieve_ssl_context_for_mysql(c)
        wrong_ssl_context = retrieve_invalid_ssl_context_for_mysql(c)

        c.sources_and_sinks_ignored_from_validation.add("drop_table")

        c.test_parts(
            sharded_files,
            lambda file: c.run_testdrive_files(
                f"--var=ssl-ca={valid_ssl_context.ca}",
                f"--var=ssl-client-cert={valid_ssl_context.client_cert}",
                f"--var=ssl-client-key={valid_ssl_context.client_key}",
                f"--var=ssl-wrong-ca={wrong_ssl_context.ca}",
                f"--var=ssl-wrong-client-cert={wrong_ssl_context.client_cert}",
                f"--var=ssl-wrong-client-key={wrong_ssl_context.client_key}",
                f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
                "--var=mysql-user-password=us3rp4ssw0rd",
                f"--var=default-replica-size={Materialized.Size.DEFAULT_SIZE}-{Materialized.Size.DEFAULT_SIZE}",
                f"--var=default-storage-size={Materialized.Size.DEFAULT_SIZE}-1",
                file,
            ),
        )


def workflow_replica_connection(c: Composition, parser: WorkflowArgumentParser) -> None:
    mysql_version = get_targeted_mysql_version(parser)
    with c.override(create_mysql(mysql_version), create_mysql_replica(mysql_version)):
        c.up("materialized", "mysql", "mysql-replica")
        c.run_testdrive_files(
            f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
            "override/10-replica-connection.td",
        )


def workflow_schema_change_restart(
    c: Composition, parser: WorkflowArgumentParser
) -> None:
    """
    Validates that a schema change applied to a table after the MySQL source is
    created, but before the snapshot has completed, is detected after a restart.
    """
    mysql_version = get_targeted_mysql_version(parser)
    with c.override(create_mysql(mysql_version)):
        c.up("materialized", "mysql")
        c.run_testdrive_files(
            f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
            "schema-restart/before-restart.td",
        )

    with c.override(Testdrive(no_reset=True), create_mysql(mysql_version)):
        # Restart mz.
        c.kill("materialized")
        c.up("materialized")

        c.run_testdrive_files(
            f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
            "schema-restart/after-restart.td",
        )


def _make_inserts(*, txns: int, txn_size: int) -> tuple[str, int]:
    # Each "transaction" is a single multi-row INSERT. @i is a MySQL session
    # variable used to generate increasing values, and the self cross join of
    # mysql.time_zone only exists to produce enough rows for the LIMIT to cut
    # down to exactly txn_size. See the worked example below the function.
    sql = "\n".join(
        [
            f"""
            SET @i:=0;
            INSERT INTO many_inserts (f2) SELECT @i:=@i+1 FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {txn_size};
            """
            for i in range(0, txns)
        ]
    )
    records = txns * txn_size
    return (sql, records)
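
# For illustration: _make_inserts(txns=2, txn_size=3) returns a SQL string that,
# ignoring whitespace, amounts to
#
#     SET @i:=0;
#     INSERT INTO many_inserts (f2) SELECT @i:=@i+1 FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT 3;
#     SET @i:=0;
#     INSERT INTO many_inserts (f2) SELECT @i:=@i+1 FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT 3;
#
# together with a record count of 2 * 3 = 6.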


def workflow_many_inserts(c: Composition, parser: WorkflowArgumentParser) -> None:
    """
    Tests a scenario that caused a consistency issue in the past. We insert a
    large number of rows into a table, then create a source for that table while
    simultaneously inserting many more rows into the table in a background
    thread, and finally verify that the source captures the correct count of
    rows.

    In earlier incarnations of the MySQL source, the source accidentally failed
    to snapshot inside of a repeatable read transaction.
    """
    mysql_version = get_targeted_mysql_version(parser)

    with c.override(create_mysql(mysql_version)):
        c.up("materialized", "mysql", {"name": "testdrive", "persistent": True})

        # Records to insert before creating the source.
        (initial_sql, initial_records) = _make_inserts(txns=1, txn_size=1_000_000)
        # Records to insert concurrently with creating the source.
        (concurrent_sql, concurrent_records) = _make_inserts(txns=1000, txn_size=100)

        # Set up the MySQL server with the initial records, and set up the
        # connection to the MySQL server in Materialize.
        c.testdrive(
            dedent(
                f"""
                $ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
                ALTER SYSTEM SET max_mysql_connections = 100

                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

                > CREATE SECRET IF NOT EXISTS mysqlpass AS '{MySql.DEFAULT_ROOT_PASSWORD}'
                > CREATE CONNECTION IF NOT EXISTS mysql_conn TO MYSQL (HOST mysql, USER root, PASSWORD SECRET mysqlpass)

                $ mysql-execute name=mysql
                DROP DATABASE IF EXISTS public;
                CREATE DATABASE public;
                USE public;
                DROP TABLE IF EXISTS many_inserts;
                CREATE TABLE many_inserts (pk SERIAL PRIMARY KEY, f2 BIGINT);
                """
            )
            + dedent(initial_sql)
            + dedent(
                """
                > DROP SOURCE IF EXISTS s1 CASCADE;
                """
            )
        )

        # Start inserting in the background.
        def do_inserts(c: Composition):
            x = dedent(
                f"""
                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
                $ mysql-execute name=mysql
                USE public;
                {concurrent_sql}
                """
            )
            c.testdrive(args=["--no-reset"], input=x)

        insert_thread = threading.Thread(target=do_inserts, args=(c,))
        print("--- Start many concurrent inserts")
        insert_thread.start()

        # Create the source.
        c.testdrive(
            args=["--no-reset"],
            input=dedent(
                """
                > CREATE SOURCE s1
                  FROM MYSQL CONNECTION mysql_conn
                  FOR TABLES (public.many_inserts);
                """
            ),
        )

        # Ensure the source eventually sees the right number of records.
        insert_thread.join()
        print("--- Validate concurrent inserts")
        c.testdrive(
            args=["--no-reset"],
            input=dedent(
                f"""
                > SELECT count(*) FROM many_inserts
                {initial_records + concurrent_records}
                """
            ),
        )


def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None:
    parser.add_argument(
        "filter",
        nargs="*",
        default=["*.td"],
        help="limit to only the files matching filter",
    )
    args = parser.parse_args()

    matching_files = []
    for filter in args.filter:
        matching_files.extend(
            glob.glob(filter, root_dir=MZ_ROOT / "test" / "mysql-cdc-old-syntax")
        )
    sharded_files: list[str] = buildkite.shard_list(
        sorted(matching_files), lambda file: file
    )
    print(f"Files: {sharded_files}")

    mysql_version = get_targeted_mysql_version(parser)

    for file in sharded_files:
        mz_old = Materialized(
            name="materialized",
            image=get_old_image_for_source_table_migration_test(),
            external_metadata_store=True,
            external_blob_store=True,
            additional_system_parameter_defaults={
                "log_filter": "mz_storage::source::mysql=trace,info"
            },
            default_replication_factor=2,
        )

        mz_new = Materialized(
            name="materialized",
            image=get_new_image_for_source_table_migration_test(),
            external_metadata_store=True,
            external_blob_store=True,
            additional_system_parameter_defaults={
                "log_filter": "mz_storage::source::mysql=trace,info",
                "force_source_table_syntax": "true",
            },
            default_replication_factor=2,
        )
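
        # Per-file flow (descriptive note): run the testdrive file against the
        # old image, which creates sources using the pre-migration syntax; then
        # restart on the new image with force_source_table_syntax enabled and
        # let verify_sources_after_source_table_migration check the sources
        # after the upgrade. Everything is torn down before the next file.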
        with c.override(mz_old, create_mysql(mysql_version)):
            c.up("materialized", "mysql")

            print(f"Running {file} with mz_old")

            valid_ssl_context = retrieve_ssl_context_for_mysql(c)
            wrong_ssl_context = retrieve_invalid_ssl_context_for_mysql(c)

            c.sources_and_sinks_ignored_from_validation.add("drop_table")

            c.run_testdrive_files(
                f"--var=ssl-ca={valid_ssl_context.ca}",
                f"--var=ssl-client-cert={valid_ssl_context.client_cert}",
                f"--var=ssl-client-key={valid_ssl_context.client_key}",
                f"--var=ssl-wrong-ca={wrong_ssl_context.ca}",
                f"--var=ssl-wrong-client-cert={wrong_ssl_context.client_cert}",
                f"--var=ssl-wrong-client-key={wrong_ssl_context.client_key}",
                f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
                "--var=mysql-user-password=us3rp4ssw0rd",
                f"--var=default-replica-size={Materialized.Size.DEFAULT_SIZE}-{Materialized.Size.DEFAULT_SIZE}",
                f"--var=default-storage-size={Materialized.Size.DEFAULT_SIZE}-1",
                "--no-reset",
                file,
            )

            c.kill("materialized", wait=True)

            with c.override(mz_new):
                c.up("materialized")

                print("Running mz_new")
                verify_sources_after_source_table_migration(c, file)

                c.kill("materialized", wait=True)

        c.kill("mysql", wait=True)
        c.kill(METADATA_STORE, wait=True)
        c.rm("materialized")
        c.rm(METADATA_STORE)
        c.rm("mysql")
        c.rm_volumes("mzdata")