test-schema-registry-ssl-basic.td

# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
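
# Enable connection validation and its SQL syntax, used via the
# WITH (VALIDATE = ...) clauses throughout this test.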

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_default_connection_validation = true
ALTER SYSTEM SET enable_connection_validation_syntax = true

# ==> Set up. <==

$ set-from-file ca-crt=/share/secrets/ca.crt
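
# Read the test CA certificate into the ca-crt variable so it can be inlined
# into the connection definitions below.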

$ set-sql-timeout duration=60s

> CREATE SECRET password AS 'sekurity'
> CREATE SECRET password_wrong AS 'wrong'
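
# Two secrets: the registry's basic-auth password and a deliberately wrong one
# for the negative tests below.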

> CREATE CONNECTION kafka TO KAFKA (
    BROKER 'kafka:9092',
    SECURITY PROTOCOL PLAINTEXT
  )
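
# The Kafka connection itself is PLAINTEXT; only the schema registry side of
# this test uses TLS with basic authentication.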

$ set schema={
    "name": "row",
    "type": "record",
    "fields": [
      {"name": "a", "type": "long"}
    ]
  }

$ kafka-create-topic topic=avro-data

$ kafka-ingest topic=avro-data format=avro schema=${schema} timestamp=1
{"a": 1}
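
# A single Avro record with one long field is seeded so that reads through a
# registry-backed source have something to verify against.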

# ==> Test invalid configurations. <==
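
# Each statement below should fail cleanly: first with no CA (TLS verification
# fails), then with the CA but no credentials, then with the wrong password
# (both rejected with 401 Unauthorized).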

! CREATE CONNECTION schema_registry_invalid TO CONFLUENT SCHEMA REGISTRY (
    URL 'https://ssl-basic.schema-registry.local:8082'
  )
contains:certificate verify failed

! CREATE CONNECTION schema_registry_invalid TO CONFLUENT SCHEMA REGISTRY (
    URL 'https://ssl-basic.schema-registry.local:8082',
    SSL CERTIFICATE AUTHORITY = '${ca-crt}'
  )
contains:server error 401: Unauthorized

! CREATE CONNECTION schema_registry_invalid TO CONFLUENT SCHEMA REGISTRY (
    URL 'https://ssl-basic.schema-registry.local:8082',
    USERNAME 'materialize',
    PASSWORD SECRET password_wrong,
    SSL CERTIFICATE AUTHORITY = '${ca-crt}'
  )
contains:server error 401: Unauthorized

# ==> Test without an SSH tunnel. <==
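
# The fully specified connection (URL, credentials, and CA) should validate,
# and a registry-backed Avro table should read the seeded record.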

> CREATE CONNECTION schema_registry TO CONFLUENT SCHEMA REGISTRY (
    URL 'https://ssl-basic.schema-registry.local:8082',
    USERNAME 'materialize',
    PASSWORD SECRET password,
    SSL CERTIFICATE AUTHORITY = '${ca-crt}'
  )

> CREATE SOURCE avro_data FROM KAFKA CONNECTION kafka (
    TOPIC 'testdrive-avro-data-${testdrive.seed}'
  )

> CREATE TABLE avro_data_tbl FROM SOURCE avro_data (REFERENCE "testdrive-avro-data-${testdrive.seed}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry

> SELECT * FROM avro_data_tbl
a
----
1

# ==> Test with an SSH tunnel. <==
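
# Same as above, but the registry connection is routed through an SSH tunnel.
# The tunnel connection testdrive_no_reset_connections.public.ssh is assumed to
# have been created by earlier test setup outside this file.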

> CREATE CONNECTION schema_registry_ssh TO CONFLUENT SCHEMA REGISTRY (
    URL 'https://ssl-basic.schema-registry.local:8082',
    USERNAME 'materialize',
    PASSWORD SECRET password,
    SSL CERTIFICATE AUTHORITY = '${ca-crt}',
    SSH TUNNEL testdrive_no_reset_connections.public.ssh
  )

> CREATE SOURCE avro_data_ssh FROM KAFKA CONNECTION kafka (
    TOPIC 'testdrive-avro-data-${testdrive.seed}'
  )

> CREATE TABLE avro_data_ssh_tbl FROM SOURCE avro_data_ssh (REFERENCE "testdrive-avro-data-${testdrive.seed}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry_ssh

> SELECT * FROM avro_data_ssh_tbl
a
----
1

# ALTER CONNECTION
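
# The remainder of the test exercises ALTER CONNECTION: breaking and repairing
# a Kafka connection used by a sink, and validating edits to the CSR connection.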

## Sink Kafka connection

### Create a new connection in use only by the sink so that we may break it.
> CREATE CONNECTION kafka_backup TO KAFKA (
    BROKER 'kafka:9092',
    SECURITY PROTOCOL PLAINTEXT
  );

#### Create a backup sink
> CREATE SINK snk_backup FROM avro_data_tbl
  INTO KAFKA CONNECTION kafka_backup (TOPIC 'snk_backup')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry
  ENVELOPE DEBEZIUM

$ kafka-verify-data format=avro sink=materialize.public.snk_backup sort-messages=true
{"before": null, "after": {"row":{"a": 1}}}

### Break sink broker connection and produce data
> ALTER CONNECTION kafka_backup SET (BROKER 'kafka:9093') WITH (VALIDATE = false);
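
# VALIDATE = false lets the broken broker address (a port where no broker is
# expected to be listening) be saved; the sink should stall while the source
# keeps ingesting.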

$ kafka-ingest topic=avro-data format=avro schema=${schema} timestamp=2
{"a": 2}

> SELECT * FROM avro_data_tbl
1
2

> SELECT count(status) > 0 FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON sink_id = id WHERE name = 'snk_backup' AND status = 'stalled';
true
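
# Restoring the original broker address should let the sink resume and flush
# the second record.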

> ALTER CONNECTION kafka_backup SET (BROKER 'kafka:9092');

> SELECT status FROM mz_internal.mz_sink_statuses WHERE name = 'snk_backup';
running

$ kafka-verify-data format=avro sink=materialize.public.snk_backup sort-messages=true
{"before": null, "after": {"row":{"a": 2}}}

## CSR connection
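
# Invalid edits to the CSR connection should be rejected at validation time,
# leaving the existing connection usable.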

! ALTER CONNECTION schema_registry SET (URL = 'abc') WITH (VALIDATE = true);
contains:invalid ALTER CONNECTION: parsing schema registry url: relative URL without a base

! ALTER CONNECTION schema_registry RESET (URL);
contains:invalid ALTER CONNECTION: invalid CONNECTION: must specify URL

! ALTER CONNECTION schema_registry SET (SSL KEY = 'x') WITH (VALIDATE = true);
contains:invalid SSL KEY: must provide a secret value

> CREATE SECRET IF NOT EXISTS invalid_secret AS 'x'

! ALTER CONNECTION schema_registry SET (SSL KEY = SECRET invalid_secret) WITH (VALIDATE = true);
contains:requires both SSL KEY and SSL CERTIFICATE

! ALTER CONNECTION schema_registry SET (SSL CERTIFICATE = 'x') WITH (VALIDATE = true);
contains:requires both SSL KEY and SSL CERTIFICATE

! ALTER CONNECTION schema_registry SET (SSL KEY = SECRET invalid_secret), SET (SSL CERTIFICATE = 'x') WITH (VALIDATE = true);
contains:No supported data to decode

! ALTER CONNECTION schema_registry SET (SSL CERTIFICATE AUTHORITY = 'x') WITH (VALIDATE = true);
contains:CERTIFICATE

> ALTER CONNECTION schema_registry RESET (SSL KEY);
> ALTER CONNECTION schema_registry RESET (SSL CERTIFICATE);

! ALTER CONNECTION schema_registry RESET (SSL CERTIFICATE AUTHORITY) WITH (VALIDATE = true);
contains:self-signed certificate in certificate chain

! ALTER CONNECTION schema_registry RESET (SSL KEY), RESET (SSL CERTIFICATE), RESET (SSL CERTIFICATE AUTHORITY);
contains:self-signed certificate in certificate chain

! ALTER CONNECTION schema_registry RESET (USERNAME);
contains:Unauthorized

! ALTER CONNECTION schema_registry RESET (PASSWORD);
contains:Unauthorized

! ALTER CONNECTION schema_registry RESET (USERNAME), RESET (PASSWORD);
contains:Unauthorized

# Break CSR connection
# Breaking the CSR connection won't necessarily stall already-running sources,
# since they might not reach out to the CSR again.
> ALTER CONNECTION schema_registry DROP (SSL CERTIFICATE AUTHORITY) WITH (VALIDATE = false);
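
# With the CA dropped, creating a table that needs the registry should fail
# when Materialize tries to fetch the schema.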

$ kafka-create-topic topic=data partitions=1

> CREATE SOURCE kafka_broken_csr_connector_source_broken
  FROM KAFKA CONNECTION kafka (TOPIC 'testdrive-data-${testdrive.seed}')

! CREATE TABLE kafka_broken_csr_connector_source_broken_tbl FROM SOURCE kafka_broken_csr_connector_source_broken (REFERENCE "testdrive-data-${testdrive.seed}")
  FORMAT AVRO
  USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry
contains:failed to fetch schema subject

> ALTER CONNECTION schema_registry SET (SSL CERTIFICATE AUTHORITY = '${ca-crt}') WITH (VALIDATE = true);
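
# With the CA restored (and validated), registry-backed tables work again and
# see both ingested records.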

> CREATE SOURCE kafka_broken_csr_connector_source_fixed FROM KAFKA CONNECTION kafka (
    TOPIC 'testdrive-avro-data-${testdrive.seed}'
  )

> CREATE TABLE kafka_broken_csr_connector_source_fixed_tbl FROM SOURCE kafka_broken_csr_connector_source_fixed (REFERENCE "testdrive-avro-data-${testdrive.seed}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry

> SELECT * FROM kafka_broken_csr_connector_source_fixed_tbl
1
2