123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- """
- Verifies that objects created in previous versions of Materialize are still
- operational after an upgrade. See also the newer platform-checks' upgrade scenarios.
- """
- import random
- from materialize import buildkite
- from materialize.docker import image_of_release_version_exists
- from materialize.mz_version import MzVersion
- from materialize.mzcompose import get_default_system_parameters
- from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
- from materialize.mzcompose.services.cockroach import Cockroach
- from materialize.mzcompose.services.kafka import Kafka
- from materialize.mzcompose.services.materialized import DeploymentStatus, Materialized
- from materialize.mzcompose.services.mysql import MySql
- from materialize.mzcompose.services.postgres import Postgres
- from materialize.mzcompose.services.schema_registry import SchemaRegistry
- from materialize.mzcompose.services.test_certs import TestCerts
- from materialize.mzcompose.services.testdrive import Testdrive
- from materialize.mzcompose.services.zookeeper import Zookeeper
- from materialize.version_list import (
- VersionsFromDocs,
- get_all_published_mz_versions,
- get_published_minor_mz_versions,
- get_self_managed_versions,
- )
# Extra `materialized` command-line options, keyed by the first version that
# accepts them. A released version only receives the options whose key is
# <= that version, while the current source build receives all of them.
# Currently empty.
mz_options: dict[MzVersion, str] = {}

