source_table_migration.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """Utilities for testing the source table migration"""
  10. from materialize.mz_version import MzVersion
  11. from materialize.mzcompose.composition import Composition
  12. from materialize.version_list import get_published_minor_mz_versions
  13. def verify_sources_after_source_table_migration(
  14. c: Composition, file: str, fail: bool = False
  15. ) -> None:
  16. source_names_rows = c.sql_query(
  17. "SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';"
  18. )
  19. source_names = [row[0] for row in source_names_rows]
  20. print(f"Sources created in {file} are: {source_names}")
  21. c.sql("SET statement_timeout = '20s'")
  22. for source_name in source_names:
  23. _verify_source(c, file, source_name, fail=fail)
  24. def _verify_source(
  25. c: Composition, file: str, source_name: str, fail: bool = False
  26. ) -> None:
  27. try:
  28. print(f"Checking source: {source_name}")
  29. # must not crash
  30. statement = f"SELECT count(*) FROM {source_name};"
  31. c.sql_query(statement)
  32. statement = f"SHOW CREATE SOURCE {source_name};"
  33. result = c.sql_query(statement)
  34. sql = result[0][1]
  35. assert "FOR TABLE" not in sql, f"FOR TABLE found in: {sql}"
  36. assert "FOR ALL TABLES" not in sql, f"FOR ALL TABLES found in: {sql}"
  37. if not source_name.endswith("_progress"):
  38. assert "CREATE SUBSOURCE" not in sql, f"CREATE SUBSOURCE found in: {sql}"
  39. print("OK.")
  40. except Exception as e:
  41. print(f"source-table-migration issue in {file}: {str(e)}")
  42. if fail:
  43. raise e
  44. _last_version: MzVersion | None = None
  45. def get_old_image_for_source_table_migration_test() -> str:
  46. global _last_version
  47. if _last_version is None:
  48. current_version = MzVersion.parse_cargo()
  49. minor_versions = [
  50. v
  51. for v in get_published_minor_mz_versions(
  52. limit=4, exclude_current_minor_version=True
  53. )
  54. if v < current_version
  55. ]
  56. _last_version = minor_versions[0]
  57. return f"materialize/materialized:{_last_version}"
  58. def get_new_image_for_source_table_migration_test() -> str | None:
  59. return None