# transaction_def.py (5.7 KB)
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import random
  10. import time
  11. from collections.abc import Iterator
  12. from enum import Enum
  13. from typing import TYPE_CHECKING
  14. from materialize.data_ingest.definition import Definition
  15. from materialize.data_ingest.field import Field
  16. from materialize.data_ingest.rowlist import RowList
  17. from materialize.data_ingest.transaction import Transaction
  18. from materialize.mzcompose import get_default_system_parameters
  19. from materialize.mzcompose.composition import Composition
  20. from materialize.mzcompose.services.materialized import (
  21. LEADER_STATUS_HEALTHCHECK,
  22. DeploymentStatus,
  23. Materialized,
  24. )
  25. if TYPE_CHECKING:
  26. from materialize.data_ingest.workload import Workload
  27. class TransactionSize(Enum):
  28. SINGLE_OPERATION = 1
  29. HUGE = 1_000_000_000
  30. class TransactionDef:
  31. operations: list[Definition]
  32. size: TransactionSize
  33. def __init__(
  34. self,
  35. operations: list[Definition],
  36. size: TransactionSize = TransactionSize.SINGLE_OPERATION,
  37. ):
  38. self.operations = operations
  39. self.size = size
  40. def generate(self, fields: list[Field]) -> Iterator[Transaction | None]:
  41. full_rowlist: list[RowList] = []
  42. for definition in self.operations:
  43. for i, rowlist in enumerate(definition.generate(fields)):
  44. full_rowlist.append(rowlist)
  45. if i + 1 == self.size.value:
  46. yield Transaction(full_rowlist)
  47. full_rowlist = []
  48. if full_rowlist:
  49. yield Transaction(full_rowlist)
  50. class RestartMz(TransactionDef):
  51. composition: Composition
  52. probability: float
  53. workload: "Workload"
  54. def __init__(
  55. self,
  56. composition: Composition,
  57. probability: float,
  58. workload: "Workload",
  59. azurite: bool,
  60. ):
  61. self.composition = composition
  62. self.probability = probability
  63. self.workload = workload
  64. self.azurite = azurite
  65. def generate(self, fields: list[Field]) -> Iterator[Transaction | None]:
  66. if random.random() < self.probability:
  67. ports = (
  68. ["16875:6875"]
  69. if self.workload.mz_service == "materialized"
  70. else ["26875:6875"]
  71. )
  72. with self.composition.override(
  73. Materialized(
  74. name=self.workload.mz_service,
  75. ports=ports,
  76. external_blob_store=True,
  77. blob_store_is_azure=self.azurite,
  78. external_metadata_store=True,
  79. system_parameter_defaults=get_default_system_parameters(
  80. zero_downtime=True
  81. ),
  82. additional_system_parameter_defaults={
  83. "unsafe_enable_table_keys": "true"
  84. },
  85. deploy_generation=self.workload.deploy_generation,
  86. sanity_restart=False,
  87. ),
  88. ):
  89. self.composition.kill(self.workload.mz_service)
  90. time.sleep(1)
  91. self.composition.up(self.workload.mz_service)
  92. yield None
  93. class ZeroDowntimeDeploy(TransactionDef):
  94. composition: Composition
  95. probability: float
  96. workload: "Workload"
  97. def __init__(
  98. self,
  99. composition: Composition,
  100. probability: float,
  101. workload: "Workload",
  102. azurite: bool,
  103. ):
  104. self.composition = composition
  105. self.probability = probability
  106. self.workload = workload
  107. self.azurite = azurite
  108. def generate(self, fields: list[Field]) -> Iterator[Transaction | None]:
  109. if random.random() < self.probability:
  110. self.workload.deploy_generation += 1
  111. if self.workload.deploy_generation % 2 == 0:
  112. self.workload.mz_service = "materialized"
  113. ports = ["16875:6875"]
  114. else:
  115. self.workload.mz_service = "materialized2"
  116. ports = ["26875:6875"]
  117. print(
  118. f"Deploying generation {self.workload.deploy_generation} on {self.workload.mz_service}"
  119. )
  120. with self.composition.override(
  121. Materialized(
  122. name=self.workload.mz_service,
  123. ports=ports,
  124. external_blob_store=True,
  125. blob_store_is_azure=self.azurite,
  126. external_metadata_store=True,
  127. system_parameter_defaults=get_default_system_parameters(
  128. zero_downtime=True
  129. ),
  130. additional_system_parameter_defaults={
  131. "unsafe_enable_table_keys": "true"
  132. },
  133. deploy_generation=self.workload.deploy_generation,
  134. restart="on-failure",
  135. healthcheck=LEADER_STATUS_HEALTHCHECK,
  136. sanity_restart=False,
  137. ),
  138. ):
  139. self.composition.up(self.workload.mz_service, detach=True)
  140. self.composition.await_mz_deployment_status(
  141. DeploymentStatus.READY_TO_PROMOTE, self.workload.mz_service
  142. )
  143. self.composition.promote_mz(self.workload.mz_service)
  144. self.composition.await_mz_deployment_status(
  145. DeploymentStatus.IS_LEADER, self.workload.mz_service
  146. )
  147. yield None