kafka_protocols.py 11 KB


  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from textwrap import dedent
  10. from materialize.checks.actions import Testdrive
  11. from materialize.checks.checks import Check, externally_idempotent
  12. @externally_idempotent(False)
  13. class KafkaProtocols(Check):
  14. def initialize(self) -> Testdrive:
  15. return Testdrive(
  16. dedent(
  17. """
  18. $ set-from-file ca-crt=/share/secrets/ca.crt
  19. $ set-from-file kafka-crt=/share/secrets/materialized-kafka.crt
  20. $ set-from-file kafka-key=/share/secrets/materialized-kafka.key
  21. $ set-from-file kafka1-crt=/share/secrets/materialized-kafka1.crt
  22. $ set-from-file kafka1-key=/share/secrets/materialized-kafka1.key
  23. > CREATE SECRET kafka_key AS '${kafka-key}'
  24. > CREATE SECRET kafka1_key AS '${kafka1-key}'
  25. > CREATE SECRET garbage_key AS 'garbage'
  26. $ kafka-create-topic topic=kafka-protocols-1
  27. $ kafka-ingest topic=kafka-protocols-1 format=bytes
  28. one
  29. $ kafka-create-topic topic=kafka-protocols-2
  30. $ kafka-ingest topic=kafka-protocols-2 format=bytes
  31. one
  32. $ kafka-create-topic topic=kafka-protocols-3
  33. $ kafka-ingest topic=kafka-protocols-3 format=bytes
  34. one
  35. > CREATE SECRET kafka_protocols_password AS 'sekurity'
  36. > CREATE CONNECTION kafka_plaintext TO KAFKA (
  37. BROKER 'kafka:9093' USING SSH TUNNEL ssh_tunnel_0,
  38. SSL CERTIFICATE AUTHORITY '${ca-crt}'
  39. )
  40. > CREATE CONNECTION kafka_ssl TO KAFKA (
  41. BROKER 'kafka:9094',
  42. SSL CERTIFICATE '${kafka-crt}',
  43. SSL KEY SECRET kafka_key,
  44. SSL CERTIFICATE AUTHORITY '${ca-crt}'
  45. )
  46. > CREATE CONNECTION kafka_scram_sha_512 TO KAFKA (
  47. BROKER 'kafka:9095' USING SSH TUNNEL ssh_tunnel_0,
  48. SASL MECHANISMS 'SCRAM-SHA-512',
  49. SASL USERNAME 'materialize',
  50. SASL PASSWORD SECRET kafka_protocols_password,
  51. SECURITY PROTOCOL SASL_PLAINTEXT
  52. )
  53. > CREATE CONNECTION kafka_ssl_scram_sha_512 TO KAFKA (
  54. BROKER 'kafka:9096',
  55. SASL MECHANISMS 'SCRAM-SHA-512',
  56. SASL USERNAME 'materialize',
  57. SASL PASSWORD SECRET kafka_protocols_password,
  58. SSL CERTIFICATE AUTHORITY '${ca-crt}'
  59. )
  60. > CREATE CONNECTION kafka_sasl TO KAFKA (
  61. BROKER 'kafka:9097' USING SSH TUNNEL ssh_tunnel_0,
  62. SASL MECHANISMS 'PLAIN',
  63. SASL USERNAME 'materialize',
  64. SASL PASSWORD SECRET kafka_protocols_password,
  65. SSL CERTIFICATE '${kafka-crt}',
  66. SSL KEY SECRET kafka_key,
  67. SSL CERTIFICATE AUTHORITY '${ca-crt}'
  68. )
  69. > CREATE SOURCE kafka_plaintext_1_src FROM KAFKA CONNECTION kafka_plaintext (
  70. TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
  71. )
  72. > CREATE TABLE kafka_plaintext_1 FROM SOURCE kafka_plaintext_1_src (REFERENCE "testdrive-kafka-protocols-1-${testdrive.seed}")
  73. FORMAT TEXT
  74. > CREATE SOURCE kafka_ssl_1_src FROM KAFKA CONNECTION kafka_ssl (
  75. TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
  76. )
  77. > CREATE TABLE kafka_ssl_1 FROM SOURCE kafka_ssl_1_src (REFERENCE "testdrive-kafka-protocols-1-${testdrive.seed}")
  78. FORMAT TEXT
  79. > CREATE SOURCE kafka_scram_sha_512_1_src FROM KAFKA CONNECTION kafka_scram_sha_512 (
  80. TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
  81. )
  82. > CREATE TABLE kafka_scram_sha_512_1 FROM SOURCE kafka_scram_sha_512_1_src (REFERENCE "testdrive-kafka-protocols-1-${testdrive.seed}")
  83. FORMAT TEXT
  84. > CREATE SOURCE kafka_ssl_scram_sha_512_1_src FROM KAFKA CONNECTION kafka_ssl_scram_sha_512 (
  85. TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
  86. )
  87. > CREATE TABLE kafka_ssl_scram_sha_512_1 FROM SOURCE kafka_ssl_scram_sha_512_1_src (REFERENCE "testdrive-kafka-protocols-1-${testdrive.seed}")
  88. FORMAT TEXT
  89. > CREATE SOURCE kafka_sasl_1_src FROM KAFKA CONNECTION kafka_sasl (
  90. TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
  91. )
  92. > CREATE TABLE kafka_sasl_1 FROM SOURCE kafka_sasl_1_src (REFERENCE "testdrive-kafka-protocols-1-${testdrive.seed}")
  93. FORMAT TEXT
  94. """
  95. )
  96. )
  97. def manipulate(self) -> list[Testdrive]:
  98. return [
  99. Testdrive(dedent(s))
  100. for s in [
  101. """
  102. > CREATE SOURCE kafka_plaintext_2_src FROM KAFKA CONNECTION kafka_plaintext (
  103. TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
  104. )
  105. > CREATE TABLE kafka_plaintext_2 FROM SOURCE kafka_plaintext_2_src (REFERENCE "testdrive-kafka-protocols-2-${testdrive.seed}")
  106. FORMAT TEXT
  107. > CREATE SOURCE kafka_ssl_2_src FROM KAFKA CONNECTION kafka_ssl (
  108. TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
  109. )
  110. > CREATE TABLE kafka_ssl_2 FROM SOURCE kafka_ssl_2_src (REFERENCE "testdrive-kafka-protocols-2-${testdrive.seed}")
  111. FORMAT TEXT
  112. > CREATE SOURCE kafka_scram_sha_512_2_src FROM KAFKA CONNECTION kafka_scram_sha_512 (
  113. TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
  114. )
  115. > CREATE TABLE kafka_scram_sha_512_2 FROM SOURCE kafka_scram_sha_512_2_src (REFERENCE "testdrive-kafka-protocols-2-${testdrive.seed}")
  116. FORMAT TEXT
  117. > CREATE SOURCE kafka_ssl_scram_sha_512_2_src FROM KAFKA CONNECTION kafka_ssl_scram_sha_512 (
  118. TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
  119. )
  120. > CREATE TABLE kafka_ssl_scram_sha_512_2 FROM SOURCE kafka_ssl_scram_sha_512_2_src (REFERENCE "testdrive-kafka-protocols-2-${testdrive.seed}")
  121. FORMAT TEXT
  122. > CREATE SOURCE kafka_sasl_2_src FROM KAFKA CONNECTION kafka_sasl (
  123. TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
  124. )
  125. > CREATE TABLE kafka_sasl_2 FROM SOURCE kafka_sasl_2_src (REFERENCE "testdrive-kafka-protocols-2-${testdrive.seed}")
  126. FORMAT TEXT
  127. $ kafka-ingest topic=kafka-protocols-1 format=bytes
  128. two
  129. $ kafka-ingest topic=kafka-protocols-2 format=bytes
  130. two
  131. $ kafka-ingest topic=kafka-protocols-3 format=bytes
  132. two
  133. """,
  134. """
  135. > CREATE SOURCE kafka_plaintext_3_src FROM KAFKA CONNECTION kafka_plaintext (
  136. TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
  137. )
  138. > CREATE TABLE kafka_plaintext_3 FROM SOURCE kafka_plaintext_3_src (REFERENCE "testdrive-kafka-protocols-3-${testdrive.seed}")
  139. FORMAT TEXT
  140. > CREATE SOURCE kafka_ssl_3_src FROM KAFKA CONNECTION kafka_ssl (
  141. TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
  142. )
  143. > CREATE TABLE kafka_ssl_3 FROM SOURCE kafka_ssl_3_src (REFERENCE "testdrive-kafka-protocols-3-${testdrive.seed}")
  144. FORMAT TEXT
  145. > CREATE SOURCE kafka_scram_sha_512_3_src FROM KAFKA CONNECTION kafka_scram_sha_512 (
  146. TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
  147. )
  148. > CREATE TABLE kafka_scram_sha_512_3 FROM SOURCE kafka_scram_sha_512_3_src (REFERENCE "testdrive-kafka-protocols-3-${testdrive.seed}")
  149. FORMAT TEXT
  150. > CREATE SOURCE kafka_ssl_scram_sha_512_3_src FROM KAFKA CONNECTION kafka_ssl_scram_sha_512 (
  151. TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
  152. )
  153. > CREATE TABLE kafka_ssl_scram_sha_512_3 FROM SOURCE kafka_ssl_scram_sha_512_3_src (REFERENCE "testdrive-kafka-protocols-3-${testdrive.seed}")
  154. FORMAT TEXT
  155. > CREATE SOURCE kafka_sasl_3_src FROM KAFKA CONNECTION kafka_sasl (
  156. TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
  157. )
  158. > CREATE TABLE kafka_sasl_3 FROM SOURCE kafka_sasl_3_src (REFERENCE "testdrive-kafka-protocols-3-${testdrive.seed}")
  159. FORMAT TEXT
  160. $ kafka-ingest topic=kafka-protocols-1 format=bytes
  161. three
  162. $ kafka-ingest topic=kafka-protocols-2 format=bytes
  163. three
  164. $ kafka-ingest topic=kafka-protocols-3 format=bytes
  165. three
  166. """,
  167. ]
  168. ]
  169. def validate(self) -> Testdrive:
  170. return Testdrive(
  171. dedent(
  172. """
  173. > SELECT * FROM kafka_plaintext_1
  174. one
  175. two
  176. three
  177. > SELECT * FROM kafka_ssl_1
  178. one
  179. two
  180. three
  181. > SELECT * FROM kafka_scram_sha_512_1
  182. one
  183. two
  184. three
  185. > SELECT * FROM kafka_ssl_scram_sha_512_1
  186. one
  187. two
  188. three
  189. > SELECT * FROM kafka_sasl_1
  190. one
  191. two
  192. three
  193. > SELECT * FROM kafka_plaintext_2
  194. one
  195. two
  196. three
  197. > SELECT * FROM kafka_ssl_2
  198. one
  199. two
  200. three
  201. > SELECT * FROM kafka_scram_sha_512_2
  202. one
  203. two
  204. three
  205. > SELECT * FROM kafka_ssl_scram_sha_512_2
  206. one
  207. two
  208. three
  209. > SELECT * FROM kafka_sasl_2
  210. one
  211. two
  212. three
  213. > SELECT * FROM kafka_plaintext_3
  214. one
  215. two
  216. three
  217. > SELECT * FROM kafka_ssl_3
  218. one
  219. two
  220. three
  221. > SELECT * FROM kafka_scram_sha_512_3
  222. one
  223. two
  224. three
  225. > SELECT * FROM kafka_ssl_scram_sha_512_3
  226. one
  227. two
  228. three
  229. > SELECT * FROM kafka_sasl_3
  230. one
  231. two
  232. three
  233. """
  234. )
  235. )