# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import random
import threading
from collections.abc import Iterator
from copy import copy
from enum import Enum

from pg8000.native import identifier, literal

from materialize.data_ingest.data_type import (
    DATA_TYPES,
    DATA_TYPES_FOR_AVRO,
    DATA_TYPES_FOR_KEY,
    DATA_TYPES_FOR_MYSQL,
    NUMBER_TYPES,
    Bytea,
    DataType,
    Jsonb,
    Text,
    TextTextMap,
)
from materialize.data_ingest.definition import Insert
from materialize.data_ingest.executor import KafkaExecutor, MySqlExecutor, PgExecutor
from materialize.data_ingest.field import Field
from materialize.data_ingest.transaction import Transaction
from materialize.data_ingest.workload import WORKLOADS
from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.mysql import MySql
from materialize.parallel_workload.executor import Executor
from materialize.parallel_workload.settings import Complexity, Scenario
from materialize.util import naughty_strings
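
# Upper bounds used when generating random objects; the MAX_INITIAL_* values
# bound what Database.__init__ creates up front.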
MAX_COLUMNS = 5
MAX_INCLUDE_HEADERS = 5
MAX_ROWS = 50
MAX_CLUSTERS = 4
MAX_CLUSTER_REPLICAS = 2
MAX_DBS = 5
MAX_SCHEMAS = 5
MAX_TABLES = 5
MAX_VIEWS = 15
MAX_INDEXES = 15
MAX_ROLES = 15
MAX_WEBHOOK_SOURCES = 5
MAX_KAFKA_SOURCES = 5
MAX_MYSQL_SOURCES = 5
MAX_POSTGRES_SOURCES = 5
MAX_KAFKA_SINKS = 5

MAX_INITIAL_DBS = 1
MAX_INITIAL_SCHEMAS = 1
MAX_INITIAL_CLUSTERS = 2
MAX_INITIAL_TABLES = 2
MAX_INITIAL_VIEWS = 2
MAX_INITIAL_ROLES = 1
MAX_INITIAL_WEBHOOK_SOURCES = 1
MAX_INITIAL_KAFKA_SOURCES = 1
MAX_INITIAL_MYSQL_SOURCES = 1
MAX_INITIAL_POSTGRES_SOURCES = 1
MAX_INITIAL_KAFKA_SINKS = 1

NAUGHTY_IDENTIFIERS = False


def naughtify(name: str) -> str:
    """Makes a string into a naughty identifier, always returns the same
    identifier when called with the same input."""
    global NAUGHTY_IDENTIFIERS

    if not NAUGHTY_IDENTIFIERS:
        return name

    strings = naughty_strings()
    # Deterministic hash of the name, just to pick a more interesting string
    index = sum([10**i * c for i, c in enumerate(name.encode())]) % len(strings)
    # Keep them short so we can combine later with other identifiers, 255 char limit
    return f"{name}_{strings[index].encode('utf-8')[:16].decode('utf-8', 'ignore')}"
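

# Webhook body formats and the SQL data type used for the "body" column of each.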
class BodyFormat(Enum):
    TEXT = 1
    JSON = 2
    BYTES = 3

    def to_data_type(self) -> type[DataType]:
        if self == BodyFormat.JSON:
            return Jsonb
        if self == BodyFormat.TEXT:
            return Text
        if self == BodyFormat.BYTES:
            return Bytea
        raise ValueError(f"Unknown body format {self.name}")
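

# A single column with a randomly chosen type, nullability and default value;
# create() renders the column definition used in CREATE TABLE.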
class Column:
    column_id: int
    data_type: type[DataType]
    db_object: "DBObject"
    nullable: bool
    default: str | None
    raw_name: str

    def __init__(
        self,
        rng: random.Random,
        column_id: int,
        data_type: type[DataType],
        db_object: "DBObject",
    ):
        self.column_id = column_id
        self.data_type = data_type
        self.db_object = db_object
        self.nullable = rng.choice([True, False])
        self.default = rng.choice(
            [None, str(data_type.random_value(rng, in_query=True))]
        )
        self.raw_name = f"c-{self.column_id}-{self.data_type.name()}"

    def name(self, in_query: bool = False) -> str:
        return (
            identifier(naughtify(self.raw_name))
            if in_query
            else naughtify(self.raw_name)
        )

    def __str__(self) -> str:
        return f"{self.db_object}.{self.name(True)}"

    def value(self, rng: random.Random, in_query: bool = False) -> str:
        return str(self.data_type.random_value(rng, in_query=in_query))

    def create(self) -> str:
        result = f"{self.name(True)} {self.data_type.name()}"
        if self.default:
            result += f" DEFAULT {self.default}"
        if not self.nullable:
            result += " NOT NULL"
        return result
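

# A database; its name embeds the workload seed and the database id.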
class DB:
    seed: str
    db_id: int
    lock: threading.Lock

    def __init__(self, seed: str, db_id: int):
        self.seed = seed
        self.db_id = db_id
        self.lock = threading.Lock()

    def name(self) -> str:
        return naughtify(f"db-pw-{self.seed}-{self.db_id}")

    def __str__(self) -> str:
        return identifier(self.name())

    def create(self, exe: Executor) -> None:
        exe.execute(f"CREATE DATABASE {self}")

    def drop(self, exe: Executor) -> None:
        exe.execute(f"DROP DATABASE IF EXISTS {self}")
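

# A schema within a DB; the rename counter feeds into the generated name.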
class Schema:
    schema_id: int
    rename: int
    db: DB
    lock: threading.Lock

    def __init__(self, db: DB, schema_id: int):
        self.schema_id = schema_id
        self.db = db
        self.rename = 0
        self.lock = threading.Lock()

    def name(self) -> str:
        if self.rename:
            return naughtify(f"s-{self.schema_id}-{self.rename}")
        return naughtify(f"s-{self.schema_id}")

    def __str__(self) -> str:
        return f"{self.db}.{identifier(self.name())}"

    def create(self, exe: Executor) -> None:
        query = f"CREATE SCHEMA {self}"
        exe.execute(query)
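

# Base class for relations (tables, views, sources) that have columns and can
# be read from in queries.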
class DBObject:
    columns: list[Column]
    lock: threading.Lock

    def __init__(self):
        self.lock = threading.Lock()

    def name(self) -> str:
        raise NotImplementedError

    def create(self, exe: Executor) -> None:
        raise NotImplementedError
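

# A table with 2 to MAX_COLUMNS randomly typed columns.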
class Table(DBObject):
    table_id: int
    rename: int
    num_rows: int
    schema: Schema

    def __init__(self, rng: random.Random, table_id: int, schema: Schema):
        super().__init__()
        self.table_id = table_id
        self.schema = schema
        self.columns = [
            Column(rng, i, rng.choice(DATA_TYPES), self)
            for i in range(rng.randint(2, MAX_COLUMNS))
        ]
        self.num_rows = 0
        self.rename = 0

    def name(self) -> str:
        if self.rename:
            return naughtify(f"t-{self.table_id}-{self.rename}")
        return naughtify(f"t-{self.table_id}")

    def __str__(self) -> str:
        return f"{self.schema}.{identifier(self.name())}"

    def create(self, exe: Executor) -> None:
        query = f"CREATE TABLE {self}("
        query += ",\n ".join(column.create() for column in self.columns)
        query += ")"
        exe.execute(query)
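

# A view (materialized or not) over one or two base objects, projecting a
# random subset of their columns; two-object views are joined on a column of
# matching type, or ON TRUE if none exists.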
class View(DBObject):
    view_id: int
    base_object: DBObject
    base_object2: DBObject | None
    source_columns: list[Column]
    materialized: bool
    join_column: Column | None
    join_column2: Column | None
    assert_not_null: list[Column]
    rename: int
    schema: Schema
    refresh: str | None

    def __init__(
        self,
        rng: random.Random,
        view_id: int,
        base_object: DBObject,
        base_object2: DBObject | None,
        schema: Schema,
    ):
        super().__init__()
        self.rename = 0
        self.view_id = view_id
        self.base_object = base_object
        self.base_object2 = base_object2
        self.schema = schema
        all_columns = list(base_object.columns) + (
            list(base_object2.columns) if base_object2 else []
        )
        self.source_columns = [
            column
            for column in rng.sample(all_columns, k=rng.randint(1, len(all_columns)))
        ]
        self.columns = [copy(column) for column in self.source_columns]
        for column in self.columns:
            column.raw_name = f"{column.raw_name}-{column.db_object.name()}"
            column.db_object = self
        self.materialized = rng.choice([True, False])
        self.assert_not_null = (
            [
                column
                for column in rng.sample(
                    self.columns, k=rng.randint(1, len(self.columns))
                )
                if not column.nullable
            ]
            if self.materialized
            else []
        )
        self.refresh = (
            rng.choice(
                [
                    "ON COMMIT",
                    f"EVERY '{rng.randint(1, 60)} seconds {rng.randint(0, 60)} minutes'",
                    f"EVERY '{rng.randint(1, 60)} seconds {rng.randint(0, 60)} minutes' ALIGNED TO (mz_now())",
                    # Always in the future of all refreshes of previously generated MVs
                    "AT mz_now()::string::int8 + 1000",
                ]
            )
            if self.materialized
            else None
        )
        if base_object2:
            self.join_column = rng.choice(base_object.columns)
            self.join_column2 = None
            columns = [
                c
                for c in base_object2.columns
                if c.data_type == self.join_column.data_type
            ]
            if columns:
                self.join_column2 = rng.choice(columns)

    def name(self) -> str:
        if self.rename:
            return naughtify(f"v-{self.view_id}-{self.rename}")
        return naughtify(f"v-{self.view_id}")

    def __str__(self) -> str:
        return f"{self.schema}.{identifier(self.name())}"

    def create(self, exe: Executor) -> None:
        if self.materialized:
            query = "CREATE MATERIALIZED VIEW"
        else:
            query = "CREATE VIEW"
        columns_str = ", ".join(
            f"{source_column} AS {column.name(True)}"
            for source_column, column in zip(self.source_columns, self.columns)
        )

        query += f" {self}"

        options = []
        if self.refresh:
            options.append(f"REFRESH {self.refresh}")
        if self.assert_not_null:
            options.extend(
                [f"ASSERT NOT NULL {c.name(True)}" for c in self.assert_not_null]
            )
        if options:
            query += f" WITH ({', '.join(options)})"

        query += f" AS SELECT {columns_str} FROM {self.base_object}"
        if self.base_object2:
            query += f" JOIN {self.base_object2}"
            if self.join_column2:
                query += " ON "
                # TODO: Generic expression generator
                if self.join_column2.data_type == TextTextMap:
                    query += f"map_length({self.join_column}) = map_length({self.join_column2})"
                else:
                    query += f"{self.join_column} = {self.join_column2}"
            else:
                query += " ON TRUE"
        exe.execute(query)
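

# Webhook sources: the column wrapper keeps its given name verbatim, and the
# source itself picks a random body format, header handling and CHECK expression.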
class WebhookColumn(Column):
    def __init__(
        self, name: str, data_type: type[DataType], nullable: bool, db_object: DBObject
    ):
        self.raw_name = name
        self.data_type = data_type
        self.nullable = nullable
        self.db_object = db_object

    def name(self, in_query: bool = False) -> str:
        return identifier(self.raw_name) if in_query else self.raw_name


class WebhookSource(DBObject):
    source_id: int
    rename: int
    cluster: "Cluster"
    body_format: BodyFormat
    include_headers: bool
    explicit_include_headers: list[str]
    check_expr: str | None
    schema: Schema
    num_rows: int

    def __init__(
        self, source_id: int, cluster: "Cluster", schema: Schema, rng: random.Random
    ):
        super().__init__()
        self.source_id = source_id
        self.cluster = cluster
        self.schema = schema
        self.rename = 0
        self.body_format = rng.choice([e for e in BodyFormat])
        self.include_headers = rng.choice([True, False])
        self.explicit_include_headers = []
        self.num_rows = 0
        self.columns = [
            WebhookColumn(
                "body",
                self.body_format.to_data_type(),
                False,
                self,
            )
        ]

        if self.include_headers:
            self.columns.append(WebhookColumn("headers", TextTextMap, False, self))

        for i in range(rng.randint(0, MAX_INCLUDE_HEADERS)):
            # naughtify: UnicodeEncodeError: 'ascii' codec can't encode characters
            self.explicit_include_headers.append(f"ih{i}")
        # for testing now() in check
        if rng.choice([True, False]):
            self.explicit_include_headers.append("timestamp")
        self.columns += [
            WebhookColumn(include_header, Text, True, self)
            for include_header in self.explicit_include_headers
        ]

        self.check_expr = None
        if rng.choice([True, False]):
            # TODO: More general expressions, failing expressions
            exprs = [
                "BODY = BODY",
                "map_length(HEADERS) = map_length(HEADERS)",
            ]
            if "timestamp" in self.explicit_include_headers:
                exprs.append(
                    "(headers->'timestamp'::text)::timestamp + INTERVAL '10s' >= now()"
                )
            self.check_expr = " AND ".join(
                rng.sample(exprs, k=rng.randint(1, len(exprs)))
            )
        # TODO: CHECK WITH SECRET
        # TODO: NOT IN INCLUDE HEADERS

    def name(self) -> str:
        if self.rename:
            return naughtify(f"wh-{self.source_id}-{self.rename}")
        return naughtify(f"wh-{self.source_id}")

    def __str__(self) -> str:
        return f"{self.schema}.{identifier(self.name())}"

    def create(self, exe: Executor) -> None:
        query = f"CREATE SOURCE {self} IN CLUSTER {self.cluster} FROM WEBHOOK BODY FORMAT {self.body_format.name}"
        if self.include_headers:
            query += " INCLUDE HEADERS"
        for include_header in self.explicit_include_headers:
            query += f" INCLUDE HEADER {literal(include_header)} as {identifier(include_header)}"
        if self.check_expr:
            query += f" CHECK (WITH (BODY, HEADERS) {self.check_expr})"
        exe.execute(query)
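

# Kafka sources: columns mirror the Avro fields fed in by a data-ingest
# workload; the actual source creation is delegated to KafkaExecutor.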
class KafkaColumn(Column):
    def __init__(
        self, name: str, data_type: type[DataType], nullable: bool, db_object: DBObject
    ):
        self.raw_name = name
        self.data_type = data_type
        self.nullable = nullable
        self.db_object = db_object

    def name(self, in_query: bool = False) -> str:
        return identifier(self.raw_name) if in_query else self.raw_name


class KafkaSource(DBObject):
    source_id: int
    cluster: "Cluster"
    executor: KafkaExecutor
    generator: Iterator[Transaction]
    lock: threading.Lock
    columns: list[KafkaColumn]
    schema: Schema
    num_rows: int

    def __init__(
        self,
        source_id: int,
        cluster: "Cluster",
        schema: Schema,
        ports: dict[str, int],
        rng: random.Random,
    ):
        super().__init__()
        self.source_id = source_id
        self.cluster = cluster
        self.schema = schema
        self.num_rows = 0
        fields = []
        for i in range(rng.randint(1, 10)):
            fields.append(
                # naughtify: Invalid schema
                Field(f"key{i}", rng.choice(DATA_TYPES_FOR_AVRO), True)
            )
        for i in range(rng.randint(0, 20)):
            fields.append(Field(f"value{i}", rng.choice(DATA_TYPES_FOR_AVRO), False))
        self.columns = [
            KafkaColumn(field.name, field.data_type, False, self) for field in fields
        ]
        self.executor = KafkaExecutor(
            self.source_id,
            ports,
            fields,
            schema.db.name(),
            schema.name(),
            cluster.name(),
        )
        workload = rng.choice(list(WORKLOADS))(azurite=False)
        for transaction_def in workload.cycle:
            for definition in transaction_def.operations:
                if type(definition) == Insert and definition.count > MAX_ROWS:
                    definition.count = 100
        self.generator = workload.generate(fields)
        self.lock = threading.Lock()

    def name(self) -> str:
        return self.executor.table

    def __str__(self) -> str:
        return f"{self.schema}.{self.name()}"

    def create(self, exe: Executor) -> None:
        self.executor.create(logging_exe=exe)
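

# A Kafka sink over an existing object, with randomized format, envelope, key
# and optional partitioning.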
class KafkaSink(DBObject):
    sink_id: int
    rename: int
    cluster: "Cluster"
    schema: Schema
    base_object: DBObject
    envelope: str
    key: str

    def __init__(
        self,
        sink_id: int,
        cluster: "Cluster",
        schema: Schema,
        base_object: DBObject,
        rng: random.Random,
    ):
        super().__init__()
        self.sink_id = sink_id
        self.cluster = cluster
        self.schema = schema
        self.base_object = base_object
        universal_formats = [
            "FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn",
            "FORMAT JSON",
        ]
        single_column_formats = ["FORMAT BYTES", "FORMAT TEXT"]
        formats = universal_formats.copy()
        if len(base_object.columns) == 1:
            formats.extend(single_column_formats)
        self.format = rng.choice(formats)
        self.envelope = (
            "UPSERT" if self.format == "JSON" else rng.choice(["DEBEZIUM", "UPSERT"])
        )
        if self.envelope == "UPSERT" or rng.choice([True, False]):
            key_cols = [
                column
                for column in rng.sample(
                    base_object.columns, k=rng.randint(1, len(base_object.columns))
                )
            ]
            key_col_names = [column.name(True) for column in key_cols]
            self.key = f"KEY ({', '.join(key_col_names)}) NOT ENFORCED"

            potential_partition_keys = [
                key_col for key_col in key_cols if key_col.data_type in NUMBER_TYPES
            ]
            if potential_partition_keys:
                self.partition_key = rng.choice(potential_partition_keys).name(True)
                self.partition_count = rng.randint(1, 10)
            else:
                self.partition_count = 0

            if rng.choice([True, False]):
                key_formats = universal_formats.copy()
                if len(key_cols) == 1:
                    key_formats.extend(single_column_formats)
                value_formats = universal_formats.copy()
                if len(base_object.columns) == 1:
                    value_formats.extend(single_column_formats)
                self.format = (
                    f"KEY {rng.choice(key_formats)} VALUE {rng.choice(value_formats)}"
                )
        else:
            self.key = ""
            self.partition_count = 0
        self.rename = 0

    def name(self) -> str:
        if self.rename:
            return naughtify(f"sink-{self.sink_id}-{self.rename}")
        return naughtify(f"sink-{self.sink_id}")

    def __str__(self) -> str:
        return f"{self.schema}.{identifier(self.name())}"

    def create(self, exe: Executor) -> None:
        topic = f"sink_topic{self.sink_id}"
        maybe_partition = (
            f", TOPIC PARTITION COUNT {self.partition_count}, PARTITION BY {self.partition_key}"
            if self.partition_count
            else ""
        )
        query = f"CREATE SINK {self} IN CLUSTER {self.cluster} FROM {self.base_object} INTO KAFKA CONNECTION kafka_conn (TOPIC {topic}{maybe_partition}) {self.key} {self.format} ENVELOPE {self.envelope}"
        exe.execute(query)
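

# MySQL sources: like Kafka sources, but fed through MySqlExecutor.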
class MySqlColumn(Column):
    def __init__(
        self, name: str, data_type: type[DataType], nullable: bool, db_object: DBObject
    ):
        self.raw_name = name
        self.data_type = data_type
        self.nullable = nullable
        self.db_object = db_object

    def name(self, in_query: bool = False) -> str:
        return identifier(self.raw_name) if in_query else self.raw_name


class MySqlSource(DBObject):
    source_id: int
    cluster: "Cluster"
    executor: MySqlExecutor
    generator: Iterator[Transaction]
    lock: threading.Lock
    columns: list[MySqlColumn]
    schema: Schema
    num_rows: int

    def __init__(
        self,
        source_id: int,
        cluster: "Cluster",
        schema: Schema,
        ports: dict[str, int],
        rng: random.Random,
    ):
        super().__init__()
        self.source_id = source_id
        self.cluster = cluster
        self.schema = schema
        self.num_rows = 0
        fields = []
        for i in range(rng.randint(1, 10)):
            fields.append(
                # naughtify: MySql column identifiers are escaped differently for MySql sources: key3_ЁЂЃЄЅІЇЈЉЊЋЌЍЎЏА gets "", but pg8000.native.identifier() doesn't
                Field(f"key{i}", rng.choice(DATA_TYPES_FOR_KEY), True)
            )
        for i in range(rng.randint(0, 20)):
            fields.append(Field(f"value{i}", rng.choice(DATA_TYPES_FOR_MYSQL), False))
        self.columns = [
            MySqlColumn(field.name, field.data_type, False, self) for field in fields
        ]
        self.executor = MySqlExecutor(
            self.source_id,
            ports,
            fields,
            schema.db.name(),
            schema.name(),
            cluster.name(),
        )
        self.generator = rng.choice(list(WORKLOADS))(azurite=False).generate(fields)
        self.lock = threading.Lock()

    def name(self) -> str:
        return self.executor.table

    def __str__(self) -> str:
        return f"{self.schema}.{self.name()}"

    def create(self, exe: Executor) -> None:
        self.executor.create(logging_exe=exe)
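

# Postgres sources: like Kafka sources, but fed through PgExecutor.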
class PostgresColumn(Column):
    def __init__(
        self, name: str, data_type: type[DataType], nullable: bool, db_object: DBObject
    ):
        self.raw_name = name
        self.data_type = data_type
        self.nullable = nullable
        self.db_object = db_object

    def name(self, in_query: bool = False) -> str:
        return identifier(self.raw_name) if in_query else self.raw_name


class PostgresSource(DBObject):
    source_id: int
    cluster: "Cluster"
    executor: PgExecutor
    generator: Iterator[Transaction]
    lock: threading.Lock
    columns: list[PostgresColumn]
    schema: Schema
    num_rows: int

    def __init__(
        self,
        source_id: int,
        cluster: "Cluster",
        schema: Schema,
        ports: dict[str, int],
        rng: random.Random,
    ):
        super().__init__()
        self.source_id = source_id
        self.cluster = cluster
        self.schema = schema
        self.num_rows = 0
        fields = []
        for i in range(rng.randint(1, 10)):
            fields.append(
                # naughtify: Postgres column identifiers are escaped differently for postgres sources: key3_ЁЂЃЄЅІЇЈЉЊЋЌЍЎЏА gets "", but pg8000.native.identifier() doesn't
                Field(f"key{i}", rng.choice(DATA_TYPES_FOR_AVRO), True)
            )
        for i in range(rng.randint(0, 20)):
            fields.append(Field(f"value{i}", rng.choice(DATA_TYPES_FOR_AVRO), False))
        self.columns = [
            PostgresColumn(field.name, field.data_type, False, self) for field in fields
        ]
        self.executor = PgExecutor(
            self.source_id,
            ports,
            fields,
            schema.db.name(),
            schema.name(),
            cluster.name(),
        )
        self.generator = rng.choice(list(WORKLOADS))(azurite=False).generate(fields)
        self.lock = threading.Lock()

    def name(self) -> str:
        return self.executor.table

    def __str__(self) -> str:
        return f"{self.schema}.{self.name()}"

    def create(self, exe: Executor) -> None:
        self.executor.create(logging_exe=exe)
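

# An index; only its name is tracked here.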
class Index:
    _name: str
    lock: threading.Lock

    def __init__(self, name: str):
        self._name = name
        self.lock = threading.Lock()

    def name(self) -> str:
        return self._name

    def __str__(self) -> str:
        return identifier(self.name())
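

# A role; role names are plain ("role<N>") and never naughtified.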
class Role:
    role_id: int
    lock: threading.Lock

    def __init__(self, role_id: int):
        self.role_id = role_id
        self.lock = threading.Lock()

    def __str__(self) -> str:
        return f"role{self.role_id}"

    def create(self, exe: Executor) -> None:
        exe.execute(f"CREATE ROLE {self}")
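

# A replica of an unmanaged cluster; replica numbering starts at 1 in the name.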
class ClusterReplica:
    replica_id: int
    size: str
    cluster: "Cluster"
    rename: int
    lock: threading.Lock

    def __init__(self, replica_id: int, size: str, cluster: "Cluster"):
        self.replica_id = replica_id
        self.size = size
        self.cluster = cluster
        self.rename = 0
        self.lock = threading.Lock()

    def name(self) -> str:
        if self.rename:
            return naughtify(f"r-{self.replica_id+1}-{self.rename}")
        return naughtify(f"r-{self.replica_id+1}")

    def __str__(self) -> str:
        return identifier(self.name())

    def create(self, exe: Executor) -> None:
        # TODO: More Cluster Replica settings
        exe.execute(
            f"CREATE CLUSTER REPLICA {self.cluster}.{self} SIZE = '{self.size}'"
        )
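

# A cluster, either managed (SIZE / REPLICATION FACTOR) or unmanaged with
# explicitly listed replicas.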
class Cluster:
    cluster_id: int
    managed: bool
    size: str
    replicas: list[ClusterReplica]
    replica_id: int
    introspection_interval: str
    rename: int
    lock: threading.Lock

    def __init__(
        self,
        cluster_id: int,
        managed: bool,
        size: str,
        replication_factor: int,
        introspection_interval: str,
    ):
        self.cluster_id = cluster_id
        self.managed = managed
        self.size = size
        self.replicas = [
            ClusterReplica(i, size, self) for i in range(replication_factor)
        ]
        self.replica_id = len(self.replicas)
        self.introspection_interval = introspection_interval
        self.rename = 0
        self.lock = threading.Lock()

    def name(self) -> str:
        if self.rename:
            return naughtify(f"cluster-{self.cluster_id}-{self.rename}")
        return naughtify(f"cluster-{self.cluster_id}")

    def __str__(self) -> str:
        return identifier(self.name())

    def create(self, exe: Executor) -> None:
        query = f"CREATE CLUSTER {self} "
        if self.managed:
            query += f"SIZE = '{self.size}', REPLICATION FACTOR = {len(self.replicas)}, INTROSPECTION INTERVAL = '{self.introspection_interval}'"
        else:
            query += "REPLICAS("
            query += ", ".join(
                f"{replica} (SIZE = '{replica.size}')" for replica in self.replicas
            )
            query += ")"
        exe.execute(query)
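

# The whole randomly generated state: databases, schemas, relations, clusters
# and roles, plus the connections and secrets they rely on (see create()).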
# TODO: Can access both databases from same connection!
class Database:
    complexity: Complexity
    scenario: Scenario
    host: str
    ports: dict[str, int]
    dbs: list[DB]
    db_id: int
    schemas: list[Schema]
    schema_id: int
    tables: list[Table]
    table_id: int
    views: list[View]
    view_id: int
    roles: list[Role]
    role_id: int
    clusters: list[Cluster]
    cluster_id: int
    indexes: set[Index]
    webhook_sources: list[WebhookSource]
    webhook_source_id: int
    kafka_sources: list[KafkaSource]
    kafka_source_id: int
    mysql_sources: list[MySqlSource]
    mysql_source_id: int
    postgres_sources: list[PostgresSource]
    postgres_source_id: int
    kafka_sinks: list[KafkaSink]
    kafka_sink_id: int
    s3_path: int
    lock: threading.Lock
    seed: str
    sqlsmith_state: str
    flags: dict[str, str]

    def __init__(
        self,
        rng: random.Random,
        seed: str,
        host: str,
        ports: dict[str, int],
        complexity: Complexity,
        scenario: Scenario,
        naughty_identifiers: bool,
    ):
        global NAUGHTY_IDENTIFIERS
        self.host = host
        self.ports = ports
        self.complexity = complexity
        self.scenario = scenario
        self.seed = seed
        NAUGHTY_IDENTIFIERS = naughty_identifiers
        self.s3_path = 0

        self.dbs = [DB(seed, i) for i in range(rng.randint(1, MAX_INITIAL_DBS))]
        self.db_id = len(self.dbs)
        self.schemas = [
            Schema(rng.choice(self.dbs), i)
            for i in range(rng.randint(1, MAX_INITIAL_SCHEMAS))
        ]
        self.schema_id = len(self.schemas)
        self.tables = [
            Table(rng, i, rng.choice(self.schemas))
            for i in range(rng.randint(2, MAX_INITIAL_TABLES))
        ]
        self.table_id = len(self.tables)
        self.views = []
        for i in range(rng.randint(2, MAX_INITIAL_VIEWS)):
            # Only use tables for now since LIMIT 1 and statement_timeout are
            # not effective yet at preventing long-running queries and OoMs.
            base_object = rng.choice(self.tables)
            base_object2: Table | None = rng.choice(self.tables)
            if rng.choice([True, False]) or base_object2 == base_object:
                base_object2 = None
            view = View(rng, i, base_object, base_object2, rng.choice(self.schemas))
            self.views.append(view)
        self.view_id = len(self.views)
        self.roles = [Role(i) for i in range(rng.randint(0, MAX_INITIAL_ROLES))]
        self.role_id = len(self.roles)
        # At least one storage cluster required for WebhookSources
        self.clusters = [
            Cluster(
                i,
                managed=rng.choice([True, False]),
                size=rng.choice(["1", "2"]),
                replication_factor=1,
                introspection_interval=rng.choice(["0", "1s", "10s"]),
            )
            for i in range(rng.randint(1, MAX_INITIAL_CLUSTERS))
        ]
        self.cluster_id = len(self.clusters)
        self.indexes = set()
        self.webhook_sources = [
            WebhookSource(i, rng.choice(self.clusters), rng.choice(self.schemas), rng)
            for i in range(rng.randint(0, MAX_INITIAL_WEBHOOK_SOURCES))
        ]
        self.webhook_source_id = len(self.webhook_sources)
        self.kafka_sources = []
        self.mysql_sources = []
        self.postgres_sources = []
        self.kafka_sinks = []
        self.kafka_source_id = len(self.kafka_sources)
        self.mysql_source_id = len(self.mysql_sources)
        self.postgres_source_id = len(self.postgres_sources)
        self.kafka_sink_id = len(self.kafka_sinks)
        self.lock = threading.Lock()
        self.sqlsmith_state = ""
        self.flags = {}

    def db_objects(
        self,
    ) -> list[
        WebhookSource | MySqlSource | PostgresSource | KafkaSource | View | Table
    ]:
        return (
            self.tables
            + self.views
            + self.kafka_sources
            + self.mysql_sources
            + self.postgres_sources
            + self.webhook_sources
        )

    def db_objects_without_views(
        self,
    ) -> list[
        WebhookSource | MySqlSource | PostgresSource | KafkaSource | View | Table
    ]:
        return [
            obj for obj in self.db_objects() if type(obj) != View or obj.materialized
        ]

    def __iter__(self):
        """Returns all relations"""
        return (
            self.schemas + self.clusters + self.roles + self.db_objects()
        ).__iter__()

    def create(self, exe: Executor, composition: Composition) -> None:
        for db in self.dbs:
            db.drop(exe)
            db.create(exe)

        exe.execute("SELECT name FROM mz_clusters WHERE name LIKE 'c%'")
        for row in exe.cur.fetchall():
            exe.execute(f"DROP CLUSTER {identifier(row[0])} CASCADE")

        exe.execute("SELECT name FROM mz_roles WHERE name LIKE 'r%'")
        for row in exe.cur.fetchall():
            exe.execute(f"DROP ROLE {identifier(row[0])}")

        exe.execute("DROP SECRET IF EXISTS pgpass CASCADE")
        exe.execute("DROP SECRET IF EXISTS mypass CASCADE")
        exe.execute("DROP SECRET IF EXISTS minio CASCADE")

        print("Creating connections")
        exe.execute(
            "CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER 'kafka:9092', SECURITY PROTOCOL PLAINTEXT"
        )
        exe.execute(
            "CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL 'http://schema-registry:8081'"
        )

        exe.execute("CREATE SECRET pgpass AS 'postgres'")
        exe.execute(
            "CREATE CONNECTION postgres_conn FOR POSTGRES HOST 'postgres', DATABASE postgres, USER postgres, PASSWORD SECRET pgpass"
        )

        exe.execute(f"CREATE SECRET mypass AS '{MySql.DEFAULT_ROOT_PASSWORD}'")
        exe.execute(
            "CREATE CONNECTION mysql_conn FOR MYSQL HOST 'mysql', USER root, PASSWORD SECRET mypass"
        )

        exe.execute("CREATE SECRET IF NOT EXISTS minio AS 'minioadmin'")
        exe.execute(
            "CREATE CONNECTION IF NOT EXISTS aws_conn TO AWS (ENDPOINT 'http://minio:9000/', REGION 'minio', ACCESS KEY ID 'minioadmin', SECRET ACCESS KEY SECRET minio)"
        )

        print("Creating relations")
        for relation in self:
            relation.create(exe)

        if False:  # Questionable use
            result = composition.run(
                "sqlsmith",
                "--target=host=materialized port=6875 dbname=materialize user=materialize",
                "--exclude-catalog",
                "--dump-state",
                capture=True,
                capture_stderr=True,
                rm=True,
            )
            self.sqlsmith_state = result.stdout

    def drop(self, exe: Executor) -> None:
        for db in self.dbs:
            print(f"Dropping database {db}")
            db.drop(exe)
        for src in self.kafka_sources:
            src.executor.mz_conn.close()
        for src in self.postgres_sources:
            src.executor.mz_conn.close()
        for src in self.mysql_sources:
            src.executor.mz_conn.close()

    def update_sqlsmith_state(self, composition: Composition) -> None:
        if False:  # Questionable use
            result = composition.run(
                "sqlsmith",
                "--target=host=materialized port=6875 dbname=materialize user=materialize",
                "--exclude-catalog",
                "--read-state",
                "--dump-state",
                stdin=self.sqlsmith_state,
                capture=True,
                capture_stderr=True,
                rm=True,
            )
            self.sqlsmith_state = result.stdout