mzcompose.py 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208
  1. # Copyright Materialize, Inc. and contributors. All rights reserved. #
  2. # Use of this software is governed by the Business Source License
  3. # included in the LICENSE file at the root of this repository.
  4. #
  5. # As of the Change Date specified in that file, in accordance with
  6. # the Business Source License, use of this software will be governed
  7. # by the Apache License, Version 2.0.
  8. """
  9. Tests the mz command line tool against a real Cloud instance
  10. """
  11. import argparse
  12. import json
  13. import os
  14. import signal
  15. import subprocess
  16. import threading
  17. import time
  18. from collections.abc import Sequence
  19. from pathlib import Path
  20. from textwrap import dedent
  21. from typing import IO
  22. import psycopg
  23. import yaml
  24. from materialize import MZ_ROOT, ci_util, git, spawn
  25. from materialize.mzcompose.composition import (
  26. Composition,
  27. WorkflowArgumentParser,
  28. )
  29. from materialize.mzcompose.services.testdrive import Testdrive
  30. from materialize.version_list import get_self_managed_versions
# Services of this composition. The Testdrive entry is a placeholder: the
# workflows below replace it via `c.override(testdrive(...))` so testdrive talks
# to the port-forwarded Materialize on 127.0.0.1.
SERVICES = [
    Testdrive(),  # overridden below
]
# Extra arguments passed to every `c.run_testdrive_files` call: set the
# `default-replica-size` and `default-storage-size` testdrive variables to 25cc.
TD_CMD = [
    "--var=default-replica-size=25cc",
    "--var=default-storage-size=25cc",
]
# Testdrive files (from the testdrive directory mounted as the workdir by
# testdrive()) that are run against the cloud deployments by default; used as
# the default of the positional `files` argument. Presumably restricted to files
# that need none of the services testdrive() does not configure (Kafka, schema
# registry, AWS endpoint) -- verify before adding new entries.
COMPATIBLE_TESTDRIVE_FILES = [
    "array.td",
    "cancel-subscribe.td",
    "char-varchar-distinct.td",
    "char-varchar-joins.td",
    "char-varchar-multibyte.td",
    "constants.td",
    "coordinator-multiplicities.td",
    "create-views.td",
    "date_func.td",
    "decimal-distinct.td",
    "decimal-join.td",
    "decimal-order.td",
    "decimal-overflow.td",
    "decimal-sum.td",
    "decimal-zero.td",
    "delete-using.td",
    "drop.td",
    "duplicate-table-names.td",
    "failpoints.td",
    "fetch-tail-large-diff.td",
    "fetch-tail-limit-timeout.td",
    "fetch-tail-timestamp-zero.td",
    "fetch-timeout.td",
    "float_sum.td",
    "get-started.td",
    "github-11563.td",
    "github-1947.td",
    "github-3281.td",
    "github-5502.td",
    "github-5774.td",
    "github-5873.td",
    "github-5983.td",
    "github-5984.td",
    "github-6335.td",
    "github-6744.td",
    "github-6950.td",
    "github-7171.td",
    "github-7191.td",
    "github-795.td",
    "joins.td",
    "jsonb.td",
    "list.td",
    # Flaky on Azure: https://buildkite.com/materialize/nightly/builds/11906#019661aa-2f41-43e1-b08f-6195c66a7ab9
    # "load-generator-key-value.td",
    "logging.td",
    "map.td",
    "multijoins.td",
    "numeric-sum.td",
    "numeric.td",
    "oid.td",
    "orms.td",
    "pg-catalog.td",
    "runtime-errors.td",
    "search_path.td",
    "self-test.td",
    "string.td",
    "subquery-scalar-errors.td",
    "system-functions.td",
    "test-skip-if.td",
    "tpch.td",
    "type_char_quoted.td",
    "version.td",
    # Hangs on GCP in check-shard-tombstone
    # "webhook.td",
]
  104. def add_arguments_temporary_test(parser: WorkflowArgumentParser) -> None:
  105. parser.add_argument(
  106. "--setup",
  107. default=True,
  108. action=argparse.BooleanOptionalAction,
  109. help="Run setup steps",
  110. )
  111. parser.add_argument(
  112. "--cleanup",
  113. default=True,
  114. action=argparse.BooleanOptionalAction,
  115. help="Destroy the region at the end of the workflow.",
  116. )
  117. parser.add_argument(
  118. "--test",
  119. default=True,
  120. action=argparse.BooleanOptionalAction,
  121. help="Run the actual test part",
  122. )
  123. parser.add_argument(
  124. "--run-testdrive-files",
  125. default=True,
  126. action=argparse.BooleanOptionalAction,
  127. help="Run testdrive files",
  128. )
  129. parser.add_argument(
  130. "--run-mz-debug",
  131. default=True,
  132. action=argparse.BooleanOptionalAction,
  133. help="Run mz-debug",
  134. )
  135. parser.add_argument(
  136. "--tag",
  137. type=str,
  138. help="Custom version tag to use",
  139. )
  140. parser.add_argument(
  141. "files",
  142. nargs="*",
  143. default=COMPATIBLE_TESTDRIVE_FILES,
  144. help="run against the specified files",
  145. )
  146. def run_ignore_error(
  147. args: Sequence[Path | str],
  148. cwd: Path | None = None,
  149. stdin: None | int | IO[bytes] | bytes = None,
  150. env: dict[str, str] | None = None,
  151. ):
  152. try:
  153. spawn.runv(args, cwd=cwd, stdin=stdin, env=env)
  154. except subprocess.CalledProcessError:
  155. pass
  156. def testdrive(no_reset: bool) -> Testdrive:
  157. return Testdrive(
  158. materialize_url="postgres://materialize@127.0.0.1:6875/materialize",
  159. materialize_url_internal="postgres://mz_system:materialize@127.0.0.1:6877/materialize",
  160. materialize_use_https=False,
  161. no_consistency_checks=True,
  162. set_persist_urls=False,
  163. network_mode="host",
  164. volume_workdir="../testdrive:/workdir",
  165. no_reset=no_reset,
  166. default_timeout="360s",
  167. # For full testdrive support we'll need:
  168. # kafka_url=...
  169. # schema_registry_url=...
  170. # aws_endpoint=...
  171. )
  172. def get_tag(tag: str | None) -> str:
  173. return tag or f"v{ci_util.get_mz_version()}--pr.g{git.rev_parse('HEAD')}"
  174. def build_mz_debug_async(env: dict[str, str] | None = None) -> threading.Thread:
  175. def run():
  176. spawn.capture(
  177. [
  178. "cargo",
  179. "build",
  180. "--bin",
  181. "mz-debug",
  182. ],
  183. cwd=MZ_ROOT,
  184. stderr=subprocess.STDOUT,
  185. env=env,
  186. )
  187. thread = threading.Thread(target=run)
  188. thread.start()
  189. return thread
  190. def run_mz_debug(env: dict[str, str] | None = None) -> None:
  191. print("--- Running mz-debug")
  192. try:
  193. # mz-debug (and its compilation) is rather noisy, so ignore the output
  194. spawn.capture(
  195. [
  196. "cargo",
  197. "run",
  198. "--bin",
  199. "mz-debug",
  200. "--",
  201. "self-managed",
  202. "--k8s-namespace",
  203. "materialize-environment",
  204. "--k8s-namespace",
  205. "materialize",
  206. ],
  207. cwd=MZ_ROOT,
  208. stderr=subprocess.STDOUT,
  209. env=env,
  210. )
  211. except:
  212. pass
  213. class State:
  214. materialize_environment: dict | None
  215. path: Path
  216. environmentd_port_forward_process: subprocess.Popen[bytes] | None
  217. balancerd_port_forward_process: subprocess.Popen[bytes] | None
  218. version: int
  219. def __init__(self, path: Path):
  220. self.materialize_environment = None
  221. self.path = path
  222. self.environmentd_port_forward_process = None
  223. self.balancerd_port_forward_process = None
  224. self.version = 0
  225. def kubectl_setup(
  226. self, tag: str, metadata_backend_url: str, persist_backend_url: str
  227. ) -> None:
  228. self.metadata_backend_url = metadata_backend_url
  229. self.persist_backend_url = persist_backend_url
  230. spawn.runv(["kubectl", "get", "nodes"])
  231. for i in range(60):
  232. try:
  233. spawn.runv(
  234. ["kubectl", "get", "pods", "-n", "materialize"],
  235. cwd=self.path,
  236. )
  237. print("Logging all pods in materialize:")
  238. pod_names = (
  239. spawn.capture(
  240. [
  241. "kubectl",
  242. "get",
  243. "pods",
  244. "-n",
  245. "materialize",
  246. "-o",
  247. "name",
  248. ],
  249. cwd=self.path,
  250. )
  251. .strip()
  252. .split("\n")
  253. )
  254. for pod_name in pod_names:
  255. spawn.runv(
  256. [
  257. "kubectl",
  258. "logs",
  259. "-n",
  260. "materialize",
  261. pod_name,
  262. "--all-containers=true",
  263. ],
  264. cwd=self.path,
  265. )
  266. status = spawn.capture(
  267. [
  268. "kubectl",
  269. "get",
  270. "pods",
  271. "-n",
  272. "materialize",
  273. "-o",
  274. "jsonpath={.items[0].status.phase}",
  275. ],
  276. cwd=self.path,
  277. )
  278. if status == "Running":
  279. break
  280. except subprocess.CalledProcessError:
  281. time.sleep(1)
  282. else:
  283. raise ValueError("Never completed")
  284. spawn.runv(["kubectl", "create", "namespace", "materialize-environment"])
  285. materialize_backend_secret = {
  286. "apiVersion": "v1",
  287. "kind": "Secret",
  288. "metadata": {
  289. "name": "materialize-backend",
  290. "namespace": "materialize-environment",
  291. },
  292. "stringData": {
  293. "metadata_backend_url": self.metadata_backend_url,
  294. "persist_backend_url": self.persist_backend_url,
  295. "license_key": os.getenv("MZ_CI_LICENSE_KEY"),
  296. },
  297. }
  298. spawn.runv(
  299. ["kubectl", "apply", "-f", "-"],
  300. cwd=self.path,
  301. stdin=yaml.dump(materialize_backend_secret).encode(),
  302. )
  303. self.materialize_environment = {
  304. "apiVersion": "materialize.cloud/v1alpha1",
  305. "kind": "Materialize",
  306. "metadata": {
  307. "name": "12345678-1234-1234-1234-123456789012",
  308. "namespace": "materialize-environment",
  309. },
  310. "spec": {
  311. "environmentdImageRef": f"materialize/environmentd:{tag}",
  312. "environmentdResourceRequirements": {
  313. "limits": {"memory": "4Gi"},
  314. "requests": {"cpu": "2", "memory": "4Gi"},
  315. },
  316. "balancerdResourceRequirements": {
  317. "limits": {"memory": "256Mi"},
  318. "requests": {"cpu": "100m", "memory": "256Mi"},
  319. },
  320. "backendSecretName": "materialize-backend",
  321. "authenticatorKind": "None",
  322. },
  323. }
  324. spawn.runv(
  325. ["kubectl", "apply", "-f", "-"],
  326. cwd=self.path,
  327. stdin=yaml.dump(self.materialize_environment).encode(),
  328. )
  329. for i in range(60):
  330. try:
  331. spawn.runv(
  332. [
  333. "kubectl",
  334. "get",
  335. "materializes",
  336. "-n",
  337. "materialize-environment",
  338. ],
  339. cwd=self.path,
  340. )
  341. break
  342. except subprocess.CalledProcessError:
  343. time.sleep(1)
  344. else:
  345. raise ValueError("Never completed")
  346. for i in range(240):
  347. try:
  348. spawn.runv(
  349. ["kubectl", "get", "pods", "-n", "materialize-environment"],
  350. cwd=self.path,
  351. )
  352. status = spawn.capture(
  353. [
  354. "kubectl",
  355. "get",
  356. "pods",
  357. "-l",
  358. "app=environmentd",
  359. "-n",
  360. "materialize-environment",
  361. "-o",
  362. "jsonpath={.items[0].status.phase}",
  363. ],
  364. cwd=self.path,
  365. )
  366. if status == "Running":
  367. break
  368. except subprocess.CalledProcessError:
  369. time.sleep(1)
  370. else:
  371. print("Getting all pods:")
  372. spawn.runv(
  373. [
  374. "kubectl",
  375. "get",
  376. "pods",
  377. "-n",
  378. "materialize-environment",
  379. ],
  380. cwd=self.path,
  381. )
  382. print("Describing all pods in materialize-environment:")
  383. spawn.runv(
  384. [
  385. "kubectl",
  386. "describe",
  387. "pods",
  388. "-n",
  389. "materialize-environment",
  390. ],
  391. cwd=self.path,
  392. )
  393. raise ValueError("Never completed")
  394. # Can take a while for balancerd to come up
  395. for i in range(300):
  396. try:
  397. status = spawn.capture(
  398. [
  399. "kubectl",
  400. "get",
  401. "pods",
  402. "-l",
  403. "app=balancerd",
  404. "-n",
  405. "materialize-environment",
  406. "-o",
  407. "jsonpath={.items[0].status.phase}",
  408. ],
  409. cwd=self.path,
  410. )
  411. if status == "Running":
  412. break
  413. except subprocess.CalledProcessError:
  414. time.sleep(1)
  415. else:
  416. print("Getting all pods:")
  417. spawn.runv(
  418. [
  419. "kubectl",
  420. "get",
  421. "pods",
  422. "-n",
  423. "materialize-environment",
  424. ],
  425. cwd=self.path,
  426. )
  427. print("Describing all pods in materialize-environment:")
  428. spawn.runv(
  429. [
  430. "kubectl",
  431. "describe",
  432. "pods",
  433. "-n",
  434. "materialize-environment",
  435. ],
  436. cwd=self.path,
  437. )
  438. raise ValueError("Never completed")
  439. def cleanup(self) -> None:
  440. if self.environmentd_port_forward_process:
  441. os.killpg(
  442. os.getpgid(self.environmentd_port_forward_process.pid), signal.SIGTERM
  443. )
  444. if self.balancerd_port_forward_process:
  445. os.killpg(
  446. os.getpgid(self.balancerd_port_forward_process.pid), signal.SIGTERM
  447. )
  448. def destroy(self, env=None) -> None:
  449. print("--- Destroying")
  450. if self.materialize_environment:
  451. run_ignore_error(
  452. ["kubectl", "delete", "-f", "-"],
  453. cwd=self.path,
  454. stdin=yaml.dump(self.materialize_environment).encode(),
  455. )
  456. run_ignore_error(
  457. [
  458. "kubectl",
  459. "delete",
  460. "materialize.materialize.cloud/12345678-1234-1234-1234-123456789012",
  461. "-n" "materialize-environment",
  462. ]
  463. )
  464. run_ignore_error(["kubectl", "delete", "namespace", "materialize-environment"])
  465. run_ignore_error(["kubectl", "delete", "namespace", "materialize"])
  466. spawn.runv(["terraform", "destroy", "-auto-approve"], cwd=self.path, env=env)
  467. def test(
  468. self, c: Composition, tag: str, run_testdrive_files: bool, files: list[str]
  469. ) -> None:
  470. print("--- Running tests")
  471. self.connect(c)
  472. time.sleep(10)
  473. with psycopg.connect(
  474. "postgres://materialize@127.0.0.1:6875/materialize"
  475. ) as conn:
  476. with conn.cursor() as cur:
  477. cur.execute("SELECT 1")
  478. results = cur.fetchall()
  479. assert results == [(1,)], results
  480. cur.execute("SELECT mz_version()")
  481. version = cur.fetchall()[0][0]
  482. assert version.startswith(tag.split("--")[0] + " ")
  483. with open(
  484. MZ_ROOT / "misc" / "helm-charts" / "operator" / "Chart.yaml"
  485. ) as f:
  486. content = yaml.load(f, Loader=yaml.Loader)
  487. helm_chart_version = content["version"]
  488. assert version.endswith(
  489. f", helm chart: {helm_chart_version})"
  490. ), f"Actual version: {version}, expected to contain {helm_chart_version}"
  491. if run_testdrive_files:
  492. with c.override(testdrive(no_reset=False)):
  493. c.up({"name": "testdrive", "persistent": True})
  494. c.run_testdrive_files(*TD_CMD, *files)
  495. def connect(self, c: Composition) -> None:
  496. environmentd_name = spawn.capture(
  497. [
  498. "kubectl",
  499. "get",
  500. "pods",
  501. "-l",
  502. "app=environmentd",
  503. "-n",
  504. "materialize-environment",
  505. "-o",
  506. "jsonpath={.items[*].metadata.name}",
  507. ],
  508. cwd=self.path,
  509. )
  510. balancerd_name = spawn.capture(
  511. [
  512. "kubectl",
  513. "get",
  514. "pods",
  515. "-l",
  516. "app=balancerd",
  517. "-n",
  518. "materialize-environment",
  519. "-o",
  520. "jsonpath={.items[*].metadata.name}",
  521. ],
  522. cwd=self.path,
  523. )
  524. # error: arguments in resource/name form must have a single resource and name
  525. print(f"Got balancerd name: {balancerd_name}")
  526. self.environmentd_port_forward_process = subprocess.Popen(
  527. [
  528. "kubectl",
  529. "port-forward",
  530. f"pod/{environmentd_name}",
  531. "-n",
  532. "materialize-environment",
  533. "6877:6877",
  534. "6878:6878",
  535. ],
  536. preexec_fn=os.setpgrp,
  537. )
  538. self.balancerd_port_forward_process = subprocess.Popen(
  539. [
  540. "kubectl",
  541. "port-forward",
  542. f"pod/{balancerd_name}",
  543. "-n",
  544. "materialize-environment",
  545. "6875:6875",
  546. "6876:6876",
  547. ],
  548. preexec_fn=os.setpgrp,
  549. )
  550. time.sleep(10)
  551. with psycopg.connect(
  552. "postgres://mz_system:materialize@127.0.0.1:6877/materialize",
  553. autocommit=True,
  554. ) as conn:
  555. with conn.cursor() as cur:
  556. # Required for some testdrive tests
  557. cur.execute("ALTER CLUSTER mz_system SET (REPLICATION FACTOR 2)")
  558. cur.execute("ALTER SYSTEM SET enable_create_table_from_source = true")
  559. with c.override(testdrive(no_reset=False)):
  560. c.up({"name": "testdrive", "persistent": True})
  561. c.testdrive(
  562. dedent(
  563. """
  564. > SELECT 1
  565. 1
  566. """
  567. )
  568. )
  569. class AWS(State):
  570. def setup(
  571. self,
  572. prefix: str,
  573. setup: bool,
  574. tag: str,
  575. orchestratord_tag: str | None = None,
  576. ) -> None:
  577. if not setup:
  578. spawn.runv(
  579. [
  580. "aws",
  581. "eks",
  582. "update-kubeconfig",
  583. "--name",
  584. f"{prefix}-dev-eks",
  585. "--region",
  586. "us-east-1",
  587. ]
  588. )
  589. return
  590. vars = [
  591. "-var",
  592. "operator_version=v25.3.0-beta.1",
  593. ]
  594. vars += [
  595. "-var",
  596. f"orchestratord_version={get_tag(orchestratord_tag or tag)}",
  597. ]
  598. print("--- Setup")
  599. spawn.runv(
  600. ["helm", "package", "../../../misc/helm-charts/operator/"], cwd=self.path
  601. )
  602. spawn.runv(["terraform", "init"], cwd=self.path)
  603. spawn.runv(["terraform", "validate"], cwd=self.path)
  604. spawn.runv(["terraform", "plan", *vars], cwd=self.path)
  605. try:
  606. spawn.runv(["terraform", "apply", "-auto-approve", *vars], cwd=self.path)
  607. except:
  608. # Sometimes fails for unknown reason, so just retry:
  609. # > Error: namespaces is forbidden: User "arn:aws:sts::400121260767:assumed-role/ci/ci" cannot create resource "namespaces" in API group "" at the cluster scope
  610. spawn.runv(["terraform", "apply", "-auto-approve", *vars], cwd=self.path)
  611. spawn.runv(
  612. [
  613. "aws",
  614. "eks",
  615. "update-kubeconfig",
  616. "--name",
  617. f"{prefix}-dev-eks",
  618. "--region",
  619. "us-east-1",
  620. ]
  621. )
  622. metadata_backend_url = spawn.capture(
  623. ["terraform", "output", "-raw", "metadata_backend_url"], cwd=self.path
  624. ).strip()
  625. persist_backend_url = spawn.capture(
  626. ["terraform", "output", "-raw", "persist_backend_url"], cwd=self.path
  627. ).strip()
  628. self.kubectl_setup(tag, metadata_backend_url, persist_backend_url)
  629. def upgrade(self, tag: str) -> None:
  630. print(f"--- Upgrading to {tag}")
  631. # Following https://materialize.com/docs/self-managed/v25.1/installation/install-on-aws/upgrade-on-aws/
  632. self.materialize_environment = {
  633. "apiVersion": "materialize.cloud/v1alpha1",
  634. "kind": "Materialize",
  635. "metadata": {
  636. "name": "12345678-1234-1234-1234-123456789012",
  637. "namespace": "materialize-environment",
  638. },
  639. "spec": {
  640. "inPlaceRollout": True,
  641. "requestRollout": f"12345678-9012-3456-7890-12345678901{self.version+3}",
  642. "environmentdImageRef": f"materialize/environmentd:{tag}",
  643. "environmentdResourceRequirements": {
  644. "limits": {"memory": "4Gi"},
  645. "requests": {"cpu": "2", "memory": "4Gi"},
  646. },
  647. "balancerdResourceRequirements": {
  648. "limits": {"memory": "256Mi"},
  649. "requests": {"cpu": "100m", "memory": "256Mi"},
  650. },
  651. "backendSecretName": "materialize-backend",
  652. "authenticatorKind": "None",
  653. },
  654. }
  655. self.version += 1
  656. spawn.runv(
  657. ["kubectl", "apply", "-f", "-"],
  658. cwd=self.path,
  659. stdin=yaml.dump(self.materialize_environment).encode(),
  660. )
  661. for i in range(60):
  662. try:
  663. spawn.runv(
  664. [
  665. "kubectl",
  666. "get",
  667. "materializes",
  668. "-n",
  669. "materialize-environment",
  670. ],
  671. cwd=self.path,
  672. )
  673. break
  674. except subprocess.CalledProcessError:
  675. time.sleep(1)
  676. else:
  677. raise ValueError("Never completed")
  678. for i in range(240):
  679. try:
  680. spawn.runv(
  681. ["kubectl", "get", "pods", "-n", "materialize-environment"],
  682. cwd=self.path,
  683. )
  684. status = spawn.capture(
  685. [
  686. "kubectl",
  687. "get",
  688. "pods",
  689. "-l",
  690. "app=environmentd",
  691. "-n",
  692. "materialize-environment",
  693. "-o",
  694. "jsonpath={.items[0].status.phase}",
  695. ],
  696. cwd=self.path,
  697. )
  698. if status == "Running":
  699. break
  700. except subprocess.CalledProcessError:
  701. time.sleep(1)
  702. else:
  703. raise ValueError("Never completed")
  704. # Can take a while for balancerd to come up
  705. for i in range(300):
  706. try:
  707. status = spawn.capture(
  708. [
  709. "kubectl",
  710. "get",
  711. "pods",
  712. "-l",
  713. "app=balancerd",
  714. "-n",
  715. "materialize-environment",
  716. "-o",
  717. "jsonpath={.items[0].status.phase}",
  718. ],
  719. cwd=self.path,
  720. )
  721. if status == "Running":
  722. break
  723. except subprocess.CalledProcessError:
  724. time.sleep(1)
  725. else:
  726. raise ValueError("Never completed")
  727. def workflow_aws_temporary(c: Composition, parser: WorkflowArgumentParser) -> None:
  728. """To run locally use `aws sso login` first."""
  729. add_arguments_temporary_test(parser)
  730. args = parser.parse_args()
  731. tag = get_tag(args.tag)
  732. path = MZ_ROOT / "test" / "terraform" / "aws-temporary"
  733. aws = AWS(path)
  734. mz_debug_build_thread: threading.Thread | None = None
  735. try:
  736. if args.run_mz_debug:
  737. mz_debug_build_thread = build_mz_debug_async()
  738. aws.setup("aws-test", args.setup, tag)
  739. if args.test:
  740. aws.test(c, tag, args.run_testdrive_files, args.files)
  741. finally:
  742. aws.cleanup()
  743. if args.run_mz_debug:
  744. assert mz_debug_build_thread
  745. mz_debug_build_thread.join()
  746. run_mz_debug()
  747. if args.cleanup:
  748. aws.destroy()
  749. def workflow_aws_upgrade(c: Composition, parser: WorkflowArgumentParser) -> None:
  750. """To run locally use `aws sso login` first."""
  751. add_arguments_temporary_test(parser)
  752. args = parser.parse_args()
  753. previous_tags = get_self_managed_versions()
  754. tag = get_tag(args.tag)
  755. path = MZ_ROOT / "test" / "terraform" / "aws-upgrade"
  756. aws = AWS(path)
  757. mz_debug_build_thread: threading.Thread | None = None
  758. try:
  759. if args.run_mz_debug:
  760. mz_debug_build_thread = build_mz_debug_async()
  761. aws.setup("aws-upgrade", args.setup, str(previous_tags[0]), str(tag))
  762. for previous_tag in previous_tags[1:]:
  763. aws.upgrade(str(previous_tag))
  764. aws.upgrade(tag)
  765. if args.test:
  766. # Try waiting a bit, otherwise connection error, should be handled better
  767. time.sleep(180)
  768. print("--- Running tests")
  769. aws.connect(c)
  770. with psycopg.connect(
  771. "postgres://materialize@127.0.0.1:6875/materialize"
  772. ) as conn:
  773. with conn.cursor() as cur:
  774. cur.execute("SELECT 1")
  775. results = cur.fetchall()
  776. assert results == [(1,)], results
  777. cur.execute("SELECT mz_version()")
  778. version = cur.fetchall()[0][0]
  779. assert version.startswith(
  780. tag.split("--")[0] + " "
  781. ), f"Version expected to start with {tag.split('--')[0]}, but is actually {version}"
  782. with open(
  783. MZ_ROOT / "misc" / "helm-charts" / "operator" / "Chart.yaml"
  784. ) as f:
  785. content = yaml.load(f, Loader=yaml.Loader)
  786. helm_chart_version = content["version"]
  787. assert version.endswith(
  788. f", helm chart: {helm_chart_version})"
  789. ), f"Actual version: {version}, expected to contain {helm_chart_version}"
  790. if args.run_testdrive_files:
  791. with c.override(testdrive(no_reset=False)):
  792. c.up({"name": "testdrive", "persistent": True})
  793. c.run_testdrive_files(*TD_CMD, *args.files)
  794. finally:
  795. aws.cleanup()
  796. if args.run_mz_debug:
  797. assert mz_debug_build_thread
  798. mz_debug_build_thread.join()
  799. run_mz_debug()
  800. if args.cleanup:
  801. aws.destroy()
