test-kafka-ssl.td 6.7 KB


  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  10. ALTER SYSTEM SET enable_default_connection_validation = true
  11. ALTER SYSTEM SET enable_connection_validation_syntax = true
  12. # ==> Set up. <==
  13. $ set-from-file ca-crt=/share/secrets/ca.crt
  14. $ set-from-file ca-selective-crt=/share/secrets/ca-selective.crt
  15. $ kafka-create-topic topic=text-data
  16. $ kafka-ingest topic=text-data format=bytes
  17. banana
  18. # ==> Test invalid configurations. <==
  19. ! CREATE CONNECTION kafka_invalid TO KAFKA (
  20. BROKER 'kafka:9093',
  21. SECURITY PROTOCOL PLAINTEXT
  22. )
  23. contains:Disconnected during handshake; broker might require SSL encryption
  24. ! CREATE CONNECTION kafka_invalid TO KAFKA (
  25. BROKER 'kafka:9093'
  26. -- SECURITY PROTOCOL defaults to SSL when no SASL options are specified.
  27. )
  28. contains:Invalid CA certificate
  29. ! CREATE CONNECTION kafka_invalid TO KAFKA (
  30. BROKER 'kafka:9093',
  31. SSL CERTIFICATE AUTHORITY = '${ca-selective-crt}'
  32. )
  33. contains:Invalid CA certificate
  34. ! CREATE CONNECTION kafka_invalid TO KAFKA (
  35. BROKER 'kafka:9093',
  36. SSL CERTIFICATE AUTHORITY = 'this is garbage'
  37. )
  38. contains:ssl.ca.pem failed: not in PEM format?
  39. # ==> Test without an SSH tunnel. <==
  40. > CREATE CONNECTION kafka TO KAFKA (
  41. BROKER 'kafka:9093',
  42. SSL CERTIFICATE AUTHORITY '${ca-crt}'
  43. )
  44. > CREATE SOURCE text_data FROM KAFKA CONNECTION kafka (
  45. TOPIC 'testdrive-text-data-${testdrive.seed}'
  46. )
  47. > CREATE TABLE text_data_tbl (a)
  48. FROM SOURCE text_data (REFERENCE "testdrive-text-data-${testdrive.seed}")
  49. FORMAT TEXT
  50. > SELECT * FROM text_data_tbl
  51. banana
  52. # ==> Test with an SSH tunnel. <==
  53. > CREATE CONNECTION kafka_ssh TO KAFKA (
  54. BROKER 'kafka:9093' USING SSH TUNNEL testdrive_no_reset_connections.public.ssh,
  55. SSL CERTIFICATE AUTHORITY '${ca-crt}'
  56. )
  57. > CREATE SOURCE text_data_ssh FROM KAFKA CONNECTION kafka_ssh (
  58. TOPIC 'testdrive-text-data-${testdrive.seed}'
  59. )
  60. > CREATE TABLE text_data_ssh_tbl FROM SOURCE text_data_ssh (REFERENCE "testdrive-text-data-${testdrive.seed}") FORMAT TEXT
  61. > SELECT * FROM text_data_ssh_tbl
  62. banana
  63. # ALTER CONNECTION
  64. # ALTER CONNECTION for Kafka
  65. ! ALTER CONNECTION kafka SET (SSL KEY = 'x') WITH (VALIDATE = true);
  66. contains:invalid SSL KEY: must provide a secret value
  67. > CREATE SECRET IF NOT EXISTS invalid_secret AS 'x'
  68. ! ALTER CONNECTION kafka SET (SSL KEY = SECRET invalid_secret) WITH (VALIDATE = true);
  69. contains:option SSL KEY not supported with this configuration
  70. ! ALTER CONNECTION kafka SET (SSL CERTIFICATE = 'x') WITH (VALIDATE = true);
  71. contains:SSL KEY must be specified with SSL CERTIFICATE
  72. ! ALTER CONNECTION kafka SET (SSL KEY = SECRET invalid_secret), SET (SSL CERTIFICATE = 'x') WITH (VALIDATE = true);
  73. contains:Client creation error
  74. ! ALTER CONNECTION kafka SET (SSL CERTIFICATE AUTHORITY = 'x') WITH (VALIDATE = true);
  75. contains:Client creation error
  76. > ALTER CONNECTION kafka RESET (SSL KEY) WITH (VALIDATE = true);
  77. > ALTER CONNECTION kafka RESET (SSL CERTIFICATE) WITH (VALIDATE = true);
  78. ! ALTER CONNECTION kafka RESET (SSL CERTIFICATE AUTHORITY) WITH (VALIDATE = true);
  79. contains:Invalid CA certificate
  80. ! ALTER CONNECTION kafka RESET (SSL KEY), RESET (SSL CERTIFICATE), RESET (SSL CERTIFICATE AUTHORITY) WITH (VALIDATE = true);
  81. contains:Invalid CA certificate
  82. > ALTER CONNECTION kafka RESET (SSL KEY), RESET (SSL CERTIFICATE), RESET (SSL CERTIFICATE AUTHORITY) WITH (VALIDATE = false);
  83. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'text_data';
  84. stalled
  85. > ALTER CONNECTION kafka
  86. DROP (SSL KEY),
  87. DROP (SSL CERTIFICATE),
  88. SET (SSL CERTIFICATE AUTHORITY '${ca-crt}');
  89. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'text_data';
  90. running
  91. # ALTER CONNECTION for Kafka + SSH
  92. ! ALTER CONNECTION testdrive_no_reset_connections.public.ssh SET (HOST = 'abcd') WITH (VALIDATE = true);
  93. contains:failed to lookup address information
  94. ! ALTER CONNECTION testdrive_no_reset_connections.public.ssh RESET (HOST);
  95. contains:HOST option is required
  96. ! ALTER CONNECTION testdrive_no_reset_connections.public.ssh SET (USER = 'abcd') WITH (VALIDATE = true);
  97. contains:Permission denied
  98. ! ALTER CONNECTION testdrive_no_reset_connections.public.ssh RESET (USER);
  99. contains:invalid ALTER CONNECTION: USER option is required
  100. ! ALTER CONNECTION testdrive_no_reset_connections.public.ssh SET (PORT = 1) WITH (VALIDATE = true);
  101. contains:Connection refused
  102. #Break SSH connection via host
  103. > ALTER CONNECTION testdrive_no_reset_connections.public.ssh SET (HOST = 'abcd') WITH (VALIDATE = false);
  104. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'text_data_ssh';
  105. stalled
  106. > ALTER CONNECTION testdrive_no_reset_connections.public.ssh SET (HOST = 'ssh-bastion-host') WITH (VALIDATE = true);
  107. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'text_data_ssh';
  108. running
  109. # Break SSH connection via port
  110. > ALTER CONNECTION testdrive_no_reset_connections.public.ssh SET (PORT = 1) WITH (VALIDATE = false);
  111. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'text_data_ssh';
  112. stalled
  113. > ALTER CONNECTION testdrive_no_reset_connections.public.ssh RESET (PORT) WITH (VALIDATE = true);
  114. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'text_data_ssh';
  115. running
  116. # Swap out SSH connection
  117. > SELECT COUNT(*) FROM mz_ssh_tunnel_connections
  118. 2
  119. > SELECT COUNT(*) FROM mz_internal.mz_object_dependencies WHERE referenced_object_id = 'u1';
  120. 1
  121. ! DROP CONNECTION testdrive_no_reset_connections.public.ssh;
  122. contains:still depended upon by connection "kafka_ssh"
  123. > SELECT COUNT(*) FROM mz_internal.mz_object_dependencies WHERE referenced_object_id = 'u1';
  124. 1
  125. > ALTER CONNECTION kafka_ssh SET (BROKER 'kafka:9093' USING SSH TUNNEL testdrive_no_reset_connections.public.ssh_backup);
  126. # We've removed all dependencies on testdrive_no_reset_connections.public.ssh, so it could be dropped
  127. > SELECT COUNT(*) FROM mz_internal.mz_object_dependencies WHERE referenced_object_id = 'u1';
  128. 0
  129. # Break new SSH tunnel to show that we can fix it
  130. > ALTER CONNECTION testdrive_no_reset_connections.public.ssh_backup SET (PORT = 1) WITH (VALIDATE = false);
  131. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'text_data_ssh';
  132. stalled
  133. $ kafka-ingest topic=text-data format=bytes
  134. papaya
  135. > ALTER CONNECTION testdrive_no_reset_connections.public.ssh_backup RESET (PORT) WITH (VALIDATE = true);
  136. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'text_data_ssh';
  137. running
  138. > SELECT * FROM text_data_tbl
  139. banana
  140. papaya