view_actions.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import random
  10. from textwrap import dedent
  11. from materialize.mzcompose.composition import Composition
  12. from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
  13. from materialize.zippy.debezium_capabilities import DebeziumSourceExists
  14. from materialize.zippy.framework import (
  15. Action,
  16. ActionFactory,
  17. Capabilities,
  18. Capability,
  19. State,
  20. )
  21. from materialize.zippy.mysql_cdc_capabilities import MySqlCdcTableExists
  22. from materialize.zippy.mz_capabilities import MzIsRunning
  23. from materialize.zippy.pg_cdc_capabilities import PostgresCdcTableExists
  24. from materialize.zippy.source_capabilities import SourceExists
  25. from materialize.zippy.storaged_capabilities import StoragedRunning
  26. from materialize.zippy.table_capabilities import TableExists
  27. from materialize.zippy.view_capabilities import ViewExists
  28. from materialize.zippy.watermarked_object_capabilities import WatermarkedObjectExists
  29. class CreateViewParameterized(ActionFactory):
  30. """Emits CreateView Actions within the constraints specified in the constructor."""
  31. @classmethod
  32. def requires(cls) -> list[set[type[Capability]]]:
  33. return [
  34. {BalancerdIsRunning, MzIsRunning, SourceExists},
  35. {BalancerdIsRunning, MzIsRunning, TableExists},
  36. {BalancerdIsRunning, MzIsRunning, DebeziumSourceExists},
  37. {BalancerdIsRunning, MzIsRunning, PostgresCdcTableExists},
  38. {BalancerdIsRunning, MzIsRunning, MySqlCdcTableExists},
  39. ]
  40. def __init__(
  41. self,
  42. max_views: int = 10,
  43. max_inputs: int = 5,
  44. expensive_aggregates: bool = True,
  45. ) -> None:
  46. self.max_views = max_views
  47. self.max_inputs = max_inputs
  48. self.expensive_aggregates = expensive_aggregates
  49. def new(self, capabilities: Capabilities) -> list[Action]:
  50. new_view_name = capabilities.get_free_capability_name(
  51. ViewExists, self.max_views
  52. )
  53. if new_view_name:
  54. potential_inputs: list[WatermarkedObjectExists] = []
  55. for source_capability in [
  56. SourceExists,
  57. TableExists,
  58. DebeziumSourceExists,
  59. PostgresCdcTableExists,
  60. MySqlCdcTableExists,
  61. ]:
  62. potential_inputs.extend(capabilities.get(source_capability))
  63. inputs = random.sample(
  64. potential_inputs,
  65. min(len(potential_inputs), random.randint(1, self.max_inputs)),
  66. )
  67. return [
  68. CreateView(
  69. capabilities=capabilities,
  70. view=ViewExists(
  71. name=new_view_name,
  72. has_index=random.choice([True, False]),
  73. expensive_aggregates=self.expensive_aggregates,
  74. inputs=inputs,
  75. ),
  76. )
  77. ]
  78. else:
  79. return []
  80. class CreateView(Action):
  81. """Creates a view that is a join over one or more sources or tables"""
  82. def __init__(self, capabilities: Capabilities, view: ViewExists) -> None:
  83. self.view = view
  84. super().__init__(capabilities)
  85. def run(self, c: Composition, state: State) -> None:
  86. first_input = self.view.inputs[0]
  87. outer_join = " ".join(
  88. f"JOIN {f.get_name_for_query()} USING (f1)" for f in self.view.inputs[1:]
  89. )
  90. index = (
  91. f"> CREATE DEFAULT INDEX ON {self.view.name}" if self.view.has_index else ""
  92. )
  93. aggregates = [f"COUNT({first_input.get_name_for_query()}.f1) AS count_all"]
  94. if self.view.expensive_aggregates:
  95. aggregates.extend(
  96. [
  97. f"COUNT(DISTINCT {first_input.get_name_for_query()}.f1) AS count_distinct",
  98. f"MIN({first_input.get_name_for_query()}.f1) AS min_value",
  99. f"MAX({first_input.get_name_for_query()}.f1) AS max_value",
  100. ]
  101. )
  102. aggregates = ", ".join(aggregates)
  103. refresh = random.choice(
  104. ["ON COMMIT", f"EVERY '{random.randint(1, 5)} seconds'"]
  105. )
  106. c.testdrive(
  107. dedent(
  108. f"""
  109. > CREATE MATERIALIZED VIEW {self.view.name}
  110. WITH (REFRESH {refresh}) AS
  111. SELECT {aggregates}
  112. FROM {first_input.get_name_for_query()}
  113. {outer_join}
  114. """
  115. )
  116. + index,
  117. mz_service=state.mz_service,
  118. )
  119. def provides(self) -> list[Capability]:
  120. return [self.view]
  121. class ValidateView(Action):
  122. """Validates a view."""
  123. @classmethod
  124. def requires(cls) -> set[type[Capability]]:
  125. return {BalancerdIsRunning, MzIsRunning, StoragedRunning, ViewExists}
  126. def __init__(
  127. self, capabilities: Capabilities, view: ViewExists | None = None
  128. ) -> None:
  129. if view is None:
  130. self.view = random.choice(capabilities.get(ViewExists))
  131. else:
  132. self.view = view
  133. # Trigger the PeekPersist optimization
  134. self.select_limit = random.choice(["", "LIMIT 1"])
  135. super().__init__(capabilities)
  136. def run(self, c: Composition, state: State) -> None:
  137. watermarks = self.view.get_watermarks()
  138. view_min = watermarks.min
  139. view_max = watermarks.max
  140. if view_min <= view_max:
  141. c.testdrive(
  142. (
  143. dedent(
  144. f"""
  145. > SELECT count_all, count_distinct, min_value, max_value FROM {self.view.name} {self.select_limit} /* expecting count_all = {(view_max-view_min)+1} count_distinct = {(view_max-view_min)+1} min_value = {view_min} max_value = {view_max} */ ;
  146. {(view_max-view_min)+1} {(view_max-view_min)+1} {view_min} {view_max}
  147. """
  148. )
  149. if self.view.expensive_aggregates
  150. else dedent(
  151. f"""
  152. > SELECT count_all FROM {self.view.name} {self.select_limit} /* expecting count_all = {(view_max-view_min)+1} */ ;
  153. {(view_max-view_min)+1}
  154. """
  155. )
  156. ),
  157. mz_service=state.mz_service,
  158. )