common.py 12 KB


  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import re
  10. from dataclasses import dataclass, replace
  11. from enum import Enum
  12. from importlib import resources
  13. from pathlib import Path
  14. from typing import cast
  15. import click
  16. from pg8000.native import literal
  17. class ExplaineeType(Enum):
  18. CREATE_STATEMENT = 1
  19. CATALOG_ITEM = 2
  20. REPLAN_ITEM = 4
  21. ALL = 7
  22. def __str__(self):
  23. return self.name.lower()
  24. def contains(self, other: "ExplaineeType") -> bool:
  25. return (self.value & other.value) > 0
  26. class ExplainFormat(Enum):
  27. TEXT = "TEXT"
  28. JSON = "JSON"
  29. def __str__(self):
  30. return self.name
  31. def ext(self):
  32. if self == ExplainFormat.JSON:
  33. return "json"
  34. return "txt"
  35. class ExplainStage(str, Enum):
  36. RAW_PLAN = "RAW PLAN"
  37. DECORRELATED_PLAN = "DECORRELATED PLAN"
  38. LOCAL_PLAN = "LOCALLY OPTIMIZED PLAN"
  39. OPTIMIZED_PLAN = "OPTIMIZED PLAN"
  40. PHYSICAL_PLAN = "PHYSICAL PLAN"
  41. OPTIMIZER_TRACE = "OPTIMIZER TRACE"
  42. def __str__(self):
  43. return self.value
  44. @dataclass(frozen=True)
  45. class ExplainOption:
  46. key: str
  47. val: str | bool | int | None = None
  48. def __str__(self):
  49. key = self.key.replace("_", " ").upper()
  50. if self.val is None:
  51. return f"{key}"
  52. else:
  53. return f"{key} = {literal(self.val)}" # type: ignore
  54. def affects_pipeline(self) -> bool:
  55. """
  56. Returns true iff the `EXPLAIN` feature flag might affect the output of
  57. the optimizer pipeline.
  58. """
  59. return any(
  60. (
  61. self.key.lower() == "reoptimize_imported_views",
  62. self.key.lower().startswith("enable_"),
  63. )
  64. )
  65. class ExplainOptionType(click.ParamType):
  66. name = "ExplainOption"
  67. pattern = re.compile(
  68. r"\s*(?P<key>[a-z0-9_]+)\s*(=\s*(?P<val>[a-z0-9_]+))?",
  69. re.IGNORECASE,
  70. )
  71. def convert(self, value, param, ctx): # type: ignore
  72. m = ExplainOptionType.pattern.match(value)
  73. try:
  74. if m is None:
  75. raise ValueError(f"bad {self.name} format")
  76. key = str(m.group("key"))
  77. val = str(m.group("val")) if m.group("val") else None
  78. if val is None:
  79. return ExplainOption(
  80. key=key,
  81. val=None,
  82. )
  83. try: # try converting to bool
  84. bool_values = dict(
  85. on=True,
  86. true=True,
  87. yes=True,
  88. y=True,
  89. off=False,
  90. false=False,
  91. no=False,
  92. n=False,
  93. )
  94. return ExplainOption(
  95. key=key.lower(),
  96. val=bool_values[val.lower()],
  97. )
  98. except KeyError:
  99. pass
  100. try: # try converting to integer
  101. return ExplainOption(
  102. key=key,
  103. val=int(val),
  104. )
  105. except ValueError:
  106. pass
  107. # cannot convert: use a single-quoted string
  108. return ExplainOption(
  109. key=key,
  110. val=val,
  111. )
  112. except Exception as e:
  113. raise ValueError(f"Bad explain option: {value}: {e!r}") from e
  114. class ItemType(str, Enum):
  115. CONNECTION = "connection"
  116. FUNCTION = "function"
  117. INDEX = "index"
  118. MATERIALIZED_VIEW = "materialized-view"
  119. SECRET = "secret"
  120. SINK = "sink"
  121. SOURCE = "source"
  122. TABLE = "table"
  123. TYPE = "type"
  124. VIEW = "view"
  125. def sql(self) -> str:
  126. """Return the SQL string corresponding to this item type."""
  127. return self.replace("-", " ").upper()
  128. def show_create(self, fqname: str) -> str | None:
  129. """
  130. Return a show create query for an item of the given type identified by
  131. the given `fqname` or `None` for item types that are currently not
  132. supported.
  133. """
  134. if self in [
  135. ItemType.INDEX,
  136. ItemType.MATERIALIZED_VIEW,
  137. ItemType.SOURCE,
  138. ItemType.TABLE,
  139. ItemType.VIEW,
  140. ]:
  141. return f"SHOW CREATE {self.sql()} {fqname}"
  142. else: # unsupported item type
  143. return None
  144. @dataclass(frozen=True)
  145. class CreateFile:
  146. database: str
  147. schema: str
  148. name: str
  149. item_type: ItemType
  150. def file_name(self) -> str:
  151. return f"{self.name}.sql"
  152. def folder(self) -> Path:
  153. return Path(self.item_type.value, self.database, self.schema)
  154. def path(self) -> Path:
  155. return self.folder() / self.file_name()
  156. def __str__(self) -> str:
  157. return str(self.path())
  158. def skip(self) -> bool:
  159. # Skip _progress sources (they don't have a DDL)
  160. if self.item_type == ItemType.SOURCE:
  161. return self.name.endswith("_progress")
  162. # Skip items with database, schema, or item names that contain a `/`
  163. # (not a valid UNIX folder character).
  164. elif "/" in self.database:
  165. warn(f"Skip processing of item with bad database name: `{self.database}`")
  166. return True
  167. elif "/" in self.schema:
  168. warn(f"Skip processing of item with bad schema name: `{self.schema}`")
  169. return True
  170. elif "/" in self.name:
  171. warn(f"Skip processing of item with bad item name: `{self.name}`")
  172. return True
  173. # All good!
  174. else:
  175. return False
  176. @dataclass(frozen=True)
  177. class ExplainFile:
  178. database: str
  179. schema: str
  180. name: str
  181. suffix: str | None
  182. item_type: ItemType
  183. explainee_type: ExplaineeType
  184. stage: ExplainStage
  185. ext: str
  186. def file_name(self) -> str:
  187. return ".".join(
  188. str(part)
  189. for part in [
  190. self.name,
  191. self.explainee_type,
  192. self.stage.name.lower(),
  193. self.suffix,
  194. "json" if self.stage == ExplainStage.OPTIMIZER_TRACE else self.ext,
  195. ]
  196. if part
  197. )
  198. def folder(self) -> Path:
  199. return Path(self.item_type.value, self.database, self.schema)
  200. def path(self) -> Path:
  201. return self.folder() / self.file_name()
  202. def __str__(self) -> str:
  203. return str(self.path())
  204. def skip(self) -> bool:
  205. # Skip items with database, schema, or item names that contain a `/`
  206. # (not a valid UNIX folder character).
  207. if "/" in self.database:
  208. warn(f"Skip processing of item with bad database name: `{self.database}`")
  209. return True
  210. elif "/" in self.schema:
  211. warn(f"Skip processing of item with bad schema name: `{self.schema}`")
  212. return True
  213. elif "/" in self.name:
  214. warn(f"Skip processing of item with bad item name: `{self.name}`")
  215. return True
  216. # All good!
  217. else:
  218. return False
  219. def explain_file(path: Path) -> ExplainFile | None:
  220. filename = path.name.split(".")
  221. if len(filename) == 5:
  222. ext = filename.pop()
  223. suffix = filename.pop()
  224. stage = filename.pop()
  225. explainee_type = filename.pop()
  226. name = filename.pop()
  227. elif len(filename) == 4:
  228. ext = filename.pop()
  229. suffix = None
  230. stage = filename.pop()
  231. explainee_type = filename.pop()
  232. name = filename.pop()
  233. else:
  234. return None
  235. parents = list(path.parents)
  236. parents.reverse()
  237. schema = parents.pop().name
  238. database = parents.pop().name
  239. item_type = parents.pop().name
  240. try:
  241. return ExplainFile(
  242. database=database,
  243. schema=schema,
  244. name=name,
  245. suffix=suffix,
  246. item_type=ItemType(item_type.lower()),
  247. explainee_type=ExplaineeType[explainee_type.upper()],
  248. stage=ExplainStage[stage.upper()],
  249. ext=ext,
  250. )
  251. except (KeyError, ValueError):
  252. return None
  253. def explain_diff(base: ExplainFile, diff_suffix: str) -> ExplainFile:
  254. return replace(base, explainee_type=ExplaineeType.REPLAN_ITEM, suffix=diff_suffix)
  255. @dataclass(frozen=True)
  256. class ClonedItem:
  257. database: str
  258. schema: str
  259. name: str
  260. id: str
  261. item_type: ItemType
  262. def name_old(self) -> str:
  263. from materialize.mzexplore import sql
  264. return f"{sql.identifier(self.name, True)}"
  265. def name_new(self) -> str:
  266. from materialize.mzexplore import sql
  267. if self.item_type == ItemType.INDEX:
  268. name = f"{self.name}_{self.id}"
  269. else:
  270. name = f"{self.database}_{self.schema}_{self.name}_{self.id}"
  271. return f"{sql.identifier(name, True)}"
  272. def fqname_old(self) -> str:
  273. from materialize.mzexplore import sql
  274. item_database = sql.identifier(self.database, True)
  275. item_schema = sql.identifier(self.schema, True)
  276. item_name = sql.identifier(self.name, True)
  277. return f"{item_database}.{item_schema}.{item_name}"
  278. def fqname_new(self, database: str, schema: str) -> str:
  279. return f"{database}.{schema}.{self.name_new()}"
  280. def create_name_old(self) -> str:
  281. if self.item_type == ItemType.INDEX:
  282. return self.name_old()
  283. else:
  284. return self.fqname_old()
  285. def create_name_new(self, database: str, schema: str) -> str:
  286. if self.item_type == ItemType.INDEX:
  287. return self.name_new()
  288. else:
  289. return self.fqname_new(database, schema)
  290. def aliased_ref_old(self) -> str:
  291. return f"{self.fqname_old()} AS"
  292. def aliased_ref_new(self, database: str, schema: str) -> str:
  293. return f"{self.fqname_new(database, schema)} AS"
  294. def index_on_ref_old(self) -> str:
  295. return f"ON {self.fqname_old()}"
  296. def index_on_ref_new(self, database: str, schema: str) -> str:
  297. return f"ON {self.fqname_new(database, schema)}"
  298. def simple_ref_old(self) -> str:
  299. return self.fqname_old()
  300. def simple_ref_new(self, database: str, schema: str) -> str:
  301. return f"{self.fqname_new(database, schema)} AS {self.name_old()}"
  302. @dataclass(frozen=True)
  303. class ArrangementSizesFile:
  304. database: str
  305. schema: str
  306. name: str
  307. item_type: ItemType
  308. ext: str = "csv"
  309. def file_name(self) -> str:
  310. return f"{self.name}.arrangement-sizes.{self.ext}"
  311. def folder(self) -> Path:
  312. return Path(self.item_type.value, self.database, self.schema)
  313. def path(self) -> Path:
  314. return self.folder() / self.file_name()
  315. def __str__(self) -> str:
  316. return str(self.path())
  317. def skip(self) -> bool:
  318. # Skip items with database, schema, or item names that contain a `/`
  319. # (not a valid UNIX folder character).
  320. if "/" in self.database:
  321. warn(f"Skip processing of item with bad database name: `{self.database}`")
  322. return True
  323. elif "/" in self.schema:
  324. warn(f"Skip processing of item with bad schema name: `{self.schema}`")
  325. return True
  326. elif "/" in self.name:
  327. warn(f"Skip processing of item with bad item name: `{self.name}`")
  328. return True
  329. else:
  330. # Arrangements only exists for the following item types
  331. return self.item_type not in {ItemType.MATERIALIZED_VIEW, ItemType.INDEX}
  332. def resource_path(name: str) -> Path:
  333. # NOTE: we have to do this cast because pyright is not comfortable with the
  334. # Traversable protocol.
  335. return cast(Path, resources.files(__package__)) / name
  336. def info(msg: str, fg: str = "green") -> None:
  337. click.secho(msg, fg=fg)
  338. def warn(msg: str, fg: str = "yellow") -> None:
  339. click.secho(msg, fg=fg, err=True)
  340. def err(msg: str, fg: str = "red") -> None:
  341. click.secho(msg, fg=fg, err=True)