# test_secrets.py
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import subprocess
  10. from textwrap import dedent
  11. import pytest
  12. from pg8000.exceptions import InterfaceError
  13. from materialize.cloudtest.app.materialize_application import MaterializeApplication
  14. from materialize.cloudtest.util.cluster import cluster_pod_name
  15. from materialize.cloudtest.util.wait import wait
  16. def test_secrets(mz: MaterializeApplication) -> None:
  17. mz.testdrive.run(
  18. input=dedent(
  19. """
  20. > CREATE SECRET username AS '123';
  21. > CREATE SECRET password AS '234';
  22. # Our Redpanda instance is not configured for SASL, so we can not
  23. # really establish a successful connection.
  24. ! CREATE CONNECTION secrets_conn TO KAFKA (
  25. BROKER '${testdrive.kafka-addr}',
  26. SASL MECHANISMS 'PLAIN',
  27. SASL USERNAME = SECRET username,
  28. SASL PASSWORD = SECRET password
  29. );
  30. contains:Broker does not support SSL connections
  31. """
  32. )
  33. )
  34. id = mz.environmentd.sql_query("SELECT id FROM mz_secrets WHERE name = 'username'")[
  35. 0
  36. ][0]
  37. assert id is not None
  38. secret = f"user-managed-{id}"
  39. # wait(condition="condition=Ready", resource=f"secret/{secret}")
  40. describe = mz.kubectl("describe", "secret", secret)
  41. assert "contents: 3 bytes" in describe
  42. mz.environmentd.sql("ALTER SECRET username AS '1234567890'")
  43. describe = mz.kubectl("describe", "secret", secret)
  44. assert "contents: 10 bytes" in describe
  45. mz.environmentd.sql("DROP SECRET username CASCADE")
  46. wait(condition="delete", resource=f"secret/{secret}")
  47. # Tests that secrets deleted from the catalog but not from k8s are cleaned up on
  48. # envd startup.
  49. @pytest.mark.skip(reason="Failpoints mess up the Mz intance database-issues#5263")
  50. def test_orphaned_secrets(mz: MaterializeApplication) -> None:
  51. # Use two separate failpoints. One that crashes after modifying the catalog
  52. # (drop_secrets), and one that fails during bootstrap (orphan_secrets) so
  53. # that we can prevent a racy startup from cleaning up the secret before we
  54. # observed it.
  55. mz.set_environmentd_failpoints("orphan_secrets=panic")
  56. mz.environmentd.sql("SET failpoints = 'drop_secrets=panic'")
  57. mz.environmentd.sql("CREATE SECRET orphan AS '123'")
  58. id = mz.environmentd.sql_query("SELECT id FROM mz_secrets WHERE name = 'orphan'")[
  59. 0
  60. ][0]
  61. assert id is not None
  62. secret = f"user-managed-{id}"
  63. # The failpoint should cause this to fail.
  64. try:
  65. mz.environmentd.sql("DROP SECRET orphan")
  66. raise Exception("Unexpected success")
  67. except InterfaceError:
  68. pass
  69. describe = mz.kubectl("describe", "secret", secret)
  70. assert "contents: 3 bytes" in describe
  71. # We saw the secret, allow orphan cleanup.
  72. mz.set_environmentd_failpoints("")
  73. mz.wait_for_sql()
  74. wait(condition="delete", resource=f"secret/{secret}")
  75. @pytest.mark.skip(reason="Flaky, see database-issues#8456")
  76. def test_missing_secret(mz: MaterializeApplication) -> None:
  77. """Test that Mz does not panic if a secret goes missing from K8s"""
  78. mz.testdrive.run(
  79. input=dedent(
  80. """
  81. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  82. ALTER SYSTEM SET enable_connection_validation_syntax = true
  83. > CREATE CLUSTER to_be_killed REPLICAS (to_be_killed (SIZE '1'));
  84. > CREATE SECRET to_be_deleted AS 'postgres'
  85. > CREATE CONNECTION kafka_conn_with_deleted_secret TO KAFKA (
  86. BROKER '${testdrive.kafka-addr}',
  87. SASL MECHANISMS 'PLAIN',
  88. SASL USERNAME = SECRET to_be_deleted,
  89. SASL PASSWORD = SECRET to_be_deleted
  90. ) WITH (VALIDATE = false);
  91. > CREATE CONNECTION pg_conn_with_deleted_secret TO POSTGRES (
  92. HOST 'postgres',
  93. DATABASE postgres,
  94. USER postgres,
  95. PASSWORD SECRET to_be_deleted
  96. );
  97. $ postgres-execute connection=postgres://postgres:postgres@postgres
  98. ALTER USER postgres WITH replication;
  99. DROP SCHEMA IF EXISTS public CASCADE;
  100. DROP PUBLICATION IF EXISTS mz_source;
  101. CREATE SCHEMA public;
  102. CREATE TABLE t1 (f1 INTEGER);
  103. ALTER TABLE t1 REPLICA IDENTITY FULL;
  104. INSERT INTO t1 VALUES (1);
  105. CREATE PUBLICATION mz_source FOR TABLE t1;
  106. > CREATE SOURCE source_with_deleted_secret
  107. IN CLUSTER to_be_killed
  108. FROM POSTGRES CONNECTION pg_conn_with_deleted_secret
  109. (PUBLICATION 'mz_source');
  110. > CREATE TABLE t1 FROM SOURCE source_with_deleted_secret (REFERENCE t1);
  111. > SELECT COUNT(*) > 0 FROM t1;
  112. true
  113. """
  114. )
  115. )
  116. id = mz.environmentd.sql_query(
  117. "SELECT id FROM mz_secrets WHERE name = 'to_be_deleted'"
  118. )[0][0]
  119. assert id is not None
  120. secret = f"user-managed-{id}"
  121. mz.kubectl("delete", "secret", secret)
  122. wait(condition="delete", resource=f"secret/{secret}")
  123. mz.testdrive.run(
  124. input=dedent(
  125. """
  126. ! CREATE SOURCE some_pg_source
  127. FROM POSTGRES CONNECTION pg_conn_with_deleted_secret
  128. (PUBLICATION 'mz_source');
  129. contains: NotFound
  130. ! CREATE SOURCE some_kafka_source
  131. FROM KAFKA CONNECTION kafka_conn_with_deleted_secret
  132. (TOPIC 'foo')
  133. contains:failed to create and connect Kafka consumer
  134. """
  135. ),
  136. no_reset=True,
  137. )
  138. # Restart the storage computed and confirm that the source errors out properly
  139. cluster_id, replica_id = mz.environmentd.sql_query(
  140. "SELECT cluster_id, id FROM mz_cluster_replicas WHERE name = 'to_be_killed'"
  141. )[0]
  142. pod_name = cluster_pod_name(cluster_id, replica_id, 0)
  143. # wait for the cluster to be ready first before attempting to kill it
  144. wait(condition="condition=Ready", resource=f"{pod_name}")
  145. try:
  146. mz.kubectl("exec", pod_name, "--", "bash", "-c", "kill -9 `pidof clusterd`")
  147. except subprocess.CalledProcessError as e:
  148. # Killing the entrypoint via kubectl may result in kubectl exiting with code 137
  149. assert e.returncode == 137
  150. mz.testdrive.run(
  151. input=dedent(
  152. """
  153. ! CREATE SOURCE some_pg_source
  154. FROM POSTGRES CONNECTION pg_conn_with_deleted_secret
  155. (PUBLICATION 'mz_source');
  156. contains: NotFound
  157. ! CREATE SOURCE some_kafka_source
  158. FROM KAFKA CONNECTION kafka_conn_with_deleted_secret
  159. (TOPIC 'foo')
  160. contains:failed to create and connect Kafka consumer
  161. > SELECT error like '%NotFound%'
  162. FROM mz_internal.mz_source_statuses
  163. WHERE name = 'source_with_deleted_secret';
  164. true
  165. """
  166. ),
  167. no_reset=True,
  168. )
  169. # Kill the environmentd and confirm the same
  170. mz.kubectl(
  171. "exec",
  172. "pod/environmentd-0",
  173. "--",
  174. "bash",
  175. "-c",
  176. "kill -9 `pidof environmentd`",
  177. )
  178. wait(condition="condition=Ready", resource="pod/environmentd-0")
  179. mz.testdrive.run(
  180. input=dedent(
  181. """
  182. ! CREATE SOURCE some_pg_source
  183. FROM POSTGRES CONNECTION pg_conn_with_deleted_secret
  184. (PUBLICATION 'mz_source');
  185. contains: NotFound
  186. ! CREATE SOURCE some_kafka_source
  187. FROM KAFKA CONNECTION kafka_conn_with_deleted_secret
  188. (TOPIC 'foo')
  189. contains:failed to create and connect Kafka consumer
  190. > SELECT error like '%NotFound%'
  191. FROM mz_internal.mz_source_statuses
  192. WHERE name = 'source_with_deleted_secret';
  193. true
  194. # The secret missing is similar to if the user dropped their
  195. # upstream Postgres DB, and we don't want to block dropping objects
  196. # because of that.
  197. > DROP CLUSTER to_be_killed CASCADE;
  198. """
  199. ),
  200. no_reset=True,
  201. )