version_consistency_test.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import argparse
  10. from psycopg import Connection
  11. from psycopg.errors import OperationalError
  12. from materialize.mz_version import MzVersion
  13. from materialize.output_consistency.common.configuration import (
  14. ConsistencyTestConfiguration,
  15. )
  16. from materialize.output_consistency.execution.evaluation_strategy import (
  17. EVALUATION_STRATEGY_NAME_DFR,
  18. INTERNAL_EVALUATION_STRATEGY_NAMES,
  19. EvaluationStrategy,
  20. create_internal_evaluation_strategy_twice,
  21. )
  22. from materialize.output_consistency.execution.query_output_mode import (
  23. QUERY_OUTPUT_MODE_CHOICES,
  24. QueryOutputMode,
  25. )
  26. from materialize.output_consistency.execution.sql_executors import SqlExecutors
  27. from materialize.output_consistency.ignore_filter.inconsistency_ignore_filter import (
  28. GenericInconsistencyIgnoreFilter,
  29. )
  30. from materialize.output_consistency.input_data.scenarios.evaluation_scenario import (
  31. EvaluationScenario,
  32. )
  33. from materialize.output_consistency.input_data.test_input_data import (
  34. ConsistencyTestInputData,
  35. )
  36. from materialize.output_consistency.operation.operation import DbOperationOrFunction
  37. from materialize.output_consistency.output.output_printer import OutputPrinter
  38. from materialize.output_consistency.output_consistency_test import (
  39. OutputConsistencyTest,
  40. connect,
  41. )
  42. from materialize.version_consistency.execution.multi_version_executors import (
  43. MultiVersionSqlExecutors,
  44. )
  45. from materialize.version_consistency.ignore_filter.version_consistency_ignore_filter import (
  46. VersionConsistencyIgnoreFilter,
  47. )
  48. class VersionConsistencyTest(OutputConsistencyTest):
  49. def __init__(self) -> None:
  50. self.mz2_connection: Connection | None = None
  51. self.mz2_system_connection: Connection | None = None
  52. self.evaluation_strategy_name: str | None = None
  53. self.allow_same_version_comparison = False
  54. # values will be available after create_sql_executors is called
  55. self.mz1_version_without_dev_suffix: MzVersion | None = None
  56. self.mz2_version_without_dev_suffix: MzVersion | None = None
  57. def shall_run(self, sql_executors: SqlExecutors) -> bool:
  58. assert isinstance(sql_executors, MultiVersionSqlExecutors)
  59. different_versions_involved = sql_executors.uses_different_versions()
  60. if not different_versions_involved:
  61. if self.allow_same_version_comparison:
  62. print(
  63. "Involved versions are identical, but continuing due to allow_same_version_comparison"
  64. )
  65. return True
  66. else:
  67. print("Involved versions are identical, aborting")
  68. return different_versions_involved
  69. def get_scenario(self) -> EvaluationScenario:
  70. return EvaluationScenario.VERSION_CONSISTENCY
  71. def create_sql_executors(
  72. self,
  73. config: ConsistencyTestConfiguration,
  74. default_connection: Connection,
  75. mz_system_connection: Connection,
  76. output_printer: OutputPrinter,
  77. ) -> SqlExecutors:
  78. assert self.mz2_connection is not None, "Second connection is not initialized"
  79. assert (
  80. self.mz2_system_connection is not None
  81. ), "Second system connection is not initialized"
  82. mz1_sql_executor = self.create_sql_executor(
  83. config, default_connection, mz_system_connection, output_printer, "mz1"
  84. )
  85. mz2_sql_executor = self.create_sql_executor(
  86. config,
  87. self.mz2_connection,
  88. self.mz2_system_connection,
  89. output_printer,
  90. "mz2",
  91. )
  92. self.mz1_version_without_dev_suffix = MzVersion.parse_mz(
  93. mz1_sql_executor.query_version(), drop_dev_suffix=True
  94. )
  95. self.mz2_version_without_dev_suffix = MzVersion.parse_mz(
  96. mz2_sql_executor.query_version(), drop_dev_suffix=True
  97. )
  98. return MultiVersionSqlExecutors(
  99. mz1_sql_executor,
  100. mz2_sql_executor,
  101. )
  102. def create_inconsistency_ignore_filter(self) -> GenericInconsistencyIgnoreFilter:
  103. assert self.mz1_version_without_dev_suffix is not None
  104. assert self.mz2_version_without_dev_suffix is not None
  105. assert (
  106. self.evaluation_strategy_name is not None
  107. ), "Evaluation strategy name is not initialized"
  108. uses_dfr = self.evaluation_strategy_name == EVALUATION_STRATEGY_NAME_DFR
  109. return VersionConsistencyIgnoreFilter(
  110. self.mz1_version_without_dev_suffix,
  111. self.mz2_version_without_dev_suffix,
  112. uses_dfr,
  113. )
  114. def create_evaluation_strategies(
  115. self, sql_executors: SqlExecutors
  116. ) -> list[EvaluationStrategy]:
  117. assert (
  118. self.evaluation_strategy_name is not None
  119. ), "Evaluation strategy name is not initialized"
  120. strategies = create_internal_evaluation_strategy_twice(
  121. self.evaluation_strategy_name
  122. )
  123. for i, strategy in enumerate(strategies):
  124. number = i + 1
  125. sql_executor = sql_executors.get_executor(strategy)
  126. version = sql_executor.query_version()
  127. sanitized_version_string = sanitize_and_shorten_version_string(version)
  128. strategy.name = f"{strategy.name} {version}"
  129. # include the number as well since the short version string may not be unique
  130. strategy.object_name_base = (
  131. f"{strategy.object_name_base}_{number}_{sanitized_version_string}"
  132. )
  133. strategy.simple_db_object_name = (
  134. f"{strategy.simple_db_object_name}_{number}_{sanitized_version_string}"
  135. )
  136. return strategies
  137. def filter_input_data(self, input_data: ConsistencyTestInputData) -> None:
  138. input_data.operations_input.remove_functions(
  139. self._is_operation_unsupported_in_any_versions
  140. )
  141. def _is_operation_unsupported_in_any_versions(
  142. self, operation: DbOperationOrFunction
  143. ) -> bool:
  144. if operation.since_mz_version is None:
  145. return False
  146. assert self.mz1_version_without_dev_suffix is not None
  147. assert self.mz2_version_without_dev_suffix is not None
  148. return (
  149. operation.since_mz_version > self.mz1_version_without_dev_suffix
  150. or operation.since_mz_version > self.mz2_version_without_dev_suffix
  151. )
  152. def sanitize_and_shorten_version_string(version: str) -> str:
  153. """
  154. Drop the commit hash and replace dots and dashes with an underscore
  155. :param version: looks like "v0.98.5 (4cfc26688)", version may contain a "-dev" suffix
  156. """
  157. mz_version = MzVersion.parse(version)
  158. return str(mz_version).replace(".", "_").replace("-", "_")
  159. def main() -> int:
  160. test = VersionConsistencyTest()
  161. parser = argparse.ArgumentParser(
  162. prog="version-consistency-test",
  163. formatter_class=argparse.RawDescriptionHelpFormatter,
  164. description="Test the consistency of different versions of mz",
  165. )
  166. parser.add_argument("--mz-host", default="localhost", type=str)
  167. parser.add_argument("--mz-port", default=6875, type=int)
  168. parser.add_argument("--mz-system-port", default=6877, type=int)
  169. parser.add_argument("--mz-host-2", default="localhost", type=str)
  170. parser.add_argument("--mz-port-2", default=6975, type=int)
  171. parser.add_argument("--mz-system-port-2", default=6977, type=int)
  172. parser.add_argument(
  173. "--evaluation-strategy",
  174. default=EVALUATION_STRATEGY_NAME_DFR,
  175. type=str,
  176. choices=INTERNAL_EVALUATION_STRATEGY_NAMES,
  177. )
  178. parser.add_argument(
  179. "--other-tag",
  180. type=str,
  181. default="common-ancestor",
  182. )
  183. parser.add_argument(
  184. "--allow-same-version-comparison",
  185. action=argparse.BooleanOptionalAction,
  186. default=False,
  187. )
  188. parser.add_argument(
  189. "--query-output-mode",
  190. type=lambda mode: QueryOutputMode[mode.upper()],
  191. choices=QUERY_OUTPUT_MODE_CHOICES,
  192. default=QueryOutputMode.SELECT,
  193. )
  194. args = test.parse_output_consistency_input_args(parser)
  195. try:
  196. mz_db_user = "materialize"
  197. mz_system_user = "mz_system"
  198. mz_connection = connect(args.mz_host, args.mz_port, mz_db_user)
  199. mz_system_connection = connect(
  200. args.mz_host, args.mz_system_port, mz_system_user
  201. )
  202. test.mz2_connection = connect(args.mz_host_2, args.mz_port_2, mz_db_user)
  203. test.mz2_system_connection = connect(
  204. args.mz_host_2, args.mz_system_port_2, mz_system_user
  205. )
  206. test.evaluation_strategy_name = args.evaluation_strategy
  207. test.allow_same_version_comparison = args.allow_same_version_comparison
  208. except OperationalError:
  209. return 1
  210. result = test.run_output_consistency_tests(
  211. mz_connection,
  212. mz_system_connection,
  213. args,
  214. query_output_mode=args.query_output_mode,
  215. )
  216. return 0 if result.all_passed() else 1
  217. if __name__ == "__main__":
  218. exit(main())