mzcompose.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Verifies that objects created in previous versions of Materialize are still
  11. operational after an upgrade. See also the newer platform-checks' upgrade scenarios.
  12. """
  13. import random
  14. from materialize import buildkite
  15. from materialize.docker import image_of_release_version_exists
  16. from materialize.mz_version import MzVersion
  17. from materialize.mzcompose import get_default_system_parameters
  18. from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
  19. from materialize.mzcompose.services.cockroach import Cockroach
  20. from materialize.mzcompose.services.kafka import Kafka
  21. from materialize.mzcompose.services.materialized import DeploymentStatus, Materialized
  22. from materialize.mzcompose.services.mysql import MySql
  23. from materialize.mzcompose.services.postgres import Postgres
  24. from materialize.mzcompose.services.schema_registry import SchemaRegistry
  25. from materialize.mzcompose.services.test_certs import TestCerts
  26. from materialize.mzcompose.services.testdrive import Testdrive
  27. from materialize.mzcompose.services.zookeeper import Zookeeper
  28. from materialize.version_list import (
  29. VersionsFromDocs,
  30. get_all_published_mz_versions,
  31. get_published_minor_mz_versions,
  32. get_self_managed_versions,
  33. )
  # Extra `materialized` options keyed by the first version that accepts them.
  # A versioned instance receives every option whose key is <= its version; an
  # instance built from the current source receives all of them.
  mz_options: dict[MzVersion, str] = {}

  SERVICES = [
      TestCerts(),
      Zookeeper(),
      Kafka(),
      SchemaRegistry(),
      Postgres(),
      MySql(),
      Cockroach(setup_materialize=True),
      # Both materialized services are placeholders that get overridden below.
      Materialized(),
      Materialized(name="materialized2"),
      # N.B.: `validate_catalog_store=False` is required here because testdrive
      # loads the catalog from disk using HEAD but does *not* run migrations,
      # and nothing guarantees that HEAD can load an old catalog without them.
      #
      # Catalog validation is switched back on whenever testdrive targets a
      # HEAD materialized.
      #
      # Turning validation off is preferable to a versioned testdrive, which
      # would force every testdrive command to stay backwards compatible.
      Testdrive(
          metadata_store="cockroach",
          external_metadata_store=True,
          validate_catalog_store=False,
          volumes_extra=["secrets:/share/secrets", "mzdata:/mzdata"],
      ),
  ]
  def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
      """Test upgrades from previous versions.

      Parses the workflow arguments, determines which versions to start from,
      and runs `test_upgrade_from_version` for each of them as well as for the
      current source. With a Buildkite parallelism of 2 the work is split: shard
      0 runs the zero-downtime upgrades and shard 1 the regular ones. With a
      parallelism of 1 everything runs in this job.
      """
      parser.add_argument(
          "filter", nargs="?", default="*", help="limit to only the files matching filter"
      )
      parser.add_argument(
          "--versions-source",
          default="docs",
          choices=["docs", "git"],
          help="from what source to fetch the versions",
      )
      parser.add_argument("--ignore-missing-version", action="store_true")
      parser.add_argument("--self-managed-upgrade", action="store_true")
      args = parser.parse_args()

      shard_index = buildkite.get_parallelism_index()
      shard_count = buildkite.get_parallelism_count()
      assert shard_count in [
          1,
          2,
      ], "Special cased parallelism, only allows values 1 or 2"

      # Which halves of the test matrix this job is responsible for.
      runs_zero_downtime = shard_count == 1 or shard_index == 0
      runs_regular = shard_count == 1 or shard_index == 1

      all_versions, tested_versions = get_all_and_latest_two_minor_mz_versions(
          use_versions_from_docs=args.versions_source == "docs"
      )

      current_version = MzVersion.parse_cargo()
      # Oldest version considered as a prior: patch 0 of the previous minor.
      min_upgradable_version = MzVersion.create(
          current_version.major, current_version.minor - 1, 0
      )

      def upgrade_from_latest_self_managed(force_source_table_syntax: bool) -> None:
          # Direct upgrade from latest Self-Managed version without any inbetween versions
          latest_self_managed = get_self_managed_versions()[-1]
          test_upgrade_from_version(
              c,
              f"{latest_self_managed}",
              [v for v in all_versions if v <= latest_self_managed],
              filter=args.filter,
              zero_downtime=False,
              force_source_table_syntax=force_source_table_syntax,
              self_managed_upgrade=True,
          )

      for version in tested_versions:
          # Building the latest release might have failed, don't block PRs on
          # test pipeline for this.
          if args.ignore_missing_version and not image_of_release_version_exists(version):
              print(f"Unknown version {version}, skipping")
              continue

          priors = [
              v for v in all_versions if min_upgradable_version <= v and v <= version
          ]
          if not priors:
              # this may happen when versions are marked as invalid
              print("No versions to test!")
              continue

          if runs_zero_downtime:
              test_upgrade_from_version(
                  c,
                  f"{version}",
                  priors,
                  filter=args.filter,
                  zero_downtime=True,
                  force_source_table_syntax=False,
              )
          if runs_regular:
              # Regular upgrade, first with the old and then the forced new syntax.
              for force_syntax in (False, True):
                  test_upgrade_from_version(
                      c,
                      f"{version}",
                      priors,
                      filter=args.filter,
                      zero_downtime=False,
                      force_source_table_syntax=force_syntax,
                  )

      if runs_zero_downtime:
          test_upgrade_from_version(
              c,
              "current_source",
              priors=[],
              filter=args.filter,
              zero_downtime=True,
              force_source_table_syntax=False,
          )
          if args.self_managed_upgrade:
              upgrade_from_latest_self_managed(force_source_table_syntax=True)

      if runs_regular:
          for force_syntax in (False, True):
              test_upgrade_from_version(
                  c,
                  "current_source",
                  priors=[],
                  filter=args.filter,
                  zero_downtime=False,
                  force_source_table_syntax=force_syntax,
              )
          if args.self_managed_upgrade:
              upgrade_from_latest_self_managed(force_source_table_syntax=False)
  def get_all_and_latest_two_minor_mz_versions(
      use_versions_from_docs: bool,
  ) -> tuple[list[MzVersion], list[MzVersion]]:
      """Return `(all_versions, tested_versions)` for the upgrade tests.

      `all_versions` always contains only versions older than the version
      parsed from Cargo. How `tested_versions` is derived depends on the source:

      * docs: the last two entries of `VersionsFromDocs.minor_versions()`,
        taken as-is (not filtered against the current version here).
      * git: every published minor version older than the current version.
      """
      current_version = MzVersion.parse_cargo()

      def is_older_than_current(candidate: MzVersion) -> bool:
          return candidate < current_version

      if not use_versions_from_docs:
          tested_versions = list(
              filter(is_older_than_current, get_published_minor_mz_versions())
          )
          all_versions = list(
              filter(
                  is_older_than_current,
                  get_all_published_mz_versions(newest_first=False),
              )
          )
          return all_versions, tested_versions

      docs_versions = VersionsFromDocs(respect_released_tag=False)
      all_versions = list(filter(is_older_than_current, docs_versions.all_versions()))
      tested_versions = docs_versions.minor_versions()[-2:]
      return all_versions, tested_versions
  def test_upgrade_from_version(
      c: Composition,
      from_version: str,
      priors: list[MzVersion],
      filter: str,
      zero_downtime: bool,
      force_source_table_syntax: bool,
      self_managed_upgrade: bool = False,
  ) -> None:
      """Create objects on `from_version`, upgrade to the current source, check them.

      The run has four phases:

      1. Start `materialized` at `from_version` and run the matching
         `create-in-*.td` testdrive files against it.
      2. Unless starting from the current source or doing a Self-Managed
         upgrade, step through every published minor version between
         `from_version` and the current version.
      3. Start the current source build as the final version.
      4. Run the matching `check-from-*.td` testdrive files, this time with
         catalog validation enabled.

      Args:
          c: Composition used to start, stop and override services.
          from_version: Version string to start from, or `"current_source"` to
              start from the image built from the current source.
          priors: Older versions to include in the testdrive file glob together
              with all of their lower patch versions. An empty list yields `*`.
          filter: Pattern interpolated into the testdrive file names.
          zero_downtime: If true, each upgrade step starts the alternate
              materialized service with an incremented deploy generation and
              promotes it; otherwise the service is killed, removed and
              started again.
          force_source_table_syntax: Passed to the system parameter defaults of
              the final version only.
          self_managed_upgrade: If true, skip the in-between minor versions and
              upgrade directly to the final version.
      """
      print(
          f"+++ Testing {'0dt upgrade' if zero_downtime else 'regular upgrade'} from Materialize {from_version} to current_source."
      )

      deploy_generation = 0

      # If we are testing vX.Y.Z, the glob should include all patch versions 0 to Z
      prior_patch_versions = [
          MzVersion(major=prior.major, minor=prior.minor, patch=patch)
          for prior in priors
          for patch in range(prior.patch)
      ]

      # We need this to be a new variable binding otherwise `pyright` complains of
      # a type mismatch
      prior_strings = sorted(str(p) for p in priors + prior_patch_versions)

      if not priors:
          prior_strings = ["*"]

      version_glob = "{" + ",".join(["any_version", *prior_strings, from_version]) + "}"
      print(">>> Version glob pattern: " + version_glob)

      c.down(destroy_volumes=True)
      c.up("zookeeper", "kafka", "schema-registry", "postgres", "mysql")

      mz_service = "materialized"

      if from_version != "current_source":
          from_mz_version = MzVersion.parse_mz(from_version)
          system_parameter_defaults = get_default_system_parameters(
              version=from_mz_version,
              zero_downtime=zero_downtime,
          )
          mz_from = Materialized(
              name=mz_service,
              image=f"materialize/materialized:{from_version}",
              # Only pass the options this version already understands.
              options=[
                  opt
                  for start_version, opt in mz_options.items()
                  if from_mz_version >= start_version
              ],
              volumes_extra=["secrets:/share/secrets"],
              external_metadata_store=True,
              system_parameter_defaults=system_parameter_defaults,
              deploy_generation=deploy_generation,
              restart="on-failure",
              sanity_restart=False,
              metadata_store="cockroach",
          )
      else:
          system_parameter_defaults = get_default_system_parameters(
              version=MzVersion.parse_cargo(),
              zero_downtime=zero_downtime,
          )
          # No image and no deploy generation: use the service defaults.
          mz_from = Materialized(
              name=mz_service,
              options=list(mz_options.values()),
              volumes_extra=["secrets:/share/secrets"],
              external_metadata_store=True,
              system_parameter_defaults=system_parameter_defaults,
              restart="on-failure",
              sanity_restart=False,
              metadata_store="cockroach",
          )

      with c.override(mz_from):
          c.up(mz_service)

      # The same temp dir and seed are reused by the check phase at the end.
      temp_dir = f"--temp-dir=/share/tmp/upgrade-from-{from_version}"
      seed = f"--seed={random.getrandbits(32)}"
      c.run_testdrive_files(
          "--no-reset",
          f"--var=upgrade-from-version={from_version}",
          "--var=created-cluster=quickstart",
          f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
          "--var=mysql-user-password=us3rp4ssw0rd",
          temp_dir,
          seed,
          f"create-in-{version_glob}-{filter}.td",
          mz_service=mz_service,
      )

      if zero_downtime:
          # Keep the old instance running; the next generation starts on the
          # second materialized service.
          mz_service = "materialized2"
          deploy_generation += 1
          c.rm("testdrive")
      else:
          c.kill(mz_service)
          c.rm(mz_service, "testdrive")

      if from_version != "current_source" and not self_managed_upgrade:
          current_version = MzVersion.parse_cargo()
          # We can't skip in-between minor versions anymore, so go through all of them
          in_between_versions = [
              v
              for v in get_published_minor_mz_versions(newest_first=False)
              if v < current_version
          ]
          for version in in_between_versions:
              # NOTE(review): this compares an MzVersion against the raw
              # `from_version` string; presumably MzVersion supports ordering
              # against version strings -- confirm.
              if version <= from_version:
                  continue

              system_parameter_defaults = get_default_system_parameters(
                  version=version,
                  zero_downtime=zero_downtime,
              )
              print(
                  f"{'0dt-' if zero_downtime else ''}Upgrading to in-between version {version}"
              )
              with c.override(
                  Materialized(
                      name=mz_service,
                      image=f"materialize/materialized:{version}",
                      options=[
                          opt
                          for start_version, opt in mz_options.items()
                          if version >= start_version
                      ],
                      volumes_extra=["secrets:/share/secrets"],
                      external_metadata_store=True,
                      system_parameter_defaults=system_parameter_defaults,
                      deploy_generation=deploy_generation,
                      restart="on-failure",
                      sanity_restart=False,
                      metadata_store="cockroach",
                  )
              ):
                  c.up(mz_service)
                  if zero_downtime:
                      c.await_mz_deployment_status(
                          DeploymentStatus.READY_TO_PROMOTE, mz_service
                      )
                      c.promote_mz(mz_service)
                      c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, mz_service)
                      # Alternate between the two services for the next step.
                      mz_service = (
                          "materialized2"
                          if mz_service == "materialized"
                          else "materialized"
                      )
                      deploy_generation += 1
                  else:
                      c.kill(mz_service)
                      c.rm(mz_service)

      print(f"{'0dt-' if zero_downtime else ''}Upgrading to final version")
      system_parameter_defaults = get_default_system_parameters(
          zero_downtime=zero_downtime,
          # We can only force the syntax on the final version so that the migration to convert
          # sources to the new model can be applied without preventing sources from being
          # created in the old syntax on the older version.
          force_source_table_syntax=force_source_table_syntax,
      )
      mz_to = Materialized(
          name=mz_service,
          options=list(mz_options.values()),
          volumes_extra=["secrets:/share/secrets"],
          external_metadata_store=True,
          system_parameter_defaults=system_parameter_defaults,
          deploy_generation=deploy_generation,
          restart="on-failure",
          sanity_restart=False,
          metadata_store="cockroach",
      )
      with c.override(mz_to):
          c.up(mz_service)
          if zero_downtime:
              c.await_mz_deployment_status(DeploymentStatus.READY_TO_PROMOTE, mz_service)
              c.promote_mz(mz_service)
              c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, mz_service)
          else:
              # Restart once more, just in case
              c.kill(mz_service)
              c.rm(mz_service)
              c.up(mz_service)

          # Testdrive now targets a current-source materialized, so catalog
          # validation is enabled for the check phase.
          with c.override(
              Testdrive(
                  external_metadata_store=True,
                  validate_catalog_store=True,
                  volumes_extra=["secrets:/share/secrets", "mzdata:/mzdata"],
                  metadata_store="cockroach",
              )
          ):
              c.run_testdrive_files(
                  "--no-reset",
                  f"--var=upgrade-from-version={from_version}",
                  f"--var=default-storage-size={Materialized.Size.DEFAULT_SIZE}-1",
                  "--var=created-cluster=quickstart",
                  temp_dir,
                  seed,
                  f"check-from-{version_glob}-{filter}.td",
                  mz_service=mz_service,
              )