  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Utilities to extract data from a Materialize catalog for exploration purposes.
  11. """
  12. import csv
  13. import dataclasses
  14. import json
  15. import string
  16. import textwrap
  17. from contextlib import closing
  18. from pathlib import Path
  19. from psycopg.errors import DatabaseError
  20. from materialize.mzexplore import sql
  21. from materialize.mzexplore.common import (
  22. ArrangementSizesFile,
  23. CreateFile,
  24. ExplaineeType,
  25. ExplainFile,
  26. ExplainFormat,
  27. ExplainOption,
  28. ExplainStage,
  29. ItemType,
  30. info,
  31. warn,
  32. )
def defs(
    target: Path,
    database: str,
    schema: str,
    name: str,
    db_port: int,
    db_host: str,
    db_user: str,
    db_pass: str | None,
    db_require_ssl: bool,
    mzfmt: bool,
) -> None:
    """
    Extract CREATE statements for selected catalog items.

    Processes only items that match the ILIKE pattern defined by the parameter
    triple (DATABASE, SCHEMA, NAME).

    Each extracted definition is written under `target` at the path computed by
    `CreateFile`, prefixed with `-- id:` / `-- oid:` comment lines. When `mzfmt`
    is set, the DDL is pretty-printed with `sql.try_mzfmt` before writing.
    Items whose type is not a known `ItemType`, or whose type has no
    SHOW CREATE support, are skipped with a warning / silently respectively.
    """
    # Ensure that the target dir exists.
    target.mkdir(parents=True, exist_ok=True)

    with closing(
        sql.Database(
            port=db_port,
            host=db_host,
            user=db_user,
            database=None,
            password=db_pass,
            require_ssl=db_require_ssl,
        )
    ) as db:
        # Template for the output file: id/oid as SQL comments, then the DDL.
        output_template = string.Template(
            textwrap.dedent(
                """
                -- id: $id
                -- oid: $oid
                $create_sql
                """
            ).lstrip()
        )

        # Extract materialized view definitions
        # -------------------------------------
        for item in db.catalog_items(database, schema, name, system=False):
            item_database = sql.identifier(item["database"])
            item_schema = sql.identifier(item["schema"])
            item_name = sql.identifier(item["name"])
            fqname = f"{item_database}.{item_schema}.{item_name}"

            try:
                item_type = ItemType(item["type"])
            except ValueError:
                warn(f"Unsupported item type `{item['type']}` for {fqname}")
                continue

            create_file = CreateFile(
                database=item["database"],
                schema=item["schema"],
                name=item["name"],
                item_type=item_type,
            )

            # `skip()` filters out item types we don't want to persist.
            if create_file.skip():
                continue

            # Not every item type supports SHOW CREATE; skip those that don't.
            show_create_query = item_type.show_create(fqname)
            if show_create_query is None:
                continue

            try:
                info(f"Extracting DDL for {item_type.sql()} in `{create_file.path()}`")
                create_sql = db.query_one(show_create_query)["create_sql"]
                item["create_sql"] = sql.try_mzfmt(create_sql) if mzfmt else create_sql
                # Ensure that the parent folder exists.
                (target / create_file.folder()).mkdir(parents=True, exist_ok=True)
                # Write the definition into the file.
                with (target / create_file.path()).open("w") as file:
                    file.write(output_template.substitute(item))
            except DatabaseError as e:
                # Best-effort: report and move on to the next item.
                warn(f"Cannot extract DDL for {item_type.sql()} {fqname}: {e}")
  105. def plans(
  106. target: Path,
  107. database: str,
  108. schema: str,
  109. name: str,
  110. db_port: int,
  111. db_host: str,
  112. db_user: str,
  113. db_pass: str | None,
  114. db_require_ssl: bool,
  115. explainee_type: ExplaineeType,
  116. explain_options: list[ExplainOption],
  117. explain_stages: set[ExplainStage],
  118. explain_format: ExplainFormat,
  119. suffix: str | None = None,
  120. system: bool = False,
  121. ) -> None:
  122. """
  123. Extract EXPLAIN plans for selected catalog items.
  124. Processes only items that match the ILIKE pattern defined by the parameter
  125. triple (DATABASE, SCHEMA, NAME).
  126. """
  127. # Click doesn't deduplicate, so we need to convert explain_stages (which is
  128. # actually a list) into a set explicitly.
  129. explain_stages = set(explain_stages)
  130. if not explain_options:
  131. # We should have at least arity for good measure.
  132. explain_options = [ExplainOption(key="arity")]
  133. with closing(
  134. sql.Database(
  135. port=db_port,
  136. host=db_host,
  137. user=db_user,
  138. database=None,
  139. password=db_pass,
  140. require_ssl=db_require_ssl,
  141. )
  142. ) as db:
  143. for item in db.catalog_items(database, schema, name, system):
  144. item_database = sql.identifier(item["database"])
  145. item_schema = sql.identifier(item["schema"])
  146. item_name = sql.identifier(item["name"])
  147. if item["database"] == "mz": # don't prepend pseudo-database `mz`
  148. fqname = f"{item_schema}.{item_name}"
  149. else:
  150. fqname = f"{item_database}.{item_schema}.{item_name}"
  151. try:
  152. item_type = ItemType(item["type"])
  153. except ValueError:
  154. warn(f"Unsupported item type `{item['type']}` for {fqname}")
  155. continue
  156. plans: dict[ExplainFile, str] = {}
  157. if ExplaineeType.CATALOG_ITEM.contains(explainee_type):
  158. # If the item can be explained, explain the DDL
  159. explainee = explain_item(item_type, fqname, False)
  160. if explainee is not None:
  161. supported_stages = supported_explain_stages(
  162. item_type, optimize=False
  163. )
  164. for stage in explain_stages:
  165. if stage not in supported_stages:
  166. continue
  167. explain_file = ExplainFile(
  168. database=item["database"],
  169. schema=item["schema"],
  170. name=item["name"],
  171. suffix=suffix,
  172. item_type=item_type,
  173. explainee_type=ExplaineeType.CATALOG_ITEM,
  174. stage=stage,
  175. ext=explain_format.ext(),
  176. )
  177. if explain_file.skip():
  178. continue
  179. try:
  180. info(
  181. f"Explaining {stage} for {item_type.sql()} "
  182. f"in `{explain_file}`"
  183. )
  184. plans[explain_file] = explain(
  185. db,
  186. stage,
  187. explainee,
  188. explain_options,
  189. explain_format,
  190. )
  191. except DatabaseError as e:
  192. warn(
  193. f"Cannot explain {stage} for {item_type.sql()} {fqname}: "
  194. f"{e}"
  195. )
  196. continue
  197. if ExplaineeType.CREATE_STATEMENT.contains(explainee_type):
  198. # If the DDL for the plan exists, explain it as well
  199. supported_stages = supported_explain_stages(item_type, optimize=True)
  200. create_file = CreateFile(
  201. database=item["database"],
  202. schema=item["schema"],
  203. name=item["name"],
  204. item_type=item_type,
  205. )
  206. if create_file.skip():
  207. explainee = None
  208. elif not (target / create_file.path()).is_file():
  209. if set.intersection(supported_stages, explain_stages):
  210. # No CREATE file, but a supported stage is requested
  211. info(
  212. f"WARNING: Skipping EXPLAIN CREATE for {fqname}: "
  213. f"CREATE statement path `{target / create_file.path()}` does not exist."
  214. )
  215. explainee = None
  216. else:
  217. explainee = (target / create_file.path()).read_text()
  218. for stage in explain_stages:
  219. if explainee is None:
  220. continue
  221. if stage not in supported_stages:
  222. continue
  223. explain_file = ExplainFile(
  224. database=item["database"],
  225. schema=item["schema"],
  226. name=item["name"],
  227. suffix=suffix,
  228. item_type=item_type,
  229. explainee_type=ExplaineeType.CREATE_STATEMENT,
  230. stage=stage,
  231. ext=explain_format.ext(),
  232. )
  233. if explain_file.skip():
  234. continue
  235. try:
  236. info(
  237. f"Explaining {stage} for CREATE {item_type.sql()} "
  238. f"in `{explain_file}`"
  239. )
  240. plans[explain_file] = explain(
  241. db,
  242. stage,
  243. explainee,
  244. explain_options,
  245. explain_format,
  246. )
  247. except DatabaseError as e:
  248. warn(
  249. f"Cannot explain {stage} for CREATE {item_type.sql()} {fqname}: "
  250. f"{e}"
  251. )
  252. continue
  253. if ExplaineeType.REPLAN_ITEM.contains(explainee_type):
  254. # If the item can be explained, explain the DDL
  255. explainee = explain_item(item_type, fqname, True)
  256. if explainee is not None:
  257. supported_stages = supported_explain_stages(item_type, True)
  258. for stage in explain_stages:
  259. if stage not in supported_stages:
  260. continue
  261. explain_file = ExplainFile(
  262. database=item["database"],
  263. schema=item["schema"],
  264. name=item["name"],
  265. suffix=suffix,
  266. item_type=item_type,
  267. explainee_type=ExplaineeType.REPLAN_ITEM,
  268. stage=stage,
  269. ext=explain_format.ext(),
  270. )
  271. if explain_file.skip():
  272. continue
  273. try:
  274. info(
  275. f"Explaining {stage} for REPLAN {item_type.sql()} "
  276. f"in `{explain_file}`"
  277. )
  278. plans[explain_file] = explain(
  279. db,
  280. stage,
  281. explainee,
  282. explain_options,
  283. explain_format,
  284. )
  285. except DatabaseError as e:
  286. warn(
  287. f"Cannot explain {stage} for REPLAN {item_type.sql()} {fqname}: "
  288. f" {e}"
  289. )
  290. continue
  291. for explain_file, plan in plans.items():
  292. # Ensure that the parent folder exists.
  293. (target / explain_file.folder()).mkdir(parents=True, exist_ok=True)
  294. # Write the plan into the file.
  295. with (target / explain_file.path()).open("w") as file:
  296. file.write(plan)
  297. def arrangement_sizes(
  298. target: Path,
  299. cluster: str,
  300. cluster_replica: str,
  301. database: str,
  302. schema: str,
  303. name: str,
  304. db_port: int,
  305. db_host: str,
  306. db_user: str,
  307. db_pass: str | None,
  308. db_require_ssl: bool,
  309. print_results: bool,
  310. ) -> None:
  311. """
  312. Extract arrangement sizes for selected catalog items.
  313. """
  314. # Don't make pandas depedency for the entire module, but just for this
  315. # command (otherwise all `extract` clients will have to install pandas, even
  316. # if they will never call `extract.arrangement_sizes`).
  317. import pandas as pd
  318. # Ensure that the target dir exists.
  319. target.mkdir(parents=True, exist_ok=True)
  320. with closing(
  321. sql.Database(
  322. port=db_port,
  323. host=db_host,
  324. user=db_user,
  325. database=None,
  326. password=db_pass,
  327. require_ssl=db_require_ssl,
  328. )
  329. ) as db:
  330. # Extract materialized view definitions
  331. # -------------------------------------
  332. with sql.update_environment(
  333. db,
  334. env=dict(
  335. cluster=cluster,
  336. cluster_replica=cluster_replica,
  337. ),
  338. ) as db:
  339. for item in db.catalog_items(database, schema, name, system=False):
  340. item_database = sql.identifier(item["database"])
  341. item_schema = sql.identifier(item["schema"])
  342. item_name = sql.identifier(item["name"])
  343. fqname = f"{item_database}.{item_schema}.{item_name}"
  344. try:
  345. item_type = ItemType(item["type"])
  346. except ValueError:
  347. warn(f"Unsupported item type `{item['type']}` for {fqname}")
  348. continue
  349. csv_file = ArrangementSizesFile(
  350. database=item["database"],
  351. schema=item["schema"],
  352. name=item["name"],
  353. item_type=item_type,
  354. )
  355. if csv_file.skip():
  356. continue
  357. try:
  358. info(
  359. f"Extracting arrangement sizes for {item_type.sql()} "
  360. f"in `{csv_file.path()}`"
  361. )
  362. # Extract arrangement sizes into a DataFrame.
  363. df = pd.DataFrame.from_records(
  364. db.arrangement_sizes(item["id"]),
  365. coerce_float=True,
  366. )
  367. if not df.empty:
  368. # Compute a `total` row of numeric columns.
  369. df.loc["total"] = df.sum(numeric_only=True)
  370. # Ensure that the parent folder exists.
  371. (target / csv_file.folder()).mkdir(parents=True, exist_ok=True)
  372. # Write CSV to the output file.
  373. with (target / csv_file.path()).open("w") as file:
  374. df.to_csv(file, index=False, quoting=csv.QUOTE_MINIMAL)
  375. if print_results: # Print results if requested.
  376. float_format = lambda x: f"{x:_.3f}"
  377. with pd.option_context("display.float_format", float_format):
  378. print(df.to_string())
  379. # Write DataFrame string to the output file.
  380. txt_file = dataclasses.replace(csv_file, ext="txt")
  381. with (target / txt_file.path()).open("w") as file:
  382. file.write(df.to_string())
  383. except DatabaseError as e:
  384. warn(
  385. f"Cannot extract arrangement sizes for {item_type.sql()} {fqname}: "
  386. f"{e}"
  387. )
  388. # Utility methods
  389. # ---------------
  390. def explain(
  391. db: sql.Database,
  392. explain_stage: ExplainStage,
  393. explainee: str,
  394. explain_options: list[ExplainOption],
  395. explain_format: ExplainFormat,
  396. ) -> str:
  397. explain_query = "\n".join(
  398. line
  399. for line in [
  400. f"EXPLAIN {explain_stage}",
  401. f"WITH({', '.join(map(str, explain_options))})" if explain_options else "",
  402. f"AS {explain_format} FOR",
  403. explainee,
  404. ]
  405. if line != ""
  406. )
  407. if explain_stage == ExplainStage.OPTIMIZER_TRACE:
  408. return json.dumps(
  409. {
  410. "explainee": {"query": explainee},
  411. "list": [
  412. {"id": id, **entry}
  413. for (id, entry) in enumerate(db.query_all(explain_query))
  414. ],
  415. },
  416. indent=4,
  417. )
  418. else:
  419. return next(iter(db.query_one(explain_query).values()))
  420. def explain_item(item_type: ItemType, fqname: str, replan: bool) -> str | None:
  421. prefix = "REPLAN" if replan else ""
  422. if item_type in {ItemType.MATERIALIZED_VIEW, ItemType.VIEW, ItemType.INDEX}:
  423. return " ".join((prefix, item_type.sql(), fqname)).strip()
  424. else:
  425. return None
  426. def supported_explain_stages(item_type: ItemType, optimize: bool) -> set[ExplainStage]:
  427. if item_type == ItemType.MATERIALIZED_VIEW:
  428. if optimize:
  429. return {
  430. ExplainStage.RAW_PLAN,
  431. ExplainStage.DECORRELATED_PLAN,
  432. ExplainStage.LOCAL_PLAN,
  433. ExplainStage.OPTIMIZED_PLAN,
  434. ExplainStage.PHYSICAL_PLAN,
  435. ExplainStage.OPTIMIZER_TRACE,
  436. }
  437. else:
  438. return {
  439. ExplainStage.RAW_PLAN,
  440. ExplainStage.LOCAL_PLAN,
  441. ExplainStage.OPTIMIZED_PLAN,
  442. ExplainStage.PHYSICAL_PLAN,
  443. }
  444. elif item_type == ItemType.VIEW:
  445. if optimize:
  446. return {
  447. ExplainStage.RAW_PLAN,
  448. ExplainStage.DECORRELATED_PLAN,
  449. ExplainStage.LOCAL_PLAN,
  450. ExplainStage.OPTIMIZER_TRACE,
  451. }
  452. else:
  453. return {
  454. ExplainStage.RAW_PLAN,
  455. ExplainStage.LOCAL_PLAN,
  456. }
  457. elif item_type == ItemType.INDEX:
  458. if optimize:
  459. return {
  460. ExplainStage.OPTIMIZED_PLAN,
  461. ExplainStage.PHYSICAL_PLAN,
  462. ExplainStage.OPTIMIZER_TRACE,
  463. }
  464. else:
  465. return {
  466. ExplainStage.OPTIMIZED_PLAN,
  467. ExplainStage.PHYSICAL_PLAN,
  468. }
  469. else:
  470. return set()