aws.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from datetime import UTC, datetime, timedelta
  10. from pathlib import PurePosixPath
  11. from typing import Any
  12. from urllib.parse import unquote, urlparse
  13. import boto3
  14. from materialize import scratch
  15. MAX_AGE = timedelta(hours=1)
  16. def clean_up_kinesis() -> None:
  17. print(f"Deleting Kinesis streams whose age exceeds {MAX_AGE}")
  18. client = boto3.client("kinesis")
  19. streams = client.list_streams()["StreamNames"]
  20. for stream in streams:
  21. if not stream.startswith("testdrive"):
  22. print("Skipping non-testdrive stream {}", stream)
  23. continue
  24. desc = client.describe_stream(StreamName=stream)
  25. created_at = desc["StreamDescription"]["StreamCreationTimestamp"]
  26. age = datetime.now(UTC) - created_at
  27. if age <= MAX_AGE:
  28. print(f"Skipping stream {stream} whose age is beneath threshold")
  29. continue
  30. print(f"Deleting Kinesis stream {stream!r} (age={age})")
  31. client.delete_stream(StreamName=stream)
  32. def clean_up_s3() -> None:
  33. print(f"Deleting S3 buckets whose age exceeds {MAX_AGE}")
  34. client = boto3.client("s3")
  35. buckets = client.list_buckets()["Buckets"]
  36. for desc in buckets:
  37. if not desc["Name"].startswith("testdrive"):
  38. print("Skipping non-testdrive bucket {}".format(desc["Name"]))
  39. continue
  40. age = datetime.now(UTC) - desc["CreationDate"]
  41. if age <= MAX_AGE:
  42. print(
  43. "Skipping bucket {} whose age is beneath threshold".format(desc["Name"])
  44. )
  45. continue
  46. print("Deleting bucket {} (age={})".format(desc["Name"], age))
  47. try:
  48. bucket = boto3.resource("s3").Bucket(desc["Name"])
  49. bucket.objects.all().delete()
  50. bucket.delete()
  51. except client.exceptions.NoSuchBucket:
  52. print(
  53. f"Couldn't delete {desc['Name']}: NoSuchBucket. This might be a transient issue."
  54. )
  55. def clean_up_sqs() -> None:
  56. print(f"Deleting SQS queues whose age exceeds {MAX_AGE}")
  57. client = boto3.client("sqs")
  58. queues = client.list_queues()
  59. if "QueueUrls" in queues:
  60. for queue in queues["QueueUrls"]:
  61. name = PurePosixPath(unquote(urlparse(queue).path)).parts[2]
  62. if not name.startswith("testdrive"):
  63. print(f"Skipping non-testdrive queue {name}")
  64. continue
  65. attributes = client.get_queue_attributes(
  66. QueueUrl=queue, AttributeNames=["All"]
  67. )
  68. created_at = int(attributes["Attributes"]["CreatedTimestamp"])
  69. age = datetime.now(UTC) - datetime.fromtimestamp(created_at, UTC)
  70. if age <= MAX_AGE:
  71. print(f"Skipping queue {name} whose age is beneath threshold")
  72. continue
  73. print(f"Deleting SQS queue {name} (age={age})")
  74. client.delete_queue(QueueUrl=queue)
  75. def clean_up_ec2() -> None:
  76. print("Terminating scratch ec2 instances whose age exceeds the deletion time")
  77. olds = [i["InstanceId"] for i in scratch.get_old_instances()]
  78. if olds:
  79. print(f"Instances to delete: {olds}")
  80. boto3.client("ec2").terminate_instances(InstanceIds=olds)
  81. else:
  82. print("No instances to delete")
  83. def clean_up_iam() -> None:
  84. client = boto3.client("iam")
  85. roles = get_testdrive_roles(client)
  86. if not roles:
  87. print("No testdrive IAM roles found")
  88. return
  89. now = datetime.utcnow().timestamp()
  90. print(f"Found {len(roles)} candidate IAM roles for deletion")
  91. for role in roles:
  92. used = role.get("RoleLastUsed", {}).get("LastUsedDate")
  93. if used is None:
  94. used = role["CreateDate"]
  95. role_name = role["RoleName"]
  96. expiration = (used + MAX_AGE).timestamp()
  97. if now > expiration:
  98. policy_response = client.list_role_policies(RoleName=role_name)
  99. for policy_name in policy_response.get("PolicyNames", []):
  100. client.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
  101. client.delete_role(RoleName=role_name)
  102. print(f"Deleted role {role_name}")
  103. else:
  104. print(f"Skipping role {role_name}")
  105. def get_testdrive_roles(client: Any) -> list[Any]:
  106. roles = []
  107. paginator = client.get_paginator("list_roles")
  108. page_iterator = paginator.paginate()
  109. for page in page_iterator:
  110. roles.extend(page.get("Roles", []))
  111. return [r for r in roles if r["RoleName"].startswith("testdrive")]
  112. def main() -> None:
  113. clean_up_kinesis()
  114. clean_up_s3()
  115. clean_up_sqs()
  116. clean_up_ec2()
  117. clean_up_iam()
  118. if __name__ == "__main__":
  119. main()