sink_actions.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import random
  10. from textwrap import dedent
  11. from materialize.mzcompose.composition import Composition
  12. from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
  13. from materialize.zippy.framework import (
  14. Action,
  15. ActionFactory,
  16. Capabilities,
  17. Capability,
  18. State,
  19. )
  20. from materialize.zippy.mz_capabilities import MzIsRunning
  21. from materialize.zippy.replica_capabilities import source_capable_clusters
  22. from materialize.zippy.sink_capabilities import SinkExists
  23. from materialize.zippy.storaged_capabilities import StoragedRunning
  24. from materialize.zippy.view_capabilities import ViewExists
  25. class CreateSinkParameterized(ActionFactory):
  26. """Creates a sink over an existing view. Then creates a source over that sink and a view over that source."""
  27. @classmethod
  28. def requires(cls) -> list[set[type[Capability]]]:
  29. return [{BalancerdIsRunning, MzIsRunning, StoragedRunning, ViewExists}]
  30. def __init__(self, max_sinks: int = 10) -> None:
  31. self.max_sinks = max_sinks
  32. def new(self, capabilities: Capabilities) -> list[Action]:
  33. new_sink_name = capabilities.get_free_capability_name(
  34. SinkExists, self.max_sinks
  35. )
  36. if new_sink_name:
  37. source_view = random.choice(capabilities.get(ViewExists))
  38. cluster_name_out = random.choice(source_capable_clusters(capabilities))
  39. cluster_name_in = random.choice(source_capable_clusters(capabilities))
  40. dest_view = ViewExists(
  41. name=f"{new_sink_name}_view",
  42. inputs=[source_view],
  43. expensive_aggregates=source_view.expensive_aggregates,
  44. )
  45. return [
  46. CreateSink(
  47. sink=SinkExists(
  48. name=new_sink_name,
  49. source_view=source_view,
  50. dest_view=dest_view,
  51. cluster_name_out=cluster_name_out,
  52. cluster_name_in=cluster_name_in,
  53. ),
  54. capabilities=capabilities,
  55. ),
  56. ]
  57. else:
  58. return []
  59. class CreateSink(Action):
  60. def __init__(
  61. self,
  62. sink: SinkExists,
  63. capabilities: Capabilities,
  64. ) -> None:
  65. assert (
  66. sink is not None
  67. ), "CreateSink Action can not be referenced directly, it is produced by CreateSinkParameterized factory"
  68. self.sink = sink
  69. super().__init__(capabilities)
  70. def run(self, c: Composition, state: State) -> None:
  71. # The sink-derived source has upsert semantics, so produce a "normal" ViewExists output
  72. # from the 'before' and the 'after'
  73. refresh = random.choice(
  74. ["ON COMMIT", f"EVERY '{random.randint(1, 5)} seconds'"]
  75. )
  76. dest_view_sql = dedent(
  77. f"""
  78. > CREATE MATERIALIZED VIEW {self.sink.dest_view.name}
  79. WITH (REFRESH {refresh}) AS
  80. SELECT SUM(count_all)::int AS count_all, SUM(count_distinct)::int AS count_distinct, SUM(min_value)::int AS min_value, SUM(max_value)::int AS max_value FROM (
  81. SELECT (after).count_all, (after).count_distinct, (after).min_value, (after).max_value FROM {self.sink.name}_source_tbl
  82. UNION ALL
  83. SELECT -(before).count_all, -(before).count_distinct, -(before).min_value, -(before).max_value FROM {self.sink.name}_source_tbl
  84. );
  85. """
  86. if self.sink.dest_view.expensive_aggregates
  87. else f"""
  88. > CREATE MATERIALIZED VIEW {self.sink.dest_view.name} AS
  89. SELECT SUM(count_all)::int AS count_all FROM (
  90. SELECT (after).count_all FROM {self.sink.name}_source_tbl
  91. UNION ALL
  92. SELECT -(before).count_all FROM {self.sink.name}_source_tbl
  93. );
  94. """
  95. )
  96. c.testdrive(
  97. dedent(
  98. f"""
  99. > CREATE CONNECTION IF NOT EXISTS {self.sink.name}_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', PROGRESS TOPIC 'zippy-{self.sink.name}-${{testdrive.seed}}', SECURITY PROTOCOL PLAINTEXT);
  100. > CREATE CONNECTION IF NOT EXISTS {self.sink.name}_csr_conn TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');
  101. > CREATE SINK {self.sink.name}
  102. IN CLUSTER {self.sink.cluster_name_out}
  103. FROM {self.sink.source_view.name}
  104. INTO KAFKA CONNECTION {self.sink.name}_kafka_conn (TOPIC 'sink-{self.sink.name}')
  105. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {self.sink.name}_csr_conn
  106. ENVELOPE DEBEZIUM;
  107. $ kafka-verify-topic sink=materialize.public.{self.sink.name} await-value-schema=true
  108. # Ingest the sink again in order to be able to validate its contents
  109. > CREATE SOURCE {self.sink.name}_source
  110. IN CLUSTER {self.sink.cluster_name_in}
  111. FROM KAFKA CONNECTION {self.sink.name}_kafka_conn (TOPIC 'sink-{self.sink.name}')
  112. > CREATE TABLE {self.sink.name}_source_tbl FROM SOURCE {self.sink.name}_source (REFERENCE "sink-{self.sink.name}")
  113. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {self.sink.name}_csr_conn
  114. ENVELOPE NONE
  115. """
  116. )
  117. + dest_view_sql,
  118. mz_service=state.mz_service,
  119. )
  120. def provides(self) -> list[Capability]:
  121. return [self.sink, self.sink.dest_view]