# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Tests of AWS functionality that run against localstack."""
import uuid
from typing import Any, cast

import boto3

from materialize.mzcompose import (
    DEFAULT_CLOUD_REGION,
    DEFAULT_MZ_ENVIRONMENT_ID,
    DEFAULT_ORDINAL,
    DEFAULT_ORG_ID,
)
from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.localstack import Localstack
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.testdrive import Testdrive
  26. ENVIRONMENT_NAME = f"environment-{DEFAULT_ORG_ID}-{DEFAULT_ORDINAL}"
  27. NAMESPACE = ENVIRONMENT_NAME
  28. SERVICE_ACCOUNT_NAME = ENVIRONMENT_NAME
  29. OIDC_SUB = f"system:serviceaccount:{NAMESPACE}:{SERVICE_ACCOUNT_NAME}"
  30. PURPOSE = "test-aws"
  31. STACK = "mzcompose"
  32. KMS_KEY_ALIAS_NAME = f"alias/customer_key_{DEFAULT_MZ_ENVIRONMENT_ID}"
  33. AWS_CONNECTION_ROLE_ARN = "arn:aws:iam::123456789000:role/MaterializeConnection"
  34. AWS_EXTERNAL_ID_PREFIX = "eb5cb59b-e2fe-41f3-87ca-d2176a495345"
  35. AWS_ACCESS_KEY_ID = "LSIAQAAAAAAVNCBMPNSG"
  36. AWS_SECRET_ACCESS_KEY = "secret"
  37. AWS_ENDPOINT_URL_MZ = "http://localstack:4566"
  38. SERVICES = [
  39. Localstack(),
  40. Mz(app_password=""),
  41. Materialized(
  42. depends_on=["localstack"],
  43. environment_extra=[
  44. f"AWS_REGION={DEFAULT_CLOUD_REGION}",
  45. f"AWS_ENDPOINT_URL={AWS_ENDPOINT_URL_MZ}",
  46. f"AWS_ACCESS_KEY_ID={AWS_ACCESS_KEY_ID}",
  47. f"AWS_SECRET_ACCESS_KEY={AWS_SECRET_ACCESS_KEY}",
  48. ],
  49. options=[
  50. "--secrets-controller=aws-secrets-manager",
  51. f"--aws-secrets-controller-tags=Owner={OIDC_SUB}",
  52. f"--aws-secrets-controller-tags=Environment={ENVIRONMENT_NAME}",
  53. f"--aws-secrets-controller-tags=Purpose={PURPOSE}",
  54. f"--aws-secrets-controller-tags=Stack={STACK}",
  55. f"--aws-connection-role-arn={AWS_CONNECTION_ROLE_ARN}",
  56. f"--aws-external-id-prefix={AWS_EXTERNAL_ID_PREFIX}",
  57. ],
  58. ),
  59. Testdrive(default_timeout="5s"),
  60. ]
  61. def workflow_default(c: Composition) -> None:
  62. def process(name: str) -> None:
  63. with c.test_case(name):
  64. c.workflow(name)
  65. workflows = ["secrets-manager", "aws-connection", "copy-to-s3"]
  66. c.test_parts(workflows, process)
