executors.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import random
  10. import threading
  11. from inspect import Traceback
  12. from typing import Any
  13. from materialize.cloudtest.app.materialize_application import MaterializeApplication
  14. from materialize.mz_version import MzVersion
  15. from materialize.mzcompose.composition import Composition
  16. class Executor:
  17. # Store the current Materialize version and keep it up-to-date during
  18. # upgrades so that actions can depend not just on the base version, but
  19. # also on the current version. This enables testing more interesting
  20. # scenarios which are still in development and not available a few versions
  21. # back already.
  22. current_mz_version: MzVersion
  23. # All the system settings we have already set in previous Mz versions. No
  24. # need to set them again in a future version since they should be
  25. # persisted.
  26. system_settings: set[str] = set()
  27. def testdrive(
  28. self, input: str, caller: Traceback | None = None, mz_service: str | None = None
  29. ) -> None:
  30. raise NotImplementedError
  31. def mzcompose_composition(self) -> Composition:
  32. raise NotImplementedError
  33. def cloudtest_application(self) -> MaterializeApplication:
  34. raise NotImplementedError
  35. def join(self, handle: Any) -> None:
  36. pass
  37. class MzcomposeExecutor(Executor):
  38. def __init__(self, composition: Composition) -> None:
  39. self.composition = composition
  40. def mzcompose_composition(self) -> Composition:
  41. return self.composition
  42. def testdrive(
  43. self, input: str, caller: Traceback | None = None, mz_service: str | None = None
  44. ) -> None:
  45. self.composition.testdrive(input, caller=caller, mz_service=mz_service)
  46. class MzcomposeExecutorParallel(MzcomposeExecutor):
  47. def __init__(self, composition: Composition) -> None:
  48. self.composition = composition
  49. self.exception: BaseException | None = None
  50. def testdrive(
  51. self, input: str, caller: Traceback | None = None, mz_service: str | None = None
  52. ) -> Any:
  53. thread = threading.Thread(target=self._testdrive, args=[input, caller])
  54. thread.start()
  55. return thread
  56. def _testdrive(
  57. self, input: str, caller: Traceback | None = None, mz_service: str | None = None
  58. ) -> None:
  59. try:
  60. self.composition.testdrive(input, caller=caller, mz_service=mz_service)
  61. except BaseException as e:
  62. self.exception = e
  63. def join(self, handle: Any) -> None:
  64. assert type(handle) is threading.Thread
  65. handle.join()
  66. if self.exception:
  67. raise self.exception
  68. class CloudtestExecutor(Executor):
  69. def __init__(self, application: MaterializeApplication, version: MzVersion) -> None:
  70. self.application = application
  71. self.seed = random.getrandbits(32)
  72. self.current_mz_version = version
  73. def cloudtest_application(self) -> MaterializeApplication:
  74. return self.application
  75. def testdrive(
  76. self, input: str, caller: Traceback | None = None, mz_service: str | None = None
  77. ) -> None:
  78. assert (
  79. mz_service is None
  80. ), "CloudtestExecutor not yet compatible with custom mz_service names"
  81. self.application.testdrive.run(
  82. input=input, no_reset=True, seed=self.seed, caller=caller
  83. )