123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- """
- Tests using the balancerd service instead of connecting to materialized directly.
- Uses the frontegg-mock instead of a real frontend backend.
- """
- import contextlib
- import json
- import socket
- import ssl
- import struct
- import uuid
- from collections.abc import Callable
- from textwrap import dedent
- from typing import Any
- from urllib.parse import quote
- import pg8000
- import requests
- from pg8000.exceptions import InterfaceError
- from psycopg import Cursor
- from psycopg.errors import OperationalError, ProgramLimitExceeded, ProgrammingError
- from materialize import MZ_ROOT
- from materialize.mzcompose.composition import Composition
- from materialize.mzcompose.services.balancerd import Balancerd
- from materialize.mzcompose.services.frontegg import FronteggMock
- from materialize.mzcompose.services.materialized import Materialized
- from materialize.mzcompose.services.mz import Mz
- from materialize.mzcompose.services.test_certs import TestCerts
- from materialize.mzcompose.services.testdrive import Testdrive
TENANT_ID = str(uuid.uuid4())
ADMIN_USER = "u1@example.com"
OTHER_USER = "u2@example.com"
ADMIN_ROLE = "MaterializePlatformAdmin"
OTHER_ROLE = "MaterializePlatform"


def _mock_user(email: str, roles: list) -> dict:
    """Build one frontegg-mock user record with freshly generated credentials.

    The record carries a random password and id, the shared TENANT_ID, and a
    single API token (random client id and secret).
    """
    return {
        "email": email,
        "password": str(uuid.uuid4()),
        "id": str(uuid.uuid4()),
        "tenant_id": TENANT_ID,
        "initial_api_tokens": [
            {"client_id": str(uuid.uuid4()), "secret": str(uuid.uuid4())},
        ],
        "roles": roles,
    }


# Users registered with the frontegg mock, keyed by email. The admin user holds
# both platform roles; the other user holds only the non-admin role.
USERS = {
    ADMIN_USER: _mock_user(ADMIN_USER, [OTHER_ROLE, ADMIN_ROLE]),
    OTHER_USER: _mock_user(OTHER_USER, [OTHER_ROLE]),
}

FRONTEGG_URL = "http://frontegg-mock:6880"
def app_password(email: str) -> str:
    """Return the Materialize app password derived from *email*'s first API token.

    The password is ``mzp_`` followed by the token's client id and secret,
    with every ``-`` stripped out.
    """
    token = USERS[email]["initial_api_tokens"][0]
    raw = "mzp_" + token["client_id"] + token["secret"]
    return raw.replace("-", "")
