123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- import random
- from textwrap import dedent
- from materialize.mzcompose.composition import Composition
- from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
- from materialize.zippy.debezium_capabilities import DebeziumSourceExists
- from materialize.zippy.framework import (
- Action,
- ActionFactory,
- Capabilities,
- Capability,
- State,
- )
- from materialize.zippy.mysql_cdc_capabilities import MySqlCdcTableExists
- from materialize.zippy.mz_capabilities import MzIsRunning
- from materialize.zippy.pg_cdc_capabilities import PostgresCdcTableExists
- from materialize.zippy.source_capabilities import SourceExists
- from materialize.zippy.storaged_capabilities import StoragedRunning
- from materialize.zippy.table_capabilities import TableExists
- from materialize.zippy.view_capabilities import ViewExists
- from materialize.zippy.watermarked_object_capabilities import WatermarkedObjectExists
class CreateViewParameterized(ActionFactory):
    """Emits CreateView Actions within the constraints specified in the constructor.

    Each emitted action creates one materialized view over a random subset
    (1..max_inputs) of the source/table capabilities that currently exist.
    """

    @classmethod
    def requires(cls) -> list[set[type[Capability]]]:
        # One set per input kind that new() can draw view inputs from; each set
        # also requires balancerd and Materialize to be running.
        return [
            {BalancerdIsRunning, MzIsRunning, SourceExists},
            {BalancerdIsRunning, MzIsRunning, TableExists},
            {BalancerdIsRunning, MzIsRunning, DebeziumSourceExists},
            {BalancerdIsRunning, MzIsRunning, PostgresCdcTableExists},
            {BalancerdIsRunning, MzIsRunning, MySqlCdcTableExists},
        ]

    def __init__(
        self,
        max_views: int = 10,
        max_inputs: int = 5,
        expensive_aggregates: bool = True,
    ) -> None:
        """Configure the factory.

        Args:
            max_views: limit passed to
                ``get_free_capability_name(ViewExists, ...)`` when picking a
                name for a new view.
            max_inputs: maximum number of inputs joined by one view; it is the
                upper bound of ``random.randint(1, max_inputs)``, so it must
                be >= 1.
            expensive_aggregates: when True, created views also compute
                COUNT(DISTINCT), MIN and MAX in addition to COUNT.
        """
        self.max_views = max_views
        self.max_inputs = max_inputs
        self.expensive_aggregates = expensive_aggregates

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Return a single CreateView action, or an empty list.

        An empty list is returned when no free view name is available, or
        when there is no source/table capability to use as an input. A view
        with no inputs cannot be created, because CreateView.run reads
        ``inputs[0]``.
        """
        new_view_name = capabilities.get_free_capability_name(
            ViewExists, self.max_views
        )
        if not new_view_name:
            return []

        potential_inputs: list[WatermarkedObjectExists] = []
        for source_capability in [
            SourceExists,
            TableExists,
            DebeziumSourceExists,
            PostgresCdcTableExists,
            MySqlCdcTableExists,
        ]:
            potential_inputs.extend(capabilities.get(source_capability))

        if not potential_inputs:
            # Without this guard random.sample([], 0) would yield an empty
            # input list and the resulting CreateView would fail at run time.
            return []

        # Join between 1 and max_inputs objects, capped by how many exist.
        inputs = random.sample(
            potential_inputs,
            min(len(potential_inputs), random.randint(1, self.max_inputs)),
        )

        return [
            CreateView(
                capabilities=capabilities,
                view=ViewExists(
                    name=new_view_name,
                    has_index=random.choice([True, False]),
                    expensive_aggregates=self.expensive_aggregates,
                    inputs=inputs,
                ),
            )
        ]
class CreateView(Action):
    """Creates a materialized view that is a join over one or more sources or tables.

    All inputs are joined ``USING (f1)``. The view aggregates ``f1`` of the
    first input into ``count_all``. When ``view.expensive_aggregates`` is set,
    it also computes ``count_distinct``, ``min_value`` and ``max_value``.
    """

    def __init__(self, capabilities: Capabilities, view: ViewExists) -> None:
        self.view = view
        super().__init__(capabilities)

    def run(self, c: Composition, state: State) -> None:
        """Issue the CREATE MATERIALIZED VIEW (and optional index) via testdrive."""
        # Resolve the first input's query name once; it is used in every
        # aggregate and in the FROM clause.
        first_input_name = self.view.inputs[0].get_name_for_query()
        outer_join = " ".join(
            f"JOIN {f.get_name_for_query()} USING (f1)" for f in self.view.inputs[1:]
        )

        index = (
            f"> CREATE DEFAULT INDEX ON {self.view.name}" if self.view.has_index else ""
        )

        # Keep the list of expressions and the rendered SQL fragment in
        # separate variables so each keeps a single type.
        aggregate_exprs = [f"COUNT({first_input_name}.f1) AS count_all"]
        if self.view.expensive_aggregates:
            aggregate_exprs.extend(
                [
                    f"COUNT(DISTINCT {first_input_name}.f1) AS count_distinct",
                    f"MIN({first_input_name}.f1) AS min_value",
                    f"MAX({first_input_name}.f1) AS max_value",
                ]
            )
        aggregates = ", ".join(aggregate_exprs)

        # Randomly choose between refresh-on-commit and a 1-5 second schedule.
        refresh = random.choice(
            ["ON COMMIT", f"EVERY '{random.randint(1, 5)} seconds'"]
        )

        # A line at the same column as a ">" command is an expected result row
        # (as in ValidateView), so the statement's continuation lines are
        # indented relative to ">".
        c.testdrive(
            dedent(
                f"""
                > CREATE MATERIALIZED VIEW {self.view.name}
                  WITH (REFRESH {refresh}) AS
                  SELECT {aggregates}
                  FROM {first_input_name}
                  {outer_join}
                """
            )
            + index,
            mz_service=state.mz_service,
        )

    def provides(self) -> list[Capability]:
        """Expose the created view as a ViewExists capability."""
        return [self.view]
class ValidateView(Action):
    """Checks a view's aggregates against the watermarks of its ViewExists capability."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {BalancerdIsRunning, MzIsRunning, StoragedRunning, ViewExists}

    def __init__(
        self, capabilities: Capabilities, view: ViewExists | None = None
    ) -> None:
        # Use the requested view, otherwise pick an existing one at random.
        self.view = (
            view if view is not None else random.choice(capabilities.get(ViewExists))
        )
        # Sometimes add LIMIT 1 to trigger the PeekPersist optimization.
        self.select_limit = random.choice(["", "LIMIT 1"])
        super().__init__(capabilities)

    def run(self, c: Composition, state: State) -> None:
        """Query the view and compare its aggregates with the expected values."""
        marks = self.view.get_watermarks()
        low = marks.min
        high = marks.max
        # Nothing is checked when the watermark range is inverted.
        if not low <= high:
            return

        # Number of distinct values expected between the watermarks, inclusive.
        expected_count = (high - low) + 1

        if self.view.expensive_aggregates:
            script = dedent(
                f"""
                > SELECT count_all, count_distinct, min_value, max_value FROM {self.view.name} {self.select_limit} /* expecting count_all = {expected_count} count_distinct = {expected_count} min_value = {low} max_value = {high} */ ;
                {expected_count} {expected_count} {low} {high}
                """
            )
        else:
            script = dedent(
                f"""
                > SELECT count_all FROM {self.view.name} {self.select_limit} /* expecting count_all = {expected_count} */ ;
                {expected_count}
                """
            )

        c.testdrive(script, mz_service=state.mz_service)
|