operation.py 11 KB


  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from enum import Enum
  10. from materialize.mz_version import MzVersion
  11. from materialize.output_consistency.data_type.data_type import DataType
  12. from materialize.output_consistency.expression.expression import Expression
  13. from materialize.output_consistency.expression.expression_characteristics import (
  14. ExpressionCharacteristics,
  15. )
  16. from materialize.output_consistency.operation.operation_args_validator import (
  17. OperationArgsValidator,
  18. )
  19. from materialize.output_consistency.operation.operation_param import OperationParam
  20. from materialize.output_consistency.operation.return_type_spec import ReturnTypeSpec
  21. EXPRESSION_PLACEHOLDER = "$"
  22. class OperationRelevance(Enum):
  23. # for testing
  24. EXTREME_HIGH = 1
  25. HIGH = 2
  26. DEFAULT = 3
  27. LOW = 4
  28. class DbOperationOrFunction:
  29. """Base class of `DbOperation` and `DbFunction`"""
  30. def __init__(
  31. self,
  32. params: list[OperationParam],
  33. min_param_count: int,
  34. max_param_count: int,
  35. return_type_spec: ReturnTypeSpec,
  36. args_validators: set[OperationArgsValidator] | None = None,
  37. is_aggregation: bool = False,
  38. is_table_function: bool = False,
  39. relevance: OperationRelevance = OperationRelevance.DEFAULT,
  40. comment: str | None = None,
  41. is_enabled: bool = True,
  42. is_pg_compatible: bool = True,
  43. tags: set[str] | None = None,
  44. since_mz_version: MzVersion | None = None,
  45. ):
  46. """
  47. :param is_enabled: an operation should only be disabled if its execution causes problems;
  48. if it just fails, it should be ignored
  49. """
  50. if args_validators is None:
  51. args_validators = set()
  52. self.params = params
  53. self.min_param_count = min_param_count
  54. self.max_param_count = max_param_count
  55. self.return_type_spec = return_type_spec
  56. self.args_validators: set[OperationArgsValidator] = args_validators
  57. self.is_aggregation = is_aggregation
  58. self.is_table_function = is_table_function
  59. self.relevance = relevance
  60. self.comment = comment
  61. self.is_enabled = is_enabled
  62. self.is_pg_compatible = is_pg_compatible
  63. self.tags = tags
  64. self.since_mz_version = since_mz_version
  65. self.added_characteristics: set[ExpressionCharacteristics] = set()
  66. def to_pattern(self, args_count: int) -> str:
  67. raise NotImplementedError
  68. def validate_args_count_in_range(self, args_count: int) -> None:
  69. if args_count < self.min_param_count:
  70. raise RuntimeError(
  71. f"To few arguments (got {args_count}, expected at least {self.min_param_count})"
  72. )
  73. if args_count > self.max_param_count:
  74. raise RuntimeError(
  75. f"To many arguments (got {args_count}, expected at most {self.max_param_count})"
  76. )
  77. def derive_characteristics(
  78. self, args: list[Expression]
  79. ) -> set[ExpressionCharacteristics]:
  80. return self.added_characteristics
  81. def __str__(self) -> str:
  82. raise NotImplementedError
  83. def operation_type_name(self) -> str:
  84. raise NotImplementedError
  85. def to_description(self, param_count: int) -> str:
  86. assert self.min_param_count <= param_count <= self.max_param_count
  87. return f"{self.operation_type_name()} '{self._to_pattern_with_named_params(param_count)}'"
  88. def _to_pattern_with_named_params(self, param_count: int) -> str:
  89. pattern = self.to_pattern(param_count)
  90. for i in range(param_count):
  91. pattern = pattern.replace("$", self.params[i].__class__.__name__, 1)
  92. return pattern
  93. def is_tagged(self, tag: str) -> bool:
  94. if self.tags is None:
  95. return False
  96. return tag in self.tags
  97. def try_resolve_exact_data_type(self, args: list[Expression]) -> DataType | None:
  98. return None
  99. def is_expected_to_cause_db_error(self, args: list[Expression]) -> bool:
  100. """checks incompatibilities (e.g., division by zero) and potential error scenarios (e.g., addition of two max
  101. data_type)
  102. """
  103. self.validate_args_count_in_range(len(args))
  104. for validator in self.args_validators:
  105. if validator.is_expected_to_cause_error(args):
  106. return True
  107. for param, arg in zip(self.params, args):
  108. if not param.supports_expression(arg):
  109. return True
  110. return False
  111. def count_variants(self) -> int:
  112. return self.max_param_count - self.min_param_count + 1
  113. class DbOperation(DbOperationOrFunction):
  114. """A database operation (e.g., `a + b`)"""
  115. def __init__(
  116. self,
  117. pattern: str,
  118. params: list[OperationParam],
  119. return_type_spec: ReturnTypeSpec,
  120. args_validators: set[OperationArgsValidator] | None = None,
  121. relevance: OperationRelevance = OperationRelevance.DEFAULT,
  122. comment: str | None = None,
  123. is_enabled: bool = True,
  124. is_pg_compatible: bool = True,
  125. tags: set[str] | None = None,
  126. since_mz_version: MzVersion | None = None,
  127. ):
  128. param_count = len(params)
  129. super().__init__(
  130. params,
  131. min_param_count=param_count,
  132. max_param_count=param_count,
  133. return_type_spec=return_type_spec,
  134. args_validators=args_validators,
  135. is_aggregation=False,
  136. relevance=relevance,
  137. comment=comment,
  138. is_enabled=is_enabled,
  139. is_pg_compatible=is_pg_compatible,
  140. tags=tags,
  141. since_mz_version=since_mz_version,
  142. )
  143. self.pattern = pattern
  144. if param_count != self.pattern.count(EXPRESSION_PLACEHOLDER):
  145. raise RuntimeError(
  146. f"Operation has pattern {self.pattern} but has only {param_count} parameters"
  147. )
  148. def to_pattern(self, args_count: int) -> str:
  149. self.validate_args_count_in_range(args_count)
  150. # wrap in parentheses
  151. return f"({self.pattern})"
  152. def __str__(self) -> str:
  153. comment = f" (comment: {self.comment})" if self.comment is not None else ""
  154. return f"DbOperation: {self.pattern}{comment}"
  155. def operation_type_name(self) -> str:
  156. return "operation"
  157. class DbFunction(DbOperationOrFunction):
  158. """A database function (e.g., `SUM(x)`)"""
  159. def __init__(
  160. self,
  161. function_name: str,
  162. params: list[OperationParam],
  163. return_type_spec: ReturnTypeSpec,
  164. args_validators: set[OperationArgsValidator] | None = None,
  165. is_aggregation: bool = False,
  166. is_table_function: bool = False,
  167. relevance: OperationRelevance = OperationRelevance.DEFAULT,
  168. comment: str | None = None,
  169. is_enabled: bool = True,
  170. is_pg_compatible: bool = True,
  171. tags: set[str] | None = None,
  172. since_mz_version: MzVersion | None = None,
  173. ):
  174. self.validate_params(params)
  175. super().__init__(
  176. params,
  177. min_param_count=self.get_min_param_count(params),
  178. max_param_count=len(params),
  179. return_type_spec=return_type_spec,
  180. args_validators=args_validators,
  181. is_aggregation=is_aggregation,
  182. is_table_function=is_table_function,
  183. relevance=relevance,
  184. comment=comment,
  185. is_enabled=is_enabled,
  186. is_pg_compatible=is_pg_compatible,
  187. tags=tags,
  188. since_mz_version=since_mz_version,
  189. )
  190. self.function_name_in_lower_case = function_name.lower()
  191. def validate_params(self, params: list[OperationParam]) -> None:
  192. optional_param_seen = False
  193. for param in params:
  194. if optional_param_seen and not param.optional:
  195. raise RuntimeError("Optional parameters must be at the end")
  196. if param.optional:
  197. optional_param_seen = True
  198. def get_min_param_count(self, params: list[OperationParam]) -> int:
  199. for index, param in enumerate(params):
  200. if param.optional:
  201. return index
  202. return len(params)
  203. def to_pattern(self, args_count: int) -> str:
  204. self.validate_args_count_in_range(args_count)
  205. args_pattern = ", ".join(["$"] * args_count)
  206. return f"{self.function_name_in_lower_case}({args_pattern})"
  207. def __str__(self) -> str:
  208. comment = f" (comment: {self.comment})" if self.comment is not None else ""
  209. return f"DbFunction: {self.function_name_in_lower_case}{comment}"
  210. def operation_type_name(self) -> str:
  211. return "function"
  212. class DbFunctionWithCustomPattern(DbFunction):
  213. def __init__(
  214. self,
  215. function_name: str,
  216. pattern_per_param_count: dict[int, str],
  217. params: list[OperationParam],
  218. return_type_spec: ReturnTypeSpec,
  219. args_validators: set[OperationArgsValidator] | None = None,
  220. is_aggregation: bool = False,
  221. is_table_function: bool = False,
  222. relevance: OperationRelevance = OperationRelevance.DEFAULT,
  223. comment: str | None = None,
  224. is_enabled: bool = True,
  225. tags: set[str] | None = None,
  226. ):
  227. super().__init__(
  228. function_name,
  229. params,
  230. return_type_spec,
  231. args_validators=args_validators,
  232. is_aggregation=is_aggregation,
  233. is_table_function=is_table_function,
  234. relevance=relevance,
  235. comment=comment,
  236. is_enabled=is_enabled,
  237. tags=tags,
  238. )
  239. self.pattern_per_param_count = pattern_per_param_count
  240. self.min_param_count = min(pattern_per_param_count.keys())
  241. self.max_param_count = max(pattern_per_param_count.keys())
  242. def to_pattern(self, args_count: int) -> str:
  243. self.validate_args_count_in_range(args_count)
  244. if args_count not in self.pattern_per_param_count:
  245. raise RuntimeError(
  246. f"No pattern specified for {self.function_name_in_lower_case} with {args_count} params"
  247. )
  248. return self.pattern_per_param_count[args_count]
  249. def count_variants(self) -> int:
  250. return len(self.pattern_per_param_count)
  251. def match_function_by_name(
  252. op: DbOperationOrFunction, function_name_in_lower_case: str
  253. ) -> bool:
  254. return (
  255. isinstance(op, DbFunction)
  256. and op.function_name_in_lower_case == function_name_in_lower_case
  257. )