# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
- import time
- from textwrap import dedent
- import pytest
- from pg8000.dbapi import DatabaseError, ProgrammingError
- from materialize.cloudtest.app.materialize_application import MaterializeApplication
- from materialize.cloudtest.util.common import retry
- from materialize.cloudtest.util.exists import exists, not_exists
- from materialize.ui import UIError
def test_create_privatelink_connection(mz: MaterializeApplication) -> None:
    """Exercise creation, use, validation and removal of a PrivateLink connection.

    The test walks through these steps in order:

    1. ``CREATE CONNECTION ... TO AWS PRIVATELINK`` is rejected until
       ``max_aws_privatelink_connections`` is raised, and no VpcEndpoint
       resource is created by the rejected statement.
    2. After raising the limit, the connection is created, a matching
       ``vpcendpoint/connection-<id>`` resource exists, and its recorded
       status is ``unknown``.
    3. Kafka connections can reference the PrivateLink connection, both per
       broker and as the connection-wide default.
    4. A series of invalid statements is rejected with specific errors.
    5. Dropping the connections removes the VpcEndpoint resource.

    Args:
        mz: The application under test; SQL is issued through
            ``mz.environmentd``.
    """
    # Create a PrivateLink SQL connection object,
    # which should create a K8S VpcEndpoint object.
    # We don't run the environment-controller,
    # so no AWS VPC Endpoint will be created,
    # and the named service does not need to actually exist.
    create_connection_statement = dedent(
        """\
        CREATE CONNECTION privatelinkconn
        TO AWS PRIVATELINK (
            SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
            AVAILABILITY ZONES ('use1-az1', 'use1-az2')
        )
        """
    )

    # This should fail until max_aws_privatelink_connections is increased.
    with pytest.raises(
        ProgrammingError,
        match="creating AWS PrivateLink Connection would violate max_aws_privatelink_connections limit",
    ):
        mz.environmentd.sql(create_connection_statement)

    # The rejected statement must not have left a VpcEndpoint behind. The name
    # checked is derived from the highest existing user object id plus one.
    next_gid = mz.environmentd.sql_query(
        "SELECT MAX(SUBSTR(id, 2, LENGTH(id) - 1)::int) + 1 FROM mz_objects WHERE id LIKE 'u%'"
    )[0][0]
    not_exists(resource=f"vpcendpoint/connection-u{next_gid}")

    mz.environmentd.sql(
        "ALTER SYSTEM SET max_aws_privatelink_connections = 5",
        port="internal",
        user="mz_system",
    )
    mz.environmentd.sql(create_connection_statement)

    aws_connection_id = mz.environmentd.sql_query(
        "SELECT id FROM mz_connections WHERE name = 'privatelinkconn'"
    )[0][0]

    exists(resource=f"vpcendpoint/connection-{aws_connection_id}")

    # Less flaky if we sleep before checking the status
    time.sleep(5)

    status = mz.environmentd.sql_query(
        f"SELECT status FROM mz_internal.mz_aws_privatelink_connection_status_history WHERE connection_id = '{aws_connection_id}'"
    )[0][0]
    assert status == "unknown"

    # TODO: validate the contents of the VPC endpoint resource, rather than just
    # its existence.

    mz.environmentd.sql(
        "ALTER SYSTEM SET enable_connection_validation_syntax = true",
        port="internal",
        user="mz_system",
    )

    mz.environmentd.sql(
        dedent(
            """\
            CREATE CONNECTION kafkaconn TO KAFKA (
                BROKERS (
                    'customer-hostname-1:9092' USING AWS PRIVATELINK privatelinkconn,
                    'customer-hostname-2:9092' USING AWS PRIVATELINK privatelinkconn (PORT 9093),
                    'customer-hostname-3:9092' USING AWS PRIVATELINK privatelinkconn (AVAILABILITY ZONE 'use1-az1', PORT 9093),
                    'customer-hostname-4:9094'
                ),
                SECURITY PROTOCOL PLAINTEXT
            ) WITH (VALIDATE = false);
            """
        )
    )
    # Make the "connection was recorded" check explicit instead of relying on
    # an IndexError from subscripting an empty result.
    kafka_rows = mz.environmentd.sql_query(
        "SELECT id FROM mz_connections WHERE name = 'kafkaconn'"
    )
    assert kafka_rows, "kafkaconn is missing from mz_connections"

    principal = mz.environmentd.sql_query(
        "SELECT principal FROM mz_aws_privatelink_connections"
    )[0][0]

    assert principal == (
        f"arn:aws:iam::123456789000:role/mz_eb5cb59b-e2fe-41f3-87ca-d2176a495345_{aws_connection_id}"
    )

    # Validate default privatelink connections for kafka
    mz.environmentd.sql(
        dedent(
            """\
            CREATE CONNECTION kafkaconn_alt TO KAFKA (
                AWS PRIVATELINK privatelinkconn (PORT 9092),
                SECURITY PROTOCOL PLAINTEXT
            ) WITH (VALIDATE = false);
            """
        )
    )
    kafka_alt_rows = mz.environmentd.sql_query(
        "SELECT id FROM mz_connections WHERE name = 'kafkaconn_alt'"
    )
    assert kafka_alt_rows, "kafkaconn_alt is missing from mz_connections"

    mz.environmentd.sql(
        dedent(
            """\
            CREATE CONNECTION sshconn TO SSH TUNNEL (
                HOST 'ssh-bastion-host',
                USER 'mz',
                PORT 22
            );
            """
        )
    )

    # A Postgres connection may not combine an SSH tunnel with PrivateLink.
    with pytest.raises(
        ProgrammingError, match="cannot specify both SSH TUNNEL and AWS PRIVATELINK"
    ):
        mz.environmentd.sql(
            dedent(
                """\
                CREATE CONNECTION pg TO POSTGRES (
                    HOST 'postgres',
                    DATABASE postgres,
                    USER postgres,
                    AWS PRIVATELINK privatelinkconn,
                    SSH TUNNEL sshconn
                ) WITH (VALIDATE = false);
                """
            )
        )

    # 'us-east-1a' is rejected as an availability zone.
    with pytest.raises(
        ProgrammingError, match='invalid AWS PrivateLink availability zone "us-east-1a"'
    ):
        mz.environmentd.sql(
            dedent(
                """\
                CREATE CONNECTION privatelinkconn2
                TO AWS PRIVATELINK (
                    SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
                    AVAILABILITY ZONES ('use1-az2', 'us-east-1a')
                );
                """
            )
        )

    # Listing the same availability zone twice is rejected.
    with pytest.raises(
        ProgrammingError,
        match="connection cannot contain duplicate availability zones",
    ):
        mz.environmentd.sql(
            dedent(
                """\
                CREATE CONNECTION privatelinkconn2
                TO AWS PRIVATELINK (
                    SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
                    AVAILABILITY ZONES ('use1-az1', 'use1-az1', 'use1-az2')
                );
                """
            )
        )

    # A broker may only name an availability zone that the referenced
    # PrivateLink connection was created with.
    with pytest.raises(
        ProgrammingError,
        match='AWS PrivateLink availability zone "use1-az3" does not match any of the availability zones on the AWS PrivateLink connection',
    ):
        mz.environmentd.sql(
            dedent(
                """\
                CREATE CONNECTION kafkaconn2 TO KAFKA (
                    BROKERS (
                        'customer-hostname-3:9092' USING AWS PRIVATELINK privatelinkconn (AVAILABILITY ZONE 'use1-az3', PORT 9093)
                    ),
                    SECURITY PROTOCOL PLAINTEXT
                ) WITH (VALIDATE = false);
                """
            )
        )

    # The connection-wide PrivateLink default and a BROKERS list are mutually
    # exclusive.
    with pytest.raises(
        DatabaseError,
        match="invalid CONNECTION: can only set one of BROKER, BROKERS, or AWS PRIVATELINK",
    ):
        mz.environmentd.sql(
            dedent(
                """\
                CREATE CONNECTION kafkaconn2_alt TO KAFKA (
                    AWS PRIVATELINK privatelinkconn (PORT 9092),
                    BROKERS (
                        'customer-hostname-3:9092' USING AWS PRIVATELINK privatelinkconn (PORT 9093)
                    ),
                    SECURITY PROTOCOL PLAINTEXT
                ) WITH (VALIDATE = false);
                """
            )
        )

    # The PORT option inside AWS PRIVATELINK is rejected for Postgres.
    with pytest.raises(
        ProgrammingError,
        match="invalid CONNECTION: PORT in AWS PRIVATELINK is only supported for kafka",
    ):
        mz.environmentd.sql(
            dedent(
                """\
                CREATE CONNECTION pg TO POSTGRES (
                    HOST 'postgres',
                    DATABASE postgres,
                    USER postgres,
                    AWS PRIVATELINK privatelinkconn ( PORT 1234 ),
                    PORT 1234
                ) WITH (VALIDATE = false);
                """
            )
        )

    mz.environmentd.sql("DROP CONNECTION kafkaconn CASCADE")
    mz.environmentd.sql("DROP CONNECTION privatelinkconn CASCADE")

    not_exists(resource=f"vpcendpoint/connection-{aws_connection_id}")
