scenarios_upgrade.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from materialize.checks.actions import Action, Initialize, Manipulate, Sleep, Validate
  10. from materialize.checks.checks import Check
  11. from materialize.checks.executors import Executor
  12. from materialize.checks.features import Features
  13. from materialize.checks.mzcompose_actions import (
  14. KillClusterdCompute,
  15. KillMz,
  16. PromoteMz,
  17. StartClusterdCompute,
  18. StartMz,
  19. UseClusterdCompute,
  20. WaitReadyMz,
  21. )
  22. from materialize.checks.scenarios import Scenario
  23. from materialize.mz_version import MzVersion
  24. from materialize.mzcompose.services.materialized import LEADER_STATUS_HEALTHCHECK
  25. from materialize.version_list import (
  26. get_published_minor_mz_versions,
  27. get_self_managed_versions,
  28. )
  29. # late initialization
  30. _minor_versions: list[MzVersion] | None = None
  31. _last_version: MzVersion | None = None
  32. _previous_version: MzVersion | None = None
  33. def get_minor_versions() -> list[MzVersion]:
  34. global _minor_versions
  35. if _minor_versions is None:
  36. current_version = MzVersion.parse_cargo()
  37. _minor_versions = [
  38. v
  39. for v in get_published_minor_mz_versions(exclude_current_minor_version=True)
  40. if v < current_version
  41. ]
  42. return _minor_versions
  43. def get_last_version() -> MzVersion:
  44. global _last_version
  45. if _last_version is None:
  46. _last_version = get_minor_versions()[0]
  47. return _last_version
  48. def get_previous_version() -> MzVersion:
  49. global _previous_version
  50. if _previous_version is None:
  51. _previous_version = get_minor_versions()[1]
  52. return _previous_version
  53. def start_mz_read_only(
  54. scenario: Scenario,
  55. deploy_generation: int,
  56. mz_service: str = "materialized",
  57. tag: MzVersion | None = None,
  58. system_parameter_defaults: dict[str, str] | None = None,
  59. system_parameter_version: MzVersion | None = None,
  60. force_migrations: str | None = None,
  61. publish: bool | None = None,
  62. ) -> StartMz:
  63. return StartMz(
  64. scenario,
  65. tag=tag,
  66. mz_service=mz_service,
  67. deploy_generation=deploy_generation,
  68. healthcheck=LEADER_STATUS_HEALTHCHECK,
  69. restart="on-failure",
  70. system_parameter_defaults=system_parameter_defaults,
  71. system_parameter_version=system_parameter_version,
  72. force_migrations=force_migrations,
  73. publish=publish,
  74. )
  75. class UpgradeEntireMzFromLatestSelfManaged(Scenario):
  76. """Upgrade the entire Mz instance from the last Self-Managed version without any intermediate steps. This makes sure our Self-Managed releases for self-managed Materialize stay upgradable."""
  77. def base_version(self) -> MzVersion:
  78. return get_self_managed_versions()[-1]
  79. def actions(self) -> list[Action]:
  80. print(f"Upgrading from tag {self.base_version()}")
  81. return [
  82. StartMz(
  83. self,
  84. tag=self.base_version(),
  85. ),
  86. Initialize(self),
  87. Manipulate(self, phase=1),
  88. KillMz(
  89. capture_logs=True
  90. ), # We always use True here otherwise docker-compose will lose the pre-upgrade logs
  91. StartMz(
  92. self,
  93. tag=None,
  94. ),
  95. Manipulate(self, phase=2),
  96. Validate(self),
  97. # A second restart while already on the new version
  98. KillMz(capture_logs=True),
  99. StartMz(
  100. self,
  101. tag=None,
  102. ),
  103. Validate(self),
  104. ]
  105. class UpgradeEntireMzFromPreviousSelfManaged(Scenario):
  106. """Upgrade the entire Mz instance through the last two Self-Managed versions. This makes sure our Self-Managed releases for self-managed Materialize stay upgradable."""
  107. def base_version(self) -> MzVersion:
  108. return get_self_managed_versions()[-2]
  109. def actions(self) -> list[Action]:
  110. print(
  111. f"Upgrading from tag {self.base_version()} through {get_self_managed_versions()[-1]}"
  112. )
  113. return [
  114. StartMz(
  115. self,
  116. tag=self.base_version(),
  117. ),
  118. Initialize(self),
  119. Manipulate(self, phase=1),
  120. KillMz(
  121. capture_logs=True
  122. ), # We always use True here otherwise docker-compose will lose the pre-upgrade logs
  123. StartMz(self, tag=get_self_managed_versions()[-1]),
  124. Manipulate(self, phase=2),
  125. KillMz(
  126. capture_logs=True
  127. ), # We always use True here otherwise docker-compose will lose the pre-upgrade logs
  128. StartMz(
  129. self,
  130. tag=None,
  131. ),
  132. Validate(self),
  133. # A second restart while already on the new version
  134. KillMz(capture_logs=True),
  135. StartMz(
  136. self,
  137. tag=None,
  138. ),
  139. Validate(self),
  140. ]
  141. class UpgradeEntireMz(Scenario):
  142. """Upgrade the entire Mz instance from the last released version."""
  143. def base_version(self) -> MzVersion:
  144. return get_last_version()
  145. def actions(self) -> list[Action]:
  146. print(f"Upgrading from tag {self.base_version()}")
  147. return [
  148. StartMz(
  149. self,
  150. tag=self.base_version(),
  151. ),
  152. Initialize(self),
  153. Manipulate(self, phase=1),
  154. KillMz(
  155. capture_logs=True
  156. ), # We always use True here otherwise docker-compose will lose the pre-upgrade logs
  157. StartMz(
  158. self,
  159. tag=None,
  160. ),
  161. Manipulate(self, phase=2),
  162. Validate(self),
  163. # A second restart while already on the new version
  164. KillMz(capture_logs=True),
  165. StartMz(
  166. self,
  167. tag=None,
  168. ),
  169. Validate(self),
  170. ]
  171. class UpgradeEntireMzTwoVersions(Scenario):
  172. """Upgrade the entire Mz instance starting from the previous
  173. released version and passing through the last released version."""
  174. def base_version(self) -> MzVersion:
  175. return get_previous_version()
  176. def actions(self) -> list[Action]:
  177. print(f"Upgrade path: {self.base_version()} -> {get_last_version()} -> current")
  178. return [
  179. # Start with previous_version
  180. StartMz(
  181. self,
  182. tag=self.base_version(),
  183. ),
  184. Initialize(self),
  185. # Upgrade to last_version
  186. KillMz(capture_logs=True),
  187. StartMz(
  188. self,
  189. tag=get_last_version(),
  190. ),
  191. Manipulate(self, phase=1),
  192. # Upgrade to current source
  193. KillMz(capture_logs=True),
  194. StartMz(
  195. self,
  196. tag=None,
  197. ),
  198. Manipulate(self, phase=2),
  199. Validate(self),
  200. # A second restart while already on the current source
  201. KillMz(),
  202. StartMz(
  203. self,
  204. tag=None,
  205. ),
  206. Validate(self),
  207. ]
  208. class UpgradeEntireMzFourVersions(Scenario):
  209. """Test upgrade X-4 -> X-3 -> X-2 -> X-1 -> X"""
  210. def __init__(
  211. self,
  212. checks: list[type[Check]],
  213. executor: Executor,
  214. features: Features,
  215. seed: str | None = None,
  216. ):
  217. self.minor_versions = get_minor_versions()
  218. super().__init__(checks, executor, features, seed)
  219. def base_version(self) -> MzVersion:
  220. return self.minor_versions[3]
  221. def actions(self) -> list[Action]:
  222. print(
  223. f"Upgrade path: {self.minor_versions[3]} -> {self.minor_versions[2]} -> {get_previous_version()} -> {get_last_version()} -> current"
  224. )
  225. return [
  226. StartMz(
  227. self,
  228. tag=self.minor_versions[3],
  229. ),
  230. Initialize(self),
  231. KillMz(capture_logs=True),
  232. StartMz(
  233. self,
  234. tag=self.minor_versions[2],
  235. ),
  236. Manipulate(self, phase=1),
  237. KillMz(capture_logs=True),
  238. StartMz(
  239. self,
  240. tag=get_previous_version(),
  241. ),
  242. Manipulate(self, phase=2),
  243. KillMz(capture_logs=True),
  244. StartMz(
  245. self,
  246. tag=get_last_version(),
  247. ),
  248. KillMz(capture_logs=True),
  249. StartMz(
  250. self,
  251. tag=None,
  252. ),
  253. Validate(self),
  254. KillMz(),
  255. StartMz(
  256. self,
  257. tag=None,
  258. ),
  259. Validate(self),
  260. ]