# Services for this composition: test TLS certificates, testdrive (pointed at
# balancerd with the admin user's app password), balancerd, the frontegg mock,
# the mz CLI, and materialized configured for frontegg authentication.
SERVICES = [
    TestCerts(),
    Testdrive(
        # Testdrive talks to Materialize through balancerd, not directly.
        materialize_url=f"postgres://{quote(ADMIN_USER)}:{app_password(ADMIN_USER)}@balancerd:6875?sslmode=require",
        materialize_use_https=True,
        no_reset=True,
    ),
    Balancerd(
        command=[
            "service",
            "--pgwire-listen-addr=0.0.0.0:6875",
            "--https-listen-addr=0.0.0.0:6876",
            "--internal-http-listen-addr=0.0.0.0:6878",
            "--frontegg-resolver-template=materialized:6875",
            "--frontegg-jwk-file=/secrets/frontegg-mock.crt",
            f"--frontegg-api-token-url={FRONTEGG_URL}/identity/resources/auth/v1/api-token",
            f"--frontegg-admin-role={ADMIN_ROLE}",
            "--https-resolver-template=materialized:6876",
            "--tls-key=/secrets/balancerd.key",
            "--tls-cert=/secrets/balancerd.crt",
            "--default-config=balancerd_inject_proxy_protocol_header_http=true",
            # Presumably makes balancerd -> materialized connections use TLS;
            # workflow_plaintext overrides this command without this flag.
            "--internal-tls",
            # Nonsensical but we don't need cancellations here
            "--cancellation-resolver-dir=/secrets/",
        ],
        depends_on=["test-certs"],
        volumes=[
            "secrets:/secrets",
        ],
    ),
    FronteggMock(
        issuer=FRONTEGG_URL,
        encoding_key_file="/secrets/frontegg-mock.key",
        decoding_key_file="/secrets/frontegg-mock.crt",
        # All users from USERS, serialized as a JSON list.
        users=json.dumps(list(USERS.values())),
        depends_on=["test-certs"],
        volumes=[
            "secrets:/secrets",
        ],
    ),
    Mz(app_password=""),
    Materialized(
        options=[
            # Enable TLS on the public port to verify that balancerd is connecting to the balancerd
            # port.
            "--tls-mode=require",
            "--tls-key=/secrets/materialized.key",
            "--tls-cert=/secrets/materialized.crt",
            f"--frontegg-tenant={TENANT_ID}",
            "--frontegg-jwk-file=/secrets/frontegg-mock.crt",
            f"--frontegg-api-token-url={FRONTEGG_URL}/identity/resources/auth/v1/api-token",
            f"--frontegg-admin-role={ADMIN_ROLE}",
        ],
        # We do not do anything interesting on the Mz side
        # to justify the extra restarts
        sanity_restart=False,
        depends_on=["test-certs"],
        volumes_extra=[
            "secrets:/secrets",
        ],
        listeners_config_path=f"{MZ_ROOT}/src/materialized/ci/listener_configs/no_auth_https.json",
    ),
]
def grant_all_admin_user(c: Composition):
    """Grant ADMIN_USER all privileges on schema ``public`` and cluster ``quickstart``.

    The grants are issued as ``mz_system`` via materialized's port 6877.
    """
    # Open (and discard) a cursor through balancerd as the default user
    # (u1@example.com, i.e. ADMIN_USER) so that the user exists before we
    # grant it anything.
    sql_cursor(c)

    system = c.sql_cursor(service="materialized", port=6877, user="mz_system")
    grants = (
        f'GRANT ALL PRIVILEGES ON SCHEMA public TO "{ADMIN_USER}";',
        f'GRANT ALL PRIVILEGES ON CLUSTER quickstart TO "{ADMIN_USER}";',
    )
    for statement in grants:
        system.execute(statement)
def assert_metrics(c: Composition, contains: str):
    """Assert that balancerd's ``/metrics`` output contains the substring *contains*.

    The metrics page is fetched with ``curl -s`` run inside the materialized
    container, addressing balancerd's internal HTTP port 6878 by service name.
    """
    metrics_text = c.exec(
        "materialized",
        "curl",
        "http://balancerd:6878/metrics",
        "-s",
        capture=True,
    ).stdout
    assert contains in metrics_text
def sql_cursor(
    c: Composition, service="balancerd", email="u1@example.com", startup_params=None
) -> Cursor:
    """Open a psycopg cursor to *service*, authenticated as the frontegg user *email*.

    Args:
        c: The composition to connect through.
        service: Compose service to connect to (balancerd by default).
        email: User name; the user's app password is used as the password.
        startup_params: Extra pgwire startup parameters. ``None`` (the
            default) means no extra parameters.

    Returns:
        A cursor on a connection opened with ``sslmode=require``.
    """
    return c.sql_cursor(
        service=service,
        user=email,
        password=app_password(email),
        sslmode="require",
        # Build a fresh dict per call instead of sharing one mutable default
        # object across every call.
        startup_params={} if startup_params is None else startup_params,
    )
def pg8000_sql_cursor(
    c: Composition, service="balancerd", email="u1@example.com", startup_params=None
) -> pg8000.Cursor:
    """Open a pg8000 cursor to *service* on localhost, authenticated as *email*.

    The connection uses TLS, but certificate and hostname verification are
    disabled; the composition serves certificates from the test-certs service.

    Args:
        c: The composition whose published port for *service* is used.
        service: Compose service to connect to (balancerd by default).
        email: User name; the user's app password is used as the password.
        startup_params: Extra pgwire startup parameters. ``None`` (the
            default) means no extra parameters.

    Returns:
        A cursor on the new connection.
    """
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE
    conn = pg8000.connect(
        host="127.0.0.1",
        port=c.default_port(service),
        user=email,
        password=app_password(email),
        ssl_context=ssl_context,
        # Fresh dict per call rather than a shared mutable default.
        startup_params={} if startup_params is None else startup_params,
    )
    return conn.cursor()
