123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- import random
- from textwrap import dedent
- from materialize.mzcompose.composition import Composition
- from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
- from materialize.zippy.framework import (
- Action,
- ActionFactory,
- Capabilities,
- Capability,
- State,
- )
- from materialize.zippy.kafka_capabilities import KafkaRunning, TopicExists
- from materialize.zippy.mz_capabilities import MzIsRunning
- from materialize.zippy.replica_capabilities import source_capable_clusters
- from materialize.zippy.source_capabilities import SourceExists
- from materialize.zippy.storaged_capabilities import StoragedRunning
- class CreateSourceParameterized(ActionFactory):
- """Creates a source in Materialized."""
- @classmethod
- def requires(cls) -> set[type[Capability]]:
- return {
- BalancerdIsRunning,
- MzIsRunning,
- StoragedRunning,
- KafkaRunning,
- TopicExists,
- }
- def __init__(self, max_sources: int = 10) -> None:
- self.max_sources = max_sources
- def new(self, capabilities: Capabilities) -> list[Action]:
- new_source_name = capabilities.get_free_capability_name(
- SourceExists, self.max_sources
- )
- if new_source_name:
- return [
- CreateSource(
- capabilities=capabilities,
- source=SourceExists(
- name=new_source_name,
- topic=random.choice(capabilities.get(TopicExists)),
- cluster_name=random.choice(
- source_capable_clusters(capabilities)
- ),
- uses_ssh_tunnel=random.choice([True, False]),
- ),
- )
- ]
- else:
- return []
- class AlterSourceConnectionParameterized(ActionFactory):
- """Alters a source in Materialized."""
- @classmethod
- def requires(cls) -> set[type[Capability]]:
- return {MzIsRunning, StoragedRunning, KafkaRunning, TopicExists, SourceExists}
- def new(self, capabilities: Capabilities) -> list[Action]:
- existing_source_exists = capabilities.get(
- SourceExists,
- )
- return [
- AlterSourceConnection(
- capabilities=capabilities,
- source=source_exists,
- )
- for source_exists in existing_source_exists
- ]
- class CreateSource(Action):
- def __init__(self, capabilities: Capabilities, source: SourceExists) -> None:
- self.source = source
- super().__init__(capabilities)
- def run(self, c: Composition, state: State) -> None:
- envelope = str(self.source.topic.envelope).split(".")[1]
- kafka_connection_name = f"{self.source.name}_kafka_conn"
- c.testdrive(
- dedent(
- f"""
- > CREATE CONNECTION IF NOT EXISTS {self.source.name}_csr_conn
- TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');
- > CREATE CONNECTION IF NOT EXISTS {kafka_connection_name}
- TO KAFKA (BROKER '${{testdrive.kafka-addr}}' {'USING SSH TUNNEL zippy_ssh' if self.source.uses_ssh_tunnel else ''}, SECURITY PROTOCOL PLAINTEXT);
- > CREATE SOURCE {self.source.name}
- IN CLUSTER {self.source.cluster_name}
- FROM KAFKA CONNECTION {kafka_connection_name}
- (TOPIC 'testdrive-{self.source.topic.name}-${{testdrive.seed}}')
- > CREATE TABLE {self.source.get_name_for_query()} FROM SOURCE {self.source.name} (REFERENCE "testdrive-{self.source.topic.name}-${{testdrive.seed}}")
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {self.source.name}_csr_conn
- ENVELOPE {envelope}
- """
- ),
- mz_service=state.mz_service,
- )
- def provides(self) -> list[Capability]:
- return [self.source]
- class AlterSourceConnection(Action):
- def __init__(self, capabilities: Capabilities, source: SourceExists) -> None:
- self.source = source
- super().__init__(capabilities)
- def run(self, c: Composition, state: State) -> None:
- # This flips the usage of the SSH tunnel.
- self.flip_usage_of_ssh_tunnel(
- c,
- new_use_ssh_status=not self.source.uses_ssh_tunnel,
- mz_service=state.mz_service,
- )
- self.source.uses_ssh_tunnel = not self.source.uses_ssh_tunnel
- def flip_usage_of_ssh_tunnel(
- self, c: Composition, new_use_ssh_status: bool, mz_service: str
- ) -> None:
- kafka_connection_name = f"{self.source.name}_kafka_conn"
- c.testdrive(
- dedent(
- f"""
- > ALTER CONNECTION {kafka_connection_name} SET (BROKER '${{testdrive.kafka-addr}}'
- {'USING SSH TUNNEL zippy_ssh' if new_use_ssh_status else ''});
- """
- ),
- mz_service=mz_service,
- )
- def provides(self) -> list[Capability]:
- return []
|