result_comparator.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import math
  10. from decimal import Decimal
  11. from typing import Any, cast
  12. from materialize.output_consistency.execution.query_output_mode import QueryOutputMode
  13. from materialize.output_consistency.expression.expression import Expression
  14. from materialize.output_consistency.ignore_filter.inconsistency_ignore_filter import (
  15. GenericInconsistencyIgnoreFilter,
  16. )
  17. from materialize.output_consistency.query.query_result import (
  18. QueryExecution,
  19. QueryFailure,
  20. QueryOutcome,
  21. QueryResult,
  22. )
  23. from materialize.output_consistency.validation.error_message_normalizer import (
  24. ErrorMessageNormalizer,
  25. )
  26. from materialize.output_consistency.validation.validation_message import (
  27. ValidationError,
  28. ValidationErrorDetails,
  29. ValidationErrorType,
  30. ValidationRemark,
  31. ValidationWarning,
  32. )
  33. from materialize.output_consistency.validation.validation_outcome import (
  34. ValidationOutcome,
  35. )
  36. class ResultComparator:
  37. """Compares the outcome (result or failure) of multiple query executions"""
  38. def __init__(
  39. self,
  40. ignore_filter: GenericInconsistencyIgnoreFilter,
  41. error_message_normalizer: ErrorMessageNormalizer,
  42. ):
  43. self.ignore_filter = ignore_filter
  44. self.error_message_normalizer = error_message_normalizer
  45. def compare_results(self, query_execution: QueryExecution) -> ValidationOutcome:
  46. validation_outcome = ValidationOutcome()
  47. if len(query_execution.outcomes) == 0:
  48. raise RuntimeError("Contains no outcomes!")
  49. if len(query_execution.outcomes) == 1:
  50. raise RuntimeError("Contains only one outcome, nothing to compare against!")
  51. if query_execution.query_template.expect_error:
  52. validation_outcome.add_remark(ValidationRemark("DB error is possible"))
  53. self.validate_outcomes_metadata(query_execution, validation_outcome)
  54. if not validation_outcome.verdict().succeeded():
  55. # do not continue with value comparison if metadata differs
  56. return validation_outcome
  57. # this statement must not be before the metadata validation (otherwise successful of the outcomes may differ)
  58. validation_outcome.query_execution_succeeded_in_all_strategies = (
  59. query_execution.outcomes[0].successful
  60. )
  61. if validation_outcome.query_execution_succeeded_in_all_strategies:
  62. self.validate_outcomes_data(query_execution, validation_outcome)
  63. if query_execution.query_output_mode in {
  64. QueryOutputMode.EXPLAIN,
  65. QueryOutputMode.EXPLAIN_PHYSICAL,
  66. }:
  67. validation_outcome.success_reason = "explain plan matches"
  68. else:
  69. validation_outcome.success_reason = "result data matches"
  70. else:
  71. # error messages were already validated at metadata validation
  72. validation_outcome.success_reason = "error message matches"
  73. return validation_outcome
  74. def validate_outcomes_metadata(
  75. self, query_execution: QueryExecution, validation_outcome: ValidationOutcome
  76. ) -> None:
  77. outcomes = query_execution.outcomes
  78. outcome1 = outcomes[0]
  79. for index in range(1, len(outcomes)):
  80. self.validate_metadata_of_two_outcomes(
  81. query_execution, outcome1, outcomes[index], validation_outcome
  82. )
  83. def validate_metadata_of_two_outcomes(
  84. self,
  85. query_execution: QueryExecution,
  86. outcome1: QueryOutcome,
  87. outcome2: QueryOutcome,
  88. validation_outcome: ValidationOutcome,
  89. ) -> None:
  90. if outcome1.successful != outcome2.successful:
  91. expression = self._expression_if_only_one_in_query(query_execution)
  92. validation_outcome.add_error(
  93. self.ignore_filter,
  94. ValidationError(
  95. query_execution,
  96. ValidationErrorType.SUCCESS_MISMATCH,
  97. "Outcome differs",
  98. details1=ValidationErrorDetails(
  99. strategy=outcome1.strategy,
  100. value=outcome1.__class__.__name__,
  101. sql=outcome1.sql,
  102. sql_error=(
  103. outcome1.error_message
  104. if isinstance(outcome1, QueryFailure)
  105. else None
  106. ),
  107. ),
  108. details2=ValidationErrorDetails(
  109. strategy=outcome2.strategy,
  110. value=outcome2.__class__.__name__,
  111. sql=outcome2.sql,
  112. sql_error=(
  113. outcome2.error_message
  114. if isinstance(outcome2, QueryFailure)
  115. else None
  116. ),
  117. ),
  118. concerned_expression=expression,
  119. ),
  120. )
  121. return
  122. both_successful = outcome1.successful and outcome2.successful
  123. both_failed = not outcome1.successful and not outcome2.successful
  124. if both_successful:
  125. self.validate_row_count(
  126. query_execution,
  127. cast(QueryResult, outcome1),
  128. cast(QueryResult, outcome2),
  129. validation_outcome,
  130. )
  131. # this needs will no longer be sensible when more than two evaluation strategies are used
  132. self.remark_on_success_with_single_column(outcome1, validation_outcome)
  133. if both_failed and self.shall_validate_error_message(query_execution):
  134. failure1 = cast(QueryFailure, outcome1)
  135. self.validate_error_messages(
  136. query_execution,
  137. failure1,
  138. cast(QueryFailure, outcome2),
  139. validation_outcome,
  140. )
  141. if not both_successful:
  142. any_failure = cast(
  143. QueryFailure, outcome1 if not outcome1.successful else outcome2
  144. )
  145. validation_outcome.add_remark(
  146. ValidationRemark(
  147. f"DB error in '{any_failure.strategy.name}' was: {any_failure.error_message}"
  148. )
  149. )
  150. self.warn_on_failure_with_multiple_columns(any_failure, validation_outcome)
  151. def shall_validate_error_message(self, query_execution: QueryExecution) -> bool:
  152. return not query_execution.query_template.disable_error_message_validation
  153. def validate_row_count(
  154. self,
  155. query_execution: QueryExecution,
  156. result1: QueryResult,
  157. result2: QueryResult,
  158. validation_outcome: ValidationOutcome,
  159. ) -> None:
  160. # It is ok if both results don't have any rows.
  161. num_rows1 = len(result1.result_rows)
  162. num_rows2 = len(result2.result_rows)
  163. if num_rows1 != num_rows2:
  164. expression = self._expression_if_only_one_in_query(query_execution)
  165. validation_outcome.add_error(
  166. self.ignore_filter,
  167. ValidationError(
  168. query_execution,
  169. ValidationErrorType.ROW_COUNT_MISMATCH,
  170. "Row count differs",
  171. details1=ValidationErrorDetails(
  172. strategy=result1.strategy, value=str(num_rows1), sql=result1.sql
  173. ),
  174. details2=ValidationErrorDetails(
  175. strategy=result2.strategy, value=str(num_rows2), sql=result2.sql
  176. ),
  177. concerned_expression=expression,
  178. ),
  179. )
  180. def validate_error_messages(
  181. self,
  182. query_execution: QueryExecution,
  183. failure1: QueryFailure,
  184. failure2: QueryFailure,
  185. validation_outcome: ValidationOutcome,
  186. ) -> None:
  187. norm_error_message_1 = self.error_message_normalizer.normalize(
  188. failure1.error_message
  189. )
  190. norm_error_message_2 = self.error_message_normalizer.normalize(
  191. failure2.error_message
  192. )
  193. if norm_error_message_1 != norm_error_message_2:
  194. expression = self._expression_if_only_one_in_query(query_execution)
  195. validation_outcome.add_error(
  196. self.ignore_filter,
  197. ValidationError(
  198. query_execution,
  199. ValidationErrorType.ERROR_MISMATCH,
  200. "Error message differs",
  201. details1=ValidationErrorDetails(
  202. strategy=failure1.strategy,
  203. value=norm_error_message_1,
  204. sql=failure1.sql,
  205. sql_error=failure1.error_message,
  206. ),
  207. details2=ValidationErrorDetails(
  208. strategy=failure2.strategy,
  209. value=norm_error_message_2,
  210. sql=failure2.sql,
  211. sql_error=failure2.error_message,
  212. ),
  213. concerned_expression=expression,
  214. ),
  215. )
  216. def warn_on_failure_with_multiple_columns(
  217. self,
  218. failure: QueryOutcome,
  219. validation_outcome: ValidationOutcome,
  220. ) -> None:
  221. if failure.query_column_count > 1:
  222. # this should not occur if the config property 'split_and_retry_on_db_error' is enabled
  223. validation_outcome.add_warning(
  224. ValidationWarning(
  225. "Query error with multiple columns",
  226. "Query expected to return an error should contain only one colum.",
  227. sql=failure.sql,
  228. )
  229. )
  230. def remark_on_success_with_single_column(
  231. self,
  232. result: QueryOutcome,
  233. validation_outcome: ValidationOutcome,
  234. ) -> None:
  235. if result.query_column_count == 1:
  236. validation_outcome.add_remark(
  237. ValidationRemark(
  238. "Query success with single column",
  239. "Query successfully returning a value could be run with other queries.",
  240. sql=result.sql,
  241. )
  242. )
  243. def validate_outcomes_data(
  244. self,
  245. query_execution: QueryExecution,
  246. validation_outcome: ValidationOutcome,
  247. ) -> None:
  248. # each outcome is known to have the same number of rows
  249. # each row is supposed to have the same number of columns
  250. outcomes = query_execution.outcomes
  251. result1 = cast(QueryResult, outcomes[0])
  252. if len(result1.result_rows) == 0:
  253. # this is a valid case; all outcomes have the same number of rows
  254. return
  255. for index in range(1, len(outcomes)):
  256. other_result = cast(QueryResult, outcomes[index])
  257. if query_execution.query_output_mode in {
  258. QueryOutputMode.EXPLAIN,
  259. QueryOutputMode.EXPLAIN_PHYSICAL,
  260. }:
  261. self.validate_explain_plan(
  262. query_execution, result1, other_result, validation_outcome
  263. )
  264. else:
  265. self.validate_data_of_two_outcomes(
  266. query_execution, result1, other_result, validation_outcome
  267. )
  268. def validate_data_of_two_outcomes(
  269. self,
  270. query_execution: QueryExecution,
  271. outcome1: QueryResult,
  272. outcome2: QueryResult,
  273. validation_outcome: ValidationOutcome,
  274. ) -> None:
  275. num_columns1 = len(outcome1.result_rows[0])
  276. num_columns2 = len(outcome2.result_rows[0])
  277. if num_columns1 == 0:
  278. raise RuntimeError("Result contains no columns!")
  279. if num_columns1 != num_columns2:
  280. raise RuntimeError("Results contain a different number of columns!")
  281. if num_columns1 != len(query_execution.query_template.select_expressions):
  282. # This would happen with the disabled .* operator on a row() function
  283. raise RuntimeError(
  284. "Number of columns in the result does not match the number of select expressions!"
  285. )
  286. for col_index in range(0, num_columns1):
  287. self.validate_column(
  288. query_execution, outcome1, outcome2, col_index, validation_outcome
  289. )
  290. def validate_column(
  291. self,
  292. query_execution: QueryExecution,
  293. result1: QueryResult,
  294. result2: QueryResult,
  295. col_index: int,
  296. validation_outcome: ValidationOutcome,
  297. ) -> None:
  298. # both results are known to be not empty and have the same number of rows
  299. row_length = len(result1.result_rows)
  300. column_values1 = []
  301. column_values2 = []
  302. expression = query_execution.query_template.select_expressions[col_index]
  303. for row_index in range(0, row_length):
  304. column_values1.append(result1.result_rows[row_index][col_index])
  305. column_values2.append(result2.result_rows[row_index][col_index])
  306. if self.ignore_row_order(expression):
  307. column_values1 = self._sort_column_values(column_values1)
  308. column_values2 = self._sort_column_values(column_values2)
  309. for row_index in range(0, row_length):
  310. result_value1 = column_values1[row_index]
  311. result_value2 = column_values2[row_index]
  312. if not self.is_value_equal(result_value1, result_value2, expression):
  313. error_type = ValidationErrorType.CONTENT_MISMATCH
  314. error_message = "Value differs"
  315. elif not self.is_type_equal(result_value1, result_value2):
  316. # check the type after the value because it has a lower relevance
  317. error_type = ValidationErrorType.CONTENT_TYPE_MISMATCH
  318. result_value1 = type(result_value1)
  319. result_value2 = type(result_value2)
  320. error_message = "Value type differs"
  321. else:
  322. continue
  323. validation_outcome.add_error(
  324. self.ignore_filter,
  325. ValidationError(
  326. query_execution,
  327. error_type,
  328. error_message,
  329. details1=ValidationErrorDetails(
  330. strategy=result1.strategy, value=result_value1, sql=result1.sql
  331. ),
  332. details2=ValidationErrorDetails(
  333. strategy=result2.strategy, value=result_value2, sql=result2.sql
  334. ),
  335. col_index=col_index,
  336. concerned_expression=expression,
  337. location=f"row index {row_index}, column index {col_index}",
  338. ),
  339. )
  340. def validate_explain_plan(
  341. self,
  342. query_execution: QueryExecution,
  343. outcome1: QueryResult,
  344. outcome2: QueryResult,
  345. validation_outcome: ValidationOutcome,
  346. ) -> None:
  347. num_columns1 = len(outcome1.result_rows[0])
  348. num_columns2 = len(outcome2.result_rows[0])
  349. assert num_columns1 == 1
  350. assert num_columns2 == 1
  351. explain_plan1 = outcome1.result_rows[0][0]
  352. explain_plan2 = outcome2.result_rows[0][0]
  353. for data_source in query_execution.query_template.get_all_data_sources():
  354. new_source_name = f"<db_object-{data_source.table_index or 1}>"
  355. explain_plan1 = explain_plan1.replace(
  356. data_source.get_db_object_name(
  357. outcome1.strategy.get_db_object_name(
  358. query_execution.query_template.storage_layout,
  359. data_source=data_source,
  360. ),
  361. ),
  362. new_source_name,
  363. )
  364. explain_plan2 = explain_plan2.replace(
  365. data_source.get_db_object_name(
  366. outcome2.strategy.get_db_object_name(
  367. query_execution.query_template.storage_layout,
  368. data_source=data_source,
  369. ),
  370. ),
  371. new_source_name,
  372. )
  373. if explain_plan1 == explain_plan2:
  374. return
  375. expression = self._expression_if_only_one_in_query(query_execution)
  376. validation_outcome.add_error(
  377. self.ignore_filter,
  378. ValidationError(
  379. query_execution,
  380. ValidationErrorType.EXPLAIN_PLAN_MISMATCH,
  381. "Explain plan differs",
  382. details1=ValidationErrorDetails(
  383. strategy=outcome1.strategy, value=explain_plan1, sql=outcome1.sql
  384. ),
  385. details2=ValidationErrorDetails(
  386. strategy=outcome2.strategy, value=explain_plan2, sql=outcome2.sql
  387. ),
  388. concerned_expression=expression,
  389. ),
  390. )
  391. def is_type_equal(self, value1: Any, value2: Any) -> bool:
  392. if value1 is None or value2 is None:
  393. # ignore None values
  394. return True
  395. return type(value1) == type(value2)
  396. def is_value_equal(
  397. self,
  398. value1: Any,
  399. value2: Any,
  400. expression: Expression,
  401. is_tolerant: bool = False,
  402. ) -> bool:
  403. if value1 == value2:
  404. return True
  405. if (isinstance(value1, list) and isinstance(value2, list)) or (
  406. isinstance(value1, tuple) and isinstance(value2, tuple)
  407. ):
  408. return self.is_list_or_tuple_equal(value1, value2, expression)
  409. if isinstance(value1, dict) and isinstance(value2, dict):
  410. return self.is_dict_equal(value1, value2, expression)
  411. if isinstance(value1, Decimal) and isinstance(value2, Decimal):
  412. if value1.is_nan() and value2.is_nan():
  413. return True
  414. else:
  415. return value1 == value2
  416. if isinstance(value1, float) and isinstance(value2, float):
  417. if math.isnan(value1) and math.isnan(value2):
  418. return True
  419. else:
  420. return value1 == value2
  421. return False
  422. def is_list_or_tuple_equal(
  423. self,
  424. collection1: list[Any] | tuple[Any],
  425. collection2: list[Any] | tuple[Any],
  426. expression: Expression,
  427. ) -> bool:
  428. if len(collection1) != len(collection2):
  429. return False
  430. if (
  431. self.ignore_order_when_comparing_collection(expression)
  432. and self._can_be_sorted(collection1)
  433. and self._can_be_sorted(collection2)
  434. ):
  435. collection1 = sorted(collection1)
  436. collection2 = sorted(collection2)
  437. for value1, value2 in zip(collection1, collection2):
  438. # use is_tolerant because tuples may contain all values as strings
  439. if not self.is_value_equal(value1, value2, expression, is_tolerant=True):
  440. return False
  441. return True
  442. def is_dict_equal(
  443. self,
  444. dict1: dict[Any, Any],
  445. dict2: dict[Any, Any],
  446. expression: Expression,
  447. ) -> bool:
  448. if len(dict1) != len(dict2):
  449. return False
  450. if not self.is_value_equal(dict1.keys(), dict2.keys(), expression):
  451. return False
  452. for key in dict1.keys():
  453. if not self.is_value_equal(dict1[key], dict2[key], expression):
  454. return False
  455. return True
  456. def ignore_row_order(self, expression: Expression) -> bool:
  457. return False
  458. def ignore_order_when_comparing_collection(self, expression: Expression) -> bool:
  459. return False
  460. def _can_be_sorted(self, collection: list[Any] | tuple[Any]) -> bool:
  461. for element in collection:
  462. if isinstance(element, dict):
  463. return False
  464. return True
  465. def _expression_if_only_one_in_query(
  466. self, query_execution: QueryExecution
  467. ) -> Expression | None:
  468. if len(query_execution.query_template.select_expressions) == 1:
  469. return query_execution.query_template.select_expressions[0]
  470. return None
  471. def _sort_column_values(self, column_values: list[Any]) -> list[Any]:
  472. # needed because, for example, None values have no order
  473. def sort_key(value: Any) -> Any:
  474. return str(value)
  475. return sorted(column_values, key=sort_key)