# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""
Deploy the current version on a real Staging Cloud, and run some basic
verifications, like ingesting data from Kafka and Redpanda Cloud using AWS
PrivateLink. Runs only on main and release branches.
"""

import argparse
import glob
import itertools
import os
import secrets
import string
import time
import urllib.parse

import psycopg

from materialize import MZ_ROOT
from materialize.mz_version import MzVersion
from materialize.mzcompose import (
    _wait_for_pg,
)
from materialize.mzcompose.composition import (
    Composition,
    WorkflowArgumentParser,
)
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.postgres import CockroachOrPostgresMetadata
from materialize.mzcompose.services.testdrive import Testdrive
from materialize.redpanda_cloud import RedpandaCloud
from materialize.ui import UIError

REDPANDA_RESOURCE_GROUP = "ci-resource-group"
REDPANDA_NETWORK = "ci-network"
REDPANDA_CLUSTER = "ci-cluster"
REDPANDA_USER = "ci-user"

REGION = "aws/us-east-1"
ENVIRONMENT = os.getenv("ENVIRONMENT", "staging")
USERNAME = os.getenv("NIGHTLY_CANARY_USERNAME", "infra+nightly-canary@materialize.com")
APP_PASSWORD = os.getenv("NIGHTLY_CANARY_APP_PASSWORD")
VERSION = f"{MzVersion.parse_cargo()}--pr.g{os.getenv('BUILDKITE_COMMIT')}"

# The DevEx account in Confluent Cloud is used to provide the Kafka services
KAFKA_BOOTSTRAP_SERVER = "pkc-n00kk.us-east-1.aws.confluent.cloud:9092"
SCHEMA_REGISTRY_ENDPOINT = "https://psrc-8kz20.us-east-2.aws.confluent.cloud"

# The actual values are stored in the i2 repository
CONFLUENT_API_KEY = os.getenv("CONFLUENT_CLOUD_DEVEX_KAFKA_USERNAME")
CONFLUENT_API_SECRET = os.getenv("CONFLUENT_CLOUD_DEVEX_KAFKA_PASSWORD")


class Redpanda:
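    """Provision a dedicated Redpanda Cloud cluster reachable from the
    Materialize cloud region over AWS PrivateLink.

    Creates the resource group, network, and cluster, a SCRAM user with a
    topic ACL for Testdrive, and allow-lists the principal of the region's
    PrivateLink connection on the Redpanda side.
    """
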
    def __init__(self, c: Composition, cleanup: bool):
        self.cloud = RedpandaCloud()

        if cleanup:
            self.delete()

        self.resource_group_id = self.cloud.create(
            "resource-groups", {"name": REDPANDA_RESOURCE_GROUP}
        )["resource_group"]["id"]

        result = self.cloud.create(
            "networks",
            {
                "name": REDPANDA_NETWORK,
                "region": "us-east-1",
                "resource_group_id": self.resource_group_id,
                "cluster_type": "TYPE_DEDICATED",
                "cloud_provider": "CLOUD_PROVIDER_AWS",
                "cidr_block": "10.0.0.0/20",
            },
        )
        self.network_id = self.cloud.wait(result)["resource_id"]

        result = self.cloud.create(
            "clusters",
            {
                "name": REDPANDA_CLUSTER,
                "cloud_provider": "CLOUD_PROVIDER_AWS",
                "connection_type": "CONNECTION_TYPE_PUBLIC",
                "resource_group_id": self.resource_group_id,
                "network_id": self.network_id,
                "region": "us-east-1",
                "throughput_tier": "tier-1-aws-v3-arm",
                "type": "TYPE_DEDICATED",
                "zones": ["use1-az2"],
                "aws_private_link": {
                    "enabled": True,
                    "connect_console": True,
                    "allowed_principals": [],
                },
            },
        )
        self.cluster_info = self.cloud.wait(result)["response"]["cluster"]
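
        # Look up the PrivateLink service name that the CREATE CONNECTION
        # statement below needs to reference.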
        self.aws_private_link = self.cloud.get(f"clusters/{self.cluster_info['id']}")[
            "cluster"
        ]["aws_private_link"]["status"]["service_name"]

        redpanda_cluster = self.cloud.get_cluster(self.cluster_info)

        self.password = "".join(
            secrets.choice(string.ascii_letters + string.digits) for i in range(32)
        )

        redpanda_cluster.create(
            "users",
            {
                "mechanism": "SASL_MECHANISM_SCRAM_SHA_512",
                "name": REDPANDA_USER,
                "password": self.password,
            },
        )

        redpanda_cluster.create(
            "acls",
            {
                "host": "*",
                "operation": "OPERATION_ALL",
                "permission_type": "PERMISSION_TYPE_ALLOW",
                "principal": f"User:{REDPANDA_USER}",
                "resource_name": "*",
                "resource_pattern_type": "RESOURCE_PATTERN_TYPE_LITERAL",
                "resource_type": "RESOURCE_TYPE_TOPIC",
            },
        )
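
        # Create an AWS PrivateLink connection in the Materialize region and
        # read back its principal so that Redpanda can allow-list it.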
        cloud_conn = psycopg.connect(
            host=c.cloud_hostname(),
            user=USERNAME,
            password=APP_PASSWORD,
            dbname="materialize",
            port=6875,
            sslmode="require",
        )
        cloud_conn.autocommit = True
        cloud_cursor = cloud_conn.cursor()
        cloud_cursor.execute(
            f"""CREATE CONNECTION privatelink_conn
            TO AWS PRIVATELINK (
                SERVICE NAME '{self.aws_private_link}',
                AVAILABILITY ZONES ('use1-az2')
            );""".encode()
        )
        cloud_cursor.execute(
            """SELECT principal
            FROM mz_aws_privatelink_connections plc
            JOIN mz_connections c ON plc.id = c.id
            WHERE c.name = 'privatelink_conn';"""
        )
        results = cloud_cursor.fetchone()
        assert results
        privatelink_principal = results[0]
        cloud_cursor.close()
        cloud_conn.close()

        # The Redpanda API sometimes returns a 404 for a while; retry until it succeeds
        while True:
            try:
                result = self.cloud.patch(
                    f"clusters/{self.cluster_info['id']}",
                    {
                        "aws_private_link": {
                            "enabled": True,
                            "allowed_principals": [privatelink_principal],
                        }
                    },
                )
            except ValueError as e:
                print(f"Failure, retrying in 10s: {e}")
                time.sleep(10)
                continue
            break
        self.cloud.wait(result)

    def delete(self):
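        """Delete any leftover CI Redpanda Cloud cluster, network, and resource group."""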
        for cluster in self.cloud.get("clusters")["clusters"]:
            if cluster["name"] == REDPANDA_CLUSTER:
                result = self.cloud.delete("clusters", cluster["id"])
                self.cloud.wait(result)
                break

        for network in self.cloud.get("networks")["networks"]:
            if network["name"] == REDPANDA_NETWORK:
                result = self.cloud.delete("networks", network["id"])
                self.cloud.wait(result)

        for resource_group in self.cloud.get("resource-groups")["resource_groups"]:
            if resource_group["name"] == REDPANDA_RESOURCE_GROUP:
                self.cloud.delete("resource-groups", resource_group["id"])


SERVICES = [
    CockroachOrPostgresMetadata(),
    Materialized(
        # We use materialize/environmentd and not materialize/materialized here
        # in order to ensure a perfect match to the container that should be
        # deployed to the cloud
        image=f"materialize/environmentd:{VERSION}",
        external_metadata_store=True,
        persist_blob_url="file:///mzdata/persist/blob",
        options=[
            "--orchestrator=process",
            "--orchestrator-process-secrets-directory=/mzdata/secrets",
            "--orchestrator-process-scratch-directory=/scratch",
        ],
        # We cannot restart this container at will, as it does not have clusterd
        sanity_restart=False,
    ),
    Testdrive(),  # Overridden below
    Mz(
        region=REGION,
        environment=ENVIRONMENT,
        app_password=APP_PASSWORD or "",
    ),
]


def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Deploy the current source to the cloud and run tests."""
    parser.add_argument(
        "--cleanup",
        default=True,
        action=argparse.BooleanOptionalAction,
        help="Destroy the region at the end of the workflow.",
    )
    parser.add_argument(
        "--version-check",
        default=True,
        action=argparse.BooleanOptionalAction,
        help="Perform a version check.",
    )
    parser.add_argument(
        "td_files",
        nargs="*",
        default=["*.td"],
        help="Run against the specified files.",
    )

    args = parser.parse_args()
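
    # Resolve the .td file globs relative to test/cloud-canary.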
    files = list(
        itertools.chain.from_iterable(
            [
                glob.glob(file_glob, root_dir=MZ_ROOT / "test" / "cloud-canary")
                for file_glob in args.td_files
            ]
        )
    )

    if args.cleanup:
        disable_region(c)

    try:
        test_failed = True
        print(f"Enabling region using Mz version {VERSION} ...")
        try:
            c.run("mz", "region", "enable", "--version", VERSION)
        except UIError:
            # Work around https://github.com/MaterializeInc/database-issues/issues/4989
            pass
        time.sleep(10)

        assert "materialize.cloud" in c.cloud_hostname()
        wait_for_cloud(c)

        if args.version_check:
            version_check(c)

        # The Redpanda cluster takes about 40 min to spin up
        redpanda = (
            Redpanda(c, cleanup=args.cleanup)
            if any(["redpanda" in filename for filename in files])
            else None
        )

        try:
            print("Running .td files ...")
            td(c, text="> CREATE CLUSTER canary_sources SIZE '25cc'")

            def process(filename: str) -> None:
                td(c, filename, redpanda=redpanda)

            c.test_parts(files, process)
            test_failed = False
        finally:
            if args.cleanup and redpanda is not None:
                redpanda.delete()
    finally:
        if args.cleanup:
            # Clean up
            disable_region(c)

    assert not test_failed


def disable_region(c: Composition) -> None:
    print(f"Shutting down region {REGION} ...")

    try:
        c.run("mz", "region", "disable", "--hard")
    except UIError:
        # Can return: status 404 Not Found
        pass


def wait_for_cloud(c: Composition) -> None:
    print(f"Waiting for cloud cluster to come up with username {USERNAME} ...")
    _wait_for_pg(
        host=c.cloud_hostname(),
        user=USERNAME,
        password=APP_PASSWORD,
        port=6875,
        query="SELECT 1",
        expected=[(1,)],
        timeout_secs=900,
        dbname="materialize",
        sslmode="require",
        # print_result=True
    )


def version_check(c: Composition) -> None:
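    """Assert that the local environmentd image and the cloud region report the same mz_version()."""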
  290. print("Obtaining mz_version() string from local instance ...")
  291. c.up("materialized")
  292. # Remove the ($HASH) suffix because it can be different. The reason is that
  293. # the dockerhub devel-$HASH tag is just a link to the mzbuild-$CODEHASH. So
  294. # if the code has not changed, the environmentd image of the previous
  295. # version will be used.
  296. local_version = c.sql_query("SELECT mz_version();")[0][0].split(" ")[0]
  297. print("Obtaining mz_version() string from the cloud ...")
  298. cloud_cursor = psycopg.connect(
  299. host=c.cloud_hostname(),
  300. user=USERNAME,
  301. password=APP_PASSWORD,
  302. dbname="materialize",
  303. port=6875,
  304. sslmode="require",
  305. ).cursor()
  306. cloud_cursor.execute("SELECT mz_version()")
  307. result = cloud_cursor.fetchone()
  308. assert result is not None
  309. cloud_version = result[0].split(" ")[0]
  310. assert (
  311. local_version == cloud_version
  312. ), f"local version: {local_version} is not identical to cloud version: {cloud_version}"


def td(
    c: Composition,
    filename: str | None = None,
    text: str | None = None,
    redpanda: Redpanda | None = None,
) -> None:
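    """Run a Testdrive file or inline snippet against the cloud region.

    By default Kafka traffic goes to Confluent Cloud; files whose name
    contains "redpanda" are instead pointed at the provisioned Redpanda
    Cloud cluster.
    """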
    assert APP_PASSWORD is not None
    materialize_url = f"postgres://{urllib.parse.quote(USERNAME)}:{urllib.parse.quote(APP_PASSWORD)}@{urllib.parse.quote(c.cloud_hostname())}:6875"
    assert bool(filename) != bool(text)

    testdrive = Testdrive(
        default_timeout="1200s",
        materialize_url=materialize_url,
        no_reset=True,  # Required so that admin port 6877 is not used
        seed=1,  # Required for predictable Kafka topic names
        kafka_url=KAFKA_BOOTSTRAP_SERVER,
        schema_registry_url=SCHEMA_REGISTRY_ENDPOINT,
        no_consistency_checks=True,
        environment=[
            "KAFKA_OPTION="
            + ",".join(
                [
                    "security.protocol=SASL_SSL",
                    "sasl.mechanisms=PLAIN",
                    f"sasl.username={CONFLUENT_API_KEY}",
                    f"sasl.password={CONFLUENT_API_SECRET}",
                ]
            ),
        ],
        entrypoint_extra=[
            f"--var=confluent-api-key={CONFLUENT_API_KEY}",
            f"--var=confluent-api-secret={CONFLUENT_API_SECRET}",
        ],
    )
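
    # Testdrive files that exercise Redpanda use the Redpanda Cloud cluster
    # provisioned above instead of Confluent Cloud.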
    if redpanda and "redpanda" in str(filename):
        testdrive = Testdrive(
            default_timeout="1200s",
            materialize_url=materialize_url,
            no_reset=True,  # Required so that admin port 6877 is not used
            seed=1,  # Required for predictable Kafka topic names
            kafka_url=redpanda.cluster_info["kafka_api"]["seed_brokers"][0],
            schema_registry_url=redpanda.cluster_info["schema_registry"]["url"],
            no_consistency_checks=True,
            environment=[
                "KAFKA_OPTION="
                + ",".join(
                    [
                        "security.protocol=SASL_SSL",
                        "sasl.mechanisms=SCRAM-SHA-512",
                        f"sasl.username={REDPANDA_USER}",
                        f"sasl.password={redpanda.password}",
                    ]
                ),
            ],
            entrypoint_extra=[
                f"--var=redpanda-username={REDPANDA_USER}",
                f"--var=redpanda-password={redpanda.password}",
            ],
        )

    with c.override(testdrive):
        if text:
            c.testdrive(text)
        if filename:
            c.run_testdrive_files(
                filename,
            )