query_result.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from __future__ import annotations
  10. from collections.abc import Sequence
  11. from typing import Any
  12. from materialize.output_consistency.execution.evaluation_strategy import (
  13. DummyEvaluation,
  14. EvaluationStrategy,
  15. EvaluationStrategyKey,
  16. )
  17. from materialize.output_consistency.execution.query_output_mode import QueryOutputMode
  18. from materialize.output_consistency.query.query_format import QueryOutputFormat
  19. from materialize.output_consistency.query.query_template import QueryTemplate
  20. from materialize.output_consistency.selection.column_selection import (
  21. ALL_QUERY_COLUMNS_BY_INDEX_SELECTION,
  22. )
  23. class QueryExecution:
  24. """An executed query with the outcomes of the different evaluation strategies"""
  25. def __init__(
  26. self,
  27. query_template: QueryTemplate,
  28. query_id: str,
  29. query_output_mode: QueryOutputMode,
  30. ):
  31. self.generic_sql = query_template.to_sql(
  32. DummyEvaluation(),
  33. QueryOutputFormat.MULTI_LINE,
  34. ALL_QUERY_COLUMNS_BY_INDEX_SELECTION,
  35. query_output_mode,
  36. )
  37. self.query_id = query_id
  38. self.query_template = query_template
  39. self.query_output_mode = query_output_mode
  40. self.outcomes: list[QueryOutcome] = []
  41. self.durations: list[float] = []
  42. def get_outcome_by_strategy_key(self) -> dict[EvaluationStrategyKey, QueryOutcome]:
  43. return {outcome.strategy.identifier: outcome for outcome in self.outcomes}
  44. def get_first_failing_outcome(self) -> QueryFailure:
  45. for outcome in self.outcomes:
  46. if not outcome.successful:
  47. assert isinstance(outcome, QueryFailure)
  48. return outcome
  49. raise RuntimeError("No failing outcome found")
  50. def __str__(self) -> str:
  51. return f"QueryExecution with {len(self.outcomes)} outcomes for template query: {self.generic_sql})"
  52. class QueryOutcome:
  53. """Outcome (result or failure) of a query execution with an evaluation strategy"""
  54. def __init__(
  55. self,
  56. strategy: EvaluationStrategy,
  57. sql: str,
  58. successful: bool,
  59. query_column_count: int,
  60. ):
  61. self.strategy = strategy
  62. self.sql = sql
  63. self.successful = successful
  64. self.query_column_count = query_column_count
  65. class QueryResult(QueryOutcome):
  66. """Result of a successful query with data"""
  67. def __init__(
  68. self,
  69. strategy: EvaluationStrategy,
  70. sql: str,
  71. query_column_count: int,
  72. result_rows: Sequence[Sequence[Any]],
  73. ):
  74. super().__init__(strategy, sql, True, query_column_count)
  75. self.result_rows = result_rows
  76. def row_count(self) -> int:
  77. return len(self.result_rows)
  78. def __str__(self) -> str:
  79. return f"Result with {len(self.result_rows)} rows with strategy '{self.strategy.name}'"
  80. class QueryFailure(QueryOutcome):
  81. """Error of a failed query with its error message"""
  82. def __init__(
  83. self,
  84. strategy: EvaluationStrategy,
  85. sql: str,
  86. query_column_count: int,
  87. error_message: str,
  88. ):
  89. super().__init__(strategy, sql, False, query_column_count)
  90. self.error_message = error_message
  91. def __str__(self) -> str:
  92. return f"Failure({self.error_message}) with strategy '{self.strategy.name}'"