# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import random
import threading
from collections.abc import Iterator
from copy import copy
from enum import Enum

from pg8000.native import identifier, literal

from materialize.data_ingest.data_type import (
    DATA_TYPES,
    DATA_TYPES_FOR_AVRO,
    DATA_TYPES_FOR_KEY,
    DATA_TYPES_FOR_MYSQL,
    NUMBER_TYPES,
    Bytea,
    DataType,
    Jsonb,
    Text,
    TextTextMap,
)
from materialize.data_ingest.definition import Insert
from materialize.data_ingest.executor import KafkaExecutor, MySqlExecutor, PgExecutor
from materialize.data_ingest.field import Field
from materialize.data_ingest.transaction import Transaction
from materialize.data_ingest.workload import WORKLOADS
from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.mysql import MySql
from materialize.parallel_workload.executor import Executor
from materialize.parallel_workload.settings import Complexity, Scenario
from materialize.util import naughty_strings

MAX_COLUMNS = 5
MAX_INCLUDE_HEADERS = 5
MAX_ROWS = 50
MAX_CLUSTERS = 4
MAX_CLUSTER_REPLICAS = 2
MAX_DBS = 5
MAX_SCHEMAS = 5
MAX_TABLES = 5
MAX_VIEWS = 15
MAX_INDEXES = 15
MAX_ROLES = 15
MAX_WEBHOOK_SOURCES = 5
MAX_KAFKA_SOURCES = 5
MAX_MYSQL_SOURCES = 5
MAX_POSTGRES_SOURCES = 5
MAX_KAFKA_SINKS = 5

MAX_INITIAL_DBS = 1
MAX_INITIAL_SCHEMAS = 1
MAX_INITIAL_CLUSTERS = 2
MAX_INITIAL_TABLES = 2
MAX_INITIAL_VIEWS = 2
MAX_INITIAL_ROLES = 1
MAX_INITIAL_WEBHOOK_SOURCES = 1
MAX_INITIAL_KAFKA_SOURCES = 1
MAX_INITIAL_MYSQL_SOURCES = 1
MAX_INITIAL_POSTGRES_SOURCES = 1
MAX_INITIAL_KAFKA_SINKS = 1

NAUGHTY_IDENTIFIERS = False


def naughtify(name: str) -> str:
    """Turn a string into a naughty identifier; always returns the same
    identifier when called with the same input."""
    global NAUGHTY_IDENTIFIERS

    if not NAUGHTY_IDENTIFIERS:
        return name

    strings = naughty_strings()
    # Deterministic hash of the name, just to pick a more interesting index
    index = sum(10**i * c for i, c in enumerate(name.encode())) % len(strings)
    # Keep the suffix short so the result can still be combined with other
    # identifiers under the 255-character limit
    return f"{name}_{strings[index].encode('utf-8')[:16].decode('utf-8', 'ignore')}"


class BodyFormat(Enum):
    TEXT = 1
    JSON = 2
    BYTES = 3

    def to_data_type(self) -> type[DataType]:
        if self == BodyFormat.JSON:
            return Jsonb
        if self == BodyFormat.TEXT:
            return Text
        if self == BodyFormat.BYTES:
            return Bytea
        raise ValueError(f"Unknown body format {self.name}")


class Column:
    column_id: int
    data_type: type[DataType]
    db_object: "DBObject"
    nullable: bool
    default: str | None
    raw_name: str

    def __init__(
        self,
        rng: random.Random,
        column_id: int,
        data_type: type[DataType],
        db_object: "DBObject",
    ):
        self.column_id = column_id
        self.data_type = data_type
        self.db_object = db_object
        self.nullable = rng.choice([True, False])
        self.default = rng.choice(
            [None, str(data_type.random_value(rng, in_query=True))]
        )
        self.raw_name = f"c-{self.column_id}-{self.data_type.name()}"

    def name(self, in_query: bool = False) -> str:
        return (
            identifier(naughtify(self.raw_name))
            if in_query
            else naughtify(self.raw_name)
        )

    def __str__(self) -> str:
        return f"{self.db_object}.{self.name(True)}"

    def value(self, rng: random.Random, in_query: bool = False) -> str:
        return str(self.data_type.random_value(rng, in_query=in_query))

    def create(self) -> str:
        result = f"{self.name(True)} {self.data_type.name()}"
        if self.default:
            result += f" DEFAULT {self.default}"
        if not self.nullable:
            result += " NOT NULL"
        return result
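

# For illustration, a Column whose data type renders as INT, with default "42"
# and nullable=False, would produce a DDL fragment from create() roughly like:
#   "c-0-INT" INT DEFAULT 42 NOT NULL
# (the exact name and quoting depend on naughtify() and pg8000's identifier();
# the hyphenated raw name forces quoting).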


class DB:
    seed: str
    db_id: int
    lock: threading.Lock

    def __init__(self, seed: str, db_id: int):
        self.seed = seed
        self.db_id = db_id
        self.lock = threading.Lock()

    def name(self) -> str:
        return naughtify(f"db-pw-{self.seed}-{self.db_id}")

    def __str__(self) -> str:
        return identifier(self.name())

    def create(self, exe: Executor) -> None:
        exe.execute(f"CREATE DATABASE {self}")

    def drop(self, exe: Executor) -> None:
        exe.execute(f"DROP DATABASE IF EXISTS {self}")


class Schema:
    schema_id: int
    rename: int
    db: DB
    lock: threading.Lock

    def __init__(self, db: DB, schema_id: int):
        self.schema_id = schema_id
        self.db = db
        self.rename = 0
        self.lock = threading.Lock()

    def name(self) -> str:
        if self.rename:
            return naughtify(f"s-{self.schema_id}-{self.rename}")
        return naughtify(f"s-{self.schema_id}")

    def __str__(self) -> str:
        return f"{self.db}.{identifier(self.name())}"

    def create(self, exe: Executor) -> None:
        query = f"CREATE SCHEMA {self}"
        exe.execute(query)


class DBObject:
    columns: list[Column]
    lock: threading.Lock

    def __init__(self):
        self.lock = threading.Lock()

    def name(self) -> str:
        raise NotImplementedError

    def create(self, exe: Executor) -> None:
        raise NotImplementedError


class Table(DBObject):
    table_id: int
    rename: int
    num_rows: int
    schema: Schema

    def __init__(self, rng: random.Random, table_id: int, schema: Schema):
        super().__init__()
        self.table_id = table_id
        self.schema = schema
        self.columns = [
            Column(rng, i, rng.choice(DATA_TYPES), self)
            for i in range(rng.randint(2, MAX_COLUMNS))
        ]
        self.num_rows = 0
        self.rename = 0

    def name(self) -> str:
        if self.rename:
            return naughtify(f"t-{self.table_id}-{self.rename}")
        return naughtify(f"t-{self.table_id}")

    def __str__(self) -> str:
        return f"{self.schema}.{identifier(self.name())}"

    def create(self, exe: Executor) -> None:
        query = f"CREATE TABLE {self}("
        query += ",\n    ".join(column.create() for column in self.columns)
        query += ")"
        exe.execute(query)
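

# Illustrative shape of the DDL Table.create() emits (identifiers are
# naughtified and quoted in practice; column types and defaults vary):
#   CREATE TABLE "db-pw-seed-0"."s-0"."t-1"("c-0-INT" INT NOT NULL,
#       "c-1-TEXT" TEXT DEFAULT 'a')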


class View(DBObject):
    view_id: int
    base_object: DBObject
    base_object2: DBObject | None
    source_columns: list[Column]
    materialized: bool
    join_column: Column | None
    join_column2: Column | None
    assert_not_null: list[Column]
    rename: int
    schema: Schema
    refresh: str | None

    def __init__(
        self,
        rng: random.Random,
        view_id: int,
        base_object: DBObject,
        base_object2: DBObject | None,
        schema: Schema,
    ):
        super().__init__()
        self.rename = 0
        self.view_id = view_id
        self.base_object = base_object
        self.base_object2 = base_object2
        self.schema = schema
        all_columns = list(base_object.columns) + (
            list(base_object2.columns) if base_object2 else []
        )
        self.source_columns = rng.sample(
            all_columns, k=rng.randint(1, len(all_columns))
        )
        self.columns = [copy(column) for column in self.source_columns]
        for column in self.columns:
            column.raw_name = f"{column.raw_name}-{column.db_object.name()}"
            column.db_object = self
        self.materialized = rng.choice([True, False])
        self.assert_not_null = (
            [
                column
                for column in rng.sample(
                    self.columns, k=rng.randint(1, len(self.columns))
                )
                if not column.nullable
            ]
            if self.materialized
            else []
        )
        self.refresh = (
            rng.choice(
                [
                    "ON COMMIT",
                    f"EVERY '{rng.randint(1, 60)} seconds {rng.randint(0, 60)} minutes'",
                    f"EVERY '{rng.randint(1, 60)} seconds {rng.randint(0, 60)} minutes' ALIGNED TO (mz_now())",
                    # Always in the future of all refreshes of previously generated MVs
                    "AT mz_now()::string::int8 + 1000",
                ]
            )
            if self.materialized
            else None
        )
        if base_object2:
            self.join_column = rng.choice(base_object.columns)
            self.join_column2 = None
            columns = [
                c
                for c in base_object2.columns
                if c.data_type == self.join_column.data_type
            ]
            if columns:
                self.join_column2 = rng.choice(columns)

    def name(self) -> str:
        if self.rename:
            return naughtify(f"v-{self.view_id}-{self.rename}")
        return naughtify(f"v-{self.view_id}")

    def __str__(self) -> str:
        return f"{self.schema}.{identifier(self.name())}"

    def create(self, exe: Executor) -> None:
        if self.materialized:
            query = "CREATE MATERIALIZED VIEW"
        else:
            query = "CREATE VIEW"
        columns_str = ", ".join(
            f"{source_column} AS {column.name(True)}"
            for source_column, column in zip(self.source_columns, self.columns)
        )

        query += f" {self}"

        options = []
        if self.refresh:
            options.append(f"REFRESH {self.refresh}")
        if self.assert_not_null:
            options.extend(
                f"ASSERT NOT NULL {c.name(True)}" for c in self.assert_not_null
            )
        if options:
            query += f" WITH ({', '.join(options)})"

        query += f" AS SELECT {columns_str} FROM {self.base_object}"
        if self.base_object2:
            query += f" JOIN {self.base_object2}"
            if self.join_column2:
                query += " ON "
                # TODO: Generic expression generator
                if self.join_column2.data_type == TextTextMap:
                    query += f"map_length({self.join_column}) = map_length({self.join_column2})"
                else:
                    query += f"{self.join_column} = {self.join_column2}"
            else:
                query += " ON TRUE"
        exe.execute(query)
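

# Illustrative shape of the statement View.create() emits for a materialized
# view with options and a join (names shortened):
#   CREATE MATERIALIZED VIEW "db"."s-0"."v-2"
#   WITH (REFRESH EVERY '30 seconds 10 minutes', ASSERT NOT NULL "c-0-...")
#   AS SELECT <source column> AS <view column>, ... FROM <base> JOIN <base2>
#   ON <join column> = <join column2>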


class WebhookColumn(Column):
    def __init__(
        self, name: str, data_type: type[DataType], nullable: bool, db_object: DBObject
    ):
        self.raw_name = name
        self.data_type = data_type
        self.nullable = nullable
        self.db_object = db_object

    def name(self, in_query: bool = False) -> str:
        return identifier(self.raw_name) if in_query else self.raw_name


class WebhookSource(DBObject):
    source_id: int
    rename: int
    cluster: "Cluster"
    body_format: BodyFormat
    include_headers: bool
    explicit_include_headers: list[str]
    check_expr: str | None
    schema: Schema
    num_rows: int

    def __init__(
        self, source_id: int, cluster: "Cluster", schema: Schema, rng: random.Random
    ):
        super().__init__()
        self.source_id = source_id
        self.cluster = cluster
        self.schema = schema
        self.rename = 0
        self.body_format = rng.choice(list(BodyFormat))
        self.include_headers = rng.choice([True, False])
        self.explicit_include_headers = []
        self.num_rows = 0
        self.columns = [
            WebhookColumn(
                "body",
                self.body_format.to_data_type(),
                False,
                self,
            )
        ]
        if self.include_headers:
            self.columns.append(WebhookColumn("headers", TextTextMap, False, self))
        for i in range(rng.randint(0, MAX_INCLUDE_HEADERS)):
            # not naughtified: would raise UnicodeEncodeError ('ascii' codec
            # can't encode characters)
            self.explicit_include_headers.append(f"ih{i}")
        # for testing now() in check
        if rng.choice([True, False]):
            self.explicit_include_headers.append("timestamp")
        self.columns += [
            WebhookColumn(include_header, Text, True, self)
            for include_header in self.explicit_include_headers
        ]
        self.check_expr = None
        if rng.choice([True, False]):
            # TODO: More general expressions, failing expressions
            exprs = [
                "BODY = BODY",
                "map_length(HEADERS) = map_length(HEADERS)",
            ]
            if "timestamp" in self.explicit_include_headers:
                exprs.append(
                    "(headers->'timestamp'::text)::timestamp + INTERVAL '10s' >= now()"
                )
            self.check_expr = " AND ".join(
                rng.sample(exprs, k=rng.randint(1, len(exprs)))
            )
        # TODO: CHECK WITH SECRET
        # TODO: NOT IN INCLUDE HEADERS

    def name(self) -> str:
        if self.rename:
            return naughtify(f"wh-{self.source_id}-{self.rename}")
        return naughtify(f"wh-{self.source_id}")

    def __str__(self) -> str:
        return f"{self.schema}.{identifier(self.name())}"

    def create(self, exe: Executor) -> None:
        query = f"CREATE SOURCE {self} IN CLUSTER {self.cluster} FROM WEBHOOK BODY FORMAT {self.body_format.name}"
        if self.include_headers:
            query += " INCLUDE HEADERS"
        for include_header in self.explicit_include_headers:
            query += f" INCLUDE HEADER {literal(include_header)} as {identifier(include_header)}"
        if self.check_expr:
            query += f" CHECK (WITH (BODY, HEADERS) {self.check_expr})"
        exe.execute(query)
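

# Illustrative shape of the statement WebhookSource.create() emits:
#   CREATE SOURCE "db"."s-0"."wh-0" IN CLUSTER "cluster-1" FROM WEBHOOK
#   BODY FORMAT JSON INCLUDE HEADERS INCLUDE HEADER 'ih0' as "ih0"
#   CHECK (WITH (BODY, HEADERS) BODY = BODY)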


class KafkaColumn(Column):
    def __init__(
        self, name: str, data_type: type[DataType], nullable: bool, db_object: DBObject
    ):
        self.raw_name = name
        self.data_type = data_type
        self.nullable = nullable
        self.db_object = db_object

    def name(self, in_query: bool = False) -> str:
        return identifier(self.raw_name) if in_query else self.raw_name


class KafkaSource(DBObject):
    source_id: int
    cluster: "Cluster"
    executor: KafkaExecutor
    generator: Iterator[Transaction]
    lock: threading.Lock
    columns: list[KafkaColumn]
    schema: Schema
    num_rows: int

    def __init__(
        self,
        source_id: int,
        cluster: "Cluster",
        schema: Schema,
        ports: dict[str, int],
        rng: random.Random,
    ):
        super().__init__()
        self.source_id = source_id
        self.cluster = cluster
        self.schema = schema
        self.num_rows = 0
        fields = []
        for i in range(rng.randint(1, 10)):
            fields.append(
                # not naughtified: produces an invalid schema
                Field(f"key{i}", rng.choice(DATA_TYPES_FOR_AVRO), True)
            )
        for i in range(rng.randint(0, 20)):
            fields.append(Field(f"value{i}", rng.choice(DATA_TYPES_FOR_AVRO), False))
        self.columns = [
            KafkaColumn(field.name, field.data_type, False, self) for field in fields
        ]
        self.executor = KafkaExecutor(
            self.source_id,
            ports,
            fields,
            schema.db.name(),
            schema.name(),
            cluster.name(),
        )
        workload = rng.choice(list(WORKLOADS))(azurite=False)
        for transaction_def in workload.cycle:
            for definition in transaction_def.operations:
                # Cap large inserts so the generated topics stay small
                if isinstance(definition, Insert) and definition.count > MAX_ROWS:
                    definition.count = MAX_ROWS
        self.generator = workload.generate(fields)
        self.lock = threading.Lock()

    def name(self) -> str:
        return self.executor.table

    def __str__(self) -> str:
        return f"{self.schema}.{self.name()}"

    def create(self, exe: Executor) -> None:
        self.executor.create(logging_exe=exe)


class KafkaSink(DBObject):
    sink_id: int
    rename: int
    cluster: "Cluster"
    schema: Schema
    base_object: DBObject
    envelope: str
    format: str
    key: str
    partition_key: str
    partition_count: int

    def __init__(
        self,
        sink_id: int,
        cluster: "Cluster",
        schema: Schema,
        base_object: DBObject,
        rng: random.Random,
    ):
        super().__init__()
        self.sink_id = sink_id
        self.cluster = cluster
        self.schema = schema
        self.base_object = base_object
        universal_formats = [
            "FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn",
            "FORMAT JSON",
        ]
        single_column_formats = ["FORMAT BYTES", "FORMAT TEXT"]
        formats = universal_formats.copy()
        if len(base_object.columns) == 1:
            formats.extend(single_column_formats)
        self.format = rng.choice(formats)
        self.envelope = (
            "UPSERT"
            if self.format == "FORMAT JSON"
            else rng.choice(["DEBEZIUM", "UPSERT"])
        )
        if self.envelope == "UPSERT" or rng.choice([True, False]):
            key_cols = rng.sample(
                base_object.columns, k=rng.randint(1, len(base_object.columns))
            )
            key_col_names = [column.name(True) for column in key_cols]
            self.key = f"KEY ({', '.join(key_col_names)}) NOT ENFORCED"
            potential_partition_keys = [
                key_col for key_col in key_cols if key_col.data_type in NUMBER_TYPES
            ]
            if potential_partition_keys:
                self.partition_key = rng.choice(potential_partition_keys).name(True)
                self.partition_count = rng.randint(1, 10)
            else:
                self.partition_count = 0
            if rng.choice([True, False]):
                key_formats = universal_formats.copy()
                if len(key_cols) == 1:
                    key_formats.extend(single_column_formats)
                value_formats = universal_formats.copy()
                if len(base_object.columns) == 1:
                    value_formats.extend(single_column_formats)
                self.format = (
                    f"KEY {rng.choice(key_formats)} VALUE {rng.choice(value_formats)}"
                )
        else:
            self.key = ""
            self.partition_count = 0
        self.rename = 0

    def name(self) -> str:
        if self.rename:
            return naughtify(f"sink-{self.sink_id}-{self.rename}")
        return naughtify(f"sink-{self.sink_id}")

    def __str__(self) -> str:
        return f"{self.schema}.{identifier(self.name())}"

    def create(self, exe: Executor) -> None:
        topic = f"sink_topic{self.sink_id}"
        maybe_partition = (
            f", TOPIC PARTITION COUNT {self.partition_count}, PARTITION BY {self.partition_key}"
            if self.partition_count
            else ""
        )
        query = f"CREATE SINK {self} IN CLUSTER {self.cluster} FROM {self.base_object} INTO KAFKA CONNECTION kafka_conn (TOPIC {topic}{maybe_partition}) {self.key} {self.format} ENVELOPE {self.envelope}"
        exe.execute(query)
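

# Illustrative shape of the statement KafkaSink.create() emits:
#   CREATE SINK "db"."s-0"."sink-0" IN CLUSTER "cluster-1" FROM <object>
#   INTO KAFKA CONNECTION kafka_conn (TOPIC sink_topic0)
#   KEY ("c-0-...") NOT ENFORCED FORMAT JSON ENVELOPE UPSERT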


class MySqlColumn(Column):
    def __init__(
        self, name: str, data_type: type[DataType], nullable: bool, db_object: DBObject
    ):
        self.raw_name = name
        self.data_type = data_type
        self.nullable = nullable
        self.db_object = db_object

    def name(self, in_query: bool = False) -> str:
        return identifier(self.raw_name) if in_query else self.raw_name


class MySqlSource(DBObject):
    source_id: int
    cluster: "Cluster"
    executor: MySqlExecutor
    generator: Iterator[Transaction]
    lock: threading.Lock
    columns: list[MySqlColumn]
    schema: Schema
    num_rows: int

    def __init__(
        self,
        source_id: int,
        cluster: "Cluster",
        schema: Schema,
        ports: dict[str, int],
        rng: random.Random,
    ):
        super().__init__()
        self.source_id = source_id
        self.cluster = cluster
        self.schema = schema
        self.num_rows = 0
        fields = []
        for i in range(rng.randint(1, 10)):
            fields.append(
                # not naughtified: MySQL column identifiers are escaped
                # differently for MySQL sources (key3_ЁЂЃЄЅІЇЈЉЊЋЌЍЎЏА becomes
                # ""), which pg8000.native.identifier() doesn't match
                Field(f"key{i}", rng.choice(DATA_TYPES_FOR_KEY), True)
            )
        for i in range(rng.randint(0, 20)):
            fields.append(Field(f"value{i}", rng.choice(DATA_TYPES_FOR_MYSQL), False))
        self.columns = [
            MySqlColumn(field.name, field.data_type, False, self) for field in fields
        ]
        self.executor = MySqlExecutor(
            self.source_id,
            ports,
            fields,
            schema.db.name(),
            schema.name(),
            cluster.name(),
        )
        self.generator = rng.choice(list(WORKLOADS))(azurite=False).generate(fields)
        self.lock = threading.Lock()

    def name(self) -> str:
        return self.executor.table

    def __str__(self) -> str:
        return f"{self.schema}.{self.name()}"

    def create(self, exe: Executor) -> None:
        self.executor.create(logging_exe=exe)


class PostgresColumn(Column):
    def __init__(
        self, name: str, data_type: type[DataType], nullable: bool, db_object: DBObject
    ):
        self.raw_name = name
        self.data_type = data_type
        self.nullable = nullable
        self.db_object = db_object

    def name(self, in_query: bool = False) -> str:
        return identifier(self.raw_name) if in_query else self.raw_name


class PostgresSource(DBObject):
    source_id: int
    cluster: "Cluster"
    executor: PgExecutor
    generator: Iterator[Transaction]
    lock: threading.Lock
    columns: list[PostgresColumn]
    schema: Schema
    num_rows: int

    def __init__(
        self,
        source_id: int,
        cluster: "Cluster",
        schema: Schema,
        ports: dict[str, int],
        rng: random.Random,
    ):
        super().__init__()
        self.source_id = source_id
        self.cluster = cluster
        self.schema = schema
        self.num_rows = 0
        fields = []
        for i in range(rng.randint(1, 10)):
            fields.append(
                # not naughtified: Postgres column identifiers are escaped
                # differently for Postgres sources (key3_ЁЂЃЄЅІЇЈЉЊЋЌЍЎЏА
                # becomes ""), which pg8000.native.identifier() doesn't match
                Field(f"key{i}", rng.choice(DATA_TYPES_FOR_AVRO), True)
            )
        for i in range(rng.randint(0, 20)):
            fields.append(Field(f"value{i}", rng.choice(DATA_TYPES_FOR_AVRO), False))
        self.columns = [
            PostgresColumn(field.name, field.data_type, False, self) for field in fields
        ]
        self.executor = PgExecutor(
            self.source_id,
            ports,
            fields,
            schema.db.name(),
            schema.name(),
            cluster.name(),
        )
        self.generator = rng.choice(list(WORKLOADS))(azurite=False).generate(fields)
        self.lock = threading.Lock()

    def name(self) -> str:
        return self.executor.table

    def __str__(self) -> str:
        return f"{self.schema}.{self.name()}"

    def create(self, exe: Executor) -> None:
        self.executor.create(logging_exe=exe)


class Index:
    _name: str
    lock: threading.Lock

    def __init__(self, name: str):
        self._name = name
        self.lock = threading.Lock()

    def name(self) -> str:
        return self._name

    def __str__(self) -> str:
        return identifier(self.name())


class Role:
    role_id: int
    lock: threading.Lock

    def __init__(self, role_id: int):
        self.role_id = role_id
        self.lock = threading.Lock()

    def __str__(self) -> str:
        return f"role{self.role_id}"

    def create(self, exe: Executor) -> None:
        exe.execute(f"CREATE ROLE {self}")


class ClusterReplica:
    replica_id: int
    size: str
    cluster: "Cluster"
    rename: int
    lock: threading.Lock

    def __init__(self, replica_id: int, size: str, cluster: "Cluster"):
        self.replica_id = replica_id
        self.size = size
        self.cluster = cluster
        self.rename = 0
        self.lock = threading.Lock()

    def name(self) -> str:
        if self.rename:
            return naughtify(f"r-{self.replica_id + 1}-{self.rename}")
        return naughtify(f"r-{self.replica_id + 1}")

    def __str__(self) -> str:
        return identifier(self.name())

    def create(self, exe: Executor) -> None:
        # TODO: More cluster replica settings
        exe.execute(
            f"CREATE CLUSTER REPLICA {self.cluster}.{self} SIZE = '{self.size}'"
        )


class Cluster:
    cluster_id: int
    managed: bool
    size: str
    replicas: list[ClusterReplica]
    replica_id: int
    introspection_interval: str
    rename: int
    lock: threading.Lock

    def __init__(
        self,
        cluster_id: int,
        managed: bool,
        size: str,
        replication_factor: int,
        introspection_interval: str,
    ):
        self.cluster_id = cluster_id
        self.managed = managed
        self.size = size
        self.replicas = [
            ClusterReplica(i, size, self) for i in range(replication_factor)
        ]
        self.replica_id = len(self.replicas)
        self.introspection_interval = introspection_interval
        self.rename = 0
        self.lock = threading.Lock()

    def name(self) -> str:
        if self.rename:
            return naughtify(f"cluster-{self.cluster_id}-{self.rename}")
        return naughtify(f"cluster-{self.cluster_id}")

    def __str__(self) -> str:
        return identifier(self.name())

    def create(self, exe: Executor) -> None:
        query = f"CREATE CLUSTER {self} "
        if self.managed:
            query += f"SIZE = '{self.size}', REPLICATION FACTOR = {len(self.replicas)}, INTROSPECTION INTERVAL = '{self.introspection_interval}'"
        else:
            query += "REPLICAS("
            query += ", ".join(
                f"{replica} (SIZE = '{replica.size}')" for replica in self.replicas
            )
            query += ")"
        exe.execute(query)
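

# Illustrative output of Cluster.create():
#   managed:   CREATE CLUSTER "cluster-0" SIZE = '1', REPLICATION FACTOR = 1,
#              INTROSPECTION INTERVAL = '1s'
#   unmanaged: CREATE CLUSTER "cluster-0" REPLICAS("r-1" (SIZE = '1'))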


# TODO: Can access both databases from same connection!
class Database:
    complexity: Complexity
    scenario: Scenario
    host: str
    ports: dict[str, int]
    dbs: list[DB]
    db_id: int
    schemas: list[Schema]
    schema_id: int
    tables: list[Table]
    table_id: int
    views: list[View]
    view_id: int
    roles: list[Role]
    role_id: int
    clusters: list[Cluster]
    cluster_id: int
    indexes: set[Index]
    webhook_sources: list[WebhookSource]
    webhook_source_id: int
    kafka_sources: list[KafkaSource]
    kafka_source_id: int
    mysql_sources: list[MySqlSource]
    mysql_source_id: int
    postgres_sources: list[PostgresSource]
    postgres_source_id: int
    kafka_sinks: list[KafkaSink]
    kafka_sink_id: int
    s3_path: int
    lock: threading.Lock
    seed: str
    sqlsmith_state: str
    flags: dict[str, str]

    def __init__(
        self,
        rng: random.Random,
        seed: str,
        host: str,
        ports: dict[str, int],
        complexity: Complexity,
        scenario: Scenario,
        naughty_identifiers: bool,
    ):
        global NAUGHTY_IDENTIFIERS
        self.host = host
        self.ports = ports
        self.complexity = complexity
        self.scenario = scenario
        self.seed = seed
        NAUGHTY_IDENTIFIERS = naughty_identifiers
        self.s3_path = 0

        self.dbs = [DB(seed, i) for i in range(rng.randint(1, MAX_INITIAL_DBS))]
        self.db_id = len(self.dbs)
        self.schemas = [
            Schema(rng.choice(self.dbs), i)
            for i in range(rng.randint(1, MAX_INITIAL_SCHEMAS))
        ]
        self.schema_id = len(self.schemas)
        self.tables = [
            Table(rng, i, rng.choice(self.schemas))
            for i in range(rng.randint(2, MAX_INITIAL_TABLES))
        ]
        self.table_id = len(self.tables)
        self.views = []
        for i in range(rng.randint(2, MAX_INITIAL_VIEWS)):
            # Only use tables for now since LIMIT 1 and statement_timeout are
            # not effective yet at preventing long-running queries and OoMs.
            base_object = rng.choice(self.tables)
            base_object2: Table | None = rng.choice(self.tables)
            if rng.choice([True, False]) or base_object2 == base_object:
                base_object2 = None
            view = View(rng, i, base_object, base_object2, rng.choice(self.schemas))
            self.views.append(view)
        self.view_id = len(self.views)
        self.roles = [Role(i) for i in range(rng.randint(0, MAX_INITIAL_ROLES))]
        self.role_id = len(self.roles)
        # At least one storage cluster required for WebhookSources
        self.clusters = [
            Cluster(
                i,
                managed=rng.choice([True, False]),
                size=rng.choice(["1", "2"]),
                replication_factor=1,
                introspection_interval=rng.choice(["0", "1s", "10s"]),
            )
            for i in range(rng.randint(1, MAX_INITIAL_CLUSTERS))
        ]
        self.cluster_id = len(self.clusters)
        self.indexes = set()
        self.webhook_sources = [
            WebhookSource(i, rng.choice(self.clusters), rng.choice(self.schemas), rng)
            for i in range(rng.randint(0, MAX_INITIAL_WEBHOOK_SOURCES))
        ]
        self.webhook_source_id = len(self.webhook_sources)
        self.kafka_sources = []
        self.mysql_sources = []
        self.postgres_sources = []
        self.kafka_sinks = []
        self.kafka_source_id = len(self.kafka_sources)
        self.mysql_source_id = len(self.mysql_sources)
        self.postgres_source_id = len(self.postgres_sources)
        self.kafka_sink_id = len(self.kafka_sinks)
        self.lock = threading.Lock()
        self.sqlsmith_state = ""
        self.flags = {}

    def db_objects(
        self,
    ) -> list[
        WebhookSource | MySqlSource | PostgresSource | KafkaSource | View | Table
    ]:
        return (
            self.tables
            + self.views
            + self.kafka_sources
            + self.mysql_sources
            + self.postgres_sources
            + self.webhook_sources
        )

    def db_objects_without_views(
        self,
    ) -> list[
        WebhookSource | MySqlSource | PostgresSource | KafkaSource | View | Table
    ]:
        # Materialized views are kept; only unmaterialized views are excluded
        return [
            obj
            for obj in self.db_objects()
            if not isinstance(obj, View) or obj.materialized
        ]

    def __iter__(self):
        """Returns all relations"""
        return (
            self.schemas + self.clusters + self.roles + self.db_objects()
        ).__iter__()

    def create(self, exe: Executor, composition: Composition) -> None:
        for db in self.dbs:
            db.drop(exe)
            db.create(exe)

        exe.execute("SELECT name FROM mz_clusters WHERE name LIKE 'c%'")
        for row in exe.cur.fetchall():
            exe.execute(f"DROP CLUSTER {identifier(row[0])} CASCADE")

        exe.execute("SELECT name FROM mz_roles WHERE name LIKE 'r%'")
        for row in exe.cur.fetchall():
            exe.execute(f"DROP ROLE {identifier(row[0])}")

        exe.execute("DROP SECRET IF EXISTS pgpass CASCADE")
        exe.execute("DROP SECRET IF EXISTS mypass CASCADE")
        exe.execute("DROP SECRET IF EXISTS minio CASCADE")

        print("Creating connections")
        exe.execute(
            "CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER 'kafka:9092', SECURITY PROTOCOL PLAINTEXT"
        )
        exe.execute(
            "CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL 'http://schema-registry:8081'"
        )

        exe.execute("CREATE SECRET pgpass AS 'postgres'")
        exe.execute(
            "CREATE CONNECTION postgres_conn FOR POSTGRES HOST 'postgres', DATABASE postgres, USER postgres, PASSWORD SECRET pgpass"
        )

        exe.execute(f"CREATE SECRET mypass AS '{MySql.DEFAULT_ROOT_PASSWORD}'")
        exe.execute(
            "CREATE CONNECTION mysql_conn FOR MYSQL HOST 'mysql', USER root, PASSWORD SECRET mypass"
        )

        exe.execute("CREATE SECRET IF NOT EXISTS minio AS 'minioadmin'")
        exe.execute(
            "CREATE CONNECTION IF NOT EXISTS aws_conn TO AWS (ENDPOINT 'http://minio:9000/', REGION 'minio', ACCESS KEY ID 'minioadmin', SECRET ACCESS KEY SECRET minio)"
        )

        print("Creating relations")
        for relation in self:
            relation.create(exe)

        if False:  # Questionable use
            result = composition.run(
                "sqlsmith",
                "--target=host=materialized port=6875 dbname=materialize user=materialize",
                "--exclude-catalog",
                "--dump-state",
                capture=True,
                capture_stderr=True,
                rm=True,
            )
            self.sqlsmith_state = result.stdout

    def drop(self, exe: Executor) -> None:
        for db in self.dbs:
            print(f"Dropping database {db}")
            db.drop(exe)
        for src in self.kafka_sources:
            src.executor.mz_conn.close()
        for src in self.postgres_sources:
            src.executor.mz_conn.close()
        for src in self.mysql_sources:
            src.executor.mz_conn.close()

    def update_sqlsmith_state(self, composition: Composition) -> None:
        if False:  # Questionable use
            result = composition.run(
                "sqlsmith",
                "--target=host=materialized port=6875 dbname=materialize user=materialize",
                "--exclude-catalog",
                "--read-state",
                "--dump-state",
                stdin=self.sqlsmith_state,
                capture=True,
                capture_stderr=True,
                rm=True,
            )
            self.sqlsmith_state = result.stdout
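

# Minimal usage sketch (illustrative, not executed here; assumes a running
# Composition and an Executor connected to materialized; the ports key name
# is an assumption):
#
#   rng = random.Random(42)
#   database = Database(
#       rng,
#       seed="abc",
#       host="localhost",
#       ports={"materialized": 6875},
#       complexity=complexity,  # a Complexity member
#       scenario=scenario,  # a Scenario member
#       naughty_identifiers=False,
#   )
#   database.create(exe, composition)  # drops/recreates DBs, connections, relations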