123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- """Tests of AWS functionality that run against AWS.
- To run these tests locally:
- $ cd test/aws
- $ AWS_PROFILE=mz-scratch-admin ./mzcompose --dev run default
- """
- import codecs
- import json
- import random
- import boto3
- from psycopg.errors import SystemError
- from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
- from materialize.mzcompose.services.materialized import Materialized
# Prefix that workflow_default passes to Materialize through the
# --aws-external-id-prefix option.
AWS_EXTERNAL_ID_PREFIX = "eb5cb59b-e2fe-41f3-87ca-d2176a495345"

# Default service definitions for this composition (presumably discovered by
# mzcompose by name -- confirm). workflow_default overrides the Materialized
# service with AWS-specific settings before starting it.
SERVICES = [
    Materialized(),
]
class TestContext:
    """State shared by the workflow and every test case in this file.

    Building a context creates boto3 STS and IAM clients and asks STS who the
    caller is.

    Attributes:
        iam_propagation_seconds: how long test cases sleep after changing IAM.
        seed: random 32-bit value embedded in the names of the IAM resources
            that the tests create.
        sts: boto3 STS client.
        iam: boto3 IAM client.
        account_id: AWS account of the calling identity.
        materialized_principal: ARN of the calling identity; the workflow
            uses it as the principal of the jump role's trust policy.
    """

    def __init__(self, iam_propagation_seconds: int):
        self.iam_propagation_seconds = iam_propagation_seconds
        self.seed = random.getrandbits(32)

        self.sts = boto3.client("sts")
        self.iam = boto3.client("iam")

        # Look up the IAM principal these tests are running as.
        identity = self.sts.get_caller_identity()
        self.account_id = identity["Account"]
        self.materialized_principal = identity["Arn"]
