mzcompose.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Introduce a second Mz instance while a concurrent workload is running for the
  11. purpose of exercising fencing.
  12. """
import argparse
import random
import time
from concurrent import futures
from dataclasses import dataclass
from enum import Enum
from typing import ClassVar

from materialize import buildkite
from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.services.azurite import Azurite
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.minio import Minio
from materialize.mzcompose.services.postgres import CockroachOrPostgresMetadata
  25. class Operation(Enum):
  26. START_SECOND_MZ = 0
  27. INSERT = 1
  28. @dataclass
  29. class Workload:
  30. name: str
  31. txn_wal_first: str = "off"
  32. txn_wal_second: str = "eager"
  33. concurrency: int = 100
  34. tables: int = 1
  35. operation = Operation.INSERT
  36. second_mz_delay = 5
  37. operation_count = 3000
  38. max_transaction_size = 100
  39. @dataclass
  40. class SuccessfulCommit:
  41. table_id: int
  42. row_id: int
  43. transaction_size: int
# The fencing scenarios to exercise: each transition between txn-wal modes
# ("off"/"eager"/"lazy") in its simple form, plus variants with many tables
# and many concurrent connections for the off->eager and eager->lazy cases.
WORKLOADS = [
    Workload(
        name="off_to_eager_simple",
    ),
    Workload(
        name="off_to_lazy_simple",
        txn_wal_first="off",
        txn_wal_second="lazy",
    ),
    Workload(
        name="eager_to_lazy_simple",
        txn_wal_first="eager",
        txn_wal_second="lazy",
    ),
    Workload(
        name="eager_to_off_simple",
        txn_wal_first="eager",
        txn_wal_second="off",
    ),
    Workload(name="off_to_eager_many_tables", tables=100),
    Workload(name="off_to_eager_many_connections", concurrency=512),
    Workload(
        name="eager_to_lazy_many_tables",
        tables=100,
        txn_wal_first="eager",
        txn_wal_second="lazy",
    ),
    Workload(
        name="eager_to_lazy_many_connections",
        concurrency=512,
        txn_wal_first="eager",
        txn_wal_second="lazy",
    ),
]
# Service definitions known to the composition. The Materialized entries here
# are placeholders; run_workload() overrides them with fully-configured
# definitions before bringing them up.
SERVICES = [
    Minio(setup_materialize=True),
    Azurite(),
    CockroachOrPostgresMetadata(),
    # Overridden below
    Materialized(name="mz_first"),
    Materialized(name="mz_second"),
]
  86. def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
  87. parser.add_argument(
  88. "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
  89. )
  90. args = parser.parse_args()
  91. workloads = buildkite.shard_list(WORKLOADS, lambda w: w.name)
  92. print(
  93. f"Workloads in shard with index {buildkite.get_parallelism_index()}: {[w.name for w in workloads]}"
  94. )
  95. for workload in workloads:
  96. run_workload(c, workload, args)
  97. def execute_operation(
  98. args: tuple[Composition, Workload, Operation, int]
  99. ) -> SuccessfulCommit | None:
  100. c, workload, operation, id = args
  101. if operation == Operation.START_SECOND_MZ:
  102. print(
  103. f"Will sleep {workload.second_mz_delay} before bringing up 'mz_second' ..."
  104. )
  105. time.sleep(workload.second_mz_delay)
  106. print("+++ Bringing up 'mz_second'...")
  107. c.up("mz_second")
  108. print("+++ 'mz_second' is now up.")
  109. return None
  110. elif operation == Operation.INSERT:
  111. table_id = id % workload.tables
  112. mz_service = random.choices(["mz_first", "mz_second"], weights=(66, 33))[0]
  113. transaction = random.choice([True, False])
  114. if transaction:
  115. transaction_size = random.randrange(workload.max_transaction_size) + 1
  116. else:
  117. transaction_size = 1
  118. try:
  119. cursor = c.sql_cursor(service=mz_service)
  120. if transaction_size > 1:
  121. cursor.execute("BEGIN")
  122. for i in range(transaction_size):
  123. cursor.execute(
  124. f"INSERT INTO table{table_id} VALUES ({id}, {i}, '{mz_service}')".encode()
  125. )
  126. cursor.execute("COMMIT")
  127. else:
  128. cursor.execute(
  129. f"INSERT INTO table{table_id} VALUES ({id}, 0, '{mz_service}')".encode()
  130. )
  131. except Exception as e:
  132. str_e = str(e)
  133. if "running docker compose failed" in str_e:
  134. # The query targeted a Mz container that is not up
  135. return None
  136. elif "server closed the connection unexpectedly" in str_e:
  137. # Container died while query was in progress
  138. return None
  139. elif "Connection refused" in str_e:
  140. # Container died before the SQL connection was established
  141. return None
  142. else:
  143. raise RuntimeError(f"unexpected exception: {e}")
  144. # No error, so we assume the INSERT successfully committed
  145. return SuccessfulCommit(
  146. table_id=table_id, row_id=id, transaction_size=transaction_size
  147. )
def run_workload(c: Composition, workload: Workload, args: argparse.Namespace) -> None:
    """Run a single fencing workload from a clean slate.

    Brings up 'mz_first', creates the tables and materialized views, then
    executes the workload's operations on a thread pool. One scheduled
    operation starts 'mz_second' after a delay, which is expected to fence
    out 'mz_first'. Afterwards, every commit that reported success is
    verified against 'mz_second'.
    """
    print(f"+++ Running workload {workload.name} ...")
    c.silent = True
    # Start from scratch: remove containers and volumes from prior workloads.
    c.down(destroy_volumes=True)
    c.up(c.metadata_store())

    # NOTE(review): only the keys (service names) of this dict are consumed
    # below; the txn-wal values are currently unused — confirm whether they
    # should be passed into the Materialized override.
    mzs = {
        "mz_first": workload.txn_wal_first,
        "mz_second": workload.txn_wal_second,
    }

    with c.override(
        *[
            Materialized(
                name=mz_name,
                external_metadata_store=True,
                external_blob_store=True,
                blob_store_is_azure=args.azurite,
                # 'mz_first' is expected to die; do not auto-restart it.
                sanity_restart=False,
            )
            for mz_name in mzs
        ]
    ):
        c.up("mz_first")
        # Raise the object limits so the many-tables workloads fit.
        c.sql(
            """
            ALTER SYSTEM SET max_tables = 1000;
            ALTER SYSTEM SET max_materialized_views = 1000;
            """,
            port=6877,
            user="mz_system",
            service="mz_first",
        )

        print("+++ Creating database objects ...")
        for table_id in range(workload.tables):
            c.sql(
                f"""
                CREATE TABLE IF NOT EXISTS table{table_id}(id INTEGER, subid INTEGER, mz_service STRING);
                CREATE MATERIALIZED VIEW view{table_id} AS SELECT DISTINCT id, subid, mz_service FROM table{table_id};
                """,
                service="mz_first",
            )

        print("+++ Running workload ...")
        start = time.time()

        # Schedule the start of the second Mz instance
        operations = [(c, workload, Operation.START_SECOND_MZ, 0)]
        # As well as all the other operations in the workload
        operations = operations + [
            (c, workload, workload.operation, id)
            for id in range(workload.operation_count)
        ]

        with futures.ThreadPoolExecutor(
            workload.concurrency,
        ) as executor:
            # Executor.map returns a lazy iterator; it is consumed exactly
            # once, in the verification loop below.
            commits = executor.map(execute_operation, operations)

        elapsed = time.time() - start
        # The second Mz instance can come up slightly faster
        assert elapsed > (
            workload.second_mz_delay * 2
        ), f"Workload completed too soon - elapsed {elapsed}s is less than 2 x second_mz_delay({workload.second_mz_delay}s)"
        print(
            f"Workload completed in {elapsed} seconds, with second_mz_delay being {workload.second_mz_delay} seconds."
        )

        # Confirm that the first Mz has properly given up the ghost
        mz_first_log = c.invoke("logs", "mz_first", capture=True)
        assert (
            "unable to confirm leadership" in mz_first_log.stdout
            or "unexpected fence epoch" in mz_first_log.stdout
            or "fenced by new catalog upper" in mz_first_log.stdout
            or "fenced by envd" in mz_first_log.stdout
        )

        print("+++ Verifying committed transactions ...")
        cursor = c.sql_cursor(service="mz_second")
        for commit in commits:
            if commit is None:
                continue
            # Each commit must be fully visible in both the table and the
            # materialized view derived from it.
            for target in ["table", "view"]:
                cursor.execute(
                    f"""
                    SELECT id, COUNT(*) AS transaction_size
                    FROM {target}{commit.table_id}
                    WHERE id = {commit.row_id}
                    GROUP BY id
                    """.encode()
                )
                result = cursor.fetchall()
                assert len(result) == 1
                assert (
                    result[0][0] == commit.row_id
                ), f"Unexpected result {result}; commit: {commit}; target {target}"
                assert (
                    result[0][1] == commit.transaction_size
                ), f"Unexpected result {result}; commit: {commit}; target {target}"

        print("Verification complete.")