source_actions.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import random
  10. from textwrap import dedent
  11. from materialize.mzcompose.composition import Composition
  12. from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
  13. from materialize.zippy.framework import (
  14. Action,
  15. ActionFactory,
  16. Capabilities,
  17. Capability,
  18. State,
  19. )
  20. from materialize.zippy.kafka_capabilities import KafkaRunning, TopicExists
  21. from materialize.zippy.mz_capabilities import MzIsRunning
  22. from materialize.zippy.replica_capabilities import source_capable_clusters
  23. from materialize.zippy.source_capabilities import SourceExists
  24. from materialize.zippy.storaged_capabilities import StoragedRunning
  25. class CreateSourceParameterized(ActionFactory):
  26. """Creates a source in Materialized."""
  27. @classmethod
  28. def requires(cls) -> set[type[Capability]]:
  29. return {
  30. BalancerdIsRunning,
  31. MzIsRunning,
  32. StoragedRunning,
  33. KafkaRunning,
  34. TopicExists,
  35. }
  36. def __init__(self, max_sources: int = 10) -> None:
  37. self.max_sources = max_sources
  38. def new(self, capabilities: Capabilities) -> list[Action]:
  39. new_source_name = capabilities.get_free_capability_name(
  40. SourceExists, self.max_sources
  41. )
  42. if new_source_name:
  43. return [
  44. CreateSource(
  45. capabilities=capabilities,
  46. source=SourceExists(
  47. name=new_source_name,
  48. topic=random.choice(capabilities.get(TopicExists)),
  49. cluster_name=random.choice(
  50. source_capable_clusters(capabilities)
  51. ),
  52. uses_ssh_tunnel=random.choice([True, False]),
  53. ),
  54. )
  55. ]
  56. else:
  57. return []
  58. class AlterSourceConnectionParameterized(ActionFactory):
  59. """Alters a source in Materialized."""
  60. @classmethod
  61. def requires(cls) -> set[type[Capability]]:
  62. return {MzIsRunning, StoragedRunning, KafkaRunning, TopicExists, SourceExists}
  63. def new(self, capabilities: Capabilities) -> list[Action]:
  64. existing_source_exists = capabilities.get(
  65. SourceExists,
  66. )
  67. return [
  68. AlterSourceConnection(
  69. capabilities=capabilities,
  70. source=source_exists,
  71. )
  72. for source_exists in existing_source_exists
  73. ]
  74. class CreateSource(Action):
  75. def __init__(self, capabilities: Capabilities, source: SourceExists) -> None:
  76. self.source = source
  77. super().__init__(capabilities)
  78. def run(self, c: Composition, state: State) -> None:
  79. envelope = str(self.source.topic.envelope).split(".")[1]
  80. kafka_connection_name = f"{self.source.name}_kafka_conn"
  81. c.testdrive(
  82. dedent(
  83. f"""
  84. > CREATE CONNECTION IF NOT EXISTS {self.source.name}_csr_conn
  85. TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');
  86. > CREATE CONNECTION IF NOT EXISTS {kafka_connection_name}
  87. TO KAFKA (BROKER '${{testdrive.kafka-addr}}' {'USING SSH TUNNEL zippy_ssh' if self.source.uses_ssh_tunnel else ''}, SECURITY PROTOCOL PLAINTEXT);
  88. > CREATE SOURCE {self.source.name}
  89. IN CLUSTER {self.source.cluster_name}
  90. FROM KAFKA CONNECTION {kafka_connection_name}
  91. (TOPIC 'testdrive-{self.source.topic.name}-${{testdrive.seed}}')
  92. > CREATE TABLE {self.source.get_name_for_query()} FROM SOURCE {self.source.name} (REFERENCE "testdrive-{self.source.topic.name}-${{testdrive.seed}}")
  93. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {self.source.name}_csr_conn
  94. ENVELOPE {envelope}
  95. """
  96. ),
  97. mz_service=state.mz_service,
  98. )
  99. def provides(self) -> list[Capability]:
  100. return [self.source]
  101. class AlterSourceConnection(Action):
  102. def __init__(self, capabilities: Capabilities, source: SourceExists) -> None:
  103. self.source = source
  104. super().__init__(capabilities)
  105. def run(self, c: Composition, state: State) -> None:
  106. # This flips the usage of the SSH tunnel.
  107. self.flip_usage_of_ssh_tunnel(
  108. c,
  109. new_use_ssh_status=not self.source.uses_ssh_tunnel,
  110. mz_service=state.mz_service,
  111. )
  112. self.source.uses_ssh_tunnel = not self.source.uses_ssh_tunnel
  113. def flip_usage_of_ssh_tunnel(
  114. self, c: Composition, new_use_ssh_status: bool, mz_service: str
  115. ) -> None:
  116. kafka_connection_name = f"{self.source.name}_kafka_conn"
  117. c.testdrive(
  118. dedent(
  119. f"""
  120. > ALTER CONNECTION {kafka_connection_name} SET (BROKER '${{testdrive.kafka-addr}}'
  121. {'USING SSH TUNNEL zippy_ssh' if new_use_ssh_status else ''});
  122. """
  123. ),
  124. mz_service=mz_service,
  125. )
  126. def provides(self) -> list[Capability]:
  127. return []