mzcompose.py 19 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Functional test for the native (non-Debezium) MySQL sources, using Toxiproxy to
  11. simulate bad network as well as checking how Materialize handles binary log
  12. corruptions.
  13. """
  14. import time
  15. from typing import Any
  16. import pymysql
  17. from psycopg import Cursor
  18. from materialize import buildkite
  19. from materialize.mzcompose.composition import Composition
  20. from materialize.mzcompose.services.alpine import Alpine
  21. from materialize.mzcompose.services.materialized import Materialized
  22. from materialize.mzcompose.services.mysql import MySql, create_mysql_server_args
  23. from materialize.mzcompose.services.mz import Mz
  24. from materialize.mzcompose.services.testdrive import Testdrive
  25. from materialize.mzcompose.services.toxiproxy import Toxiproxy
# Services available to the workflows of this composition.
SERVICES = [
    Alpine(),
    Mz(app_password=""),
    Materialized(default_replication_factor=2),
    # MySQL instance with default settings.
    MySql(),
    # Two further MySQL instances started with is_master=False and distinct
    # server ids; the replica workflows bring them up explicitly via c.up().
    MySql(
        name="mysql-replica-1",
        version=MySql.DEFAULT_VERSION,
        additional_args=create_mysql_server_args(server_id="2", is_master=False),
    ),
    MySql(
        name="mysql-replica-2",
        version=MySql.DEFAULT_VERSION,
        additional_args=create_mysql_server_args(server_id="3", is_master=False),
    ),
    Toxiproxy(),
    Testdrive(
        no_reset=True,
        default_timeout="300s",
    ),
]
  47. def workflow_default(c: Composition) -> None:
  48. def process(name: str) -> None:
  49. if name == "default":
  50. return
  51. # TODO(def-): Reenable when database-issues#7775 is fixed
  52. if name in ("bin-log-manipulations", "short-bin-log-retention"):
  53. return
  54. # clear to avoid issues
  55. c.kill("mysql")
  56. c.rm("mysql")
  57. with c.test_case(name):
  58. c.workflow(name)
  59. workflows_with_internal_sharding = [
  60. "disruptions",
  61. "bin-log-manipulations",
  62. "short-bin-log-retention",
  63. ]
  64. sharded_workflows = workflows_with_internal_sharding + buildkite.shard_list(
  65. [w for w in c.workflows if w not in workflows_with_internal_sharding],
  66. lambda w: w,
  67. )
  68. print(
  69. f"Workflows in shard with index {buildkite.get_parallelism_index()}: {sharded_workflows}"
  70. )
  71. c.test_parts(sharded_workflows, process)
  72. def workflow_disruptions(c: Composition) -> None:
  73. """Test MySQL direct replication's failure handling by
  74. disrupting replication at various stages using Toxiproxy or service restarts
  75. """
  76. scenarios = [
  77. mysql_out_of_disk_space,
  78. disconnect_mysql_during_snapshot,
  79. disconnect_mysql_during_replication,
  80. restart_mysql_during_snapshot,
  81. restart_mz_during_snapshot,
  82. restart_mysql_during_replication,
  83. restart_mz_during_replication,
  84. fix_mysql_schema_while_mz_restarts,
  85. verify_no_snapshot_reingestion,
  86. transaction_with_rollback,
  87. ]
  88. scenarios = buildkite.shard_list(scenarios, lambda s: s.__name__)
  89. print(
  90. f"Scenarios in shard with index {buildkite.get_parallelism_index()}: {[s.__name__ for s in scenarios]}"
  91. )
  92. for scenario in scenarios:
  93. overrides = (
  94. [MySql(volumes=["sourcedata_512Mb:/var/lib/mysql"])]
  95. if scenario == mysql_out_of_disk_space
  96. else []
  97. )
  98. with c.override(*overrides):
  99. print(
  100. f"--- Running scenario {scenario.__name__} with overrides: {overrides}"
  101. )
  102. initialize(c)
  103. scenario(c)
  104. end(c)
  105. def workflow_backup_restore(c: Composition) -> None:
  106. with c.override(
  107. Materialized(sanity_restart=False, default_replication_factor=2),
  108. ):
  109. scenario = backup_restore_mysql
  110. print(f"--- Running scenario {scenario.__name__}")
  111. initialize(c)
  112. scenario(c)
  113. # No end confirmation here, since we expect the source to be in a bad state
  114. def workflow_bin_log_manipulations(c: Composition) -> None:
  115. with c.override(
  116. Materialized(sanity_restart=False, default_replication_factor=2),
  117. ):
  118. scenarios = [
  119. reset_master_gtid,
  120. corrupt_bin_log_to_stall_source,
  121. corrupt_bin_log_and_add_sub_source,
  122. ]
  123. scenarios = buildkite.shard_list(scenarios, lambda s: s.__name__)
  124. print(
  125. f"Scenarios in shard with index {buildkite.get_parallelism_index()}: {[s.__name__ for s in scenarios]}"
  126. )
  127. for scenario in scenarios:
  128. print(f"--- Running scenario {scenario.__name__}")
  129. initialize(c)
  130. scenario(c)
  131. # No end confirmation here, since we expect the source to be in a bad state
  132. def workflow_short_bin_log_retention(c: Composition) -> None:
  133. bin_log_expiration_in_sec = 2
  134. args = MySql.DEFAULT_ADDITIONAL_ARGS.copy()
  135. args.append(f"--binlog_expire_logs_seconds={bin_log_expiration_in_sec}")
  136. with c.override(
  137. Materialized(sanity_restart=False, default_replication_factor=2),
  138. MySql(additional_args=args),
  139. ):
  140. scenarios = [logs_expiration_while_mz_down, create_source_after_logs_expiration]
  141. scenarios = buildkite.shard_list(scenarios, lambda s: s.__name__)
  142. print(
  143. f"Scenarios in shard with index {buildkite.get_parallelism_index()}: {[s.__name__ for s in scenarios]}"
  144. )
  145. for scenario in scenarios:
  146. print(f"--- Running scenario {scenario.__name__}")
  147. initialize(c, create_source=False)
  148. scenario(
  149. c,
  150. bin_log_expiration_in_sec,
  151. )
  152. # No end confirmation here
  153. def workflow_master_changes(c: Composition) -> None:
  154. """
  155. mysql-replica-1 and mysql-replica-2 replicate mysql. The source is attached to mysql-replica-2. mysql is
  156. killed and mysql-replica-1 becomes the new master. mysql-replica-2 is configured to replicate from mysql-replica-1.
  157. """
  158. with c.override(
  159. Materialized(sanity_restart=False, default_replication_factor=2),
  160. MySql(
  161. name="mysql-replica-1",
  162. version=MySql.DEFAULT_VERSION,
  163. additional_args=create_mysql_server_args(server_id="2", is_master=False),
  164. ),
  165. MySql(
  166. name="mysql-replica-2",
  167. version=MySql.DEFAULT_VERSION,
  168. additional_args=create_mysql_server_args(server_id="3", is_master=False),
  169. ),
  170. ):
  171. initialize(c, create_source=False)
  172. host_data_master = "mysql"
  173. host_for_mz_source = "mysql-replica-2"
  174. c.up("mysql-replica-1", "mysql-replica-2")
  175. # configure mysql-replica-1 to replicate mysql
  176. run_testdrive_files(
  177. c,
  178. f"--var=mysql-replication-master-host={host_data_master}",
  179. "configure-replica.td",
  180. mysql_host="mysql-replica-1",
  181. )
  182. # configure mysql-replica-2 to replicate mysql
  183. run_testdrive_files(
  184. c,
  185. f"--var=mysql-replication-master-host={host_data_master}",
  186. "configure-replica.td",
  187. mysql_host="mysql-replica-2",
  188. )
  189. # wait for mysql-replica-2 to replicate the current state
  190. time.sleep(15)
  191. run_testdrive_files(
  192. c,
  193. f"--var=mysql-source-host={host_for_mz_source}",
  194. "verify-mysql-source-select-all.td",
  195. )
  196. # create source pointing to mysql-replica-2
  197. run_testdrive_files(
  198. c,
  199. f"--var=mysql-source-host={host_for_mz_source}",
  200. "create-source.td",
  201. )
  202. run_testdrive_files(
  203. c,
  204. "verify-source-running.td",
  205. )
  206. c.kill("mysql")
  207. host_data_master = "mysql-replica-1"
  208. # let mysql-replica-2 replicate from mysql-replica-1
  209. run_testdrive_files(
  210. c,
  211. f"--var=mysql-replication-master-host={host_data_master}",
  212. "configure-replica.td",
  213. mysql_host=host_for_mz_source,
  214. )
  215. run_testdrive_files(
  216. c,
  217. "verify-source-running.td",
  218. )
  219. # delete rows in mysql-replica-1
  220. run_testdrive_files(c, "delete-rows-t1.td", mysql_host=host_data_master)
  221. # It may take some time until mysql-replica-2 catches up.
  222. time.sleep(15)
  223. run_testdrive_files(
  224. c,
  225. "verify-rows-deleted-t1.td",
  226. )
  227. def workflow_switch_to_replica_and_kill_master(c: Composition) -> None:
  228. """
  229. mysql-replica-1 replicates mysql. The source is attached to mysql. mz switches the connection to mysql-replica-1.
  230. Changing the connection should not brick the source and the source should still work if mysql is killed.
  231. """
  232. with c.override(
  233. Materialized(sanity_restart=False, default_replication_factor=2),
  234. MySql(
  235. name="mysql-replica-1",
  236. version=MySql.DEFAULT_VERSION,
  237. additional_args=create_mysql_server_args(server_id="2", is_master=False),
  238. ),
  239. ):
  240. initialize(c)
  241. host_data_master = "mysql"
  242. host_for_mz_source = "mysql"
  243. c.up("mysql-replica-1")
  244. # configure replica
  245. run_testdrive_files(
  246. c,
  247. f"--var=mysql-replication-master-host={host_data_master}",
  248. "configure-replica.td",
  249. mysql_host="mysql-replica-1",
  250. )
  251. # give the replica some time to replicate the current state
  252. time.sleep(3)
  253. run_testdrive_files(
  254. c,
  255. "delete-rows-t1.td",
  256. "verify-rows-deleted-t1.td",
  257. )
  258. # change connection to replica
  259. host_for_mz_source = "mysql-replica-1"
  260. run_testdrive_files(
  261. c,
  262. f"--var=mysql-source-host={host_for_mz_source}",
  263. "alter-source-connection.td",
  264. )
  265. run_testdrive_files(
  266. c,
  267. "verify-source-running.td",
  268. )
  269. c.kill("mysql")
  270. time.sleep(3)
  271. run_testdrive_files(
  272. c,
  273. "verify-source-running.td",
  274. )
  275. def initialize(c: Composition, create_source: bool = True) -> None:
  276. c.down(destroy_volumes=True)
  277. c.up("materialized", "mysql", "toxiproxy")
  278. run_testdrive_files(
  279. c,
  280. "configure-toxiproxy.td",
  281. "configure-mysql.td",
  282. "populate-tables.td",
  283. )
  284. if create_source:
  285. run_testdrive_files(
  286. c,
  287. "--var=mysql-source-host=toxiproxy",
  288. "create-source.td",
  289. )
  290. def restart_mysql(c: Composition) -> None:
  291. c.kill("mysql")
  292. c.up("mysql")
  293. def restart_mz(c: Composition) -> None:
  294. c.kill("materialized")
  295. c.up("materialized")
  296. def end(c: Composition) -> None:
  297. """Validate the data at the end."""
  298. run_testdrive_files(c, "verify-data.td", "cleanup.td")
  299. def disconnect_mysql_during_snapshot(c: Composition) -> None:
  300. run_testdrive_files(
  301. c,
  302. "toxiproxy-close-connection.td",
  303. "toxiproxy-restore-connection.td",
  304. "delete-rows-t1.td",
  305. "delete-rows-t2.td",
  306. "alter-table.td",
  307. "alter-mz.td",
  308. )
  309. def restart_mysql_during_snapshot(c: Composition) -> None:
  310. restart_mysql(c)
  311. run_testdrive_files(
  312. c,
  313. "delete-rows-t1.td",
  314. "delete-rows-t2.td",
  315. "alter-table.td",
  316. "alter-mz.td",
  317. )
  318. def restart_mz_during_snapshot(c: Composition) -> None:
  319. run_testdrive_files(c, "alter-mz.td")
  320. restart_mz(c)
  321. run_testdrive_files(c, "delete-rows-t1.td", "delete-rows-t2.td", "alter-table.td")
  322. def disconnect_mysql_during_replication(c: Composition) -> None:
  323. run_testdrive_files(
  324. c,
  325. "wait-for-snapshot.td",
  326. "delete-rows-t1.td",
  327. "delete-rows-t2.td",
  328. "alter-table.td",
  329. "alter-mz.td",
  330. "toxiproxy-close-connection.td",
  331. "toxiproxy-restore-connection.td",
  332. )
  333. def restart_mysql_during_replication(c: Composition) -> None:
  334. run_testdrive_files(
  335. c,
  336. "wait-for-snapshot.td",
  337. "delete-rows-t1.td",
  338. "alter-table.td",
  339. "alter-mz.td",
  340. )
  341. restart_mysql(c)
  342. run_testdrive_files(c, "delete-rows-t2.td")
  343. def restart_mz_during_replication(c: Composition) -> None:
  344. run_testdrive_files(
  345. c,
  346. "wait-for-snapshot.td",
  347. "delete-rows-t1.td",
  348. "alter-table.td",
  349. "alter-mz.td",
  350. )
  351. restart_mz(c)
  352. run_testdrive_files(c, "delete-rows-t2.td")
  353. def fix_mysql_schema_while_mz_restarts(c: Composition) -> None:
  354. run_testdrive_files(
  355. c,
  356. "delete-rows-t1.td",
  357. "delete-rows-t2.td",
  358. "alter-table.td",
  359. "alter-mz.td",
  360. "verify-data.td",
  361. "alter-table-fix.td",
  362. )
  363. restart_mz(c)
  364. def verify_no_snapshot_reingestion(c: Composition) -> None:
  365. """Confirm that Mz does not reingest the entire snapshot on restart by
  366. revoking its SELECT privileges
  367. """
  368. run_testdrive_files(c, "wait-for-snapshot.td", "mysql-disable-select-permission.td")
  369. restart_mz(c)
  370. run_testdrive_files(
  371. c,
  372. "delete-rows-t1.td",
  373. "delete-rows-t2.td",
  374. "alter-table.td",
  375. "alter-mz.td",
  376. )
  377. def reset_master_gtid(c: Composition) -> None:
  378. """Confirm behavior after resetting GTID in MySQL"""
  379. run_testdrive_files(c, "reset-master.td")
  380. run_testdrive_files(
  381. c,
  382. "delete-rows-t1.td",
  383. "verify-source-stalled.td",
  384. )
  385. def corrupt_bin_log_to_stall_source(c: Composition) -> None:
  386. """
  387. Switch off mz, modify data in mysql, and purge the bin-log so that mz hasn't seen all entries in the replication
  388. stream.
  389. """
  390. _corrupt_bin_log(c)
  391. run_testdrive_files(
  392. c,
  393. "verify-source-stalled.td",
  394. )
  395. def corrupt_bin_log_and_add_sub_source(c: Composition) -> None:
  396. """
  397. Corrupt the bin log, add a sub source, and expect it to be working.
  398. """
  399. _corrupt_bin_log(c)
  400. run_testdrive_files(
  401. c,
  402. "populate-table-t3.td",
  403. "alter-source-add-subsource.td",
  404. "verify-t3.td",
  405. )
  406. def _corrupt_bin_log(c: Composition) -> None:
  407. run_testdrive_files(c, "wait-for-snapshot.td")
  408. c.kill("materialized")
  409. mysql_conn = create_mysql_connection(c)
  410. mysql_conn.autocommit(True)
  411. with mysql_conn.cursor() as cur:
  412. cur.execute("INSERT INTO public.t1 VALUES (1, 'text')")
  413. _purge_bin_logs(cur)
  414. cur.execute("INSERT INTO public.t1 VALUES (2, 'text')")
  415. c.up("materialized")
  416. def _purge_bin_logs(cur: Cursor) -> None:
  417. cur.execute("FLUSH BINARY LOGS")
  418. time.sleep(2)
  419. cur.execute("PURGE BINARY LOGS BEFORE now()")
  420. cur.execute("FLUSH BINARY LOGS")
  421. def transaction_with_rollback(c: Composition) -> None:
  422. """
  423. Rollback a tx in MySQL.
  424. """
  425. # needed for verify-data to succeed in the end (triggered by the workflow)
  426. run_testdrive_files(
  427. c,
  428. "delete-rows-t1.td",
  429. "delete-rows-t2.td",
  430. )
  431. mysql_conn = create_mysql_connection(c)
  432. mysql_conn.autocommit(False)
  433. with mysql_conn.cursor() as cur:
  434. cur.execute("INSERT INTO public.t1 VALUES (1, 'text')")
  435. time.sleep(2)
  436. cur.execute("INSERT INTO public.t1 VALUES (2, 'text')")
  437. time.sleep(2)
  438. cur.execute("ROLLBACK")
  439. run_testdrive_files(
  440. c,
  441. "verify-source-running.td",
  442. )
  443. # needed for verify-data to succeed in the end (triggered by the workflow)
  444. run_testdrive_files(
  445. c,
  446. "alter-table.td",
  447. "alter-mz.td",
  448. )
  449. def mysql_out_of_disk_space(c: Composition) -> None:
  450. run_testdrive_files(
  451. c,
  452. "wait-for-snapshot.td",
  453. "delete-rows-t1.td",
  454. )
  455. fill_file = "/var/lib/mysql/fill_file"
  456. c.exec(
  457. "mysql",
  458. "bash",
  459. "-c",
  460. f"dd if=/dev/zero of={fill_file} bs=1024 count=$[1024*512] || true",
  461. )
  462. print("Sleeping for 30 seconds ...")
  463. time.sleep(30)
  464. c.exec("mysql", "bash", "-c", f"rm {fill_file}")
  465. run_testdrive_files(c, "delete-rows-t2.td", "alter-table.td", "alter-mz.td")
  466. def backup_restore_mysql(c: Composition) -> None:
  467. # Backup MySQL, wait for completion
  468. backup_file = "backup.sql"
  469. c.exec(
  470. "mysql",
  471. "bash",
  472. "-c",
  473. f"export MYSQL_PWD={MySql.DEFAULT_ROOT_PASSWORD} && mysqldump --all-databases -u root --set-gtid-purged=OFF > {backup_file}",
  474. )
  475. run_testdrive_files(c, "delete-rows-t1.td")
  476. run_testdrive_files(c, "verify-rows-deleted-t1.td")
  477. # Restart MySQL service
  478. c.stop("mysql")
  479. c.up("mysql")
  480. c.exec(
  481. "mysql",
  482. "bash",
  483. "-c",
  484. f"export MYSQL_PWD={MySql.DEFAULT_ROOT_PASSWORD} && mysql -u root < {backup_file}",
  485. )
  486. run_testdrive_files(c, "verify-mysql-select.td")
  487. # TODO: database-issues#7683: one of the two following commands must succeed
  488. # run_testdrive_files(c, "verify-rows-after-restore-t1.td")
  489. # run_testdrive_files(c, "verify-source-failed.td")
  490. def create_source_after_logs_expiration(
  491. c: Composition, bin_log_expiration_in_sec: int
  492. ) -> None:
  493. """Populate tables, delete rows, and create the source after the log expiration in MySQL took place"""
  494. run_testdrive_files(c, "delete-rows-t1.td")
  495. sleep_duration = bin_log_expiration_in_sec + 2
  496. print(f"Sleeping for {sleep_duration} sec")
  497. time.sleep(sleep_duration)
  498. mysql_conn = create_mysql_connection(c)
  499. with mysql_conn.cursor() as cur:
  500. cur.execute("FLUSH BINARY LOGS")
  501. restart_mysql(c)
  502. # not really necessary, still do it
  503. mysql_conn = create_mysql_connection(c)
  504. mysql_conn.autocommit(True)
  505. with mysql_conn.cursor() as cur:
  506. cur.execute("FLUSH BINARY LOGS")
  507. run_testdrive_files(
  508. c,
  509. "--var=mysql-source-host=toxiproxy",
  510. "create-source.td",
  511. )
  512. run_testdrive_files(c, "verify-rows-deleted-t1.td")
  513. def logs_expiration_while_mz_down(
  514. c: Composition, bin_log_expiration_in_sec: int
  515. ) -> None:
  516. """Switch off mz, conduct changes in MySQL, let MySQL bin logs expire, and start mz"""
  517. run_testdrive_files(
  518. c,
  519. "--var=mysql-source-host=toxiproxy",
  520. "create-source.td",
  521. )
  522. c.kill("materialized")
  523. mysql_conn = create_mysql_connection(c)
  524. mysql_conn.autocommit(True)
  525. with mysql_conn.cursor() as cur:
  526. cur.execute("DELETE FROM public.t1 WHERE f1 % 2 = 0;")
  527. sleep_duration = bin_log_expiration_in_sec + 2
  528. print(f"Sleeping for {sleep_duration} sec")
  529. time.sleep(sleep_duration)
  530. restart_mysql(c)
  531. mysql_conn = create_mysql_connection(c)
  532. mysql_conn.autocommit(True)
  533. # conduct a further change to be added to the bin log
  534. with mysql_conn.cursor() as cur:
  535. cur.execute("UPDATE public.t1 SET f2 = NULL;")
  536. cur.execute("FLUSH BINARY LOGS")
  537. c.up("materialized")
  538. run_testdrive_files(c, "verify-source-stalled.td")
  539. def run_testdrive_files(c: Composition, *files: str, mysql_host: str = "mysql") -> None:
  540. c.run_testdrive_files(
  541. f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
  542. f"--var=mysql-host={mysql_host}",
  543. *files,
  544. )
  545. def create_mysql_connection(c: Composition) -> Any:
  546. return pymysql.connect(
  547. host="localhost",
  548. user="root",
  549. password=MySql.DEFAULT_ROOT_PASSWORD,
  550. database="mysql",
  551. port=c.default_port("mysql"),
  552. )