postgres_consistency_test.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import argparse
  10. from psycopg import Connection
  11. from psycopg.errors import OperationalError
  12. from materialize.output_consistency.common.configuration import (
  13. ConsistencyTestConfiguration,
  14. )
  15. from materialize.output_consistency.execution.evaluation_strategy import (
  16. DataFlowRenderingEvaluation,
  17. EvaluationStrategy,
  18. )
  19. from materialize.output_consistency.execution.query_output_mode import QueryOutputMode
  20. from materialize.output_consistency.execution.sql_executors import SqlExecutors
  21. from materialize.output_consistency.ignore_filter.inconsistency_ignore_filter import (
  22. GenericInconsistencyIgnoreFilter,
  23. )
  24. from materialize.output_consistency.input_data.scenarios.evaluation_scenario import (
  25. EvaluationScenario,
  26. )
  27. from materialize.output_consistency.input_data.test_input_data import (
  28. ConsistencyTestInputData,
  29. )
  30. from materialize.output_consistency.output.output_printer import OutputPrinter
  31. from materialize.output_consistency.output_consistency_test import (
  32. OutputConsistencyTest,
  33. connect,
  34. )
  35. from materialize.output_consistency.validation.error_message_normalizer import (
  36. ErrorMessageNormalizer,
  37. )
  38. from materialize.output_consistency.validation.result_comparator import ResultComparator
  39. from materialize.postgres_consistency.custom.predefined_pg_queries import (
  40. create_custom_pg_consistency_queries,
  41. )
  42. from materialize.postgres_consistency.execution.pg_evaluation_strategy import (
  43. PgEvaluation,
  44. )
  45. from materialize.postgres_consistency.execution.pg_sql_executors import PgSqlExecutors
  46. from materialize.postgres_consistency.ignore_filter.pg_inconsistency_ignore_filter import (
  47. PgInconsistencyIgnoreFilter,
  48. )
  49. from materialize.postgres_consistency.validation.pg_result_comparator import (
  50. PostgresResultComparator,
  51. )
  52. class PostgresConsistencyTest(OutputConsistencyTest):
  53. def __init__(self) -> None:
  54. self.pg_connection: Connection | None = None
  55. def get_scenario(self) -> EvaluationScenario:
  56. return EvaluationScenario.POSTGRES_CONSISTENCY
  57. def create_sql_executors(
  58. self,
  59. config: ConsistencyTestConfiguration,
  60. default_connection: Connection,
  61. mz_system_connection: Connection,
  62. output_printer: OutputPrinter,
  63. ) -> SqlExecutors:
  64. if self.pg_connection is None:
  65. raise RuntimeError("Postgres connection is not initialized")
  66. return PgSqlExecutors(
  67. self.create_sql_executor(
  68. config, default_connection, mz_system_connection, output_printer, "mz"
  69. ),
  70. self.create_sql_executor(
  71. config, self.pg_connection, None, output_printer, "pg", is_mz=False
  72. ),
  73. )
  74. def create_result_comparator(
  75. self, ignore_filter: GenericInconsistencyIgnoreFilter
  76. ) -> ResultComparator:
  77. return PostgresResultComparator(ignore_filter, ErrorMessageNormalizer())
  78. def create_inconsistency_ignore_filter(self) -> GenericInconsistencyIgnoreFilter:
  79. return PgInconsistencyIgnoreFilter()
  80. def create_evaluation_strategies(
  81. self, sql_executors: SqlExecutors
  82. ) -> list[EvaluationStrategy]:
  83. mz_evaluation_strategy = DataFlowRenderingEvaluation()
  84. mz_evaluation_strategy.name = "Materialize evaluation"
  85. mz_evaluation_strategy.simple_db_object_name = "mz_evaluation"
  86. return [
  87. # Materialize
  88. mz_evaluation_strategy,
  89. # Postgres
  90. PgEvaluation(),
  91. ]
  92. def create_input_data(self) -> ConsistencyTestInputData:
  93. input_data = super().create_input_data()
  94. input_data.predefined_queries.extend(create_custom_pg_consistency_queries())
  95. return input_data
  96. def filter_input_data(self, input_data: ConsistencyTestInputData) -> None:
  97. input_data.types_input.remove_types(
  98. lambda data_type: not data_type.is_pg_compatible
  99. )
  100. input_data.types_input.remove_values(
  101. lambda data_value: not data_value.is_pg_compatible
  102. )
  103. input_data.operations_input.remove_functions(
  104. lambda db_operation: not db_operation.is_pg_compatible
  105. )
  106. def main() -> int:
  107. test = PostgresConsistencyTest()
  108. parser = argparse.ArgumentParser(
  109. prog="postgres-consistency-test",
  110. formatter_class=argparse.RawDescriptionHelpFormatter,
  111. description="Test the consistency of Materialize and Postgres",
  112. )
  113. parser.add_argument("--mz-host", default="localhost", type=str)
  114. parser.add_argument("--mz-port", default=6875, type=int)
  115. parser.add_argument("--mz-system-port", default=6877, type=int)
  116. parser.add_argument("--pg-host", default="localhost", type=str)
  117. parser.add_argument("--pg-port", default=5432, type=int)
  118. parser.add_argument("--pg-password", default=None, type=str)
  119. args = test.parse_output_consistency_input_args(parser)
  120. try:
  121. mz_db_user = "materialize"
  122. mz_system_user = "mz_system"
  123. mz_connection = connect(args.mz_host, args.mz_port, mz_db_user)
  124. mz_system_connection = connect(
  125. args.mz_host, args.mz_system_port, mz_system_user
  126. )
  127. pg_db_user = "postgres"
  128. test.pg_connection = connect(
  129. args.pg_host, args.pg_port, pg_db_user, args.pg_password
  130. )
  131. except OperationalError:
  132. return 1
  133. result = test.run_output_consistency_tests(
  134. mz_connection,
  135. mz_system_connection,
  136. args,
  137. query_output_mode=QueryOutputMode.SELECT,
  138. )
  139. return 0 if result.all_passed() else 1
  140. if __name__ == "__main__":
  141. exit(main())