123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- import datetime
- import os
- import time
- import urllib.parse
- from textwrap import dedent
- import psycopg
- import requests
- from psycopg import Connection, Cursor
- from psycopg.errors import IdleInTransactionSessionTimeout, OperationalError
- from requests.exceptions import ConnectionError, ReadTimeout
- from materialize.cloudtest.util.jwt_key import fetch_jwt
- from materialize.mz_env_util import get_cloud_hostname
- from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
- from materialize.mzcompose.services.mz import Mz
- from materialize.mzcompose.services.testdrive import Testdrive
- from materialize.mzcompose.test_result import (
- FailedTestExecutionError,
- TestFailureDetails,
- )
- from materialize.ui import CommandFailureCausedUIError
# Services declared by this composition. workflow_default replaces the
# Testdrive entry through c.override() with one that targets the cloud host.
SERVICES = [
    Testdrive(),  # Overridden below
    Mz(app_password=""),  # Overridden below
]
def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Run the canary load test against a cloud environment for a fixed time.

    Reads the credentials from the environment, points Testdrive at the cloud
    host, and then works in "chunks" until ``--runtime`` seconds (default 600)
    have elapsed. Each chunk empties
    ``qa_canary_environment.public_table.table``, opens one SUBSCRIBE cursor on
    that table and one on ``table_mv``, and calls ``perform_test`` in a loop.

    Error handling per chunk:

    * Connection-level errors whose message contains one of the known
      transient substrings are printed and a new chunk is started; any other
      connection-level error is re-raised.
    * Test failures are collected and raised together once the runtime is
      over. Failures mentioning "Non-positive multiplicity in DistinctBy" are
      skipped (see the TODOs below).

    Raises:
        KeyError: if CANARY_LOADTEST_PASSWORD or CANARY_LOADTEST_APP_PASSWORD
            is not set in the environment.
        FailedTestExecutionError: if at least one test failure was collected.
    """
    REGION = "aws/us-east-1"
    ENVIRONMENT = os.getenv("ENVIRONMENT", "production")
    USERNAME = os.getenv(
        "CANARY_LOADTEST_USERNAME", "infra+qacanaryload@materialize.io"
    )
    PASSWORD = os.environ["CANARY_LOADTEST_PASSWORD"]
    APP_PASSWORD = os.environ["CANARY_LOADTEST_APP_PASSWORD"]

    # Substrings of connection-level error messages after which the chunk is
    # simply restarted instead of failing the workflow.
    RETRYABLE_ERROR_MESSAGES = (
        "Read timed out",
        "closed connection",
        "terminating connection due to idle-in-transaction timeout",
        "consuming input failed: SSL SYSCALL error: EOF detected",
        "consuming input failed: SSL connection has been closed unexpectedly",
    )

    parser.add_argument("--runtime", default=600, type=int, help="Runtime in seconds")
    args = parser.parse_args()

    start_time = time.time()

    host = get_cloud_hostname(
        c, region=REGION, environment=ENVIRONMENT, app_password=APP_PASSWORD
    )

    with c.override(
        Testdrive(
            no_reset=True,
            no_consistency_checks=True,  # No access to HTTP for coordinator check
            materialize_url=f"postgres://{urllib.parse.quote(USERNAME)}:{urllib.parse.quote(APP_PASSWORD)}@{host}:6875/materialize",
            default_timeout="1200s",
        ),
    ):
        c.up({"name": "testdrive", "persistent": True})

        failures: list[TestFailureDetails] = []

        count_chunk = 0
        while time.time() - start_time < args.runtime:
            count_chunk += 1
            # Every connection opened in this chunk, so that the `finally`
            # below can release them even when the chunk is aborted.
            open_connections: list[Connection] = []
            try:
                c.testdrive(
                    dedent(
                        """
                        > DELETE FROM qa_canary_environment.public_table.table
                        """
                    )
                )

                conn1, cursor_on_table = create_connection_and_cursor(
                    host,
                    USERNAME,
                    APP_PASSWORD,
                    "DECLARE subscribe_table CURSOR FOR SUBSCRIBE (SELECT * FROM qa_canary_environment.public_table.table)",
                )
                open_connections.append(conn1)
                conn2, cursor_on_mv = create_connection_and_cursor(
                    host,
                    USERNAME,
                    APP_PASSWORD,
                    "DECLARE subscribe_mv CURSOR FOR SUBSCRIBE (SELECT * FROM qa_canary_environment.public_table.table_mv)",
                )
                open_connections.append(conn2)

                i = 0
                while time.time() - start_time < args.runtime:
                    print(f"Running iteration {i} of chunk {count_chunk}")
                    c.override_current_testcase_name(
                        f"iteration {i} of chunk {count_chunk} in workflow_default"
                    )
                    perform_test(
                        c,
                        host,
                        USERNAME,
                        PASSWORD,
                        cursor_on_table,
                        cursor_on_mv,
                        i,
                    )
                    i += 1

                close_connection_and_cursor(conn1, cursor_on_table, "subscribe_table")
                close_connection_and_cursor(conn2, cursor_on_mv, "subscribe_mv")
            except (
                OperationalError,
                ReadTimeout,
                ConnectionError,
                IdleInTransactionSessionTimeout,
            ) as e:
                error_msg_str = str(e)
                if not any(msg in error_msg_str for msg in RETRYABLE_ERROR_MESSAGES):
                    raise
                print(f"Failed: {e}; retrying")
            except FailedTestExecutionError as e:
                assert len(e.errors) > 0, "Exception contains no errors"
                for error in e.errors:
                    # TODO(def-): Remove when database-issues#6825 is fixed
                    if "Non-positive multiplicity in DistinctBy" in error.message:
                        continue
                    print(
                        f"Test failure occurred ({error.message}), collecting it, and continuing."
                    )
                    # collect, continue, and rethrow at the end
                    failures.append(error)
            except CommandFailureCausedUIError as e:
                msg = (e.stdout or "") + (e.stderr or "")
                # TODO(def-): Remove when database-issues#6825 is fixed
                if "Non-positive multiplicity in DistinctBy" in msg:
                    continue
                print(f"Test failure occurred ({msg}), collecting it, and continuing.")
                # collect, continue, and rethrow at the end
                failures.append(TestFailureDetails(message=msg, details=None))
            finally:
                # Closing again after a clean shutdown above is harmless; after
                # an aborted chunk this is what releases the connections.
                for conn in open_connections:
                    conn.close()

        if failures:
            # reset test case name to remove current iteration and chunk, which does not apply to collected errors
            c.override_current_testcase_name("workflow_default")
            raise FailedTestExecutionError(
                error_summary="SQL failures occurred",
                errors=failures,
            )
def fetch_token(user_name: str, password: str) -> str:
    """Return the token that fetch_jwt produces for the given credentials.

    The request goes to the fixed host ``admin.cloud.materialize.com/frontegg``
    over https, and fetch_jwt is allowed up to 10 tries.
    """
    request_options = {
        "email": user_name,
        "password": password,
        "host": "admin.cloud.materialize.com/frontegg",
        "scheme": "https",
        "max_tries": 10,
    }
    return fetch_jwt(**request_options)
def http_sql_query(
    host: str, query: str, token: str, retries: int = 10
) -> list[list[str]]:
    """Run a single SQL query through the HTTP SQL endpoint and return its rows.

    Args:
        host: Hostname of the environment; the request goes to
            ``https://{host}/api/sql``.
        query: The SQL statement to execute (sent without parameters).
        token: Bearer token placed in the ``authorization`` header.
        retries: How many times a timed-out request is retried before the
            timeout is propagated.

    Returns:
        The ``rows`` of the single result contained in the response.

    Raises:
        requests.exceptions.Timeout: if the request still times out after all
            retries are used up.
        FailedTestExecutionError: if the response carries an ``error`` instead
            of ``rows``.
        AssertionError: if the status code is not 200 or the response does not
            contain exactly one result.
    """
    url = f'https://{host}/api/sql?options={{"application_name":"canary-load","cluster":"qa_canary_environment_compute"}}'
    remaining_retries = retries
    while True:
        try:
            r = requests.post(
                url,
                headers={"authorization": f"Bearer {token}"},
                json={"queries": [{"params": [], "query": query}]},
                timeout=60,
            )
            break
        except requests.exceptions.HTTPError as e:
            res = e.response
            # The exception is not guaranteed to carry a response object.
            body = res.text if res is not None else ""
            print(f"{e}\n{res}\n{body}")
            raise
        except requests.exceptions.Timeout:
            # ReadTimeout is a subclass of Timeout, so this covers both.
            # TODO: This should be an error once database-issues#8737 is fixed
            if remaining_retries <= 0:
                raise
            print("Timed out after 60s, retrying")
            remaining_retries -= 1

    assert r.status_code == 200, f"{r}\n{r.text}"
    results = r.json()["results"]
    assert len(results) == 1, results
    result = results[0]

    if "rows" in result:
        return result["rows"]

    assert "error" in result
    error = result["error"]
    details = f"Occurred at {datetime.datetime.now()}."
    if "notices" in result:
        notices = result["notices"]
        # Only an empty list counts as "no notices"; anything else is reported.
        if not (isinstance(notices, list) and not notices):
            details = f"{details} Notices: {notices}"
    raise FailedTestExecutionError(
        error_summary="SQL query failed",
        errors=[TestFailureDetails(message=error, details=details)],
    )
def create_connection_and_cursor(
    host: str, user_name: str, app_password: str, cursor_statement: str
) -> tuple[Connection, Cursor]:
    """Open a connection, begin a transaction and run ``cursor_statement``.

    The connection goes to port 6875 with ``sslmode="require"``. The returned
    cursor has already executed ``BEGIN`` followed by ``cursor_statement``
    (callers in this file pass a ``DECLARE ... CURSOR FOR SUBSCRIBE``
    statement), so the transaction is still open when this function returns.

    Returns:
        The ``(connection, cursor)`` pair; the caller is responsible for
        closing both.

    Raises:
        Whatever psycopg raises while connecting or executing. If the setup
        fails after the connection was established, the connection is closed
        before the exception propagates.
    """
    conn = psycopg.connect(
        host=host,
        user=user_name,
        password=app_password,
        port=6875,
        sslmode="require",
    )
    try:
        cursor = conn.cursor()
        cursor.execute("BEGIN")
        cursor.execute(cursor_statement.encode())
    except BaseException:
        # The caller never receives the connection in this case, so it has to
        # be released here to avoid leaking it.
        conn.close()
        raise
    return conn, cursor
def close_connection_and_cursor(
    connection: Connection, cursor: Cursor, object_to_close: str
) -> None:
    """Close the named SQL cursor, roll back, and release cursor and connection.

    Args:
        connection: The connection to close.
        cursor: The client-side cursor used to issue the statements; closed
            afterwards.
        object_to_close: Name of the SQL cursor passed to ``CLOSE``.

    Raises:
        Whatever ``cursor.execute`` raises for ``CLOSE`` or ``ROLLBACK``. The
        cursor and the connection are closed even in that case.
    """
    try:
        cursor.execute(f"CLOSE {object_to_close}".encode())
        cursor.execute("ROLLBACK")
    finally:
        # Release the client-side resources even if the statements above
        # failed, for example because the server already dropped the session.
        try:
            cursor.close()
        finally:
            connection.close()
def perform_test(
    c: Composition,
    host: str,
    user_name: str,
    password: str,
    cursor_on_table: Cursor,
    cursor_on_mv: Cursor,
    i: int,
) -> None:
    """Run one iteration: write a batch, then validate it on every path.

    The batch for iteration ``i`` is inserted and checked through testdrive,
    then checked on both SUBSCRIBE cursors, and finally checked over the HTTP
    SQL endpoint. The wall-clock time taken before the insert is handed to the
    cursor validations as the lower bound for the observed timestamps.
    """
    started_at = time.time()

    update_data(c, i)
    validate_updated_data(c, i)

    validate_cursor_on_table(cursor_on_table, started_at, i)
    validate_cursor_on_mv(cursor_on_mv, started_at, i)

    # A token is fetched on every iteration, since a token can run out.
    validate_data_through_http_connection(
        host,
        fetch_token(user_name, password),
        i,
    )
def update_data(c: Composition, i: int) -> None:
    """Insert the 100 values of iteration ``i`` through testdrive.

    The values are ``i * 100`` up to and including ``i * 100 + 99``, written
    with a single INSERT into ``qa_canary_environment.public_table.table``
    after a ``SELECT 1`` check.
    """
    first_value = i * 100
    values = ", ".join(f"({first_value + offset})" for offset in range(100))
    script = dedent(
        f"""
        > SELECT 1
        1
        > INSERT INTO qa_canary_environment.public_table.table VALUES {values}
        """
    )
    c.testdrive(script)
def validate_updated_data(c: Composition, i: int) -> None:
    """Check the state after iteration ``i`` through testdrive.

    Five queries against the tpch, pg_cdc, mysql_cdc and loadgen objects are
    each expected to return 0. ``table_mv`` is expected to hold the largest
    value inserted so far, and the table itself is expected to span 0 up to
    that value with ``(i + 1) * 100`` rows.
    """
    largest_value = i * 100 + 99
    row_count = (i + 1) * 100
    script = dedent(
        f"""
        > SELECT COUNT(DISTINCT l_returnflag) FROM qa_canary_environment.public_tpch.tpch_q01 WHERE sum_charge < 0
        0
        > SELECT COUNT(DISTINCT c_name) FROM qa_canary_environment.public_tpch.tpch_q18 WHERE o_orderdate >= '2023-01-01'
        0
        > SELECT COUNT(DISTINCT a_name) FROM qa_canary_environment.public_pg_cdc.pg_wmr WHERE degree > 10
        0
        > SELECT COUNT(DISTINCT a_name) FROM qa_canary_environment.public_mysql_cdc.mysql_wmr WHERE degree > 10
        0
        > SELECT COUNT(DISTINCT count_star) FROM qa_canary_environment.public_loadgen.sales_product_product_category WHERE count_distinct_product_id < 0
        0
        > SELECT * FROM qa_canary_environment.public_table.table_mv
        {largest_value}
        > SELECT min(c), max(c), count(*) FROM qa_canary_environment.public_table.table
        0 {largest_value} {row_count}
        """
    )
    c.testdrive(script)
def validate_cursor_on_table(
    cursor_on_table: Cursor,
    current_time: float,
    i: int,
) -> None:
    """Check that the table subscription delivered exactly this iteration's rows.

    Fetches everything pending on ``subscribe_table`` (5s timeout) and asserts
    that there are 100 rows, each with column 0 not below ``current_time``,
    column 1 equal to 1, and column 2 inside ``[i * 100, (i + 1) * 100)``.

    Raises:
        AssertionError: if any of these expectations does not hold.
    """
    # Presumably the columns are (timestamp, diff, value) as emitted by
    # SUBSCRIBE; confirm against the Materialize documentation.
    # NOTE(review): the caller passes time.time() (seconds) as current_time; if
    # column 0 is a millisecond timestamp this lower bound is very loose.
    cursor_on_table.execute("FETCH ALL subscribe_table WITH (timeout='5s')")
    rows = cursor_on_table.fetchall()

    assert len(rows) == 100, f"Unexpected results: {rows}"

    expected_values = range(i * 100, (i + 1) * 100)
    for row in rows:
        assert current_time <= int(row[0]), f"Unexpected results: {rows}"
        assert 1 == int(row[1]), f"Unexpected results: {rows}"
        assert int(row[2]) in expected_values, f"Unexpected results: {rows}"
def validate_cursor_on_mv(
    cursor_on_mv: Cursor,
    current_time: float,
    i: int,
) -> None:
    """Check that the subscription on ``table_mv`` moved to the new maximum.

    Fetches everything pending on ``subscribe_mv`` (5s timeout) and sorts it.
    The last row must have column 1 equal to 1 and column 2 equal to
    ``(i + 1) * 100 - 1``. For ``i == 0`` it must be the only row. Otherwise
    the row count must be even and the row before it must have column 1 equal
    to -1 and column 2 equal to ``i * 100 - 1``. Column 0 of the checked rows
    must not be below ``current_time``.

    Raises:
        AssertionError: if any of these expectations does not hold.
    """
    cursor_on_mv.execute("FETCH ALL subscribe_mv WITH (timeout='5s')")
    # First the removal, then the addition if it happens at the same timestamp
    ordered = sorted(cursor_on_mv.fetchall())

    if i != 0:
        assert len(ordered) % 2 == 0, f"Unexpected results: {ordered}"
        assert len(ordered) >= 2
        removal = ordered[-2]
        assert int(removal[0]) >= current_time, f"Unexpected results: {ordered}"  # type: ignore
        assert int(removal[1]) == -1, f"Unexpected results: {ordered}"  # type: ignore
        assert int(removal[2]) == i * 100 - 1, f"Unexpected results: {ordered}"  # type: ignore
    else:
        assert len(ordered) == 1, f"Unexpected results: {ordered}"

    addition = ordered[-1]
    assert int(addition[0]) >= current_time, f"Unexpected results: {ordered}"  # type: ignore
    assert int(addition[1]) == 1, f"Unexpected results: {ordered}"  # type: ignore
    assert int(addition[2]) == (i + 1) * 100 - 1, f"Unexpected results: {ordered}"  # type: ignore
def validate_data_through_http_connection(
    host: str,
    token: str,
    i: int,
) -> None:
    """Repeat the state checks for iteration ``i`` over the HTTP SQL endpoint.

    Each query is sent with ``http_sql_query`` and its rows are compared with
    the expected rows; all values are compared as strings. The queries run in
    the listed order and the first mismatch stops the check.

    Raises:
        AssertionError: if a query returns rows other than the expected ones.
    """
    largest_value = f"{i * 100 + 99}"
    row_count = f"{(i + 1) * 100}"
    no_matches = [["0"]]

    expectations = (
        ("SELECT 1", [["1"]]),
        (
            "SELECT COUNT(DISTINCT l_returnflag) FROM qa_canary_environment.public_tpch.tpch_q01 WHERE sum_charge < 0",
            no_matches,
        ),
        (
            "SELECT COUNT(DISTINCT c_name) FROM qa_canary_environment.public_tpch.tpch_q18 WHERE o_orderdate >= '2023-01-01'",
            no_matches,
        ),
        (
            "SELECT COUNT(DISTINCT a_name) FROM qa_canary_environment.public_pg_cdc.pg_wmr WHERE degree > 10",
            no_matches,
        ),
        (
            "SELECT COUNT(DISTINCT a_name) FROM qa_canary_environment.public_mysql_cdc.mysql_wmr WHERE degree > 10",
            no_matches,
        ),
        (
            "SELECT COUNT(DISTINCT count_star) FROM qa_canary_environment.public_loadgen.sales_product_product_category WHERE count_distinct_product_id < 0",
            no_matches,
        ),
        (
            "SELECT * FROM qa_canary_environment.public_table.table_mv",
            [[largest_value]],
        ),
        (
            "SELECT min(c), max(c), count(*) FROM qa_canary_environment.public_table.table",
            [["0", largest_value, row_count]],
        ),
    )

    for query, expected_rows in expectations:
        result = http_sql_query(host, query, token)
        assert result == expected_rows
|