def workflow_secrets_manager(c: Composition) -> None:
    """Exercise the aws-secrets-manager secrets controller against localstack.

    Covers startup cleanup of orphaned secrets (and that wrongly-tagged or
    foreign-environment secrets are left alone), tag propagation, and the
    CREATE/ALTER/RENAME/DROP SECRET lifecycle.
    """
    c.up("localstack")
    # Localstack endpoint as seen from the host (mapped port); the boto3
    # clients below talk to it directly.
    aws_endpoint_url = f"http://localhost:{c.port('localstack', 4566)}"
    kms_client = boto3.client(
        "kms",
        endpoint_url=aws_endpoint_url,
        region_name=DEFAULT_CLOUD_REGION,
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )
    # Create the customer KMS key under the alias used for this environment's
    # secrets (KMS_KEY_ALIAS_NAME).
    key_id = kms_client.create_key()["KeyMetadata"]["KeyId"]
    kms_client.create_alias(
        AliasName=KMS_KEY_ALIAS_NAME,
        TargetKeyId=key_id,
    )
    sm_client = boto3.client(
        "secretsmanager",
        endpoint_url=aws_endpoint_url,
        region_name=DEFAULT_CLOUD_REGION,
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )
    # Tags Materialize is configured to attach to every secret it manages
    # (see the --aws-secrets-controller-tags options in SERVICES).
    expected_tags = [
        {
            "Key": "Owner",
            "Value": OIDC_SUB,
        },
        {
            "Key": "Environment",
            "Value": ENVIRONMENT_NAME,
        },
        {
            "Key": "Purpose",
            "Value": PURPOSE,
        },
        {
            "Key": "Stack",
            "Value": STACK,
        },
    ]
    # Use up IDs u1 and u2 so we can test behavior with orphaned secrets.
    c.up("materialized")
    c.sql("CREATE TABLE t ()")
    c.sql("DROP TABLE t")
    c.sql("CREATE TABLE t ()")
    c.sql("DROP TABLE t")
    c.stop("materialized")
    # Create an orphaned secret that should get deleted when starting environmentd.
    orphan_1_name = f"/user-managed/{DEFAULT_MZ_ENVIRONMENT_ID}/u1"
    sm_client.create_secret(
        Name=orphan_1_name,
        KmsKeyId=KMS_KEY_ALIAS_NAME,
        SecretString="I'm an orphan, delete me!",
        Tags=expected_tags,
    )
    # Create an orphaned secret without the correct tags, so it should be ignored.
    orphan_2_name = f"/user-managed/{DEFAULT_MZ_ENVIRONMENT_ID}/u2"
    sm_client.create_secret(
        Name=orphan_2_name,
        KmsKeyId=KMS_KEY_ALIAS_NAME,
        SecretString="I'm an orphan, but I shouldn't be deleted because of my missing tags!",
    )
    # Create a secret for a different environment, so it should be ignored.
    other_environment_name = "environment-11111111-2222-3333-4444-555555555555-0"
    other_oidc_sub = (
        f"system:serviceaccount:{other_environment_name}:{other_environment_name}"
    )
    other_name = f"/user-managed/{other_environment_name}/u9876"
    sm_client.create_secret(
        Name=other_name,
        KmsKeyId=KMS_KEY_ALIAS_NAME,
        SecretString="I belong to a different environment, so leave me alone!",
        Tags=[
            {
                "Key": "Owner",
                "Value": other_oidc_sub,
            },
            {
                "Key": "Environment",
                "Value": other_environment_name,
            },
            {
                "Key": "Purpose",
                "Value": PURPOSE,
            },
            {
                "Key": "Stack",
                "Value": STACK,
            },
        ],
    )

    def list_secrets() -> dict[str, dict[str, Any]]:
        # Map secret name -> secret metadata for easy membership checks.
        return {
            secret["Name"]: secret for secret in sm_client.list_secrets()["SecretList"]
        }

    def secret_name(_id: str) -> str:
        # Secrets are namespaced per environment under /user-managed/.
        return f"/user-managed/{DEFAULT_MZ_ENVIRONMENT_ID}/{_id}"

    def get_secret_value(_id: str) -> bytes:
        # Secret payloads are stored as binary (SecretBinary), not strings.
        return cast(
            bytes,
            sm_client.get_secret_value(SecretId=secret_name(_id))["SecretBinary"],
        )

    # Restart environmentd: the correctly-tagged orphan for this environment
    # must be cleaned up, while the untagged orphan and the other
    # environment's secret must survive.
    c.up("materialized")
    secrets = list_secrets()
    assert orphan_1_name not in secrets
    assert orphan_2_name in secrets
    assert other_name in secrets
    # Should include migrated secrets and secrets for other environments
    assert len(secrets) == 2
    c.sql("CREATE SECRET secret AS 's3cret'")
    secrets = list_secrets()
    # New secret should exist with specified contents
    assert secret_name("u3") in secrets
    assert b"s3cret" == get_secret_value("u3")
    # Secrets should have expected tags
    secret_u3 = secrets[secret_name("u3")]
    for tag in expected_tags:
        assert tag in secret_u3["Tags"]
    # Check that alter secret gets reflected in Secrets Manager
    c.sql("ALTER SECRET secret AS 'tops3cret'")
    assert b"tops3cret" == get_secret_value("u3")
    # Rename should not change the contents in Secrets Manager
    c.sql("ALTER SECRET secret RENAME TO renamed_secret")
    assert b"tops3cret" == get_secret_value("u3")
    # Ensure secret still exists after a restart (i.e., test that orphaned
    # cleanup doesn't fire incorrectly).
    c.stop("materialized")
    c.up("materialized")
    secrets = list_secrets()
    assert secret_name("u3") in secrets
    c.sql("DROP SECRET renamed_secret")
    # Check that the file has been deleted from Secrets Manager
    secrets = list_secrets()
    assert secret_name("u3") not in secrets
  201. def workflow_aws_connection(c: Composition) -> None:
  202. c.up("localstack", "materialized")
  203. c.run_testdrive_files("aws-connection/aws-connection.td")
  204. def workflow_copy_to_s3(c: Composition) -> None:
  205. with c.override(
  206. Materialized(
  207. depends_on=["localstack"],
  208. environment_extra=[
  209. f"AWS_ENDPOINT_URL={AWS_ENDPOINT_URL_MZ}",
  210. f"AWS_ACCESS_KEY_ID={AWS_ACCESS_KEY_ID}",
  211. f"AWS_SECRET_ACCESS_KEY={AWS_SECRET_ACCESS_KEY}",
  212. ],
  213. )
  214. ):
  215. c.up("localstack", "materialized")
  216. localhost_aws_endpoint_url = f"http://localhost:{c.port('localstack', 4566)}"
  217. s3_client = boto3.client(
  218. "s3",
  219. endpoint_url=localhost_aws_endpoint_url,
  220. region_name=DEFAULT_CLOUD_REGION,
  221. aws_access_key_id=AWS_ACCESS_KEY_ID,
  222. aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
  223. )
  224. bucket_name = "copy-to-s3"
  225. s3_client.create_bucket(Bucket=bucket_name)
  226. path_prefix = str(uuid.uuid4())
  227. c.run_testdrive_files(
  228. f"--var=endpoint={AWS_ENDPOINT_URL_MZ}",
  229. f"--var=access-key={AWS_ACCESS_KEY_ID}",
  230. f"--var=secret-key={AWS_SECRET_ACCESS_KEY}",
  231. f"--var=s3-prefix={bucket_name}/{path_prefix}",
  232. f"--var=region={DEFAULT_CLOUD_REGION}",
  233. "--default-timeout=300s",
  234. "copy-to-s3/copy-to-s3.td",
  235. )
  236. def validate_upload(upload, expected_output_set):
  237. assert len(upload["Contents"]) > 0
  238. output_lines = set()
  239. for obj in upload["Contents"]:
  240. assert obj["Key"].endswith(".csv")
  241. key = obj["Key"]
  242. object_response = s3_client.get_object(Bucket=bucket_name, Key=key)
  243. body = object_response["Body"].read().decode("utf-8")
  244. output_lines.update(body.splitlines())
  245. assert output_lines == expected_output_set
  246. # asserting the uploaded files
  247. date = c.sql_query("SELECT TO_CHAR(now(), 'YYYY-MM-DD')")[0][0]
  248. expected_output = set(map(lambda x: str(x), range(10)))
  249. first_upload = s3_client.list_objects_v2(
  250. Bucket=bucket_name, Prefix=f"{path_prefix}/1/{date}/"
  251. )
  252. validate_upload(first_upload, expected_output)
  253. second_upload = s3_client.list_objects_v2(
  254. Bucket=bucket_name, Prefix=f"{path_prefix}/2/"
  255. )
  256. validate_upload(second_upload, expected_output)
  257. third_upload = s3_client.list_objects_v2(
  258. Bucket=bucket_name, Prefix=f"{path_prefix}/3/"
  259. )
  260. validate_upload(third_upload, set(["1000"]))