def test_background_drop_privatelink_connection(mz: MaterializeApplication) -> None:
    """Check that a dropped PrivateLink connection is cleaned up in the background.

    The ``drop_vpc_endpoint`` failpoint is set to ``pause`` before the
    connection is dropped. The ``DROP CONNECTION`` statement is still expected
    to return while the VpcEndpoint resource remains, and the resource must
    disappear once the failpoint is switched off.

    Args:
        mz: The application under test; SQL is issued through
            ``mz.environmentd``.
    """
    mz.environmentd.sql(
        "ALTER SYSTEM SET max_aws_privatelink_connections = 5",
        port="internal",
        user="mz_system",
    )
    create_connection_statement = dedent(
        """\
        CREATE CONNECTION privatelinkconn
        TO AWS PRIVATELINK (
            SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
            AVAILABILITY ZONES ('use1-az1', 'use1-az2')
        )
        """
    )
    mz.environmentd.sql(create_connection_statement)
    aws_connection_id = mz.environmentd.sql_query(
        "SELECT id FROM mz_connections WHERE name = 'privatelinkconn'"
    )[0][0]
    endpoint = f"vpcendpoint/connection-{aws_connection_id}"

    mz.environmentd.sql("SET FAILPOINTS = 'drop_vpc_endpoint=pause'")
    try:
        mz.environmentd.sql("DROP CONNECTION privatelinkconn CASCADE")
        # The endpoint must still be there while the failpoint is paused.
        exists(resource=endpoint)
    finally:
        # Always switch the failpoint off, even when a check above fails.
        # NOTE(review): the failpoint presumably outlives this statement's
        # session, so leaving it paused could affect later tests -- confirm.
        mz.environmentd.sql("SET FAILPOINTS = 'drop_vpc_endpoint=off'")

    not_exists(resource=endpoint)
