clone.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Utilities to clone DDL statements from a Materialize catalog for exploration
  11. and experimentation.
  12. """
  13. import string
  14. import textwrap
  15. from contextlib import closing
  16. from itertools import chain
  17. from typing import TextIO
  18. from psycopg.errors import DatabaseError
  19. from materialize.mzexplore import sql
  20. from materialize.mzexplore.common import ClonedItem, ItemType, info, warn
  21. def defs(
  22. ddl_out: TextIO,
  23. cmp_out: TextIO,
  24. database: str,
  25. schema: str,
  26. cluster: str,
  27. object_ids: list[str],
  28. db_port: int,
  29. db_host: str,
  30. db_user: str,
  31. db_pass: str | None,
  32. db_require_ssl: bool,
  33. mzfmt: bool,
  34. ) -> None:
  35. """
  36. Generate a DDL script that clones object_ids and dependencies that live in
  37. the same cluster into a different cluster.
  38. """
  39. database_new = sql.identifier(database, True)
  40. schema_new = sql.identifier(schema, True)
  41. cluster_new = sql.identifier(cluster, True)
  42. with closing(
  43. sql.Database(
  44. port=db_port,
  45. host=db_host,
  46. user=db_user,
  47. database=None,
  48. password=db_pass,
  49. require_ssl=db_require_ssl,
  50. )
  51. ) as db:
  52. output_template = string.Template(
  53. textwrap.dedent(
  54. """
  55. -- original id: $id
  56. $create_sql
  57. """
  58. ).lstrip()
  59. )
  60. # Extract dependencies
  61. # --------------------
  62. if len(object_ids) == 0:
  63. msg = "At least one object_id necessary"
  64. raise ValueError(msg)
  65. old_clusters = list(db.object_clusters(object_ids))
  66. if len(old_clusters) != 1:
  67. msg = f"Cannot find unique old cluster for object ids: {object_ids}"
  68. raise ValueError(msg)
  69. [cluster_old] = old_clusters
  70. if cluster_old["name"] == cluster:
  71. msg = f"Old and new clusters can't have the same name: {cluster}"
  72. raise ValueError(msg)
  73. # Replacement pattern for clusters.
  74. cluster_str_old = f"IN CLUSTER {sql.identifier(cluster_old['name'], True)}"
  75. cluster_str_new = f"IN CLUSTER {cluster_new}"
  76. items_seen = set()
  77. aliased_refs, index_on_refs, simple_refs = [], [], []
  78. for item in db.clone_dependencies(object_ids, cluster_old["id"]):
  79. item_database = sql.identifier(item["database"], True)
  80. item_schema = sql.identifier(item["schema"], True)
  81. item_name = sql.identifier(item["name"], True)
  82. fqname = f"{item_database}.{item_schema}.{item_name}"
  83. try:
  84. item_type = ItemType(item["type"])
  85. except ValueError:
  86. warn(f"Unsupported item type `{item['type']}` for {fqname}")
  87. continue
  88. cloned_item = ClonedItem(
  89. database=item["database"],
  90. schema=item["schema"],
  91. name=item["name"],
  92. id=item["id"],
  93. item_type=item_type,
  94. )
  95. show_create_query = item_type.show_create(fqname)
  96. if show_create_query is None:
  97. msg = f"Cannot determine SHOW CREATE query for {fqname}"
  98. raise ValueError(msg)
  99. assert cloned_item not in items_seen
  100. items_seen.add(cloned_item)
  101. # Add replacement string pairs
  102. (create_name_old, create_name_new) = (
  103. cloned_item.create_name_old(),
  104. cloned_item.create_name_new(database_new, schema_new),
  105. )
  106. aliased_refs.append(
  107. (
  108. cloned_item.aliased_ref_old(),
  109. cloned_item.aliased_ref_new(database_new, schema_new),
  110. )
  111. )
  112. index_on_refs.append(
  113. (
  114. cloned_item.index_on_ref_old(),
  115. cloned_item.index_on_ref_new(database_new, schema_new),
  116. )
  117. )
  118. simple_refs.append(
  119. (
  120. cloned_item.simple_ref_old(),
  121. cloned_item.simple_ref_new(database_new, schema_new),
  122. )
  123. )
  124. try:
  125. info(f"Appending DDL for {item_type.sql()} {fqname}")
  126. sql_old = db.query_one(show_create_query)["create_sql"]
  127. sql_new = sql_old
  128. sql_new = sql_new.replace(create_name_old, create_name_new, 1)
  129. sql_new = sql_new.replace(cluster_str_old, cluster_str_new, 1)
  130. # Aliases need to be replaced in this order (simple_refs last).
  131. for ref_old, ref_new in chain(aliased_refs, index_on_refs, simple_refs):
  132. sql_new = sql_new.replace(ref_old, ref_new)
  133. sql_old = sql.try_mzfmt(sql_old) if mzfmt else sql_old
  134. sql_old = sql_old.rstrip() + "\n"
  135. sql_new = sql.try_mzfmt(sql_new) if mzfmt else sql_new
  136. sql_new = sql_new.rstrip() + "\n"
  137. # Write the original definition into cmp_out.
  138. cmp_out.write(
  139. output_template.substitute(dict(create_sql=sql_old, **item))
  140. )
  141. # Write the modified definition into ddl_out.
  142. ddl_out.write(
  143. output_template.substitute(dict(create_sql=sql_new, **item))
  144. )
  145. except DatabaseError as e:
  146. warn(f"Cannot append DDL for {item_type.sql()} {fqname}: {e}")