executor.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import json
  10. import random
  11. import threading
  12. from enum import Enum
  13. from typing import TYPE_CHECKING, TextIO
  14. import psycopg
  15. import requests
  16. import websocket
  17. from materialize.data_ingest.query_error import QueryError
  18. from materialize.parallel_workload.settings import Scenario
  19. if TYPE_CHECKING:
  20. from materialize.parallel_workload.database import Database
  21. logging: TextIO | None
  22. lock: threading.Lock
  23. def initialize_logging() -> None:
  24. global logging, lock
  25. logging = open("parallel-workload-queries.log", "w")
  26. lock = threading.Lock()
  27. class Http(Enum):
  28. NO = 0
  29. RANDOM = 1
  30. YES = 2
  31. class Executor:
  32. rng: random.Random
  33. cur: psycopg.Cursor
  34. ws: websocket.WebSocket | None
  35. pg_pid: int
  36. # Used by INSERT action to prevent writing into different tables in the same transaction
  37. insert_table: int | None
  38. db: "Database"
  39. reconnect_next: bool
  40. rollback_next: bool
  41. last_log: str
  42. last_status: str
  43. action_run_since_last_commit_rollback: bool
  44. autocommit: bool
  45. def __init__(
  46. self,
  47. rng: random.Random,
  48. cur: psycopg.Cursor,
  49. ws: websocket.WebSocket | None,
  50. db: "Database",
  51. ):
  52. self.rng = rng
  53. self.cur = cur
  54. self.ws = ws
  55. self.db = db
  56. self.pg_pid = -1
  57. self.insert_table = None
  58. self.reconnect_next = True
  59. self.rollback_next = True
  60. self.last_log = ""
  61. self.last_status = ""
  62. self.action_run_since_last_commit_rollback = False
  63. self.use_ws = self.rng.choice([True, False]) if self.ws else False
  64. self.autocommit = cur.connection.autocommit
  65. self.mz_service = "materialized"
  66. def set_isolation(self, level: str) -> None:
  67. self.execute(f"SET TRANSACTION_ISOLATION TO '{level}'")
  68. def commit(self, http: Http = Http.RANDOM) -> None:
  69. self.insert_table = None
  70. self.execute("commit")
  71. # TODO(def-): Enable when things are stable
  72. # self.use_ws = self.rng.choice([True, False]) if self.ws else False
  73. def rollback(self, http: Http = Http.RANDOM) -> None:
  74. self.insert_table = None
  75. try:
  76. if self.use_ws and http != Http.NO:
  77. self.execute("rollback")
  78. else:
  79. self.log("rollback")
  80. self.cur.connection.rollback()
  81. except QueryError:
  82. raise
  83. except Exception as e:
  84. raise QueryError(str(e), "rollback")
  85. # TODO(def-): Enable when things are stable
  86. # self.use_ws = self.rng.choice([True, False]) if self.ws else False
  87. def log(self, msg: str) -> None:
  88. global logging, lock
  89. if not logging:
  90. return
  91. thread_name = threading.current_thread().getName()
  92. self.last_log = msg
  93. self.last_status = "logged"
  94. with lock:
  95. print(f"[{thread_name}][{self.mz_service}] {msg}", file=logging)
  96. logging.flush()
  97. def execute(
  98. self,
  99. query: str,
  100. extra_info: str = "",
  101. explainable: bool = False,
  102. http: Http = Http.NO,
  103. fetch: bool = False,
  104. ) -> None:
  105. is_http = (
  106. http == Http.RANDOM and self.rng.choice([True, False])
  107. ) or http == Http.YES
  108. if explainable and self.rng.choice([True, False]):
  109. query = f"EXPLAIN OPTIMIZED PLAN AS VERBOSE TEXT FOR {query}"
  110. query += ";"
  111. extra_info_str = f" ({extra_info})" if extra_info else ""
  112. use_ws = self.use_ws and http != Http.NO
  113. http_str = " [HTTP]" if is_http else " [WS]" if use_ws and self.ws else ""
  114. self.log(f"{query}{extra_info_str}{http_str}")
  115. self.last_status = "running"
  116. try:
  117. if not is_http:
  118. if use_ws and self.ws:
  119. try:
  120. self.ws.send(json.dumps({"queries": [{"query": query}]}))
  121. except Exception as e:
  122. raise QueryError(str(e), query)
  123. else:
  124. try:
  125. if query == "commit;":
  126. self.log("commit")
  127. self.cur.connection.commit()
  128. elif query == "rollback;":
  129. self.log("rollback")
  130. self.cur.connection.rollback()
  131. else:
  132. self.cur.execute(query.encode())
  133. except Exception as e:
  134. raise QueryError(str(e), query)
  135. self.action_run_since_last_commit_rollback = True
  136. if use_ws and self.ws:
  137. error = None
  138. while True:
  139. try:
  140. result = json.loads(self.ws.recv())
  141. except (
  142. websocket._exceptions.WebSocketConnectionClosedException
  143. ) as e:
  144. raise QueryError(str(e), query)
  145. result_type = result["type"]
  146. if result_type in (
  147. "CommandStarting",
  148. "CommandComplete",
  149. "Notice",
  150. "Rows",
  151. "Row",
  152. "ParameterStatus",
  153. ):
  154. continue
  155. elif result_type == "Error":
  156. error = QueryError(
  157. f"""WS {result["payload"]["code"]}: {result["payload"]["message"]}
  158. {result["payload"].get("details", "")}""",
  159. query,
  160. )
  161. elif result_type == "ReadyForQuery":
  162. if error:
  163. raise error
  164. break
  165. else:
  166. raise RuntimeError(
  167. f"Unexpected result type: {result_type} in: {result}"
  168. )
  169. if fetch and not use_ws:
  170. try:
  171. self.cur.fetchall()
  172. except psycopg.DataError:
  173. # We don't care about psycopg being unable to parse, examples:
  174. # date too large (after year 10K): '97940-08-25'
  175. # timestamp too large (after year 10K): '10876-06-20 00:00:00'
  176. # can't parse interval '-178956970 years -8 months -2147483648 days -2562047788:00:54.775808': days=1252674755; must have magnitude <= 999999999
  177. pass
  178. return
  179. try:
  180. result = requests.post(
  181. f"http://{self.db.host}:{self.db.ports['http' if self.mz_service == 'materialized' else 'http2']}/api/sql",
  182. data=json.dumps({"query": query}),
  183. headers={"content-type": "application/json"},
  184. timeout=self.rng.uniform(0, 10),
  185. )
  186. if result.status_code != 200:
  187. raise QueryError(
  188. f"{result.status_code}: {result.text}", f"HTTP query: {query}"
  189. )
  190. for result in result.json()["results"]:
  191. if "error" in result:
  192. raise QueryError(
  193. f"HTTP {result['error']['code']}: {result['error']['message']}\n{result['error'].get('detail', '')}",
  194. query,
  195. )
  196. except requests.exceptions.ReadTimeout as e:
  197. raise QueryError(f"HTTP read timeout: {e}", query)
  198. except requests.exceptions.ConnectionError:
  199. # Expected when Mz is killed
  200. if self.db.scenario not in (
  201. Scenario.Kill,
  202. Scenario.BackupRestore,
  203. Scenario.ZeroDowntimeDeploy,
  204. ):
  205. raise
  206. finally:
  207. self.last_status = "finished"