123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- """
- Utilities to extract data from a Materialize catalog for exploration purposes.
- """
- import csv
- import dataclasses
- import json
- import string
- import textwrap
- from contextlib import closing
- from pathlib import Path
- from psycopg.errors import DatabaseError
- from materialize.mzexplore import sql
- from materialize.mzexplore.common import (
- ArrangementSizesFile,
- CreateFile,
- ExplaineeType,
- ExplainFile,
- ExplainFormat,
- ExplainOption,
- ExplainStage,
- ItemType,
- info,
- warn,
- )
def defs(
    target: Path,
    database: str,
    schema: str,
    name: str,
    db_port: int,
    db_host: str,
    db_user: str,
    db_pass: str | None,
    db_require_ssl: bool,
    mzfmt: bool,
) -> None:
    """
    Write the CREATE statement of every selected catalog item to a file.

    Items are looked up with `db.catalog_items(database, schema, name,
    system=False)`, i.e. by the ILIKE pattern defined by the parameter triple
    (DATABASE, SCHEMA, NAME). For each item that has a known type and a
    `SHOW CREATE` query, the resulting statement is written below `target`
    at the location given by the item's `CreateFile`, prefixed with comment
    lines holding the item's `id` and `oid`.

    When `mzfmt` is set, the statement is passed through `sql.try_mzfmt`
    before it is written. A `DatabaseError` raised while processing an item
    is reported with `warn` and the item is skipped.
    """
    # The output root has to exist before any item file can be created.
    target.mkdir(parents=True, exist_ok=True)

    # Layout of each generated file: two header comments, then the DDL.
    output_template = string.Template(
        textwrap.dedent(
            """
            -- id: $id
            -- oid: $oid
            $create_sql
            """
        ).lstrip()
    )

    connection = sql.Database(
        port=db_port,
        host=db_host,
        user=db_user,
        database=None,
        password=db_pass,
        require_ssl=db_require_ssl,
    )

    with closing(connection) as db:
        # Extract the definition of each matching catalog item
        # ----------------------------------------------------
        for item in db.catalog_items(database, schema, name, system=False):
            quoted = [
                sql.identifier(item[column])
                for column in ("database", "schema", "name")
            ]
            fqname = "{}.{}.{}".format(*quoted)

            try:
                item_type = ItemType(item["type"])
            except ValueError:
                warn(f"Unsupported item type `{item['type']}` for {fqname}")
                continue

            create_file = CreateFile(
                database=item["database"],
                schema=item["schema"],
                name=item["name"],
                item_type=item_type,
            )
            if create_file.skip():
                continue

            show_create_query = item_type.show_create(fqname)
            if show_create_query is None:
                # This item type has no SHOW CREATE query.
                continue

            try:
                info(f"Extracting DDL for {item_type.sql()} in `{create_file.path()}`")
                ddl = db.query_one(show_create_query)["create_sql"]
                if mzfmt:
                    ddl = sql.try_mzfmt(ddl)
                # The template is filled from the row itself, so the DDL is
                # stored next to the `id` and `oid` columns.
                item["create_sql"] = ddl

                # Create the item's folder on demand.
                (target / create_file.folder()).mkdir(parents=True, exist_ok=True)

                # Persist the rendered definition.
                with (target / create_file.path()).open("w") as out:
                    out.write(output_template.substitute(item))
            except DatabaseError as err:
                warn(f"Cannot extract DDL for {item_type.sql()} {fqname}: {err}")
def _explain_for_stages(
    *,
    db: sql.Database,
    item,
    item_type: ItemType,
    fqname: str,
    explainee: str,
    explainee_type: ExplaineeType,
    label: str,
    supported_stages: set[ExplainStage],
    explain_stages: set[ExplainStage],
    explain_options: list[ExplainOption],
    explain_format: ExplainFormat,
    suffix: str | None,
) -> dict[ExplainFile, str]:
    """
    Explain `explainee` once per requested stage that is also supported.

    `item` is the catalog row being processed; its `database`, `schema` and
    `name` entries are used to build the `ExplainFile` key. `label` is placed
    before the item type in log messages (e.g. `"CREATE "`, or `""`).

    Returns a mapping from output file descriptor to plan text. Stages whose
    `ExplainFile` reports `skip()` are left out, as are stages for which the
    database raised a `DatabaseError` (those are reported with `warn`).
    """
    results: dict[ExplainFile, str] = {}

    for stage in explain_stages:
        if stage not in supported_stages:
            continue

        explain_file = ExplainFile(
            database=item["database"],
            schema=item["schema"],
            name=item["name"],
            suffix=suffix,
            item_type=item_type,
            explainee_type=explainee_type,
            stage=stage,
            ext=explain_format.ext(),
        )
        if explain_file.skip():
            continue

        try:
            info(
                f"Explaining {stage} for {label}{item_type.sql()} "
                f"in `{explain_file}`"
            )
            results[explain_file] = explain(
                db,
                stage,
                explainee,
                explain_options,
                explain_format,
            )
        except DatabaseError as e:
            warn(
                f"Cannot explain {stage} for {label}{item_type.sql()} {fqname}: "
                f"{e}"
            )

    return results


