# Copyright Materialize, Inc. and contributors. All rights reserved. # # Use of this software is governed by the Business Source License # included in the LICENSE file at the root of this repository. # # As of the Change Date specified in that file, in accordance with # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0. from datetime import datetime from materialize.output_consistency.common.configuration import ( ConsistencyTestConfiguration, ) from materialize.output_consistency.execution.evaluation_strategy import ( EvaluationStrategy, ) from materialize.output_consistency.execution.sql_executor import SqlExecutionError from materialize.output_consistency.execution.sql_executors import SqlExecutors from materialize.output_consistency.input_data.test_input_data import ( ConsistencyTestInputData, ) from materialize.output_consistency.output.output_printer import OutputPrinter from materialize.output_consistency.query.query_format import QueryOutputFormat from materialize.output_consistency.query.query_result import ( QueryExecution, QueryFailure, QueryResult, ) from materialize.output_consistency.query.query_template import QueryTemplate from materialize.output_consistency.selection.column_selection import ( ALL_QUERY_COLUMNS_BY_INDEX_SELECTION, ) from materialize.output_consistency.status.test_summary import ConsistencyTestSummary from materialize.output_consistency.validation.result_comparator import ResultComparator from materialize.output_consistency.validation.validation_outcome import ( ValidationOutcome, ValidationVerdict, ) class QueryExecutionManager: """Requests the execution of queries and handles transactions""" def __init__( self, evaluation_strategies: list[EvaluationStrategy], config: ConsistencyTestConfiguration, executors: SqlExecutors, comparator: ResultComparator, output_printer: OutputPrinter, ): self.evaluation_strategies = evaluation_strategies self.config = config self.executors = executors self.comparator = comparator self.output_printer = output_printer self.query_counter = 0 def setup_database_objects( self, input_data: ConsistencyTestInputData, evaluation_strategies: list[EvaluationStrategy], ) -> None: self.output_printer.start_section("Setup code", collapsed=True) for strategy in evaluation_strategies: self.output_printer.print_info( f"Setup for evaluation strategy '{strategy.name}'" ) executor = self.executors.get_executor(strategy) ddl_statements = strategy.generate_sources( input_data.types_input, self.config.vertical_join_tables ) for sql_statement in ddl_statements: self.output_printer.print_sql(sql_statement) try: executor.ddl(sql_statement) except SqlExecutionError as e: self.output_printer.print_error( f"Setting up data structures failed ({e.message})!" ) raise e def execute_query( self, query: QueryTemplate, summary_to_update: ConsistencyTestSummary ) -> bool: if self.query_counter % self.config.queries_per_tx == 0: # commit after every couple of queries for strategy in self.evaluation_strategies: self.begin_tx(strategy, commit_previous_tx=self.query_counter > 0) query_index = self.query_counter self.query_counter += 1 test_outcomes = self.fire_and_compare_query( query, query_index, "", self.evaluation_strategies ) all_comparisons_passed = True for test_outcome in test_outcomes: if test_outcome.verdict() == ValidationVerdict.FAILURE: all_comparisons_passed = False summary_to_update.accept_execution_result( query, test_outcome, self.output_printer.reproduction_code_printer ) return all_comparisons_passed def complete(self, strategy: EvaluationStrategy) -> None: self.commit_tx(strategy) def begin_tx(self, strategy: EvaluationStrategy, commit_previous_tx: bool) -> None: if commit_previous_tx: self.commit_tx(strategy) self.executors.get_executor(strategy).before_new_tx() self.executors.get_executor(strategy).begin_tx("SERIALIZABLE") self.executors.get_executor(strategy).after_new_tx() def commit_tx( self, strategy: EvaluationStrategy, ) -> None: if not self.config.use_autocommit: self.executors.get_executor(strategy).commit() def rollback_tx(self, strategy: EvaluationStrategy, start_new_tx: bool) -> None: # do this also when in autocommit mode self.executors.get_executor(strategy).rollback() if start_new_tx: self.begin_tx(strategy, commit_previous_tx=False) # May return multiple outcomes if a query is split and retried. Will always return at least one outcome. def fire_and_compare_query( self, query_template: QueryTemplate, query_index: int, query_id_prefix: str, evaluation_strategies: list[EvaluationStrategy], ) -> list[ValidationOutcome]: query_no = query_index + 1 query_id = f"{query_id_prefix}{query_no}" query_execution = QueryExecution( query_template, query_id, self.config.query_output_mode ) if self.config.verbose_output: # print the header with the query before the execution to have information if it gets stuck self.print_query_header(query_id, query_execution, collapsed=True) for strategy in evaluation_strategies: sql_query_string = query_template.to_sql( strategy, QueryOutputFormat.SINGLE_LINE, ALL_QUERY_COLUMNS_BY_INDEX_SELECTION, self.config.query_output_mode, ) start_time = datetime.now() try: self.executors.get_executor(strategy).before_query_execution() start_time = datetime.now() data = self.executors.get_executor(strategy).query(sql_query_string) duration = self._get_duration_in_ms(start_time) self.executors.get_executor(strategy).after_query_execution() result = QueryResult( strategy, sql_query_string, query_template.column_count(), data ) query_execution.outcomes.append(result) query_execution.durations.append(duration) except SqlExecutionError as err: duration = self._get_duration_in_ms(start_time) self.rollback_tx(strategy, start_new_tx=True) if self.shall_retry_with_smaller_query(query_template): # abort and retry with smaller query # this will discard the outcomes of all strategies return self.split_and_retry_queries( query_template, query_id, evaluation_strategies ) failure = QueryFailure( strategy, sql_query_string, query_template.column_count(), str(err) ) query_execution.outcomes.append(failure) query_execution.durations.append(duration) if self.config.dry_run: return [ValidationOutcome()] validation_outcome = self.comparator.compare_results(query_execution) self.print_test_result(query_id, query_execution, validation_outcome) return [validation_outcome] def _get_duration_in_ms(self, start_time: datetime) -> float: end_time = datetime.now() duration = end_time - start_time return duration.total_seconds() def shall_retry_with_smaller_query(self, query_template: QueryTemplate) -> bool: return ( self.config.split_and_retry_on_db_error and query_template.column_count() > 1 ) def split_and_retry_queries( self, original_query_template: QueryTemplate, query_id: str, evaluation_strategies: list[EvaluationStrategy], ) -> list[ValidationOutcome]: args_count = len(original_query_template.select_expressions) if args_count < 2: raise RuntimeError("Cannot split query") # This code assumes that the query failed because of the SELECT expressions. # However, it is also possible that the where condition was invalid. # This is ignored as of now. arg_split_index = int(args_count / 2) query1_select_expressions = original_query_template.select_expressions[ arg_split_index: ] query2_select_expressions = original_query_template.select_expressions[ :arg_split_index ] new_query_template1 = original_query_template.clone( False, query1_select_expressions, ) new_query_template2 = original_query_template.clone( False, query2_select_expressions, ) query_id_prefix = f"{query_id}." validation_outcomes = [] validation_outcomes.extend( self.fire_and_compare_query( new_query_template1, 0, query_id_prefix, evaluation_strategies ) ) validation_outcomes.extend( self.fire_and_compare_query( new_query_template2, 1, query_id_prefix, evaluation_strategies ) ) return validation_outcomes def print_query_header( self, query_id: str, query_execution: QueryExecution, collapsed: bool, status: str | None = None, flush: bool = False, ) -> None: status = "" if status is None else f" ({status})" self.output_printer.start_section( f"Test query #{query_id}{status}", collapsed=collapsed ) self.output_printer.print_sql(query_execution.generic_sql) if flush: self.output_printer.flush() def print_test_result( self, query_id: str, query_execution: QueryExecution, validation_outcome: ValidationOutcome, ) -> None: if ( validation_outcome.verdict() == ValidationVerdict.SUCCESS and not self.config.verbose_output ): return status = validation_outcome.verdict().name if not self.config.verbose_output: # In verbose mode, the header has already been printed self.print_query_header( query_id, query_execution, collapsed=validation_outcome.verdict().accepted(), status=status, flush=True, ) result_desc = "PASSED" if validation_outcome.verdict().accepted() else "FAILED" success_reason = ( f" ({validation_outcome.success_reason})" if validation_outcome.success_reason is not None and validation_outcome.verdict().succeeded() else "" ) self.output_printer.print_info( f"Test with query #{query_id} {result_desc}{success_reason}." ) duration_info = ", ".join( f"{duration:.3f}" for duration in query_execution.durations ) self.output_printer.print_info(f"Durations: {duration_info}") if validation_outcome.has_errors(): self.output_printer.print_info( f"Errors:\n{validation_outcome.error_output()}" ) if self.config.print_reproduction_code: self.output_printer.print_reproduction_code(validation_outcome.errors) if validation_outcome.has_warnings(): self.output_printer.print_info( f"Warnings:\n{validation_outcome.warning_output()}" ) if validation_outcome.has_remarks(): self.output_printer.print_info( f"Remarks:\n{validation_outcome.remark_output()}" )