query_generator.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from materialize.output_consistency.common import probability
  10. from materialize.output_consistency.common.configuration import (
  11. ConsistencyTestConfiguration,
  12. )
  13. from materialize.output_consistency.data_value.data_column import DataColumn
  14. from materialize.output_consistency.execution.value_storage_layout import (
  15. ValueStorageLayout,
  16. )
  17. from materialize.output_consistency.expression.expression import Expression
  18. from materialize.output_consistency.expression.expression_with_args import (
  19. ExpressionWithArgs,
  20. )
  21. from materialize.output_consistency.generators.expression_generator import (
  22. ExpressionGenerator,
  23. )
  24. from materialize.output_consistency.ignore_filter.inconsistency_ignore_filter import (
  25. GenericInconsistencyIgnoreFilter,
  26. )
  27. from materialize.output_consistency.ignore_filter.internal_output_inconsistency_ignore_filter import (
  28. YesIgnore,
  29. )
  30. from materialize.output_consistency.input_data.constants.constant_expressions import (
  31. TRUE_EXPRESSION,
  32. )
  33. from materialize.output_consistency.input_data.operations.boolean_operations_provider import (
  34. NOT_OPERATION,
  35. )
  36. from materialize.output_consistency.input_data.operations.generic_operations_provider import (
  37. IS_NULL_OPERATION,
  38. )
  39. from materialize.output_consistency.input_data.test_input_data import (
  40. ConsistencyTestInputData,
  41. )
  42. from materialize.output_consistency.query.additional_source import (
  43. AdditionalSource,
  44. as_data_sources,
  45. )
  46. from materialize.output_consistency.query.data_source import (
  47. DataSource,
  48. )
  49. from materialize.output_consistency.query.join import JoinTarget
  50. from materialize.output_consistency.query.query_template import QueryTemplate
  51. from materialize.output_consistency.selection.randomized_picker import RandomizedPicker
  52. from materialize.output_consistency.selection.row_selection import (
  53. ALL_ROWS_SELECTION,
  54. DataRowSelection,
  55. )
  56. from materialize.output_consistency.status.consistency_test_logger import (
  57. ConsistencyTestLogger,
  58. )
  59. from materialize.output_consistency.status.test_summary import ConsistencyTestSummary
  60. class QueryGenerator:
  61. """Generates query templates based on expressions"""
  62. def __init__(
  63. self,
  64. config: ConsistencyTestConfiguration,
  65. randomized_picker: RandomizedPicker,
  66. input_data: ConsistencyTestInputData,
  67. expression_generator: ExpressionGenerator,
  68. ignore_filter: GenericInconsistencyIgnoreFilter,
  69. ):
  70. self.config = config
  71. self.randomized_picker = randomized_picker
  72. self.input_data = input_data
  73. self.expression_generator = expression_generator
  74. self.ignore_filter = ignore_filter
  75. self.count_pending_expressions = 0
  76. # ONE query PER expression using the storage layout specified in the expression, expressions presumably fail
  77. self.any_layout_presumably_failing_expressions: list[Expression] = []
  78. # ONE query FOR ALL expressions accessing the horizontal storage layout; expressions presumably succeed and do
  79. # not contain aggregations
  80. self.horizontal_layout_normal_expressions: list[Expression] = []
  81. # ONE query FOR ALL expressions accessing the horizontal storage layout and applying aggregations; expressions
  82. # presumably succeed
  83. self.horizontal_layout_aggregate_expressions: list[Expression] = []
  84. # ONE query FOR ALL expressions accessing the vertical storage layout; expressions presumably succeed and do not
  85. # contain aggregations
  86. self.vertical_layout_normal_expressions: list[Expression] = []
  87. # ONE query FOR ALL expressions accessing the vertical storage layout and applying aggregations; expressions
  88. # presumably succeed
  89. self.vertical_layout_aggregate_expressions: list[Expression] = []
  90. def push_expression(self, expression: Expression) -> None:
  91. if expression.is_expect_error:
  92. self.any_layout_presumably_failing_expressions.append(expression)
  93. return
  94. if expression.storage_layout == ValueStorageLayout.ANY:
  95. # does not matter, could be taken by all
  96. self.vertical_layout_normal_expressions.append(expression)
  97. elif expression.storage_layout == ValueStorageLayout.HORIZONTAL:
  98. if expression.is_aggregate:
  99. self.horizontal_layout_aggregate_expressions.append(expression)
  100. else:
  101. self.horizontal_layout_normal_expressions.append(expression)
  102. elif expression.storage_layout == ValueStorageLayout.VERTICAL:
  103. if expression.is_aggregate:
  104. self.vertical_layout_aggregate_expressions.append(expression)
  105. else:
  106. self.vertical_layout_normal_expressions.append(expression)
  107. else:
  108. raise RuntimeError(f"Unknown storage layout: {expression.storage_layout}")
  109. self.count_pending_expressions += 1
  110. def shall_consume_queries(self) -> bool:
  111. return self.count_pending_expressions > self.config.max_pending_expressions
  112. def consume_queries(
  113. self,
  114. test_summary: ConsistencyTestSummary,
  115. ) -> list[QueryTemplate]:
  116. queries = []
  117. queries.extend(
  118. self._create_multi_column_queries(
  119. test_summary,
  120. self.horizontal_layout_normal_expressions,
  121. False,
  122. ValueStorageLayout.HORIZONTAL,
  123. False,
  124. )
  125. )
  126. queries.extend(
  127. self._create_multi_column_queries(
  128. test_summary,
  129. self.horizontal_layout_aggregate_expressions,
  130. False,
  131. ValueStorageLayout.HORIZONTAL,
  132. True,
  133. )
  134. )
  135. queries.extend(
  136. self._create_multi_column_queries(
  137. test_summary,
  138. self.vertical_layout_normal_expressions,
  139. False,
  140. ValueStorageLayout.VERTICAL,
  141. False,
  142. )
  143. )
  144. queries.extend(
  145. self._create_multi_column_queries(
  146. test_summary,
  147. self.vertical_layout_aggregate_expressions,
  148. False,
  149. ValueStorageLayout.VERTICAL,
  150. True,
  151. )
  152. )
  153. queries.extend(
  154. self._create_single_column_queries(
  155. test_summary, self.any_layout_presumably_failing_expressions
  156. )
  157. )
  158. self.reset_state()
  159. return queries
  160. def add_random_where_condition_to_query(
  161. self, query: QueryTemplate, test_summary: ConsistencyTestSummary
  162. ) -> None:
  163. if not self.randomized_picker.random_boolean(
  164. probability.GENERATE_WHERE_EXPRESSION
  165. ):
  166. return
  167. where_expression = self.expression_generator.generate_boolean_expression(
  168. False, query.storage_layout
  169. )
  170. if where_expression is None:
  171. return
  172. ignore_verdict = self.ignore_filter.shall_ignore_expression(
  173. where_expression, query.row_selection
  174. )
  175. if isinstance(ignore_verdict, YesIgnore):
  176. test_summary.record_ignore_reason_usage(ignore_verdict.reason)
  177. else:
  178. query.where_expression = where_expression
  179. self._assign_random_sources(
  180. query.get_all_data_sources(), [query.where_expression]
  181. )
  182. def _create_multi_column_queries(
  183. self,
  184. test_summary: ConsistencyTestSummary,
  185. expressions: list[Expression],
  186. expect_error: bool,
  187. storage_layout: ValueStorageLayout,
  188. contains_aggregations: bool,
  189. ) -> list[QueryTemplate]:
  190. """Creates queries not exceeding the maximum column count"""
  191. if len(expressions) == 0:
  192. return []
  193. queries = []
  194. for offset_index in range(0, len(expressions), self.config.max_cols_per_query):
  195. expressions = expressions[
  196. offset_index : offset_index + self.config.max_cols_per_query
  197. ]
  198. data_source, additional_sources = self._select_sources(
  199. storage_layout, test_summary
  200. )
  201. self._assign_random_sources(
  202. [data_source] + as_data_sources(additional_sources),
  203. expressions,
  204. contains_aggregations,
  205. )
  206. row_selection = self._select_rows(
  207. storage_layout, [data_source] + as_data_sources(additional_sources)
  208. )
  209. expressions = self._remove_known_inconsistencies(
  210. test_summary, expressions, row_selection
  211. )
  212. if len(expressions) == 0:
  213. continue
  214. if self.randomized_picker.random_boolean(
  215. probability.NO_SOURCE_MINIMIZATION
  216. ):
  217. # do not minimize sources to catch errors like database-issues#8463
  218. pass
  219. else:
  220. # remove sources that are not used by any (remaining) expression
  221. data_source, additional_sources = self.minimize_sources(
  222. data_source, additional_sources, expressions
  223. )
  224. row_selection.trim_to_minimized_sources(
  225. [data_source] + as_data_sources(additional_sources)
  226. )
  227. uses_joins = len(additional_sources) > 0
  228. query = QueryTemplate(
  229. expect_error,
  230. expressions,
  231. None,
  232. storage_layout,
  233. data_source,
  234. contains_aggregations,
  235. row_selection,
  236. offset=self._generate_offset(
  237. storage_layout,
  238. data_source,
  239. uses_joins=uses_joins,
  240. contains_aggregations=contains_aggregations,
  241. ),
  242. limit=self._generate_limit(
  243. storage_layout,
  244. data_source,
  245. uses_joins=uses_joins,
  246. contains_aggregations=contains_aggregations,
  247. ),
  248. additional_sources=additional_sources,
  249. )
  250. queries.append(query)
  251. return queries
  252. def _create_single_column_queries(
  253. self, test_summary: ConsistencyTestSummary, expressions: list[Expression]
  254. ) -> list[QueryTemplate]:
  255. """Creates one query per expression"""
  256. queries = []
  257. for expression in expressions:
  258. storage_layout = expression.storage_layout
  259. if storage_layout == ValueStorageLayout.ANY:
  260. storage_layout = ValueStorageLayout.VERTICAL
  261. queries.extend(
  262. self._create_multi_column_queries(
  263. test_summary,
  264. [expression],
  265. expression.is_expect_error,
  266. storage_layout,
  267. expression.is_aggregate,
  268. )
  269. )
  270. return queries
  271. def _select_rows(
  272. self, storage_layout: ValueStorageLayout, data_sources: list[DataSource]
  273. ) -> DataRowSelection:
  274. if storage_layout == ValueStorageLayout.ANY:
  275. raise RuntimeError("Unresolved storage layout")
  276. elif storage_layout == ValueStorageLayout.HORIZONTAL:
  277. return ALL_ROWS_SELECTION
  278. elif storage_layout == ValueStorageLayout.VERTICAL:
  279. if self.randomized_picker.random_boolean(
  280. probability.RESTRICT_VERTICAL_LAYOUT_TO_ROWS_DISABLED_FOR_ALL_SOURCES
  281. ):
  282. return ALL_ROWS_SELECTION
  283. row_selection = DataRowSelection()
  284. for data_source in data_sources:
  285. if self.randomized_picker.random_boolean(
  286. probability.RESTRICT_VERTICAL_LAYOUT_TO_ROWS_DISABLED_FOR_SOURCE
  287. ):
  288. # do not add an entry regarding this source into the selection
  289. continue
  290. row_count = (
  291. self.input_data.types_input.get_max_value_count_of_all_types(
  292. data_source.table_index
  293. )
  294. )
  295. if self.randomized_picker.random_boolean(
  296. probability.RESTRICT_VERTICAL_LAYOUT_ONLY_TO_FEW_ROWS
  297. ):
  298. # With some probability, try to pick a few rows
  299. max_number_of_rows_to_select = self.randomized_picker.random_number(
  300. 2, 4
  301. )
  302. else:
  303. # With some probability, pick an arbitrary number of rows
  304. max_number_of_rows_to_select = self.randomized_picker.random_number(
  305. 0, row_count
  306. )
  307. # when using joins, the number of rows may be lower or higher
  308. row_indices_of_source = self.randomized_picker.random_row_indices(
  309. row_count, max_number_of_rows_to_select
  310. )
  311. row_selection.set_row_indices(data_source, row_indices_of_source)
  312. return row_selection
  313. else:
  314. raise RuntimeError(f"Unsupported storage layout: {storage_layout}")
  315. def _assign_source(
  316. self, data_source: DataSource, expression: Expression, force: bool = False
  317. ) -> None:
  318. self._assign_random_sources([data_source], [expression], force=force)
  319. def _assign_random_sources(
  320. self,
  321. all_data_sources: list[DataSource],
  322. expressions: list[Expression],
  323. force: bool = False,
  324. ) -> None:
  325. assert len(all_data_sources) > 0, "No data sources provided"
  326. for expression in expressions:
  327. for leaf_expression in expression.collect_leaves():
  328. if isinstance(leaf_expression, DataColumn):
  329. random_source = self.randomized_picker.random_data_source(
  330. list(all_data_sources)
  331. )
  332. leaf_expression.assign_data_source(random_source, force=force)
  333. def _select_sources(
  334. self,
  335. storage_layout: ValueStorageLayout,
  336. test_summary: ConsistencyTestSummary,
  337. ) -> tuple[DataSource, list[AdditionalSource]]:
  338. if storage_layout == ValueStorageLayout.HORIZONTAL:
  339. return DataSource(table_index=None), []
  340. return self._random_source_tables(storage_layout, test_summary)
  341. def minimize_sources(
  342. self,
  343. data_source: DataSource,
  344. additional_sources: list[AdditionalSource],
  345. all_expressions: list[Expression],
  346. ) -> tuple[DataSource, list[AdditionalSource]]:
  347. all_used_data_sources: set[DataSource] = set()
  348. for expression in all_expressions:
  349. all_used_data_sources.update(expression.collect_data_sources())
  350. additional_sources = [
  351. additional_source
  352. for additional_source in additional_sources
  353. if additional_source.data_source in all_used_data_sources
  354. ]
  355. if data_source not in all_used_data_sources:
  356. if len(additional_sources) == 0:
  357. # No data sources are needed by the query. This can be the case when expressions only hold enum
  358. # constants as args. Still return the main data source so that all queries have one. This will allow to
  359. # add a where clause. As a side effect, it will also influence the row count.
  360. return data_source, []
  361. return (
  362. additional_sources[0].data_source,
  363. additional_sources[1:],
  364. )
  365. return data_source, additional_sources
  366. def _random_source_tables(
  367. self,
  368. storage_layout: ValueStorageLayout,
  369. test_summary: ConsistencyTestSummary,
  370. ) -> tuple[DataSource, list[AdditionalSource]]:
  371. main_source = DataSource(table_index=0)
  372. if self.randomized_picker.random_boolean(0.4):
  373. return main_source, []
  374. additional_sources = []
  375. for i in range(1, self.config.vertical_join_tables):
  376. if self.randomized_picker.random_boolean(0.3):
  377. additional_source = AdditionalSource(
  378. data_source=DataSource(table_index=i),
  379. join_operator=self.randomized_picker.random_join_operator(),
  380. join_constraint=TRUE_EXPRESSION,
  381. )
  382. join_constraint = self._generate_join_constraint(
  383. storage_layout,
  384. main_source,
  385. additional_source,
  386. )
  387. ignore_verdict = self.ignore_filter.shall_ignore_expression(
  388. join_constraint, ALL_ROWS_SELECTION
  389. )
  390. if isinstance(ignore_verdict, YesIgnore):
  391. test_summary.record_ignore_reason_usage(ignore_verdict.reason)
  392. else:
  393. self._validate_join_constraint(join_constraint)
  394. additional_source.join_constraint = join_constraint
  395. additional_sources.append(additional_source)
  396. return main_source, additional_sources
  397. def _validate_join_constraint(self, join_constraint: Expression) -> None:
  398. # this will fail if no data source was assigned to a leaf
  399. join_constraint.collect_data_sources()
  400. def _remove_known_inconsistencies(
  401. self,
  402. test_summary: ConsistencyTestSummary,
  403. expressions: list[Expression],
  404. row_selection: DataRowSelection,
  405. ) -> list[Expression]:
  406. indices_to_remove: list[int] = []
  407. for index, expression in enumerate(expressions):
  408. ignore_verdict = self.ignore_filter.shall_ignore_expression(
  409. expression, row_selection
  410. )
  411. if isinstance(ignore_verdict, YesIgnore):
  412. test_summary.count_ignored_select_expressions = (
  413. test_summary.count_ignored_select_expressions + 1
  414. )
  415. test_summary.record_ignore_reason_usage(ignore_verdict.reason)
  416. self._log_skipped_expression(
  417. test_summary, expression, ignore_verdict.reason
  418. )
  419. indices_to_remove.append(index)
  420. for index_to_remove in sorted(indices_to_remove, reverse=True):
  421. del expressions[index_to_remove]
  422. return expressions
  423. def _generate_offset(
  424. self,
  425. storage_layout: ValueStorageLayout,
  426. data_source: DataSource,
  427. uses_joins: bool,
  428. contains_aggregations: bool,
  429. ) -> int | None:
  430. return self._generate_offset_or_limit(
  431. storage_layout,
  432. data_source,
  433. uses_joins=uses_joins,
  434. contains_aggregations=contains_aggregations,
  435. )
  436. def _generate_limit(
  437. self,
  438. storage_layout: ValueStorageLayout,
  439. data_source: DataSource,
  440. uses_joins: bool,
  441. contains_aggregations: bool,
  442. ) -> int | None:
  443. return self._generate_offset_or_limit(
  444. storage_layout,
  445. data_source,
  446. uses_joins=uses_joins,
  447. contains_aggregations=contains_aggregations,
  448. )
  449. def _generate_offset_or_limit(
  450. self,
  451. storage_layout: ValueStorageLayout,
  452. data_source: DataSource,
  453. uses_joins: bool,
  454. contains_aggregations: bool,
  455. ) -> int | None:
  456. if storage_layout != ValueStorageLayout.VERTICAL:
  457. return None
  458. likelihood_of_offset_or_limit = 0.025 if contains_aggregations else 0.25
  459. if not self.randomized_picker.random_boolean(likelihood_of_offset_or_limit):
  460. # do not apply it
  461. return None
  462. main_source_row_count = (
  463. self.input_data.types_input.get_max_value_count_of_all_types(
  464. data_source.table_index
  465. )
  466. )
  467. if uses_joins:
  468. # the main data source might have most rows; though, the number might be even higher because of joins
  469. max_value = main_source_row_count + 3
  470. else:
  471. max_value = main_source_row_count + 1
  472. if self.randomized_picker.random_boolean(0.7):
  473. # prefer lower numbers since queries may already contain where conditions or apply aggregations
  474. # (or contain offsets when generating a limit)
  475. max_value = int(max_value / 3)
  476. value = self.randomized_picker.random_number(0, max_value)
  477. if value == 0 and self.randomized_picker.random_boolean(0.95):
  478. # drop most 0 values for readability (but keep a few)
  479. value = None
  480. return value
  481. def _generate_join_constraint(
  482. self,
  483. storage_layout: ValueStorageLayout,
  484. data_source: DataSource,
  485. joined_source: AdditionalSource,
  486. ) -> Expression:
  487. assert (
  488. storage_layout == ValueStorageLayout.VERTICAL
  489. ), f"Joins not supported for {storage_layout}"
  490. join_target = self.randomized_picker.random_join_target()
  491. if join_target in {
  492. JoinTarget.SAME_DATA_TYPE,
  493. JoinTarget.SAME_DATA_TYPE_CATEGORY,
  494. JoinTarget.ANY_COLUMN,
  495. }:
  496. random_type_with_values_1 = self.randomized_picker.random_type_with_values(
  497. self.input_data.types_input.all_data_types_with_values
  498. )
  499. if join_target == JoinTarget.SAME_DATA_TYPE:
  500. random_types_with_values_2 = [random_type_with_values_1]
  501. elif join_target == JoinTarget.SAME_DATA_TYPE_CATEGORY:
  502. random_types_with_values_2 = [
  503. type_with_values
  504. for type_with_values in self.input_data.types_input.all_data_types_with_values
  505. if type_with_values.data_type.category
  506. == random_type_with_values_1.data_type.category
  507. ]
  508. elif join_target == JoinTarget.ANY_COLUMN:
  509. random_types_with_values_2 = [
  510. self.randomized_picker.random_type_with_values(
  511. self.input_data.types_input.all_data_types_with_values
  512. )
  513. ]
  514. else:
  515. raise RuntimeError(f"Unexpected join target: {join_target}")
  516. expression1 = self.expression_generator.generate_leaf_expression(
  517. storage_layout, [random_type_with_values_1]
  518. )
  519. expression2 = self.expression_generator.generate_leaf_expression(
  520. storage_layout, random_types_with_values_2
  521. )
  522. self._assign_source(data_source, expression1)
  523. self._assign_source(joined_source.data_source, expression2)
  524. return self.expression_generator.generate_equals_expression(
  525. expression1, expression2
  526. )
  527. elif join_target == JoinTarget.RANDOM_COLUMN_IS_NOT_NULL:
  528. random_type_with_values = self.randomized_picker.random_type_with_values(
  529. self.input_data.types_input.all_data_types_with_values
  530. )
  531. leaf_expression = self.expression_generator.generate_leaf_expression(
  532. storage_layout, [random_type_with_values]
  533. )
  534. self._assign_source(joined_source.data_source, leaf_expression)
  535. is_null_expression = ExpressionWithArgs(
  536. operation=IS_NULL_OPERATION,
  537. args=[leaf_expression],
  538. is_aggregate=leaf_expression.is_aggregate,
  539. )
  540. is_not_null_expression = ExpressionWithArgs(
  541. operation=NOT_OPERATION,
  542. args=[is_null_expression],
  543. is_aggregate=is_null_expression.is_aggregate,
  544. )
  545. return is_not_null_expression
  546. elif join_target == JoinTarget.BOOLEAN_EXPRESSION:
  547. expression = self.expression_generator.generate_boolean_expression(
  548. # aggregations in where conditions are not allowed
  549. use_aggregation=False,
  550. storage_layout=storage_layout,
  551. )
  552. if expression is None:
  553. expression = TRUE_EXPRESSION
  554. else:
  555. self._assign_source(joined_source.data_source, expression)
  556. return expression
  557. else:
  558. raise RuntimeError(f"Unexpected join target: {join_target}")
  559. def _log_skipped_expression(
  560. self,
  561. logger: ConsistencyTestLogger,
  562. expression: Expression,
  563. reason: str | None,
  564. ) -> None:
  565. if self.config.verbose_output:
  566. reason_desc = f" ({reason})" if reason else ""
  567. logger.add_global_warning(
  568. f"Skipping expression with known inconsistency{reason_desc}: {expression}"
  569. )
  570. def reset_state(self) -> None:
  571. self.count_pending_expressions = 0
  572. self.any_layout_presumably_failing_expressions = []
  573. self.horizontal_layout_normal_expressions = []
  574. self.horizontal_layout_aggregate_expressions = []
  575. self.vertical_layout_normal_expressions = []
  576. self.vertical_layout_aggregate_expressions = []