123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- """
- Deploy the current version on a real Staging Cloud, and run some basic
- verifications, like ingesting data from Kafka and Redpanda Cloud using AWS
- Privatelink. Runs only on main and release branches.
- """
- import argparse
- import glob
- import itertools
- import os
- import secrets
- import string
- import time
- import urllib.parse
- import psycopg
- from materialize import MZ_ROOT
- from materialize.mz_version import MzVersion
- from materialize.mzcompose import (
- _wait_for_pg,
- )
- from materialize.mzcompose.composition import (
- Composition,
- WorkflowArgumentParser,
- )
- from materialize.mzcompose.services.materialized import Materialized
- from materialize.mzcompose.services.mz import Mz
- from materialize.mzcompose.services.postgres import CockroachOrPostgresMetadata
- from materialize.mzcompose.services.testdrive import Testdrive
- from materialize.redpanda_cloud import RedpandaCloud
- from materialize.ui import UIError
# Fixed names for all Redpanda Cloud resources created by this workflow, so
# that leftovers from a previous (possibly aborted) run can be found and
# deleted by name (see Redpanda.delete).
REDPANDA_RESOURCE_GROUP = "ci-resource-group"
REDPANDA_NETWORK = "ci-network"
REDPANDA_CLUSTER = "ci-cluster"
REDPANDA_USER = "ci-user"

# Materialize Cloud region and environment the canary is deployed into.
REGION = "aws/us-east-1"
ENVIRONMENT = os.getenv("ENVIRONMENT", "staging")
USERNAME = os.getenv("NIGHTLY_CANARY_USERNAME", "infra+nightly-canary@materialize.com")
APP_PASSWORD = os.getenv("NIGHTLY_CANARY_APP_PASSWORD")
# Version tag of the environmentd image to deploy: the cargo version plus the
# git commit currently being built by Buildkite.
VERSION = f"{MzVersion.parse_cargo()}--pr.g{os.getenv('BUILDKITE_COMMIT')}"

