feature_flag_consistency_test.py 7.8 KB


  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import argparse
  10. import time
  11. from psycopg import Connection
  12. from psycopg.errors import OperationalError
  13. from materialize.feature_flag_consistency.execution.multi_config_executors import (
  14. MultiConfigSqlExecutors,
  15. )
  16. from materialize.feature_flag_consistency.feature_flag.feature_flag import (
  17. FeatureFlagSystemConfiguration,
  18. FeatureFlagSystemConfigurationPair,
  19. FeatureFlagValue,
  20. )
  21. from materialize.feature_flag_consistency.ignore_filter.feature_flag_consistency_ignore_filter import (
  22. FeatureFlagConsistencyIgnoreFilter,
  23. )
  24. from materialize.feature_flag_consistency.input_data.feature_flag_configurations import (
  25. FEATURE_FLAG_CONFIGURATION_PAIRS,
  26. )
  27. from materialize.output_consistency.common.configuration import (
  28. ConsistencyTestConfiguration,
  29. )
  30. from materialize.output_consistency.execution.evaluation_strategy import (
  31. EVALUATION_STRATEGY_NAME_DFR,
  32. INTERNAL_EVALUATION_STRATEGY_NAMES,
  33. EvaluationStrategy,
  34. create_internal_evaluation_strategy_twice,
  35. )
  36. from materialize.output_consistency.execution.query_output_mode import (
  37. QUERY_OUTPUT_MODE_CHOICES,
  38. QueryOutputMode,
  39. )
  40. from materialize.output_consistency.execution.sql_executors import SqlExecutors
  41. from materialize.output_consistency.ignore_filter.inconsistency_ignore_filter import (
  42. GenericInconsistencyIgnoreFilter,
  43. )
  44. from materialize.output_consistency.input_data.scenarios.evaluation_scenario import (
  45. EvaluationScenario,
  46. )
  47. from materialize.output_consistency.output.output_printer import OutputPrinter
  48. from materialize.output_consistency.output_consistency_test import (
  49. OutputConsistencyTest,
  50. connect,
  51. )
  52. class FeatureFlagConsistencyTest(OutputConsistencyTest):
  53. def __init__(self) -> None:
  54. self.evaluation_strategy_name: str | None = None
  55. self.flag_configuration_pair: FeatureFlagSystemConfigurationPair | None = None
  56. self.mz2_connection: Connection | None = None
  57. self.mz2_system_connection: Connection | None = None
  58. def get_scenario(self) -> EvaluationScenario:
  59. return EvaluationScenario.FEATURE_FLAG_CONSISTENCY
  60. def create_evaluation_strategies(
  61. self, sql_executors: SqlExecutors
  62. ) -> list[EvaluationStrategy]:
  63. assert (
  64. self.evaluation_strategy_name is not None
  65. ), "Evaluation strategy name is not initialized"
  66. assert (
  67. self.flag_configuration_pair is not None
  68. ), "Configuration is not initialized"
  69. strategies = create_internal_evaluation_strategy_twice(
  70. self.evaluation_strategy_name
  71. )
  72. for strategy, flag_config in zip(
  73. strategies, self.flag_configuration_pair.get_configs()
  74. ):
  75. strategy.name = f"{strategy.name} {flag_config.name}"
  76. strategy.object_name_base = (
  77. f"{strategy.object_name_base}_{flag_config.shortcut}"
  78. )
  79. strategy.simple_db_object_name = (
  80. f"{strategy.simple_db_object_name}_{flag_config.shortcut}"
  81. )
  82. strategy.additional_setup_info = f"Config: {flag_config.to_system_params()}"
  83. return strategies
  84. def find_configuration_pair_by_name(
  85. self, name: str
  86. ) -> FeatureFlagSystemConfigurationPair:
  87. if name not in FEATURE_FLAG_CONFIGURATION_PAIRS.keys():
  88. raise RuntimeError(
  89. f"Feature flag configuration with name '{name}' not found. Available configuration names: {list(FEATURE_FLAG_CONFIGURATION_PAIRS.keys())}"
  90. )
  91. return FEATURE_FLAG_CONFIGURATION_PAIRS[name]
  92. def set_configuration_pair_by_name(self, name: str) -> None:
  93. self.flag_configuration_pair = self.find_configuration_pair_by_name(name)
  94. def create_inconsistency_ignore_filter(self) -> GenericInconsistencyIgnoreFilter:
  95. assert self.flag_configuration_pair is not None
  96. return FeatureFlagConsistencyIgnoreFilter(self.flag_configuration_pair)
  97. def create_sql_executors(
  98. self,
  99. config: ConsistencyTestConfiguration,
  100. default_connection: Connection,
  101. mz_system_connection: Connection,
  102. output_printer: OutputPrinter,
  103. ) -> SqlExecutors:
  104. assert (
  105. self.flag_configuration_pair is not None
  106. ), "Flag configuration is not initialized"
  107. assert self.mz2_connection is not None, "Second connection is not initialized"
  108. assert (
  109. self.mz2_system_connection is not None
  110. ), "Second system connection is not initialized"
  111. mz1_sql_executor = self.create_sql_executor(
  112. config, default_connection, mz_system_connection, output_printer, "mz1"
  113. )
  114. mz2_sql_executor = self.create_sql_executor(
  115. config,
  116. self.mz2_connection,
  117. self.mz2_system_connection,
  118. output_printer,
  119. "mz2",
  120. )
  121. return MultiConfigSqlExecutors(
  122. mz1_sql_executor, mz2_sql_executor, self.flag_configuration_pair
  123. )
  124. def main() -> int:
  125. test = FeatureFlagConsistencyTest()
  126. parser = argparse.ArgumentParser(
  127. prog="feature-flag-consistency-test",
  128. formatter_class=argparse.RawDescriptionHelpFormatter,
  129. description="Test the consistency of different feature flag configurations",
  130. )
  131. parser.add_argument("--mz-host", default="localhost", type=str)
  132. parser.add_argument("--mz-port", default=6875, type=int)
  133. parser.add_argument("--mz-system-port", default=6877, type=int)
  134. parser.add_argument("--mz-host-2", default="localhost", type=str)
  135. parser.add_argument("--mz-port-2", default=6975, type=int)
  136. parser.add_argument("--mz-system-port-2", default=6977, type=int)
  137. parser.add_argument(
  138. "--evaluation-strategy",
  139. default=EVALUATION_STRATEGY_NAME_DFR,
  140. type=str,
  141. choices=INTERNAL_EVALUATION_STRATEGY_NAMES,
  142. )
  143. parser.add_argument(
  144. "--query-output-mode",
  145. type=lambda mode: QueryOutputMode[mode.upper()],
  146. choices=QUERY_OUTPUT_MODE_CHOICES,
  147. default=QueryOutputMode.SELECT,
  148. )
  149. args = test.parse_output_consistency_input_args(parser)
  150. test.flag_configuration_pair = FeatureFlagSystemConfigurationPair(
  151. name="as_started",
  152. config1=FeatureFlagSystemConfiguration(
  153. name=f"as mz_1 running on {args.mz_port}",
  154. shortcut="as_mz_1",
  155. flags=[FeatureFlagValue("as_started", "mz-1")],
  156. ),
  157. config2=FeatureFlagSystemConfiguration(
  158. name=f"as mz_2 running on {args.mz_port_2}",
  159. shortcut="as_mz_2",
  160. flags=[FeatureFlagValue("as_started", "mz-2")],
  161. ),
  162. )
  163. print(
  164. "When running outside of mzcompose, make sure that the instances are started with the desired feature flags!"
  165. )
  166. time.sleep(5)
  167. try:
  168. mz_db_user = "materialize"
  169. my_system_user = "mz_system"
  170. mz_connection = connect(args.mz_host, args.mz_port, mz_db_user)
  171. mz_system_connection = connect(
  172. args.mz_host, args.mz_system_port, my_system_user
  173. )
  174. test.mz2_connection = connect(args.mz_host_2, args.mz_port_2, mz_db_user)
  175. test.mz2_system_connection = connect(
  176. args.mz_host_2, args.mz_system_port_2, my_system_user
  177. )
  178. test.evaluation_strategy_name = args.evaluation_strategy
  179. except OperationalError:
  180. return 1
  181. result = test.run_output_consistency_tests(
  182. mz_connection,
  183. mz_system_connection,
  184. args,
  185. query_output_mode=args.query_output_mode,
  186. )
  187. return 0 if result.all_passed() else 1
  188. if __name__ == "__main__":
  189. exit(main())