test_runner.py 8.9 KB


  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from materialize.output_consistency.common.configuration import (
  10. ConsistencyTestConfiguration,
  11. )
  12. from materialize.output_consistency.execution.evaluation_strategy import (
  13. EvaluationStrategy,
  14. )
  15. from materialize.output_consistency.execution.query_execution_manager import (
  16. QueryExecutionManager,
  17. )
  18. from materialize.output_consistency.execution.sql_executors import SqlExecutors
  19. from materialize.output_consistency.generators.expression_generator import (
  20. ExpressionGenerator,
  21. )
  22. from materialize.output_consistency.generators.query_generator import QueryGenerator
  23. from materialize.output_consistency.ignore_filter.inconsistency_ignore_filter import (
  24. GenericInconsistencyIgnoreFilter,
  25. )
  26. from materialize.output_consistency.input_data.test_input_data import (
  27. ConsistencyTestInputData,
  28. )
  29. from materialize.output_consistency.output.output_printer import OutputPrinter
  30. from materialize.output_consistency.runner.time_guard import TimeGuard
  31. from materialize.output_consistency.selection.randomized_picker import RandomizedPicker
  32. from materialize.output_consistency.status.test_summary import (
  33. ConsistencyTestSummary,
  34. )
  35. from materialize.output_consistency.validation.result_comparator import ResultComparator
  36. ENABLE_ADDING_WHERE_CONDITIONS = True
  37. class ConsistencyTestRunner:
  38. """Orchestrates the test execution"""
  39. def __init__(
  40. self,
  41. config: ConsistencyTestConfiguration,
  42. input_data: ConsistencyTestInputData,
  43. evaluation_strategies: list[EvaluationStrategy],
  44. expression_generator: ExpressionGenerator,
  45. query_generator: QueryGenerator,
  46. outcome_comparator: ResultComparator,
  47. sql_executors: SqlExecutors,
  48. randomized_picker: RandomizedPicker,
  49. ignore_filter: GenericInconsistencyIgnoreFilter,
  50. output_printer: OutputPrinter,
  51. ):
  52. self.config = config
  53. self.input_data = input_data
  54. self.evaluation_strategies = evaluation_strategies
  55. self.expression_generator = expression_generator
  56. self.query_generator = query_generator
  57. self.outcome_comparator = outcome_comparator
  58. self.execution_manager = QueryExecutionManager(
  59. evaluation_strategies,
  60. config,
  61. sql_executors,
  62. outcome_comparator,
  63. output_printer,
  64. )
  65. self.randomized_picker = randomized_picker
  66. self.ignore_filter = ignore_filter
  67. self.output_printer = output_printer
  68. def setup(self) -> None:
  69. self.input_data.assign_columns_to_tables(
  70. self.config.vertical_join_tables, self.randomized_picker
  71. )
  72. self.execution_manager.setup_database_objects(
  73. self.input_data, self.evaluation_strategies
  74. )
  75. # reset cache after having assigned columns to tables as a precaution
  76. self.input_data.types_input.cached_max_value_count_per_table_index.clear()
  77. def start(self) -> ConsistencyTestSummary:
  78. expression_count = 0
  79. test_summary = ConsistencyTestSummary(
  80. dry_run=self.config.dry_run,
  81. count_available_op_variants=self.input_data.count_available_op_variants(),
  82. count_available_data_types=self.input_data.count_available_data_types(),
  83. count_predefined_queries=self.input_data.count_predefined_queries(),
  84. )
  85. time_guard = TimeGuard(self.config.max_runtime_in_sec)
  86. if not self.config.disable_predefined_queries:
  87. self.output_printer.start_section("Running predefined queries")
  88. success = self._run_predefined_queries(test_summary)
  89. if not success and (
  90. test_summary.count_failures() >= self.config.max_failures_until_abort
  91. ):
  92. self.output_printer.print_info(
  93. f"Ending test run because {test_summary.count_failures()} failures occurred"
  94. )
  95. return test_summary
  96. else:
  97. self.output_printer.print_info(
  98. "Not running predefined queries because they are disabled"
  99. )
  100. self.output_printer.print_empty_line()
  101. self.output_printer.start_section("Running generated queries")
  102. while True:
  103. if expression_count > 0 and expression_count % 200 == 0:
  104. self.output_printer.print_status(
  105. f"Status: Expression {expression_count}..."
  106. f" (last executed query #{self.execution_manager.query_counter})"
  107. )
  108. if expression_count % 5000 == 0:
  109. self.output_printer.print_status(
  110. f"Random state verification: {self.expression_generator.randomized_picker.random_number(0, 10000)}"
  111. )
  112. operation = self.expression_generator.pick_random_operation(True)
  113. shall_abort_after_iteration = self._shall_abort(
  114. expression_count, time_guard
  115. )
  116. expression, number_of_args = (
  117. self.expression_generator.generate_expression_for_operation(operation)
  118. )
  119. test_summary.accept_expression_generation_statistics(
  120. operation, expression, number_of_args
  121. )
  122. if expression is None:
  123. continue
  124. self.query_generator.push_expression(expression)
  125. if (
  126. self.query_generator.shall_consume_queries()
  127. or shall_abort_after_iteration
  128. ):
  129. mismatch_occurred = self._consume_and_process_queries(
  130. test_summary, time_guard
  131. )
  132. if mismatch_occurred:
  133. shall_abort_after_iteration = True
  134. if time_guard.replied_abort_yes:
  135. shall_abort_after_iteration = True
  136. expression_count += 1
  137. if shall_abort_after_iteration:
  138. break
  139. for strategy in self.evaluation_strategies:
  140. self.execution_manager.complete(strategy)
  141. test_summary.count_generated_select_expressions = expression_count
  142. return test_summary
  143. def _consume_and_process_queries(
  144. self, test_summary: ConsistencyTestSummary, time_guard: TimeGuard
  145. ) -> bool:
  146. """
  147. :return: if the test run should be aborted
  148. """
  149. queries = self.query_generator.consume_queries(test_summary)
  150. for query in queries:
  151. if ENABLE_ADDING_WHERE_CONDITIONS:
  152. self.query_generator.add_random_where_condition_to_query(
  153. query, test_summary
  154. )
  155. success = self.execution_manager.execute_query(query, test_summary)
  156. if not success and (
  157. test_summary.count_failures() >= self.config.max_failures_until_abort
  158. ):
  159. self.output_printer.print_info(
  160. f"Ending test run because {test_summary.count_failures()} failures occurred"
  161. )
  162. return True
  163. if time_guard.shall_abort():
  164. self.output_printer.print_info(
  165. "Ending test run because the time elapsed"
  166. )
  167. return False
  168. return False
  169. def _shall_abort(self, iteration_count: int, time_guard: TimeGuard) -> bool:
  170. if (
  171. self.config.max_iterations != 0
  172. and iteration_count >= self.config.max_iterations
  173. ):
  174. self.output_printer.print_info(
  175. "Ending test run because the iteration count limit has been reached"
  176. )
  177. return True
  178. if time_guard.shall_abort():
  179. self.output_printer.print_info(
  180. "Ending test run because the maximum runtime has been reached"
  181. )
  182. return True
  183. return False
  184. def _run_predefined_queries(self, test_summary: ConsistencyTestSummary) -> bool:
  185. if len(self.input_data.predefined_queries) == 0:
  186. self.output_printer.print_status("No predefined queries exist")
  187. return True
  188. all_passed = True
  189. for predefined_query in self.input_data.predefined_queries:
  190. query_succeeded = self.execution_manager.execute_query(
  191. predefined_query, test_summary
  192. )
  193. all_passed = all_passed and query_succeeded
  194. if (
  195. not query_succeeded
  196. and test_summary.count_failures()
  197. >= self.config.max_failures_until_abort
  198. ):
  199. return False
  200. self.output_printer.print_status(
  201. f"Executed {len(self.input_data.predefined_queries)} predefined queries"
  202. )
  203. return all_passed