# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import subprocess
from textwrap import dedent

import pytest
from pg8000.exceptions import InterfaceError

from materialize.cloudtest.app.materialize_application import MaterializeApplication
from materialize.cloudtest.util.cluster import cluster_pod_name
from materialize.cloudtest.util.wait import wait


def test_secrets(mz: MaterializeApplication) -> None:
    mz.testdrive.run(
        input=dedent(
            """
            > CREATE SECRET username AS '123';
            > CREATE SECRET password AS '234';

            # Our Redpanda instance is not configured for SASL, so we cannot
            # actually establish a successful connection.
            ! CREATE CONNECTION secrets_conn TO KAFKA (
                BROKER '${testdrive.kafka-addr}',
                SASL MECHANISMS 'PLAIN',
                SASL USERNAME = SECRET username,
                SASL PASSWORD = SECRET password
              );
            contains:Broker does not support SSL connections
            """
        )
    )

    id = mz.environmentd.sql_query("SELECT id FROM mz_secrets WHERE name = 'username'")[
        0
    ][0]
    assert id is not None

    secret = f"user-managed-{id}"
    # wait(condition="condition=Ready", resource=f"secret/{secret}")

    describe = mz.kubectl("describe", "secret", secret)
    assert "contents: 3 bytes" in describe

    mz.environmentd.sql("ALTER SECRET username AS '1234567890'")

    describe = mz.kubectl("describe", "secret", secret)
    assert "contents: 10 bytes" in describe

    mz.environmentd.sql("DROP SECRET username CASCADE")
    wait(condition="delete", resource=f"secret/{secret}")
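

# The tests in this file recover the Kubernetes Secret backing a user secret
# by formatting its catalog id as `user-managed-<id>`. A minimal sketch of
# that naming convention as a helper (observed behavior in these tests, not a
# documented contract; the tests inline the f-string rather than calling this):
def k8s_secret_name(catalog_id: str) -> str:
    """Name of the k8s Secret backing a user-managed Materialize secret."""
    return f"user-managed-{catalog_id}"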


# Tests that secrets deleted from the catalog but not from k8s are cleaned up
# on envd startup.
@pytest.mark.skip(reason="Failpoints mess up the Mz instance, see database-issues#5263")
def test_orphaned_secrets(mz: MaterializeApplication) -> None:
    # Use two separate failpoints: one that crashes after modifying the
    # catalog (drop_secrets), and one that fails during bootstrap
    # (orphan_secrets), so that a racy startup cannot clean up the secret
    # before we have observed it.
    mz.set_environmentd_failpoints("orphan_secrets=panic")
    mz.environmentd.sql("SET failpoints = 'drop_secrets=panic'")

    mz.environmentd.sql("CREATE SECRET orphan AS '123'")
    id = mz.environmentd.sql_query("SELECT id FROM mz_secrets WHERE name = 'orphan'")[
        0
    ][0]
    assert id is not None
    secret = f"user-managed-{id}"

    # The failpoint should cause this to fail.
    try:
        mz.environmentd.sql("DROP SECRET orphan")
        raise Exception("Unexpected success")
    except InterfaceError:
        pass

    describe = mz.kubectl("describe", "secret", secret)
    assert "contents: 3 bytes" in describe

    # We saw the secret; allow orphan cleanup.
    mz.set_environmentd_failpoints("")
    mz.wait_for_sql()
    wait(condition="delete", resource=f"secret/{secret}")


@pytest.mark.skip(reason="Flaky, see database-issues#8456")
def test_missing_secret(mz: MaterializeApplication) -> None:
    """Test that Mz does not panic if a secret goes missing from K8s."""
    mz.testdrive.run(
        input=dedent(
            """
            $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
            ALTER SYSTEM SET enable_connection_validation_syntax = true

            > CREATE CLUSTER to_be_killed REPLICAS (to_be_killed (SIZE '1'));

            > CREATE SECRET to_be_deleted AS 'postgres'

            > CREATE CONNECTION kafka_conn_with_deleted_secret TO KAFKA (
                BROKER '${testdrive.kafka-addr}',
                SASL MECHANISMS 'PLAIN',
                SASL USERNAME = SECRET to_be_deleted,
                SASL PASSWORD = SECRET to_be_deleted
              ) WITH (VALIDATE = false);

            > CREATE CONNECTION pg_conn_with_deleted_secret TO POSTGRES (
                HOST 'postgres',
                DATABASE postgres,
                USER postgres,
                PASSWORD SECRET to_be_deleted
              );

            $ postgres-execute connection=postgres://postgres:postgres@postgres
            ALTER USER postgres WITH replication;
            DROP SCHEMA IF EXISTS public CASCADE;
            DROP PUBLICATION IF EXISTS mz_source;
            CREATE SCHEMA public;
            CREATE TABLE t1 (f1 INTEGER);
            ALTER TABLE t1 REPLICA IDENTITY FULL;
            INSERT INTO t1 VALUES (1);
            CREATE PUBLICATION mz_source FOR TABLE t1;

            > CREATE SOURCE source_with_deleted_secret
              IN CLUSTER to_be_killed
              FROM POSTGRES CONNECTION pg_conn_with_deleted_secret
              (PUBLICATION 'mz_source');

            > CREATE TABLE t1 FROM SOURCE source_with_deleted_secret (REFERENCE t1);

            > SELECT COUNT(*) > 0 FROM t1;
            true
            """
        )
    )

    id = mz.environmentd.sql_query(
        "SELECT id FROM mz_secrets WHERE name = 'to_be_deleted'"
    )[0][0]
    assert id is not None
    secret = f"user-managed-{id}"

    mz.kubectl("delete", "secret", secret)
    wait(condition="delete", resource=f"secret/{secret}")

    mz.testdrive.run(
        input=dedent(
            """
            ! CREATE SOURCE some_pg_source
              FROM POSTGRES CONNECTION pg_conn_with_deleted_secret
              (PUBLICATION 'mz_source');
            contains: NotFound

            ! CREATE SOURCE some_kafka_source
              FROM KAFKA CONNECTION kafka_conn_with_deleted_secret
              (TOPIC 'foo')
            contains:failed to create and connect Kafka consumer
            """
        ),
        no_reset=True,
    )

    # Restart the storage clusterd and confirm that the source errors out
    # properly.
    cluster_id, replica_id = mz.environmentd.sql_query(
        "SELECT cluster_id, id FROM mz_cluster_replicas WHERE name = 'to_be_killed'"
    )[0]
    pod_name = cluster_pod_name(cluster_id, replica_id, 0)

    # Wait for the cluster to be ready before attempting to kill it.
    wait(condition="condition=Ready", resource=pod_name)
    try:
        mz.kubectl("exec", pod_name, "--", "bash", "-c", "kill -9 `pidof clusterd`")
    except subprocess.CalledProcessError as e:
        # Killing the entrypoint via kubectl may result in kubectl exiting
        # with code 137 (128 + SIGKILL).
        assert e.returncode == 137

    mz.testdrive.run(
        input=dedent(
            """
            ! CREATE SOURCE some_pg_source
              FROM POSTGRES CONNECTION pg_conn_with_deleted_secret
              (PUBLICATION 'mz_source');
            contains: NotFound

            ! CREATE SOURCE some_kafka_source
              FROM KAFKA CONNECTION kafka_conn_with_deleted_secret
              (TOPIC 'foo')
            contains:failed to create and connect Kafka consumer

            > SELECT error like '%NotFound%'
              FROM mz_internal.mz_source_statuses
              WHERE name = 'source_with_deleted_secret';
            true
            """
        ),
        no_reset=True,
    )

    # Kill environmentd and confirm the same behavior.
    mz.kubectl(
        "exec",
        "pod/environmentd-0",
        "--",
        "bash",
        "-c",
        "kill -9 `pidof environmentd`",
    )
    wait(condition="condition=Ready", resource="pod/environmentd-0")
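
    # After the restart environmentd reloads the catalog, so the connections
    # that reference the deleted secret come back; using them must still fail
    # cleanly rather than panic, and dropping them must remain possible.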
    mz.testdrive.run(
        input=dedent(
            """
            ! CREATE SOURCE some_pg_source
              FROM POSTGRES CONNECTION pg_conn_with_deleted_secret
              (PUBLICATION 'mz_source');
            contains: NotFound

            ! CREATE SOURCE some_kafka_source
              FROM KAFKA CONNECTION kafka_conn_with_deleted_secret
              (TOPIC 'foo')
            contains:failed to create and connect Kafka consumer

            > SELECT error like '%NotFound%'
              FROM mz_internal.mz_source_statuses
              WHERE name = 'source_with_deleted_secret';
            true

            # A missing secret is similar to the user having dropped their
            # upstream Postgres DB, and we don't want to block dropping
            # objects because of that.
            > DROP CLUSTER to_be_killed CASCADE;
            """
        ),
        no_reset=True,
    )
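

# The assertions above match on `kubectl describe` output ("contents: N
# bytes"). A minimal sketch of reading the payload directly instead, assuming
# the Secret stores it under a `contents` data key as the describe output
# suggests (an assumption, not a documented contract):
def secret_payload(mz: MaterializeApplication, secret: str) -> bytes:
    """Return the decoded payload of a user-managed secret's k8s Secret."""
    import base64  # local import keeps this illustrative sketch self-contained

    # jsonpath selects the base64-encoded value of the assumed `contents` key.
    raw = mz.kubectl("get", "secret", secret, "-o", "jsonpath={.data.contents}")
    return base64.b64decode(raw)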