def plans(
    target: Path,
    database: str,
    schema: str,
    name: str,
    db_port: int,
    db_host: str,
    db_user: str,
    db_pass: str | None,
    db_require_ssl: bool,
    explainee_type: ExplaineeType,
    explain_options: list[ExplainOption],
    explain_stages: set[ExplainStage],
    explain_format: ExplainFormat,
    suffix: str | None = None,
    system: bool = False,
) -> None:
    """
    Extract EXPLAIN plans for selected catalog items.

    Processes only items that match the ILIKE pattern defined by the parameter
    triple (DATABASE, SCHEMA, NAME).

    Depending on which explainee types `explainee_type` contains, each item
    is explained in up to three ways:

    - as a catalog item (`ExplaineeType.CATALOG_ITEM`),
    - from its previously extracted CREATE statement, read from the
      `CreateFile` path below `target` (`ExplaineeType.CREATE_STATEMENT`),
    - as a `REPLAN` of the catalog item (`ExplaineeType.REPLAN_ITEM`).

    Only the stages in `explain_stages` that `supported_explain_stages`
    reports for the item type are attempted. If `explain_options` is empty,
    the `arity` option is used. Each plan is written below `target` at the
    location given by its `ExplainFile`. A `DatabaseError` for an individual
    stage is reported with `warn` and does not stop the extraction.
    """
    # Click doesn't deduplicate, so we need to convert explain_stages (which is
    # actually a list) into a set explicitly.
    explain_stages = set(explain_stages)

    if not explain_options:
        # We should have at least arity for good measure.
        explain_options = [ExplainOption(key="arity")]

    with closing(
        sql.Database(
            port=db_port,
            host=db_host,
            user=db_user,
            database=None,
            password=db_pass,
            require_ssl=db_require_ssl,
        )
    ) as db:
        for item in db.catalog_items(database, schema, name, system):
            item_schema = sql.identifier(item["schema"])
            item_name = sql.identifier(item["name"])

            if item["database"] == "mz":  # don't prepend pseudo-database `mz`
                fqname = f"{item_schema}.{item_name}"
            else:
                item_database = sql.identifier(item["database"])
                fqname = f"{item_database}.{item_schema}.{item_name}"

            try:
                item_type = ItemType(item["type"])
            except ValueError:
                warn(f"Unsupported item type `{item['type']}` for {fqname}")
                continue

            # Arguments that are the same for every explainee type below.
            shared = dict(
                db=db,
                item=item,
                item_type=item_type,
                fqname=fqname,
                explain_stages=explain_stages,
                explain_options=explain_options,
                explain_format=explain_format,
                suffix=suffix,
            )

            # Plans collected for this item, written out at the end.
            item_plans: dict[ExplainFile, str] = {}

            if ExplaineeType.CATALOG_ITEM.contains(explainee_type):
                # If the item can be explained, explain the DDL
                explainee = explain_item(item_type, fqname, False)
                if explainee is not None:
                    item_plans.update(
                        _explain_for_stages(
                            explainee=explainee,
                            explainee_type=ExplaineeType.CATALOG_ITEM,
                            label="",
                            supported_stages=supported_explain_stages(
                                item_type, optimize=False
                            ),
                            **shared,
                        )
                    )

            if ExplaineeType.CREATE_STATEMENT.contains(explainee_type):
                # If the DDL for the plan exists, explain it as well
                supported_stages = supported_explain_stages(item_type, optimize=True)

                create_file = CreateFile(
                    database=item["database"],
                    schema=item["schema"],
                    name=item["name"],
                    item_type=item_type,
                )

                explainee = None
                if not create_file.skip():
                    create_path = target / create_file.path()
                    if create_path.is_file():
                        explainee = create_path.read_text()
                    elif set.intersection(supported_stages, explain_stages):
                        # No CREATE file, but a supported stage is requested
                        info(
                            f"WARNING: Skipping EXPLAIN CREATE for {fqname}: "
                            f"CREATE statement path `{create_path}` does not exist."
                        )

                if explainee is not None:
                    item_plans.update(
                        _explain_for_stages(
                            explainee=explainee,
                            explainee_type=ExplaineeType.CREATE_STATEMENT,
                            label="CREATE ",
                            supported_stages=supported_stages,
                            **shared,
                        )
                    )

            if ExplaineeType.REPLAN_ITEM.contains(explainee_type):
                # If the item can be explained, explain a REPLAN of it
                explainee = explain_item(item_type, fqname, True)
                if explainee is not None:
                    item_plans.update(
                        _explain_for_stages(
                            explainee=explainee,
                            explainee_type=ExplaineeType.REPLAN_ITEM,
                            label="REPLAN ",
                            supported_stages=supported_explain_stages(item_type, True),
                            **shared,
                        )
                    )

            for explain_file, plan in item_plans.items():
                # Ensure that the parent folder exists.
                (target / explain_file.folder()).mkdir(parents=True, exist_ok=True)

                # Write the plan into the file.
                with (target / explain_file.path()).open("w") as file:
                    file.write(plan)