def workflow_default(c: Composition) -> None:
    """Run every other workflow as its own test case, then ``plaintext`` last."""
    c.down(destroy_volumes=True)

    # "default" is this workflow itself. "plaintext" overrides the service
    # definitions, so it is run on its own after all the others.
    excluded = {"default", "plaintext"}

    def run_workflow(name: str) -> None:
        if name not in excluded:
            with c.test_case(name):
                c.workflow(name)

    c.test_parts(list(c.workflows.keys()), run_workflow)

    with c.test_case("plaintext"):
        c.workflow("plaintext")
def workflow_plaintext(c: Composition) -> None:
    """Test plaintext internal connections.

    Re-runs the ``http`` and ``wide-result`` workflows with materialized's TLS
    disabled and a balancerd command that omits ``--internal-tls``.
    """
    c.down(destroy_volumes=True)
    with c.override(
        Materialized(
            options=[
                # Disable TLS on the public port so that balancerd's
                # connections to materialized are plaintext.
                "--tls-mode=disable",
                f"--frontegg-tenant={TENANT_ID}",
                "--frontegg-jwk-file=/secrets/frontegg-mock.crt",
                f"--frontegg-api-token-url={FRONTEGG_URL}/identity/resources/auth/v1/api-token",
                f"--frontegg-admin-role={ADMIN_ROLE}",
            ],
            # We do not do anything interesting on the Mz side
            # to justify the extra restarts
            sanity_restart=False,
            depends_on=["test-certs"],
            volumes_extra=[
                "secrets:/secrets",
            ],
        ),
        # Same command as the default Balancerd service, minus --internal-tls.
        Balancerd(
            command=[
                "service",
                "--pgwire-listen-addr=0.0.0.0:6875",
                "--https-listen-addr=0.0.0.0:6876",
                "--internal-http-listen-addr=0.0.0.0:6878",
                "--frontegg-resolver-template=materialized:6875",
                "--frontegg-jwk-file=/secrets/frontegg-mock.crt",
                f"--frontegg-api-token-url={FRONTEGG_URL}/identity/resources/auth/v1/api-token",
                f"--frontegg-admin-role={ADMIN_ROLE}",
                "--https-resolver-template=materialized:6876",
                "--tls-key=/secrets/balancerd.key",
                "--tls-cert=/secrets/balancerd.crt",
                "--default-config=balancerd_inject_proxy_protocol_header_http=true",
                # Nonsensical but we don't need cancellations here
                "--cancellation-resolver-dir=/secrets/",
            ],
            depends_on=["test-certs"],
            volumes=[
                "secrets:/secrets",
            ],
        ),
    ):
        with c.test_case("plaintext_http"):
            c.workflow("http")
        with c.test_case("plaintext_wide_result"):
            c.workflow("wide-result")
def workflow_http(c: Composition) -> None:
    """Run ``SELECT 123`` through balancerd's HTTPS SQL endpoint as OTHER_USER."""
    c.up("balancerd", "frontegg-mock", "materialized")

    # curl runs inside the materialized container and addresses balancerd by
    # service name; -k skips certificate verification.
    curl_args = [
        "curl",
        "https://balancerd:6876/api/sql",
        "-k",
        "-s",
        "--header",
        "Content-Type: application/json",
        "--user",
        f"{OTHER_USER}:{app_password(OTHER_USER)}",
        "--data",
        '{"query": "SELECT 123"}',
    ]
    output = c.exec("materialized", *curl_args, capture=True).stdout
    first_value = json.loads(output)["results"][0]["rows"][0][0]
    assert first_value == "123"

    # TODO: We can't assert metrics for `mz_balancer_tenant_connection_active{source="https"` here
    # because there's no CNAME. Does docker-compose support this somehow?
