123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- import random
- import threading
- from inspect import Traceback
- from typing import Any
- from materialize.cloudtest.app.materialize_application import MaterializeApplication
- from materialize.mz_version import MzVersion
- from materialize.mzcompose.composition import Composition
- class Executor:
- # Store the current Materialize version and keep it up-to-date during
- # upgrades so that actions can depend not just on the base version, but
- # also on the current version. This enables testing more interesting
- # scenarios which are still in development and not available a few versions
- # back already.
- current_mz_version: MzVersion
- # All the system settings we have already set in previous Mz versions. No
- # need to set them again in a future version since they should be
- # persisted.
- system_settings: set[str] = set()
- def testdrive(
- self, input: str, caller: Traceback | None = None, mz_service: str | None = None
- ) -> None:
- raise NotImplementedError
- def mzcompose_composition(self) -> Composition:
- raise NotImplementedError
- def cloudtest_application(self) -> MaterializeApplication:
- raise NotImplementedError
- def join(self, handle: Any) -> None:
- pass
- class MzcomposeExecutor(Executor):
- def __init__(self, composition: Composition) -> None:
- self.composition = composition
- def mzcompose_composition(self) -> Composition:
- return self.composition
- def testdrive(
- self, input: str, caller: Traceback | None = None, mz_service: str | None = None
- ) -> None:
- self.composition.testdrive(input, caller=caller, mz_service=mz_service)
- class MzcomposeExecutorParallel(MzcomposeExecutor):
- def __init__(self, composition: Composition) -> None:
- self.composition = composition
- self.exception: BaseException | None = None
- def testdrive(
- self, input: str, caller: Traceback | None = None, mz_service: str | None = None
- ) -> Any:
- thread = threading.Thread(target=self._testdrive, args=[input, caller])
- thread.start()
- return thread
- def _testdrive(
- self, input: str, caller: Traceback | None = None, mz_service: str | None = None
- ) -> None:
- try:
- self.composition.testdrive(input, caller=caller, mz_service=mz_service)
- except BaseException as e:
- self.exception = e
- def join(self, handle: Any) -> None:
- assert type(handle) is threading.Thread
- handle.join()
- if self.exception:
- raise self.exception
- class CloudtestExecutor(Executor):
- def __init__(self, application: MaterializeApplication, version: MzVersion) -> None:
- self.application = application
- self.seed = random.getrandbits(32)
- self.current_mz_version = version
- def cloudtest_application(self) -> MaterializeApplication:
- return self.application
- def testdrive(
- self, input: str, caller: Traceback | None = None, mz_service: str | None = None
- ) -> None:
- assert (
- mz_service is None
- ), "CloudtestExecutor not yet compatible with custom mz_service names"
- self.application.testdrive.run(
- input=input, no_reset=True, seed=self.seed, caller=caller
- )
|