123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- import json
- import random
- import threading
- from enum import Enum
- from typing import TYPE_CHECKING, TextIO
- import psycopg
- import requests
- import websocket
- from materialize.data_ingest.query_error import QueryError
- from materialize.parallel_workload.settings import Scenario
- if TYPE_CHECKING:
- from materialize.parallel_workload.database import Database
- logging: TextIO | None
- lock: threading.Lock
- def initialize_logging() -> None:
- global logging, lock
- logging = open("parallel-workload-queries.log", "w")
- lock = threading.Lock()
- class Http(Enum):
- NO = 0
- RANDOM = 1
- YES = 2
- class Executor:
- rng: random.Random
- cur: psycopg.Cursor
- ws: websocket.WebSocket | None
- pg_pid: int
- # Used by INSERT action to prevent writing into different tables in the same transaction
- insert_table: int | None
- db: "Database"
- reconnect_next: bool
- rollback_next: bool
- last_log: str
- last_status: str
- action_run_since_last_commit_rollback: bool
- autocommit: bool
- def __init__(
- self,
- rng: random.Random,
- cur: psycopg.Cursor,
- ws: websocket.WebSocket | None,
- db: "Database",
- ):
- self.rng = rng
- self.cur = cur
- self.ws = ws
- self.db = db
- self.pg_pid = -1
- self.insert_table = None
- self.reconnect_next = True
- self.rollback_next = True
- self.last_log = ""
- self.last_status = ""
- self.action_run_since_last_commit_rollback = False
- self.use_ws = self.rng.choice([True, False]) if self.ws else False
- self.autocommit = cur.connection.autocommit
- self.mz_service = "materialized"
- def set_isolation(self, level: str) -> None:
- self.execute(f"SET TRANSACTION_ISOLATION TO '{level}'")
- def commit(self, http: Http = Http.RANDOM) -> None:
- self.insert_table = None
- self.execute("commit")
- # TODO(def-): Enable when things are stable
- # self.use_ws = self.rng.choice([True, False]) if self.ws else False
- def rollback(self, http: Http = Http.RANDOM) -> None:
- self.insert_table = None
- try:
- if self.use_ws and http != Http.NO:
- self.execute("rollback")
- else:
- self.log("rollback")
- self.cur.connection.rollback()
- except QueryError:
- raise
- except Exception as e:
- raise QueryError(str(e), "rollback")
- # TODO(def-): Enable when things are stable
- # self.use_ws = self.rng.choice([True, False]) if self.ws else False
- def log(self, msg: str) -> None:
- global logging, lock
- if not logging:
- return
- thread_name = threading.current_thread().getName()
- self.last_log = msg
- self.last_status = "logged"
- with lock:
- print(f"[{thread_name}][{self.mz_service}] {msg}", file=logging)
- logging.flush()
- def execute(
- self,
- query: str,
- extra_info: str = "",
- explainable: bool = False,
- http: Http = Http.NO,
- fetch: bool = False,
- ) -> None:
- is_http = (
- http == Http.RANDOM and self.rng.choice([True, False])
- ) or http == Http.YES
- if explainable and self.rng.choice([True, False]):
- query = f"EXPLAIN OPTIMIZED PLAN AS VERBOSE TEXT FOR {query}"
- query += ";"
- extra_info_str = f" ({extra_info})" if extra_info else ""
- use_ws = self.use_ws and http != Http.NO
- http_str = " [HTTP]" if is_http else " [WS]" if use_ws and self.ws else ""
- self.log(f"{query}{extra_info_str}{http_str}")
- self.last_status = "running"
- try:
- if not is_http:
- if use_ws and self.ws:
- try:
- self.ws.send(json.dumps({"queries": [{"query": query}]}))
- except Exception as e:
- raise QueryError(str(e), query)
- else:
- try:
- if query == "commit;":
- self.log("commit")
- self.cur.connection.commit()
- elif query == "rollback;":
- self.log("rollback")
- self.cur.connection.rollback()
- else:
- self.cur.execute(query.encode())
- except Exception as e:
- raise QueryError(str(e), query)
- self.action_run_since_last_commit_rollback = True
- if use_ws and self.ws:
- error = None
- while True:
- try:
- result = json.loads(self.ws.recv())
- except (
- websocket._exceptions.WebSocketConnectionClosedException
- ) as e:
- raise QueryError(str(e), query)
- result_type = result["type"]
- if result_type in (
- "CommandStarting",
- "CommandComplete",
- "Notice",
- "Rows",
- "Row",
- "ParameterStatus",
- ):
- continue
- elif result_type == "Error":
- error = QueryError(
- f"""WS {result["payload"]["code"]}: {result["payload"]["message"]}
- {result["payload"].get("details", "")}""",
- query,
- )
- elif result_type == "ReadyForQuery":
- if error:
- raise error
- break
- else:
- raise RuntimeError(
- f"Unexpected result type: {result_type} in: {result}"
- )
- if fetch and not use_ws:
- try:
- self.cur.fetchall()
- except psycopg.DataError:
- # We don't care about psycopg being unable to parse, examples:
- # date too large (after year 10K): '97940-08-25'
- # timestamp too large (after year 10K): '10876-06-20 00:00:00'
- # can't parse interval '-178956970 years -8 months -2147483648 days -2562047788:00:54.775808': days=1252674755; must have magnitude <= 999999999
- pass
- return
- try:
- result = requests.post(
- f"http://{self.db.host}:{self.db.ports['http' if self.mz_service == 'materialized' else 'http2']}/api/sql",
- data=json.dumps({"query": query}),
- headers={"content-type": "application/json"},
- timeout=self.rng.uniform(0, 10),
- )
- if result.status_code != 200:
- raise QueryError(
- f"{result.status_code}: {result.text}", f"HTTP query: {query}"
- )
- for result in result.json()["results"]:
- if "error" in result:
- raise QueryError(
- f"HTTP {result['error']['code']}: {result['error']['message']}\n{result['error'].get('detail', '')}",
- query,
- )
- except requests.exceptions.ReadTimeout as e:
- raise QueryError(f"HTTP read timeout: {e}", query)
- except requests.exceptions.ConnectionError:
- # Expected when Mz is killed
- if self.db.scenario not in (
- Scenario.Kill,
- Scenario.BackupRestore,
- Scenario.ZeroDowntimeDeploy,
- ):
- raise
- finally:
- self.last_status = "finished"
|