operations.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from psycopg import Connection
  10. from materialize.scalability.endpoint.endpoint import Endpoint
  11. from materialize.scalability.operation.operation_data import OperationData
  12. from materialize.scalability.operation.scalability_operation import (
  13. Operation,
  14. SimpleSqlOperation,
  15. SqlOperationWithSeed,
  16. SqlOperationWithTwoSeeds,
  17. )
  18. from materialize.scalability.schema.schema import Schema
  19. class InsertDefaultValues(SimpleSqlOperation):
  20. def sql_statement(self) -> str:
  21. return "INSERT INTO t1 DEFAULT VALUES;"
  22. class SelectOne(SimpleSqlOperation):
  23. def sql_statement(self) -> str:
  24. return "SELECT 1;"
  25. class SelectStar(SimpleSqlOperation):
  26. def sql_statement(self) -> str:
  27. return "SELECT * FROM t1;"
  28. class SelectLimit(SimpleSqlOperation):
  29. def sql_statement(self) -> str:
  30. return "SELECT * FROM t1 LIMIT 1;"
  31. class SelectCount(SimpleSqlOperation):
  32. def sql_statement(self) -> str:
  33. return "SELECT COUNT(*) FROM t1;"
  34. class SelectCountInMv(SimpleSqlOperation):
  35. def sql_statement(self) -> str:
  36. return "SELECT count FROM mv1;"
  37. class SelectUnionAll(SimpleSqlOperation):
  38. def sql_statement(self) -> str:
  39. return "SELECT * FROM t1 UNION ALL SELECT * FROM t1;"
  40. class Update(SimpleSqlOperation):
  41. def sql_statement(self) -> str:
  42. return "UPDATE t1 SET f1 = f1 + 1;"
  43. class CreateTableX(SqlOperationWithSeed):
  44. def __init__(self) -> None:
  45. super().__init__("table_seed")
  46. def sql_statement(self, table_seed: str) -> str:
  47. return f"CREATE TABLE x_{table_seed} (f1 INT, f2 INT, f3 INT, f4 INT, f5 INT);"
  48. class CreateIndexOnTableX(SqlOperationWithSeed):
  49. def __init__(self) -> None:
  50. super().__init__("table_seed")
  51. def sql_statement(self, table_seed: str) -> str:
  52. return f"CREATE INDEX i_x_{table_seed} ON x_{table_seed} (f1);"
  53. class CreateMvOnTableX(SqlOperationWithSeed):
  54. def __init__(self) -> None:
  55. super().__init__("table_seed")
  56. def sql_statement(self, table_seed: str) -> str:
  57. return f"CREATE MATERIALIZED VIEW mv_x_{table_seed} AS SELECT * FROM x_{table_seed};"
  58. class CreateViewXOnSeries(SqlOperationWithSeed):
  59. def __init__(self, materialized: bool, additional_name_suffix: str = "") -> None:
  60. super().__init__("view_seed")
  61. self.materialized = materialized
  62. self.additional_name_suffix = additional_name_suffix
  63. def sql_statement(self, view_seed: str) -> str:
  64. obj_name_prefix = "mv_x_" if self.materialized else "v_x_"
  65. return (
  66. f"CREATE {'MATERIALIZED ' if self.materialized else ''}VIEW {obj_name_prefix}{view_seed}{self.additional_name_suffix} AS "
  67. f"SELECT generate_series(1, 100) AS id, '{obj_name_prefix}{view_seed}{self.additional_name_suffix}' AS view_name"
  68. )
  69. class CreateViewXOnViewOnSeries(SqlOperationWithSeed):
  70. def __init__(
  71. self,
  72. materialized: bool,
  73. additional_name_suffix: str,
  74. suffixes_of_other_views_on_series: list[str],
  75. ) -> None:
  76. super().__init__("view_seed")
  77. self.materialized = materialized
  78. self.additional_name_suffix = additional_name_suffix
  79. self.suffixes_of_other_views_on_series = suffixes_of_other_views_on_series
  80. def sql_statement(self, view_seed: str) -> str:
  81. obj_name_prefix = "mv_x_" if self.materialized else "v_x_"
  82. columns = ", ".join(
  83. [
  84. f"alias{view_suffix}.id AS id{view_suffix}, alias{view_suffix}.view_name AS name{view_suffix}"
  85. for view_suffix in self.suffixes_of_other_views_on_series
  86. ]
  87. )
  88. sources = ", ".join(
  89. [
  90. f"{obj_name_prefix}{view_seed}{view_suffix} AS alias{view_suffix}"
  91. for view_suffix in self.suffixes_of_other_views_on_series
  92. ]
  93. )
  94. return (
  95. f"CREATE {'MATERIALIZED ' if self.materialized else ''}VIEW {obj_name_prefix}{view_seed}{self.additional_name_suffix} AS "
  96. f"SELECT {columns} "
  97. f"FROM {sources}"
  98. )
  99. class DropViewX(SqlOperationWithSeed):
  100. def __init__(self, materialized: bool, additional_name_suffix: str = "") -> None:
  101. super().__init__("view_seed")
  102. self.materialized = materialized
  103. self.additional_name_suffix = additional_name_suffix
  104. def sql_statement(self, view_seed: str) -> str:
  105. obj_name_prefix = "mv_x_" if self.materialized else "v_x_"
  106. return f"DROP {'MATERIALIZED ' if self.materialized else ''}VIEW {obj_name_prefix}{view_seed}{self.additional_name_suffix}"
  107. class PopulateTableX(SqlOperationWithSeed):
  108. def __init__(self) -> None:
  109. super().__init__("table_seed")
  110. def sql_statement(self, table_seed: str) -> str:
  111. return f"INSERT INTO x_{table_seed} SELECT generate_series(1, 100), 200, 300, 400, 500;"
  112. class FillColumnInTableX(SqlOperationWithTwoSeeds):
  113. def __init__(self, column_seed_key: str = "column_seed") -> None:
  114. super().__init__("table_seed", column_seed_key)
  115. def sql_statement(self, table_seed: str, column_seed: str) -> str:
  116. return f"UPDATE x_{table_seed} SET c_{column_seed} = f1;"
  117. class DropMvOfTableX(SqlOperationWithSeed):
  118. def __init__(self) -> None:
  119. super().__init__("table_seed")
  120. def sql_statement(self, table_seed: str) -> str:
  121. return f"DROP MATERIALIZED VIEW mv_x_{table_seed} CASCADE;"
  122. class DropTableX(SqlOperationWithSeed):
  123. def __init__(self) -> None:
  124. super().__init__("table_seed")
  125. def sql_statement(self, table_seed: str) -> str:
  126. return f"DROP TABLE x_{table_seed} CASCADE;"
  127. class SelectStarFromTableX(SqlOperationWithSeed):
  128. def __init__(self) -> None:
  129. super().__init__("table_seed")
  130. def sql_statement(self, table_seed: str) -> str:
  131. return f"SELECT * FROM x_{table_seed};"
  132. class SelectStarFromMvOnTableX(SqlOperationWithSeed):
  133. def __init__(self) -> None:
  134. super().__init__("table_seed")
  135. def sql_statement(self, table_seed: str) -> str:
  136. return f"SELECT * FROM mv_x_{table_seed};"
  137. class Connect(Operation):
  138. def required_keys(self) -> set[str]:
  139. return {"endpoint", "schema"}
  140. def produced_keys(self) -> set[str]:
  141. return {"connection", "cursor"}
  142. def _execute(self, data: OperationData) -> OperationData:
  143. endpoint: Endpoint = data.get("endpoint")
  144. schema: Schema = data.get("schema")
  145. connection = endpoint.sql_connection(quiet=True)
  146. connection.autocommit = True
  147. cursor = connection.cursor()
  148. # this sets the database schema
  149. for connect_sql in schema.connect_sqls():
  150. cursor.execute(connect_sql.encode("utf8"))
  151. data.push("connection", connection)
  152. data.push("cursor", cursor)
  153. return data
  154. class Disconnect(Operation):
  155. def required_keys(self) -> set[str]:
  156. return {"connection"}
  157. def _execute(self, data: OperationData) -> OperationData:
  158. connection: Connection = data.get("connection")
  159. connection.close()
  160. return data