mzcompose.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """Tests of AWS functionality that run against AWS.
  10. To run these tests locally:
  11. $ cd test/aws
  12. $ AWS_PROFILE=mz-scratch-admin ./mzcompose --dev run default
  13. """
  14. import codecs
  15. import json
  16. import random
  17. import boto3
  18. from psycopg.errors import SystemError
  19. from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
  20. from materialize.mzcompose.services.materialized import Materialized
  21. AWS_EXTERNAL_ID_PREFIX = "eb5cb59b-e2fe-41f3-87ca-d2176a495345"
  22. SERVICES = [
  23. Materialized(),
  24. ]
  25. class TestContext:
  26. def __init__(self, iam_propagation_seconds: int):
  27. self.iam_propagation_seconds = iam_propagation_seconds
  28. self.seed = random.getrandbits(32)
  29. self.sts = boto3.client("sts")
  30. self.iam = boto3.client("iam")
  31. # Get the IAM principal that we're running as.
  32. caller = self.sts.get_caller_identity()
  33. self.account_id = caller["Account"]
  34. self.materialized_principal = caller["Arn"]
  35. def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
  36. # Sleeping to wait for IAM to propagate is ugly and somewhat flaky, but
  37. # there isn't an obviously better solution. This only runs in the nightly
  38. # pipeline, so flakes are more tolerable than they would be if this ran in
  39. # the PR pipeline.
  40. parser.add_argument(
  41. "--iam-propagation-seconds",
  42. type=int,
  43. default=10,
  44. help="how long to wait for IAM policies to propagate",
  45. )
  46. args = parser.parse_args()
  47. # Set up.
  48. ctx = TestContext(iam_propagation_seconds=args.iam_propagation_seconds)
  49. # Create the "jump role" that Materialize will use to assume each
  50. # connection's role.
  51. connection_role = f"testdrive-{ctx.seed}-MaterializeConnection"
  52. connection_role_arn = f"arn:aws:iam::{ctx.account_id}:role/{connection_role}"
  53. _create_role(ctx, connection_role, ctx.materialized_principal)
  54. try:
  55. # Start Materialize.
  56. materialized = Materialized(
  57. environment_extra=[
  58. "AWS_DEFAULT_REGION=us-east-1",
  59. "AWS_ACCESS_KEY_ID",
  60. "AWS_PROFILE",
  61. "AWS_SECRET_ACCESS_KEY",
  62. "AWS_SESSION_TOKEN",
  63. ],
  64. volumes_extra=[
  65. # Mounting the .aws directory in the container allows Materialize to
  66. # use SSO credentials, which makes it easier to run this composition
  67. # locally. CI doesn't need this.
  68. "~/.aws:/home/materialize/.aws",
  69. ],
  70. options=[
  71. f"--aws-connection-role-arn={connection_role_arn}",
  72. f"--aws-external-id-prefix={AWS_EXTERNAL_ID_PREFIX}",
  73. ],
  74. )
  75. with c.override(materialized):
  76. # (Re)start Materialize and enable AWS connections.
  77. c.down()
  78. c.up("materialized")
  79. c.sql(
  80. port=6877,
  81. user="mz_system",
  82. sql="""
  83. ALTER SYSTEM SET enable_connection_validation_syntax = true;
  84. """,
  85. )
  86. for fn in [test_credentials, test_assume_role]:
  87. with c.test_case(fn.__name__):
  88. fn(c, ctx)
  89. finally:
  90. _delete_role(ctx, connection_role)
  91. def test_credentials(c: Composition, ctx: TestContext):
  92. # Create a user with an access key.
  93. customer_user = f"testdrive-{ctx.seed}-Customer"
  94. ctx.iam.create_user(UserName=customer_user)
  95. access_key = ctx.iam.create_access_key(UserName=customer_user)
  96. access_key_id = access_key["AccessKey"]["AccessKeyId"]
  97. secret_access_key = access_key["AccessKey"]["SecretAccessKey"]
  98. # Creating a connection with those credentials should work.
  99. c.sql(
  100. f"""
  101. CREATE SECRET aws_secret_access_key AS '{secret_access_key}';
  102. CREATE CONNECTION aws_credentials TO AWS (
  103. ACCESS KEY ID = '{access_key_id}',
  104. SECRET ACCESS KEY = SECRET aws_secret_access_key
  105. );
  106. """,
  107. print_statement=False,
  108. )
  109. # Wait for IAM to propagate.
  110. c.sleep(ctx.iam_propagation_seconds)
  111. c.sql("VALIDATE CONNECTION aws_credentials")
  112. # Corrupting the secret access key should cause authentication to fail with
  113. # an invalid signature error.
  114. bad_secret_access_key = codecs.encode(secret_access_key, "rot13")
  115. c.sql(
  116. f"ALTER SECRET aws_secret_access_key AS '{bad_secret_access_key}'",
  117. print_statement=False,
  118. )
  119. try:
  120. c.sql("VALIDATE CONNECTION aws_credentials")
  121. except SystemError as e:
  122. assert (
  123. e.diag.message_primary and "SignatureDoesNotMatch" in e.diag.message_primary
  124. ), e
  125. else:
  126. raise RuntimeError("connection validation unexpectedly succeeded")
  127. # Changing the access key to a nonexistent access key should fail with an
  128. # invalid client ID error.
  129. c.sql(
  130. "ALTER CONNECTION aws_credentials SET (ACCESS KEY ID = 'AKIAV2KIV5LP3RAKAZUY')",
  131. print_statement=False,
  132. )
  133. try:
  134. c.sql("VALIDATE CONNECTION aws_credentials")
  135. except SystemError as e:
  136. assert (
  137. e.diag.message_primary and "InvalidClientTokenId" in e.diag.message_primary
  138. ), e
  139. else:
  140. raise RuntimeError("connection validation unexpectedly succeeded")
  141. def test_assume_role(c: Composition, ctx: TestContext):
  142. # Create a connection to a not-yet-existing customer role.
  143. customer_role = f"testdrive-{ctx.seed}-Customer"
  144. customer_role_arn = f"arn:aws:iam::{ctx.account_id}:role/{customer_role}"
  145. c.sql(
  146. f"CREATE CONNECTION aws_assume_role TO AWS (ASSUME ROLE ARN '{customer_role_arn}')"
  147. )
  148. connection_id = c.sql_query(
  149. "SELECT id FROM mz_connections WHERE name = 'aws_assume_role'"
  150. )[0][0]
  151. # Ensure that validating the connection fails.
  152. try:
  153. c.sql("VALIDATE CONNECTION aws_assume_role")
  154. except SystemError as e:
  155. assert e.diag.message_primary and "AccessDenied" in e.diag.message_primary, e
  156. else:
  157. raise RuntimeError("connection validation unexpectedly succeeded")
  158. # Create the customer role, but incorrectly fail to constrain the
  159. # external ID.
  160. principal = c.sql_query(
  161. f"SELECT principal FROM mz_internal.mz_aws_connections WHERE id = '{connection_id}'"
  162. )[0][0]
  163. _create_role(ctx, customer_role, principal)
  164. # Wait for IAM to propagate.
  165. c.sleep(ctx.iam_propagation_seconds)
  166. try:
  167. try:
  168. c.sql("VALIDATE CONNECTION aws_assume_role")
  169. except SystemError as e:
  170. # Ensure the top line error message is exactly what we expect.
  171. assert (
  172. "role trust policy does not require an external ID"
  173. == e.diag.message_primary
  174. )
  175. # We're not as prescriptive about the detail/hint fields. Just ensure
  176. # that the details include the exact ARN of the connection's role and
  177. # that the hint includes a link to further documentation.
  178. assert (
  179. e.diag.message_detail and customer_role_arn in e.diag.message_detail
  180. ), e
  181. assert (
  182. e.diag.message_hint
  183. and "https://materialize.com/s/aws-connection-role-trust-policy"
  184. in e.diag.message_hint
  185. ), e
  186. else:
  187. raise RuntimeError("connection validation unexpectedly succeeded")
  188. # Update the customer role's trust policy to use Materialize's example.
  189. trust_policy = c.sql_query(
  190. f"SELECT example_trust_policy FROM mz_internal.mz_aws_connections WHERE id = '{connection_id}'"
  191. )[0][0]
  192. ctx.iam.update_assume_role_policy(
  193. RoleName=customer_role,
  194. PolicyDocument=json.dumps(trust_policy),
  195. )
  196. # Wait for IAM to propagate.
  197. c.sleep(ctx.iam_propagation_seconds)
  198. # Ensure that connection validation now succeeds.
  199. c.sql("VALIDATE CONNECTION aws_assume_role")
  200. finally:
  201. _delete_role(ctx, customer_role)
  202. def _create_role(ctx: TestContext, customer_role: str, principal: str) -> None:
  203. ctx.iam.create_role(
  204. RoleName=customer_role,
  205. AssumeRolePolicyDocument=json.dumps(
  206. {
  207. "Version": "2012-10-17",
  208. "Statement": [
  209. {
  210. "Effect": "Allow",
  211. "Principal": {
  212. "AWS": principal,
  213. },
  214. "Action": "sts:AssumeRole",
  215. }
  216. ],
  217. }
  218. ),
  219. )
  220. def _delete_role(ctx: TestContext, customer_role: str) -> None:
  221. ctx.iam.delete_role(
  222. RoleName=customer_role,
  223. )