def _create_proxy_protocol_v2_header(
    client_ip: str, client_port: int, server_ip: str, server_port: int
) -> bytes:
    """Build a PROXY protocol v2 header describing an IPv4/TCP connection.

    Args:
        client_ip: Source IPv4 address (dotted quad) to report.
        client_port: Source TCP port to report.
        server_ip: Destination IPv4 address (dotted quad) to report.
        server_port: Destination TCP port to report.

    Returns:
        The 28-byte header: the 12-byte signature, the version/command and
        family/protocol bytes, a 2-byte address-block length, and the 12-byte
        address block (source address, destination address, source port,
        destination port).
    """
    # Signature for Proxy Protocol v2
    signature = b"\r\n\r\n\x00\r\nQUIT\n"
    # Version and command (0x21 means version 2, PROXY command)
    version_and_command = 0x21
    # Address family and protocol (0x11 means INET (IPv4) + STREAM (TCP))
    family_and_protocol = 0x11
    # Addresses as packed 4-byte IPv4; ports as 2-byte big-endian unsigned ints.
    src_addr = socket.inet_aton(client_ip)
    dst_addr = socket.inet_aton(server_ip)
    src_port = struct.pack("!H", client_port)
    dst_port = struct.pack("!H", server_port)
    # Length of the address block: addresses (4*2) + ports (2*2) = 12 bytes
    addr_len = struct.pack("!H", 12)
    return (
        signature
        + struct.pack("!BB", version_and_command, family_and_protocol)
        + addr_len
        + src_addr
        + dst_addr
        + src_port
        + dst_port
    )


def _read_http_response_body(sock: socket.socket) -> bytes:
    """Read one HTTP response from *sock* and return its body bytes.

    Reads until the end of the headers, then keeps reading until the body
    reaches the announced ``Content-Length`` (or the peer closes the
    connection). Without a ``Content-Length`` header, the body bytes that
    arrived together with the headers are returned.

    Raises:
        AssertionError: if the connection closes before the headers end.
    """
    separator = b"\r\n\r\n"
    data = b""
    while separator not in data:
        chunk = sock.recv(8192)
        if not chunk:
            break
        data += chunk
    head, found, body = data.partition(separator)
    assert found, f"expected response with header and body, found: {data!r}"

    content_length = None
    for header_line in head.split(b"\r\n")[1:]:  # [0] is the status line
        name, _, value = header_line.partition(b":")
        if name.strip().lower() == b"content-length":
            content_length = int(value.strip())

    # A single recv() is not guaranteed to return the whole response.
    while content_length is not None and len(body) < content_length:
        chunk = sock.recv(8192)
        if not chunk:
            break
        body += chunk
    return body


