123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- """
- Verify that Materialize has the same results with different sets of feature flags.
- """
- from materialize.feature_flag_consistency.feature_flag_consistency_test import (
- FeatureFlagConsistencyTest,
- )
- from materialize.feature_flag_consistency.input_data.feature_flag_configurations import (
- FEATURE_FLAG_CONFIGURATION_PAIRS,
- )
- from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
- from materialize.mzcompose.services.cockroach import Cockroach
- from materialize.mzcompose.services.materialized import Materialized
- from materialize.mzcompose.services.mz import Mz
- from materialize.mzcompose.services.postgres import Postgres
- from materialize.mzcompose.test_result import FailedTestExecutionError
- from materialize.output_consistency.execution.evaluation_strategy import (
- EVALUATION_STRATEGY_NAME_DFR,
- INTERNAL_EVALUATION_STRATEGY_NAMES,
- )
- from materialize.output_consistency.execution.query_output_mode import (
- QUERY_OUTPUT_MODE_CHOICES,
- QueryOutputMode,
- )
- from materialize.output_consistency.output_consistency_test import (
- upload_output_consistency_results_to_test_analytics,
- )
- SERVICES = [
- Cockroach(setup_materialize=True),
- Postgres(),
- Materialized(name="mz_this"), # Overridden below
- Materialized(name="mz_other"), # Overridden below
- Mz(app_password=""),
- ]
- def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
- """
- Test the consistency with another mz version.
- """
- c.down(destroy_volumes=True)
- test = FeatureFlagConsistencyTest()
- parser.add_argument("--configuration", action="append", default=[], type=str)
- parser.add_argument(
- "--evaluation-strategy",
- default=EVALUATION_STRATEGY_NAME_DFR,
- type=str,
- choices=INTERNAL_EVALUATION_STRATEGY_NAMES,
- )
- parser.add_argument(
- "--other-tag",
- type=str,
- default="common-ancestor",
- )
- parser.add_argument(
- "--query-output-mode",
- type=lambda mode: QueryOutputMode[mode.upper()],
- choices=QUERY_OUTPUT_MODE_CHOICES,
- default=QueryOutputMode.SELECT,
- )
- args = test.parse_output_consistency_input_args(parser)
- port_mz_default_internal, port_mz_system_internal = 6875, 6877
- port_mz_default_this, port_mz_system_this = 6875, 6877
- port_mz_default_other, port_mz_system_other = 16875, 16877
- configurations = args.configuration
- if len(configurations) == 0:
- configurations = FEATURE_FLAG_CONFIGURATION_PAIRS.keys()
- runtime_per_config_in_sec = args.max_runtime_in_sec / len(configurations)
- test_summaries = []
- for configuration in configurations:
- test.set_configuration_pair_by_name(configuration)
- assert test.flag_configuration_pair is not None
- print(f"Running flag configuration: {test.flag_configuration_pair.name}")
- with c.override(
- Materialized(
- name="mz_this",
- ports=[
- f"{port_mz_default_this}:{port_mz_default_internal}",
- f"{port_mz_system_this}:{port_mz_system_internal}",
- ],
- additional_system_parameter_defaults=test.flag_configuration_pair.config1.to_system_params(),
- use_default_volumes=False,
- ),
- Materialized(
- name="mz_other",
- ports=[
- f"{port_mz_default_other}:{port_mz_default_internal}",
- f"{port_mz_system_other}:{port_mz_system_internal}",
- ],
- additional_system_parameter_defaults=test.flag_configuration_pair.config2.to_system_params(),
- use_default_volumes=False,
- ),
- ):
- c.up("mz_this", "mz_other")
- default_connection = c.sql_connection(
- service="mz_this", port=port_mz_default_internal
- )
- mz_system_connection = c.sql_connection(
- service="mz_this", port=port_mz_system_internal, user="mz_system"
- )
- test.mz2_connection = c.sql_connection(
- service="mz_other", port=port_mz_default_internal
- )
- test.mz2_system_connection = c.sql_connection(
- service="mz_other", port=port_mz_system_internal, user="mz_system"
- )
- test.evaluation_strategy_name = args.evaluation_strategy
- test_summary = test.run_output_consistency_tests(
- default_connection,
- mz_system_connection,
- args,
- query_output_mode=args.query_output_mode,
- override_max_runtime_in_sec=runtime_per_config_in_sec,
- )
- test_summaries.append(test_summary)
- assert len(test_summaries) > 0
- merged_summary = test_summaries[0]
- for test_summary in test_summaries[1:]:
- merged_summary.merge(test_summary)
- upload_output_consistency_results_to_test_analytics(c, merged_summary)
- if not merged_summary.all_passed():
- raise FailedTestExecutionError(errors=merged_summary.failures)
|