123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- import argparse
- from psycopg import Connection
- from psycopg.errors import OperationalError
- from materialize.output_consistency.common.configuration import (
- ConsistencyTestConfiguration,
- )
- from materialize.output_consistency.execution.evaluation_strategy import (
- DataFlowRenderingEvaluation,
- EvaluationStrategy,
- )
- from materialize.output_consistency.execution.query_output_mode import QueryOutputMode
- from materialize.output_consistency.execution.sql_executors import SqlExecutors
- from materialize.output_consistency.ignore_filter.inconsistency_ignore_filter import (
- GenericInconsistencyIgnoreFilter,
- )
- from materialize.output_consistency.input_data.scenarios.evaluation_scenario import (
- EvaluationScenario,
- )
- from materialize.output_consistency.input_data.test_input_data import (
- ConsistencyTestInputData,
- )
- from materialize.output_consistency.output.output_printer import OutputPrinter
- from materialize.output_consistency.output_consistency_test import (
- OutputConsistencyTest,
- connect,
- )
- from materialize.output_consistency.validation.error_message_normalizer import (
- ErrorMessageNormalizer,
- )
- from materialize.output_consistency.validation.result_comparator import ResultComparator
- from materialize.postgres_consistency.custom.predefined_pg_queries import (
- create_custom_pg_consistency_queries,
- )
- from materialize.postgres_consistency.execution.pg_evaluation_strategy import (
- PgEvaluation,
- )
- from materialize.postgres_consistency.execution.pg_sql_executors import PgSqlExecutors
- from materialize.postgres_consistency.ignore_filter.pg_inconsistency_ignore_filter import (
- PgInconsistencyIgnoreFilter,
- )
- from materialize.postgres_consistency.validation.pg_result_comparator import (
- PostgresResultComparator,
- )
class PostgresConsistencyTest(OutputConsistencyTest):
    """Output consistency test that compares Materialize against Postgres.

    This class does not open the Postgres connection itself: the caller must
    assign it to ``pg_connection`` before ``create_sql_executors`` is invoked.
    """

    def __init__(self) -> None:
        # Set by the caller once the connection to Postgres is established.
        self.pg_connection: Connection | None = None

    def get_scenario(self) -> EvaluationScenario:
        """Return the scenario constant for the Postgres consistency test."""
        return EvaluationScenario.POSTGRES_CONSISTENCY

    def create_sql_executors(
        self,
        config: ConsistencyTestConfiguration,
        default_connection: Connection,
        mz_system_connection: Connection,
        output_printer: OutputPrinter,
    ) -> SqlExecutors:
        """Create the executor pair: one named "mz" and one named "pg".

        Raises:
            RuntimeError: if ``pg_connection`` has not been assigned yet.
        """
        pg_connection = self.pg_connection
        if pg_connection is None:
            raise RuntimeError("Postgres connection is not initialized")

        mz_executor = self.create_sql_executor(
            config, default_connection, mz_system_connection, output_printer, "mz"
        )
        # The Postgres executor gets no system connection and is flagged as
        # not being a Materialize executor.
        pg_executor = self.create_sql_executor(
            config, pg_connection, None, output_printer, "pg", is_mz=False
        )
        return PgSqlExecutors(mz_executor, pg_executor)

    def create_result_comparator(
        self, ignore_filter: GenericInconsistencyIgnoreFilter
    ) -> ResultComparator:
        """Create the Postgres result comparator using the given ignore filter."""
        error_message_normalizer = ErrorMessageNormalizer()
        return PostgresResultComparator(ignore_filter, error_message_normalizer)

    def create_inconsistency_ignore_filter(self) -> GenericInconsistencyIgnoreFilter:
        """Create the ignore filter used for the Postgres comparison."""
        return PgInconsistencyIgnoreFilter()

    def create_evaluation_strategies(
        self, sql_executors: SqlExecutors
    ) -> list[EvaluationStrategy]:
        """Return the strategies to compare: Materialize first, then Postgres."""
        materialize_strategy = DataFlowRenderingEvaluation()
        # Override the name and object name of the dataflow rendering strategy.
        materialize_strategy.name = "Materialize evaluation"
        materialize_strategy.simple_db_object_name = "mz_evaluation"

        postgres_strategy = PgEvaluation()
        return [materialize_strategy, postgres_strategy]

    def create_input_data(self) -> ConsistencyTestInputData:
        """Return the base input data extended by the custom Postgres queries."""
        data = super().create_input_data()
        custom_queries = create_custom_pg_consistency_queries()
        data.predefined_queries.extend(custom_queries)
        return data

    def filter_input_data(self, input_data: ConsistencyTestInputData) -> None:
        """Remove all types, values, and operations not marked Postgres-compatible."""

        def is_not_pg_compatible(candidate) -> bool:
            return not candidate.is_pg_compatible

        input_data.types_input.remove_types(is_not_pg_compatible)
        input_data.types_input.remove_values(is_not_pg_compatible)
        input_data.operations_input.remove_functions(is_not_pg_compatible)
def main() -> int:
    """Run the Postgres consistency test from the command line.

    Parses the connection arguments, connects to Materialize (regular and
    system port) and to Postgres, and runs the output consistency tests in
    ``QueryOutputMode.SELECT``.

    Returns:
        0 if all tests passed; 1 if any test failed or if one of the database
        connections could not be established.
    """
    # Function-local import keeps this block self-contained.
    import sys

    test = PostgresConsistencyTest()

    parser = argparse.ArgumentParser(
        prog="postgres-consistency-test",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Test the consistency of Materialize and Postgres",
    )
    parser.add_argument("--mz-host", default="localhost", type=str)
    parser.add_argument("--mz-port", default=6875, type=int)
    parser.add_argument("--mz-system-port", default=6877, type=int)
    parser.add_argument("--pg-host", default="localhost", type=str)
    parser.add_argument("--pg-port", default=5432, type=int)
    parser.add_argument("--pg-password", default=None, type=str)

    args = test.parse_output_consistency_input_args(parser)

    mz_db_user = "materialize"
    mz_system_user = "mz_system"
    pg_db_user = "postgres"

    # Only the connection attempts belong in the try block; they are the
    # calls expected to raise OperationalError.
    try:
        mz_connection = connect(args.mz_host, args.mz_port, mz_db_user)
        mz_system_connection = connect(
            args.mz_host, args.mz_system_port, mz_system_user
        )
        test.pg_connection = connect(
            args.pg_host, args.pg_port, pg_db_user, args.pg_password
        )
    except OperationalError as e:
        # Report the cause instead of failing silently with exit code 1.
        print(f"Unable to connect to database: {e}", file=sys.stderr)
        return 1

    result = test.run_output_consistency_tests(
        mz_connection,
        mz_system_connection,
        args,
        query_output_mode=QueryOutputMode.SELECT,
    )
    return 0 if result.all_passed() else 1
if __name__ == "__main__":
    # Use SystemExit directly: the bare `exit()` helper is injected by the
    # `site` module for interactive use and is not guaranteed to exist
    # (e.g. under `python -S`).
    raise SystemExit(main())
|