def workflow_ip_forwarding(c: Composition) -> None:
    """Test that forwarding the client IP through the balancer works over both HTTP and SQL.

    Checks that sessions opened through balancerd (HTTPS and pgwire) do not
    report balancerd's own IP, and that materialized's internal HTTP port
    honours a PROXY protocol v2 header sent to it directly.
    """
    import base64  # only needed here, to build a valid Basic auth header

    c.up("balancerd", "frontegg-mock", "materialized")

    # balancerd serves HTTPS on 6876.
    balancer_port = c.port("balancerd", 6876)
    # mz internal (unencrypted port)
    materialize_port = c.port("materialized", 6878)

    session_ip_query = "select client_ip from mz_internal.mz_sessions where connection_id = pg_backend_pid();"

    # Find balancerd's own non-loopback IP, so we can check that proxied
    # sessions do not report it as the client IP.
    # https://stackoverflow.com/questions/5281341/get-local-network-interface-addresses-using-only-proc
    fib_output = c.exec(
        "balancerd",
        "awk",
        r"/32 host/ { print i } {i=$2}",
        "/proc/net/fib_trie",
        capture=True,
    ).stdout
    balancer_ip = [ip for ip in fib_output.split("\n") if ip and ip != "127.0.0.1"][0]

    # HTTPS through balancerd.
    r = requests.post(
        f"https://localhost:{balancer_port}/api/sql",
        headers={},
        auth=(OTHER_USER, app_password(OTHER_USER)),
        json={"query": session_ip_query},
        verify=False,
        timeout=60,
    )
    print(f"response {r.text}")
    session_ip = json.loads(r.text)["results"][0]["rows"][0][0]
    assert (
        session_ip != balancer_ip
    ), f"requests from ({session_ip}) proxied by balancer should not use balancer ip ({balancer_ip}) in session"

    # Also assert psql connections don't use the balancer ip
    cursor = sql_cursor(c)
    cursor.execute(session_ip_query)
    session_ip = cursor.fetchall()[0][0]
    assert (
        session_ip != balancer_ip
    ), f"requests from ({session_ip}) proxied by balancer should not use balancer ip ({balancer_ip}) in session"

    # Talk to envd directly with a PROXY header that claims an IP we could not
    # normally connect from.
    proxy_header = _create_proxy_protocol_v2_header("1.1.1.1", 1111, "127.0.0.1", 1111)
    # Basic auth credentials must be base64("user:password") (RFC 7617).
    credentials = base64.b64encode(
        f"{OTHER_USER}:{app_password(OTHER_USER)}".encode()
    ).decode()
    request_body = json.dumps({"query": session_ip_query}).encode()
    request_head = "\r\n".join(
        [
            "POST /api/sql HTTP/1.1",
            f"Host: 127.0.0.1:{materialize_port}",
            f"Authorization: Basic {credentials}",
            "Content-Type: application/json",
            f"Content-Length: {len(request_body)}",
            "",
            "",
        ]
    ).encode()

    with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
        # Fail instead of hanging forever if envd never answers.
        sock.settimeout(60)
        sock.connect(("127.0.0.1", materialize_port))
        sock.sendall(proxy_header + request_head + request_body)
        response_body = _read_http_response_body(sock)

    # assert that we tricked environmentd
    assert json.loads(response_body)["results"][0]["rows"][0][0] == "1.1.1.1"
def workflow_wide_result(c: Composition) -> None:
    """Check that a single ~96 MiB text value passes through balancerd intact."""
    c.up("balancerd", "frontegg-mock", "materialized")
    pad_len = 1024 * 1024 * 96

    cursor = sql_cursor(c)
    cursor.execute("SELECT 'ABC' || REPEAT('x', 1024 * 1024 * 96) || 'XYZ'")
    result = cursor.fetchall()

    # Exactly one row with exactly one column.
    assert len(result) == 1
    assert len(result[0]) == 1
    value = result[0][0]
    # The padding plus the two 3-character markers.
    assert len(value) == pad_len + 2 * 3
    assert value.startswith("ABCx")
    assert value.endswith("xXYZ")
def workflow_long_result(c: Composition) -> None:
    """Test passthrough of long results: 10 Mi three-column rows via balancerd."""
    c.up("balancerd", "frontegg-mock", "materialized")
    expected_rows = 10 * 1024 * 1024
    cursor = sql_cursor(c)
    cursor.execute(
        "SELECT 'ABC', generate_series, 'XYZ' FROM generate_series(1, 10 * 1024 * 1024)"
    )
    rows = cursor.fetchall()
    for row in rows:
        assert len(row) == 3
        assert row[0] == "ABC"
        assert row[2] == "XYZ"
    # len() instead of a hand-maintained counter.
    assert len(rows) == expected_rows