# Terraform directory of the long-lived ("persistent") AWS deployment shared by
# the workflow_aws_persistent_* workflows.
PATH_AWS_PERSISTENT = MZ_ROOT / "test" / "terraform" / "aws-persistent"
# Name prefix of that deployment; AWS.setup uses the EKS cluster `<prefix>-dev-eks`.
PREFIX_AWS_PERSISTENT = "aws-persistent"
  804. def workflow_aws_persistent_setup(
  805. c: Composition, parser: WorkflowArgumentParser
  806. ) -> None:
  807. """Setup the AWS persistent Terraform and Helm Chart"""
  808. parser.add_argument(
  809. "--tag",
  810. type=str,
  811. help="Custom version tag to use",
  812. )
  813. args = parser.parse_args()
  814. tag = get_tag(args.tag)
  815. aws = AWS(PATH_AWS_PERSISTENT)
  816. try:
  817. aws.setup(PREFIX_AWS_PERSISTENT, True, tag)
  818. with c.override(testdrive(no_reset=True)):
  819. aws.connect(c)
  820. c.testdrive(
  821. dedent(
  822. """
  823. > CREATE SOURCE counter FROM LOAD GENERATOR COUNTER
  824. > CREATE TABLE table (c INT)
  825. > CREATE MATERIALIZED VIEW mv AS SELECT count(*) FROM table
  826. """
  827. )
  828. )
  829. finally:
  830. aws.cleanup()
  831. def workflow_aws_persistent_test(
  832. c: Composition, parser: WorkflowArgumentParser
  833. ) -> None:
  834. """Run a test workload against the AWS persistent setup"""
  835. parser.add_argument(
  836. "--tag",
  837. type=str,
  838. help="Custom version tag to use",
  839. )
  840. parser.add_argument("--runtime", default=600, type=int, help="Runtime in seconds")
  841. args = parser.parse_args()
  842. start_time = time.time()
  843. tag = get_tag(args.tag)
  844. aws = AWS(PATH_AWS_PERSISTENT)
  845. try:
  846. aws.setup(PREFIX_AWS_PERSISTENT, False, tag)
  847. with c.override(testdrive(no_reset=True)):
  848. aws.connect(c)
  849. count = 1
  850. c.testdrive(
  851. dedent(
  852. """
  853. > DELETE FROM table
  854. """
  855. )
  856. )
  857. while time.time() - start_time < args.runtime:
  858. c.testdrive(
  859. dedent(
  860. f"""
  861. > SELECT 1
  862. 1
  863. > INSERT INTO table VALUES ({count})
  864. > SELECT count(*) FROM table
  865. {count}
  866. > SELECT * FROM mv
  867. {count}
  868. > DROP VIEW IF EXISTS temp
  869. > CREATE VIEW temp AS SELECT * FROM mv
  870. > SELECT * FROM temp
  871. {count}
  872. """
  873. )
  874. )
  875. count += 1
  876. with psycopg.connect(
  877. "postgres://materialize@127.0.0.1:6875/materialize", autocommit=True
  878. ) as conn:
  879. with conn.cursor() as cur:
  880. cur.execute("SELECT max(counter) FROM counter")
  881. old_max = cur.fetchall()[0][0]
  882. time.sleep(5)
  883. with conn.cursor() as cur:
  884. cur.execute("SELECT max(counter) FROM counter")
  885. new_max = cur.fetchall()[0][0]
  886. assert new_max > old_max, f"{new_max} should be greater than {old_max}"
  887. finally:
  888. aws.cleanup()
  889. def workflow_aws_persistent_destroy(
  890. c: Composition, parser: WorkflowArgumentParser
  891. ) -> None:
  892. """Setup the AWS persistent Terraform and Helm Chart"""
  893. aws = AWS(PATH_AWS_PERSISTENT)
  894. aws.destroy()
  895. def workflow_gcp_temporary(c: Composition, parser: WorkflowArgumentParser) -> None:
  896. add_arguments_temporary_test(parser)
  897. args = parser.parse_args()
  898. tag = get_tag(args.tag)
  899. path = MZ_ROOT / "test" / "terraform" / "gcp-temporary"
  900. state = State(path)
  901. gcp_service_account_json = os.getenv("GCP_SERVICE_ACCOUNT_JSON")
  902. assert (
  903. gcp_service_account_json
  904. ), "GCP_SERVICE_ACCOUNT_JSON environment variable has to be set"
  905. gcloud_creds_path = path / "gcp.json"
  906. with open(gcloud_creds_path, "w") as f:
  907. f.write(gcp_service_account_json)
  908. os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = str(gcloud_creds_path)
  909. mz_debug_build_thread: threading.Thread | None = None
  910. try:
  911. if args.run_mz_debug:
  912. mz_debug_build_thread = build_mz_debug_async()
  913. spawn.runv(["gcloud", "config", "set", "project", "materialize-ci"])
  914. spawn.runv(
  915. [
  916. "gcloud",
  917. "auth",
  918. "activate-service-account",
  919. f"--key-file={gcloud_creds_path}",
  920. ],
  921. )
  922. vars = [
  923. "-var",
  924. "operator_version=v25.3.0-beta.1",
  925. ]
  926. vars += [
  927. "-var",
  928. f"orchestratord_version={get_tag(tag)}",
  929. ]
  930. if args.setup:
  931. print("--- Setup")
  932. spawn.runv(
  933. ["helm", "package", "../../../misc/helm-charts/operator/"],
  934. cwd=path,
  935. )
  936. spawn.runv(["terraform", "init"], cwd=path)
  937. spawn.runv(["terraform", "validate"], cwd=path)
  938. spawn.runv(["terraform", "plan"], cwd=path)
  939. spawn.runv(["terraform", "apply", "-auto-approve", *vars], cwd=path)
  940. gke_cluster = json.loads(
  941. spawn.capture(
  942. ["terraform", "output", "-json", "gke_cluster"], cwd=path
  943. ).strip()
  944. )
  945. connection_strings = json.loads(
  946. spawn.capture(
  947. ["terraform", "output", "-json", "connection_strings"], cwd=path
  948. ).strip()
  949. )
  950. spawn.runv(
  951. [
  952. "gcloud",
  953. "container",
  954. "clusters",
  955. "get-credentials",
  956. gke_cluster["name"],
  957. "--region",
  958. gke_cluster["location"],
  959. "--project",
  960. "materialize-ci",
  961. ]
  962. )
  963. if args.setup:
  964. print("--- Setup")
  965. state.kubectl_setup(
  966. tag,
  967. connection_strings["metadata_backend_url"],
  968. connection_strings["persist_backend_url"],
  969. )
  970. if args.test:
  971. state.test(c, tag, args.run_testdrive_files, args.files)
  972. finally:
  973. state.cleanup()
  974. if args.run_mz_debug:
  975. assert mz_debug_build_thread
  976. mz_debug_build_thread.join()
  977. run_mz_debug()
  978. if args.cleanup:
  979. state.destroy()
  980. def workflow_azure_temporary(c: Composition, parser: WorkflowArgumentParser) -> None:
  981. add_arguments_temporary_test(parser)
  982. args = parser.parse_args()
  983. tag = get_tag(args.tag)
  984. path = MZ_ROOT / "test" / "terraform" / "azure-temporary"
  985. state = State(path)
  986. spawn.runv(["bin/ci-builder", "run", "stable", "uv", "venv", str(path / "venv")])
  987. venv_env = os.environ.copy()
  988. venv_env["PATH"] = f"{path/'venv'/'bin'}:{os.getenv('PATH')}"
  989. venv_env["VIRTUAL_ENV"] = str(path / "venv")
  990. spawn.runv(
  991. ["uv", "pip", "install", "-r", "requirements.txt", "--prerelease=allow"],
  992. cwd=path,
  993. env=venv_env,
  994. )
  995. mz_debug_build_thread: threading.Thread | None = None
  996. try:
  997. if args.run_mz_debug:
  998. mz_debug_build_thread = build_mz_debug_async()
  999. if os.getenv("CI"):
  1000. username = os.getenv("AZURE_SERVICE_ACCOUNT_USERNAME")
  1001. password = os.getenv("AZURE_SERVICE_ACCOUNT_PASSWORD")
  1002. tenant = os.getenv("AZURE_SERVICE_ACCOUNT_TENANT")
  1003. assert username, "AZURE_SERVICE_ACCOUNT_USERNAME has to be set"
  1004. assert password, "AZURE_SERVICE_ACCOUNT_PASSWORD has to be set"
  1005. assert tenant, "AZURE_SERVICE_ACOUNT_TENANT has to be set"
  1006. subprocess.run(
  1007. [
  1008. "az",
  1009. "login",
  1010. "--service-principal",
  1011. "--username",
  1012. username,
  1013. "--password",
  1014. password,
  1015. "--tenant",
  1016. tenant,
  1017. ],
  1018. env=venv_env,
  1019. )
  1020. vars = [
  1021. "-var",
  1022. "operator_version=v25.3.0-beta.1",
  1023. ]
  1024. vars += [
  1025. "-var",
  1026. f"orchestratord_version={get_tag(tag)}",
  1027. ]
  1028. if args.setup:
  1029. spawn.runv(
  1030. ["helm", "package", "../../../misc/helm-charts/operator/"],
  1031. cwd=path,
  1032. )
  1033. spawn.runv(["terraform", "init"], cwd=path, env=venv_env)
  1034. spawn.runv(["terraform", "validate"], cwd=path, env=venv_env)
  1035. spawn.runv(["terraform", "plan"], cwd=path, env=venv_env)
  1036. try:
  1037. spawn.runv(
  1038. ["terraform", "apply", "-auto-approve", *vars],
  1039. cwd=path,
  1040. env=venv_env,
  1041. )
  1042. except:
  1043. print("terraform apply failed, retrying")
  1044. spawn.runv(
  1045. ["terraform", "apply", "-auto-approve", *vars],
  1046. cwd=path,
  1047. env=venv_env,
  1048. )
  1049. aks_cluster = json.loads(
  1050. spawn.capture(
  1051. ["terraform", "output", "-json", "aks_cluster"], cwd=path, env=venv_env
  1052. ).strip()
  1053. )
  1054. connection_strings = json.loads(
  1055. spawn.capture(
  1056. ["terraform", "output", "-json", "connection_strings"],
  1057. cwd=path,
  1058. env=venv_env,
  1059. ).strip()
  1060. )
  1061. resource_group_name = spawn.capture(
  1062. ["terraform", "output", "-raw", "resource_group_name"],
  1063. cwd=path,
  1064. env=venv_env,
  1065. ).strip()
  1066. spawn.runv(
  1067. [
  1068. "az",
  1069. "aks",
  1070. "get-credentials",
  1071. "--overwrite-existing",
  1072. "--resource-group",
  1073. resource_group_name,
  1074. "--name",
  1075. aks_cluster["name"],
  1076. ],
  1077. env=venv_env,
  1078. )
  1079. if args.setup:
  1080. state.kubectl_setup(
  1081. tag,
  1082. connection_strings["metadata_backend_url"],
  1083. connection_strings["persist_backend_url"],
  1084. )
  1085. if args.test:
  1086. state.test(c, tag, args.run_testdrive_files, args.files)
  1087. finally:
  1088. state.cleanup()
  1089. if args.run_mz_debug:
  1090. assert mz_debug_build_thread
  1091. mz_debug_build_thread.join()
  1092. run_mz_debug(env=venv_env)
  1093. if args.cleanup:
  1094. state.destroy(env=venv_env)