mysql_cdc_actions.py 3.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. import random
  10. from textwrap import dedent
  11. from materialize.mzcompose.composition import Composition
  12. from materialize.mzcompose.services.mysql import MySql
  13. from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
  14. from materialize.zippy.framework import Action, Capabilities, Capability, State
  15. from materialize.zippy.mysql_capabilities import MySqlRunning, MySqlTableExists
  16. from materialize.zippy.mysql_cdc_capabilities import MySqlCdcTableExists
  17. from materialize.zippy.mz_capabilities import MzIsRunning
  18. from materialize.zippy.replica_capabilities import source_capable_clusters
  19. from materialize.zippy.storaged_capabilities import StoragedRunning
  20. class CreateMySqlCdcTable(Action):
  21. """Creates a MySQL CDC source in Materialized."""
  22. @classmethod
  23. def requires(cls) -> set[type[Capability]]:
  24. return {
  25. BalancerdIsRunning,
  26. MzIsRunning,
  27. StoragedRunning,
  28. MySqlRunning,
  29. MySqlTableExists,
  30. }
  31. def __init__(self, capabilities: Capabilities) -> None:
  32. mysql_table = random.choice(capabilities.get(MySqlTableExists))
  33. mysql_pg_cdc_name = f"mysql_{mysql_table.name}"
  34. this_mysql_cdc_table = MySqlCdcTableExists(name=mysql_pg_cdc_name)
  35. cluster_name = random.choice(source_capable_clusters(capabilities))
  36. existing_mysql_cdc_tables = [
  37. s
  38. for s in capabilities.get(MySqlCdcTableExists)
  39. if s.name == this_mysql_cdc_table.name
  40. ]
  41. if len(existing_mysql_cdc_tables) == 0:
  42. self.new_mysql_cdc_table = True
  43. self.mysql_cdc_table = this_mysql_cdc_table
  44. self.mysql_cdc_table.mysql_table = mysql_table
  45. self.cluster_name = cluster_name
  46. elif len(existing_mysql_cdc_tables) == 1:
  47. self.new_mysql_cdc_table = False
  48. self.mysql_cdc_table = existing_mysql_cdc_tables[0]
  49. else:
  50. raise RuntimeError("More than one CDC table exists")
  51. super().__init__(capabilities)
  52. def run(self, c: Composition, state: State) -> None:
  53. if self.new_mysql_cdc_table:
  54. assert self.mysql_cdc_table is not None
  55. assert self.mysql_cdc_table.mysql_table is not None
  56. name = self.mysql_cdc_table.name
  57. c.testdrive(
  58. dedent(
  59. f"""
  60. > CREATE SECRET {name}_password AS '{MySql.DEFAULT_ROOT_PASSWORD}'
  61. > CREATE CONNECTION {name}_conn TO MYSQL (
  62. HOST mysql,
  63. USER root,
  64. PASSWORD SECRET {name}_password
  65. )
  66. > CREATE SOURCE {name}_source
  67. IN CLUSTER {self.cluster_name}
  68. FROM MYSQL CONNECTION {name}_conn;
  69. > CREATE TABLE {name} FROM SOURCE {name}_source (REFERENCE public.{self.mysql_cdc_table.mysql_table.name});
  70. """
  71. ),
  72. mz_service=state.mz_service,
  73. )
  74. def provides(self) -> list[Capability]:
  75. return [self.mysql_cdc_table] if self.new_mysql_cdc_table else []