12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208 |
- # Copyright Materialize, Inc. and contributors. All rights reserved. #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- """
- Tests the mz command line tool against a real Cloud instance
- """
- import argparse
- import json
- import os
- import signal
- import subprocess
- import threading
- import time
- from collections.abc import Sequence
- from pathlib import Path
- from textwrap import dedent
- from typing import IO
- import psycopg
- import yaml
- from materialize import MZ_ROOT, ci_util, git, spawn
- from materialize.mzcompose.composition import (
- Composition,
- WorkflowArgumentParser,
- )
- from materialize.mzcompose.services.testdrive import Testdrive
- from materialize.version_list import get_self_managed_versions
- SERVICES = [
- Testdrive(), # overridden below
- ]
- TD_CMD = [
- "--var=default-replica-size=25cc",
- "--var=default-storage-size=25cc",
- ]
- COMPATIBLE_TESTDRIVE_FILES = [
- "array.td",
- "cancel-subscribe.td",
- "char-varchar-distinct.td",
- "char-varchar-joins.td",
- "char-varchar-multibyte.td",
- "constants.td",
- "coordinator-multiplicities.td",
- "create-views.td",
- "date_func.td",
- "decimal-distinct.td",
- "decimal-join.td",
- "decimal-order.td",
- "decimal-overflow.td",
- "decimal-sum.td",
- "decimal-zero.td",
- "delete-using.td",
- "drop.td",
- "duplicate-table-names.td",
- "failpoints.td",
- "fetch-tail-large-diff.td",
- "fetch-tail-limit-timeout.td",
- "fetch-tail-timestamp-zero.td",
- "fetch-timeout.td",
- "float_sum.td",
- "get-started.td",
- "github-11563.td",
- "github-1947.td",
- "github-3281.td",
- "github-5502.td",
- "github-5774.td",
- "github-5873.td",
- "github-5983.td",
- "github-5984.td",
- "github-6335.td",
- "github-6744.td",
- "github-6950.td",
- "github-7171.td",
- "github-7191.td",
- "github-795.td",
- "joins.td",
- "jsonb.td",
- "list.td",
- # Flaky on Azure: https://buildkite.com/materialize/nightly/builds/11906#019661aa-2f41-43e1-b08f-6195c66a7ab9
- # "load-generator-key-value.td",
- "logging.td",
- "map.td",
- "multijoins.td",
- "numeric-sum.td",
- "numeric.td",
- "oid.td",
- "orms.td",
- "pg-catalog.td",
- "runtime-errors.td",
- "search_path.td",
- "self-test.td",
- "string.td",
- "subquery-scalar-errors.td",
- "system-functions.td",
- "test-skip-if.td",
- "tpch.td",
- "type_char_quoted.td",
- "version.td",
- # Hangs on GCP in check-shard-tombstone
- # "webhook.td",
- ]
- def add_arguments_temporary_test(parser: WorkflowArgumentParser) -> None:
- parser.add_argument(
- "--setup",
- default=True,
- action=argparse.BooleanOptionalAction,
- help="Run setup steps",
- )
- parser.add_argument(
- "--cleanup",
- default=True,
- action=argparse.BooleanOptionalAction,
- help="Destroy the region at the end of the workflow.",
- )
- parser.add_argument(
- "--test",
- default=True,
- action=argparse.BooleanOptionalAction,
- help="Run the actual test part",
- )
- parser.add_argument(
- "--run-testdrive-files",
- default=True,
- action=argparse.BooleanOptionalAction,
- help="Run testdrive files",
- )
- parser.add_argument(
- "--run-mz-debug",
- default=True,
- action=argparse.BooleanOptionalAction,
- help="Run mz-debug",
- )
- parser.add_argument(
- "--tag",
- type=str,
- help="Custom version tag to use",
- )
- parser.add_argument(
- "files",
- nargs="*",
- default=COMPATIBLE_TESTDRIVE_FILES,
- help="run against the specified files",
- )
- def run_ignore_error(
- args: Sequence[Path | str],
- cwd: Path | None = None,
- stdin: None | int | IO[bytes] | bytes = None,
- env: dict[str, str] | None = None,
- ):
- try:
- spawn.runv(args, cwd=cwd, stdin=stdin, env=env)
- except subprocess.CalledProcessError:
- pass
- def testdrive(no_reset: bool) -> Testdrive:
- return Testdrive(
- materialize_url="postgres://materialize@127.0.0.1:6875/materialize",
- materialize_url_internal="postgres://mz_system:materialize@127.0.0.1:6877/materialize",
- materialize_use_https=False,
- no_consistency_checks=True,
- set_persist_urls=False,
- network_mode="host",
- volume_workdir="../testdrive:/workdir",
- no_reset=no_reset,
- default_timeout="360s",
- # For full testdrive support we'll need:
- # kafka_url=...
- # schema_registry_url=...
- # aws_endpoint=...
- )
- def get_tag(tag: str | None) -> str:
- return tag or f"v{ci_util.get_mz_version()}--pr.g{git.rev_parse('HEAD')}"
- def build_mz_debug_async(env: dict[str, str] | None = None) -> threading.Thread:
- def run():
- spawn.capture(
- [
- "cargo",
- "build",
- "--bin",
- "mz-debug",
- ],
- cwd=MZ_ROOT,
- stderr=subprocess.STDOUT,
- env=env,
- )
- thread = threading.Thread(target=run)
- thread.start()
- return thread
- def run_mz_debug(env: dict[str, str] | None = None) -> None:
- print("--- Running mz-debug")
- try:
- # mz-debug (and its compilation) is rather noisy, so ignore the output
- spawn.capture(
- [
- "cargo",
- "run",
- "--bin",
- "mz-debug",
- "--",
- "self-managed",
- "--k8s-namespace",
- "materialize-environment",
- "--k8s-namespace",
- "materialize",
- ],
- cwd=MZ_ROOT,
- stderr=subprocess.STDOUT,
- env=env,
- )
- except:
- pass
- class State:
- materialize_environment: dict | None
- path: Path
- environmentd_port_forward_process: subprocess.Popen[bytes] | None
- balancerd_port_forward_process: subprocess.Popen[bytes] | None
- version: int
- def __init__(self, path: Path):
- self.materialize_environment = None
- self.path = path
- self.environmentd_port_forward_process = None
- self.balancerd_port_forward_process = None
- self.version = 0
- def kubectl_setup(
- self, tag: str, metadata_backend_url: str, persist_backend_url: str
- ) -> None:
- self.metadata_backend_url = metadata_backend_url
- self.persist_backend_url = persist_backend_url
- spawn.runv(["kubectl", "get", "nodes"])
- for i in range(60):
- try:
- spawn.runv(
- ["kubectl", "get", "pods", "-n", "materialize"],
- cwd=self.path,
- )
- print("Logging all pods in materialize:")
- pod_names = (
- spawn.capture(
- [
- "kubectl",
- "get",
- "pods",
- "-n",
- "materialize",
- "-o",
- "name",
- ],
- cwd=self.path,
- )
- .strip()
- .split("\n")
- )
- for pod_name in pod_names:
- spawn.runv(
- [
- "kubectl",
- "logs",
- "-n",
- "materialize",
- pod_name,
- "--all-containers=true",
- ],
- cwd=self.path,
- )
- status = spawn.capture(
- [
- "kubectl",
- "get",
- "pods",
- "-n",
- "materialize",
- "-o",
- "jsonpath={.items[0].status.phase}",
- ],
- cwd=self.path,
- )
- if status == "Running":
- break
- except subprocess.CalledProcessError:
- time.sleep(1)
- else:
- raise ValueError("Never completed")
- spawn.runv(["kubectl", "create", "namespace", "materialize-environment"])
- materialize_backend_secret = {
- "apiVersion": "v1",
- "kind": "Secret",
- "metadata": {
- "name": "materialize-backend",
- "namespace": "materialize-environment",
- },
- "stringData": {
- "metadata_backend_url": self.metadata_backend_url,
- "persist_backend_url": self.persist_backend_url,
- "license_key": os.getenv("MZ_CI_LICENSE_KEY"),
- },
- }
- spawn.runv(
- ["kubectl", "apply", "-f", "-"],
- cwd=self.path,
- stdin=yaml.dump(materialize_backend_secret).encode(),
- )
- self.materialize_environment = {
- "apiVersion": "materialize.cloud/v1alpha1",
- "kind": "Materialize",
- "metadata": {
- "name": "12345678-1234-1234-1234-123456789012",
- "namespace": "materialize-environment",
- },
- "spec": {
- "environmentdImageRef": f"materialize/environmentd:{tag}",
- "environmentdResourceRequirements": {
- "limits": {"memory": "4Gi"},
- "requests": {"cpu": "2", "memory": "4Gi"},
- },
- "balancerdResourceRequirements": {
- "limits": {"memory": "256Mi"},
- "requests": {"cpu": "100m", "memory": "256Mi"},
- },
- "backendSecretName": "materialize-backend",
- "authenticatorKind": "None",
- },
- }
- spawn.runv(
- ["kubectl", "apply", "-f", "-"],
- cwd=self.path,
- stdin=yaml.dump(self.materialize_environment).encode(),
- )
- for i in range(60):
- try:
- spawn.runv(
- [
- "kubectl",
- "get",
- "materializes",
- "-n",
- "materialize-environment",
- ],
- cwd=self.path,
- )
- break
- except subprocess.CalledProcessError:
- time.sleep(1)
- else:
- raise ValueError("Never completed")
- for i in range(240):
- try:
- spawn.runv(
- ["kubectl", "get", "pods", "-n", "materialize-environment"],
- cwd=self.path,
- )
- status = spawn.capture(
- [
- "kubectl",
- "get",
- "pods",
- "-l",
- "app=environmentd",
- "-n",
- "materialize-environment",
- "-o",
- "jsonpath={.items[0].status.phase}",
- ],
- cwd=self.path,
- )
- if status == "Running":
- break
- except subprocess.CalledProcessError:
- time.sleep(1)
- else:
- print("Getting all pods:")
- spawn.runv(
- [
- "kubectl",
- "get",
- "pods",
- "-n",
- "materialize-environment",
- ],
- cwd=self.path,
- )
- print("Describing all pods in materialize-environment:")
- spawn.runv(
- [
- "kubectl",
- "describe",
- "pods",
- "-n",
- "materialize-environment",
- ],
- cwd=self.path,
- )
- raise ValueError("Never completed")
- # Can take a while for balancerd to come up
- for i in range(300):
- try:
- status = spawn.capture(
- [
- "kubectl",
- "get",
- "pods",
- "-l",
- "app=balancerd",
- "-n",
- "materialize-environment",
- "-o",
- "jsonpath={.items[0].status.phase}",
- ],
- cwd=self.path,
- )
- if status == "Running":
- break
- except subprocess.CalledProcessError:
- time.sleep(1)
- else:
- print("Getting all pods:")
- spawn.runv(
- [
- "kubectl",
- "get",
- "pods",
- "-n",
- "materialize-environment",
- ],
- cwd=self.path,
- )
- print("Describing all pods in materialize-environment:")
- spawn.runv(
- [
- "kubectl",
- "describe",
- "pods",
- "-n",
- "materialize-environment",
- ],
- cwd=self.path,
- )
- raise ValueError("Never completed")
- def cleanup(self) -> None:
- if self.environmentd_port_forward_process:
- os.killpg(
- os.getpgid(self.environmentd_port_forward_process.pid), signal.SIGTERM
- )
- if self.balancerd_port_forward_process:
- os.killpg(
- os.getpgid(self.balancerd_port_forward_process.pid), signal.SIGTERM
- )
- def destroy(self, env=None) -> None:
- print("--- Destroying")
- if self.materialize_environment:
- run_ignore_error(
- ["kubectl", "delete", "-f", "-"],
- cwd=self.path,
- stdin=yaml.dump(self.materialize_environment).encode(),
- )
- run_ignore_error(
- [
- "kubectl",
- "delete",
- "materialize.materialize.cloud/12345678-1234-1234-1234-123456789012",
- "-n" "materialize-environment",
- ]
- )
- run_ignore_error(["kubectl", "delete", "namespace", "materialize-environment"])
- run_ignore_error(["kubectl", "delete", "namespace", "materialize"])
- spawn.runv(["terraform", "destroy", "-auto-approve"], cwd=self.path, env=env)
- def test(
- self, c: Composition, tag: str, run_testdrive_files: bool, files: list[str]
- ) -> None:
- print("--- Running tests")
- self.connect(c)
- time.sleep(10)
- with psycopg.connect(
- "postgres://materialize@127.0.0.1:6875/materialize"
- ) as conn:
- with conn.cursor() as cur:
- cur.execute("SELECT 1")
- results = cur.fetchall()
- assert results == [(1,)], results
- cur.execute("SELECT mz_version()")
- version = cur.fetchall()[0][0]
- assert version.startswith(tag.split("--")[0] + " ")
- with open(
- MZ_ROOT / "misc" / "helm-charts" / "operator" / "Chart.yaml"
- ) as f:
- content = yaml.load(f, Loader=yaml.Loader)
- helm_chart_version = content["version"]
- assert version.endswith(
- f", helm chart: {helm_chart_version})"
- ), f"Actual version: {version}, expected to contain {helm_chart_version}"
- if run_testdrive_files:
- with c.override(testdrive(no_reset=False)):
- c.up({"name": "testdrive", "persistent": True})
- c.run_testdrive_files(*TD_CMD, *files)
- def connect(self, c: Composition) -> None:
- environmentd_name = spawn.capture(
- [
- "kubectl",
- "get",
- "pods",
- "-l",
- "app=environmentd",
- "-n",
- "materialize-environment",
- "-o",
- "jsonpath={.items[*].metadata.name}",
- ],
- cwd=self.path,
- )
- balancerd_name = spawn.capture(
- [
- "kubectl",
- "get",
- "pods",
- "-l",
- "app=balancerd",
- "-n",
- "materialize-environment",
- "-o",
- "jsonpath={.items[*].metadata.name}",
- ],
- cwd=self.path,
- )
- # error: arguments in resource/name form must have a single resource and name
- print(f"Got balancerd name: {balancerd_name}")
- self.environmentd_port_forward_process = subprocess.Popen(
- [
- "kubectl",
- "port-forward",
- f"pod/{environmentd_name}",
- "-n",
- "materialize-environment",
- "6877:6877",
- "6878:6878",
- ],
- preexec_fn=os.setpgrp,
- )
- self.balancerd_port_forward_process = subprocess.Popen(
- [
- "kubectl",
- "port-forward",
- f"pod/{balancerd_name}",
- "-n",
- "materialize-environment",
- "6875:6875",
- "6876:6876",
- ],
- preexec_fn=os.setpgrp,
- )
- time.sleep(10)
- with psycopg.connect(
- "postgres://mz_system:materialize@127.0.0.1:6877/materialize",
- autocommit=True,
- ) as conn:
- with conn.cursor() as cur:
- # Required for some testdrive tests
- cur.execute("ALTER CLUSTER mz_system SET (REPLICATION FACTOR 2)")
- cur.execute("ALTER SYSTEM SET enable_create_table_from_source = true")
- with c.override(testdrive(no_reset=False)):
- c.up({"name": "testdrive", "persistent": True})
- c.testdrive(
- dedent(
- """
- > SELECT 1
- 1
- """
- )
- )
- class AWS(State):
- def setup(
- self,
- prefix: str,
- setup: bool,
- tag: str,
- orchestratord_tag: str | None = None,
- ) -> None:
- if not setup:
- spawn.runv(
- [
- "aws",
- "eks",
- "update-kubeconfig",
- "--name",
- f"{prefix}-dev-eks",
- "--region",
- "us-east-1",
- ]
- )
- return
- vars = [
- "-var",
- "operator_version=v25.3.0-beta.1",
- ]
- vars += [
- "-var",
- f"orchestratord_version={get_tag(orchestratord_tag or tag)}",
- ]
- print("--- Setup")
- spawn.runv(
- ["helm", "package", "../../../misc/helm-charts/operator/"], cwd=self.path
- )
- spawn.runv(["terraform", "init"], cwd=self.path)
- spawn.runv(["terraform", "validate"], cwd=self.path)
- spawn.runv(["terraform", "plan", *vars], cwd=self.path)
- try:
- spawn.runv(["terraform", "apply", "-auto-approve", *vars], cwd=self.path)
- except:
- # Sometimes fails for unknown reason, so just retry:
- # > Error: namespaces is forbidden: User "arn:aws:sts::400121260767:assumed-role/ci/ci" cannot create resource "namespaces" in API group "" at the cluster scope
- spawn.runv(["terraform", "apply", "-auto-approve", *vars], cwd=self.path)
- spawn.runv(
- [
- "aws",
- "eks",
- "update-kubeconfig",
- "--name",
- f"{prefix}-dev-eks",
- "--region",
- "us-east-1",
- ]
- )
- metadata_backend_url = spawn.capture(
- ["terraform", "output", "-raw", "metadata_backend_url"], cwd=self.path
- ).strip()
- persist_backend_url = spawn.capture(
- ["terraform", "output", "-raw", "persist_backend_url"], cwd=self.path
- ).strip()
- self.kubectl_setup(tag, metadata_backend_url, persist_backend_url)
- def upgrade(self, tag: str) -> None:
- print(f"--- Upgrading to {tag}")
- # Following https://materialize.com/docs/self-managed/v25.1/installation/install-on-aws/upgrade-on-aws/
- self.materialize_environment = {
- "apiVersion": "materialize.cloud/v1alpha1",
- "kind": "Materialize",
- "metadata": {
- "name": "12345678-1234-1234-1234-123456789012",
- "namespace": "materialize-environment",
- },
- "spec": {
- "inPlaceRollout": True,
- "requestRollout": f"12345678-9012-3456-7890-12345678901{self.version+3}",
- "environmentdImageRef": f"materialize/environmentd:{tag}",
- "environmentdResourceRequirements": {
- "limits": {"memory": "4Gi"},
- "requests": {"cpu": "2", "memory": "4Gi"},
- },
- "balancerdResourceRequirements": {
- "limits": {"memory": "256Mi"},
- "requests": {"cpu": "100m", "memory": "256Mi"},
- },
- "backendSecretName": "materialize-backend",
- "authenticatorKind": "None",
- },
- }
- self.version += 1
- spawn.runv(
- ["kubectl", "apply", "-f", "-"],
- cwd=self.path,
- stdin=yaml.dump(self.materialize_environment).encode(),
- )
- for i in range(60):
- try:
- spawn.runv(
- [
- "kubectl",
- "get",
- "materializes",
- "-n",
- "materialize-environment",
- ],
- cwd=self.path,
- )
- break
- except subprocess.CalledProcessError:
- time.sleep(1)
- else:
- raise ValueError("Never completed")
- for i in range(240):
- try:
- spawn.runv(
- ["kubectl", "get", "pods", "-n", "materialize-environment"],
- cwd=self.path,
- )
- status = spawn.capture(
- [
- "kubectl",
- "get",
- "pods",
- "-l",
- "app=environmentd",
- "-n",
- "materialize-environment",
- "-o",
- "jsonpath={.items[0].status.phase}",
- ],
- cwd=self.path,
- )
- if status == "Running":
- break
- except subprocess.CalledProcessError:
- time.sleep(1)
- else:
- raise ValueError("Never completed")
- # Can take a while for balancerd to come up
- for i in range(300):
- try:
- status = spawn.capture(
- [
- "kubectl",
- "get",
- "pods",
- "-l",
- "app=balancerd",
- "-n",
- "materialize-environment",
- "-o",
- "jsonpath={.items[0].status.phase}",
- ],
- cwd=self.path,
- )
- if status == "Running":
- break
- except subprocess.CalledProcessError:
- time.sleep(1)
- else:
- raise ValueError("Never completed")
- def workflow_aws_temporary(c: Composition, parser: WorkflowArgumentParser) -> None:
- """To run locally use `aws sso login` first."""
- add_arguments_temporary_test(parser)
- args = parser.parse_args()
- tag = get_tag(args.tag)
- path = MZ_ROOT / "test" / "terraform" / "aws-temporary"
- aws = AWS(path)
- mz_debug_build_thread: threading.Thread | None = None
- try:
- if args.run_mz_debug:
- mz_debug_build_thread = build_mz_debug_async()
- aws.setup("aws-test", args.setup, tag)
- if args.test:
- aws.test(c, tag, args.run_testdrive_files, args.files)
- finally:
- aws.cleanup()
- if args.run_mz_debug:
- assert mz_debug_build_thread
- mz_debug_build_thread.join()
- run_mz_debug()
- if args.cleanup:
- aws.destroy()
- def workflow_aws_upgrade(c: Composition, parser: WorkflowArgumentParser) -> None:
- """To run locally use `aws sso login` first."""
- add_arguments_temporary_test(parser)
- args = parser.parse_args()
- previous_tags = get_self_managed_versions()
- tag = get_tag(args.tag)
- path = MZ_ROOT / "test" / "terraform" / "aws-upgrade"
- aws = AWS(path)
- mz_debug_build_thread: threading.Thread | None = None
- try:
- if args.run_mz_debug:
- mz_debug_build_thread = build_mz_debug_async()
- aws.setup("aws-upgrade", args.setup, str(previous_tags[0]), str(tag))
- for previous_tag in previous_tags[1:]:
- aws.upgrade(str(previous_tag))
- aws.upgrade(tag)
- if args.test:
- # Try waiting a bit, otherwise connection error, should be handled better
- time.sleep(180)
- print("--- Running tests")
- aws.connect(c)
- with psycopg.connect(
- "postgres://materialize@127.0.0.1:6875/materialize"
- ) as conn:
- with conn.cursor() as cur:
- cur.execute("SELECT 1")
- results = cur.fetchall()
- assert results == [(1,)], results
- cur.execute("SELECT mz_version()")
- version = cur.fetchall()[0][0]
- assert version.startswith(
- tag.split("--")[0] + " "
- ), f"Version expected to start with {tag.split('--')[0]}, but is actually {version}"
- with open(
- MZ_ROOT / "misc" / "helm-charts" / "operator" / "Chart.yaml"
- ) as f:
- content = yaml.load(f, Loader=yaml.Loader)
- helm_chart_version = content["version"]
- assert version.endswith(
- f", helm chart: {helm_chart_version})"
- ), f"Actual version: {version}, expected to contain {helm_chart_version}"
- if args.run_testdrive_files:
- with c.override(testdrive(no_reset=False)):
- c.up({"name": "testdrive", "persistent": True})
- c.run_testdrive_files(*TD_CMD, *args.files)
- finally:
- aws.cleanup()
- if args.run_mz_debug:
- assert mz_debug_build_thread
- mz_debug_build_thread.join()
- run_mz_debug()
- if args.cleanup:
- aws.destroy()
- PATH_AWS_PERSISTENT = MZ_ROOT / "test" / "terraform" / "aws-persistent"
- PREFIX_AWS_PERSISTENT = "aws-persistent"
- def workflow_aws_persistent_setup(
- c: Composition, parser: WorkflowArgumentParser
- ) -> None:
- """Setup the AWS persistent Terraform and Helm Chart"""
- parser.add_argument(
- "--tag",
- type=str,
- help="Custom version tag to use",
- )
- args = parser.parse_args()
- tag = get_tag(args.tag)
- aws = AWS(PATH_AWS_PERSISTENT)
- try:
- aws.setup(PREFIX_AWS_PERSISTENT, True, tag)
- with c.override(testdrive(no_reset=True)):
- aws.connect(c)
- c.testdrive(
- dedent(
- """
- > CREATE SOURCE counter FROM LOAD GENERATOR COUNTER
- > CREATE TABLE table (c INT)
- > CREATE MATERIALIZED VIEW mv AS SELECT count(*) FROM table
- """
- )
- )
- finally:
- aws.cleanup()
- def workflow_aws_persistent_test(
- c: Composition, parser: WorkflowArgumentParser
- ) -> None:
- """Run a test workload against the AWS persistent setup"""
- parser.add_argument(
- "--tag",
- type=str,
- help="Custom version tag to use",
- )
- parser.add_argument("--runtime", default=600, type=int, help="Runtime in seconds")
- args = parser.parse_args()
- start_time = time.time()
- tag = get_tag(args.tag)
- aws = AWS(PATH_AWS_PERSISTENT)
- try:
- aws.setup(PREFIX_AWS_PERSISTENT, False, tag)
- with c.override(testdrive(no_reset=True)):
- aws.connect(c)
- count = 1
- c.testdrive(
- dedent(
- """
- > DELETE FROM table
- """
- )
- )
- while time.time() - start_time < args.runtime:
- c.testdrive(
- dedent(
- f"""
- > SELECT 1
- 1
- > INSERT INTO table VALUES ({count})
- > SELECT count(*) FROM table
- {count}
- > SELECT * FROM mv
- {count}
- > DROP VIEW IF EXISTS temp
- > CREATE VIEW temp AS SELECT * FROM mv
- > SELECT * FROM temp
- {count}
- """
- )
- )
- count += 1
- with psycopg.connect(
- "postgres://materialize@127.0.0.1:6875/materialize", autocommit=True
- ) as conn:
- with conn.cursor() as cur:
- cur.execute("SELECT max(counter) FROM counter")
- old_max = cur.fetchall()[0][0]
- time.sleep(5)
- with conn.cursor() as cur:
- cur.execute("SELECT max(counter) FROM counter")
- new_max = cur.fetchall()[0][0]
- assert new_max > old_max, f"{new_max} should be greater than {old_max}"
- finally:
- aws.cleanup()
- def workflow_aws_persistent_destroy(
- c: Composition, parser: WorkflowArgumentParser
- ) -> None:
- """Setup the AWS persistent Terraform and Helm Chart"""
- aws = AWS(PATH_AWS_PERSISTENT)
- aws.destroy()
- def workflow_gcp_temporary(c: Composition, parser: WorkflowArgumentParser) -> None:
- add_arguments_temporary_test(parser)
- args = parser.parse_args()
- tag = get_tag(args.tag)
- path = MZ_ROOT / "test" / "terraform" / "gcp-temporary"
- state = State(path)
- gcp_service_account_json = os.getenv("GCP_SERVICE_ACCOUNT_JSON")
- assert (
- gcp_service_account_json
- ), "GCP_SERVICE_ACCOUNT_JSON environment variable has to be set"
- gcloud_creds_path = path / "gcp.json"
- with open(gcloud_creds_path, "w") as f:
- f.write(gcp_service_account_json)
- os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = str(gcloud_creds_path)
- mz_debug_build_thread: threading.Thread | None = None
- try:
- if args.run_mz_debug:
- mz_debug_build_thread = build_mz_debug_async()
- spawn.runv(["gcloud", "config", "set", "project", "materialize-ci"])
- spawn.runv(
- [
- "gcloud",
- "auth",
- "activate-service-account",
- f"--key-file={gcloud_creds_path}",
- ],
- )
- vars = [
- "-var",
- "operator_version=v25.3.0-beta.1",
- ]
- vars += [
- "-var",
- f"orchestratord_version={get_tag(tag)}",
- ]
- if args.setup:
- print("--- Setup")
- spawn.runv(
- ["helm", "package", "../../../misc/helm-charts/operator/"],
- cwd=path,
- )
- spawn.runv(["terraform", "init"], cwd=path)
- spawn.runv(["terraform", "validate"], cwd=path)
- spawn.runv(["terraform", "plan"], cwd=path)
- spawn.runv(["terraform", "apply", "-auto-approve", *vars], cwd=path)
- gke_cluster = json.loads(
- spawn.capture(
- ["terraform", "output", "-json", "gke_cluster"], cwd=path
- ).strip()
- )
- connection_strings = json.loads(
- spawn.capture(
- ["terraform", "output", "-json", "connection_strings"], cwd=path
- ).strip()
- )
- spawn.runv(
- [
- "gcloud",
- "container",
- "clusters",
- "get-credentials",
- gke_cluster["name"],
- "--region",
- gke_cluster["location"],
- "--project",
- "materialize-ci",
- ]
- )
- if args.setup:
- print("--- Setup")
- state.kubectl_setup(
- tag,
- connection_strings["metadata_backend_url"],
- connection_strings["persist_backend_url"],
- )
- if args.test:
- state.test(c, tag, args.run_testdrive_files, args.files)
- finally:
- state.cleanup()
- if args.run_mz_debug:
- assert mz_debug_build_thread
- mz_debug_build_thread.join()
- run_mz_debug()
- if args.cleanup:
- state.destroy()
- def workflow_azure_temporary(c: Composition, parser: WorkflowArgumentParser) -> None:
- add_arguments_temporary_test(parser)
- args = parser.parse_args()
- tag = get_tag(args.tag)
- path = MZ_ROOT / "test" / "terraform" / "azure-temporary"
- state = State(path)
- spawn.runv(["bin/ci-builder", "run", "stable", "uv", "venv", str(path / "venv")])
- venv_env = os.environ.copy()
- venv_env["PATH"] = f"{path/'venv'/'bin'}:{os.getenv('PATH')}"
- venv_env["VIRTUAL_ENV"] = str(path / "venv")
- spawn.runv(
- ["uv", "pip", "install", "-r", "requirements.txt", "--prerelease=allow"],
- cwd=path,
- env=venv_env,
- )
- mz_debug_build_thread: threading.Thread | None = None
- try:
- if args.run_mz_debug:
- mz_debug_build_thread = build_mz_debug_async()
- if os.getenv("CI"):
- username = os.getenv("AZURE_SERVICE_ACCOUNT_USERNAME")
- password = os.getenv("AZURE_SERVICE_ACCOUNT_PASSWORD")
- tenant = os.getenv("AZURE_SERVICE_ACCOUNT_TENANT")
- assert username, "AZURE_SERVICE_ACCOUNT_USERNAME has to be set"
- assert password, "AZURE_SERVICE_ACCOUNT_PASSWORD has to be set"
- assert tenant, "AZURE_SERVICE_ACOUNT_TENANT has to be set"
- subprocess.run(
- [
- "az",
- "login",
- "--service-principal",
- "--username",
- username,
- "--password",
- password,
- "--tenant",
- tenant,
- ],
- env=venv_env,
- )
- vars = [
- "-var",
- "operator_version=v25.3.0-beta.1",
- ]
- vars += [
- "-var",
- f"orchestratord_version={get_tag(tag)}",
- ]
- if args.setup:
- spawn.runv(
- ["helm", "package", "../../../misc/helm-charts/operator/"],
- cwd=path,
- )
- spawn.runv(["terraform", "init"], cwd=path, env=venv_env)
- spawn.runv(["terraform", "validate"], cwd=path, env=venv_env)
- spawn.runv(["terraform", "plan"], cwd=path, env=venv_env)
- try:
- spawn.runv(
- ["terraform", "apply", "-auto-approve", *vars],
- cwd=path,
- env=venv_env,
- )
- except:
- print("terraform apply failed, retrying")
- spawn.runv(
- ["terraform", "apply", "-auto-approve", *vars],
- cwd=path,
- env=venv_env,
- )
- aks_cluster = json.loads(
- spawn.capture(
- ["terraform", "output", "-json", "aks_cluster"], cwd=path, env=venv_env
- ).strip()
- )
- connection_strings = json.loads(
- spawn.capture(
- ["terraform", "output", "-json", "connection_strings"],
- cwd=path,
- env=venv_env,
- ).strip()
- )
- resource_group_name = spawn.capture(
- ["terraform", "output", "-raw", "resource_group_name"],
- cwd=path,
- env=venv_env,
- ).strip()
- spawn.runv(
- [
- "az",
- "aks",
- "get-credentials",
- "--overwrite-existing",
- "--resource-group",
- resource_group_name,
- "--name",
- aks_cluster["name"],
- ],
- env=venv_env,
- )
- if args.setup:
- state.kubectl_setup(
- tag,
- connection_strings["metadata_backend_url"],
- connection_strings["persist_backend_url"],
- )
- if args.test:
- state.test(c, tag, args.run_testdrive_files, args.files)
- finally:
- state.cleanup()
- if args.run_mz_debug:
- assert mz_debug_build_thread
- mz_debug_build_thread.join()
- run_mz_debug(env=venv_env)
- if args.cleanup:
- state.destroy(env=venv_env)
|