executor.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from collections.abc import Callable
  10. from typing import Any
  11. from materialize.mzcompose.composition import Composition
  12. from materialize.mzcompose.services.clusterd import Clusterd
  13. from materialize.mzcompose.services.materialized import Materialized
  14. from materialize.mzcompose.services.mysql import MySql
  15. class Executor:
  16. _known_fragments: set[str] = set()
  17. def Lambda(self, _lambda: Callable[["Executor"], float]) -> float:
  18. return _lambda(self)
  19. def Td(self, input: str) -> Any:
  20. raise NotImplementedError
  21. def Kgen(self, topic: str, args: list[str]) -> Any:
  22. raise NotImplementedError
  23. def add_known_fragment(self, fragment: str) -> bool:
  24. """
  25. Record whether a TD fragment has been printed already. Returns true
  26. if it wasn't added before.
  27. """
  28. result = fragment not in self._known_fragments
  29. self._known_fragments.add(fragment)
  30. return result
  31. def DockerMemMz(self) -> int:
  32. raise NotImplementedError
  33. def DockerMemClusterd(self) -> int:
  34. raise NotImplementedError
  35. class Docker(Executor):
  36. def __init__(
  37. self,
  38. composition: Composition,
  39. seed: int,
  40. materialized: Materialized,
  41. clusterd: Clusterd,
  42. ) -> None:
  43. self._composition = composition
  44. self._seed = seed
  45. self._materialized = materialized
  46. self._clusterd = clusterd
  47. def RestartMzClusterd(self) -> None:
  48. self._composition.kill("materialized")
  49. self._composition.kill("clusterd")
  50. # Make sure we are restarting Materialized() with the
  51. # same parameters (docker tag, SIZE) it was initially started with
  52. with self._composition.override(self._materialized, self._clusterd):
  53. self._composition.up("materialized")
  54. self._composition.up("clusterd")
  55. return None
  56. def Td(self, input: str) -> Any:
  57. return self._composition.exec(
  58. "testdrive",
  59. "--no-reset",
  60. f"--seed={self._seed}",
  61. "--initial-backoff=10ms", # Retry every 10ms until success
  62. "--backoff-factor=0",
  63. "--consistency-checks=disable",
  64. f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
  65. stdin=input,
  66. capture=True,
  67. ).stdout
  68. def Kgen(self, topic: str, args: list[str]) -> Any:
  69. return self._composition.run(
  70. "kgen", f"--topic=testdrive-{topic}-{self._seed}", *args
  71. )
  72. def DockerMemMz(self) -> int:
  73. return self._composition.mem("materialized")
  74. def DockerMemClusterd(self) -> int:
  75. return self._composition.mem("clusterd")
  76. class MzCloud(Executor):
  77. def __init__(
  78. self,
  79. composition: Composition,
  80. seed: int,
  81. mzcloud_url: str,
  82. external_addr: str,
  83. ) -> None:
  84. self._composition = composition
  85. self._seed = seed
  86. self._mzcloud_url = mzcloud_url
  87. self._external_addr = external_addr
  88. self._testdrive_args = [
  89. f"--materialize-url={self._mzcloud_url}",
  90. f"--kafka-addr={self._external_addr}:9092",
  91. f"--schema-registry-url=http://{self._external_addr}:8081",
  92. f"--seed={self._seed}",
  93. f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
  94. ]
  95. def RestartMzClusterd(self) -> None:
  96. assert False, "We can't restart the cloud"
  97. def Reset(self) -> None:
  98. print("resetting")
  99. self._composition.exec(
  100. "testdrive",
  101. *self._testdrive_args,
  102. # Use a lower timeout so we complain if the mzcloud_url was wrong or inaccessible.
  103. "--default-timeout=10s",
  104. stdin="",
  105. )
  106. print("reset done")
  107. def Td(self, input: str) -> Any:
  108. return self._composition.exec(
  109. "testdrive",
  110. "--no-reset",
  111. *self._testdrive_args,
  112. "--initial-backoff=10ms",
  113. "--backoff-factor=0",
  114. "--consistency-checks=disable",
  115. stdin=input,
  116. capture=True,
  117. ).stdout
  118. def Kgen(self, topic: str, args: list[str]) -> Any:
  119. # TODO: Implement
  120. raise NotImplementedError