def workflow_long_query(c: Composition) -> None:
    """Test passthrough of a long SQL query.

    The test runs three queries, then checks that balancerd still accepts new
    connections:

    - A query padded to 512 KiB must succeed.
    - A query padded to 1 MiB must fail with the "statement batch size"
      ProgramLimitExceeded error.
    - A query padded to 512 MiB must fail with a connection-level
      OperationalError.
    """
    c.up("balancerd", "frontegg-mock", "materialized")
    cursor = sql_cursor(c)

    small_pad_size = 512 * 1024
    small_pad = "x" * small_pad_size
    cursor.execute(f"SELECT 'ABC{small_pad}XYZ';")
    rows = cursor.fetchall()
    assert len(rows) == 1
    cols = rows[0]
    assert len(cols) == 1
    col = cols[0]
    assert len(col) == small_pad_size + (2 * 3)
    assert col.startswith("ABCx")
    assert col.endswith("xXYZ")

    medium_pad_size = 1 * 1024 * 1024
    medium_pad = "x" * medium_pad_size
    # try/except/else: the "expected to fail" error is raised outside the try,
    # so it can no longer be swallowed and relabelled as "unexpected".
    try:
        cursor.execute(f"SELECT 'ABC{medium_pad}XYZ';")
    except ProgramLimitExceeded as e:
        assert "statement batch size cannot exceed 1000.0 KB" in str(e), str(e)
    except Exception as e:
        raise RuntimeError("execute() threw an unexpected exception") from e
    else:
        raise RuntimeError("execute() expected to fail")

    large_pad_size = 512 * 1024 * 1024
    large_pad = "x" * large_pad_size
    try:
        cursor.execute(f"SELECT 'ABC{large_pad}XYZ';")
    except OperationalError as e:
        msg = str(e)
        assert (
            "server closed the connection unexpectedly" in msg
            or "EOF detected" in msg
            or "frame size too big" in msg
        ), f"unexpected OperationalError: {msg}"
    except Exception as e:
        raise RuntimeError("execute() threw an unexpected exception") from e
    else:
        raise RuntimeError("execute() expected to fail")

    # Confirm that balancerd remains up
    cursor = sql_cursor(c)
    cursor.execute("SELECT 1;")
def workflow_mz_restarted(c: Composition) -> None:
    """Existing connections should fail if Mz is restarted.

    This protects against the client not being informed
    that their transaction has been aborted on the Mz side.
    After the restart, new connections must work again.
    """
    c.up("balancerd", "frontegg-mock", "materialized")
    grant_all_admin_user(c)
    cursor = sql_cursor(c)
    cursor.execute("CREATE TABLE restart_mz (f1 INTEGER)")
    cursor.execute("START TRANSACTION")
    cursor.execute("INSERT INTO restart_mz VALUES (1)")

    c.kill("materialized")
    c.up("materialized")

    # try/except/else so the "expected to fail" error is not caught and
    # relabelled by the catch-all handler.
    try:
        cursor.execute("INSERT INTO restart_mz VALUES (2)")
    except OperationalError as e:
        assert "SSL connection has been closed unexpectedly" in str(e), str(e)
    except Exception as e:
        raise RuntimeError("execute() threw an unexpected exception") from e
    else:
        raise RuntimeError("execute() expected to fail")

    # Future connections work
    sql_cursor(c)
def workflow_pgwire_param_rejection(c: Composition) -> None:
    """Connecting with mz_forwarded_for or mz_connection_uuid startup params must fail."""
    c.up("balancerd", "frontegg-mock", "materialized")

    def expect_failure(
        description: str, attempt: Callable[..., Any], error: type[Exception]
    ):
        try:
            attempt()
        except error:
            pass
        else:
            raise AssertionError(f"Expected {description} to raise {error}")

    # pg8000 is used because with psycopg/libpq only a notice is printed, and
    # catching it during the connection process is not easy:
    # NOTICE: startup setting mz_forwarded_for not set: unrecognized configuration parameter "mz_forwarded_for"
    cases = [
        ("connect with mz_forwarded_for param", {"mz_forwarded_for": "1.1.1.1"}),
        ("connect with mz_connection_uuid param", {"mz_connection_uuid": "123456"}),
    ]
    for description, params in cases:
        expect_failure(
            description,
            lambda params=params: pg8000_sql_cursor(c, startup_params=params),
            InterfaceError,
        )
