mzcompose.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Tests using the balancerd service instead of connecting to materialized directly.
  11. Uses the frontegg-mock instead of a real frontend backend.
  12. """
  13. import contextlib
  14. import json
  15. import socket
  16. import ssl
  17. import struct
  18. import uuid
  19. from collections.abc import Callable
  20. from textwrap import dedent
  21. from typing import Any
  22. from urllib.parse import quote
  23. import pg8000
  24. import requests
  25. from pg8000.exceptions import InterfaceError
  26. from psycopg import Cursor
  27. from psycopg.errors import OperationalError, ProgramLimitExceeded, ProgrammingError
  28. from materialize import MZ_ROOT
  29. from materialize.mzcompose.composition import Composition
  30. from materialize.mzcompose.services.balancerd import Balancerd
  31. from materialize.mzcompose.services.frontegg import FronteggMock
  32. from materialize.mzcompose.services.materialized import Materialized
  33. from materialize.mzcompose.services.mz import Mz
  34. from materialize.mzcompose.services.test_certs import TestCerts
  35. from materialize.mzcompose.services.testdrive import Testdrive
  36. TENANT_ID = str(uuid.uuid4())
  37. ADMIN_USER = "u1@example.com"
  38. OTHER_USER = "u2@example.com"
  39. ADMIN_ROLE = "MaterializePlatformAdmin"
  40. OTHER_ROLE = "MaterializePlatform"
  41. USERS = {
  42. ADMIN_USER: {
  43. "email": ADMIN_USER,
  44. "password": str(uuid.uuid4()),
  45. "id": str(uuid.uuid4()),
  46. "tenant_id": TENANT_ID,
  47. "initial_api_tokens": [
  48. {
  49. "client_id": str(uuid.uuid4()),
  50. "secret": str(uuid.uuid4()),
  51. }
  52. ],
  53. "roles": [OTHER_ROLE, ADMIN_ROLE],
  54. },
  55. OTHER_USER: {
  56. "email": OTHER_USER,
  57. "password": str(uuid.uuid4()),
  58. "id": str(uuid.uuid4()),
  59. "tenant_id": TENANT_ID,
  60. "initial_api_tokens": [
  61. {
  62. "client_id": str(uuid.uuid4()),
  63. "secret": str(uuid.uuid4()),
  64. }
  65. ],
  66. "roles": [OTHER_ROLE],
  67. },
  68. }
  69. FRONTEGG_URL = "http://frontegg-mock:6880"
  70. def app_password(email: str) -> str:
  71. api_token = USERS[email]["initial_api_tokens"][0]
  72. password = f"mzp_{api_token['client_id']}{api_token['secret']}".replace("-", "")
  73. return password
  74. SERVICES = [
  75. TestCerts(),
  76. Testdrive(
  77. materialize_url=f"postgres://{quote(ADMIN_USER)}:{app_password(ADMIN_USER)}@balancerd:6875?sslmode=require",
  78. materialize_use_https=True,
  79. no_reset=True,
  80. ),
  81. Balancerd(
  82. command=[
  83. "service",
  84. "--pgwire-listen-addr=0.0.0.0:6875",
  85. "--https-listen-addr=0.0.0.0:6876",
  86. "--internal-http-listen-addr=0.0.0.0:6878",
  87. "--frontegg-resolver-template=materialized:6875",
  88. "--frontegg-jwk-file=/secrets/frontegg-mock.crt",
  89. f"--frontegg-api-token-url={FRONTEGG_URL}/identity/resources/auth/v1/api-token",
  90. f"--frontegg-admin-role={ADMIN_ROLE}",
  91. "--https-resolver-template=materialized:6876",
  92. "--tls-key=/secrets/balancerd.key",
  93. "--tls-cert=/secrets/balancerd.crt",
  94. "--default-config=balancerd_inject_proxy_protocol_header_http=true",
  95. "--internal-tls",
  96. # Nonsensical but we don't need cancellations here
  97. "--cancellation-resolver-dir=/secrets/",
  98. ],
  99. depends_on=["test-certs"],
  100. volumes=[
  101. "secrets:/secrets",
  102. ],
  103. ),
  104. FronteggMock(
  105. issuer=FRONTEGG_URL,
  106. encoding_key_file="/secrets/frontegg-mock.key",
  107. decoding_key_file="/secrets/frontegg-mock.crt",
  108. users=json.dumps(list(USERS.values())),
  109. depends_on=["test-certs"],
  110. volumes=[
  111. "secrets:/secrets",
  112. ],
  113. ),
  114. Mz(app_password=""),
  115. Materialized(
  116. options=[
  117. # Enable TLS on the public port to verify that balancerd is connecting to the balancerd
  118. # port.
  119. "--tls-mode=require",
  120. "--tls-key=/secrets/materialized.key",
  121. "--tls-cert=/secrets/materialized.crt",
  122. f"--frontegg-tenant={TENANT_ID}",
  123. "--frontegg-jwk-file=/secrets/frontegg-mock.crt",
  124. f"--frontegg-api-token-url={FRONTEGG_URL}/identity/resources/auth/v1/api-token",
  125. f"--frontegg-admin-role={ADMIN_ROLE}",
  126. ],
  127. # We do not do anything interesting on the Mz side
  128. # to justify the extra restarts
  129. sanity_restart=False,
  130. depends_on=["test-certs"],
  131. volumes_extra=[
  132. "secrets:/secrets",
  133. ],
  134. listeners_config_path=f"{MZ_ROOT}/src/materialized/ci/listener_configs/no_auth_https.json",
  135. ),
  136. ]
  137. def grant_all_admin_user(c: Composition):
  138. # Connect once just to force the user to exist
  139. sql_cursor(c)
  140. mz_system_cursor = c.sql_cursor(service="materialized", port=6877, user="mz_system")
  141. mz_system_cursor.execute(
  142. f'GRANT ALL PRIVILEGES ON SCHEMA public TO "{ADMIN_USER}";'
  143. )
  144. mz_system_cursor.execute(
  145. f'GRANT ALL PRIVILEGES ON CLUSTER quickstart TO "{ADMIN_USER}";'
  146. )
  147. # Assert that contains is present in balancer metrics.
  148. def assert_metrics(c: Composition, contains: str):
  149. result = c.exec(
  150. "materialized",
  151. "curl",
  152. "http://balancerd:6878/metrics",
  153. "-s",
  154. capture=True,
  155. )
  156. assert contains in result.stdout
  157. def sql_cursor(
  158. c: Composition, service="balancerd", email="u1@example.com", startup_params={}
  159. ) -> Cursor:
  160. return c.sql_cursor(
  161. service=service,
  162. user=email,
  163. password=app_password(email),
  164. sslmode="require",
  165. startup_params=startup_params,
  166. )
  167. def pg8000_sql_cursor(
  168. c: Composition, service="balancerd", email="u1@example.com", startup_params={}
  169. ) -> pg8000.Cursor:
  170. ssl_context = ssl.create_default_context()
  171. ssl_context.check_hostname = False
  172. ssl_context.verify_mode = ssl.CERT_NONE
  173. conn = pg8000.connect(
  174. host="127.0.0.1",
  175. port=c.default_port(service),
  176. user=email,
  177. password=app_password(email),
  178. ssl_context=ssl_context,
  179. startup_params=startup_params,
  180. )
  181. return conn.cursor()
  182. def workflow_default(c: Composition) -> None:
  183. c.down(destroy_volumes=True)
  184. def process(name: str) -> None:
  185. if name in ["default", "plaintext"]:
  186. return
  187. with c.test_case(name):
  188. c.workflow(name)
  189. c.test_parts(list(c.workflows.keys()), process)
  190. with c.test_case("plaintext"):
  191. c.workflow("plaintext")
  192. def workflow_plaintext(c: Composition) -> None:
  193. """Test plaintext internal connections"""
  194. c.down(destroy_volumes=True)
  195. with c.override(
  196. Materialized(
  197. options=[
  198. # Enable TLS on the public port to verify that balancerd is connecting to the balancerd
  199. # port.
  200. "--tls-mode=disable",
  201. f"--frontegg-tenant={TENANT_ID}",
  202. "--frontegg-jwk-file=/secrets/frontegg-mock.crt",
  203. f"--frontegg-api-token-url={FRONTEGG_URL}/identity/resources/auth/v1/api-token",
  204. f"--frontegg-admin-role={ADMIN_ROLE}",
  205. ],
  206. # We do not do anything interesting on the Mz side
  207. # to justify the extra restarts
  208. sanity_restart=False,
  209. depends_on=["test-certs"],
  210. volumes_extra=[
  211. "secrets:/secrets",
  212. ],
  213. ),
  214. Balancerd(
  215. command=[
  216. "service",
  217. "--pgwire-listen-addr=0.0.0.0:6875",
  218. "--https-listen-addr=0.0.0.0:6876",
  219. "--internal-http-listen-addr=0.0.0.0:6878",
  220. "--frontegg-resolver-template=materialized:6875",
  221. "--frontegg-jwk-file=/secrets/frontegg-mock.crt",
  222. f"--frontegg-api-token-url={FRONTEGG_URL}/identity/resources/auth/v1/api-token",
  223. f"--frontegg-admin-role={ADMIN_ROLE}",
  224. "--https-resolver-template=materialized:6876",
  225. "--tls-key=/secrets/balancerd.key",
  226. "--tls-cert=/secrets/balancerd.crt",
  227. "--default-config=balancerd_inject_proxy_protocol_header_http=true",
  228. # Nonsensical but we don't need cancellations here
  229. "--cancellation-resolver-dir=/secrets/",
  230. ],
  231. depends_on=["test-certs"],
  232. volumes=[
  233. "secrets:/secrets",
  234. ],
  235. ),
  236. ):
  237. with c.test_case("plaintext_http"):
  238. c.workflow("http")
  239. with c.test_case("plaintext_wide_result"):
  240. c.workflow("wide-result")
  241. def workflow_http(c: Composition) -> None:
  242. """Test http endpoint"""
  243. c.up("balancerd", "frontegg-mock", "materialized")
  244. result = c.exec(
  245. "materialized",
  246. "curl",
  247. "https://balancerd:6876/api/sql",
  248. "-k",
  249. "-s",
  250. "--header",
  251. "Content-Type: application/json",
  252. "--user",
  253. f"{OTHER_USER}:{app_password(OTHER_USER)}",
  254. "--data",
  255. '{"query": "SELECT 123"}',
  256. capture=True,
  257. )
  258. assert json.loads(result.stdout)["results"][0]["rows"][0][0] == "123"
  259. # TODO: We can't assert metrics for `mz_balancer_tenant_connection_active{source="https"` here
  260. # because there's no CNAME. Does docker-compose support this somehow?
  261. def workflow_ip_forwarding(c: Composition) -> None:
  262. """Test that forwarding the client IP through the balancer works over both HTTP and SQL."""
  263. c.up("balancerd", "frontegg-mock", "materialized")
  264. # balancer is going to be running with https
  265. # in this scenario we should validate that connections
  266. # via the balancer come from the current ip
  267. # and that we can use proxy_protocol when talking to
  268. # envd directly.
  269. balancer_port = c.port("balancerd", 6876)
  270. # mz internal (unencrypted port)
  271. materialize_port = c.port("materialized", 6878)
  272. # We want to make sure the request we're making through the balancer does not use the balancers
  273. # ip for the sessions.
  274. # https://stackoverflow.com/questions/5281341/get-local-network-interface-addresses-using-only-proc
  275. balancer_ip = [
  276. ip
  277. for ip in c.exec(
  278. "balancerd",
  279. "awk",
  280. r"/32 host/ { print i } {i=$2}",
  281. "/proc/net/fib_trie",
  282. capture=True,
  283. ).stdout.split("\n")
  284. if ip != "127.0.0.1"
  285. ][0]
  286. r = requests.post(
  287. f"https://localhost:{balancer_port}/api/sql",
  288. headers={},
  289. auth=(OTHER_USER, app_password(OTHER_USER)),
  290. json={
  291. "query": "select client_ip from mz_internal.mz_sessions where connection_id = pg_backend_pid();"
  292. },
  293. verify=False,
  294. )
  295. print(f"response {r.text}")
  296. session_ip = json.loads(r.text)["results"][0]["rows"][0][0]
  297. assert (
  298. session_ip != balancer_ip
  299. ), f"requests from ({session_ip}) proxied by balancer should not use balancer ip ({balancer_ip}) in session"
  300. # Also assert psql connections don't use the balancer ip
  301. cursor = sql_cursor(c)
  302. cursor.execute(
  303. "select client_ip from mz_internal.mz_sessions where connection_id = pg_backend_pid();"
  304. )
  305. rows = cursor.fetchall()
  306. session_ip = rows[0][0]
  307. assert (
  308. session_ip != balancer_ip
  309. ), f"requests from ({session_ip}) proxied by balancer should not use balancer ip ({balancer_ip}) in session"
  310. def create_proxy_protocol_v2_header(
  311. client_ip: str, client_port: int, server_ip: str, server_port: int
  312. ):
  313. # Signature for Proxy Protocol v2
  314. signature = b"\r\n\r\n\x00\r\nQUIT\n"
  315. # Version and command (0x21 means version 2, PROXY command)
  316. version_and_command = 0x21
  317. # Address family and protocol (0x11 means INET (IPv4) + STREAM (TCP))
  318. family_and_protocol = 0x11
  319. # Source and destination address are sent as bytes.
  320. src_addr = socket.inet_aton(client_ip)
  321. dst_addr = socket.inet_aton(server_ip)
  322. # Pack ports into 2-byte unsigned integers
  323. src_port = struct.pack("!H", client_port)
  324. dst_port = struct.pack("!H", server_port)
  325. # Length of the address information (IPv4(4*2) + ports(1*2) = 12 bytes)
  326. addr_len = struct.pack("!H", 12)
  327. # Construct the final Proxy Protocol v2 header
  328. header = (
  329. signature
  330. + struct.pack("!BB", version_and_command, family_and_protocol)
  331. + addr_len
  332. + src_addr
  333. + dst_addr
  334. + src_port
  335. + dst_port
  336. )
  337. return header
  338. with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
  339. sock.connect(("127.0.0.1", materialize_port))
  340. # Pick an ip we couldn't normal connect from and trick envd into
  341. # thinking we're connecting with
  342. proxy_header = create_proxy_protocol_v2_header(
  343. "1.1.1.1", 1111, "127.0.0.1", 1111
  344. )
  345. # Make an http request over the socket
  346. json_data = {
  347. "query": "select client_ip from mz_internal.mz_sessions where connection_id = pg_backend_pid();"
  348. }
  349. json_data = json.dumps(json_data)
  350. content_length = len(json_data.encode())
  351. http_sql_query_request = dedent(
  352. f"""\
  353. POST /api/sql HTTP/1.1\r
  354. Host: 127.0.0.1:{materialize_port}\r
  355. Authorization: Basic {OTHER_USER}:{app_password(OTHER_USER)}\r
  356. Content-Type: application/json\r
  357. Content-Length: {content_length}\r
  358. \r
  359. {json_data}"""
  360. )
  361. sock.sendall(proxy_header + http_sql_query_request.encode())
  362. # read and parse the response
  363. body_separator = "\r\n\r\n"
  364. tcp_resp = sock.recv(8192)
  365. resp_split = tcp_resp.split(body_separator.encode())
  366. assert (
  367. len(resp_split) > 1
  368. ), f"expected response with header and body, found: {resp_split}"
  369. body = resp_split[1]
  370. # assert that we tricked environmentd
  371. assert json.loads(body)["results"][0]["rows"][0][0] == "1.1.1.1"
  372. def workflow_wide_result(c: Composition) -> None:
  373. """Test passthrough of wide rows"""
  374. c.up("balancerd", "frontegg-mock", "materialized")
  375. cursor = sql_cursor(c)
  376. cursor.execute("SELECT 'ABC' || REPEAT('x', 1024 * 1024 * 96) || 'XYZ'")
  377. rows = cursor.fetchall()
  378. assert len(rows) == 1
  379. cols = rows[0]
  380. assert len(cols) == 1
  381. col = cols[0]
  382. assert len(col) == (1024 * 1024 * 96) + (2 * 3)
  383. assert col.startswith("ABCx")
  384. assert col.endswith("xXYZ")
  385. def workflow_long_result(c: Composition) -> None:
  386. """Test passthrough of long results"""
  387. c.up("balancerd", "frontegg-mock", "materialized")
  388. cursor = sql_cursor(c)
  389. cursor.execute(
  390. "SELECT 'ABC', generate_series, 'XYZ' FROM generate_series(1, 10 * 1024 * 1024)"
  391. )
  392. cnt = 0
  393. for row in cursor.fetchall():
  394. cnt = cnt + 1
  395. assert len(row) == 3
  396. assert row[0] == "ABC"
  397. assert row[2] == "XYZ"
  398. assert cnt == 10 * 1024 * 1024
  399. def workflow_long_query(c: Composition) -> None:
  400. """Test passthrough of a long SQL query."""
  401. c.up("balancerd", "frontegg-mock", "materialized")
  402. cursor = sql_cursor(c)
  403. small_pad_size = 512 * 1024
  404. small_pad = "x" * small_pad_size
  405. cursor.execute(f"SELECT 'ABC{small_pad}XYZ';")
  406. rows = cursor.fetchall()
  407. assert len(rows) == 1
  408. cols = rows[0]
  409. assert len(cols) == 1
  410. col = cols[0]
  411. assert len(col) == small_pad_size + (2 * 3)
  412. assert col.startswith("ABCx")
  413. assert col.endswith("xXYZ")
  414. medium_pad_size = 1 * 1024 * 1024
  415. medium_pad = "x" * medium_pad_size
  416. try:
  417. cursor.execute(f"SELECT 'ABC{medium_pad}XYZ';")
  418. raise RuntimeError("execute() expected to fail")
  419. except ProgramLimitExceeded as e:
  420. assert "statement batch size cannot exceed 1000.0 KB" in str(e)
  421. except:
  422. raise RuntimeError("execute() threw an unexpected exception")
  423. large_pad_size = 512 * 1024 * 1024
  424. large_pad = "x" * large_pad_size
  425. try:
  426. cursor.execute(f"SELECT 'ABC{large_pad}XYZ';")
  427. raise RuntimeError("execute() expected to fail")
  428. except OperationalError as e:
  429. msg = str(e)
  430. assert (
  431. "server closed the connection unexpectedly" in msg
  432. or "EOF detected" in msg
  433. or "frame size too big" in msg
  434. )
  435. except:
  436. raise RuntimeError("execute() threw an unexpected exception")
  437. # Confirm that balancerd remains up
  438. cursor = sql_cursor(c)
  439. cursor.execute("SELECT 1;")
  440. def workflow_mz_restarted(c: Composition) -> None:
  441. """Existing connections should fail if Mz is restarted.
  442. This protects against the client not being informed
  443. that their transaction has been aborted on the Mz side
  444. """
  445. c.up("balancerd", "frontegg-mock", "materialized")
  446. grant_all_admin_user(c)
  447. cursor = sql_cursor(c)
  448. cursor.execute("CREATE TABLE restart_mz (f1 INTEGER)")
  449. cursor.execute("START TRANSACTION")
  450. cursor.execute("INSERT INTO restart_mz VALUES (1)")
  451. c.kill("materialized")
  452. c.up("materialized")
  453. try:
  454. cursor.execute("INSERT INTO restart_mz VALUES (2)")
  455. raise RuntimeError("execute() expected to fail")
  456. except OperationalError as e:
  457. assert "SSL connection has been closed unexpectedly" in str(e)
  458. except:
  459. raise RuntimeError("execute() threw an unexpected exception")
  460. # Future connections work
  461. sql_cursor(c)
  462. def workflow_pgwire_param_rejection(c: Composition) -> None:
  463. """Parameters should be rejected"""
  464. c.up("balancerd", "frontegg-mock", "materialized")
  465. def check_error(
  466. message: str, f: Callable[..., Any], ExpectedError: type[Exception]
  467. ):
  468. try:
  469. f()
  470. except ExpectedError:
  471. return
  472. raise AssertionError(f"Expected {message} to raise {ExpectedError}")
  473. # Uses pg8000, because with psycopg/libpq only a notice is printed, and
  474. # catching it during the connection process is not easy:
  475. # NOTICE: startup setting mz_forwarded_for not set: unrecognized configuration parameter "mz_forwarded_for"
  476. check_error(
  477. "connect with mz_forwarded_for param",
  478. lambda: pg8000_sql_cursor(c, startup_params={"mz_forwarded_for": "1.1.1.1"}),
  479. InterfaceError,
  480. )
  481. check_error(
  482. "connect with mz_connection_uuid param",
  483. lambda: pg8000_sql_cursor(c, startup_params={"mz_connection_uuid": "123456"}),
  484. InterfaceError,
  485. )
  486. def workflow_balancerd_restarted(c: Composition) -> None:
  487. """Existing connections should fail if balancerd is restarted"""
  488. c.up("balancerd", "frontegg-mock", "materialized")
  489. grant_all_admin_user(c)
  490. cursor = sql_cursor(c)
  491. cursor.execute("CREATE TABLE restart_balancerd (f1 INTEGER)")
  492. cursor.execute("START TRANSACTION")
  493. cursor.execute("INSERT INTO restart_balancerd VALUES (1)")
  494. c.kill("balancerd")
  495. c.up("balancerd")
  496. try:
  497. cursor.execute("INSERT INTO restart_balancerd VALUES (2)")
  498. raise RuntimeError("execute() expected to fail")
  499. except OperationalError as e:
  500. msg = str(e)
  501. assert (
  502. "EOF detected" in msg
  503. or "failed to lookup address information: Name or service not known" in msg
  504. )
  505. except:
  506. raise RuntimeError("execute() threw an unexpected exception")
  507. # Future connections work
  508. sql_cursor(c)
  509. def workflow_mz_not_running(c: Composition) -> None:
  510. """New connections should fail if Mz is down"""
  511. c.up("balancerd", "frontegg-mock", "materialized")
  512. c.kill("materialized")
  513. try:
  514. sql_cursor(c)
  515. raise RuntimeError("connect() expected to fail")
  516. except OperationalError as e:
  517. assert any(
  518. expected in str(e)
  519. for expected in [
  520. "No route to host",
  521. "Connection timed out",
  522. "failure in name resolution",
  523. "failed to lookup address information",
  524. "Name or service not known",
  525. ]
  526. )
  527. except:
  528. raise RuntimeError("connect() threw an unexpected exception")
  529. # Things should work now
  530. c.up("materialized")
  531. sql_cursor(c)
  532. def workflow_user(c: Composition) -> None:
  533. """Test that the user is passed all the way to Mz itself."""
  534. c.up("balancerd", "frontegg-mock", "materialized")
  535. # Non-admin user.
  536. cursor = sql_cursor(c, email=OTHER_USER)
  537. try:
  538. cursor.execute("DROP DATABASE materialize CASCADE")
  539. raise RuntimeError("execute() expected to fail")
  540. except ProgrammingError as e:
  541. assert "must be owner of DATABASE materialize" in str(e)
  542. except:
  543. raise RuntimeError("execute() threw an unexpected exception")
  544. cursor.execute("SELECT current_user()")
  545. assert OTHER_USER in str(cursor.fetchall())
  546. assert_metrics(c, 'mz_balancer_tenant_connection_active{source="pgwire"')
  547. assert_metrics(c, 'mz_balancer_tenant_connection_rx{source="pgwire"')
  548. def workflow_many_connections(c: Composition) -> None:
  549. c.up("balancerd", "frontegg-mock", "materialized")
  550. cursors = []
  551. connections = 1000 - 10 # Go almost to the limit, but not above
  552. print(f"Opening {connections} connections.")
  553. for i in range(connections):
  554. cursor = sql_cursor(c)
  555. cursors.append(cursor)
  556. for cursor in cursors:
  557. cursor.execute("SELECT 'abc'")
  558. data = cursor.fetchall()
  559. assert len(data) == 1
  560. row = data[0]
  561. assert len(row) == 1
  562. col = row[0]
  563. assert col == "abc"
  564. def workflow_webhook(c: Composition) -> None:
  565. c.up(
  566. "balancerd",
  567. "frontegg-mock",
  568. "materialized",
  569. {"name": "testdrive", "persistent": True},
  570. )
  571. grant_all_admin_user(c)
  572. c.testdrive(
  573. dedent(
  574. """
  575. > CREATE SOURCE wh FROM WEBHOOK BODY FORMAT TEXT;
  576. $ webhook-append database=materialize schema=public name=wh
  577. a
  578. > SELECT * FROM wh
  579. a
  580. """
  581. )
  582. )