# The DevEx account in the Confluent Cloud is used to provide Kafka services
KAFKA_BOOTSTRAP_SERVER = "pkc-n00kk.us-east-1.aws.confluent.cloud:9092"
SCHEMA_REGISTRY_ENDPOINT = "https://psrc-8kz20.us-east-2.aws.confluent.cloud"
# The actual values are stored in the i2 repository
CONFLUENT_API_KEY = os.getenv("CONFLUENT_CLOUD_DEVEX_KAFKA_USERNAME")
CONFLUENT_API_SECRET = os.getenv("CONFLUENT_CLOUD_DEVEX_KAFKA_PASSWORD")
class Redpanda:
    """Provision a dedicated Redpanda Cloud cluster for the canary run.

    `__init__` creates, in order: a resource group, a network, a dedicated
    cluster with AWS PrivateLink enabled, a SCRAM user plus a topic ACL on
    that cluster, and a Materialize `AWS PRIVATELINK` connection in the cloud
    environment; it then allow-lists Materialize's PrivateLink principal on
    the Redpanda side. Spinning all of this up takes on the order of 40
    minutes (see the caller in workflow_default).
    """

    def __init__(self, c: Composition, cleanup: bool):
        self.cloud = RedpandaCloud()

        # All resources use fixed names, so remove leftovers of a previous
        # (possibly aborted) run before creating new ones.
        if cleanup:
            self.delete()

        self.resource_group_id = self.cloud.create(
            "resource-groups", {"name": REDPANDA_RESOURCE_GROUP}
        )["resource_group"]["id"]

        # Network and cluster creation are long-running operations; wait()
        # polls them to completion.
        result = self.cloud.create(
            "networks",
            {
                "name": REDPANDA_NETWORK,
                "region": "us-east-1",
                "resource_group_id": self.resource_group_id,
                "cluster_type": "TYPE_DEDICATED",
                "cloud_provider": "CLOUD_PROVIDER_AWS",
                "cidr_block": "10.0.0.0/20",
            },
        )
        self.network_id = self.cloud.wait(result)["resource_id"]

        result = self.cloud.create(
            "clusters",
            {
                "name": REDPANDA_CLUSTER,
                "cloud_provider": "CLOUD_PROVIDER_AWS",
                "connection_type": "CONNECTION_TYPE_PUBLIC",
                "resource_group_id": self.resource_group_id,
                "network_id": self.network_id,
                "region": "us-east-1",
                "throughput_tier": "tier-1-aws-v3-arm",
                "type": "TYPE_DEDICATED",
                "zones": ["use1-az2"],
                # PrivateLink starts with an empty allow-list; Materialize's
                # principal is only known after the CREATE CONNECTION below
                # and is patched in at the end of __init__.
                "aws_private_link": {
                    "enabled": True,
                    "connect_console": True,
                    "allowed_principals": [],
                },
            },
        )
        self.cluster_info = self.cloud.wait(result)["response"]["cluster"]

        # The PrivateLink service name is read from a fresh GET of the
        # cluster rather than from the creation response.
        self.aws_private_link = self.cloud.get(f"clusters/{self.cluster_info['id']}")[
            "cluster"
        ]["aws_private_link"]["status"]["service_name"]

        redpanda_cluster = self.cloud.get_cluster(self.cluster_info)

        # Random 32-char alphanumeric password for the SCRAM user that
        # testdrive will authenticate as (see td()).
        self.password = "".join(
            secrets.choice(string.ascii_letters + string.digits) for i in range(32)
        )
        redpanda_cluster.create(
            "users",
            {
                "mechanism": "SASL_MECHANISM_SCRAM_SHA_512",
                "name": REDPANDA_USER,
                "password": self.password,
            },
        )
        # Grant the user all operations on all topics.
        redpanda_cluster.create(
            "acls",
            {
                "host": "*",
                "operation": "OPERATION_ALL",
                "permission_type": "PERMISSION_TYPE_ALLOW",
                "principal": f"User:{REDPANDA_USER}",
                "resource_name": "*",
                "resource_pattern_type": "RESOURCE_PATTERN_TYPE_LITERAL",
                "resource_type": "RESOURCE_TYPE_TOPIC",
            },
        )

        # Create the PRIVATELINK connection in the cloud environment and read
        # back the AWS principal that Materialize will connect from.
        cloud_conn = psycopg.connect(
            host=c.cloud_hostname(),
            user=USERNAME,
            password=APP_PASSWORD,
            dbname="materialize",
            port=6875,
            sslmode="require",
        )
        cloud_conn.autocommit = True
        cloud_cursor = cloud_conn.cursor()
        cloud_cursor.execute(
            f"""CREATE CONNECTION privatelink_conn
            TO AWS PRIVATELINK (
                SERVICE NAME '{self.aws_private_link}',
                AVAILABILITY ZONES ('use1-az2')
            );""".encode()
        )
        cloud_cursor.execute(
            """SELECT principal
            FROM mz_aws_privatelink_connections plc
            JOIN mz_connections c on plc.id = c.id
            WHERE c.name = 'privatelink_conn';"""
        )
        results = cloud_cursor.fetchone()
        assert results
        privatelink_principal = results[0]
        cloud_cursor.close()
        cloud_conn.close()

        # Redpanda API sometimes returns a 404 for a while, ignore
        while True:
            try:
                result = self.cloud.patch(
                    f"clusters/{self.cluster_info['id']}",
                    {
                        "aws_private_link": {
                            "enabled": True,
                            "allowed_principals": [privatelink_principal],
                        }
                    },
                )
            except ValueError as e:
                print(f"Failure, retrying in 10s: {e}")
                time.sleep(10)
                continue
            break
        self.cloud.wait(result)

    def delete(self):
        """Delete cluster, network, and resource group (in that dependency
        order), matching by the fixed names; missing resources are skipped."""
        for cluster in self.cloud.get("clusters")["clusters"]:
            if cluster["name"] == REDPANDA_CLUSTER:
                result = self.cloud.delete("clusters", cluster["id"])
                self.cloud.wait(result)
                break
        for network in self.cloud.get("networks")["networks"]:
            if network["name"] == REDPANDA_NETWORK:
                result = self.cloud.delete("networks", network["id"])
                self.cloud.wait(result)
        for resource_group in self.cloud.get("resource-groups")["resource_groups"]:
            if resource_group["name"] == REDPANDA_RESOURCE_GROUP:
                self.cloud.delete("resource-groups", resource_group["id"])
