worker.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import random
  10. import threading
  11. import time
  12. from collections import Counter, defaultdict
  13. import psycopg
  14. import websocket
  15. from materialize.data_ingest.query_error import QueryError
  16. from materialize.mzcompose.composition import Composition
  17. from materialize.parallel_workload.action import (
  18. Action,
  19. ActionList,
  20. ReconnectAction,
  21. ws_connect,
  22. )
  23. from materialize.parallel_workload.database import Database
  24. from materialize.parallel_workload.executor import Executor
  25. class Worker:
  26. rng: random.Random
  27. action_list: ActionList | None
  28. actions: list[Action]
  29. weights: list[float]
  30. end_time: float
  31. num_queries: Counter[type[Action]]
  32. autocommit: bool
  33. system: bool
  34. exe: Executor | None
  35. ignored_errors: defaultdict[str, Counter[type[Action]]]
  36. composition: Composition | None
  37. occurred_exception: Exception | None
  38. def __init__(
  39. self,
  40. rng: random.Random,
  41. actions: list[Action],
  42. weights: list[float],
  43. end_time: float,
  44. autocommit: bool,
  45. system: bool,
  46. composition: Composition | None,
  47. action_list: ActionList | None = None,
  48. ):
  49. self.rng = rng
  50. self.action_list = action_list
  51. self.actions = actions
  52. self.weights = weights
  53. self.end_time = end_time
  54. self.num_queries = Counter()
  55. self.autocommit = autocommit
  56. self.system = system
  57. self.ignored_errors = defaultdict(Counter)
  58. self.composition = composition
  59. self.occurred_exception = None
  60. self.exe = None
  61. def run(
  62. self, host: str, pg_port: int, http_port: int, user: str, database: Database
  63. ) -> None:
  64. self.conn = psycopg.connect(
  65. host=host, port=pg_port, user=user, dbname="materialize"
  66. )
  67. self.conn.autocommit = self.autocommit
  68. cur = self.conn.cursor()
  69. ws = websocket.WebSocket()
  70. ws_conn_id, ws_secret_key = ws_connect(ws, host, http_port, user)
  71. self.exe = Executor(self.rng, cur, ws, database)
  72. self.exe.set_isolation("SERIALIZABLE")
  73. cur.execute("SET auto_route_catalog_queries TO false")
  74. if self.exe.use_ws:
  75. self.exe.pg_pid = ws_conn_id
  76. else:
  77. cur.execute("SELECT pg_backend_pid()")
  78. self.exe.pg_pid = cur.fetchall()[0][0]
  79. while time.time() < self.end_time:
  80. action = self.rng.choices(self.actions, self.weights)[0]
  81. try:
  82. if self.exe.rollback_next:
  83. try:
  84. self.exe.rollback()
  85. except QueryError as e:
  86. if (
  87. "Please disconnect and re-connect" in e.msg
  88. or "server closed the connection unexpectedly" in e.msg
  89. or "Can't create a connection to host" in e.msg
  90. or "Connection refused" in e.msg
  91. or "the connection is lost" in e.msg
  92. or "connection in transaction status INERROR" in e.msg
  93. ):
  94. self.exe.reconnect_next = True
  95. self.exe.rollback_next = False
  96. continue
  97. self.exe.rollback_next = False
  98. if self.exe.reconnect_next:
  99. ReconnectAction(self.rng, self.composition, random_role=False).run(
  100. self.exe
  101. )
  102. self.exe.reconnect_next = False
  103. if action.run(self.exe):
  104. self.num_queries[type(action)] += 1
  105. except QueryError as e:
  106. self.num_queries[type(action)] += 1
  107. for error_to_ignore in action.errors_to_ignore(self.exe):
  108. if error_to_ignore in e.msg:
  109. self.ignored_errors[error_to_ignore][type(action)] += 1
  110. if (
  111. "Please disconnect and re-connect" in e.msg
  112. or "server closed the connection unexpectedly" in e.msg
  113. or "Can't create a connection to host" in e.msg
  114. or "Connection refused" in e.msg
  115. or "the connection is lost" in e.msg
  116. or "connection in transaction status INERROR" in e.msg
  117. ):
  118. self.exe.reconnect_next = True
  119. else:
  120. self.exe.rollback_next = True
  121. break
  122. else:
  123. thread_name = threading.current_thread().getName()
  124. self.occurred_exception = e
  125. print(f"+++ [{thread_name}] Query failed: {e.query} {e.msg}")
  126. raise
  127. except Exception as e:
  128. self.occurred_exception = e
  129. raise e
  130. self.exe.cur.connection.close()
  131. if self.exe.ws:
  132. self.exe.ws.close()