mzexplore.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from pathlib import Path
  10. from typing import Any
  11. import click
  12. import materialize.mzexplore as api
  13. import materialize.mzexplore.common as common
  14. # import logging
  15. # logging.basicConfig(encoding='utf-8', level=logging.DEBUG)
  16. # Click CLI Application
  17. # ---------------------
  18. @click.group()
  19. def app() -> None:
  20. pass
  21. class Arg:
  22. repository: dict[str, Any] = dict(
  23. type=click.Path(
  24. exists=False,
  25. file_okay=False,
  26. dir_okay=True,
  27. writable=True,
  28. readable=True,
  29. resolve_path=True,
  30. ),
  31. callback=lambda ctx, param, value: Path(value), # type: ignore
  32. )
  33. output_file: dict[str, Any] = dict(
  34. type=click.Path(
  35. exists=False,
  36. file_okay=True,
  37. dir_okay=False,
  38. writable=True,
  39. readable=True,
  40. resolve_path=True,
  41. ),
  42. callback=lambda ctx, param, value: Path(value), # type: ignore
  43. )
  44. base_suffix: dict[str, Any] = dict(
  45. type=str,
  46. metavar="BASE",
  47. )
  48. diff_suffix: dict[str, Any] = dict(
  49. type=str,
  50. metavar="DIFF",
  51. )
  52. class Opt:
  53. db_port: dict[str, Any] = dict(
  54. default=6877,
  55. help="DB connection port.",
  56. envvar="PGPORT",
  57. )
  58. db_host: dict[str, Any] = dict(
  59. default="localhost",
  60. help="DB connection host.",
  61. envvar="PGHOST",
  62. )
  63. db_user: dict[str, Any] = dict(
  64. default="mz_support",
  65. help="DB connection user.",
  66. envvar="PGUSER",
  67. )
  68. db_pass: dict[str, Any] = dict(
  69. default=None,
  70. help="DB connection password.",
  71. envvar="PGPASSWORD",
  72. )
  73. db_require_ssl: dict[str, Any] = dict(
  74. is_flag=True,
  75. help="DB connection requires SSL.",
  76. envvar="PGREQUIRESSL",
  77. )
  78. mzfmt: dict[str, Any] = dict(
  79. default=True,
  80. help="Format SQL statements with `mzfmt` if present.",
  81. )
  82. explainee_type: dict[str, Any] = dict(
  83. type=click.Choice([v.name.lower() for v in list(api.ExplaineeType)]),
  84. default="catalog_item",
  85. callback=lambda ctx, param, v: api.ExplaineeType[v.upper()], # type: ignore
  86. help="EXPLAIN mode.",
  87. metavar="MODE",
  88. )
  89. explain_options: dict[str, Any] = dict(
  90. type=api.ExplainOptionType(),
  91. multiple=True,
  92. help="WITH key=val pairs to pass to the EXPLAIN command.",
  93. metavar="KEY=VAL",
  94. )
  95. explain_stage: dict[str, Any] = dict(
  96. type=click.Choice([str(v.name.lower()) for v in list(api.ExplainStage)]),
  97. multiple=True,
  98. default=["optimized_plan"], # Most often we'll only the optimized plan.
  99. callback=lambda ctx, param, vals: [api.ExplainStage[v.upper()] for v in vals], # type: ignore
  100. help="EXPLAIN stage to export.",
  101. metavar="STAGE",
  102. )
  103. explain_suffix: dict[str, Any] = dict(
  104. type=str,
  105. default="",
  106. help="A suffix to append to the EXPLAIN output files.",
  107. metavar="SUFFIX",
  108. )
  109. explain_format: dict[str, Any] = dict(
  110. type=click.Choice([str(v.name.lower()) for v in list(api.ExplainFormat)]),
  111. default="text",
  112. callback=lambda ctx, param, v: api.ExplainFormat[v.upper()], # type: ignore
  113. help="AS [FORMAT] clause to pass to the EXPLAIN command.",
  114. metavar="FORMAT",
  115. )
  116. system: dict[str, Any] = dict(
  117. is_flag=True,
  118. show_default=True,
  119. default=False,
  120. help="Inspect system or user tables.",
  121. )
  122. def is_documented_by(original: Any) -> Any:
  123. def wrapper(target):
  124. target.__doc__ = original.__doc__
  125. return target
  126. return wrapper
  127. @app.group()
  128. @is_documented_by(api.extract)
  129. def extract() -> None:
  130. pass
  131. @extract.command(name="defs")
  132. @click.argument("target", **Arg.repository)
  133. @click.argument("database", type=str)
  134. @click.argument("schema", type=str)
  135. @click.argument("name", type=str)
  136. @click.option("--db-port", **Opt.db_port)
  137. @click.option("--db-host", **Opt.db_host)
  138. @click.option("--db-user", **Opt.db_user)
  139. @click.option("--db-pass", **Opt.db_pass)
  140. @click.option("--db-require-ssl", **Opt.db_require_ssl)
  141. @click.option("--mzfmt/--no-mzfmt", **Opt.mzfmt)
  142. @is_documented_by(api.extract.defs)
  143. def extract_defs(
  144. target: Path,
  145. database: str,
  146. schema: str,
  147. name: str,
  148. db_port: int,
  149. db_host: str,
  150. db_user: str,
  151. db_pass: str | None,
  152. db_require_ssl: bool,
  153. mzfmt: bool,
  154. ) -> None:
  155. try:
  156. api.extract.defs(
  157. target=target,
  158. database=database,
  159. schema=schema,
  160. name=name,
  161. db_port=db_port,
  162. db_host=db_host,
  163. db_user=db_user,
  164. db_pass=db_pass,
  165. db_require_ssl=db_require_ssl,
  166. mzfmt=mzfmt,
  167. )
  168. except Exception as e:
  169. import traceback
  170. traceback.print_tb(e.__traceback__)
  171. raise click.ClickException(f"extract defs command failed: {e=}, {type(e)=}")
  172. @extract.command(name="plans")
  173. @click.argument("target", **Arg.repository)
  174. @click.argument("database", type=str)
  175. @click.argument("schema", type=str)
  176. @click.argument("name", type=str)
  177. @click.option("--db-port", **Opt.db_port)
  178. @click.option("--db-host", **Opt.db_host)
  179. @click.option("--db-user", **Opt.db_user)
  180. @click.option("--db-pass", **Opt.db_pass)
  181. @click.option("--db-require-ssl", **Opt.db_require_ssl)
  182. @click.option("--explainee-type", "-t", **Opt.explainee_type)
  183. @click.option("--with", "-w", "explain_options", **Opt.explain_options)
  184. @click.option("--stage", "-s", "explain_stages", **Opt.explain_stage)
  185. @click.option("--format", "-f", "explain_format", **Opt.explain_format)
  186. @click.option("--system/--user", "system", **Opt.system)
  187. @click.option("--suffix", **Opt.explain_suffix)
  188. @is_documented_by(api.extract.plans)
  189. def extract_plans(
  190. target: Path,
  191. database: str,
  192. schema: str,
  193. name: str,
  194. db_port: int,
  195. db_host: str,
  196. db_user: str,
  197. db_pass: str | None,
  198. db_require_ssl: bool,
  199. explainee_type: api.ExplaineeType,
  200. explain_options: list[api.ExplainOption],
  201. explain_stages: set[api.ExplainStage],
  202. explain_format: api.ExplainFormat,
  203. system: bool,
  204. suffix: str | None = None,
  205. ) -> None:
  206. try:
  207. api.extract.plans(
  208. target=target,
  209. database=database,
  210. schema=schema,
  211. name=name,
  212. db_port=db_port,
  213. db_host=db_host,
  214. db_user=db_user,
  215. db_pass=db_pass,
  216. db_require_ssl=db_require_ssl,
  217. explainee_type=explainee_type,
  218. explain_options=explain_options,
  219. explain_stages=explain_stages,
  220. explain_format=explain_format,
  221. system=system,
  222. suffix=suffix,
  223. )
  224. except Exception as e:
  225. import traceback
  226. traceback.print_tb(e.__traceback__)
  227. raise click.ClickException(f"extract plans command failed: {e=}, {type(e)=}")
  228. @extract.command(name="arrangement-sizes")
  229. @click.argument("target", **Arg.repository)
  230. @click.argument("cluster", type=str)
  231. @click.argument("cluster_replica", type=str)
  232. @click.argument("database", type=str)
  233. @click.argument("schema", type=str)
  234. @click.argument("name", type=str)
  235. @click.option("--db-port", **Opt.db_port)
  236. @click.option("--db-host", **Opt.db_host)
  237. @click.option("--db-user", **Opt.db_user)
  238. @click.option("--db-pass", **Opt.db_pass)
  239. @click.option("--db-require-ssl", **Opt.db_require_ssl)
  240. @click.option("--print-results", is_flag=True, default=False)
  241. @is_documented_by(api.extract.arrangement_sizes)
  242. def extract_arrangement_sizes(
  243. target: Path,
  244. cluster: str,
  245. cluster_replica: str,
  246. database: str,
  247. schema: str,
  248. name: str,
  249. db_port: int,
  250. db_host: str,
  251. db_user: str,
  252. db_pass: str | None,
  253. db_require_ssl: bool,
  254. print_results: bool,
  255. ) -> None:
  256. try:
  257. api.extract.arrangement_sizes(
  258. target=target,
  259. cluster=cluster,
  260. cluster_replica=cluster_replica,
  261. database=database,
  262. schema=schema,
  263. name=name,
  264. db_port=db_port,
  265. db_host=db_host,
  266. db_user=db_user,
  267. db_pass=db_pass,
  268. db_require_ssl=db_require_ssl,
  269. print_results=print_results,
  270. )
  271. except Exception as e:
  272. import traceback
  273. traceback.print_tb(e.__traceback__)
  274. raise click.ClickException(
  275. f"extract arrangement-sizes command failed: {e=}, {type(e)=}"
  276. )
  277. @app.group()
  278. @is_documented_by(api.analyze)
  279. def analyze() -> None:
  280. pass
  281. @analyze.command(name="changes")
  282. @click.argument("target", **Arg.repository) # type: ignore
  283. @click.argument("summary_file", **Arg.output_file) # type: ignore
  284. @click.argument("base_suffix", **Arg.base_suffix)
  285. @click.argument("diff_suffix", **Arg.diff_suffix)
  286. @is_documented_by(api.analyze.changes)
  287. def analyze_changes(
  288. target: Path,
  289. summary_file: Path,
  290. base_suffix: str,
  291. diff_suffix: str,
  292. ) -> None:
  293. """
  294. Prepare an analysis report of plan changes as a Markdown document.
  295. """
  296. try:
  297. with summary_file.open("a+", encoding="utf-8") as out:
  298. api.analyze.changes(
  299. out=out,
  300. target=target,
  301. header_name=str(target),
  302. base_suffix=base_suffix,
  303. diff_suffix=diff_suffix,
  304. )
  305. common.info(f"Summary written to {summary_file}")
  306. except Exception as e:
  307. import traceback
  308. traceback.print_tb(e.__traceback__)
  309. msg = f"analyze changes command failed: {e=}, {type(e)=}"
  310. raise click.ClickException(msg) from e
  311. @app.group()
  312. @is_documented_by(api.clone)
  313. def clone() -> None:
  314. pass
  315. @clone.command(name="defs")
  316. @click.argument("database", type=str)
  317. @click.argument("schema", type=str)
  318. @click.argument("cluster", type=str)
  319. @click.argument("object_ids", type=str, nargs=-1)
  320. @click.argument("ddl_file", **Arg.output_file) # type: ignore
  321. @click.option("--db-port", **Opt.db_port)
  322. @click.option("--db-host", **Opt.db_host)
  323. @click.option("--db-user", **Opt.db_user)
  324. @click.option("--db-pass", **Opt.db_pass)
  325. @click.option("--db-require-ssl", **Opt.db_require_ssl)
  326. @click.option("--mzfmt/--no-mzfmt", **Opt.mzfmt)
  327. @is_documented_by(api.clone.defs)
  328. def clone_defs(
  329. database: str,
  330. schema: str,
  331. cluster: str,
  332. object_ids: list[str],
  333. ddl_file: Path,
  334. db_port: int,
  335. db_host: str,
  336. db_user: str,
  337. db_pass: str | None,
  338. db_require_ssl: bool,
  339. mzfmt: bool,
  340. ) -> None:
  341. try:
  342. cmp_file = Path(ddl_file.parent, f"__compare__{ddl_file.name}")
  343. with ddl_file.open("w", encoding="utf-8") as ddl_out:
  344. with cmp_file.open("w", encoding="utf-8") as cmp_out:
  345. api.clone.defs(
  346. ddl_out=ddl_out,
  347. cmp_out=cmp_out,
  348. database=database,
  349. schema=schema,
  350. cluster=cluster,
  351. object_ids=object_ids,
  352. db_port=db_port,
  353. db_host=db_host,
  354. db_user=db_user,
  355. db_pass=db_pass,
  356. db_require_ssl=db_require_ssl,
  357. mzfmt=mzfmt,
  358. )
  359. common.info(f"Modified DDL written to {ddl_file}")
  360. common.info(f"Original DDL written to {cmp_file}")
  361. common.warn("Please inspect the diff between the two!!!")
  362. except Exception as e:
  363. import traceback
  364. traceback.print_tb(e.__traceback__)
  365. msg = f"clone defs command failed: {e=}, {type(e)=}"
  366. raise click.ClickException(msg) from e
  367. # Entrypoint
  368. # ----------
  369. if __name__ == "__main__":
  370. app()