testdrive.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import json
  10. import os
  11. import subprocess
  12. import sys
  13. from inspect import Traceback
  14. from kubernetes.client import V1Container, V1EnvVar, V1ObjectMeta, V1Pod, V1PodSpec
  15. from materialize.cloudtest import DEFAULT_K8S_NAMESPACE
  16. from materialize.cloudtest.k8s.api.k8s_pod import K8sPod
  17. from materialize.mzcompose import (
  18. cluster_replica_size_map,
  19. )
  20. from materialize.mzcompose.test_result import (
  21. extract_error_chunks_from_output,
  22. )
  23. from materialize.ui import CommandFailureCausedUIError
  24. class TestdriveBase:
  25. def __init__(
  26. self,
  27. aws_region: str | None = None,
  28. materialize_url: str | None = None,
  29. materialize_internal_url: str | None = None,
  30. kafka_addr: str | None = None,
  31. schema_registry_url: str | None = None,
  32. ) -> None:
  33. self.aws_region = aws_region
  34. self.materialize_url = (
  35. materialize_url
  36. or "postgres://materialize:materialize@environmentd:6875/materialize"
  37. )
  38. self.materialize_internal_url = (
  39. materialize_internal_url
  40. or "postgres://mz_system@environmentd:6877/materialize"
  41. )
  42. self.kafka_addr = kafka_addr or "redpanda:9092"
  43. self.schema_registry_url = schema_registry_url or "http://redpanda:8081"
  44. self.aws_endpoint = "http://minio-service.default:9000"
  45. def run(
  46. self,
  47. *args: str,
  48. input: str | None = None,
  49. no_reset: bool = False,
  50. seed: int | None = None,
  51. caller: Traceback | None = None,
  52. default_timeout: str = "300s",
  53. kafka_options: str | None = None,
  54. log_filter: str = "off",
  55. suppress_command_error_output: bool = False,
  56. ) -> None:
  57. command: list[str] = [
  58. "testdrive",
  59. f"--materialize-url={self.materialize_url}",
  60. f"--materialize-internal-url={self.materialize_internal_url}",
  61. f"--kafka-addr={self.kafka_addr}",
  62. f"--schema-registry-url={self.schema_registry_url}",
  63. f"--default-timeout={default_timeout}",
  64. f"--log-filter={log_filter}",
  65. "--var=replicas=1",
  66. "--var=single-replica-cluster=quickstart",
  67. "--var=default-storage-size=1",
  68. "--var=default-replica-size=1",
  69. f"--cluster-replica-sizes={json.dumps(cluster_replica_size_map())}",
  70. *([f"--aws-region={self.aws_region}"] if self.aws_region else []),
  71. *(
  72. [
  73. f"--aws-endpoint={self.aws_endpoint}",
  74. f"--var=aws-endpoint={self.aws_endpoint}",
  75. "--aws-access-key-id=minio",
  76. "--var=aws-access-key-id=minio",
  77. "--aws-secret-access-key=minio123",
  78. "--var=aws-secret-access-key=minio123",
  79. ]
  80. if not self.aws_region
  81. else []
  82. ),
  83. *(["--no-reset"] if no_reset else []),
  84. *([f"--seed={seed}"] if seed else []),
  85. *([f"--source={caller.filename}:{caller.lineno}"] if caller else []),
  86. *([f"--kafka-option={kafka_options}"] if kafka_options else []),
  87. *args,
  88. ]
  89. self._run_internal(
  90. command,
  91. input,
  92. suppress_command_error_output,
  93. )
  94. def _run_internal(
  95. self,
  96. command: list[str],
  97. input: str | None = None,
  98. suppress_command_error_output: bool = False,
  99. ) -> None:
  100. raise NotImplementedError
  101. class TestdrivePod(K8sPod, TestdriveBase):
  102. def __init__(
  103. self,
  104. release_mode: bool,
  105. aws_region: str | None = None,
  106. namespace: str = DEFAULT_K8S_NAMESPACE,
  107. materialize_url: str | None = None,
  108. materialize_internal_url: str | None = None,
  109. kafka_addr: str | None = None,
  110. schema_registry_url: str | None = None,
  111. apply_node_selectors: bool = False,
  112. ) -> None:
  113. K8sPod.__init__(self, namespace)
  114. TestdriveBase.__init__(
  115. self,
  116. aws_region=aws_region,
  117. materialize_url=materialize_url,
  118. materialize_internal_url=materialize_internal_url,
  119. kafka_addr=kafka_addr,
  120. schema_registry_url=schema_registry_url,
  121. )
  122. metadata = V1ObjectMeta(name="testdrive", namespace=namespace)
  123. # Pass through AWS credentials from the host
  124. env = [
  125. V1EnvVar(name=var, value=os.environ.get(var))
  126. for var in [
  127. "AWS_ACCESS_KEY_ID",
  128. "AWS_SECRET_ACCESS_KEY",
  129. "AWS_SESSION_TOKEN",
  130. ]
  131. ]
  132. container = V1Container(
  133. name="testdrive",
  134. image=self.image("testdrive", release_mode=release_mode),
  135. command=["sleep", "infinity"],
  136. env=env,
  137. )
  138. node_selector = None
  139. if apply_node_selectors:
  140. node_selector = {"supporting-services": "true"}
  141. pod_spec = V1PodSpec(containers=[container], node_selector=node_selector)
  142. self.pod = V1Pod(metadata=metadata, spec=pod_spec)
  143. def _run_internal(
  144. self,
  145. command: list[str],
  146. input: str | None = None,
  147. suppress_command_error_output: bool = False,
  148. ) -> None:
  149. self.wait(condition="condition=Ready", resource="pod/testdrive")
  150. try:
  151. self.kubectl(
  152. "exec",
  153. "-it",
  154. "testdrive",
  155. "--",
  156. *command,
  157. input=input,
  158. # needed to extract errors
  159. capture_output=True,
  160. suppress_command_error_output=suppress_command_error_output,
  161. )
  162. except subprocess.CalledProcessError as e:
  163. if e.stdout is not None:
  164. print(e.stdout, end="")
  165. if e.stderr is not None:
  166. print(e.stderr, file=sys.stderr, end="")
  167. output = e.stderr or e.stdout
  168. if output is None and suppress_command_error_output:
  169. output = "(not captured)"
  170. assert (
  171. output is not None
  172. ), f"Missing stdout and stderr when running '{e.cmd}' without success"
  173. error_chunks = extract_error_chunks_from_output(output)
  174. error_text = "\n".join(error_chunks)
  175. raise CommandFailureCausedUIError(
  176. f"Running {' '.join(command)} in testdrive failed with:\n{error_text}",
  177. e.cmd,
  178. e.stdout,
  179. e.stderr,
  180. )