123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- import argparse
- import time
- from psycopg import Connection
- from psycopg.errors import OperationalError
- from materialize.feature_flag_consistency.execution.multi_config_executors import (
- MultiConfigSqlExecutors,
- )
- from materialize.feature_flag_consistency.feature_flag.feature_flag import (
- FeatureFlagSystemConfiguration,
- FeatureFlagSystemConfigurationPair,
- FeatureFlagValue,
- )
- from materialize.feature_flag_consistency.ignore_filter.feature_flag_consistency_ignore_filter import (
- FeatureFlagConsistencyIgnoreFilter,
- )
- from materialize.feature_flag_consistency.input_data.feature_flag_configurations import (
- FEATURE_FLAG_CONFIGURATION_PAIRS,
- )
- from materialize.output_consistency.common.configuration import (
- ConsistencyTestConfiguration,
- )
- from materialize.output_consistency.execution.evaluation_strategy import (
- EVALUATION_STRATEGY_NAME_DFR,
- INTERNAL_EVALUATION_STRATEGY_NAMES,
- EvaluationStrategy,
- create_internal_evaluation_strategy_twice,
- )
- from materialize.output_consistency.execution.query_output_mode import (
- QUERY_OUTPUT_MODE_CHOICES,
- QueryOutputMode,
- )
- from materialize.output_consistency.execution.sql_executors import SqlExecutors
- from materialize.output_consistency.ignore_filter.inconsistency_ignore_filter import (
- GenericInconsistencyIgnoreFilter,
- )
- from materialize.output_consistency.input_data.scenarios.evaluation_scenario import (
- EvaluationScenario,
- )
- from materialize.output_consistency.output.output_printer import OutputPrinter
- from materialize.output_consistency.output_consistency_test import (
- OutputConsistencyTest,
- connect,
- )
- class FeatureFlagConsistencyTest(OutputConsistencyTest):
- def __init__(self) -> None:
- self.evaluation_strategy_name: str | None = None
- self.flag_configuration_pair: FeatureFlagSystemConfigurationPair | None = None
- self.mz2_connection: Connection | None = None
- self.mz2_system_connection: Connection | None = None
- def get_scenario(self) -> EvaluationScenario:
- return EvaluationScenario.FEATURE_FLAG_CONSISTENCY
- def create_evaluation_strategies(
- self, sql_executors: SqlExecutors
- ) -> list[EvaluationStrategy]:
- assert (
- self.evaluation_strategy_name is not None
- ), "Evaluation strategy name is not initialized"
- assert (
- self.flag_configuration_pair is not None
- ), "Configuration is not initialized"
- strategies = create_internal_evaluation_strategy_twice(
- self.evaluation_strategy_name
- )
- for strategy, flag_config in zip(
- strategies, self.flag_configuration_pair.get_configs()
- ):
- strategy.name = f"{strategy.name} {flag_config.name}"
- strategy.object_name_base = (
- f"{strategy.object_name_base}_{flag_config.shortcut}"
- )
- strategy.simple_db_object_name = (
- f"{strategy.simple_db_object_name}_{flag_config.shortcut}"
- )
- strategy.additional_setup_info = f"Config: {flag_config.to_system_params()}"
- return strategies
- def find_configuration_pair_by_name(
- self, name: str
- ) -> FeatureFlagSystemConfigurationPair:
- if name not in FEATURE_FLAG_CONFIGURATION_PAIRS.keys():
- raise RuntimeError(
- f"Feature flag configuration with name '{name}' not found. Available configuration names: {list(FEATURE_FLAG_CONFIGURATION_PAIRS.keys())}"
- )
- return FEATURE_FLAG_CONFIGURATION_PAIRS[name]
- def set_configuration_pair_by_name(self, name: str) -> None:
- self.flag_configuration_pair = self.find_configuration_pair_by_name(name)
- def create_inconsistency_ignore_filter(self) -> GenericInconsistencyIgnoreFilter:
- assert self.flag_configuration_pair is not None
- return FeatureFlagConsistencyIgnoreFilter(self.flag_configuration_pair)
- def create_sql_executors(
- self,
- config: ConsistencyTestConfiguration,
- default_connection: Connection,
- mz_system_connection: Connection,
- output_printer: OutputPrinter,
- ) -> SqlExecutors:
- assert (
- self.flag_configuration_pair is not None
- ), "Flag configuration is not initialized"
- assert self.mz2_connection is not None, "Second connection is not initialized"
- assert (
- self.mz2_system_connection is not None
- ), "Second system connection is not initialized"
- mz1_sql_executor = self.create_sql_executor(
- config, default_connection, mz_system_connection, output_printer, "mz1"
- )
- mz2_sql_executor = self.create_sql_executor(
- config,
- self.mz2_connection,
- self.mz2_system_connection,
- output_printer,
- "mz2",
- )
- return MultiConfigSqlExecutors(
- mz1_sql_executor, mz2_sql_executor, self.flag_configuration_pair
- )
- def main() -> int:
- test = FeatureFlagConsistencyTest()
- parser = argparse.ArgumentParser(
- prog="feature-flag-consistency-test",
- formatter_class=argparse.RawDescriptionHelpFormatter,
- description="Test the consistency of different feature flag configurations",
- )
- parser.add_argument("--mz-host", default="localhost", type=str)
- parser.add_argument("--mz-port", default=6875, type=int)
- parser.add_argument("--mz-system-port", default=6877, type=int)
- parser.add_argument("--mz-host-2", default="localhost", type=str)
- parser.add_argument("--mz-port-2", default=6975, type=int)
- parser.add_argument("--mz-system-port-2", default=6977, type=int)
- parser.add_argument(
- "--evaluation-strategy",
- default=EVALUATION_STRATEGY_NAME_DFR,
- type=str,
- choices=INTERNAL_EVALUATION_STRATEGY_NAMES,
- )
- parser.add_argument(
- "--query-output-mode",
- type=lambda mode: QueryOutputMode[mode.upper()],
- choices=QUERY_OUTPUT_MODE_CHOICES,
- default=QueryOutputMode.SELECT,
- )
- args = test.parse_output_consistency_input_args(parser)
- test.flag_configuration_pair = FeatureFlagSystemConfigurationPair(
- name="as_started",
- config1=FeatureFlagSystemConfiguration(
- name=f"as mz_1 running on {args.mz_port}",
- shortcut="as_mz_1",
- flags=[FeatureFlagValue("as_started", "mz-1")],
- ),
- config2=FeatureFlagSystemConfiguration(
- name=f"as mz_2 running on {args.mz_port_2}",
- shortcut="as_mz_2",
- flags=[FeatureFlagValue("as_started", "mz-2")],
- ),
- )
- print(
- "When running outside of mzcompose, make sure that the instances are started with the desired feature flags!"
- )
- time.sleep(5)
- try:
- mz_db_user = "materialize"
- my_system_user = "mz_system"
- mz_connection = connect(args.mz_host, args.mz_port, mz_db_user)
- mz_system_connection = connect(
- args.mz_host, args.mz_system_port, my_system_user
- )
- test.mz2_connection = connect(args.mz_host_2, args.mz_port_2, mz_db_user)
- test.mz2_system_connection = connect(
- args.mz_host_2, args.mz_system_port_2, my_system_user
- )
- test.evaluation_strategy_name = args.evaluation_strategy
- except OperationalError:
- return 1
- result = test.run_output_consistency_tests(
- mz_connection,
- mz_system_connection,
- args,
- query_output_mode=args.query_output_mode,
- )
- return 0 if result.all_passed() else 1
- if __name__ == "__main__":
- exit(main())
|