base_data_storage.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import time
  10. from collections.abc import Sequence
  11. from typing import Any
  12. from materialize.test_analytics.connector.test_analytics_connector import (
  13. DatabaseConnector,
  14. )
  15. class BaseDataStorage:
  16. def __init__(self, database_connector: DatabaseConnector):
  17. self.database_connector = database_connector
  18. def query_data(
  19. self,
  20. query: str,
  21. verbose: bool = True,
  22. statement_timeout: str | None = None,
  23. ) -> Sequence[Sequence[Any]]:
  24. statement_timeout = (
  25. statement_timeout
  26. or self.database_connector.config.default_statement_timeout
  27. )
  28. cursor = self.database_connector.create_cursor(
  29. allow_reusing_connection=True, statement_timeout=statement_timeout
  30. )
  31. if verbose:
  32. print(
  33. f"Executing query: {self.database_connector.to_short_printable_sql(query)}"
  34. )
  35. start_time = time.time()
  36. cursor.execute(query.encode())
  37. result = cursor.fetchall()
  38. end_time = time.time()
  39. if verbose:
  40. duration_in_sec = round(end_time - start_time, 2)
  41. print(f"Query returned {len(result)} rows in {duration_in_sec}s")
  42. return result