query_execution_manager.py 12 KB


  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from datetime import datetime
  10. from materialize.output_consistency.common.configuration import (
  11. ConsistencyTestConfiguration,
  12. )
  13. from materialize.output_consistency.execution.evaluation_strategy import (
  14. EvaluationStrategy,
  15. )
  16. from materialize.output_consistency.execution.sql_executor import SqlExecutionError
  17. from materialize.output_consistency.execution.sql_executors import SqlExecutors
  18. from materialize.output_consistency.input_data.test_input_data import (
  19. ConsistencyTestInputData,
  20. )
  21. from materialize.output_consistency.output.output_printer import OutputPrinter
  22. from materialize.output_consistency.query.query_format import QueryOutputFormat
  23. from materialize.output_consistency.query.query_result import (
  24. QueryExecution,
  25. QueryFailure,
  26. QueryResult,
  27. )
  28. from materialize.output_consistency.query.query_template import QueryTemplate
  29. from materialize.output_consistency.selection.column_selection import (
  30. ALL_QUERY_COLUMNS_BY_INDEX_SELECTION,
  31. )
  32. from materialize.output_consistency.status.test_summary import ConsistencyTestSummary
  33. from materialize.output_consistency.validation.result_comparator import ResultComparator
  34. from materialize.output_consistency.validation.validation_outcome import (
  35. ValidationOutcome,
  36. ValidationVerdict,
  37. )
  38. class QueryExecutionManager:
  39. """Requests the execution of queries and handles transactions"""
  40. def __init__(
  41. self,
  42. evaluation_strategies: list[EvaluationStrategy],
  43. config: ConsistencyTestConfiguration,
  44. executors: SqlExecutors,
  45. comparator: ResultComparator,
  46. output_printer: OutputPrinter,
  47. ):
  48. self.evaluation_strategies = evaluation_strategies
  49. self.config = config
  50. self.executors = executors
  51. self.comparator = comparator
  52. self.output_printer = output_printer
  53. self.query_counter = 0
  54. def setup_database_objects(
  55. self,
  56. input_data: ConsistencyTestInputData,
  57. evaluation_strategies: list[EvaluationStrategy],
  58. ) -> None:
  59. self.output_printer.start_section("Setup code", collapsed=True)
  60. for strategy in evaluation_strategies:
  61. self.output_printer.print_info(
  62. f"Setup for evaluation strategy '{strategy.name}'"
  63. )
  64. executor = self.executors.get_executor(strategy)
  65. ddl_statements = strategy.generate_sources(
  66. input_data.types_input, self.config.vertical_join_tables
  67. )
  68. for sql_statement in ddl_statements:
  69. self.output_printer.print_sql(sql_statement)
  70. try:
  71. executor.ddl(sql_statement)
  72. except SqlExecutionError as e:
  73. self.output_printer.print_error(
  74. f"Setting up data structures failed ({e.message})!"
  75. )
  76. raise e
  77. def execute_query(
  78. self, query: QueryTemplate, summary_to_update: ConsistencyTestSummary
  79. ) -> bool:
  80. if self.query_counter % self.config.queries_per_tx == 0:
  81. # commit after every couple of queries
  82. for strategy in self.evaluation_strategies:
  83. self.begin_tx(strategy, commit_previous_tx=self.query_counter > 0)
  84. query_index = self.query_counter
  85. self.query_counter += 1
  86. test_outcomes = self.fire_and_compare_query(
  87. query, query_index, "", self.evaluation_strategies
  88. )
  89. all_comparisons_passed = True
  90. for test_outcome in test_outcomes:
  91. if test_outcome.verdict() == ValidationVerdict.FAILURE:
  92. all_comparisons_passed = False
  93. summary_to_update.accept_execution_result(
  94. query, test_outcome, self.output_printer.reproduction_code_printer
  95. )
  96. return all_comparisons_passed
  97. def complete(self, strategy: EvaluationStrategy) -> None:
  98. self.commit_tx(strategy)
  99. def begin_tx(self, strategy: EvaluationStrategy, commit_previous_tx: bool) -> None:
  100. if commit_previous_tx:
  101. self.commit_tx(strategy)
  102. self.executors.get_executor(strategy).before_new_tx()
  103. self.executors.get_executor(strategy).begin_tx("SERIALIZABLE")
  104. self.executors.get_executor(strategy).after_new_tx()
  105. def commit_tx(
  106. self,
  107. strategy: EvaluationStrategy,
  108. ) -> None:
  109. if not self.config.use_autocommit:
  110. self.executors.get_executor(strategy).commit()
  111. def rollback_tx(self, strategy: EvaluationStrategy, start_new_tx: bool) -> None:
  112. # do this also when in autocommit mode
  113. self.executors.get_executor(strategy).rollback()
  114. if start_new_tx:
  115. self.begin_tx(strategy, commit_previous_tx=False)
  116. # May return multiple outcomes if a query is split and retried. Will always return at least one outcome.
  117. def fire_and_compare_query(
  118. self,
  119. query_template: QueryTemplate,
  120. query_index: int,
  121. query_id_prefix: str,
  122. evaluation_strategies: list[EvaluationStrategy],
  123. ) -> list[ValidationOutcome]:
  124. query_no = query_index + 1
  125. query_id = f"{query_id_prefix}{query_no}"
  126. query_execution = QueryExecution(
  127. query_template, query_id, self.config.query_output_mode
  128. )
  129. if self.config.verbose_output:
  130. # print the header with the query before the execution to have information if it gets stuck
  131. self.print_query_header(query_id, query_execution, collapsed=True)
  132. for strategy in evaluation_strategies:
  133. sql_query_string = query_template.to_sql(
  134. strategy,
  135. QueryOutputFormat.SINGLE_LINE,
  136. ALL_QUERY_COLUMNS_BY_INDEX_SELECTION,
  137. self.config.query_output_mode,
  138. )
  139. start_time = datetime.now()
  140. try:
  141. self.executors.get_executor(strategy).before_query_execution()
  142. start_time = datetime.now()
  143. data = self.executors.get_executor(strategy).query(sql_query_string)
  144. duration = self._get_duration_in_ms(start_time)
  145. self.executors.get_executor(strategy).after_query_execution()
  146. result = QueryResult(
  147. strategy, sql_query_string, query_template.column_count(), data
  148. )
  149. query_execution.outcomes.append(result)
  150. query_execution.durations.append(duration)
  151. except SqlExecutionError as err:
  152. duration = self._get_duration_in_ms(start_time)
  153. self.rollback_tx(strategy, start_new_tx=True)
  154. if self.shall_retry_with_smaller_query(query_template):
  155. # abort and retry with smaller query
  156. # this will discard the outcomes of all strategies
  157. return self.split_and_retry_queries(
  158. query_template, query_id, evaluation_strategies
  159. )
  160. failure = QueryFailure(
  161. strategy, sql_query_string, query_template.column_count(), str(err)
  162. )
  163. query_execution.outcomes.append(failure)
  164. query_execution.durations.append(duration)
  165. if self.config.dry_run:
  166. return [ValidationOutcome()]
  167. validation_outcome = self.comparator.compare_results(query_execution)
  168. self.print_test_result(query_id, query_execution, validation_outcome)
  169. return [validation_outcome]
  170. def _get_duration_in_ms(self, start_time: datetime) -> float:
  171. end_time = datetime.now()
  172. duration = end_time - start_time
  173. return duration.total_seconds()
  174. def shall_retry_with_smaller_query(self, query_template: QueryTemplate) -> bool:
  175. return (
  176. self.config.split_and_retry_on_db_error
  177. and query_template.column_count() > 1
  178. )
  179. def split_and_retry_queries(
  180. self,
  181. original_query_template: QueryTemplate,
  182. query_id: str,
  183. evaluation_strategies: list[EvaluationStrategy],
  184. ) -> list[ValidationOutcome]:
  185. args_count = len(original_query_template.select_expressions)
  186. if args_count < 2:
  187. raise RuntimeError("Cannot split query")
  188. # This code assumes that the query failed because of the SELECT expressions.
  189. # However, it is also possible that the where condition was invalid.
  190. # This is ignored as of now.
  191. arg_split_index = int(args_count / 2)
  192. query1_select_expressions = original_query_template.select_expressions[
  193. arg_split_index:
  194. ]
  195. query2_select_expressions = original_query_template.select_expressions[
  196. :arg_split_index
  197. ]
  198. new_query_template1 = original_query_template.clone(
  199. False,
  200. query1_select_expressions,
  201. )
  202. new_query_template2 = original_query_template.clone(
  203. False,
  204. query2_select_expressions,
  205. )
  206. query_id_prefix = f"{query_id}."
  207. validation_outcomes = []
  208. validation_outcomes.extend(
  209. self.fire_and_compare_query(
  210. new_query_template1, 0, query_id_prefix, evaluation_strategies
  211. )
  212. )
  213. validation_outcomes.extend(
  214. self.fire_and_compare_query(
  215. new_query_template2, 1, query_id_prefix, evaluation_strategies
  216. )
  217. )
  218. return validation_outcomes
  219. def print_query_header(
  220. self,
  221. query_id: str,
  222. query_execution: QueryExecution,
  223. collapsed: bool,
  224. status: str | None = None,
  225. flush: bool = False,
  226. ) -> None:
  227. status = "" if status is None else f" ({status})"
  228. self.output_printer.start_section(
  229. f"Test query #{query_id}{status}", collapsed=collapsed
  230. )
  231. self.output_printer.print_sql(query_execution.generic_sql)
  232. if flush:
  233. self.output_printer.flush()
  234. def print_test_result(
  235. self,
  236. query_id: str,
  237. query_execution: QueryExecution,
  238. validation_outcome: ValidationOutcome,
  239. ) -> None:
  240. if (
  241. validation_outcome.verdict() == ValidationVerdict.SUCCESS
  242. and not self.config.verbose_output
  243. ):
  244. return
  245. status = validation_outcome.verdict().name
  246. if not self.config.verbose_output:
  247. # In verbose mode, the header has already been printed
  248. self.print_query_header(
  249. query_id,
  250. query_execution,
  251. collapsed=validation_outcome.verdict().accepted(),
  252. status=status,
  253. flush=True,
  254. )
  255. result_desc = "PASSED" if validation_outcome.verdict().accepted() else "FAILED"
  256. success_reason = (
  257. f" ({validation_outcome.success_reason})"
  258. if validation_outcome.success_reason is not None
  259. and validation_outcome.verdict().succeeded()
  260. else ""
  261. )
  262. self.output_printer.print_info(
  263. f"Test with query #{query_id} {result_desc}{success_reason}."
  264. )
  265. duration_info = ", ".join(
  266. f"{duration:.3f}" for duration in query_execution.durations
  267. )
  268. self.output_printer.print_info(f"Durations: {duration_info}")
  269. if validation_outcome.has_errors():
  270. self.output_printer.print_info(
  271. f"Errors:\n{validation_outcome.error_output()}"
  272. )
  273. if self.config.print_reproduction_code:
  274. self.output_printer.print_reproduction_code(validation_outcome.errors)
  275. if validation_outcome.has_warnings():
  276. self.output_printer.print_info(
  277. f"Warnings:\n{validation_outcome.warning_output()}"
  278. )
  279. if validation_outcome.has_remarks():
  280. self.output_printer.print_info(
  281. f"Remarks:\n{validation_outcome.remark_output()}"
  282. )