123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- from __future__ import annotations
- from collections.abc import Sequence
- from typing import Any
- from materialize.output_consistency.execution.evaluation_strategy import (
- DummyEvaluation,
- EvaluationStrategy,
- EvaluationStrategyKey,
- )
- from materialize.output_consistency.execution.query_output_mode import QueryOutputMode
- from materialize.output_consistency.query.query_format import QueryOutputFormat
- from materialize.output_consistency.query.query_template import QueryTemplate
- from materialize.output_consistency.selection.column_selection import (
- ALL_QUERY_COLUMNS_BY_INDEX_SELECTION,
- )
- class QueryExecution:
- """An executed query with the outcomes of the different evaluation strategies"""
- def __init__(
- self,
- query_template: QueryTemplate,
- query_id: str,
- query_output_mode: QueryOutputMode,
- ):
- self.generic_sql = query_template.to_sql(
- DummyEvaluation(),
- QueryOutputFormat.MULTI_LINE,
- ALL_QUERY_COLUMNS_BY_INDEX_SELECTION,
- query_output_mode,
- )
- self.query_id = query_id
- self.query_template = query_template
- self.query_output_mode = query_output_mode
- self.outcomes: list[QueryOutcome] = []
- self.durations: list[float] = []
- def get_outcome_by_strategy_key(self) -> dict[EvaluationStrategyKey, QueryOutcome]:
- return {outcome.strategy.identifier: outcome for outcome in self.outcomes}
- def get_first_failing_outcome(self) -> QueryFailure:
- for outcome in self.outcomes:
- if not outcome.successful:
- assert isinstance(outcome, QueryFailure)
- return outcome
- raise RuntimeError("No failing outcome found")
- def __str__(self) -> str:
- return f"QueryExecution with {len(self.outcomes)} outcomes for template query: {self.generic_sql})"
- class QueryOutcome:
- """Outcome (result or failure) of a query execution with an evaluation strategy"""
- def __init__(
- self,
- strategy: EvaluationStrategy,
- sql: str,
- successful: bool,
- query_column_count: int,
- ):
- self.strategy = strategy
- self.sql = sql
- self.successful = successful
- self.query_column_count = query_column_count
- class QueryResult(QueryOutcome):
- """Result of a successful query with data"""
- def __init__(
- self,
- strategy: EvaluationStrategy,
- sql: str,
- query_column_count: int,
- result_rows: Sequence[Sequence[Any]],
- ):
- super().__init__(strategy, sql, True, query_column_count)
- self.result_rows = result_rows
- def row_count(self) -> int:
- return len(self.result_rows)
- def __str__(self) -> str:
- return f"Result with {len(self.result_rows)} rows with strategy '{self.strategy.name}'"
- class QueryFailure(QueryOutcome):
- """Error of a failed query with its error message"""
- def __init__(
- self,
- strategy: EvaluationStrategy,
- sql: str,
- query_column_count: int,
- error_message: str,
- ):
- super().__init__(strategy, sql, False, query_column_count)
- self.error_message = error_message
- def __str__(self) -> str:
- return f"Failure({self.error_message}) with strategy '{self.strategy.name}'"
|