  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """The implementation of the mzcompose system for Docker compositions.
  10. For an overview of what mzcompose is and why it exists, see the [user-facing
  11. documentation][user-docs].
  12. [user-docs]: https://github.com/MaterializeInc/materialize/blob/main/doc/developer/mzbuild.md
  13. """
  14. import argparse
  15. import copy
  16. import importlib
  17. import importlib.abc
  18. import importlib.util
  19. import inspect
  20. import json
  21. import os
  22. import re
  23. import selectors
  24. import subprocess
  25. import sys
  26. import threading
  27. import time
  28. import traceback
  29. import urllib.parse
  30. from collections import OrderedDict
  31. from collections.abc import Callable, Iterator, Sequence
  32. from contextlib import contextmanager
  33. from inspect import Traceback, getframeinfo, getmembers, isfunction, stack
  34. from tempfile import TemporaryFile
  35. from typing import Any, TextIO, TypeVar, cast
  36. import psycopg
  37. import sqlparse
  38. import yaml
  39. from psycopg import Connection, Cursor
  40. from materialize import MZ_ROOT, buildkite, mzbuild, spawn, ui
  41. from materialize.mzcompose import cluster_replica_size_map, loader
  42. from materialize.mzcompose.service import Service
  43. from materialize.mzcompose.services.materialized import (
  44. LEADER_STATUS_HEALTHCHECK,
  45. DeploymentStatus,
  46. Materialized,
  47. )
  48. from materialize.mzcompose.services.minio import minio_blob_uri
  49. from materialize.mzcompose.test_result import (
  50. FailedTestExecutionError,
  51. TestFailureDetails,
  52. TestResult,
  53. try_determine_errors_from_cmd_execution,
  54. )
  55. from materialize.parallel_workload.worker_exception import WorkerFailedException
  56. from materialize.ui import (
  57. CommandFailureCausedUIError,
  58. UIError,
  59. )
