# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Tests of AWS functionality that run against LocalStack."""

import uuid
from typing import Any, cast

import boto3

from materialize.mzcompose import (
    DEFAULT_CLOUD_REGION,
    DEFAULT_MZ_ENVIRONMENT_ID,
    DEFAULT_ORDINAL,
    DEFAULT_ORG_ID,
)
from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.localstack import Localstack
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.testdrive import Testdrive

ENVIRONMENT_NAME = f"environment-{DEFAULT_ORG_ID}-{DEFAULT_ORDINAL}"
NAMESPACE = ENVIRONMENT_NAME
SERVICE_ACCOUNT_NAME = ENVIRONMENT_NAME
OIDC_SUB = f"system:serviceaccount:{NAMESPACE}:{SERVICE_ACCOUNT_NAME}"
PURPOSE = "test-aws"
STACK = "mzcompose"

KMS_KEY_ALIAS_NAME = f"alias/customer_key_{DEFAULT_MZ_ENVIRONMENT_ID}"

AWS_CONNECTION_ROLE_ARN = "arn:aws:iam::123456789000:role/MaterializeConnection"
AWS_EXTERNAL_ID_PREFIX = "eb5cb59b-e2fe-41f3-87ca-d2176a495345"
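# Placeholder credentials: LocalStack does not validate credentials by
# default, so these only need to be well formed.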
AWS_ACCESS_KEY_ID = "LSIAQAAAAAAVNCBMPNSG"
AWS_SECRET_ACCESS_KEY = "secret"
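# LocalStack's edge endpoint as resolved from inside the compose network.
# Host-side clients instead connect through the mapped port returned by
# c.port("localstack", 4566).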
AWS_ENDPOINT_URL_MZ = "http://localstack:4566"

SERVICES = [
    Localstack(),
    Mz(app_password=""),
    Materialized(
        depends_on=["localstack"],
        environment_extra=[
            f"AWS_REGION={DEFAULT_CLOUD_REGION}",
            f"AWS_ENDPOINT_URL={AWS_ENDPOINT_URL_MZ}",
            f"AWS_ACCESS_KEY_ID={AWS_ACCESS_KEY_ID}",
            f"AWS_SECRET_ACCESS_KEY={AWS_SECRET_ACCESS_KEY}",
        ],
        options=[
            "--secrets-controller=aws-secrets-manager",
            f"--aws-secrets-controller-tags=Owner={OIDC_SUB}",
            f"--aws-secrets-controller-tags=Environment={ENVIRONMENT_NAME}",
            f"--aws-secrets-controller-tags=Purpose={PURPOSE}",
            f"--aws-secrets-controller-tags=Stack={STACK}",
            f"--aws-connection-role-arn={AWS_CONNECTION_ROLE_ARN}",
            f"--aws-external-id-prefix={AWS_EXTERNAL_ID_PREFIX}",
        ],
    ),
    Testdrive(default_timeout="5s"),
]


def workflow_default(c: Composition) -> None:
    def process(name: str) -> None:
        with c.test_case(name):
            c.workflow(name)

    workflows = ["secrets-manager", "aws-connection", "copy-to-s3"]
    c.test_parts(workflows, process)


def workflow_secrets_manager(c: Composition) -> None:
    c.up("localstack")

    aws_endpoint_url = f"http://localhost:{c.port('localstack', 4566)}"
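    # Provision a KMS key behind the customer-key alias; the secrets created
    # in this test are encrypted with it.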
    kms_client = boto3.client(
        "kms",
        endpoint_url=aws_endpoint_url,
        region_name=DEFAULT_CLOUD_REGION,
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )
    key_id = kms_client.create_key()["KeyMetadata"]["KeyId"]
    kms_client.create_alias(
        AliasName=KMS_KEY_ALIAS_NAME,
        TargetKeyId=key_id,
    )

    sm_client = boto3.client(
        "secretsmanager",
        endpoint_url=aws_endpoint_url,
        region_name=DEFAULT_CLOUD_REGION,
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )
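
    # The tags environmentd is configured to attach to every secret it
    # manages (see the --aws-secrets-controller-tags options above).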
    expected_tags = [
        {
            "Key": "Owner",
            "Value": OIDC_SUB,
        },
        {
            "Key": "Environment",
            "Value": ENVIRONMENT_NAME,
        },
        {
            "Key": "Purpose",
            "Value": PURPOSE,
        },
        {
            "Key": "Stack",
            "Value": STACK,
        },
    ]

    # Use up IDs u1 and u2 so we can test behavior with orphaned secrets.
    c.up("materialized")
    c.sql("CREATE TABLE t ()")
    c.sql("DROP TABLE t")
    c.sql("CREATE TABLE t ()")
    c.sql("DROP TABLE t")
    c.stop("materialized")

    # Create an orphaned secret that should get deleted when starting
    # environmentd.
    orphan_1_name = f"/user-managed/{DEFAULT_MZ_ENVIRONMENT_ID}/u1"
    sm_client.create_secret(
        Name=orphan_1_name,
        KmsKeyId=KMS_KEY_ALIAS_NAME,
        SecretString="I'm an orphan, delete me!",
        Tags=expected_tags,
    )

    # Create an orphaned secret without the correct tags, so it should be
    # ignored.
    orphan_2_name = f"/user-managed/{DEFAULT_MZ_ENVIRONMENT_ID}/u2"
    sm_client.create_secret(
        Name=orphan_2_name,
        KmsKeyId=KMS_KEY_ALIAS_NAME,
        SecretString="I'm an orphan, but I shouldn't be deleted because of my missing tags!",
    )

    # Create a secret for a different environment, so it should be ignored.
    other_environment_name = "environment-11111111-2222-3333-4444-555555555555-0"
    other_oidc_sub = (
        f"system:serviceaccount:{other_environment_name}:{other_environment_name}"
    )
    other_name = f"/user-managed/{other_environment_name}/u9876"
    sm_client.create_secret(
        Name=other_name,
        KmsKeyId=KMS_KEY_ALIAS_NAME,
        SecretString="I belong to a different environment, so leave me alone!",
        Tags=[
            {
                "Key": "Owner",
                "Value": other_oidc_sub,
            },
            {
                "Key": "Environment",
                "Value": other_environment_name,
            },
            {
                "Key": "Purpose",
                "Value": PURPOSE,
            },
            {
                "Key": "Stack",
                "Value": STACK,
            },
        ],
    )

    def list_secrets() -> dict[str, dict[str, Any]]:
        return {
            secret["Name"]: secret
            for secret in sm_client.list_secrets()["SecretList"]
        }

    def secret_name(_id: str) -> str:
        return f"/user-managed/{DEFAULT_MZ_ENVIRONMENT_ID}/{_id}"

    def get_secret_value(_id: str) -> bytes:
        return cast(
            bytes,
            sm_client.get_secret_value(SecretId=secret_name(_id))["SecretBinary"],
        )
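
    # Starting environmentd should delete the correctly tagged orphan (u1)
    # but leave the untagged orphan (u2) and the other environment's secret
    # untouched.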
- c.up("materialized")
- secrets = list_secrets()
- assert orphan_1_name not in secrets
- assert orphan_2_name in secrets
- assert other_name in secrets
- # Should include migrated secrets and secrets for other environments
- assert len(secrets) == 2
- c.sql("CREATE SECRET secret AS 's3cret'")
- secrets = list_secrets()
- # New secret should exist with specified contents
- assert secret_name("u3") in secrets
- assert b"s3cret" == get_secret_value("u3")
- # Secrets should have expected tags
- secret_u3 = secrets[secret_name("u3")]
- for tag in expected_tags:
- assert tag in secret_u3["Tags"]
- # Check that alter secret gets reflected in Secrets Manager
- c.sql("ALTER SECRET secret AS 'tops3cret'")
- assert b"tops3cret" == get_secret_value("u3")
- # Rename should not change the contents in Secrets Manager
- c.sql("ALTER SECRET secret RENAME TO renamed_secret")
- assert b"tops3cret" == get_secret_value("u3")

    # Ensure the secret still exists after a restart (i.e., test that
    # orphaned-secret cleanup doesn't fire incorrectly).
    c.stop("materialized")
    c.up("materialized")
    secrets = list_secrets()
    assert secret_name("u3") in secrets

    c.sql("DROP SECRET renamed_secret")
    # Check that the secret has been deleted from Secrets Manager.
    secrets = list_secrets()
    assert secret_name("u3") not in secrets


def workflow_aws_connection(c: Composition) -> None:
    c.up("localstack", "materialized")
    c.run_testdrive_files("aws-connection/aws-connection.td")


def workflow_copy_to_s3(c: Composition) -> None:
    with c.override(
        Materialized(
            depends_on=["localstack"],
            environment_extra=[
                f"AWS_ENDPOINT_URL={AWS_ENDPOINT_URL_MZ}",
                f"AWS_ACCESS_KEY_ID={AWS_ACCESS_KEY_ID}",
                f"AWS_SECRET_ACCESS_KEY={AWS_SECRET_ACCESS_KEY}",
            ],
        )
    ):
        c.up("localstack", "materialized")

        localhost_aws_endpoint_url = f"http://localhost:{c.port('localstack', 4566)}"
        s3_client = boto3.client(
            "s3",
            endpoint_url=localhost_aws_endpoint_url,
            region_name=DEFAULT_CLOUD_REGION,
            aws_access_key_id=AWS_ACCESS_KEY_ID,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        )
        bucket_name = "copy-to-s3"
        s3_client.create_bucket(Bucket=bucket_name)
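
        # Use a fresh prefix for each run so that repeated runs against the
        # same LocalStack instance don't collide on leftover objects.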
        path_prefix = str(uuid.uuid4())
        c.run_testdrive_files(
            f"--var=endpoint={AWS_ENDPOINT_URL_MZ}",
            f"--var=access-key={AWS_ACCESS_KEY_ID}",
            f"--var=secret-key={AWS_SECRET_ACCESS_KEY}",
            f"--var=s3-prefix={bucket_name}/{path_prefix}",
            f"--var=region={DEFAULT_CLOUD_REGION}",
            "--default-timeout=300s",
            "copy-to-s3/copy-to-s3.td",
        )
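
        # A single COPY TO may be split across several CSV objects under its
        # prefix, and row order across objects is not guaranteed, so collect
        # the lines from all objects and compare them as a set.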
        def validate_upload(
            upload: dict[str, Any], expected_output_set: set[str]
        ) -> None:
            assert len(upload["Contents"]) > 0
            output_lines = set()
            for obj in upload["Contents"]:
                key = obj["Key"]
                assert key.endswith(".csv")
                object_response = s3_client.get_object(Bucket=bucket_name, Key=key)
                body = object_response["Body"].read().decode("utf-8")
                output_lines.update(body.splitlines())
            assert output_lines == expected_output_set

        # Assert the contents of the uploaded files.
        date = c.sql_query("SELECT TO_CHAR(now(), 'YYYY-MM-DD')")[0][0]
        expected_output = {str(i) for i in range(10)}
        first_upload = s3_client.list_objects_v2(
            Bucket=bucket_name, Prefix=f"{path_prefix}/1/{date}/"
        )
        validate_upload(first_upload, expected_output)
        second_upload = s3_client.list_objects_v2(
            Bucket=bucket_name, Prefix=f"{path_prefix}/2/"
        )
        validate_upload(second_upload, expected_output)
        third_upload = s3_client.list_objects_v2(
            Bucket=bucket_name, Prefix=f"{path_prefix}/3/"
        )
        validate_upload(third_upload, {"1000"})