output_consistency_test.py 14 KB


  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import argparse
  10. import psycopg
  11. from psycopg import Connection
  12. from psycopg.errors import OperationalError
  13. from materialize import buildkite
  14. from materialize.mzcompose.composition import Composition
  15. from materialize.output_consistency.common.configuration import (
  16. ConsistencyTestConfiguration,
  17. )
  18. from materialize.output_consistency.execution.evaluation_strategy import (
  19. ConstantFoldingEvaluation,
  20. DataFlowRenderingEvaluation,
  21. EvaluationStrategy,
  22. )
  23. from materialize.output_consistency.execution.query_output_mode import QueryOutputMode
  24. from materialize.output_consistency.execution.sql_executor import (
  25. DryRunSqlExecutor,
  26. MzDatabaseSqlExecutor,
  27. PgWireDatabaseSqlExecutor,
  28. SqlExecutor,
  29. )
  30. from materialize.output_consistency.execution.sql_executors import SqlExecutors
  31. from materialize.output_consistency.generators.expression_generator import (
  32. ExpressionGenerator,
  33. )
  34. from materialize.output_consistency.generators.query_generator import QueryGenerator
  35. from materialize.output_consistency.ignore_filter.inconsistency_ignore_filter import (
  36. GenericInconsistencyIgnoreFilter,
  37. )
  38. from materialize.output_consistency.ignore_filter.internal_output_inconsistency_ignore_filter import (
  39. InternalOutputInconsistencyIgnoreFilter,
  40. )
  41. from materialize.output_consistency.input_data.scenarios.evaluation_scenario import (
  42. EvaluationScenario,
  43. )
  44. from materialize.output_consistency.input_data.test_input_data import (
  45. ConsistencyTestInputData,
  46. )
  47. from materialize.output_consistency.output.output_printer import OutputPrinter
  48. from materialize.output_consistency.runner.test_runner import ConsistencyTestRunner
  49. from materialize.output_consistency.selection.randomized_picker import RandomizedPicker
  50. from materialize.output_consistency.status.test_summary import ConsistencyTestSummary
  51. from materialize.output_consistency.validation.error_message_normalizer import (
  52. ErrorMessageNormalizer,
  53. )
  54. from materialize.output_consistency.validation.result_comparator import ResultComparator
  55. from materialize.test_analytics.config.test_analytics_db_config import (
  56. create_test_analytics_config,
  57. )
  58. from materialize.test_analytics.test_analytics_db import TestAnalyticsDb
  class OutputConsistencyTest:
      """Driver that wires up and runs an output consistency test.

      Every collaborator (input data, SQL executors, evaluation strategies,
      ignore filter, result comparator, scenario) is obtained through a
      dedicated method, so a subclass can replace any one of them by overriding
      that method.
      """

      def run_output_consistency_tests(
          self,
          default_connection: Connection,
          mz_system_connection: Connection,
          args: argparse.Namespace,
          query_output_mode: QueryOutputMode,
          override_max_runtime_in_sec: int | None = None,
      ) -> ConsistencyTestSummary:
          """Entry point for output consistency tests.

          Unpacks the parsed command line arguments and delegates to
          ``_run_output_consistency_tests_internal``.

          Args:
              default_connection: connection used for the regular database user.
              mz_system_connection: connection used for the system user.
              args: namespace as produced by
                  ``parse_output_consistency_input_args``.
              query_output_mode: output mode forwarded to the configuration.
              override_max_runtime_in_sec: when truthy, replaces
                  ``args.max_runtime_in_sec``. ``None`` and ``0`` both fall back
                  to the value from ``args``.

          Returns:
              The summary produced by the test run.
          """
          return self._run_output_consistency_tests_internal(
              default_connection,
              mz_system_connection,
              args.seed,
              args.dry_run,
              args.fail_fast,
              args.verbose,
              args.max_cols_per_query,
              override_max_runtime_in_sec or args.max_runtime_in_sec,
              args.max_iterations,
              args.max_failures_until_abort,
              args.avoid_expressions_expecting_db_error,
              args.disable_predefined_queries,
              query_output_mode=query_output_mode,
          )

      def parse_output_consistency_input_args(
          self,
          parser: argparse.ArgumentParser,
      ) -> argparse.Namespace:
          """Register the consistency test options on ``parser`` and parse ``sys.argv``.

          Boolean switches use ``argparse.BooleanOptionalAction``, which provides
          both the ``--flag`` and the ``--no-flag`` form. No ``type`` is passed
          for them: the action ignores it, passing it is deprecated since
          Python 3.12 and it raises ``TypeError`` from Python 3.14 on.

          Args:
              parser: parser to extend; options the caller registered before
                  are parsed as well.

          Returns:
              The parsed arguments.
          """
          parser.add_argument("--seed", default="0", type=str)
          parser.add_argument(
              "--dry-run", default=False, action=argparse.BooleanOptionalAction
          )
          parser.add_argument(
              "--fail-fast",
              default=False,
              action=argparse.BooleanOptionalAction,
          )
          parser.add_argument(
              "--verbose",
              default=False,
              action=argparse.BooleanOptionalAction,
          )
          parser.add_argument("--max-cols-per-query", default=20, type=int)
          parser.add_argument("--max-runtime-in-sec", default=600, type=int)
          parser.add_argument("--max-iterations", default=100000, type=int)
          parser.add_argument("--max-failures-until-abort", default=15, type=int)
          parser.add_argument(
              "--avoid-expressions-expecting-db-error",
              default=False,
              action=argparse.BooleanOptionalAction,
          )
          parser.add_argument(
              "--disable-predefined-queries",
              default=False,
              action=argparse.BooleanOptionalAction,
          )

          return parser.parse_args()

      def _run_output_consistency_tests_internal(
          self,
          default_connection: Connection,
          mz_system_connection: Connection,
          random_seed: str,
          dry_run: bool,
          fail_fast: bool,
          verbose_output: bool,
          max_cols_per_query: int,
          max_runtime_in_sec: int,
          max_iterations: int,
          max_failures_until_abort: int,
          avoid_expressions_expecting_db_error: bool,
          disable_predefined_queries: bool,
          query_output_mode: QueryOutputMode,
      ) -> ConsistencyTestSummary:
          """Build the configuration and all collaborators, then run the test.

          The configuration is printed before it is validated. If ``shall_run``
          returns false, an empty ``ConsistencyTestSummary`` is returned without
          setting up or starting the test runner.

          Returns:
              The summary returned by the test runner, or an empty summary when
              the test was not run.
          """
          input_data = self.create_input_data()

          scenario = self.get_scenario()

          # Fail-fast takes precedence over the configured failure limit.
          if fail_fast:
              max_failures_until_abort = 1

          config = ConsistencyTestConfiguration(
              random_seed=random_seed,
              scenario=scenario,
              dry_run=dry_run,
              verbose_output=verbose_output,
              max_cols_per_query=max_cols_per_query,
              max_runtime_in_sec=max_runtime_in_sec,
              max_iterations=max_iterations,
              max_failures_until_abort=max_failures_until_abort,
              avoid_expressions_expecting_db_error=avoid_expressions_expecting_db_error,
              queries_per_tx=20,
              max_pending_expressions=100,
              use_autocommit=True,
              split_and_retry_on_db_error=True,
              print_reproduction_code=True,
              disable_predefined_queries=disable_predefined_queries,
              query_output_mode=query_output_mode,
              vertical_join_tables=4,
          )

          output_printer = OutputPrinter(input_data, config.query_output_mode)

          output_printer.print_config(config)
          config.validate()

          randomized_picker = RandomizedPicker(config)

          sql_executors = self.create_sql_executors(
              config, default_connection, mz_system_connection, output_printer
          )

          evaluation_strategies = self.create_evaluation_strategies(sql_executors)

          # prerequisite: sql_executors need to be created
          self.filter_input_data(input_data)

          # prerequisite: sql_executors need to be created
          ignore_filter = self.create_inconsistency_ignore_filter()

          # prerequisite: input data needs to be filtered
          expression_generator = ExpressionGenerator(
              config, randomized_picker, input_data
          )
          query_generator = QueryGenerator(
              config, randomized_picker, input_data, expression_generator, ignore_filter
          )
          output_comparator = self.create_result_comparator(ignore_filter)

          output_printer.print_info(sql_executors.get_database_infos())
          output_printer.print_empty_line()

          # prerequisite: input data needs to be filtered
          output_printer.print_info(input_data.get_stats())
          output_printer.print_empty_line()

          if not self.shall_run(sql_executors):
              output_printer.print_info("Not running the test, criteria are not met.")
              return ConsistencyTestSummary()

          test_runner = ConsistencyTestRunner(
              config,
              input_data,
              evaluation_strategies,
              expression_generator,
              query_generator,
              output_comparator,
              sql_executors,
              randomized_picker,
              ignore_filter,
              output_printer,
          )
          test_runner.setup()

          output_printer.start_section("Test remarks")

          if not config.verbose_output:
              output_printer.print_info(
                  "Printing only queries with inconsistencies or warnings in non-verbose mode."
              )
              output_printer.print_empty_line()

          test_summary = test_runner.start()

          output_printer.print_test_summary(test_summary)

          return test_summary

      def shall_run(self, sql_executors: SqlExecutors) -> bool:
          """Tell whether the test is to be run; this implementation always says yes."""
          return True

      def create_input_data(self) -> ConsistencyTestInputData:
          """Create the input data object used by the test."""
          return ConsistencyTestInputData()

      def filter_input_data(self, input_data: ConsistencyTestInputData) -> None:
          """Hook to filter the input data; this implementation does nothing."""
          # This hook is invoked after the sql_executors have been created, so
          # input data can be filtered at that point.
          pass

      def create_sql_executors(
          self,
          config: ConsistencyTestConfiguration,
          default_connection: Connection,
          mz_system_connection: Connection | None,
          output_printer: OutputPrinter,
      ) -> SqlExecutors:
          """Create the executor collection, holding a single executor named "mz"."""
          return SqlExecutors(
              self.create_sql_executor(
                  config, default_connection, mz_system_connection, output_printer, "mz"
              )
          )

      def create_sql_executor(
          self,
          config: ConsistencyTestConfiguration,
          default_connection: Connection,
          mz_system_connection: Connection | None,
          output_printer: OutputPrinter,
          name: str,
          is_mz: bool = True,
      ) -> SqlExecutor:
          """Create one SQL executor.

          A dry run always gets a ``DryRunSqlExecutor``, regardless of ``is_mz``.
          Otherwise ``is_mz`` selects between the executor created by
          ``create_mz_sql_executor`` and a ``PgWireDatabaseSqlExecutor`` that
          only uses ``default_connection``.
          """
          if config.dry_run:
              return DryRunSqlExecutor(output_printer, name)

          if is_mz:
              return self.create_mz_sql_executor(
                  config, default_connection, mz_system_connection, output_printer, name
              )

          return PgWireDatabaseSqlExecutor(
              default_connection, config.use_autocommit, output_printer, name
          )

      def create_mz_sql_executor(
          self,
          config: ConsistencyTestConfiguration,
          default_connection: Connection,
          mz_system_connection: Connection | None,
          output_printer: OutputPrinter,
          name: str,
      ) -> SqlExecutor:
          """Create an ``MzDatabaseSqlExecutor``; the system connection must be set."""
          assert mz_system_connection is not None

          return MzDatabaseSqlExecutor(
              default_connection,
              mz_system_connection,
              config.use_autocommit,
              output_printer,
              name,
          )

      def get_scenario(self) -> EvaluationScenario:
          """Return the scenario stored in the test configuration."""
          return EvaluationScenario.OUTPUT_CONSISTENCY

      def create_result_comparator(
          self, ignore_filter: GenericInconsistencyIgnoreFilter
      ) -> ResultComparator:
          """Create the comparator from ``ignore_filter`` and a fresh ``ErrorMessageNormalizer``."""
          return ResultComparator(ignore_filter, ErrorMessageNormalizer())

      def create_inconsistency_ignore_filter(self) -> GenericInconsistencyIgnoreFilter:
          """Create the ignore filter handed to the query generator, comparator and runner."""
          return InternalOutputInconsistencyIgnoreFilter()

      def create_evaluation_strategies(
          self, sql_executors: SqlExecutors
      ) -> list[EvaluationStrategy]:
          """Return the evaluation strategies: dataflow rendering and constant folding.

          ``sql_executors`` is not used by this implementation.
          """
          return [
              DataFlowRenderingEvaluation(),
              ConstantFoldingEvaluation(),
          ]
  def upload_output_consistency_results_to_test_analytics(
      c: Composition,
      test_summary: ConsistencyTestSummary,
  ) -> None:
      """Send the outcome of a consistency test run to the test analytics database.

      Returns immediately when ``buildkite.is_in_buildkite()`` is false.
      Otherwise the build job result and the counters of ``test_summary`` are
      recorded and submitted. A failing submission is handed to
      ``on_upload_failed`` instead of being raised.

      Args:
          c: composition used to create the test analytics configuration.
          test_summary: summary of the finished test run.
      """
      if not buildkite.is_in_buildkite():
          return

      analytics_db = TestAnalyticsDb(create_test_analytics_config(c))
      analytics_db.builds.add_build_job(was_successful=test_summary.all_passed())

      # Collect the counters first, then pass them on as keyword arguments.
      stats = dict(
          count_executed_queries=test_summary.count_executed_query_templates,
          count_successful_queries=test_summary.count_successful_query_templates,
          count_ignored_error_queries=test_summary.count_ignored_error_query_templates,
          count_failures=len(test_summary.failures),
          count_predefined_queries=test_summary.count_predefined_queries,
          count_available_data_types=test_summary.count_available_data_types,
          count_available_op_variants=test_summary.count_available_op_variants,
          count_used_ops=test_summary.count_used_ops(),
          count_generated_select_expressions=test_summary.count_generated_select_expressions,
          count_ignored_select_expressions=test_summary.count_ignored_select_expressions,
      )
      analytics_db.output_consistency.add_stats(**stats)

      try:
          analytics_db.submit_updates()
          print("Uploaded results.")
      except Exception as upload_error:
          # An error during an upload must never cause the build to fail
          analytics_db.on_upload_failed(upload_error)
  def connect(host: str, port: int, user: str, password: str | None = None) -> Connection:
      """Open a psycopg connection and log the attempt.

      The password is never printed; the log line only shows ``****`` when one
      was supplied and ``None`` otherwise.

      Args:
          host: database host.
          port: database port.
          user: user to connect as.
          password: optional password.

      Returns:
          The connection returned by ``psycopg.connect``.

      Raises:
          OperationalError: re-raised after a failure message has been printed.
      """
      print(
          f"Connecting to database (host={host}, port={port}, user={user}, password={'****' if password else 'None'})"
      )

      try:
          connection = psycopg.connect(host=host, port=port, user=user, password=password)
      except OperationalError:
          print(f"Connecting to database failed (host={host}, port={port}, user={user})!")
          raise

      return connection
  def main() -> int:
      """Run the output consistency test from the command line.

      Parses the connection options (``--host``, ``--port``, ``--system-port``)
      together with the options registered by ``OutputConsistencyTest``, opens
      one connection as user ``materialize`` and one as user ``mz_system``, and
      runs the test with ``QueryOutputMode.SELECT``.

      Both connections are closed before returning, including when the second
      connection cannot be established or the test run raises.

      Returns:
          ``0`` if all tests passed; ``1`` if a connection could not be opened
          or the run did not pass.
      """
      # Imported locally so that this function does not rely on an additional
      # module-level import.
      from contextlib import ExitStack, closing

      test = OutputConsistencyTest()
      parser = argparse.ArgumentParser(
          prog="output-consistency-test",
          formatter_class=argparse.RawDescriptionHelpFormatter,
          description="Test the output consistency of different query evaluation strategies (e.g., dataflow rendering "
          "and constant folding).",
      )

      parser.add_argument("--host", default="localhost", type=str)
      parser.add_argument("--port", default=6875, type=int)
      parser.add_argument("--system-port", default=6877, type=int)

      args = test.parse_output_consistency_input_args(parser)

      default_db_user = "materialize"
      mz_system_db_user = "mz_system"

      # The stack closes every connection registered so far when the block is
      # left, so the first connection is released even if the second one fails.
      with ExitStack() as open_connections:
          try:
              default_connection = open_connections.enter_context(
                  closing(connect(args.host, args.port, default_db_user))
              )
              mz_system_connection = open_connections.enter_context(
                  closing(connect(args.host, args.system_port, mz_system_db_user))
              )
          except OperationalError:
              # connect() has already printed the failure details.
              return 1

          result = test.run_output_consistency_tests(
              default_connection,
              mz_system_connection,
              args,
              query_output_mode=QueryOutputMode.SELECT,
          )

      return 0 if result.all_passed() else 1
  # Script entry point: the return value of main() becomes the process exit code.
  # SystemExit is raised directly because the exit() helper is injected by the
  # site module for interactive use and is not guaranteed to exist in programs.
  if __name__ == "__main__":
      raise SystemExit(main())