schema.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. from enum import Enum, auto
  10. class Source(Enum):
  11. TABLE = auto()
  12. class TransactionIsolation(Enum):
  13. SERIALIZABLE = "serializable"
  14. STRICT_SERIALIZABLE = "strict serializable"
  15. def __str__(self) -> str:
  16. return self.value
  17. class Schema:
  18. def __init__(
  19. self,
  20. source: Source = Source.TABLE,
  21. schema: str = "scalability",
  22. create_index: bool = True,
  23. transaction_isolation: TransactionIsolation | None = None,
  24. cluster_name: str | None = None,
  25. object_count: int = 1,
  26. ) -> None:
  27. self.schema = schema
  28. self.source = source
  29. self.create_index = create_index
  30. self.transaction_isolation = transaction_isolation
  31. self.cluster_name = cluster_name
  32. self.object_count = object_count
  33. def init_sqls(self) -> list[str]:
  34. init_sqls = self.connect_sqls() + [
  35. f"DROP SCHEMA IF EXISTS {self.schema} CASCADE;",
  36. f"CREATE SCHEMA {self.schema};",
  37. "DROP TABLE IF EXISTS t1;",
  38. ]
  39. if self.source == Source.TABLE:
  40. for t in range(1, self.object_count + 1):
  41. init_sqls.extend(
  42. [
  43. f"CREATE TABLE t{t} (f1 INTEGER DEFAULT 1);",
  44. f"INSERT INTO t{t} DEFAULT VALUES;",
  45. f"CREATE OR REPLACE MATERIALIZED VIEW mv{t} AS SELECT count(*) AS count FROM t{t};",
  46. ]
  47. )
  48. # Create indexes and wait until they are queryable. Compute can
  49. # delay execution of dataflows when inputs are not yet
  50. # available. This would lead to a large outlier on the first
  51. # query of the actual workload, when it has to wait for the
  52. # index to be ready.
  53. if self.create_index:
  54. init_sqls.append(f"CREATE INDEX i{t} ON t{t} (f1);")
  55. init_sqls.append(f"CREATE INDEX mv_i{t} ON mv{t} (count);")
  56. init_sqls.append(f"SELECT f1 from t{t};")
  57. init_sqls.append(f"SELECT count from mv{t};")
  58. return init_sqls
  59. def connect_sqls(self) -> list[str]:
  60. init_sqls = [f"SET SCHEMA = {self.schema};"]
  61. if self.cluster_name is not None:
  62. init_sqls.append(f"SET CLUSTER = {self.cluster_name};")
  63. if self.transaction_isolation is not None:
  64. init_sqls.append(
  65. f"SET transaction_isolation = '{self.transaction_isolation}';"
  66. )
  67. return init_sqls