mzcompose.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Tests that Materialize can connect to Kafka's various connection modes:
  11. plaintext, ssl, mssl, sasl_plaintext, sasl_ssl, sasl_mssl
  12. """
  13. import glob
  14. from materialize import MZ_ROOT, buildkite
  15. from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
  16. from materialize.mzcompose.services.kafka import Kafka
  17. from materialize.mzcompose.services.materialized import Materialized
  18. from materialize.mzcompose.services.mz import Mz
  19. from materialize.mzcompose.services.schema_registry import SchemaRegistry
  20. from materialize.mzcompose.services.ssh_bastion_host import SshBastionHost
  21. from materialize.mzcompose.services.test_certs import TestCerts
  22. from materialize.mzcompose.services.testdrive import Testdrive
  23. from materialize.mzcompose.services.zookeeper import Zookeeper
  24. SERVICES = [
  25. TestCerts(),
  26. SshBastionHost(),
  27. Zookeeper(),
  28. Kafka(
  29. depends_on_extra=["test-certs"],
  30. advertised_listeners=[
  31. # Using lowercase listener names here bypasses some too-helpful
  32. # checks in the Docker entrypoint that (incorrectly) attempt to
  33. # assess the validity of the authentication configuration.
  34. "plaintext://kafka:9092",
  35. "ssl://kafka:9093",
  36. "mssl://kafka:9094",
  37. "sasl_plaintext://kafka:9095",
  38. "sasl_ssl://kafka:9096",
  39. "sasl_mssl://kafka:9097",
  40. ],
  41. environment_extra=[
  42. "ZOOKEEPER_SASL_ENABLED=FALSE",
  43. "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,ssl:SSL,mssl:SSL,sasl_plaintext:SASL_PLAINTEXT,sasl_ssl:SASL_SSL,sasl_mssl:SASL_SSL",
  44. "KAFKA_INTER_BROKER_LISTENER_NAME=plaintext",
  45. "KAFKA_SASL_ENABLED_MECHANISMS=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512",
  46. "KAFKA_SSL_KEY_PASSWORD=mzmzmz",
  47. "KAFKA_SSL_KEYSTORE_LOCATION=/etc/kafka/secrets/kafka.keystore.jks",
  48. "KAFKA_SSL_KEYSTORE_PASSWORD=mzmzmz",
  49. "KAFKA_SSL_TRUSTSTORE_LOCATION=/etc/kafka/secrets/kafka.truststore.jks",
  50. "KAFKA_SSL_TRUSTSTORE_PASSWORD=mzmzmz",
  51. "KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/jaas.config",
  52. "KAFKA_LISTENER_NAME_MSSL_SSL_CLIENT_AUTH=required",
  53. "KAFKA_LISTENER_NAME_SASL__MSSL_SSL_CLIENT_AUTH=required",
  54. "KAFKA_AUTHORIZER_CLASS_NAME=kafka.security.authorizer.AclAuthorizer",
  55. "KAFKA_SUPER_USERS=User:materialize;User:CN=materialized;User:ANONYMOUS",
  56. ],
  57. volumes=[
  58. "secrets:/etc/kafka/secrets",
  59. "./kafka.jaas.config:/etc/kafka/jaas.config",
  60. ],
  61. ),
  62. SchemaRegistry(
  63. environment_extra=[
  64. # Only allow this schema registry, which does not require
  65. # authentication, to be the leader. This simplifies testdrive, as
  66. # it is assured that it can submit requests to the schema registry
  67. # without needing authentication.
  68. "SCHEMA_REGISTRY_LEADER_ELIGIBILITY=true",
  69. ]
  70. ),
  71. SchemaRegistry(
  72. name="schema-registry-basic",
  73. aliases=["basic.schema-registry.local"],
  74. environment_extra=[
  75. "SCHEMA_REGISTRY_LEADER_ELIGIBILITY=false",
  76. "SCHEMA_REGISTRY_AUTHENTICATION_METHOD=BASIC",
  77. "SCHEMA_REGISTRY_AUTHENTICATION_ROLES=user",
  78. "SCHEMA_REGISTRY_AUTHENTICATION_REALM=SchemaRegistry",
  79. "SCHEMA_REGISTRY_OPTS=-Djava.security.auth.login.config=/etc/schema-registry/jaas.config",
  80. ],
  81. volumes=[
  82. "./schema-registry.jaas.config:/etc/schema-registry/jaas.config",
  83. "./schema-registry.user.properties:/etc/schema-registry/user.properties",
  84. ],
  85. ),
  86. SchemaRegistry(
  87. name="schema-registry-ssl",
  88. aliases=["ssl.schema-registry.local"],
  89. environment_extra=[
  90. "SCHEMA_REGISTRY_LEADER_ELIGIBILITY=false",
  91. "SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081,https://0.0.0.0:8082",
  92. "SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION=/etc/schema-registry/secrets/schema-registry.keystore.jks",
  93. "SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD=mzmzmz",
  94. "SCHEMA_REGISTRY_SSL_KEY_PASSWORD=mzmzmz",
  95. ],
  96. volumes=[
  97. "secrets:/etc/schema-registry/secrets",
  98. ],
  99. ),
  100. SchemaRegistry(
  101. name="schema-registry-mssl",
  102. aliases=["mssl.schema-registry.local"],
  103. environment_extra=[
  104. "SCHEMA_REGISTRY_LEADER_ELIGIBILITY=false",
  105. "SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081,https://0.0.0.0:8082",
  106. "SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION=/etc/schema-registry/secrets/schema-registry.keystore.jks",
  107. "SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD=mzmzmz",
  108. "SCHEMA_REGISTRY_SSL_KEY_PASSWORD=mzmzmz",
  109. "SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION=/etc/schema-registry/secrets/schema-registry.truststore.jks",
  110. "SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD=mzmzmz",
  111. "SCHEMA_REGISTRY_SSL_CLIENT_AUTH=true",
  112. ],
  113. volumes=[
  114. "secrets:/etc/schema-registry/secrets",
  115. ],
  116. ),
  117. SchemaRegistry(
  118. name="schema-registry-ssl-basic",
  119. aliases=["ssl-basic.schema-registry.local"],
  120. environment_extra=[
  121. "SCHEMA_REGISTRY_LEADER_ELIGIBILITY=false",
  122. "SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081,https://0.0.0.0:8082",
  123. "SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION=/etc/schema-registry/secrets/schema-registry.keystore.jks",
  124. "SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD=mzmzmz",
  125. "SCHEMA_REGISTRY_SSL_KEY_PASSWORD=mzmzmz",
  126. "SCHEMA_REGISTRY_AUTHENTICATION_METHOD=BASIC",
  127. "SCHEMA_REGISTRY_AUTHENTICATION_ROLES=user",
  128. "SCHEMA_REGISTRY_AUTHENTICATION_REALM=SchemaRegistry",
  129. "SCHEMA_REGISTRY_OPTS=-Djava.security.auth.login.config=/etc/schema-registry/jaas.config",
  130. ],
  131. volumes=[
  132. "secrets:/etc/schema-registry/secrets",
  133. "./schema-registry.jaas.config:/etc/schema-registry/jaas.config",
  134. "./schema-registry.user.properties:/etc/schema-registry/user.properties",
  135. ],
  136. ),
  137. SchemaRegistry(
  138. name="schema-registry-mssl-basic",
  139. aliases=["mssl-basic.schema-registry.local"],
  140. environment_extra=[
  141. "SCHEMA_REGISTRY_LEADER_ELIGIBILITY=false",
  142. "SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081,https://0.0.0.0:8082",
  143. "SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION=/etc/schema-registry/secrets/schema-registry.keystore.jks",
  144. "SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD=mzmzmz",
  145. "SCHEMA_REGISTRY_SSL_KEY_PASSWORD=mzmzmz",
  146. "SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION=/etc/schema-registry/secrets/schema-registry.truststore.jks",
  147. "SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD=mzmzmz",
  148. "SCHEMA_REGISTRY_SSL_CLIENT_AUTH=true",
  149. "SCHEMA_REGISTRY_AUTHENTICATION_METHOD=BASIC",
  150. "SCHEMA_REGISTRY_AUTHENTICATION_ROLES=user",
  151. "SCHEMA_REGISTRY_AUTHENTICATION_REALM=SchemaRegistry",
  152. "SCHEMA_REGISTRY_OPTS=-Djava.security.auth.login.config=/etc/schema-registry/jaas.config",
  153. ],
  154. volumes=[
  155. "secrets:/etc/schema-registry/secrets",
  156. "./schema-registry.jaas.config:/etc/schema-registry/jaas.config",
  157. "./schema-registry.user.properties:/etc/schema-registry/user.properties",
  158. ],
  159. ),
  160. Materialized(
  161. volumes_extra=["secrets:/share/secrets"],
  162. default_replication_factor=2,
  163. ),
  164. Testdrive(
  165. volumes_extra=["secrets:/share/secrets"],
  166. default_timeout="30s",
  167. ),
  168. Mz(app_password=""),
  169. ]
  170. def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
  171. """Run testdrive against an authentication-enabled Confluent Platform."""
  172. parser.add_argument(
  173. "filter", nargs="?", default="*", help="limit to only the files matching filter"
  174. )
  175. args = parser.parse_args()
  176. # Bring up a single schema registry first, so that it can bootstrap the
  177. # underlying Kafka topic. Attempting to bring up multiple schema registries
  178. # simultaneously will cause several to fail to bootstrap the underlying
  179. # Kafka topic.
  180. c.up("ssh-bastion-host", "schema-registry", "materialized")
  181. # Add `materialize` SCRAM user to Kafka.
  182. c.exec(
  183. "kafka",
  184. "kafka-configs",
  185. "--bootstrap-server=localhost:9092",
  186. "--alter",
  187. "--add-config=SCRAM-SHA-256=[password=sekurity],SCRAM-SHA-512=[password=sekurity]",
  188. "--entity-type=users",
  189. "--entity-name=materialize",
  190. )
  191. # Restrict the `materialize_no_describe_configs` user from running the
  192. # `DescribeConfigs` cluster operation, but allow it to idempotently read and
  193. # write to all topics.
  194. user = "materialize_no_describe_configs"
  195. add_acl(c, user, "deny", "DescribeConfigs")
  196. add_acl(c, user, "allow", "ALL", "transactional-id=*")
  197. add_acl(c, user, "allow", "ALL", "topic=*")
  198. add_acl(c, user, "allow", "ALL", "group=*")
  199. # Only allow the `materialize_lockdown` user access to specific
  200. # transactional IDs, topics, and group IDs.
  201. user = "materialize_lockdown"
  202. add_acl(
  203. c, user, "allow", "ALL", "transactional-id=lockdown", pattern_type="prefixed"
  204. )
  205. add_acl(c, user, "allow", "ALL", "topic=lockdown-progress")
  206. add_acl(c, user, "allow", "ALL", "topic=lockdown-data", pattern_type="prefixed")
  207. add_acl(c, user, "allow", "ALL", "group=lockdown", pattern_type="prefixed")
  208. add_acl(c, user, "allow", "ALL", "topic=testdrive-data", pattern_type="prefixed")
  209. # Now that the Kafka topic has been bootstrapped, it's safe to bring up all
  210. # the other schema registries in parallel.
  211. c.up(
  212. "schema-registry-basic",
  213. "schema-registry-ssl",
  214. "schema-registry-mssl",
  215. "schema-registry-ssl-basic",
  216. "schema-registry-mssl-basic",
  217. )
  218. # Set up SSH connection.
  219. c.sql(
  220. """
  221. CREATE DATABASE IF NOT EXISTS testdrive_no_reset_connections;
  222. CREATE CONNECTION IF NOT EXISTS testdrive_no_reset_connections.public.ssh TO SSH TUNNEL (
  223. HOST 'ssh-bastion-host',
  224. USER 'mz',
  225. PORT 22
  226. );
  227. """
  228. )
  229. public_key = c.sql_query(
  230. "select public_key_1 from mz_ssh_tunnel_connections where id = 'u1';"
  231. )[0][0]
  232. c.exec(
  233. "ssh-bastion-host",
  234. "bash",
  235. "-c",
  236. f"echo '{public_key}' > /etc/authorized_keys/mz",
  237. )
  238. # Set up backup SSH connection.
  239. c.sql(
  240. """
  241. CREATE DATABASE IF NOT EXISTS testdrive_no_reset_connections;
  242. CREATE CONNECTION IF NOT EXISTS testdrive_no_reset_connections.public.ssh_backup TO SSH TUNNEL (
  243. HOST 'ssh-bastion-host',
  244. USER 'mz',
  245. PORT 22
  246. );
  247. """
  248. )
  249. public_key = c.sql_query(
  250. "select public_key_1 from mz_ssh_tunnel_connections where id = 'u2';"
  251. )[0][0]
  252. c.exec(
  253. "ssh-bastion-host",
  254. "bash",
  255. "-c",
  256. f"echo '{public_key}' >> /etc/authorized_keys/mz",
  257. )
  258. files = buildkite.shard_list(
  259. sorted(
  260. [
  261. file
  262. for file in glob.glob(
  263. f"test-{args.filter}.td", root_dir=MZ_ROOT / "test" / "kafka-auth"
  264. )
  265. ]
  266. ),
  267. lambda file: file,
  268. )
  269. c.test_parts(files, c.run_testdrive_files)
  270. def add_acl(
  271. c: Composition,
  272. user: str,
  273. action: str,
  274. operation: str,
  275. resource: str | None = None,
  276. pattern_type: str = "literal",
  277. ):
  278. c.exec(
  279. "kafka",
  280. "kafka-acls",
  281. "--bootstrap-server=localhost:9092",
  282. "--add",
  283. f"--{action}-principal=User:{user}",
  284. f"--operation={operation}",
  285. f"--{resource}" if resource else "--cluster",
  286. f"--resource-pattern-type={pattern_type}",
  287. )