mzcompose.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import datetime
  10. import os
  11. import time
  12. import urllib.parse
  13. from textwrap import dedent
  14. import psycopg
  15. import requests
  16. from psycopg import Connection, Cursor
  17. from psycopg.errors import IdleInTransactionSessionTimeout, OperationalError
  18. from requests.exceptions import ConnectionError, ReadTimeout
  19. from materialize.cloudtest.util.jwt_key import fetch_jwt
  20. from materialize.mz_env_util import get_cloud_hostname
  21. from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
  22. from materialize.mzcompose.services.mz import Mz
  23. from materialize.mzcompose.services.testdrive import Testdrive
  24. from materialize.mzcompose.test_result import (
  25. FailedTestExecutionError,
  26. TestFailureDetails,
  27. )
  28. from materialize.ui import CommandFailureCausedUIError
  29. SERVICES = [
  30. Testdrive(), # Overridden below
  31. Mz(app_password=""), # Overridden below
  32. ]
  33. def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
  34. REGION = "aws/us-east-1"
  35. ENVIRONMENT = os.getenv("ENVIRONMENT", "production")
  36. USERNAME = os.getenv(
  37. "CANARY_LOADTEST_USERNAME", "infra+qacanaryload@materialize.io"
  38. )
  39. PASSWORD = os.environ["CANARY_LOADTEST_PASSWORD"]
  40. APP_PASSWORD = os.environ["CANARY_LOADTEST_APP_PASSWORD"]
  41. parser.add_argument("--runtime", default=600, type=int, help="Runtime in seconds")
  42. args = parser.parse_args()
  43. start_time = time.time()
  44. host = get_cloud_hostname(
  45. c, region=REGION, environment=ENVIRONMENT, app_password=APP_PASSWORD
  46. )
  47. with c.override(
  48. Testdrive(
  49. no_reset=True,
  50. no_consistency_checks=True, # No access to HTTP for coordinator check
  51. materialize_url=f"postgres://{urllib.parse.quote(USERNAME)}:{urllib.parse.quote(APP_PASSWORD)}@{host}:6875/materialize",
  52. default_timeout="1200s",
  53. ),
  54. ):
  55. c.up({"name": "testdrive", "persistent": True})
  56. failures: list[TestFailureDetails] = []
  57. count_chunk = 0
  58. while time.time() - start_time < args.runtime:
  59. count_chunk = count_chunk + 1
  60. try:
  61. c.testdrive(
  62. dedent(
  63. """
  64. > DELETE FROM qa_canary_environment.public_table.table
  65. """
  66. )
  67. )
  68. conn1, cursor_on_table = create_connection_and_cursor(
  69. host,
  70. USERNAME,
  71. APP_PASSWORD,
  72. "DECLARE subscribe_table CURSOR FOR SUBSCRIBE (SELECT * FROM qa_canary_environment.public_table.table)",
  73. )
  74. conn2, cursor_on_mv = create_connection_and_cursor(
  75. host,
  76. USERNAME,
  77. APP_PASSWORD,
  78. "DECLARE subscribe_mv CURSOR FOR SUBSCRIBE (SELECT * FROM qa_canary_environment.public_table.table_mv)",
  79. )
  80. i = 0
  81. while time.time() - start_time < args.runtime:
  82. print(f"Running iteration {i} of chunk {count_chunk}")
  83. c.override_current_testcase_name(
  84. f"iteration {i} of chunk {count_chunk} in workflow_default"
  85. )
  86. perform_test(
  87. c,
  88. host,
  89. USERNAME,
  90. PASSWORD,
  91. cursor_on_table,
  92. cursor_on_mv,
  93. i,
  94. )
  95. i += 1
  96. close_connection_and_cursor(conn1, cursor_on_table, "subscribe_table")
  97. close_connection_and_cursor(conn2, cursor_on_mv, "subscribe_mv")
  98. except (
  99. OperationalError,
  100. ReadTimeout,
  101. ConnectionError,
  102. IdleInTransactionSessionTimeout,
  103. ) as e:
  104. error_msg_str = str(e)
  105. if (
  106. "Read timed out" in error_msg_str
  107. or "closed connection" in error_msg_str
  108. or "terminating connection due to idle-in-transaction timeout"
  109. in error_msg_str
  110. or "consuming input failed: SSL SYSCALL error: EOF detected"
  111. in error_msg_str
  112. or "consuming input failed: SSL connection has been closed unexpectedly"
  113. in error_msg_str
  114. or "terminating connection due to idle-in-transaction timeout"
  115. in error_msg_str
  116. ):
  117. print(f"Failed: {e}; retrying")
  118. else:
  119. raise
  120. except FailedTestExecutionError as e:
  121. assert len(e.errors) > 0, "Exception contains no errors"
  122. for error in e.errors:
  123. # TODO(def-): Remove when database-issues#6825 is fixed
  124. if "Non-positive multiplicity in DistinctBy" in error.message:
  125. continue
  126. print(
  127. f"Test failure occurred ({error.message}), collecting it, and continuing."
  128. )
  129. # collect, continue, and rethrow at the end
  130. failures.append(error)
  131. except CommandFailureCausedUIError as e:
  132. msg = (e.stdout or "") + (e.stderr or "")
  133. # TODO(def-): Remove when database-issues#6825 is fixed
  134. if "Non-positive multiplicity in DistinctBy" in msg:
  135. continue
  136. print(f"Test failure occurred ({msg}), collecting it, and continuing.")
  137. # collect, continue, and rethrow at the end
  138. failures.append(TestFailureDetails(message=msg, details=None))
  139. if len(failures) > 0:
  140. # reset test case name to remove current iteration and chunk, which does not apply to collected errors
  141. c.override_current_testcase_name("workflow_default")
  142. raise FailedTestExecutionError(
  143. error_summary="SQL failures occurred",
  144. errors=failures,
  145. )
  146. def fetch_token(user_name: str, password: str) -> str:
  147. return fetch_jwt(
  148. email=user_name,
  149. password=password,
  150. host="admin.cloud.materialize.com/frontegg",
  151. scheme="https",
  152. max_tries=10,
  153. )
  154. def http_sql_query(
  155. host: str, query: str, token: str, retries: int = 10
  156. ) -> list[list[str]]:
  157. try:
  158. r = requests.post(
  159. f'https://{host}/api/sql?options={{"application_name":"canary-load","cluster":"qa_canary_environment_compute"}}',
  160. headers={"authorization": f"Bearer {token}"},
  161. json={"queries": [{"params": [], "query": query}]},
  162. timeout=60,
  163. )
  164. except requests.exceptions.HTTPError as e:
  165. res = e.response
  166. print(f"{e}\n{res}\n{res.text}")
  167. raise
  168. except (requests.exceptions.Timeout, requests.exceptions.ReadTimeout):
  169. # TODO: This should be an error once database-issues#8737 is fixed
  170. if retries > 0:
  171. print("Timed out after 60s, retrying")
  172. return http_sql_query(host, query, token, retries - 1)
  173. raise
  174. assert r.status_code == 200, f"{r}\n{r.text}"
  175. results = r.json()["results"]
  176. assert len(results) == 1, results
  177. if "rows" not in results[0].keys():
  178. assert "error" in results[0].keys()
  179. error = results[0]["error"]
  180. details = f"Occurred at {datetime.datetime.now()}."
  181. if "notices" in results[0].keys():
  182. notices = results[0]["notices"]
  183. if not (type(notices) == list and len(notices) == 0):
  184. details = f"{details} Notices: {notices}"
  185. raise FailedTestExecutionError(
  186. error_summary="SQL query failed",
  187. errors=[TestFailureDetails(message=error, details=details)],
  188. )
  189. return results[0]["rows"]
  190. def create_connection_and_cursor(
  191. host: str, user_name: str, app_password: str, cursor_statement: str
  192. ) -> tuple[Connection, Cursor]:
  193. conn = psycopg.connect(
  194. host=host,
  195. user=user_name,
  196. password=app_password,
  197. port=6875,
  198. sslmode="require",
  199. )
  200. cursor = conn.cursor()
  201. cursor.execute("BEGIN")
  202. cursor.execute(cursor_statement.encode())
  203. return conn, cursor
  204. def close_connection_and_cursor(
  205. connection: Connection, cursor: Cursor, object_to_close: str
  206. ) -> None:
  207. cursor.execute(f"CLOSE {object_to_close}".encode())
  208. cursor.execute("ROLLBACK")
  209. cursor.close()
  210. connection.close()
  211. def perform_test(
  212. c: Composition,
  213. host: str,
  214. user_name: str,
  215. password: str,
  216. cursor_on_table: Cursor,
  217. cursor_on_mv: Cursor,
  218. i: int,
  219. ) -> None:
  220. current_time = time.time()
  221. update_data(c, i)
  222. validate_updated_data(c, i)
  223. validate_cursor_on_table(cursor_on_table, current_time, i)
  224. validate_cursor_on_mv(cursor_on_mv, current_time, i)
  225. # Token can run out, so refresh it occasionally
  226. token = fetch_token(user_name, password)
  227. validate_data_through_http_connection(
  228. host,
  229. token,
  230. i,
  231. )
  232. def update_data(c: Composition, i: int) -> None:
  233. c.testdrive(
  234. dedent(
  235. f"""
  236. > SELECT 1
  237. 1
  238. > INSERT INTO qa_canary_environment.public_table.table VALUES {", ".join(f"({i*100+j})" for j in range(100))}
  239. """
  240. )
  241. )
  242. def validate_updated_data(c: Composition, i: int) -> None:
  243. c.testdrive(
  244. dedent(
  245. f"""
  246. > SELECT COUNT(DISTINCT l_returnflag) FROM qa_canary_environment.public_tpch.tpch_q01 WHERE sum_charge < 0
  247. 0
  248. > SELECT COUNT(DISTINCT c_name) FROM qa_canary_environment.public_tpch.tpch_q18 WHERE o_orderdate >= '2023-01-01'
  249. 0
  250. > SELECT COUNT(DISTINCT a_name) FROM qa_canary_environment.public_pg_cdc.pg_wmr WHERE degree > 10
  251. 0
  252. > SELECT COUNT(DISTINCT a_name) FROM qa_canary_environment.public_mysql_cdc.mysql_wmr WHERE degree > 10
  253. 0
  254. > SELECT COUNT(DISTINCT count_star) FROM qa_canary_environment.public_loadgen.sales_product_product_category WHERE count_distinct_product_id < 0
  255. 0
  256. > SELECT * FROM qa_canary_environment.public_table.table_mv
  257. {i * 100 + 99}
  258. > SELECT min(c), max(c), count(*) FROM qa_canary_environment.public_table.table
  259. 0 {i * 100 + 99} {(i + 1) * 100}
  260. """
  261. )
  262. )
  263. def validate_cursor_on_table(
  264. cursor_on_table: Cursor,
  265. current_time: float,
  266. i: int,
  267. ) -> None:
  268. cursor_on_table.execute("FETCH ALL subscribe_table WITH (timeout='5s')")
  269. results = cursor_on_table.fetchall()
  270. assert len(results) == 100, f"Unexpected results: {results}"
  271. for result in results:
  272. assert int(result[0]) >= current_time, f"Unexpected results: {results}"
  273. assert int(result[1]) == 1, f"Unexpected results: {results}"
  274. assert (
  275. i * 100 <= int(result[2]) < (i + 1) * 100
  276. ), f"Unexpected results: {results}"
  277. def validate_cursor_on_mv(
  278. cursor_on_mv: Cursor,
  279. current_time: float,
  280. i: int,
  281. ) -> None:
  282. cursor_on_mv.execute("FETCH ALL subscribe_mv WITH (timeout='5s')")
  283. results = cursor_on_mv.fetchall()
  284. # First the removal, then the addition if it happens at the same timestamp
  285. r = list(sorted(list(results)))
  286. if i == 0:
  287. assert len(r) == 1, f"Unexpected results: {r}"
  288. else:
  289. assert len(r) % 2 == 0, f"Unexpected results: {r}"
  290. assert len(r) >= 2
  291. assert int(r[-2][0]) >= current_time, f"Unexpected results: {r}" # type: ignore
  292. assert int(r[-2][1]) == -1, f"Unexpected results: {r}" # type: ignore
  293. assert int(r[-2][2]) == i * 100 - 1, f"Unexpected results: {r}" # type: ignore
  294. assert int(r[-1][0]) >= current_time, f"Unexpected results: {r}" # type: ignore
  295. assert int(r[-1][1]) == 1, f"Unexpected results: {r}" # type: ignore
  296. assert int(r[-1][2]) == (i + 1) * 100 - 1, f"Unexpected results: {r}" # type: ignore
  297. def validate_data_through_http_connection(
  298. host: str,
  299. token: str,
  300. i: int,
  301. ) -> None:
  302. result = http_sql_query(host, "SELECT 1", token)
  303. assert result == [["1"]]
  304. result = http_sql_query(
  305. host,
  306. "SELECT COUNT(DISTINCT l_returnflag) FROM qa_canary_environment.public_tpch.tpch_q01 WHERE sum_charge < 0",
  307. token,
  308. )
  309. assert result == [["0"]]
  310. result = http_sql_query(
  311. host,
  312. "SELECT COUNT(DISTINCT c_name) FROM qa_canary_environment.public_tpch.tpch_q18 WHERE o_orderdate >= '2023-01-01'",
  313. token,
  314. )
  315. assert result == [["0"]]
  316. result = http_sql_query(
  317. host,
  318. "SELECT COUNT(DISTINCT a_name) FROM qa_canary_environment.public_pg_cdc.pg_wmr WHERE degree > 10",
  319. token,
  320. )
  321. assert result == [["0"]]
  322. result = http_sql_query(
  323. host,
  324. "SELECT COUNT(DISTINCT a_name) FROM qa_canary_environment.public_mysql_cdc.mysql_wmr WHERE degree > 10",
  325. token,
  326. )
  327. assert result == [["0"]]
  328. result = http_sql_query(
  329. host,
  330. "SELECT COUNT(DISTINCT count_star) FROM qa_canary_environment.public_loadgen.sales_product_product_category WHERE count_distinct_product_id < 0",
  331. token,
  332. )
  333. assert result == [["0"]]
  334. result = http_sql_query(
  335. host,
  336. "SELECT * FROM qa_canary_environment.public_table.table_mv",
  337. token,
  338. )
  339. assert result == [[f"{i * 100 + 99}"]]
  340. result = http_sql_query(
  341. host,
  342. "SELECT min(c), max(c), count(*) FROM qa_canary_environment.public_table.table",
  343. token,
  344. )
  345. assert result == [["0", f"{i * 100 + 99}", f"{(i + 1) * 100}"]]