#
# We are limited in the orders in which components can be upgraded:
# - some sequences of events are invalid
# - environmentd and storage clusterds are located in the same container
#
# Still, we want to cover as many scenarios as we can.
#
  268. class UpgradeClusterdComputeLast(Scenario):
  269. """Upgrade compute's clusterd separately after upgrading environmentd"""
  270. def base_version(self) -> MzVersion:
  271. return get_last_version()
  272. def actions(self) -> list[Action]:
  273. return [
  274. StartMz(
  275. self,
  276. tag=self.base_version(),
  277. ),
  278. StartClusterdCompute(tag=self.base_version()),
  279. UseClusterdCompute(self),
  280. Initialize(self),
  281. Manipulate(self, phase=1),
  282. KillMz(capture_logs=True),
  283. StartMz(
  284. self,
  285. tag=None,
  286. system_parameter_version=self.base_version(),
  287. ),
  288. # No useful work can be done while clusterd is old-version
  289. # and environmentd is new-version. So we proceed
  290. # to upgrade clusterd as well.
  291. # We sleep here to allow some period of coexistence, even
  292. # though we are not issuing queries during that time.
  293. Sleep(10),
  294. KillClusterdCompute(capture_logs=True),
  295. StartClusterdCompute(tag=None),
  296. Manipulate(self, phase=2),
  297. Validate(self),
  298. # A second restart while already on the new version
  299. KillMz(),
  300. StartMz(
  301. self,
  302. tag=None,
  303. ),
  304. Validate(self),
  305. ]
  306. class UpgradeClusterdComputeFirst(Scenario):
  307. """Upgrade compute's clusterd separately before environmentd"""
  308. def base_version(self) -> MzVersion:
  309. return get_last_version()
  310. def actions(self) -> list[Action]:
  311. return [
  312. StartMz(
  313. self,
  314. tag=self.base_version(),
  315. ),
  316. StartClusterdCompute(tag=self.base_version()),
  317. UseClusterdCompute(self),
  318. Initialize(self),
  319. Manipulate(self, phase=1),
  320. KillClusterdCompute(capture_logs=True),
  321. StartClusterdCompute(tag=None),
  322. # No useful work can be done while clusterd is new-version
  323. # and environmentd is old-version. So we
  324. # proceed to upgrade them as well.
  325. # We sleep here to allow some period of coexistence, even
  326. # though we are not issuing queries during that time.
  327. Sleep(10),
  328. KillMz(),
  329. StartMz(
  330. self,
  331. tag=None,
  332. ),
  333. Manipulate(self, phase=2),
  334. Validate(self),
  335. KillMz(),
  336. StartMz(
  337. self,
  338. tag=None,
  339. ),
  340. Validate(self),
  341. ]
  342. class PreflightCheckContinue(Scenario):
  343. """Preflight check, then upgrade"""
  344. def base_version(self) -> MzVersion:
  345. return get_last_version()
  346. def _include_check_class(self, check_class: type[Check]) -> bool:
  347. if not super()._include_check_class(check_class):
  348. return False
  349. return True
  350. def actions(self) -> list[Action]:
  351. print(f"Upgrading from tag {self.base_version()}")
  352. return [
  353. StartMz(
  354. self,
  355. tag=self.base_version(),
  356. ),
  357. Initialize(self),
  358. Manipulate(self, phase=1),
  359. KillMz(
  360. capture_logs=True
  361. ), # We always use True here otherwise docker-compose will lose the pre-upgrade logs
  362. start_mz_read_only(self, tag=None, deploy_generation=1),
  363. WaitReadyMz(),
  364. PromoteMz(),
  365. Manipulate(self, phase=2),
  366. Validate(self),
  367. # A second restart while already on the new version
  368. KillMz(),
  369. StartMz(
  370. self,
  371. tag=None,
  372. deploy_generation=1,
  373. ),
  374. Validate(self),
  375. ]
  376. class PreflightCheckRollback(Scenario):
  377. """Preflight check, then roll back"""
  378. def base_version(self) -> MzVersion:
  379. return get_last_version()
  380. def actions(self) -> list[Action]:
  381. print(f"Upgrading from tag {self.base_version()}")
  382. return [
  383. StartMz(
  384. self,
  385. tag=self.base_version(),
  386. ),
  387. Initialize(self),
  388. Manipulate(self, phase=1),
  389. KillMz(
  390. capture_logs=True
  391. ), # We always use True here otherwise docker-compose will lose the pre-upgrade logs
  392. start_mz_read_only(
  393. self,
  394. tag=None,
  395. deploy_generation=1,
  396. system_parameter_version=self.base_version(),
  397. ),
  398. WaitReadyMz(),
  399. KillMz(capture_logs=True),
  400. StartMz(
  401. self,
  402. tag=self.base_version(),
  403. ),
  404. Manipulate(self, phase=2),
  405. Validate(self),
  406. # A second restart while still on old version
  407. KillMz(capture_logs=True),
  408. StartMz(
  409. self,
  410. tag=self.base_version(),
  411. ),
  412. Validate(self),
  413. ]
  414. class ActivateSourceVersioningMigration(Scenario):
  415. """
  416. Starts MZ, initializes and manipulates, then forces the migration
  417. of sources to the new table model (introducing Source Versioning).
  418. """
  419. def base_version(self) -> MzVersion:
  420. return get_last_version()
  421. def actions(self) -> list[Action]:
  422. print(f"Upgrading from tag {self.base_version()}")
  423. return [
  424. StartMz(
  425. self,
  426. tag=self.base_version(),
  427. ),
  428. Initialize(self),
  429. Manipulate(self, phase=1),
  430. KillMz(
  431. capture_logs=True
  432. ), # We always use True here otherwise docker-compose will lose the pre-upgrade logs
  433. StartMz(
  434. self,
  435. tag=None,
  436. # Activate the `force_source_table_syntax` flag
  437. # which should trigger the migration of sources
  438. # using the old syntax to the new table model.
  439. additional_system_parameter_defaults={
  440. "force_source_table_syntax": "true",
  441. },
  442. ),
  443. Manipulate(self, phase=2),
  444. Validate(self),
  445. # A second restart while already on the new version
  446. KillMz(capture_logs=True),
  447. StartMz(
  448. self,
  449. tag=None,
  450. additional_system_parameter_defaults={
  451. "force_source_table_syntax": "true",
  452. },
  453. ),
  454. Validate(self),
  455. ]