def workflow_balancerd_restarted(c: Composition) -> None:
    """Existing connections should fail if balancerd is restarted.

    After the restart, new connections must work again.
    """
    c.up("balancerd", "frontegg-mock", "materialized")
    grant_all_admin_user(c)
    cursor = sql_cursor(c)
    cursor.execute("CREATE TABLE restart_balancerd (f1 INTEGER)")
    cursor.execute("START TRANSACTION")
    cursor.execute("INSERT INTO restart_balancerd VALUES (1)")

    c.kill("balancerd")
    c.up("balancerd")

    # try/except/else so the "expected to fail" error is not caught and
    # relabelled by the catch-all handler.
    try:
        cursor.execute("INSERT INTO restart_balancerd VALUES (2)")
    except OperationalError as e:
        msg = str(e)
        assert (
            "EOF detected" in msg
            or "failed to lookup address information: Name or service not known" in msg
        ), f"unexpected OperationalError: {msg}"
    except Exception as e:
        raise RuntimeError("execute() threw an unexpected exception") from e
    else:
        raise RuntimeError("execute() expected to fail")

    # Future connections work
    sql_cursor(c)
def workflow_mz_not_running(c: Composition) -> None:
    """New connections should fail if Mz is down, and succeed once it is back."""
    c.up("balancerd", "frontegg-mock", "materialized")
    c.kill("materialized")

    expected_errors = [
        "No route to host",
        "Connection timed out",
        "failure in name resolution",
        "failed to lookup address information",
        "Name or service not known",
    ]
    # try/except/else so the "expected to fail" error is not caught and
    # relabelled by the catch-all handler.
    try:
        sql_cursor(c)
    except OperationalError as e:
        msg = str(e)
        assert any(
            expected in msg for expected in expected_errors
        ), f"unexpected OperationalError: {msg}"
    except Exception as e:
        raise RuntimeError("connect() threw an unexpected exception") from e
    else:
        raise RuntimeError("connect() expected to fail")

    # Things should work now
    c.up("materialized")
    sql_cursor(c)
def workflow_user(c: Composition) -> None:
    """Test that the user is passed all the way to Mz itself.

    The test connects through balancerd as the non-admin OTHER_USER, then
    checks three things:

    - An owner-only statement is refused.
    - ``current_user()`` reports OTHER_USER.
    - balancerd exposes pgwire tenant connection metrics.
    """
    c.up("balancerd", "frontegg-mock", "materialized")

    # Non-admin user.
    cursor = sql_cursor(c, email=OTHER_USER)
    # try/except/else so the "expected to fail" error is not caught and
    # relabelled by the catch-all handler.
    try:
        cursor.execute("DROP DATABASE materialize CASCADE")
    except ProgrammingError as e:
        assert "must be owner of DATABASE materialize" in str(e), str(e)
    except Exception as e:
        raise RuntimeError("execute() threw an unexpected exception") from e
    else:
        raise RuntimeError("execute() expected to fail")

    cursor.execute("SELECT current_user()")
    assert OTHER_USER in str(cursor.fetchall())

    assert_metrics(c, 'mz_balancer_tenant_connection_active{source="pgwire"')
    assert_metrics(c, 'mz_balancer_tenant_connection_rx{source="pgwire"')
def workflow_many_connections(c: Composition) -> None:
    """Open 990 simultaneous connections through balancerd and query on each."""
    c.up("balancerd", "frontegg-mock", "materialized")
    connections = 1000 - 10  # Go almost to the limit, but not above
    print(f"Opening {connections} connections.")
    # Open all connections first so they are held open at the same time.
    cursors = [sql_cursor(c) for _ in range(connections)]

    for cursor in cursors:
        cursor.execute("SELECT 'abc'")
        data = cursor.fetchall()
        assert len(data) == 1
        row = data[0]
        assert len(row) == 1
        assert row[0] == "abc"
def workflow_webhook(c: Composition) -> None:
    """Create a webhook source, append a row via testdrive, and read it back.

    Testdrive's materialize_url (see SERVICES) points at balancerd.
    """
    c.up(
        "balancerd",
        "frontegg-mock",
        "materialized",
        {"name": "testdrive", "persistent": True},
    )
    grant_all_admin_user(c)

    script = dedent(
        """
        > CREATE SOURCE wh FROM WEBHOOK BODY FORMAT TEXT;
        $ webhook-append database=materialize schema=public name=wh
        a
        > SELECT * FROM wh
        a
        """
    )
    c.testdrive(script)
|