# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import random
import time
import traceback
from collections.abc import Iterator
from typing import Any

import psycopg

from materialize.data_ingest.data_type import DATA_TYPES_FOR_AVRO, DATA_TYPES_FOR_KEY
from materialize.data_ingest.definition import (
    Delete,
    Insert,
    Keyspace,
    Records,
    RecordSize,
    Upsert,
)
from materialize.data_ingest.executor import (
    KafkaExecutor,
    PgExecutor,
    PrintExecutor,
)
from materialize.data_ingest.field import Field
from materialize.data_ingest.transaction import Transaction
from materialize.data_ingest.transaction_def import (
    RestartMz,
    TransactionDef,
    TransactionSize,
    ZeroDowntimeDeploy,
)
from materialize.mzcompose.composition import Composition
from materialize.util import all_subclasses
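

# Base class for all ingestion workloads: a workload is an endless cycle of
# transaction definitions. Subclasses populate `self.cycle`; `generate`
# replays the cycle forever, yielding one Transaction at a time.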
class Workload:
    cycle: list[TransactionDef]
    mz_service: str
    deploy_generation: int

    def __init__(
        self,
        azurite: bool,
        mz_service: str = "materialized",
        deploy_generation: int = 0,
    ) -> None:
        self.azurite = azurite
        self.mz_service = mz_service
        self.deploy_generation = deploy_generation

    def generate(self, fields: list[Field]) -> Iterator[Transaction]:
        while True:
            for transaction_def in self.cycle:
                for transaction in transaction_def.generate(fields):
                    if transaction:
                        yield transaction
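

# Keeps upserting a single key with small records, mimicking one sensor that
# continuously reports fresh values.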
class SingleSensorUpdating(Workload):
    def __init__(
        self,
        azurite: bool,
        composition: Composition | None = None,
        mz_service: str = "materialized",
        deploy_generation: int = 0,
    ) -> None:
        super().__init__(azurite, mz_service, deploy_generation)
        self.cycle = [
            TransactionDef(
                [
                    Upsert(
                        keyspace=Keyspace.SINGLE_VALUE,
                        count=Records.ONE,
                        record_size=RecordSize.SMALL,
                    )
                ]
            )
        ]
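

# The single-sensor upsert stream, plus a RestartMz disruption that restarts
# Materialize with probability 0.1 per cycle.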
class SingleSensorUpdatingDisruptions(Workload):
    def __init__(
        self,
        azurite: bool,
        composition: Composition | None = None,
        mz_service: str = "materialized",
        deploy_generation: int = 0,
    ) -> None:
        super().__init__(azurite, mz_service, deploy_generation)
        self.cycle = [
            TransactionDef(
                [
                    Upsert(
                        keyspace=Keyspace.SINGLE_VALUE,
                        count=Records.ONE,
                        record_size=RecordSize.SMALL,
                    ),
                ]
            ),
        ]
        if composition:
            self.cycle.append(
                RestartMz(
                    composition, probability=0.1, workload=self, azurite=self.azurite
                )
            )
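

# The single-sensor upsert stream, plus a zero-downtime (0dt) deployment of
# Materialize with probability 0.1 per cycle.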
class SingleSensorUpdating0dtDeploy(Workload):
    def __init__(
        self,
        azurite: bool,
        composition: Composition | None = None,
        mz_service: str = "materialized",
        deploy_generation: int = 0,
    ) -> None:
        super().__init__(azurite, mz_service, deploy_generation)
        self.cycle = [
            TransactionDef(
                [
                    Upsert(
                        keyspace=Keyspace.SINGLE_VALUE,
                        count=Records.ONE,
                        record_size=RecordSize.SMALL,
                    ),
                ]
            ),
        ]
        if composition:
            self.cycle.append(
                ZeroDowntimeDeploy(
                    composition, probability=0.1, workload=self, azurite=self.azurite
                )
            )
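

# Models a daily batch pattern: insert a huge batch of records, then delete
# them all again in one transaction at the "end of the day".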
class DeleteDataAtEndOfDay(Workload):
    def __init__(
        self,
        azurite: bool,
        composition: Composition | None = None,
        mz_service: str = "materialized",
        deploy_generation: int = 0,
    ) -> None:
        super().__init__(azurite, mz_service, deploy_generation)
        insert = Insert(
            count=Records.SOME,
            record_size=RecordSize.SMALL,
        )
        insert_phase = TransactionDef(
            size=TransactionSize.HUGE,
            operations=[insert],
        )
        # Delete all records in a single transaction
        delete_phase = TransactionDef(
            [
                Delete(
                    number_of_records=Records.ALL,
                    record_size=RecordSize.SMALL,
                    num=insert.max_key(),
                )
            ]
        )
        self.cycle = [
            insert_phase,
            delete_phase,
        ]
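

# The end-of-day insert/delete pattern, plus occasional Materialize restarts.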
class DeleteDataAtEndOfDayDisruptions(Workload):
    def __init__(
        self,
        azurite: bool,
        composition: Composition | None = None,
        mz_service: str = "materialized",
        deploy_generation: int = 0,
    ) -> None:
        super().__init__(azurite, mz_service, deploy_generation)
        insert = Insert(
            count=Records.SOME,
            record_size=RecordSize.SMALL,
        )
        insert_phase = TransactionDef(
            size=TransactionSize.HUGE,
            operations=[insert],
        )
        # Delete all records in a single transaction
        delete_phase = TransactionDef(
            [
                Delete(
                    number_of_records=Records.ALL,
                    record_size=RecordSize.SMALL,
                    num=insert.max_key(),
                )
            ]
        )
        self.cycle = [
            insert_phase,
            delete_phase,
        ]
        if composition:
            self.cycle.append(
                RestartMz(
                    composition, probability=0.1, workload=self, azurite=self.azurite
                )
            )
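

# The end-of-day insert/delete pattern, plus occasional zero-downtime
# deployments.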
class DeleteDataAtEndOfDay0dtDeploys(Workload):
    def __init__(
        self,
        azurite: bool,
        composition: Composition | None = None,
        mz_service: str = "materialized",
        deploy_generation: int = 0,
    ) -> None:
        super().__init__(azurite, mz_service, deploy_generation)
        insert = Insert(
            count=Records.SOME,
            record_size=RecordSize.SMALL,
        )
        insert_phase = TransactionDef(
            size=TransactionSize.HUGE,
            operations=[insert],
        )
        # Delete all records in a single transaction
        delete_phase = TransactionDef(
            [
                Delete(
                    number_of_records=Records.ALL,
                    record_size=RecordSize.SMALL,
                    num=insert.max_key(),
                )
            ]
        )
        self.cycle = [
            insert_phase,
            delete_phase,
        ]
        if composition:
            self.cycle.append(
                ZeroDowntimeDeploy(
                    composition, probability=0.1, workload=self, azurite=self.azurite
                )
            )


# TODO: Implement
# class ProgressivelyEnrichRecords(Workload):
#     def __init__(
#         self, azurite: bool, composition: Composition | None = None, mz_service: str = "materialized", deploy_generation: int = 0
#     ) -> None:
#         super().__init__(azurite, mz_service, deploy_generation)
#         self.cycle: list[Definition] = [
#         ]


WORKLOADS = all_subclasses(Workload)


def execute_workload(
    executor_classes: list[Any],
    workload: Workload,
    num: int,
    ports: dict[str, int],
    runtime: int,
    verbose: bool,
) -> None:
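    """Run `workload` against every executor class for `runtime` seconds.

    A PgExecutor is always included as the reference: after the run, each
    executor's table must match the rows recorded in Postgres.
    """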
    fields = []
    for i in range(random.randint(1, 10)):
        fields.append(Field(f"key{i}", random.choice(DATA_TYPES_FOR_KEY), True))
    for i in range(random.randint(0, 20)):
        fields.append(Field(f"value{i}", random.choice(DATA_TYPES_FOR_AVRO), False))
    print(f"With fields: {fields}")
    executors = [
        executor_class(
            num,
            ports,
            fields,
            "materialize",
            mz_service=workload.mz_service,
            cluster=(
                "quickstart" if executor_class == KafkaExecutor else "singlereplica"
            ),
        )
        for executor_class in [PgExecutor] + executor_classes
    ]
    pg_executor = executors[0]

    start = time.time()
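
    # When verbose, a PrintExecutor is prepended so every transaction is
    # echoed before it is applied.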
    run_executors = ([PrintExecutor(ports)] if verbose else []) + executors
    for exe in run_executors:
        exe.create()
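
    # Feed generated transactions to every executor until the runtime budget
    # is exhausted; at least one transaction must have completed.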
    for i, transaction in enumerate(workload.generate(fields)):
        duration = time.time() - start
        if duration > runtime:
            print(f"Ran {i} transactions in {duration} s")
            assert i > 0
            break
        for executor in run_executors:
            executor.mz_service = workload.mz_service
            executor.run(transaction)
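
    # Fetch the reference rows from Postgres, ordered by every column so that
    # result comparison is deterministic.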
    order_str = ", ".join(str(i + 1) for i in range(len(fields)))
    with pg_executor.pg_conn.cursor() as cur:
        cur.execute(f"SELECT * FROM {pg_executor.table} ORDER BY {order_str}".encode())
        expected_result = cur.fetchall()
        print(f"Expected (via Postgres): {expected_result}")

    # Reconnect as Mz disruptions may have destroyed the previous connection
    conn = psycopg.connect(
        host="localhost",
        port=ports[workload.mz_service],
        user="materialize",
        dbname="materialize",
    )
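
    # Compare each executor's table against the Postgres reference. Real-time
    # recency is enabled so the SELECT only returns once Materialize has
    # ingested all data written so far.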
    for executor in executors:
        executor.mz_service = workload.mz_service
        conn.autocommit = True
        with conn.cursor() as cur:
            try:
                cur.execute("SET REAL_TIME_RECENCY TO TRUE")
                cur.execute(
                    f"SELECT * FROM {executor.table} ORDER BY {order_str}".encode()
                )
                actual_result = cur.fetchall()
                cur.execute("SET REAL_TIME_RECENCY TO FALSE")
            except Exception as e:
                print(f"Comparing against {type(executor).__name__} failed: {e}")
                print(traceback.format_exc())
                raise
        conn.autocommit = False
        if actual_result != expected_result:
            raise ValueError(f"Unexpected result for {type(executor).__name__}: {actual_result} != {expected_result}")  # type: ignore
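

# A minimal invocation sketch (illustrative only; the real driver is an
# mzcompose workflow that supplies live service ports and executor classes).
# The port numbers and argument values below are assumptions, not part of
# this file:
#
#     execute_workload(
#         executor_classes=[KafkaExecutor],
#         workload=SingleSensorUpdating(azurite=False),
#         num=0,
#         ports={"materialized": 6875, "kafka": 9092},  # assumed mapping
#         runtime=60,
#         verbose=True,
#     )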