123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- from __future__ import annotations
- import re
- from dataclasses import dataclass
- from materialize import MZ_ROOT
- from materialize.ui import CommandFailureCausedUIError, UIError
- PEM_CONTENT_RE = r"-----BEGIN ([A-Z ]+)-----[^-]+-----END [A-Z ]+-----"
- PEM_CONTENT_REPLACEMENT = r"<\1>"
- @dataclass
- class TestResult:
- __test__ = False
- duration: float
- errors: list[TestFailureDetails]
- def is_successful(self) -> bool:
- return len(self.errors) == 0
- @dataclass
- class TestFailureDetails:
- __test__ = False
- message: str
- details: str | None
- additional_details_header: str | None = None
- additional_details: str | None = None
- test_class_name_override: str | None = None
- """The test class usually describes the framework."""
- test_case_name_override: str | None = None
- """The test case usually describes the workflow, unless more fine-grained information is available."""
- location: str | None = None
- """depending on the check, this may either be a file name or a path"""
- line_number: int | None = None
- def location_as_file_name(self) -> str | None:
- if self.location is None:
- return None
- if "/" in self.location:
- return self.location[self.location.rindex("/") + 1 :]
- return self.location
- class FailedTestExecutionError(UIError):
- """
- An UIError that is caused by a failing test.
- """
- def __init__(
- self,
- errors: list[TestFailureDetails],
- error_summary: str = "At least one test failed",
- ):
- super().__init__(error_summary)
- self.errors = errors
- def try_determine_errors_from_cmd_execution(
- e: CommandFailureCausedUIError, test_context: str | None
- ) -> list[TestFailureDetails]:
- output = e.stderr or e.stdout
- if "running docker compose failed" in str(e):
- return [determine_error_from_docker_compose_failure(e, output, test_context)]
- if output is None:
- return []
- error_chunks = extract_error_chunks_from_output(output)
- collected_errors = []
- for chunk in error_chunks:
- match = re.search(r"([^.]+\.td):(\d+):\d+:", chunk)
- if match is not None:
- # for .td files like Postgres CDC, file_path will just contain the file name
- file_path = match.group(1)
- line_number = int(match.group(2))
- else:
- # for .py files like platform checks, file_path will be a path
- file_path = try_determine_error_location_from_cmd(e.cmd)
- if file_path is None or ":" not in file_path:
- line_number = None
- else:
- parts = file_path.split(":")
- file_path = parts[0]
- line_number = int(parts[1])
- message = (
- f"Executing {file_path if file_path is not None else 'command'} failed!"
- )
- failure_details = TestFailureDetails(
- message,
- details=chunk,
- test_case_name_override=test_context,
- location=file_path,
- line_number=line_number,
- )
- if failure_details in collected_errors:
- # do not add an identical error again
- pass
- else:
- collected_errors.append(failure_details)
- return collected_errors
- def determine_error_from_docker_compose_failure(
- e: CommandFailureCausedUIError, output: str | None, test_context: str | None
- ) -> TestFailureDetails:
- command = to_sanitized_command_str(e.cmd)
- context_prefix = f"{test_context}: " if test_context is not None else ""
- return TestFailureDetails(
- f"{context_prefix}Docker compose failed: {command}",
- details=output,
- test_case_name_override=test_context,
- location=None,
- line_number=None,
- )
- def try_determine_error_location_from_cmd(cmd: list[str]) -> str | None:
- root_path_as_string = f"{MZ_ROOT}/"
- for cmd_part in cmd:
- if type(cmd_part) == str and cmd_part.startswith("--source="):
- return cmd_part.removeprefix("--source=").replace(root_path_as_string, "")
- return None
- def extract_error_chunks_from_output(output: str) -> list[str]:
- if "+++ !!! Error Report" not in output:
- return []
- error_output = output[: output.index("+++ !!! Error Report") - 1]
- error_chunks = error_output.split("^^^ +++")
- return [chunk.strip() for chunk in error_chunks if len(chunk.strip()) > 0]
- def to_sanitized_command_str(cmd: list[str]) -> str:
- command_str = " ".join([str(x) for x in cmd])
- return re.sub(PEM_CONTENT_RE, PEM_CONTENT_REPLACEMENT, command_str)
|