# mzcompose service definitions. The local Materialized container is only
# brought up by version_check() to read mz_version() of the current build;
# the actual tests run against the cloud environment.
SERVICES = [
    CockroachOrPostgresMetadata(),
    Materialized(
        # We use materialize/environmentd and not materialize/materialized here
        # in order to ensure a perfect match to the container that should be
        # deployed to the cloud
        image=f"materialize/environmentd:{VERSION}",
        external_metadata_store=True,
        persist_blob_url="file:///mzdata/persist/blob",
        options=[
            "--orchestrator=process",
            "--orchestrator-process-secrets-directory=/mzdata/secrets",
            "--orchestrator-process-scratch-directory=/scratch",
        ],
        # We can not restart this container at will, as it does not have clusterd
        sanity_restart=False,
    ),
    Testdrive(),  # Overriden below
    Mz(
        region=REGION,
        environment=ENVIRONMENT,
        app_password=APP_PASSWORD or "",
    ),
]
def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Deploy the current source to the cloud and run tests."""
    parser.add_argument(
        "--cleanup",
        default=True,
        action=argparse.BooleanOptionalAction,
        help="Destroy the region at the end of the workflow.",
    )
    parser.add_argument(
        "--version-check",
        default=True,
        action=argparse.BooleanOptionalAction,
        help="Perform a version check.",
    )
    parser.add_argument(
        "td_files",
        nargs="*",
        default=["*.td"],
        help="run against the specified files",
    )
    args = parser.parse_args()

    # Expand the given globs relative to test/cloud-canary into one flat list.
    files = list(
        itertools.chain.from_iterable(
            [
                glob.glob(file_glob, root_dir=MZ_ROOT / "test" / "cloud-canary")
                for file_glob in args.td_files
            ]
        )
    )

    # Start from a clean slate: tear down any region left over from a
    # previous run before enabling a fresh one.
    if args.cleanup:
        disable_region(c)

    try:
        # Track success explicitly; asserted after cleanup so that region
        # teardown runs even when the tests fail.
        test_failed = True
        print(f"Enabling region using Mz version {VERSION} ...")
        try:
            c.run("mz", "region", "enable", "--version", VERSION)
        except UIError:
            # Work around https://github.com/MaterializeInc/database-issues/issues/4989
            pass
        time.sleep(10)
        assert "materialize.cloud" in c.cloud_hostname()
        wait_for_cloud(c)

        if args.version_check:
            version_check(c)

        # Takes about 40 min to spin up, so only provision Redpanda when a
        # Redpanda-related .td file is actually selected.
        redpanda = (
            Redpanda(c, cleanup=args.cleanup)
            if any(["redpanda" in filename for filename in files])
            else None
        )

        try:
            print("Running .td files ...")
            td(c, text="> CREATE CLUSTER canary_sources SIZE '25cc'")

            def process(filename: str) -> None:
                td(c, filename, redpanda=redpanda)

            c.test_parts(files, process)
            test_failed = False
        finally:
            if args.cleanup and redpanda is not None:
                redpanda.delete()
    finally:
        if args.cleanup:
            # Clean up
            disable_region(c)

    assert not test_failed
def disable_region(c: Composition) -> None:
    """Hard-disable the current Materialize Cloud region.

    A `UIError` is swallowed deliberately: the CLI returns an error (e.g.
    HTTP 404 Not Found) when the region does not exist, in which case there
    is nothing left to shut down.
    """
    print(f"Shutting down region {REGION} ...")
    cmd = ["mz", "region", "disable", "--hard"]
    try:
        c.run(*cmd)
    except UIError:
        pass
def wait_for_cloud(c: Composition) -> None:
    """Block until the cloud environment answers `SELECT 1` over pgwire.

    Polls for up to 15 minutes via `_wait_for_pg` using the canary
    credentials against port 6875 of the cloud hostname.
    """
    print(f"Waiting for cloud cluster to come up with username {USERNAME} ...")
    connect_args = dict(
        host=c.cloud_hostname(),
        user=USERNAME,
        password=APP_PASSWORD,
        port=6875,
        dbname="materialize",
        sslmode="require",
    )
    _wait_for_pg(
        query="SELECT 1",
        expected=[(1,)],
        timeout_secs=900,
        **connect_args,
    )
def version_check(c: Composition) -> None:
    """Assert that the local build and the cloud report the same mz_version().

    Brings up the local materialized container (same environmentd image as
    deployed to the cloud) and compares its version string against the cloud
    environment's.

    Fix over the previous revision: the psycopg connection to the cloud was
    opened but never closed; it is now managed with context managers.
    """
    print("Obtaining mz_version() string from local instance ...")
    c.up("materialized")
    # Remove the ($HASH) suffix because it can be different. The reason is that
    # the dockerhub devel-$HASH tag is just a link to the mzbuild-$CODEHASH. So
    # if the code has not changed, the environmentd image of the previous
    # version will be used.
    local_version = c.sql_query("SELECT mz_version();")[0][0].split(" ")[0]

    print("Obtaining mz_version() string from the cloud ...")
    # Context managers ensure the cursor and connection are closed even if
    # the query fails.
    with psycopg.connect(
        host=c.cloud_hostname(),
        user=USERNAME,
        password=APP_PASSWORD,
        dbname="materialize",
        port=6875,
        sslmode="require",
    ) as cloud_conn:
        with cloud_conn.cursor() as cloud_cursor:
            cloud_cursor.execute("SELECT mz_version()")
            result = cloud_cursor.fetchone()
    assert result is not None
    cloud_version = result[0].split(" ")[0]

    assert (
        local_version == cloud_version
    ), f"local version: {local_version} is not identical to cloud version: {cloud_version}"
def td(
    c: Composition,
    filename: str | None = None,
    text: str | None = None,
    redpanda: Redpanda | None = None,
) -> None:
    """Run a testdrive file or an inline testdrive snippet against the cloud.

    Exactly one of `filename` or `text` must be given. Files whose name
    contains "redpanda" run against the provisioned Redpanda Cloud cluster
    (when `redpanda` is given); everything else runs against the shared
    Confluent Cloud Kafka/Schema Registry.

    Fix over the previous revision: the Testdrive service definition was
    duplicated in full for the two Kafka backends (and the Confluent variant
    was built and immediately discarded on the Redpanda path); the variant-
    specific parameters are now selected once and the service built once.
    """
    assert APP_PASSWORD is not None
    materialize_url = f"postgres://{urllib.parse.quote(USERNAME)}:{urllib.parse.quote(APP_PASSWORD)}@{urllib.parse.quote(c.cloud_hostname())}:6875"

    # Exactly one of filename/text must be provided.
    assert bool(filename) != bool(text)

    if redpanda and "redpanda" in str(filename):
        # SCRAM credentials for the user created in Redpanda.__init__.
        kafka_url = redpanda.cluster_info["kafka_api"]["seed_brokers"][0]
        schema_registry_url = redpanda.cluster_info["schema_registry"]["url"]
        kafka_options = [
            "security.protocol=SASL_SSL",
            "sasl.mechanisms=SCRAM-SHA-512",
            f"sasl.username={REDPANDA_USER}",
            f"sasl.password={redpanda.password}",
        ]
        entrypoint_extra = [
            f"--var=redpanda-username={REDPANDA_USER}",
            f"--var=redpanda-password={redpanda.password}",
        ]
    else:
        # Shared Confluent Cloud cluster (DevEx account).
        kafka_url = KAFKA_BOOTSTRAP_SERVER
        schema_registry_url = SCHEMA_REGISTRY_ENDPOINT
        kafka_options = [
            "security.protocol=SASL_SSL",
            "sasl.mechanisms=PLAIN",
            f"sasl.username={CONFLUENT_API_KEY}",
            f"sasl.password={CONFLUENT_API_SECRET}",
        ]
        entrypoint_extra = [
            f"--var=confluent-api-key={CONFLUENT_API_KEY}",
            f"--var=confluent-api-secret={CONFLUENT_API_SECRET}",
        ]

    testdrive = Testdrive(
        default_timeout="1200s",
        materialize_url=materialize_url,
        no_reset=True,  # Required so that admin port 6877 is not used
        seed=1,  # Required for predictable Kafka topic names
        kafka_url=kafka_url,
        schema_registry_url=schema_registry_url,
        no_consistency_checks=True,
        environment=["KAFKA_OPTION=" + ",".join(kafka_options)],
        entrypoint_extra=entrypoint_extra,
    )

    with c.override(testdrive):
        if text:
            c.testdrive(text)
        if filename:
            c.run_testdrive_files(
                filename,
            )
|