redpanda_cloud.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import os
  10. import time
  11. from typing import Any
  12. import requests
  13. def get_result(response: requests.Response) -> dict[str, Any]:
  14. if response.status_code not in (200, 201, 202):
  15. raise ValueError(
  16. f"Redpanda API call failed: {response.status_code} {response.text}"
  17. )
  18. result = response.json()
  19. # Don't print result since it can contain access tokens
  20. return result
  21. class RedpandaCluster:
  22. def __init__(self, token: str, dataplane_api_url: str) -> None:
  23. self.token = token
  24. self.dataplane_api_url = dataplane_api_url
  25. def headers(self) -> dict[str, str]:
  26. return {"Authorization": f"Bearer {self.token}"}
  27. def create(self, object: str, content: dict[str, Any]) -> dict[str, Any]:
  28. return get_result(
  29. requests.post(
  30. f"{self.dataplane_api_url}/v1alpha1/{object}",
  31. json=content,
  32. headers=self.headers(),
  33. )
  34. )
  35. class RedpandaCloud:
  36. def __init__(self) -> None:
  37. client_id = os.environ["REDPANDA_CLOUD_CLIENT_ID"]
  38. client_secret = os.environ["REDPANDA_CLOUD_CLIENT_SECRET"]
  39. result = get_result(
  40. requests.post(
  41. "https://auth.prd.cloud.redpanda.com/oauth/token",
  42. json={
  43. "client_id": client_id,
  44. "client_secret": client_secret,
  45. "audience": "cloudv2-production.redpanda.cloud",
  46. "grant_type": "client_credentials",
  47. },
  48. )
  49. )
  50. # Can't finish our test otherwise
  51. assert result["expires_in"] >= 3600, result
  52. self.token = result["access_token"]
  53. self.controlplane_api_url = "https://api.redpanda.com"
  54. def headers(self) -> dict[str, str]:
  55. return {"Authorization": f"Bearer {self.token}"}
  56. def wait(self, result: dict[str, Any]) -> dict[str, Any]:
  57. operation_id = result["operation"]["id"]
  58. while True:
  59. time.sleep(10)
  60. result = get_result(
  61. requests.get(
  62. f"{self.controlplane_api_url}/v1beta2/operations/{operation_id}",
  63. headers=self.headers(),
  64. )
  65. )
  66. if result["operation"]["state"] == "STATE_COMPLETED":
  67. return result["operation"]
  68. if result["operation"]["state"] == "STATE_FAILED":
  69. raise ValueError(result)
  70. if result["operation"]["state"] != "STATE_IN_PROGRESS":
  71. raise ValueError(result)
  72. def create(self, object: str, content: dict[str, Any] | None) -> dict[str, Any]:
  73. return get_result(
  74. requests.post(
  75. f"{self.controlplane_api_url}/v1beta2/{object}",
  76. json=content,
  77. headers=self.headers(),
  78. )
  79. )
  80. def patch(self, object: str, content: dict[str, Any] | None) -> dict[str, Any]:
  81. return get_result(
  82. requests.patch(
  83. f"{self.controlplane_api_url}/v1beta2/{object}",
  84. json=content,
  85. headers=self.headers(),
  86. )
  87. )
  88. def get(self, object: str) -> dict[str, Any]:
  89. return get_result(
  90. requests.get(
  91. f"{self.controlplane_api_url}/v1beta2/{object}",
  92. headers=self.headers(),
  93. )
  94. )
  95. def delete(self, object: str, id: str) -> dict[str, Any]:
  96. return get_result(
  97. requests.delete(
  98. f"{self.controlplane_api_url}/v1beta2/{object}/{id}",
  99. headers=self.headers(),
  100. )
  101. )
  102. def get_cluster(self, cluster_info: dict[str, Any]) -> RedpandaCluster:
  103. return RedpandaCluster(self.token, cluster_info["dataplane_api"]["url"])