# Substrings that mark a command-line argument as sensitive (credentials,
# private keys, API secrets). `Composition.invoke` redacts any argument
# containing one of these before echoing the `docker compose` command line
# to the test logs.
SECRETS = [
    "mzp_",
    "-----BEGIN PRIVATE KEY-----",
    "-----BEGIN CERTIFICATE-----",
    "confluent-api-key=",
    "confluent-api-secret=",
    "aws-access-key-id=",
    "aws-secret-access-key=",
]
  69. class UnknownCompositionError(UIError):
  70. """The specified composition was unknown."""
  71. def __init__(self, name: str):
  72. super().__init__(f"unknown composition {name!r}")
  73. class WorkflowArgumentParser(argparse.ArgumentParser):
  74. """An argument parser provided to a workflow in a `Composition`.
  75. You can call `add_argument` and other methods on this argument parser like
  76. usual. When you are ready to parse arguments, call `parse_args` or
  77. `parse_known_args` like usual; the argument parser will automatically use
  78. the arguments that the user provided to the workflow.
  79. """
  80. def __init__(self, name: str, description: str | None, args: list[str]):
  81. self.args = args
  82. super().__init__(prog=f"mzcompose run {name}", description=description)
  83. def parse_known_args(
  84. self,
  85. args: Sequence[str] | None = None,
  86. namespace: argparse.Namespace | None = None,
  87. ) -> tuple[argparse.Namespace, list[str]]:
  88. return super().parse_known_args(args or self.args, namespace)
  89. class Composition:
  90. """A loaded mzcompose.py file."""
    def __init__(
        self,
        repo: mzbuild.Repository,
        name: str,
        preserve_ports: bool = False,
        silent: bool = False,
        munge_services: bool = True,
        project_name: str | None = None,
        sanity_restart_mz: bool = False,
    ):
        """Load the composition `name` from `repo`, executing its
        `mzcompose.py` (if present) to collect services, volumes, and
        workflows.

        Raises:
            UnknownCompositionError: If `name` is not a composition in `repo`.
            UIError: If a service or volume is defined more than once.
        """
        # Cache of reusable SQL connections, keyed by
        # (thread id, database, user, password, port, sslmode, options).
        self.conns = {}
        self.name = name
        self.description = None
        self.repo = repo
        self.preserve_ports = preserve_ports
        self.project_name = project_name
        self.silent = silent
        self.workflows: dict[str, Callable[..., None]] = {}
        self.test_results: OrderedDict[str, TestResult] = OrderedDict()
        # Per-thread rendered compose files; populated lazily in `invoke`.
        self.files = {}
        self.sources_and_sinks_ignored_from_validation = set()
        self.is_sanity_restart_mz = sanity_restart_mz
        self.current_test_case_name_override: str | None = None
        if name in self.repo.compositions:
            self.path = self.repo.compositions[name]
        else:
            raise UnknownCompositionError(name)
        self.compose: dict[str, Any] = {
            "services": {},
        }
        # Add default volumes
        self.compose.setdefault("volumes", {}).update(
            {
                "mzdata": None,
                "pgdata": None,
                "mysqldata": None,
                # Used for certain pg-cdc scenarios. The memory will not be
                # allocated for compositions that do not require this volume.
                "sourcedata_512Mb": {
                    "driver_opts": {
                        "device": "tmpfs",
                        "type": "tmpfs",
                        "o": "size=512m",
                    }
                },
                "mydata": None,
                "tmp": None,
                "secrets": None,
                "scratch": None,
            }
        )
        # Load the mzcompose.py file, if one exists
        mzcompose_py = self.path / "mzcompose.py"
        if mzcompose_py.exists():
            spec = importlib.util.spec_from_file_location("mzcompose", mzcompose_py)
            assert spec
            module = importlib.util.module_from_spec(spec)
            assert isinstance(spec.loader, importlib.abc.Loader)
            # Expose the composition directory to loader helpers for the
            # duration of module execution only.
            loader.composition_path = self.path
            spec.loader.exec_module(module)
            loader.composition_path = None
            self.description = inspect.getdoc(module)
            # NOTE: `name` (the constructor parameter) is deliberately reused
            # as the loop variable below; the original value is no longer
            # needed past this point.
            for name, fn in getmembers(module, isfunction):
                if name.startswith("workflow_"):
                    # The name of the workflow is the name of the function
                    # with the "workflow_" prefix stripped and any underscores
                    # replaced with dashes.
                    name = name[len("workflow_") :].replace("_", "-")
                    self.workflows[name] = fn
            for python_service in getattr(module, "SERVICES", []):
                name = python_service.name
                if name in self.compose["services"]:
                    raise UIError(f"service {name!r} specified more than once")
                self.compose["services"][name] = python_service.config
            for volume_name, volume_def in getattr(module, "VOLUMES", {}).items():
                if volume_name in self.compose["volumes"]:
                    raise UIError(f"volume {volume_name!r} specified more than once")
                self.compose["volumes"][volume_name] = volume_def
        # The CLI driver will handle acquiring these dependencies.
        if munge_services:
            self.dependencies = self._munge_services(self.compose["services"].items())
        # Reset so the compose file is re-rendered with the munged services.
        self.files = {}
  173. def override_current_testcase_name(self, test_case_name: str) -> None:
  174. """
  175. This allows to override the name of the test case (usually the workflow name) with more information
  176. (e.g., the current scenario).
  177. """
  178. self.current_test_case_name_override = test_case_name
    def _munge_services(
        self, services: list[tuple[str, dict]]
    ) -> mzbuild.DependencySet:
        """Rewrite raw service configs in place and resolve mzbuild images.

        Handles `mzbuild` image references, `propagate_uid_gid`, localhost-only
        port binding (unless `allow_host_ports`), and coverage instrumentation.

        Returns:
            The resolved dependency set for all referenced mzbuild images.

        Raises:
            UIError: On unknown mzbuild images or disallowed host ports.
        """
        images = []
        for name, config in services:
            # Remember any mzbuild references.
            if "mzbuild" in config:
                image_name = config["mzbuild"]
                if image_name not in self.repo.images:
                    raise UIError(f"mzcompose: unknown image {image_name}")
                image = self.repo.images[image_name]
                if config.get("publish") is not None:
                    # Override whether an image is expected to be published, so
                    # that we will build it in CI instead of failing.
                    image.publish = config["publish"]
                    del config["publish"]
                images.append(image)
            if "propagate_uid_gid" in config:
                if config["propagate_uid_gid"]:
                    # Run the container as the invoking host user/group.
                    config["user"] = f"{os.getuid()}:{os.getgid()}"
                del config["propagate_uid_gid"]
            ports = config.setdefault("ports", [])
            for i, port in enumerate(ports):
                if self.preserve_ports and not ":" in str(port):
                    # If preserving ports, bind the container port to the same
                    # host port, assuming the host port is available.
                    ports[i] = f"127.0.0.1:{port}:{port}"
                elif ":" in str(port).removeprefix("127.0.0.1::") and not config.get(
                    "allow_host_ports", False
                ):
                    # Raise an error for host-bound ports, unless
                    # `allow_host_ports` is `True`
                    raise UIError(
                        f"programming error: disallowed host port in service {name!r}: {port}",
                        hint='Add `"allow_host_ports": True` to the service config to disable this check.',
                    )
                elif not str(port).startswith("127.0.0.1:"):
                    # Only bind to localhost, otherwise the service is
                    # available to anyone with network access to us
                    if ":" in str(port):
                        ports[i] = f"127.0.0.1:{port}"
                    else:
                        ports[i] = f"127.0.0.1::{port}"
            if "allow_host_ports" in config:
                config.pop("allow_host_ports")
            if self.repo.rd.coverage:
                coverage_volume = "./coverage:/coverage"
                if coverage_volume not in config.get("volumes", []):
                    # Emit coverage information to a file in a directory that is
                    # bind-mounted to the "coverage" directory on the host. We
                    # inject the configuration to all services for simplicity,
                    # but this only has an effect if the service runs
                    # instrumented Rust binaries.
                    config.setdefault("volumes", []).append(coverage_volume)
                llvm_profile_file = (
                    f"LLVM_PROFILE_FILE=/coverage/{name}-%p-%9m%c.profraw"
                )
                for i, env in enumerate(config.get("environment", [])):
                    # Make sure we don't have duplicate environment entries.
                    if env.startswith("LLVM_PROFILE_FILE="):
                        config["environment"][i] = llvm_profile_file
                        break
                else:
                    config.setdefault("environment", []).append(llvm_profile_file)
        # Determine mzbuild specs and inject them into services accordingly.
        deps = self.repo.resolve_dependencies(images)
        for _name, config in services:
            if "mzbuild" in config:
                config["image"] = deps[config["mzbuild"]].spec()
                del config["mzbuild"]
        return deps
    def invoke(
        self,
        *args: str,
        capture: bool | TextIO = False,
        capture_stderr: bool | TextIO = False,
        capture_and_print: bool = False,
        stdin: str | None = None,
        check: bool = True,
        max_tries: int = 1,
        silent: bool = False,
        environment: dict[str, str] | None = None,
        build: str | None = None,
    ) -> subprocess.CompletedProcess:
        """Invoke `docker compose` on the rendered composition.

        Args:
            args: The arguments to pass to `docker compose`.
            capture: Whether to capture the child's stdout stream, can be an
                opened file to capture stdout into the file directly.
            capture_stderr: Whether to capture the child's stderr stream, can
                be an opened file to capture stderr into the file directly.
            capture_and_print: Print during execution and capture the stdout and
                stderr of the `docker compose` invocation.
            stdin: A string to provide as stdin for the command.
            check: Raise on a nonzero exit status.
            max_tries: Total number of attempts before giving up.
            silent: Suppress echoing the command line.
            environment: Environment for the child process (None inherits ours).
            build: Buildkite build to consult between retries; abort hard if it
                has already been marked failed.

        Raises:
            CommandFailureCausedUIError: If the command fails on the final try
                and `check` is set.
        """
        if not self.silent and not silent:
            # Don't print out secrets in test logs
            filtered_args = [
                "[REDACTED]" if any(secret in arg for secret in SECRETS) else arg
                for arg in args
            ]
            print(f"$ docker compose {' '.join(filtered_args)}", file=sys.stderr)
        stdout = None
        if capture:
            # `capture` may be the literal True (pipe) or an open file object.
            stdout = subprocess.PIPE if capture == True else capture
        elif capture_stderr:
            # this is necessary for the stderr to work
            stdout = subprocess.PIPE
        stderr = None
        if capture_stderr:
            stderr = subprocess.PIPE if capture_stderr == True else capture_stderr
        project_name_args = (
            ("--project-name", self.project_name) if self.project_name else ()
        )
        # One file per thread to make sure we don't try to read a file which is
        # not seeked to 0, leading to "empty compose file" errors
        thread_id = threading.get_ident()
        file = self.files.get(thread_id)
        if not file:
            # Render the composition to a temp file the child can read via
            # /dev/fd; it must be inheritable and flushed to disk.
            file = TemporaryFile(mode="w")
            os.set_inheritable(file.fileno(), True)
            yaml.dump(self.compose, file)
            os.fsync(file.fileno())
            self.files[thread_id] = file
        cmd = [
            "docker",
            "compose",
            f"-f/dev/fd/{file.fileno()}",
            "--project-directory",
            self.path,
            *project_name_args,
            *args,
        ]
        for retry in range(1, max_tries + 1):
            stdout_result = ""
            stderr_result = ""
            # The child reads the compose file from the start each attempt.
            file.seek(0)
            try:
                if capture_and_print:
                    # Stream stdout/stderr live to our own streams while also
                    # accumulating them, using non-blocking reads multiplexed
                    # through a selector.
                    p = subprocess.Popen(
                        cmd,
                        close_fds=False,
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        text=True,
                        bufsize=1,
                        env=environment,
                    )
                    if stdin is not None:
                        p.stdin.write(stdin)  # type: ignore
                    if p.stdin is not None:
                        p.stdin.close()
                    sel = selectors.DefaultSelector()
                    sel.register(p.stdout, selectors.EVENT_READ)  # type: ignore
                    sel.register(p.stderr, selectors.EVENT_READ)  # type: ignore
                    assert p.stdout is not None
                    assert p.stderr is not None
                    os.set_blocking(p.stdout.fileno(), False)
                    os.set_blocking(p.stderr.fileno(), False)
                    running = True
                    while running:
                        running = False
                        for key, val in sel.select():
                            output = ""
                            while True:
                                new_output = key.fileobj.read(1024)  # type: ignore
                                if not new_output:
                                    break
                                output += new_output
                            if not output:
                                continue
                            # Keep running as long as stdout or stderr have any content
                            running = True
                            if key.fileobj is p.stdout:
                                print(output, end="", flush=True)
                                stdout_result += output
                            else:
                                print(output, end="", file=sys.stderr, flush=True)
                                stderr_result += output
                    p.wait()
                    retcode = p.poll()
                    assert retcode is not None
                    if check and retcode:
                        raise subprocess.CalledProcessError(
                            retcode, p.args, output=stdout_result, stderr=stderr_result
                        )
                    return subprocess.CompletedProcess(
                        p.args, retcode, stdout_result, stderr_result
                    )
                else:
                    return subprocess.run(
                        cmd,
                        close_fds=False,
                        check=check,
                        stdout=stdout,
                        stderr=stderr,
                        input=stdin,
                        text=True,
                        bufsize=1,
                        env=environment,
                    )
            except subprocess.CalledProcessError as e:
                # Echo captured output so the failure is visible in logs
                # (unless it was already printed live).
                if e.stdout and not capture_and_print:
                    print(e.stdout)
                if e.stderr and not capture_and_print:
                    print(e.stderr, file=sys.stderr)
                if retry < max_tries:
                    print("Retrying ...")
                    time.sleep(3)
                    if build and buildkite.is_build_failed(build):
                        print(f"Build {build} has been marked as failed, exiting hard")
                        sys.exit(1)
                    continue
                else:
                    raise CommandFailureCausedUIError(
                        f"running docker compose failed (exit status {e.returncode})",
                        cmd=e.cmd,
                        stdout=e.stdout,
                        stderr=e.stderr,
                    )
        assert False, "unreachable"
  401. def port(self, service: str, private_port: int | str) -> int:
  402. """Get the public port for a service's private port.
  403. Delegates to `docker compose port`. See that command's help for details.
  404. Args:
  405. service: The name of a service in the composition.
  406. private_port: A private port exposed by the service.
  407. """
  408. proc = self.invoke(
  409. "port", service, str(private_port), capture=True, silent=True
  410. )
  411. if not proc.stdout.strip():
  412. raise UIError(
  413. f"service f{service!r} is not exposing port {private_port!r}",
  414. hint="is the service running?",
  415. )
  416. return int(proc.stdout.split(":")[1])
  417. def default_port(self, service: str) -> int:
  418. """Get the default public port for a service.
  419. Args:
  420. service: The name of a service in the composition.
  421. """
  422. ports = self.compose["services"][service]["ports"]
  423. if not ports:
  424. raise UIError(f"service f{service!r} does not expose any ports")
  425. private_port = str(ports[0]).split(":")[-1]
  426. return self.port(service, private_port)
    def workflow(self, name: str, *args: str) -> None:
        """Run a workflow in the composition.

        Raises a `KeyError` if the workflow does not exist.

        Args:
            name: The name of the workflow to run.
            args: The arguments to pass to the workflow function.
        """
        print(f"--- Running workflow {name}")
        func = self.workflows[name]
        parser = WorkflowArgumentParser(name, inspect.getdoc(func), list(args))
        try:
            # Make the composition directory visible to loader helpers while
            # the workflow runs.
            loader.composition_path = self.path
            # Workflows take either (composition,) or (composition, parser).
            if len(inspect.signature(func).parameters) > 1:
                func(self, parser)
            else:
                # If the workflow doesn't have an `args` parameter, parse them here
                # with an empty parser to reject bogus arguments and to handle the
                # trivial help message.
                parser.parse_args()
                func(self)
            if os.getenv("CI_FINAL_PREFLIGHT_CHECK_VERSION") is not None:
                self.final_preflight_check()
            elif self.is_sanity_restart_mz:
                self.sanity_restart_mz()
        finally:
            loader.composition_path = None
    @contextmanager
    def override(
        self, *services: "Service", fail_on_new_service: bool = True
    ) -> Iterator[None]:
        """Temporarily update the composition with the specified services.

        The services must already exist in the composition. They restored to
        their old definitions when the `with` block ends. Note that the service
        definition is written in its entirety; i.e., the configuration is not
        deep merged but replaced wholesale.

        Lest you are tempted to change this function to allow dynamically
        injecting new services: do not do this! These services will not be
        visible to other commands, like `mzcompose run`, `mzcompose logs`, or
        `mzcompose down`, which makes debugging or inspecting the composition
        challenging.
        """
        # Remember the old composition.
        old_compose = copy.deepcopy(self.compose)
        # Update the composition with the new service definitions.
        deps = self._munge_services([(s.name, cast(dict, s.config)) for s in services])
        for service in services:
            assert (
                not fail_on_new_service or service.name in self.compose["services"]
            ), f"Service {service.name} not found in SERVICES: {list(self.compose['services'].keys())}"
            self.compose["services"][service.name] = service.config
        # Re-acquire dependencies, as the override may have swapped an `image`
        # config for an `mzbuild` config.
        deps.acquire()
        # Invalidate cached per-thread compose files so the new definitions
        # are rendered on the next `invoke`.
        self.files = {}
        # Ensure image freshness
        self.pull_if_variable([service.name for service in services])
        try:
            # Run the next composition.
            yield
        finally:
            # If sanity_restart existed in the overridden service, but
            # override() disabled it by removing the label,
            # keep the sanity check disabled
            if (
                "materialized" in old_compose["services"]
                and "labels" in old_compose["services"]["materialized"]
                and "sanity_restart"
                in old_compose["services"]["materialized"]["labels"]
            ):
                if (
                    "labels" not in self.compose["services"]["materialized"]
                    or "sanity_restart"
                    not in self.compose["services"]["materialized"]["labels"]
                ):
                    print("sanity_restart disabled by override(), keeping it disabled")
                    del old_compose["services"]["materialized"]["labels"][
                        "sanity_restart"
                    ]
            # Restore the old composition.
            self.compose = old_compose
            self.files = {}
    @contextmanager
    def test_case(self, name: str) -> Iterator[None]:
        """Execute a test case.

        This context manager provides a very lightweight testing framework. If
        the body of the context manager raises an exception, the test case is
        considered to have failed; otherwise it is considered to have succeeded.
        In either case the execution time and status of the test are recorded in
        `test_results`.

        Example:
            A simple workflow that executes a table-driven test:

            ```
            @dataclass
            class TestCase:
                name: str
                files: list[str]

            test_cases = [
                TestCase(name="short", files=["quicktests.td"]),
                TestCase(name="long", files=["longtest1.td", "longtest2.td"]),
            ]

            def workflow_default(c: Composition):
                for tc in test_cases:
                    with c.test_case(tc.name):
                        c.run_testdrive_files(*tc.files)
            ```

        Args:
            name: The name of the test case. Must be unique across the lifetime
                of a composition.
        """
        if name in self.test_results:
            raise UIError(f"test case {name} executed twice")
        ui.header(f"Running test case {name}")
        errors = []
        start_time = time.time()
        try:
            yield
            end_time = time.time()
            ui.header(f"mzcompose: test case {name} succeeded")
        except Exception as e:
            end_time = time.time()
            error_message = f"{e.__class__.__module__}.{e.__class__.__name__}: {e}"
            ui.header(f"mzcompose: test case {name} failed: {error_message}")
            errors = self.extract_test_errors(e, error_message)
            if not isinstance(e, UIError):
                # UI errors carry their own user-facing explanation; anything
                # else gets a full traceback for debugging.
                traceback.print_exc()
        duration = end_time - start_time
        self.test_results[name] = TestResult(duration, errors)
  554. def extract_test_errors(
  555. self, e: Exception, error_message: str
  556. ) -> list[TestFailureDetails]:
  557. errors = [
  558. TestFailureDetails(
  559. error_message,
  560. details=None,
  561. test_case_name_override=self.current_test_case_name_override,
  562. location=None,
  563. line_number=None,
  564. )
  565. ]
  566. if isinstance(e, CommandFailureCausedUIError):
  567. try:
  568. extracted_errors = try_determine_errors_from_cmd_execution(
  569. e, self.current_test_case_name_override
  570. )
  571. except:
  572. extracted_errors = []
  573. errors = extracted_errors if len(extracted_errors) > 0 else errors
  574. elif isinstance(e, FailedTestExecutionError):
  575. errors = e.errors
  576. assert len(errors) > 0, "Failed test execution does not contain any errors"
  577. elif isinstance(e, WorkerFailedException):
  578. errors = [
  579. TestFailureDetails(
  580. error_message,
  581. details=str(e.cause) if e.cause is not None else None,
  582. location=None,
  583. line_number=None,
  584. test_case_name_override=self.current_test_case_name_override,
  585. )
  586. ]
  587. return errors
  588. def sql_connection(
  589. self,
  590. service: str | None = None,
  591. user: str = "materialize",
  592. database: str = "materialize",
  593. port: int | None = None,
  594. password: str | None = None,
  595. sslmode: str = "disable",
  596. startup_params: dict[str, str] = {},
  597. reuse_connection: bool = False,
  598. ) -> Connection:
  599. if service is None:
  600. service = "materialized"
  601. """Get a connection (with autocommit enabled) to the materialized service."""
  602. port = self.port(service, port) if port else self.default_port(service)
  603. options = " ".join([f"-c {key}={val}" for key, val in startup_params.items()])
  604. key = (threading.get_ident(), database, user, password, port, sslmode, options)
  605. if reuse_connection and key in self.conns:
  606. return self.conns[key]
  607. conn = psycopg.connect(
  608. host="localhost",
  609. dbname=database,
  610. user=user,
  611. password=password,
  612. port=port,
  613. sslmode=sslmode,
  614. options=options,
  615. )
  616. conn.autocommit = True
  617. if reuse_connection:
  618. self.conns[key] = conn
  619. return conn
  620. def sql_cursor(
  621. self,
  622. service: str | None = None,
  623. user: str = "materialize",
  624. database: str = "materialize",
  625. port: int | None = None,
  626. password: str | None = None,
  627. sslmode: str = "disable",
  628. startup_params: dict[str, str] = {},
  629. reuse_connection: bool = False,
  630. ) -> Cursor:
  631. """Get a cursor to run SQL queries against the materialized service."""
  632. conn = self.sql_connection(
  633. service,
  634. user,
  635. database,
  636. port,
  637. password,
  638. sslmode,
  639. startup_params,
  640. reuse_connection,
  641. )
  642. return conn.cursor()
  643. def sql(
  644. self,
  645. sql: str,
  646. service: str | None = None,
  647. user: str = "materialize",
  648. database: str = "materialize",
  649. port: int | None = None,
  650. password: str | None = None,
  651. print_statement: bool = True,
  652. reuse_connection: bool = True,
  653. ) -> None:
  654. """Run a batch of SQL statements against the materialized service."""
  655. with self.sql_cursor(
  656. service=service,
  657. user=user,
  658. database=database,
  659. port=port,
  660. password=password,
  661. reuse_connection=reuse_connection,
  662. ) as cursor:
  663. for statement in sqlparse.split(sql):
  664. if print_statement:
  665. print(f"> {statement}")
  666. cursor.execute(statement.encode())
  667. def sql_query(
  668. self,
  669. sql: str,
  670. service: str | None = None,
  671. user: str = "materialize",
  672. database: str = "materialize",
  673. port: int | None = None,
  674. password: str | None = None,
  675. reuse_connection: bool = True,
  676. ) -> Any:
  677. """Execute and return results of a SQL query."""
  678. with self.sql_cursor(
  679. service=service,
  680. user=user,
  681. database=database,
  682. port=port,
  683. password=password,
  684. reuse_connection=reuse_connection,
  685. ) as cursor:
  686. cursor.execute(sql.encode())
  687. return cursor.fetchall()
  688. def query_mz_version(self, service: str | None = None) -> str:
  689. return self.sql_query("SELECT mz_version()", service=service)[0][0]
  690. def run(
  691. self,
  692. service: str,
  693. *args: str,
  694. detach: bool = False,
  695. rm: bool = False,
  696. env_extra: dict[str, str] = {},
  697. capture: bool = False,
  698. capture_stderr: bool = False,
  699. capture_and_print: bool = False,
  700. stdin: str | None = None,
  701. entrypoint: str | None = None,
  702. check: bool = True,
  703. ) -> subprocess.CompletedProcess:
  704. """Run a one-off command in a service.
  705. Delegates to `docker compose run`. See that command's help for details.
  706. Note that unlike `docker compose run`, any services whose definitions
  707. have changed are rebuilt (like `docker compose up` would do) before the
  708. command is executed.
  709. Args:
  710. service: The name of a service in the composition.
  711. args: Arguments to pass to the service's entrypoint.
  712. detach: Run the container in the background.
  713. stdin: read STDIN from a string.
  714. env_extra: Additional environment variables to set in the container.
  715. rm: Remove container after run.
  716. capture: Capture the stdout of the `docker compose` invocation.
  717. capture_stderr: Capture the stderr of the `docker compose` invocation.
  718. capture_and_print: Print during execution and capture the
  719. stdout+stderr of the `docker compose` invocation.
  720. """
  721. return self.invoke(
  722. "run",
  723. *(["--entrypoint", entrypoint] if entrypoint else []),
  724. *(f"-e{k}" for k in env_extra.keys()),
  725. *(["--detach"] if detach else []),
  726. *(["--rm"] if rm else []),
  727. service,
  728. *args,
  729. capture=capture,
  730. capture_stderr=capture_stderr,
  731. capture_and_print=capture_and_print,
  732. stdin=stdin,
  733. check=check,
  734. environment=os.environ | env_extra,
  735. )
  736. def run_testdrive_files(
  737. self,
  738. *args: str,
  739. service: str = "testdrive",
  740. mz_service: str | None = None,
  741. quiet: bool = False,
  742. persistent: bool = True,
  743. ) -> subprocess.CompletedProcess:
  744. if mz_service is not None:
  745. args = tuple(
  746. list(args)
  747. + [
  748. f"--materialize-url=postgres://materialize@{mz_service}:6875",
  749. f"--materialize-internal-url=postgres://mz_system@{mz_service}:6877",
  750. ]
  751. )
  752. environment = {"CLUSTER_REPLICA_SIZES": json.dumps(cluster_replica_size_map())}
  753. if persistent:
  754. if not self.is_running(service):
  755. self.up({"name": service, "persistent": True})
  756. return self.exec(
  757. service,
  758. *args,
  759. # needed for sufficient error information in the junit.xml while still printing to stdout during execution
  760. capture_and_print=not quiet,
  761. capture=quiet,
  762. capture_stderr=quiet,
  763. env_extra=environment,
  764. )
  765. else:
  766. return self.run(
  767. service,
  768. *args,
  769. # needed for sufficient error information in the junit.xml while still printing to stdout during execution
  770. capture_and_print=not quiet,
  771. capture=quiet,
  772. capture_stderr=quiet,
  773. env_extra=environment,
  774. )
  775. def exec(
  776. self,
  777. service: str,
  778. *args: str,
  779. detach: bool = False,
  780. capture: bool = False,
  781. capture_stderr: bool = False,
  782. capture_and_print: bool = False,
  783. stdin: str | None = None,
  784. check: bool = True,
  785. workdir: str | None = None,
  786. env_extra: dict[str, str] = {},
  787. silent: bool = False,
  788. ) -> subprocess.CompletedProcess:
  789. """Execute a one-off command in a service's running container
  790. Delegates to `docker compose exec`.
  791. Args:
  792. service: The service whose container will be used.
  793. command: The command to run.
  794. args: Arguments to pass to the command.
  795. detach: Run the container in the background.
  796. stdin: read STDIN from a string.
  797. """
  798. return self.invoke(
  799. "exec",
  800. *(["--detach"] if detach else []),
  801. *(["--workdir", workdir] if workdir else []),
  802. *(f"-e{k}={v}" for k, v in env_extra.items()),
  803. "-T",
  804. service,
  805. *(
  806. self.compose["services"][service]["entrypoint"]
  807. if "entrypoint" in self.compose["services"][service]
  808. else []
  809. ),
  810. *args,
  811. capture=capture,
  812. capture_stderr=capture_stderr,
  813. capture_and_print=capture_and_print,
  814. stdin=stdin,
  815. check=check,
  816. silent=silent,
  817. )
  818. def pull_if_variable(self, services: list[str], max_tries: int = 2) -> None:
  819. """Pull fresh service images in case the tag indicates the underlying image may change over time.
  820. Args:
  821. services: List of service names
  822. """
  823. for service in services:
  824. if "image" in self.compose["services"][service] and any(
  825. tag in self.compose["services"][service]["image"]
  826. for tag in [":latest", ":unstable", ":rolling"]
  827. ):
  828. self.pull_single_image_by_service_name(service, max_tries=max_tries)
  829. def pull_single_image_by_service_name(
  830. self, service_name: str, max_tries: int
  831. ) -> None:
  832. self.invoke("pull", service_name, max_tries=max_tries)
  833. def try_pull_service_image(self, service: Service, max_tries: int = 2) -> bool:
  834. """Tries to pull the specified image and returns if this was successful."""
  835. try:
  836. with self.override(service):
  837. self.pull_single_image_by_service_name(
  838. service.name, max_tries=max_tries
  839. )
  840. return True
  841. except UIError:
  842. return False
    def up(
        self,
        *services: str | dict[str, Any],
        detach: bool = True,
        wait: bool = True,
        max_tries: int = 5,  # increased since quay.io returns 502 sometimes
    ) -> None:
        """Build, (re)create, and start the named services.

        Delegates to `docker compose up`. See that command's help for details.

        Args:
            services: The names of services in the composition. An entry of
                the form `{"name": ..., "persistent": True}` starts the
                service with its entrypoint replaced by `sleep infinity` so it
                can later be targeted with `exec`.
            detach: Run containers in the background.
            wait: Wait for health checks to complete before returning.
                Implies `detach` mode.
            max_tries: Number of tries on failure.
        """
        # Names of the services requested in "persistent" (sleep-forever) mode.
        persistent = set(
            [
                service["name"]
                for service in services
                if isinstance(service, dict) and service.get("persistent")
            ]
        )
        if persistent:
            # Temporarily rewrite the compose definition so persistent
            # services just sleep; the original definition is restored below.
            old_compose = copy.deepcopy(self.compose)
            for service_name, service in self.compose["services"].items():
                if service_name in persistent:
                    service["entrypoint"] = ["sleep", "infinity"]
                    service["command"] = []
            # Drop cached rendered files so the modified compose definition is
            # actually used by the next invocation.
            self.files = {}
        service_names = [
            service["name"] if isinstance(service, dict) else service
            for service in services
        ]
        # Preserve logs emitted so far before containers are (re)created.
        self.capture_logs()
        self.invoke(
            "up",
            *(["--detach"] if detach else []),
            *(["--wait"] if wait else []),
            *(["--quiet-pull"] if ui.env_is_truthy("CI") else []),
            *service_names,
            max_tries=300 if os.getenv("CI_WAITING_FOR_BUILD") else max_tries,
            build=os.getenv("CI_WAITING_FOR_BUILD"),
        )
        if persistent:
            # Restore the original (non-sleeping) service definitions.
            self.compose = old_compose  # type: ignore
            self.files = {}
  890. def validate_sources_sinks_clusters(self) -> str | None:
  891. """Validate that all sources, sinks & clusters are in a good state"""
  892. exclusion_clause = "true"
  893. if len(self.sources_and_sinks_ignored_from_validation) > 0:
  894. excluded_items = ", ".join(
  895. f"'{name}'" for name in self.sources_and_sinks_ignored_from_validation
  896. )
  897. exclusion_clause = f"name NOT IN ({excluded_items})"
  898. # starting sources are currently expected if no new data is produced, see database-issues#6605
  899. results = self.sql_query(
  900. f"""
  901. SELECT name, status, error, details
  902. FROM mz_internal.mz_source_statuses
  903. WHERE NOT(
  904. status IN ('running', 'starting', 'paused') OR
  905. (type = 'progress' AND status = 'created')
  906. )
  907. AND {exclusion_clause}
  908. """
  909. )
  910. for name, status, error, details in results:
  911. return f"Source {name} is expected to be running/created/paused, but is {status}, error: {error}, details: {details}"
  912. results = self.sql_query(
  913. f"""
  914. SELECT name, status, error, details
  915. FROM mz_internal.mz_sink_statuses
  916. WHERE status NOT IN ('running', 'dropped')
  917. AND {exclusion_clause}
  918. """
  919. )
  920. for name, status, error, details in results:
  921. return f"Sink {name} is expected to be running/dropped, but is {status}, error: {error}, details: {details}"
  922. results = self.sql_query(
  923. """
  924. SELECT mz_clusters.name, mz_cluster_replicas.name, status, reason
  925. FROM mz_internal.mz_cluster_replica_statuses
  926. JOIN mz_cluster_replicas
  927. ON mz_internal.mz_cluster_replica_statuses.replica_id = mz_cluster_replicas.id
  928. JOIN mz_clusters ON mz_cluster_replicas.cluster_id = mz_clusters.id
  929. WHERE status NOT IN ('online', 'offline')
  930. """
  931. )
  932. for cluster_name, replica_name, status, reason in results:
  933. return f"Cluster replica {cluster_name}.{replica_name} is expected to be online/offline, but is {status}, reason: {reason}"
  934. return None
    def final_preflight_check(self) -> None:
        """Check if Mz can do the preflight-check and upgrade/rollback with specified version."""
        version = os.getenv("CI_FINAL_PREFLIGHT_CHECK_VERSION")
        if version is None:
            return
        rollback = ui.env_is_truthy("CI_FINAL_PREFLIGHT_CHECK_ROLLBACK")
        if "materialized" in self.compose["services"]:
            ui.header("Final Preflight Check")
            # Only exercise the check when the container is actually up.
            ps = self.invoke("ps", "materialized", "--quiet", capture=True)
            if len(ps.stdout) == 0:
                print("Service materialized not running, will not upgrade it.")
                return

            self.kill("materialized")
            # Boot the target version as a new deploy generation so that it
            # runs the preflight/leader-handoff path.
            with self.override(
                Materialized(
                    image=f"materialize/materialized:{version}",
                    environment_extra=["MZ_DEPLOY_GENERATION=1"],
                    healthcheck=LEADER_STATUS_HEALTHCHECK,
                )
            ):
                self.up("materialized")
                self.await_mz_deployment_status(DeploymentStatus.READY_TO_PROMOTE)
                if rollback:
                    # Roll back by bringing up the original definition again.
                    with self.override(Materialized()):
                        self.up("materialized")
                else:
                    self.promote_mz()
                # Smoke check that SQL works after promotion/rollback.
                self.sql("SELECT 1")

                NUM_RETRIES = 60
                for i in range(NUM_RETRIES + 1):
                    error = self.validate_sources_sinks_clusters()
                    if not error:
                        break
                    if i == NUM_RETRIES:
                        raise ValueError(error)
                    # Sources and cluster replicas need a few seconds to start up
                    print(f"Retrying ({i+1}/{NUM_RETRIES})...")
                    time.sleep(1)

                # In case the test has to continue, reset state
                self.kill("materialized")
                self.up("materialized")
        else:
            ui.header(
                "Persist Catalog Forward Compatibility Check skipped because Mz not in services"
            )
    def sanity_restart_mz(self) -> None:
        """Restart Materialized if it is part of the composition to find
        problems with persisted objects, functions as a sanity check."""
        # Only applies when the service opted in via the `sanity_restart` label.
        if (
            "materialized" in self.compose["services"]
            and "labels" in self.compose["services"]["materialized"]
            and "sanity_restart" in self.compose["services"]["materialized"]["labels"]
        ):
            ui.header(
                "Sanity Restart: Restart Mz and verify source/sink/replica health"
            )
            # Only restart when the container is actually up.
            ps = self.invoke("ps", "materialized", "--quiet", capture=True)
            if len(ps.stdout) == 0:
                print("Service materialized not running, will not restart it.")
                return

            self.kill("materialized")
            self.up("materialized")
            # Smoke check that SQL works after the restart.
            self.sql("SELECT 1")

            NUM_RETRIES = 60
            for i in range(NUM_RETRIES + 1):
                error = self.validate_sources_sinks_clusters()
                if not error:
                    break
                if i == NUM_RETRIES:
                    raise ValueError(error)
                # Sources and cluster replicas need a few seconds to start up
                print(f"Retrying ({i+1}/{NUM_RETRIES})...")
                time.sleep(1)
        else:
            ui.header(
                "Sanity Restart skipped because Mz not in services or `sanity_restart` label not set"
            )
  1012. def metadata_store(self) -> str:
  1013. for name in ["cockroach", "postgres-metadata"]:
  1014. if name in self.compose["services"]:
  1015. return name
  1016. raise RuntimeError(
  1017. f"No external metadata store found: {self.compose['services']}"
  1018. )
  1019. def blob_store(self) -> str:
  1020. for name in ["azurite", "minio"]:
  1021. if name in self.compose["services"]:
  1022. return name
  1023. raise RuntimeError(f"No external blob store found: {self.compose['services']}")
  1024. def capture_logs(self, *services: str) -> None:
  1025. # Capture logs into services.log since they will be lost otherwise
  1026. # after dowing a composition.
  1027. path = MZ_ROOT / "services.log"
  1028. # Don't capture log lines we received already
  1029. time = os.path.getmtime(path) if os.path.isfile(path) else 0
  1030. with open(path, "a") as f:
  1031. self.invoke(
  1032. "logs",
  1033. "--no-color",
  1034. "--timestamps",
  1035. "--since",
  1036. str(time),
  1037. *services,
  1038. capture=f,
  1039. )
  1040. def down(
  1041. self,
  1042. destroy_volumes: bool = True,
  1043. remove_orphans: bool = True,
  1044. sanity_restart_mz: bool = True,
  1045. ) -> None:
  1046. """Stop and remove resources.
  1047. Delegates to `docker compose down`. See that command's help for details.
  1048. Args:
  1049. destroy_volumes: Remove named volumes and anonymous volumes attached
  1050. to containers.
  1051. sanity_restart_mz: Try restarting materialize first if it is part
  1052. of the composition, as a sanity check.
  1053. """
  1054. if os.getenv("CI_FINAL_PREFLIGHT_CHECK_VERSION") is not None:
  1055. self.final_preflight_check()
  1056. elif sanity_restart_mz and self.is_sanity_restart_mz:
  1057. self.sanity_restart_mz()
  1058. self.capture_logs()
  1059. self.invoke(
  1060. "down",
  1061. *(["--volumes"] if destroy_volumes else []),
  1062. *(["--remove-orphans"] if remove_orphans else []),
  1063. )
  1064. def stop(self, *services: str, wait: bool = True) -> None:
  1065. """Stop the docker containers for the named services.
  1066. Delegates to `docker compose stop`. See that command's help for details.
  1067. Args:
  1068. services: The names of services in the composition.
  1069. """
  1070. self.invoke("stop", *services)
  1071. if wait:
  1072. self.wait(*services)
  1073. def kill(self, *services: str, signal: str = "SIGKILL", wait: bool = True) -> None:
  1074. """Force stop service containers.
  1075. Delegates to `docker compose kill`. See that command's help for details.
  1076. Args:
  1077. services: The names of services in the composition.
  1078. signal: The signal to deliver.
  1079. wait: Wait for the container die
  1080. """
  1081. self.invoke("kill", f"-s{signal}", *services)
  1082. if wait:
  1083. # Containers exit with exit code 137 ( = 128 + 9 ) when killed via SIGKILL
  1084. self.wait(*services, expect_exit_code=137 if signal == "SIGKILL" else None)
  1085. def is_running(self, container_name: str) -> bool:
  1086. qualified_container_name = f"{self.name}-{container_name}-1"
  1087. output_str = self.invoke(
  1088. "ps",
  1089. "--filter",
  1090. f"name=^/{qualified_container_name}$",
  1091. "--filter",
  1092. "status=running",
  1093. capture=True,
  1094. silent=True,
  1095. ).stdout
  1096. assert output_str is not None
  1097. # Output will be something like:
  1098. # CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
  1099. # fd814abd6b36 mysql:8.0.35 "docker-entrypoint.s…" 5 minutes ago Up 5 minutes (healthy) 33060/tcp, 0.0.0.0:52605->3306/tcp mysql-cdc-resumption-mysql-replica-1-1
  1100. return qualified_container_name in output_str
  1101. def wait(self, *services: str, expect_exit_code: int | None = None) -> None:
  1102. """Wait for a container to exit
  1103. Delegates to `docker wait`. See that command's help for details.
  1104. Args:
  1105. services: The names of services in the composition.
  1106. expect_exit_code: Optionally expect a specific exit code
  1107. """
  1108. for service in services:
  1109. container_id = self.container_id(service)
  1110. if container_id is None:
  1111. # Container has already exited, can not `docker wait` on it
  1112. continue
  1113. try:
  1114. exit_code = int(
  1115. subprocess.run(
  1116. ["docker", "wait", container_id],
  1117. check=True,
  1118. stdout=subprocess.PIPE,
  1119. text=True,
  1120. bufsize=1,
  1121. ).stdout
  1122. )
  1123. if expect_exit_code is not None:
  1124. assert (
  1125. expect_exit_code == exit_code
  1126. ), f"service {service} exited with exit code {exit_code}, expected {expect_exit_code}"
  1127. except subprocess.CalledProcessError as e:
  1128. assert "No such container" in str(
  1129. e
  1130. ), f"docker wait unexpectedly returned {e}"
  1131. def pause(self, *services: str) -> None:
  1132. """Pause service containers.
  1133. Delegates to `docker compose pause`. See that command's help for details.
  1134. Args:
  1135. services: The names of services in the composition.
  1136. """
  1137. self.invoke("pause", *services)
  1138. def unpause(self, *services: str) -> None:
  1139. """Unpause service containers
  1140. Delegates to `docker compose unpause`. See that command's help for details.
  1141. Args:
  1142. services: The names of services in the composition.
  1143. """
  1144. self.invoke("unpause", *services)
  1145. def rm(
  1146. self, *services: str, stop: bool = True, destroy_volumes: bool = True
  1147. ) -> None:
  1148. """Remove stopped service containers.
  1149. Delegates to `docker compose rm`. See that command's help for details.
  1150. Args:
  1151. services: The names of services in the composition.
  1152. stop: Stop the containers if necessary.
  1153. destroy_volumes: Destroy any anonymous volumes associated with the
  1154. service. Note that this does not destroy any named volumes
  1155. attached to the service.
  1156. """
  1157. self.capture_logs()
  1158. self.invoke(
  1159. "rm",
  1160. "--force",
  1161. *(["--stop"] if stop else []),
  1162. *(["-v"] if destroy_volumes else []),
  1163. *services,
  1164. )
  1165. def rm_volumes(self, *volumes: str, force: bool = False) -> None:
  1166. """Remove the named volumes.
  1167. Args:
  1168. volumes: The names of volumes in the composition.
  1169. force: Whether to force the removal (i.e., don't error if the
  1170. volume does not exist).
  1171. """
  1172. volumes = tuple(f"{self.name}_{v}" for v in volumes)
  1173. spawn.runv(
  1174. ["docker", "volume", "rm", *(["--force"] if force else []), *volumes]
  1175. )
  1176. def sleep(self, duration: float) -> None:
  1177. """Sleep for the specified duration in seconds."""
  1178. print(f"Sleeping for {duration} seconds...")
  1179. time.sleep(duration)
  1180. def container_id(self, service: str) -> str | None:
  1181. """Return the container_id for the specified service
  1182. Delegates to `docker compose ps`
  1183. """
  1184. output_str = self.invoke(
  1185. "ps", "--quiet", service, capture=True, silent=True
  1186. ).stdout
  1187. assert output_str is not None
  1188. if output_str == "":
  1189. return None
  1190. output_list = output_str.strip("\n").split("\n")
  1191. assert len(output_list) == 1
  1192. assert output_list[0] is not None
  1193. return str(output_list[0])
  1194. def stats(
  1195. self,
  1196. service: str,
  1197. ) -> str:
  1198. """Delegates to `docker stats`
  1199. Args:
  1200. service: The service whose container's stats will be probed.
  1201. """
  1202. container_id = self.container_id(service)
  1203. assert container_id is not None
  1204. return subprocess.run(
  1205. [
  1206. "docker",
  1207. "stats",
  1208. container_id,
  1209. "--format",
  1210. "{{json .}}",
  1211. "--no-stream",
  1212. "--no-trunc",
  1213. ],
  1214. check=True,
  1215. stdout=subprocess.PIPE,
  1216. text=True,
  1217. bufsize=1,
  1218. ).stdout
  1219. def mem(self, service: str) -> int:
  1220. stats_str = self.stats(service)
  1221. stats = json.loads(stats_str)
  1222. assert service in stats["Name"]
  1223. mem_str, _ = stats["MemUsage"].split("/") # "MemUsage":"1.542GiB / 62.8GiB"
  1224. mem_float = float(re.findall(r"[\d.]+", mem_str)[0])
  1225. if "MiB" in mem_str:
  1226. mem_float = mem_float * 10**6
  1227. elif "GiB" in mem_str:
  1228. mem_float = mem_float * 10**9
  1229. else:
  1230. raise RuntimeError(f"Unable to parse {mem_str}")
  1231. return round(mem_float)
  1232. def testdrive(
  1233. self,
  1234. input: str,
  1235. service: str = "testdrive",
  1236. args: list[str] = [],
  1237. caller: Traceback | None = None,
  1238. mz_service: str | None = None,
  1239. quiet: bool = False,
  1240. ) -> subprocess.CompletedProcess:
  1241. """Run a string as a testdrive script.
  1242. Args:
  1243. args: Additional arguments to pass to testdrive
  1244. service: Optional name of the testdrive service to use.
  1245. input: The string to execute.
  1246. persistent: Whether a persistent testdrive container will be used.
  1247. caller: The python source line that invoked testdrive()
  1248. mz_service: The Materialize service name to target
  1249. """
  1250. caller = caller or getframeinfo(stack()[1][0])
  1251. args = args + [f"--source={caller.filename}:{caller.lineno}"]
  1252. if mz_service is not None:
  1253. args = args + [
  1254. f"--materialize-url=postgres://materialize@{mz_service}:6875",
  1255. f"--materialize-internal-url=postgres://mz_system@{mz_service}:6877",
  1256. f"--persist-consensus-url=postgres://root@{mz_service}:26257?options=--search_path=consensus",
  1257. ]
  1258. if not self.is_running(service):
  1259. self.up({"name": service, "persistent": True})
  1260. return self.exec(
  1261. service,
  1262. *args,
  1263. stdin=input,
  1264. capture_and_print=not quiet,
  1265. capture=quiet,
  1266. capture_stderr=quiet,
  1267. )
  1268. def enable_minio_versioning(self) -> None:
  1269. self.up("minio")
  1270. self.up({"name": "mc", "persistent": True})
  1271. self.exec(
  1272. "mc",
  1273. "mc",
  1274. "alias",
  1275. "set",
  1276. "persist",
  1277. "http://minio:9000/",
  1278. "minioadmin",
  1279. "minioadmin",
  1280. )
  1281. self.exec("mc", "mc", "version", "enable", "persist/persist")
    def backup_cockroach(self) -> None:
        """Back up the cockroach metadata store into the minio crdb-backup bucket."""
        # mc is used to ensure the target bucket exists.
        self.up({"name": "mc", "persistent": True})
        self.exec("mc", "mc", "mb", "--ignore-existing", "persist/crdb-backup")
        # The external connection is created only for the duration of the
        # backup and dropped afterwards.
        self.exec(
            "cockroach",
            "cockroach",
            "sql",
            "--insecure",
            "-e",
            """
           CREATE EXTERNAL CONNECTION backup_bucket
             AS 's3://persist/crdb-backup?AWS_ENDPOINT=http://minio:9000/&AWS_REGION=minio&AWS_ACCESS_KEY_ID=minioadmin&AWS_SECRET_ACCESS_KEY=minioadmin';
           BACKUP INTO 'external://backup_bucket';
           DROP EXTERNAL CONNECTION backup_bucket;
        """,
        )
    def restore_cockroach(self, mz_service: str = "materialized") -> None:
        """Restore the cockroach metadata store and persist blobs from backup.

        Args:
            mz_service: The Materialize service to stop before and start after
                the restore.
        """
        # Materialize must not be running while its metadata is replaced.
        self.kill(mz_service)
        # Drop defaultdb and restore it from the minio backup bucket; the
        # external connection only exists for the duration of the restore.
        self.exec(
            "cockroach",
            "cockroach",
            "sql",
            "--insecure",
            "-e",
            """
            DROP DATABASE defaultdb;
            CREATE EXTERNAL CONNECTION backup_bucket
              AS 's3://persist/crdb-backup?AWS_ENDPOINT=http://minio:9000/&AWS_REGION=minio&AWS_ACCESS_KEY_ID=minioadmin&AWS_SECRET_ACCESS_KEY=minioadmin';
            RESTORE DATABASE defaultdb
            FROM LATEST IN 'external://backup_bucket';
            DROP EXTERNAL CONNECTION backup_bucket;
        """,
        )
        # Bring the persist blob state back in sync with the restored
        # consensus data.
        self.up({"name": "persistcli", "persistent": True})
        self.exec(
            "persistcli",
            "persistcli",
            "admin",
            "--commit",
            "restore-blob",
            f"--blob-uri={minio_blob_uri()}",
            "--consensus-uri=postgres://root@cockroach:26257?options=--search_path=consensus",
        )
        self.up(mz_service)
  1326. def backup_postgres(self) -> None:
  1327. backup = self.exec(
  1328. "postgres-metadata",
  1329. "pg_dumpall",
  1330. "--user",
  1331. "postgres",
  1332. capture=True,
  1333. ).stdout
  1334. with open("backup.sql", "w") as f:
  1335. f.write(backup)
    def restore_postgres(self, mz_service: str = "materialized") -> None:
        """Restore the postgres metadata store and persist blobs from backup.sql.

        Args:
            mz_service: The Materialize service to stop before and start after
                the restore.
        """
        # Materialize must not be running while its metadata is replaced, and
        # postgres is recreated from scratch before replaying the dump.
        self.kill(mz_service)
        self.kill("postgres-metadata")
        self.rm("postgres-metadata")
        self.up("postgres-metadata")
        with open("backup.sql") as f:
            backup = f.read()
        # Feed the dump to psql via stdin.
        self.exec(
            "postgres-metadata",
            "psql",
            "--user",
            "postgres",
            "--file",
            "-",
            stdin=backup,
        )
        # Bring the persist blob state back in sync with the restored
        # consensus data.
        self.up({"name": "persistcli", "persistent": True})
        self.exec(
            "persistcli",
            "persistcli",
            "admin",
            "--commit",
            "restore-blob",
            f"--blob-uri={minio_blob_uri()}",
            "--consensus-uri=postgres://root@postgres-metadata:26257?options=--search_path=consensus",
        )
        self.up(mz_service)
  1363. def backup(self) -> None:
  1364. if self.metadata_store() == "cockroach":
  1365. self.backup_cockroach()
  1366. else:
  1367. self.backup_postgres()
  1368. def restore(self, mz_service: str = "materialized") -> None:
  1369. if self.metadata_store() == "cockroach":
  1370. self.restore_cockroach(mz_service)
  1371. else:
  1372. self.restore_postgres(mz_service)
  1373. def await_mz_deployment_status(
  1374. self,
  1375. status: DeploymentStatus,
  1376. mz_service: str = "materialized",
  1377. timeout: int | None = None,
  1378. sleep_time: float | None = 1.0,
  1379. ) -> None:
  1380. timeout = timeout or (1800 if ui.env_is_truthy("CI_COVERAGE_ENABLED") else 900)
  1381. print(
  1382. f"Awaiting {mz_service} deployment status {status.value} for {timeout}s",
  1383. end="",
  1384. )
  1385. result = {}
  1386. timeout_time = time.time() + timeout
  1387. while time.time() < timeout_time:
  1388. try:
  1389. result = json.loads(
  1390. self.exec(
  1391. mz_service,
  1392. "curl",
  1393. "-s",
  1394. "localhost:6878/api/leader/status",
  1395. capture=True,
  1396. silent=True,
  1397. ).stdout
  1398. )
  1399. if result["status"] == status.value:
  1400. print(" Reached!")
  1401. return
  1402. except:
  1403. pass
  1404. print(".", end="")
  1405. if sleep_time:
  1406. time.sleep(sleep_time)
  1407. raise UIError(
  1408. f"Timed out waiting for {mz_service} to reach Mz deployment status {status.value}, still in status {result.get('status')}"
  1409. )
  1410. def promote_mz(self, mz_service: str = "materialized") -> None:
  1411. result = json.loads(
  1412. self.exec(
  1413. mz_service,
  1414. "curl",
  1415. "-s",
  1416. "-X",
  1417. "POST",
  1418. "localhost:6878/api/leader/promote",
  1419. capture=True,
  1420. ).stdout
  1421. )
  1422. assert result["result"] == "Success", f"Unexpected result {result}"
  1423. def cloud_hostname(self, quiet: bool = False) -> str:
  1424. """Uses the mz command line tool to get the hostname of the cloud instance"""
  1425. if not quiet:
  1426. print("Obtaining hostname of cloud instance ...")
  1427. region_status = self.run("mz", "region", "show", capture=True)
  1428. sql_line = region_status.stdout.split("\n")[2]
  1429. cloud_url = sql_line.split("\t")[1].strip()
  1430. # It is necessary to append the 'https://' protocol; otherwise, urllib can't parse it correctly.
  1431. cloud_hostname = urllib.parse.urlparse("https://" + cloud_url).hostname
  1432. return str(cloud_hostname)
    # Type of a single work item handled by test_parts.
    T = TypeVar("T")

    def test_parts(self, parts: list[T], process_func: Callable[[T], Any]) -> None:
        """Process each part, collecting failures and re-raising at the end.

        Args:
            parts: The work items (e.g. test files) to process.
            process_func: Called once per part; its exceptions are collected
                so the remaining parts still run.
        """
        priority: dict[str, int] = {}
        # TODO(def-): Revisit if this is worth enabling, currently adds ~15 seconds to each run
        # if buildkite.is_in_buildkite():
        #     print("~~~ Fetching part priorities")
        #     test_analytics_config = create_test_analytics_config(self)
        #     test_analytics = TestAnalyticsDb(test_analytics_config)
        #     try:
        #         priority = test_analytics.builds.get_part_priorities(timeout=15)
        #         print(f"Priorities: {priority}")
        #     except Exception as e:
        #         print(f"Failed to fetch part priorities, using default order: {e}")
        # Higher-priority parts run first; unknown parts default to 0.
        sorted_parts = sorted(
            parts, key=lambda part: priority.get(str(part), 0), reverse=True
        )
        exceptions: list[Exception] = []
        try:
            for part in sorted_parts:
                try:
                    process_func(part)
                except Exception as e:
                    # if buildkite.is_in_buildkite():
                    #     assert test_analytics
                    #     test_analytics.builds.add_build_job_failure(str(part))
                    #     raise
                    # We could also keep running, but then runtime is still
                    # slow when a test fails, and the annotation only shows up
                    # after the test finished:
                    exceptions.append(e)
        finally:
            if buildkite.is_in_buildkite():
                # NOTE(review): imported here rather than at module top,
                # presumably to avoid import cost or a cycle — confirm.
                from materialize.test_analytics.config.test_analytics_db_config import (
                    create_test_analytics_config,
                )
                from materialize.test_analytics.test_analytics_db import TestAnalyticsDb

                test_analytics_config = create_test_analytics_config(self)
                test_analytics = TestAnalyticsDb(test_analytics_config)
                # Flush any buffered analytics updates even when a part failed.
                test_analytics.database_connector.submit_update_statements()
        if exceptions:
            # Re-raise the first failure; mention the rest so they are not lost.
            print(f"Further exceptions were raised:\n{exceptions[1:]}")
            raise exceptions[0]