def arrangement_sizes(
    target: Path,
    cluster: str,
    cluster_replica: str,
    database: str,
    schema: str,
    name: str,
    db_port: int,
    db_host: str,
    db_user: str,
    db_pass: str | None,
    db_require_ssl: bool,
    print_results: bool,
) -> None:
    """
    Dump the arrangement sizes of every selected catalog item.

    Items are looked up with `db.catalog_items(database, schema, name,
    system=False)` while the connection is wrapped in
    `sql.update_environment` with the given `cluster` and `cluster_replica`.
    For each item with a known type, the rows returned by
    `db.arrangement_sizes(item["id"])` are loaded into a pandas DataFrame.
    If the frame is not empty, a `total` row holding the sums of the numeric
    columns is appended, and the frame is written below `target` as CSV (at
    the item's `ArrangementSizesFile` path) and as plain text (same file
    descriptor with `ext="txt"`).

    With `print_results` set, the frame is also printed to stdout. A
    `DatabaseError` for an item is reported with `warn`.
    """
    # pandas is imported here rather than at module level, so that clients
    # of the other `extract` commands are not forced to install it when
    # they never call `extract.arrangement_sizes`.
    import pandas as pd

    def format_float(x):
        return f"{x:_.3f}"

    # The output root has to exist before any item file can be created.
    target.mkdir(parents=True, exist_ok=True)

    connection = sql.Database(
        port=db_port,
        host=db_host,
        user=db_user,
        database=None,
        password=db_pass,
        require_ssl=db_require_ssl,
    )
    session_env = {"cluster": cluster, "cluster_replica": cluster_replica}

    # Extract the arrangement sizes of each matching catalog item
    # -----------------------------------------------------------
    with closing(connection) as db, sql.update_environment(
        db, env=session_env
    ) as env_db:
        for item in env_db.catalog_items(database, schema, name, system=False):
            quoted = [
                sql.identifier(item[column])
                for column in ("database", "schema", "name")
            ]
            fqname = "{}.{}.{}".format(*quoted)

            try:
                item_type = ItemType(item["type"])
            except ValueError:
                warn(f"Unsupported item type `{item['type']}` for {fqname}")
                continue

            csv_file = ArrangementSizesFile(
                database=item["database"],
                schema=item["schema"],
                name=item["name"],
                item_type=item_type,
            )
            if csv_file.skip():
                continue

            try:
                info(
                    f"Extracting arrangement sizes for {item_type.sql()} "
                    f"in `{csv_file.path()}`"
                )

                # Load the size records into a DataFrame.
                df = pd.DataFrame.from_records(
                    env_db.arrangement_sizes(item["id"]),
                    coerce_float=True,
                )
                if df.empty:
                    # Nothing to report for this item.
                    continue

                # Append a `total` row with the sums of the numeric columns.
                df.loc["total"] = df.sum(numeric_only=True)

                # Create the item's folder on demand.
                (target / csv_file.folder()).mkdir(parents=True, exist_ok=True)

                # CSV output.
                with (target / csv_file.path()).open("w") as out:
                    df.to_csv(out, index=False, quoting=csv.QUOTE_MINIMAL)

                if print_results:  # Print results if requested.
                    with pd.option_context("display.float_format", format_float):
                        print(df.to_string())

                # Plain-text output next to the CSV file.
                # NOTE(review): assumed to be written whenever there are rows,
                # independent of `print_results` -- confirm.
                txt_file = dataclasses.replace(csv_file, ext="txt")
                with (target / txt_file.path()).open("w") as out:
                    out.write(df.to_string())
            except DatabaseError as err:
                warn(
                    f"Cannot extract arrangement sizes for {item_type.sql()} {fqname}: "
                    f"{err}"
                )