# Services used by the upgrade tests.
SERVICES = [
    TestCerts(),
    Zookeeper(),
    Kafka(),
    SchemaRegistry(),
    Postgres(),
    MySql(),
    Cockroach(setup_materialize=True),
    # Placeholders: test_upgrade_from_version overrides both of these with
    # version-specific definitions (the second one is used for 0dt upgrades).
    Materialized(),
    Materialized(name="materialized2"),
    # N.B.: we need to use `validate_catalog_store=False` because testdrive uses
    # HEAD to load the catalog from disk but does *not* run migrations. There
    # is no guarantee that HEAD can load an old catalog without running
    # migrations.
    #
    # When testdrive is targeting a HEAD materialized, we re-enable catalog
    # validation.
    #
    # Disabling catalog validation is preferable to using a versioned testdrive
    # because that would involve maintaining backwards compatibility for all
    # testdrive commands.
    Testdrive(
        external_metadata_store=True,
        validate_catalog_store=False,
        volumes_extra=["secrets:/share/secrets", "mzdata:/mzdata"],
        metadata_store="cockroach",
    ),
]
def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Test upgrades from previous versions.

    Three kinds of upgrade scenario are run:
    - one from every version returned by
      get_all_and_latest_two_minor_mz_versions;
    - one from the current source build itself;
    - with `--self-managed-upgrade`, a direct upgrade from the latest
      Self-Managed release.

    The scenarios are split across Buildkite parallel jobs:
    - job 0: zero-downtime upgrades, plus the Self-Managed upgrade with forced
      source-table syntax;
    - job 1: regular upgrades with and without forced source-table syntax,
      plus the Self-Managed upgrade without it.
    With a parallelism of 1, a single job runs all of them.
    """
    parser.add_argument(
        "filter", nargs="?", default="*", help="limit to only the files matching filter"
    )
    parser.add_argument(
        "--versions-source",
        default="docs",
        choices=["docs", "git"],
        help="from what source to fetch the versions",
    )
    parser.add_argument("--ignore-missing-version", action="store_true")
    parser.add_argument("--self-managed-upgrade", action="store_true")
    args = parser.parse_args()

    parallelism_index = buildkite.get_parallelism_index()
    parallelism_count = buildkite.get_parallelism_count()
    assert parallelism_count in [
        1,
        2,
    ], "Special cased parallelism, only allows values 1 or 2"
    # With a single job everything runs; with two jobs the work is split.
    run_first_shard = parallelism_count == 1 or parallelism_index == 0
    run_second_shard = parallelism_count == 1 or parallelism_index == 1

    all_versions, tested_versions = get_all_and_latest_two_minor_mz_versions(
        use_versions_from_docs=args.versions_source == "docs"
    )

    # Prior versions older than the previous minor release (vX.(Y-1).0) are
    # excluded from the per-version scenarios.
    current_version = MzVersion.parse_cargo()
    min_upgradable_version = MzVersion.create(
        current_version.major, current_version.minor - 1, 0
    )

    for version in tested_versions:
        # Building the latest release might have failed, don't block PRs on
        # test pipeline for this.
        if args.ignore_missing_version and not image_of_release_version_exists(version):
            print(f"Unknown version {version}, skipping")
            continue

        priors = [v for v in all_versions if min_upgradable_version <= v <= version]
        if not priors:
            # this may happen when versions are marked as invalid
            print("No versions to test!")
            continue

        if run_first_shard:
            test_upgrade_from_version(
                c,
                f"{version}",
                priors,
                filter=args.filter,
                zero_downtime=True,
                force_source_table_syntax=False,
            )
        if run_second_shard:
            for force_source_table_syntax in (False, True):
                test_upgrade_from_version(
                    c,
                    f"{version}",
                    priors,
                    filter=args.filter,
                    zero_downtime=False,
                    force_source_table_syntax=force_source_table_syntax,
                )

    if run_first_shard:
        test_upgrade_from_version(
            c,
            "current_source",
            priors=[],
            filter=args.filter,
            zero_downtime=True,
            force_source_table_syntax=False,
        )
        if args.self_managed_upgrade:
            _test_self_managed_upgrade(
                c, all_versions, args.filter, force_source_table_syntax=True
            )

    if run_second_shard:
        for force_source_table_syntax in (False, True):
            test_upgrade_from_version(
                c,
                "current_source",
                priors=[],
                filter=args.filter,
                zero_downtime=False,
                force_source_table_syntax=force_source_table_syntax,
            )
        if args.self_managed_upgrade:
            _test_self_managed_upgrade(
                c, all_versions, args.filter, force_source_table_syntax=False
            )


def _test_self_managed_upgrade(
    c: Composition,
    all_versions: list[MzVersion],
    file_filter: str,
    force_source_table_syntax: bool,
) -> None:
    """Upgrade directly from the latest Self-Managed version to the current source.

    No in-between versions are visited. The testdrive glob covers every version
    in `all_versions` up to and including that Self-Managed version.
    """
    version = get_self_managed_versions()[-1]
    priors = [v for v in all_versions if v <= version]
    test_upgrade_from_version(
        c,
        f"{version}",
        priors,
        filter=file_filter,
        zero_downtime=False,
        force_source_table_syntax=force_source_table_syntax,
        self_managed_upgrade=True,
    )
def get_all_and_latest_two_minor_mz_versions(
    use_versions_from_docs: bool,
) -> tuple[list[MzVersion], list[MzVersion]]:
    """Return the released versions to upgrade from.

    Args:
        use_versions_from_docs: Read versions from the docs' version list
            (built with `respect_released_tag=False`) instead of from the
            published-version helpers.

    Returns:
        A tuple `(all_versions, tested_versions)`:
        - `all_versions` holds every listed version older than the current
          Cargo version.
        - `tested_versions` holds the (at most) two newest minor versions
          older than the current Cargo version, in ascending order.
    """
    current_version = MzVersion.parse_cargo()
    if use_versions_from_docs:
        version_list = VersionsFromDocs(respect_released_tag=False)
        all_versions = [v for v in version_list.all_versions() if v < current_version]
        minor_versions = version_list.minor_versions()
    else:
        minor_versions = get_published_minor_mz_versions()
        all_versions = [
            v
            for v in get_all_published_mz_versions(newest_first=False)
            if v < current_version
        ]

    # Apply the same "older than the version being built" cut-off as for
    # `all_versions` before picking the two newest minors. Previously the docs
    # branch skipped this filter and the git branch skipped the limit.
    older_minor_versions = sorted(v for v in minor_versions if v < current_version)
    tested_versions = older_minor_versions[-2:]
    return all_versions, tested_versions
def _build_version_glob(from_version: str, priors: list[MzVersion]) -> str:
    """Return the brace pattern that selects testdrive files for an upgrade run.

    Each version vX.Y.Z in `priors` contributes itself plus every earlier patch
    release vX.Y.0 .. vX.Y.(Z-1), so files written for those patch releases are
    selected too. An empty `priors` selects every version ("*"). The pattern
    always starts with "any_version" and always contains `from_version`.
    """
    if priors:
        alternatives = {str(p) for p in priors}
        alternatives.update(
            str(MzVersion(major=p.major, minor=p.minor, patch=patch))
            for p in priors
            for patch in range(p.patch)
        )
        # Sorting the set keeps the previous (lexicographic) order while
        # dropping duplicates produced by overlapping patch ranges.
        prior_strings = sorted(alternatives)
    else:
        prior_strings = ["*"]

    parts = ["any_version", *prior_strings]
    # `from_version` is usually already among the priors; only append it when
    # it is missing so the pattern does not repeat an alternative.
    if from_version not in parts:
        parts.append(from_version)
    return "{" + ",".join(parts) + "}"


def test_upgrade_from_version(
    c: Composition,
    from_version: str,
    priors: list[MzVersion],
    filter: str,
    zero_downtime: bool,
    force_source_table_syntax: bool,
    self_managed_upgrade: bool = False,
) -> None:
    """Create objects on `from_version`, upgrade to current_source and verify them.

    Steps:
    1. Start `from_version` (a release such as "v0.130.0", or the literal
       "current_source") and run the matching `create-in-*.td` files.
    2. Unless starting from "current_source" or `self_managed_upgrade` is set,
       step through every published minor version strictly between
       `from_version` and the current version.
    3. Upgrade to the current source build and run the matching
       `check-from-*.td` files with catalog validation re-enabled.

    Args:
        c: The composition to drive.
        from_version: Version to start from, or "current_source".
        priors: Released versions whose testdrive files are also selected; an
            empty list selects every file ("*").
        filter: Suffix pattern restricting which testdrive files run.
        zero_downtime: Start each next version on the other Materialized
            service, wait for READY_TO_PROMOTE and promote it, instead of
            stopping the old one first.
        force_source_table_syntax: Force the new source/table syntax, applied
            to the final version only.
        self_managed_upgrade: Upgrade directly from `from_version` to the
            current source without in-between versions.
    """
    print(
        f"+++ Testing {'0dt upgrade' if zero_downtime else 'regular upgrade'} from Materialize {from_version} to current_source."
    )

    deploy_generation = 0
    version_glob = _build_version_glob(from_version, priors)
    print(">>> Version glob pattern: " + version_glob)

    # None when starting from the current source build rather than a release.
    from_mz_version = (
        None if from_version == "current_source" else MzVersion.parse_mz(from_version)
    )

    c.down(destroy_volumes=True)
    c.up("zookeeper", "kafka", "schema-registry", "postgres", "mysql")

    mz_service = "materialized"
    if from_mz_version is not None:
        mz_from = Materialized(
            name=mz_service,
            image=f"materialize/materialized:{from_version}",
            options=[
                opt
                for start_version, opt in mz_options.items()
                if from_mz_version >= start_version
            ],
            volumes_extra=["secrets:/share/secrets"],
            external_metadata_store=True,
            system_parameter_defaults=get_default_system_parameters(
                version=from_mz_version,
                zero_downtime=zero_downtime,
            ),
            deploy_generation=deploy_generation,
            restart="on-failure",
            sanity_restart=False,
            metadata_store="cockroach",
        )
    else:
        mz_from = Materialized(
            name=mz_service,
            options=list(mz_options.values()),
            volumes_extra=["secrets:/share/secrets"],
            external_metadata_store=True,
            system_parameter_defaults=get_default_system_parameters(
                version=MzVersion.parse_cargo(),
                zero_downtime=zero_downtime,
            ),
            restart="on-failure",
            sanity_restart=False,
            metadata_store="cockroach",
        )
    with c.override(mz_from):
        c.up(mz_service)

    temp_dir = f"--temp-dir=/share/tmp/upgrade-from-{from_version}"
    seed = f"--seed={random.getrandbits(32)}"
    c.run_testdrive_files(
        "--no-reset",
        f"--var=upgrade-from-version={from_version}",
        "--var=created-cluster=quickstart",
        f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
        "--var=mysql-user-password=us3rp4ssw0rd",
        temp_dir,
        seed,
        f"create-in-{version_glob}-{filter}.td",
        mz_service=mz_service,
    )

    if zero_downtime:
        # The next version starts on the other service while the current one
        # keeps running until it is superseded.
        mz_service = "materialized2"
        deploy_generation += 1
        c.rm("testdrive")
    else:
        c.kill(mz_service)
        c.rm(mz_service, "testdrive")

    if from_mz_version is not None and not self_managed_upgrade:
        current_version = MzVersion.parse_cargo()
        # We can't skip in-between minor versions anymore, so go through all
        # published minor versions strictly between the start version and the
        # current version.
        intermediate_versions = [
            v
            for v in get_published_minor_mz_versions(newest_first=False)
            if from_mz_version < v < current_version
        ]
        for version in intermediate_versions:
            system_parameter_defaults = get_default_system_parameters(
                version=version,
                zero_downtime=zero_downtime,
            )
            print(
                f"{'0dt-' if zero_downtime else ''}Upgrading to in-between version {version}"
            )
            with c.override(
                Materialized(
                    name=mz_service,
                    image=f"materialize/materialized:{version}",
                    options=[
                        opt
                        for start_version, opt in mz_options.items()
                        if version >= start_version
                    ],
                    volumes_extra=["secrets:/share/secrets"],
                    external_metadata_store=True,
                    system_parameter_defaults=system_parameter_defaults,
                    deploy_generation=deploy_generation,
                    restart="on-failure",
                    sanity_restart=False,
                    metadata_store="cockroach",
                )
            ):
                c.up(mz_service)
                if zero_downtime:
                    c.await_mz_deployment_status(
                        DeploymentStatus.READY_TO_PROMOTE, mz_service
                    )
                    c.promote_mz(mz_service)
                    c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, mz_service)
                    # Alternate between the two services for the next step.
                    mz_service = (
                        "materialized2"
                        if mz_service == "materialized"
                        else "materialized"
                    )
                    deploy_generation += 1
                else:
                    c.kill(mz_service)
                    c.rm(mz_service)

    print(f"{'0dt-' if zero_downtime else ''}Upgrading to final version")
    system_parameter_defaults = get_default_system_parameters(
        zero_downtime=zero_downtime,
        # We can only force the syntax on the final version so that the migration to convert
        # sources to the new model can be applied without preventing sources from being
        # created in the old syntax on the older version.
        force_source_table_syntax=force_source_table_syntax,
    )
    mz_to = Materialized(
        name=mz_service,
        options=list(mz_options.values()),
        volumes_extra=["secrets:/share/secrets"],
        external_metadata_store=True,
        system_parameter_defaults=system_parameter_defaults,
        deploy_generation=deploy_generation,
        restart="on-failure",
        sanity_restart=False,
        metadata_store="cockroach",
    )
    with c.override(mz_to):
        c.up(mz_service)
        if zero_downtime:
            c.await_mz_deployment_status(DeploymentStatus.READY_TO_PROMOTE, mz_service)
            c.promote_mz(mz_service)
            c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, mz_service)
        else:
            # Restart once more, just in case
            c.kill(mz_service)
            c.rm(mz_service)
            c.up(mz_service)

        # Testdrive now targets the current source build, so catalog
        # validation can be re-enabled.
        with c.override(
            Testdrive(
                external_metadata_store=True,
                validate_catalog_store=True,
                volumes_extra=["secrets:/share/secrets", "mzdata:/mzdata"],
                metadata_store="cockroach",
            )
        ):
            c.run_testdrive_files(
                "--no-reset",
                f"--var=upgrade-from-version={from_version}",
                f"--var=default-storage-size={Materialized.Size.DEFAULT_SIZE}-1",
                "--var=created-cluster=quickstart",
                temp_dir,
                seed,
                f"check-from-{version_glob}-{filter}.td",
                mz_service=mz_service,
            )
|