testdrive.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import json
  10. import random
  11. from typing import Any
  12. from materialize import buildkite
  13. from materialize.mzcompose import DEFAULT_MZ_VOLUMES, cluster_replica_size_map
  14. from materialize.mzcompose.service import (
  15. Service,
  16. ServiceConfig,
  17. )
  18. from materialize.mzcompose.services.azurite import azure_blob_uri
  19. from materialize.mzcompose.services.minio import minio_blob_uri
  20. from materialize.mzcompose.services.postgres import METADATA_STORE
  21. class Testdrive(Service):
  22. def __init__(
  23. self,
  24. name: str = "testdrive",
  25. mzbuild: str = "testdrive",
  26. materialize_url: str = "postgres://materialize@materialized:6875",
  27. materialize_url_internal: str = "postgres://materialize@materialized:6877",
  28. materialize_use_https: bool = False,
  29. materialize_params: dict[str, str] = {},
  30. kafka_url: str = "kafka:9092",
  31. kafka_default_partitions: int | None = None,
  32. kafka_args: str | None = None,
  33. schema_registry_url: str = "http://schema-registry:8081",
  34. no_reset: bool = False,
  35. default_timeout: str | None = None,
  36. seed: int | None = None,
  37. consistent_seed: bool = False,
  38. validate_catalog_store: bool = False,
  39. entrypoint: list[str] | None = None,
  40. entrypoint_extra: list[str] = [],
  41. environment: list[str] | None = None,
  42. volumes_extra: list[str] = [],
  43. volume_workdir: str = ".:/workdir",
  44. propagate_uid_gid: bool = True,
  45. forward_buildkite_shard: bool = False,
  46. aws_region: str | None = None,
  47. aws_endpoint: str | None = "http://minio:9000",
  48. aws_access_key_id: str | None = "minioadmin",
  49. aws_secret_access_key: str | None = "minioadmin",
  50. no_consistency_checks: bool = False,
  51. external_metadata_store: bool = False,
  52. external_blob_store: bool = False,
  53. blob_store_is_azure: bool = False,
  54. fivetran_destination: bool = False,
  55. fivetran_destination_url: str = "http://fivetran-destination:6874",
  56. fivetran_destination_files_path: str = "/share/tmp",
  57. mz_service: str = "materialized",
  58. metadata_store: str = METADATA_STORE,
  59. stop_grace_period: str = "120s",
  60. cluster_replica_size: dict[str, dict[str, Any]] | None = None,
  61. network_mode: str | None = None,
  62. set_persist_urls: bool = True,
  63. backoff_factor: float = 1.0,
  64. ) -> None:
  65. if cluster_replica_size is None:
  66. cluster_replica_size = cluster_replica_size_map()
  67. if environment is None:
  68. environment = [
  69. "TMPDIR=/share/tmp",
  70. "MZ_SOFT_ASSERTIONS=1",
  71. # Please think twice before forwarding additional environment
  72. # variables from the host, as it's easy to write tests that are
  73. # then accidentally dependent on the state of the host machine.
  74. #
  75. # To pass arguments to a testdrive script, use the `--var` CLI
  76. # option rather than environment variables.
  77. "MZ_LOG_FILTER",
  78. "AWS_ACCESS_KEY_ID",
  79. "AWS_SECRET_ACCESS_KEY",
  80. "AWS_SESSION_TOKEN",
  81. ]
  82. environment += [
  83. f"CLUSTER_REPLICA_SIZES={json.dumps(cluster_replica_size)}",
  84. "MZ_CI_LICENSE_KEY",
  85. "LD_PRELOAD=libeatmydata.so",
  86. ]
  87. volumes = [
  88. volume_workdir,
  89. *(v for v in DEFAULT_MZ_VOLUMES if v.startswith("tmp:")),
  90. ]
  91. if volumes_extra:
  92. volumes.extend(volumes_extra)
  93. if entrypoint is None:
  94. entrypoint = [
  95. "testdrive",
  96. f"--kafka-addr={kafka_url}",
  97. f"--schema-registry-url={schema_registry_url}",
  98. f"--materialize-url={materialize_url}",
  99. f"--materialize-internal-url={materialize_url_internal}",
  100. *(["--materialize-use-https"] if materialize_use_https else []),
  101. # Faster retries
  102. ]
  103. entrypoint.append(f"--backoff-factor={backoff_factor}")
  104. if aws_region:
  105. entrypoint.append(f"--aws-region={aws_region}")
  106. if aws_endpoint and not aws_region:
  107. entrypoint.append(f"--aws-endpoint={aws_endpoint}")
  108. entrypoint.append(f"--var=aws-endpoint={aws_endpoint}")
  109. if aws_access_key_id:
  110. entrypoint.append(f"--aws-access-key-id={aws_access_key_id}")
  111. entrypoint.append(f"--var=aws-access-key-id={aws_access_key_id}")
  112. if aws_secret_access_key:
  113. entrypoint.append(f"--aws-secret-access-key={aws_secret_access_key}")
  114. entrypoint.append(f"--var=aws-secret-access-key={aws_secret_access_key}")
  115. if validate_catalog_store:
  116. entrypoint.append("--validate-catalog-store")
  117. if no_reset:
  118. entrypoint.append("--no-reset")
  119. for k, v in materialize_params.items():
  120. entrypoint.append(f"--materialize-param={k}={v}")
  121. if default_timeout is None:
  122. default_timeout = "20s"
  123. entrypoint.append(f"--default-timeout={default_timeout}")
  124. if kafka_default_partitions:
  125. entrypoint.append(f"--kafka-default-partitions={kafka_default_partitions}")
  126. if forward_buildkite_shard:
  127. shard = buildkite.get_parallelism_index()
  128. shard_count = buildkite.get_parallelism_count()
  129. entrypoint += [f"--shard={shard}", f"--shard-count={shard_count}"]
  130. if seed is not None and consistent_seed:
  131. raise RuntimeError("Can't pass `seed` and `consistent_seed` at same time")
  132. elif consistent_seed:
  133. entrypoint.append(f"--seed={random.getrandbits(32)}")
  134. elif seed is not None:
  135. entrypoint.append(f"--seed={seed}")
  136. if no_consistency_checks:
  137. entrypoint.append("--consistency-checks=disable")
  138. if fivetran_destination:
  139. entrypoint.append(f"--fivetran-destination-url={fivetran_destination_url}")
  140. entrypoint.append(
  141. f"--fivetran-destination-files-path={fivetran_destination_files_path}"
  142. )
  143. if set_persist_urls:
  144. if external_blob_store:
  145. blob_store = "azurite" if blob_store_is_azure else "minio"
  146. address = (
  147. blob_store if external_blob_store == True else external_blob_store
  148. )
  149. persist_blob_url = (
  150. azure_blob_uri(address)
  151. if blob_store_is_azure
  152. else minio_blob_uri(address)
  153. )
  154. entrypoint.append(f"--persist-blob-url={persist_blob_url}")
  155. else:
  156. entrypoint.append("--persist-blob-url=file:///mzdata/persist/blob")
  157. if external_metadata_store:
  158. entrypoint.append(
  159. "--persist-consensus-url=postgres://root@cockroach:26257?options=--search_path=consensus"
  160. )
  161. else:
  162. entrypoint.append(
  163. f"--persist-consensus-url=postgres://root@{mz_service}:26257?options=--search_path=consensus"
  164. )
  165. entrypoint.extend(entrypoint_extra)
  166. config: ServiceConfig = {
  167. "mzbuild": mzbuild,
  168. "entrypoint": entrypoint,
  169. "environment": environment,
  170. "volumes": volumes,
  171. "propagate_uid_gid": propagate_uid_gid,
  172. "init": True,
  173. "stop_grace_period": stop_grace_period,
  174. }
  175. if network_mode:
  176. config["network_mode"] = network_mode
  177. super().__init__(
  178. name=name,
  179. config=config,
  180. )