- # Utility methods
- # ---------------
def explain(
    db: sql.Database,
    explain_stage: ExplainStage,
    explainee: str,
    explain_options: list[ExplainOption],
    explain_format: ExplainFormat,
) -> str:
    """
    Run an EXPLAIN query for `explainee` and return the result as text.

    The query consists of the lines `EXPLAIN <stage>`, `WITH(<options>)`
    (only when `explain_options` is non-empty), `AS <format> FOR` and the
    explainee itself; empty lines are dropped.

    For `ExplainStage.OPTIMIZER_TRACE` all result rows are fetched and
    returned as a JSON document with the explainee under `explainee.query`
    and the rows, each extended with a running `id`, under `list`. For every
    other stage, the first column value of the single result row is returned.
    """
    candidate_lines = [f"EXPLAIN {explain_stage}"]
    if explain_options:
        rendered_options = ", ".join(str(option) for option in explain_options)
        candidate_lines.append(f"WITH({rendered_options})")
    else:
        candidate_lines.append("")
    candidate_lines.append(f"AS {explain_format} FOR")
    candidate_lines.append(explainee)

    explain_query = "\n".join(part for part in candidate_lines if part != "")

    if explain_stage == ExplainStage.OPTIMIZER_TRACE:
        # A trace yields many rows; number them in result order.
        trace_entries = [
            {"id": position, **entry}
            for position, entry in enumerate(db.query_all(explain_query))
        ]
        document = {
            "explainee": {"query": explainee},
            "list": trace_entries,
        }
        return json.dumps(document, indent=4)

    # Any other stage: take the first column of the single result row.
    row = db.query_one(explain_query)
    return next(iter(row.values()))
def explain_item(item_type: ItemType, fqname: str, replan: bool) -> str | None:
    """
    Build the explainee clause for a catalog item.

    For materialized views, views and indexes the result is
    `<item type SQL> <fqname>`, preceded by `REPLAN` when `replan` is set.
    For every other item type the result is `None`.
    """
    explainable = {ItemType.MATERIALIZED_VIEW, ItemType.VIEW, ItemType.INDEX}
    if item_type not in explainable:
        return None

    if replan:
        prefix = "REPLAN"
    else:
        prefix = ""

    # Stripping removes the leading blank left by an empty prefix.
    parts = (prefix, item_type.sql(), fqname)
    return " ".join(parts).strip()
def supported_explain_stages(item_type: ItemType, optimize: bool) -> set[ExplainStage]:
    """
    Return the EXPLAIN stages available for `item_type`.

    Each explainable item type has a set of stages that is always available
    and a set that is only added when `optimize` is truthy. Item types other
    than materialized views, views and indexes yield an empty set.
    """
    if item_type == ItemType.MATERIALIZED_VIEW:
        always = {
            ExplainStage.RAW_PLAN,
            ExplainStage.LOCAL_PLAN,
            ExplainStage.OPTIMIZED_PLAN,
            ExplainStage.PHYSICAL_PLAN,
        }
        optimize_only = {
            ExplainStage.DECORRELATED_PLAN,
            ExplainStage.OPTIMIZER_TRACE,
        }
    elif item_type == ItemType.VIEW:
        always = {
            ExplainStage.RAW_PLAN,
            ExplainStage.LOCAL_PLAN,
        }
        optimize_only = {
            ExplainStage.DECORRELATED_PLAN,
            ExplainStage.OPTIMIZER_TRACE,
        }
    elif item_type == ItemType.INDEX:
        always = {
            ExplainStage.OPTIMIZED_PLAN,
            ExplainStage.PHYSICAL_PLAN,
        }
        optimize_only = {
            ExplainStage.OPTIMIZER_TRACE,
        }
    else:
        return set()

    if optimize:
        return always | optimize_only
    return always
|