# File: view_capabilities.py
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. from typing import Union
  10. from materialize.zippy.debezium_capabilities import DebeziumSourceExists
  11. from materialize.zippy.mysql_cdc_capabilities import MySqlCdcTableExists
  12. from materialize.zippy.pg_cdc_capabilities import PostgresCdcTableExists
  13. from materialize.zippy.source_capabilities import SourceExists
  14. from materialize.zippy.table_capabilities import TableExists
  15. from materialize.zippy.watermarked_object_capabilities import WatermarkedObjectExists
  16. from materialize.zippy.watermarks import Watermarks
  17. WatermarkedObjectExistss = list[
  18. Union[
  19. TableExists,
  20. SourceExists,
  21. "ViewExists",
  22. DebeziumSourceExists,
  23. PostgresCdcTableExists,
  24. MySqlCdcTableExists,
  25. ]
  26. ]
  27. class ViewExists(WatermarkedObjectExists):
  28. """A view exists in Materialize."""
  29. @classmethod
  30. def format_str(cls) -> str:
  31. return "view_{}"
  32. def __init__(
  33. self,
  34. name: str,
  35. inputs: list[WatermarkedObjectExists],
  36. expensive_aggregates: bool | None = None,
  37. has_index: bool = False,
  38. ) -> None:
  39. self.name = name
  40. self.inputs = inputs
  41. self.expensive_aggregates = expensive_aggregates
  42. self.has_index = has_index
  43. def get_watermarks(self) -> Watermarks:
  44. """Calculate the intersection of the mins/maxs of the inputs. The result from the view should match the calculation."""
  45. return Watermarks(
  46. min_watermark=max([f.get_watermarks().min for f in self.inputs]),
  47. max_watermark=min([f.get_watermarks().max for f in self.inputs]),
  48. )