mzcompose.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Tests the mz command line tool against a real Cloud instance
  11. """
  12. import argparse
  13. import csv
  14. import json
  15. import os
  16. import time
  17. import psycopg
  18. from materialize.mzcompose import (
  19. _wait_for_pg,
  20. )
  21. from materialize.mzcompose.composition import (
  22. Composition,
  23. WorkflowArgumentParser,
  24. )
  25. from materialize.mzcompose.services.mz import Mz
  26. from materialize.ui import UIError
  27. REGION = "aws/us-west-2"
  28. ENVIRONMENT = os.getenv("ENVIRONMENT", "production")
  29. USERNAME = os.getenv("NIGHTLY_MZ_USERNAME", "infra+bot@materialize.com")
  30. APP_PASSWORD = os.getenv("MZ_CLI_APP_PASSWORD")
  31. # The DevEx account in the Confluent Cloud is used to provide Kafka services
  32. KAFKA_BOOTSTRAP_SERVER = "pkc-n00kk.us-east-1.aws.confluent.cloud:9092"
  33. # The actual values are stored in the i2 repository
  34. CONFLUENT_API_KEY = os.getenv("CONFLUENT_CLOUD_DEVEX_KAFKA_USERNAME")
  35. CONFLUENT_API_SECRET = os.getenv("CONFLUENT_CLOUD_DEVEX_KAFKA_PASSWORD")
  36. SERVICES = [
  37. Mz(
  38. region=REGION,
  39. environment=ENVIRONMENT,
  40. app_password=APP_PASSWORD or "",
  41. ),
  42. ]
  43. def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
  44. """Deploy the current source to the cloud and run tests."""
  45. parser.add_argument(
  46. "--cleanup",
  47. default=True,
  48. action=argparse.BooleanOptionalAction,
  49. help="Destroy the region at the end of the workflow.",
  50. )
  51. args = parser.parse_args()
  52. # Look for the '.psqlrc' file in the home dir.
  53. home_dir = os.path.expanduser("~")
  54. psql_config_path = os.path.join(home_dir, ".psqlrc-mz")
  55. if args.cleanup:
  56. disable_region(c)
  57. if os.path.exists(psql_config_path):
  58. os.remove(psql_config_path)
  59. test_failed = True
  60. try:
  61. print("Enabling region using Mz ...")
  62. c.run("mz", "region", "enable")
  63. time.sleep(10)
  64. assert "materialize.cloud" in c.cloud_hostname()
  65. wait_for_cloud(c)
  66. # Test - `mz app-password`
  67. # Assert `mz app-password create`
  68. new_app_password_name = "Materialize CLI (mz) - Nightlies"
  69. output = c.run(
  70. "mz", "app-password", "create", new_app_password_name, capture=True
  71. )
  72. new_app_password = output.stdout.strip()
  73. assert "mzp_" in new_app_password
  74. psycopg.connect(
  75. host=c.cloud_hostname(),
  76. user=USERNAME,
  77. password=new_app_password,
  78. port=6875,
  79. sslmode="require",
  80. )
  81. # Assert `mz app-password list`
  82. output = c.run("mz", "app-password", "list", capture=True)
  83. assert new_app_password_name in output.stdout
  84. # // Test - `mz secrets`
  85. # Drop secret if exists
  86. output = c.run(
  87. "mz",
  88. "sql",
  89. "--",
  90. "-q",
  91. "-c",
  92. "DROP SECRET IF EXISTS CI_SECRET;",
  93. capture=True,
  94. )
  95. secret = "decode('c2VjcmV0Cg==', 'base64')"
  96. output = c.run(
  97. "mz", "secret", "create", "CI_SECRET", stdin=secret, capture=True
  98. )
  99. assert output.returncode == 0
  100. output = c.run(
  101. "mz",
  102. "secret",
  103. "create",
  104. "confluent_username",
  105. stdin=CONFLUENT_API_KEY,
  106. capture=True,
  107. )
  108. assert output.returncode == 0
  109. output = c.run(
  110. "mz",
  111. "secret",
  112. "create",
  113. "confluent_password",
  114. stdin=CONFLUENT_API_SECRET,
  115. capture=True,
  116. )
  117. assert output.returncode == 0
  118. output = c.run(
  119. "mz",
  120. "sql",
  121. "--",
  122. "-q",
  123. "-c",
  124. f"""
  125. CREATE CONNECTION confluent_cloud TO KAFKA (
  126. BROKER '{KAFKA_BOOTSTRAP_SERVER}',
  127. SASL MECHANISMS = 'PLAIN',
  128. SASL USERNAME = SECRET confluent_username,
  129. SASL PASSWORD = SECRET confluent_password
  130. );""",
  131. capture=True,
  132. )
  133. output = c.run(
  134. "mz",
  135. "sql",
  136. "--",
  137. "-q",
  138. "-c",
  139. """VALIDATE CONNECTION confluent_cloud;""",
  140. capture=True,
  141. )
  142. assert output.returncode == 0
  143. output = c.run("mz", "sql", "--", "-q", "-c", "SHOW SECRETS;", capture=True)
  144. assert "ci_secret" in output.stdout
  145. # Assert using force
  146. output = c.run(
  147. "mz", "secret", "create", "CI_SECRET", "--force", stdin=secret, capture=True
  148. )
  149. assert output.returncode == 0
  150. # Test - `mz user`
  151. user_email = "mz_ci_e2e_test_nightlies@materialize.com"
  152. output = c.run("mz", "user", "list", capture=True)
  153. if user_email in output.stdout:
  154. # Try to remove the username if it exist before trying to create one.
  155. c.run("mz", "user", "remove", user_email, check=False)
  156. output = c.run("mz", "user", "create", user_email, "MZ_CI", capture=True)
  157. assert output.returncode == 0
  158. output = c.run("mz", "user", "list", capture=True)
  159. assert output.returncode == 0
  160. assert user_email in output.stdout
  161. output = c.run("mz", "user", "remove", user_email, capture=True)
  162. assert output.returncode == 0
  163. output = c.run(
  164. "mz",
  165. "user",
  166. "list",
  167. "--format",
  168. "csv",
  169. capture=True,
  170. )
  171. # assert username is not in output.stdout
  172. user_list = csv.DictReader(output.stdout.split("\n"))
  173. for user in user_list:
  174. assert user["email"] != user_email
  175. # Test - `mz region list`
  176. # Enable, disable and show are already tested.
  177. output = c.run("mz", "region", "list", "--format", "json", capture=True)
  178. regions = json.loads(output.stdout)
  179. us_region = None
  180. for region in regions:
  181. if region["region"] == REGION:
  182. us_region = region
  183. assert us_region is not None and "enabled" == us_region["status"]
  184. # Verify the content is ok
  185. print(f"Path: {psql_config_path}")
  186. if os.path.exists(psql_config_path):
  187. with open(psql_config_path):
  188. # content = file.read()
  189. output = c.run("cat {psql_config_path}", capture=True)
  190. if output.stdout != "\\timing\n\\include ~/.psqlrc":
  191. # TODO:
  192. print("Content is not equal. Fix this.")
  193. # raise ValueError("Incorrect content in the '.psqlrc-mz' file.")
  194. else:
  195. print("The configuration file '.psqlrc-mz' does not exist.")
  196. # TODO:
  197. # raise FileNotFoundError(
  198. # "The configuration file '.psqlrc-mz' does not exist."
  199. # )
  200. test_failed = False
  201. finally:
  202. # Clean up
  203. if args.cleanup:
  204. disable_region(c)
  205. assert not test_failed
  206. def disable_region(c: Composition) -> None:
  207. print(f"Shutting down region {REGION} ...")
  208. try:
  209. c.run("mz", "region", "disable", "--hard")
  210. except UIError:
  211. # Can return: status 404 Not Found
  212. pass
  213. def wait_for_cloud(c: Composition) -> None:
  214. print(f"Waiting for cloud cluster to come up with username {USERNAME} ...")
  215. _wait_for_pg(
  216. host=c.cloud_hostname(),
  217. user=USERNAME,
  218. password=APP_PASSWORD,
  219. port=6875,
  220. query="SELECT 1",
  221. expected=[(1,)],
  222. timeout_secs=900,
  223. dbname="materialize",
  224. sslmode="require",
  225. # print_result=True
  226. )