def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Run all AWS connection test cases against a freshly started Materialize.

    Creates the IAM "jump role" that Materialize assumes before assuming each
    connection's role, (re)starts Materialize configured to use that role,
    runs every test case, and finally deletes the jump role whether or not
    the test cases passed.

    Args:
        c: the composition to drive.
        parser: argument parser for this workflow; accepts
            ``--iam-propagation-seconds`` (default 10).
    """
    # Waiting a fixed amount of time for IAM changes to propagate is neither
    # pretty nor fully reliable, but no clearly better option exists. This
    # workflow only runs in the nightly pipeline, where an occasional flake is
    # easier to live with than it would be in the PR pipeline.
    parser.add_argument(
        "--iam-propagation-seconds",
        type=int,
        default=10,
        help="how long to wait for IAM policies to propagate",
    )
    parsed = parser.parse_args()

    ctx = TestContext(iam_propagation_seconds=parsed.iam_propagation_seconds)

    # The jump role is what Materialize itself assumes on the way to each
    # connection's own role.
    jump_role = f"testdrive-{ctx.seed}-MaterializeConnection"
    jump_role_arn = f"arn:aws:iam::{ctx.account_id}:role/{jump_role}"
    _create_role(ctx, jump_role, ctx.materialized_principal)

    try:
        aws_environment = [
            "AWS_DEFAULT_REGION=us-east-1",
            "AWS_ACCESS_KEY_ID",
            "AWS_PROFILE",
            "AWS_SECRET_ACCESS_KEY",
            "AWS_SESSION_TOKEN",
        ]
        # Sharing the host's .aws directory lets Materialize pick up SSO
        # credentials, which simplifies local runs of this composition. CI
        # does not depend on it.
        aws_volumes = ["~/.aws:/home/materialize/.aws"]
        aws_options = [
            f"--aws-connection-role-arn={jump_role_arn}",
            f"--aws-external-id-prefix={AWS_EXTERNAL_ID_PREFIX}",
        ]
        aws_materialized = Materialized(
            environment_extra=aws_environment,
            volumes_extra=aws_volumes,
            options=aws_options,
        )

        with c.override(aws_materialized):
            # Bring Materialize up from scratch, then switch on the syntax
            # needed to validate connections.
            c.down()
            c.up("materialized")
            c.sql(
                port=6877,
                user="mz_system",
                sql="""
                ALTER SYSTEM SET enable_connection_validation_syntax = true;
                """,
            )

            for test_fn in (test_credentials, test_assume_role):
                with c.test_case(test_fn.__name__):
                    test_fn(c, ctx)
    finally:
        _delete_role(ctx, jump_role)
def test_credentials(c: Composition, ctx: TestContext):
    """Exercise an AWS connection that authenticates with static credentials.

    Creates a throwaway IAM user with an access key, checks that a connection
    using that key validates, and then checks that a corrupted secret and a
    nonexistent access key ID each fail with the expected AWS error code.

    The IAM user and its access key are deleted when the test finishes,
    whether it passed or failed, so that runs do not leave credentials behind
    in the AWS account.

    Args:
        c: the composition running Materialize.
        ctx: shared test context (IAM client, seed, propagation delay).

    Raises:
        AssertionError: if validation fails with an unexpected message.
        RuntimeError: if validation succeeds when it should have failed.
    """

    def expect_validation_error(fragment: str) -> None:
        # Validate the connection and require that it fails with `fragment`
        # in the primary error message.
        try:
            c.sql("VALIDATE CONNECTION aws_credentials")
        except SystemError as e:
            assert (
                e.diag.message_primary and fragment in e.diag.message_primary
            ), e
        else:
            raise RuntimeError("connection validation unexpectedly succeeded")

    # Create a user with an access key.
    customer_user = f"testdrive-{ctx.seed}-Customer"
    ctx.iam.create_user(UserName=customer_user)
    access_key_id = None
    try:
        access_key = ctx.iam.create_access_key(UserName=customer_user)
        access_key_id = access_key["AccessKey"]["AccessKeyId"]
        secret_access_key = access_key["AccessKey"]["SecretAccessKey"]

        # Creating a connection with those credentials should work.
        # print_statement=False presumably keeps the credentials out of the
        # test output.
        c.sql(
            f"""
            CREATE SECRET aws_secret_access_key AS '{secret_access_key}';
            CREATE CONNECTION aws_credentials TO AWS (
                ACCESS KEY ID = '{access_key_id}',
                SECRET ACCESS KEY = SECRET aws_secret_access_key
            );
            """,
            print_statement=False,
        )

        # Wait for IAM to propagate.
        c.sleep(ctx.iam_propagation_seconds)
        c.sql("VALIDATE CONNECTION aws_credentials")

        # Corrupting the secret access key should cause authentication to
        # fail with an invalid signature error.
        bad_secret_access_key = codecs.encode(secret_access_key, "rot13")
        c.sql(
            f"ALTER SECRET aws_secret_access_key AS '{bad_secret_access_key}'",
            print_statement=False,
        )
        expect_validation_error("SignatureDoesNotMatch")

        # Changing the access key to a nonexistent access key should fail
        # with an invalid client ID error.
        c.sql(
            "ALTER CONNECTION aws_credentials SET (ACCESS KEY ID = 'AKIAV2KIV5LP3RAKAZUY')",
            print_statement=False,
        )
        expect_validation_error("InvalidClientTokenId")
    finally:
        # A user that still owns access keys cannot be deleted, so remove the
        # key first (if one was created) and then the user.
        if access_key_id is not None:
            ctx.iam.delete_access_key(
                UserName=customer_user,
                AccessKeyId=access_key_id,
            )
        ctx.iam.delete_user(UserName=customer_user)
def test_assume_role(c: Composition, ctx: TestContext):
    """Exercise an AWS connection that authenticates by assuming a role.

    Walks through three stages: the customer role does not exist yet
    (validation must fail with AccessDenied), the role exists but its trust
    policy does not constrain the external ID (validation must fail with a
    specific message), and the role uses the trust policy Materialize
    suggests (validation must succeed). The customer role is deleted at the
    end regardless of the outcome.

    Args:
        c: the composition running Materialize.
        ctx: shared test context (IAM client, seed, propagation delay).
    """
    # Point a connection at a customer role that has not been created yet.
    role_name = f"testdrive-{ctx.seed}-Customer"
    role_arn = f"arn:aws:iam::{ctx.account_id}:role/{role_name}"
    c.sql(
        f"CREATE CONNECTION aws_assume_role TO AWS (ASSUME ROLE ARN '{role_arn}')"
    )
    id_rows = c.sql_query(
        "SELECT id FROM mz_connections WHERE name = 'aws_assume_role'"
    )
    conn_id = id_rows[0][0]

    # With no role in place, validation has to be rejected.
    try:
        c.sql("VALIDATE CONNECTION aws_assume_role")
    except SystemError as err:
        primary = err.diag.message_primary
        assert primary and "AccessDenied" in primary, err
    else:
        raise RuntimeError("connection validation unexpectedly succeeded")

    # Now create the customer role, deliberately leaving the external ID
    # unconstrained in its trust policy.
    principal_rows = c.sql_query(
        f"SELECT principal FROM mz_internal.mz_aws_connections WHERE id = '{conn_id}'"
    )
    _create_role(ctx, role_name, principal_rows[0][0])

    # Give IAM time to propagate the new role.
    c.sleep(ctx.iam_propagation_seconds)

    try:
        try:
            c.sql("VALIDATE CONNECTION aws_assume_role")
        except SystemError as err:
            # The headline message must match exactly.
            primary = err.diag.message_primary
            assert primary == "role trust policy does not require an external ID"

            # The detail and hint are checked more loosely: the detail must
            # name the connection's role ARN and the hint must link to the
            # documentation.
            detail = err.diag.message_detail
            assert detail and role_arn in detail, err

            hint = err.diag.message_hint
            assert (
                hint
                and "https://materialize.com/s/aws-connection-role-trust-policy"
                in hint
            ), err
        else:
            raise RuntimeError("connection validation unexpectedly succeeded")

        # Replace the role's trust policy with the example Materialize
        # provides for this connection.
        policy_rows = c.sql_query(
            f"SELECT example_trust_policy FROM mz_internal.mz_aws_connections WHERE id = '{conn_id}'"
        )
        suggested_policy = policy_rows[0][0]
        ctx.iam.update_assume_role_policy(
            RoleName=role_name,
            PolicyDocument=json.dumps(suggested_policy),
        )

        # Give IAM time to propagate the policy change.
        c.sleep(ctx.iam_propagation_seconds)

        # With the suggested policy in place, validation should pass.
        c.sql("VALIDATE CONNECTION aws_assume_role")
    finally:
        _delete_role(ctx, role_name)
def _create_role(ctx: TestContext, customer_role: str, principal: str) -> None:
    """Create an IAM role that `principal` is allowed to assume.

    The trust policy contains a single statement allowing ``sts:AssumeRole``
    for the given AWS principal, with no conditions attached.

    Args:
        ctx: shared test context providing the IAM client.
        customer_role: name of the role to create.
        principal: AWS principal placed in the trust policy.
    """
    allow_assume_role = {
        "Effect": "Allow",
        "Principal": {
            "AWS": principal,
        },
        "Action": "sts:AssumeRole",
    }
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [allow_assume_role],
    }
    ctx.iam.create_role(
        RoleName=customer_role,
        AssumeRolePolicyDocument=json.dumps(trust_policy),
    )
def _delete_role(ctx: TestContext, customer_role: str) -> None:
    """Delete the IAM role named `customer_role` using the context's IAM client."""
    ctx.iam.delete_role(RoleName=customer_role)
|