  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import random
  10. import string
  11. import threading
  12. from textwrap import dedent
  13. import numpy as np
  14. from materialize.mzcompose.composition import Composition
  15. from materialize.zippy.framework import (
  16. Action,
  17. ActionFactory,
  18. Capabilities,
  19. Capability,
  20. State,
  21. )
  22. from materialize.zippy.kafka_capabilities import Envelope, KafkaRunning, TopicExists
  23. from materialize.zippy.mz_capabilities import MzIsRunning
# Testdrive preamble shared by every Kafka action below. The `$ set` directive
# registers two Avro schemas for later `$ kafka-ingest` commands: a key schema
# (single long "key") and a value schema (long "f1" plus a string "pad" that
# is used to inflate record size).
SCHEMA = """
$ set keyschema={
"type" : "record",
"name" : "test",
"fields" : [
{"name":"key", "type":"long"}
]
}
$ set schema={
"type" : "record",
"name" : "test",
"fields" : [
{"name":"f1", "type":"long"},
{"name":"pad", "type":"string"}
]
}
"""
  41. class KafkaStart(Action):
  42. """Start a Kafka instance."""
  43. def provides(self) -> list[Capability]:
  44. return [KafkaRunning()]
  45. def run(self, c: Composition, state: State) -> None:
  46. c.up("redpanda")
  47. class KafkaStop(Action):
  48. """Stop the Kafka instance."""
  49. @classmethod
  50. def requires(cls) -> set[type[Capability]]:
  51. return {KafkaRunning}
  52. def withholds(self) -> set[type[Capability]]:
  53. return {KafkaRunning}
  54. def run(self, c: Composition, state: State) -> None:
  55. c.kill("redpanda")
  56. class CreateTopicParameterized(ActionFactory):
  57. """Creates a Kafka topic and decides on the envelope that will be used."""
  58. @classmethod
  59. def requires(cls) -> set[type[Capability]]:
  60. return {MzIsRunning, KafkaRunning}
  61. def __init__(
  62. self,
  63. max_topics: int = 10,
  64. envelopes_with_weights: dict[Envelope, int] = {
  65. Envelope.NONE: 25,
  66. Envelope.UPSERT: 75,
  67. },
  68. ) -> None:
  69. self.max_topics = max_topics
  70. self.envelopes_with_weights = envelopes_with_weights
  71. def new(self, capabilities: Capabilities) -> list[Action]:
  72. new_topic_name = capabilities.get_free_capability_name(
  73. TopicExists, self.max_topics
  74. )
  75. if new_topic_name:
  76. return [
  77. CreateTopic(
  78. capabilities=capabilities,
  79. topic=TopicExists(
  80. name=new_topic_name,
  81. envelope=random.choices(
  82. list(self.envelopes_with_weights.keys()),
  83. weights=list(self.envelopes_with_weights.values()),
  84. )[0],
  85. partitions=random.randint(1, 10),
  86. ),
  87. )
  88. ]
  89. else:
  90. return []
  91. class CreateTopic(Action):
  92. def __init__(self, capabilities: Capabilities, topic: TopicExists) -> None:
  93. self.topic = topic
  94. super().__init__(capabilities)
  95. def provides(self) -> list[Capability]:
  96. return [self.topic]
  97. def run(self, c: Composition, state: State) -> None:
  98. c.testdrive(
  99. SCHEMA
  100. + dedent(
  101. f"""
  102. $ kafka-create-topic topic={self.topic.name} partitions={self.topic.partitions}
  103. $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} repeat=1
  104. {{"key": 0}} {{"f1": 0, "pad": ""}}
  105. """
  106. ),
  107. mz_service=state.mz_service,
  108. )
  109. class Ingest(Action):
  110. """Ingests data (inserts, updates or deletions) into a Kafka topic."""
  111. @classmethod
  112. def requires(cls) -> set[type[Capability]]:
  113. return {MzIsRunning, KafkaRunning, TopicExists}
  114. def __init__(self, capabilities: Capabilities) -> None:
  115. self.topic = random.choice(capabilities.get(TopicExists))
  116. self.delta = random.randint(1, 10000)
  117. # This gives 67% pads of up to 10 bytes, 25% of up to 100 bytes and outliers up to 256 bytes
  118. self.pad = min(np.random.zipf(1.6, 1)[0], 256) * random.choice(
  119. string.ascii_letters
  120. )
  121. super().__init__(capabilities)
  122. def __str__(self) -> str:
  123. return f"{Action.__str__(self)} {self.topic.name}"
  124. class KafkaInsert(Ingest):
  125. """Inserts data into a Kafka topic."""
  126. def parallel(self) -> bool:
  127. return False
  128. def run(self, c: Composition, state: State) -> None:
  129. prev_max = self.topic.watermarks.max
  130. self.topic.watermarks.max = prev_max + self.delta
  131. assert self.topic.watermarks.max >= 0
  132. assert self.topic.watermarks.min >= 0
  133. testdrive_str = SCHEMA + dedent(
  134. f"""
  135. $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} start-iteration={prev_max + 1} repeat={self.delta}
  136. {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad" : "{self.pad}"}}
  137. """
  138. )
  139. if self.parallel():
  140. threading.Thread(target=c.testdrive, args=[testdrive_str]).start()
  141. else:
  142. c.testdrive(testdrive_str, mz_service=state.mz_service)
  143. class KafkaInsertParallel(KafkaInsert):
  144. """Inserts data into a Kafka topic using background threads."""
  145. @classmethod
  146. def require_explicit_mention(cls) -> bool:
  147. return True
  148. def parallel(self) -> bool:
  149. return True
  150. class KafkaUpsertFromHead(Ingest):
  151. """Updates records from the head in-place by modifying their pad"""
  152. def run(self, c: Composition, state: State) -> None:
  153. if self.topic.envelope is Envelope.NONE:
  154. return
  155. head = self.topic.watermarks.max
  156. start = max(head - self.delta, self.topic.watermarks.min)
  157. actual_delta = head - start
  158. if actual_delta > 0:
  159. c.testdrive(
  160. SCHEMA
  161. + dedent(
  162. f"""
  163. $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={start} repeat={actual_delta}
  164. {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
  165. """
  166. ),
  167. mz_service=state.mz_service,
  168. )
  169. class KafkaDeleteFromHead(Ingest):
  170. """Deletes the largest values previously inserted."""
  171. def run(self, c: Composition, state: State) -> None:
  172. if self.topic.envelope is Envelope.NONE:
  173. return
  174. prev_max = self.topic.watermarks.max
  175. self.topic.watermarks.max = max(
  176. prev_max - self.delta, self.topic.watermarks.min
  177. )
  178. assert self.topic.watermarks.max >= 0
  179. assert self.topic.watermarks.min >= 0
  180. actual_delta = prev_max - self.topic.watermarks.max
  181. if actual_delta > 0:
  182. c.testdrive(
  183. SCHEMA
  184. + dedent(
  185. f"""
  186. $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={self.topic.watermarks.max + 1} repeat={actual_delta}
  187. {{"key": ${{kafka-ingest.iteration}}}}
  188. """
  189. ),
  190. mz_service=state.mz_service,
  191. )
  192. class KafkaUpsertFromTail(Ingest):
  193. """Updates records from the tail in-place by modifying their pad"""
  194. def run(self, c: Composition, state: State) -> None:
  195. if self.topic.envelope is Envelope.NONE:
  196. return
  197. tail = self.topic.watermarks.min
  198. end = min(tail + self.delta, self.topic.watermarks.max)
  199. actual_delta = end - tail
  200. if actual_delta > 0:
  201. c.testdrive(
  202. SCHEMA
  203. + dedent(
  204. f"""
  205. $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={tail} repeat={actual_delta}
  206. {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
  207. """
  208. ),
  209. mz_service=state.mz_service,
  210. )
  211. class KafkaDeleteFromTail(Ingest):
  212. """Deletes the smallest values previously inserted."""
  213. def run(self, c: Composition, state: State) -> None:
  214. if self.topic.envelope is Envelope.NONE:
  215. return
  216. prev_min = self.topic.watermarks.min
  217. self.topic.watermarks.min = min(
  218. prev_min + self.delta, self.topic.watermarks.max
  219. )
  220. assert self.topic.watermarks.max >= 0
  221. assert self.topic.watermarks.min >= 0
  222. actual_delta = self.topic.watermarks.min - prev_min
  223. if actual_delta > 0:
  224. c.testdrive(
  225. SCHEMA
  226. + dedent(
  227. f"""
  228. $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={prev_min} repeat={actual_delta}
  229. {{"key": ${{kafka-ingest.iteration}}}}
  230. """
  231. ),
  232. mz_service=state.mz_service,
  233. )