# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import subprocess
from textwrap import dedent

import pytest
from pg8000.exceptions import InterfaceError

from materialize.cloudtest.app.materialize_application import MaterializeApplication
from materialize.cloudtest.util.cluster import cluster_pod_name
from materialize.cloudtest.util.wait import wait


def test_secrets(mz: MaterializeApplication) -> None:
    mz.testdrive.run(
        input=dedent(
            """
            > CREATE SECRET username AS '123';
            > CREATE SECRET password AS '234';

            # Our Redpanda instance is not configured for SASL, so we can not
            # really establish a successful connection.
            ! CREATE CONNECTION secrets_conn TO KAFKA (
                BROKER '${testdrive.kafka-addr}',
                SASL MECHANISMS 'PLAIN',
                SASL USERNAME = SECRET username,
                SASL PASSWORD = SECRET password
              );
            contains:Broker does not support SSL connections
            """
        )
    )
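
    # Materialize stores each user-created secret as a Kubernetes Secret named
    # user-managed-<catalog id>, so look up the catalog id to find the K8s object.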
    id = mz.environmentd.sql_query("SELECT id FROM mz_secrets WHERE name = 'username'")[
        0
    ][0]
    assert id is not None

    secret = f"user-managed-{id}"
    # wait(condition="condition=Ready", resource=f"secret/{secret}")

    describe = mz.kubectl("describe", "secret", secret)
    assert "contents: 3 bytes" in describe
- mz.environmentd.sql("ALTER SECRET username AS '1234567890'")
- describe = mz.kubectl("describe", "secret", secret)
- assert "contents: 10 bytes" in describe
- mz.environmentd.sql("DROP SECRET username CASCADE")
- wait(condition="delete", resource=f"secret/{secret}")


# Tests that secrets deleted from the catalog but not from k8s are cleaned up
# on envd startup.
@pytest.mark.skip(reason="Failpoints mess up the Mz instance, see database-issues#5263")
def test_orphaned_secrets(mz: MaterializeApplication) -> None:
    # Use two separate failpoints. One that crashes after modifying the catalog
    # (drop_secrets), and one that fails during bootstrap (orphan_secrets) so
    # that we can prevent a racy startup from cleaning up the secret before we
    # observed it.
    mz.set_environmentd_failpoints("orphan_secrets=panic")
    mz.environmentd.sql("SET failpoints = 'drop_secrets=panic'")
- mz.environmentd.sql("CREATE SECRET orphan AS '123'")
- id = mz.environmentd.sql_query("SELECT id FROM mz_secrets WHERE name = 'orphan'")[
- 0
- ][0]
- assert id is not None
- secret = f"user-managed-{id}"

    # The failpoint panics environmentd mid-statement, which kills the SQL
    # connection, so the DROP must fail with an InterfaceError.
    with pytest.raises(InterfaceError):
        mz.environmentd.sql("DROP SECRET orphan")
    describe = mz.kubectl("describe", "secret", secret)
    assert "contents: 3 bytes" in describe

    # We saw the secret, allow orphan cleanup.
    mz.set_environmentd_failpoints("")

    mz.wait_for_sql()
    wait(condition="delete", resource=f"secret/{secret}")
- @pytest.mark.skip(reason="Flaky, see database-issues#8456")
- def test_missing_secret(mz: MaterializeApplication) -> None:
- """Test that Mz does not panic if a secret goes missing from K8s"""
    mz.testdrive.run(
        input=dedent(
            """
            $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
            ALTER SYSTEM SET enable_connection_validation_syntax = true

            > CREATE CLUSTER to_be_killed REPLICAS (to_be_killed (SIZE '1'));

            > CREATE SECRET to_be_deleted AS 'postgres'

            > CREATE CONNECTION kafka_conn_with_deleted_secret TO KAFKA (
                BROKER '${testdrive.kafka-addr}',
                SASL MECHANISMS 'PLAIN',
                SASL USERNAME = SECRET to_be_deleted,
                SASL PASSWORD = SECRET to_be_deleted
              ) WITH (VALIDATE = false);

            > CREATE CONNECTION pg_conn_with_deleted_secret TO POSTGRES (
                HOST 'postgres',
                DATABASE postgres,
                USER postgres,
                PASSWORD SECRET to_be_deleted
              );

            $ postgres-execute connection=postgres://postgres:postgres@postgres
            ALTER USER postgres WITH replication;
            DROP SCHEMA IF EXISTS public CASCADE;
            DROP PUBLICATION IF EXISTS mz_source;
            CREATE SCHEMA public;
            CREATE TABLE t1 (f1 INTEGER);
            ALTER TABLE t1 REPLICA IDENTITY FULL;
            INSERT INTO t1 VALUES (1);
            CREATE PUBLICATION mz_source FOR TABLE t1;

            > CREATE SOURCE source_with_deleted_secret
              IN CLUSTER to_be_killed
              FROM POSTGRES CONNECTION pg_conn_with_deleted_secret
              (PUBLICATION 'mz_source');

            > CREATE TABLE t1 FROM SOURCE source_with_deleted_secret (REFERENCE t1);

            > SELECT COUNT(*) > 0 FROM t1;
            true
            """
        )
    )
    id = mz.environmentd.sql_query(
        "SELECT id FROM mz_secrets WHERE name = 'to_be_deleted'"
    )[0][0]
    assert id is not None
    secret = f"user-managed-{id}"
- mz.kubectl("delete", "secret", secret)
- wait(condition="delete", resource=f"secret/{secret}")

    mz.testdrive.run(
        input=dedent(
            """
            ! CREATE SOURCE some_pg_source
              FROM POSTGRES CONNECTION pg_conn_with_deleted_secret
              (PUBLICATION 'mz_source');
            contains: NotFound

            ! CREATE SOURCE some_kafka_source
              FROM KAFKA CONNECTION kafka_conn_with_deleted_secret
              (TOPIC 'foo')
            contains:failed to create and connect Kafka consumer
            """
        ),
        no_reset=True,
    )

    # Kill the clusterd process so it restarts, and confirm that the source
    # errors out properly.
    cluster_id, replica_id = mz.environmentd.sql_query(
        "SELECT cluster_id, id FROM mz_cluster_replicas WHERE name = 'to_be_killed'"
    )[0]
    pod_name = cluster_pod_name(cluster_id, replica_id, 0)

    # Wait for the cluster to be ready first before attempting to kill it.
    wait(condition="condition=Ready", resource=pod_name)
    try:
        mz.kubectl("exec", pod_name, "--", "bash", "-c", "kill -9 `pidof clusterd`")
    except subprocess.CalledProcessError as e:
        # Killing the entrypoint via kubectl may result in kubectl exiting
        # with code 137 (SIGKILL).
        assert e.returncode == 137
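
    # The kubelet restarts the killed container; on rehydration the source must
    # surface the missing secret as an error rather than panic.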
    mz.testdrive.run(
        input=dedent(
            """
            ! CREATE SOURCE some_pg_source
              FROM POSTGRES CONNECTION pg_conn_with_deleted_secret
              (PUBLICATION 'mz_source');
            contains: NotFound

            ! CREATE SOURCE some_kafka_source
              FROM KAFKA CONNECTION kafka_conn_with_deleted_secret
              (TOPIC 'foo')
            contains:failed to create and connect Kafka consumer

            > SELECT error like '%NotFound%'
              FROM mz_internal.mz_source_statuses
              WHERE name = 'source_with_deleted_secret';
            true
            """
        ),
        no_reset=True,
    )

    # Kill environmentd and confirm the same behavior after it restarts.
    mz.kubectl(
        "exec",
        "pod/environmentd-0",
        "--",
        "bash",
        "-c",
        "kill -9 `pidof environmentd`",
    )
    wait(condition="condition=Ready", resource="pod/environmentd-0")
    mz.testdrive.run(
        input=dedent(
            """
            ! CREATE SOURCE some_pg_source
              FROM POSTGRES CONNECTION pg_conn_with_deleted_secret
              (PUBLICATION 'mz_source');
            contains: NotFound

            ! CREATE SOURCE some_kafka_source
              FROM KAFKA CONNECTION kafka_conn_with_deleted_secret
              (TOPIC 'foo')
            contains:failed to create and connect Kafka consumer

            > SELECT error like '%NotFound%'
              FROM mz_internal.mz_source_statuses
              WHERE name = 'source_with_deleted_secret';
            true

            # A missing secret is similar to the user dropping their upstream
            # Postgres DB, and we don't want to block dropping objects because
            # of that.
            > DROP CLUSTER to_be_killed CASCADE;
            """
        ),
        no_reset=True,
    )