test_privatelink_connection.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import time
  10. from textwrap import dedent
  11. import pytest
  12. from pg8000.dbapi import DatabaseError, ProgrammingError
  13. from materialize.cloudtest.app.materialize_application import MaterializeApplication
  14. from materialize.cloudtest.util.common import retry
  15. from materialize.cloudtest.util.exists import exists, not_exists
  16. from materialize.ui import UIError
  17. def test_create_privatelink_connection(mz: MaterializeApplication) -> None:
  18. # Create a PrivateLink SQL connection object,
  19. # which should create a K8S VpcEndpoint object.
  20. # We don't run the environment-controller,
  21. # so no AWS VPC Endpoint will be created.
  22. # so we don't need the named service to actually exist.
  23. create_connection_statement = dedent(
  24. """\
  25. CREATE CONNECTION privatelinkconn
  26. TO AWS PRIVATELINK (
  27. SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
  28. AVAILABILITY ZONES ('use1-az1', 'use1-az2')
  29. )
  30. """
  31. )
  32. # This should fail until max_aws_privatelink_connections is increased.
  33. with pytest.raises(
  34. ProgrammingError,
  35. match="creating AWS PrivateLink Connection would violate max_aws_privatelink_connections limit",
  36. ):
  37. mz.environmentd.sql(create_connection_statement)
  38. next_gid = mz.environmentd.sql_query(
  39. "SELECT MAX(SUBSTR(id, 2, LENGTH(id) - 1)::int) + 1 FROM mz_objects WHERE id LIKE 'u%'"
  40. )[0][0]
  41. not_exists(resource=f"vpcendpoint/connection-u{next_gid}")
  42. mz.environmentd.sql(
  43. "ALTER SYSTEM SET max_aws_privatelink_connections = 5",
  44. port="internal",
  45. user="mz_system",
  46. )
  47. mz.environmentd.sql(create_connection_statement)
  48. aws_connection_id = mz.environmentd.sql_query(
  49. "SELECT id FROM mz_connections WHERE name = 'privatelinkconn'"
  50. )[0][0]
  51. exists(resource=f"vpcendpoint/connection-{aws_connection_id}")
  52. # Less flaky if we sleep before checking the status
  53. time.sleep(5)
  54. assert (
  55. "unknown"
  56. == mz.environmentd.sql_query(
  57. f"SELECT status FROM mz_internal.mz_aws_privatelink_connection_status_history WHERE connection_id = '{aws_connection_id}'"
  58. )[0][0]
  59. )
  60. # TODO: validate the contents of the VPC endpoint resource, rather than just
  61. # its existence.
  62. mz.environmentd.sql(
  63. "ALTER SYSTEM SET enable_connection_validation_syntax = true",
  64. port="internal",
  65. user="mz_system",
  66. )
  67. mz.environmentd.sql(
  68. dedent(
  69. """\
  70. CREATE CONNECTION kafkaconn TO KAFKA (
  71. BROKERS (
  72. 'customer-hostname-1:9092' USING AWS PRIVATELINK privatelinkconn,
  73. 'customer-hostname-2:9092' USING AWS PRIVATELINK privatelinkconn (PORT 9093),
  74. 'customer-hostname-3:9092' USING AWS PRIVATELINK privatelinkconn (AVAILABILITY ZONE 'use1-az1', PORT 9093),
  75. 'customer-hostname-4:9094'
  76. ),
  77. SECURITY PROTOCOL PLAINTEXT
  78. ) WITH (VALIDATE = false);
  79. """
  80. )
  81. )
  82. mz.environmentd.sql_query("SELECT id FROM mz_connections WHERE name = 'kafkaconn'")[
  83. 0
  84. ][0]
  85. principal = mz.environmentd.sql_query(
  86. "SELECT principal FROM mz_aws_privatelink_connections"
  87. )[0][0]
  88. assert principal == (
  89. f"arn:aws:iam::123456789000:role/mz_eb5cb59b-e2fe-41f3-87ca-d2176a495345_{aws_connection_id}"
  90. )
  91. # Validate default privatelink connections for kafka
  92. mz.environmentd.sql(
  93. dedent(
  94. """\
  95. CREATE CONNECTION kafkaconn_alt TO KAFKA (
  96. AWS PRIVATELINK privatelinkconn (PORT 9092),
  97. SECURITY PROTOCOL PLAINTEXT
  98. ) WITH (VALIDATE = false);
  99. """
  100. )
  101. )
  102. mz.environmentd.sql_query(
  103. "SELECT id FROM mz_connections WHERE name = 'kafkaconn_alt'"
  104. )[0][0]
  105. mz.environmentd.sql(
  106. dedent(
  107. """\
  108. CREATE CONNECTION sshconn TO SSH TUNNEL (
  109. HOST 'ssh-bastion-host',
  110. USER 'mz',
  111. PORT 22
  112. );
  113. """
  114. )
  115. )
  116. with pytest.raises(
  117. ProgrammingError, match="cannot specify both SSH TUNNEL and AWS PRIVATELINK"
  118. ):
  119. mz.environmentd.sql(
  120. dedent(
  121. """\
  122. CREATE CONNECTION pg TO POSTGRES (
  123. HOST 'postgres',
  124. DATABASE postgres,
  125. USER postgres,
  126. AWS PRIVATELINK privatelinkconn,
  127. SSH TUNNEL sshconn
  128. ) WITH (VALIDATE = false);
  129. """
  130. )
  131. )
  132. with pytest.raises(
  133. ProgrammingError, match='invalid AWS PrivateLink availability zone "us-east-1a"'
  134. ):
  135. mz.environmentd.sql(
  136. dedent(
  137. """\
  138. CREATE CONNECTION privatelinkconn2
  139. TO AWS PRIVATELINK (
  140. SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
  141. AVAILABILITY ZONES ('use1-az2', 'us-east-1a')
  142. );
  143. """
  144. )
  145. )
  146. with pytest.raises(
  147. ProgrammingError,
  148. match="connection cannot contain duplicate availability zones",
  149. ):
  150. mz.environmentd.sql(
  151. dedent(
  152. """\
  153. CREATE CONNECTION privatelinkconn2
  154. TO AWS PRIVATELINK (
  155. SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
  156. AVAILABILITY ZONES ('use1-az1', 'use1-az1', 'use1-az2')
  157. );
  158. """
  159. )
  160. )
  161. with pytest.raises(
  162. ProgrammingError,
  163. match='AWS PrivateLink availability zone "use1-az3" does not match any of the availability zones on the AWS PrivateLink connection',
  164. ):
  165. mz.environmentd.sql(
  166. dedent(
  167. """\
  168. CREATE CONNECTION kafkaconn2 TO KAFKA (
  169. BROKERS (
  170. 'customer-hostname-3:9092' USING AWS PRIVATELINK privatelinkconn (AVAILABILITY ZONE 'use1-az3', PORT 9093)
  171. ),
  172. SECURITY PROTOCOL PLAINTEXT
  173. ) WITH (VALIDATE = false);
  174. """
  175. )
  176. )
  177. with pytest.raises(
  178. DatabaseError,
  179. match="invalid CONNECTION: can only set one of BROKER, BROKERS, or AWS PRIVATELINK",
  180. ):
  181. mz.environmentd.sql(
  182. dedent(
  183. """\
  184. CREATE CONNECTION kafkaconn2_alt TO KAFKA (
  185. AWS PRIVATELINK privatelinkconn (PORT 9092),
  186. BROKERS (
  187. 'customer-hostname-3:9092' USING AWS PRIVATELINK privatelinkconn (PORT 9093)
  188. ),
  189. SECURITY PROTOCOL PLAINTEXT
  190. ) WITH (VALIDATE = false);
  191. """
  192. )
  193. )
  194. with pytest.raises(
  195. ProgrammingError,
  196. match="invalid CONNECTION: PORT in AWS PRIVATELINK is only supported for kafka",
  197. ):
  198. mz.environmentd.sql(
  199. dedent(
  200. """\
  201. CREATE CONNECTION pg TO POSTGRES (
  202. HOST 'postgres',
  203. DATABASE postgres,
  204. USER postgres,
  205. AWS PRIVATELINK privatelinkconn ( PORT 1234 ),
  206. PORT 1234
  207. ) WITH (VALIDATE = false);
  208. """
  209. )
  210. )
  211. mz.environmentd.sql("DROP CONNECTION kafkaconn CASCADE")
  212. mz.environmentd.sql("DROP CONNECTION privatelinkconn CASCADE")
  213. not_exists(resource=f"vpcendpoint/connection-{aws_connection_id}")
  214. def test_background_drop_privatelink_connection(mz: MaterializeApplication) -> None:
  215. # Ensure that privatelink connections are
  216. # deleted in a background task
  217. mz.environmentd.sql(
  218. "ALTER SYSTEM SET max_aws_privatelink_connections = 5",
  219. port="internal",
  220. user="mz_system",
  221. )
  222. create_connection_statement = dedent(
  223. """\
  224. CREATE CONNECTION privatelinkconn
  225. TO AWS PRIVATELINK (
  226. SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
  227. AVAILABILITY ZONES ('use1-az1', 'use1-az2')
  228. )
  229. """
  230. )
  231. mz.environmentd.sql(create_connection_statement)
  232. aws_connection_id = mz.environmentd.sql_query(
  233. "SELECT id FROM mz_connections WHERE name = 'privatelinkconn'"
  234. )[0][0]
  235. mz.environmentd.sql("SET FAILPOINTS = 'drop_vpc_endpoint=pause'")
  236. mz.environmentd.sql("DROP CONNECTION privatelinkconn CASCADE")
  237. exists(resource=f"vpcendpoint/connection-{aws_connection_id}")
  238. mz.environmentd.sql("SET FAILPOINTS = 'drop_vpc_endpoint=off'")
  239. not_exists(resource=f"vpcendpoint/connection-{aws_connection_id}")
  240. def test_retry_drop_privatelink_connection(mz: MaterializeApplication) -> None:
  241. # Ensure that privatelink connections are
  242. # deleted in a background task
  243. mz.environmentd.sql(
  244. "ALTER SYSTEM SET max_aws_privatelink_connections = 5",
  245. port="internal",
  246. user="mz_system",
  247. )
  248. create_connection_statement = dedent(
  249. """\
  250. CREATE CONNECTION privatelinkconn
  251. TO AWS PRIVATELINK (
  252. SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
  253. AVAILABILITY ZONES ('use1-az1', 'use1-az2')
  254. )
  255. """
  256. )
  257. mz.environmentd.sql(create_connection_statement)
  258. aws_connection_id = mz.environmentd.sql_query(
  259. "SELECT id FROM mz_connections WHERE name = 'privatelinkconn'"
  260. )[0][0]
  261. mz.environmentd.sql("SET FAILPOINTS = 'drop_vpc_endpoint=return(failed)'")
  262. mz.environmentd.sql("DROP CONNECTION privatelinkconn CASCADE")
  263. exists(resource=f"vpcendpoint/connection-{aws_connection_id}")
  264. mz.environmentd.sql("SET FAILPOINTS = 'drop_vpc_endpoint=off'")
  265. retry(
  266. f=lambda: not_exists(resource=f"vpcendpoint/connection-{aws_connection_id}"),
  267. max_attempts=10,
  268. exception_types=[UIError],
  269. )