def test_retry_drop_privatelink_connection(mz: MaterializeApplication) -> None:
    """Check that a failed VpcEndpoint deletion is retried until it succeeds.

    The ``drop_vpc_endpoint`` failpoint is set to ``return(failed)`` before the
    connection is dropped, so the VpcEndpoint resource is expected to remain.
    After the failpoint is switched off, the resource must eventually go away;
    the check is repeated up to 10 times, retrying on ``UIError``.

    Args:
        mz: The application under test; SQL is issued through
            ``mz.environmentd``.
    """
    mz.environmentd.sql(
        "ALTER SYSTEM SET max_aws_privatelink_connections = 5",
        port="internal",
        user="mz_system",
    )
    create_connection_statement = dedent(
        """\
        CREATE CONNECTION privatelinkconn
        TO AWS PRIVATELINK (
            SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
            AVAILABILITY ZONES ('use1-az1', 'use1-az2')
        )
        """
    )
    mz.environmentd.sql(create_connection_statement)
    aws_connection_id = mz.environmentd.sql_query(
        "SELECT id FROM mz_connections WHERE name = 'privatelinkconn'"
    )[0][0]
    endpoint = f"vpcendpoint/connection-{aws_connection_id}"

    mz.environmentd.sql("SET FAILPOINTS = 'drop_vpc_endpoint=return(failed)'")
    try:
        mz.environmentd.sql("DROP CONNECTION privatelinkconn CASCADE")
        # The endpoint must survive while deletion is forced to fail.
        exists(resource=endpoint)
    finally:
        # Always switch the failpoint off, even when a check above fails.
        # NOTE(review): the failpoint presumably outlives this statement's
        # session, so leaving it armed could affect later tests -- confirm.
        mz.environmentd.sql("SET FAILPOINTS = 'drop_vpc_endpoint=off'")

    # Deletion is only expected to succeed on a later attempt, so poll.
    retry(
        f=lambda: not_exists(resource=endpoint),
        max_attempts=10,
        exception_types=[UIError],
    )
|