# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
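
"""Executors for writing data-ingest transactions into external systems.

Each Executor subclass applies generated transactions to one backend
(Kafka, MySQL, Postgres, or Materialize itself) and creates the
corresponding objects in Materialize so the data can be ingested back.
"""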

import json
import random
import time
from typing import Any

import confluent_kafka  # type: ignore
import psycopg
import pymysql
import pymysql.cursors
from confluent_kafka.admin import AdminClient  # type: ignore
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient  # type: ignore
from confluent_kafka.schema_registry.avro import AvroSerializer  # type: ignore
from confluent_kafka.serialization import (  # type: ignore
    MessageField,
    SerializationContext,
)
from pg8000.native import identifier
from psycopg.errors import OperationalError

from materialize.data_ingest.data_type import Backend
from materialize.data_ingest.field import Field, formatted_value
from materialize.data_ingest.query_error import QueryError
from materialize.data_ingest.row import Operation
from materialize.data_ingest.transaction import Transaction
from materialize.mzcompose.services.mysql import MySql
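

# Base class: manages the connection to Materialize and provides query
# execution helpers (with reconnect-on-network-error and retry logic)
# shared by all concrete executors below.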
class Executor:
    num_transactions: int
    ports: dict[str, int]
    mz_conn: psycopg.Connection
    fields: list[Field]
    database: str
    schema: str
    cluster: str | None
    logging_exe: Any | None
    mz_service: str | None = None

    def __init__(
        self,
        ports: dict[str, int],
        fields: list[Field] = [],
        database: str = "",
        schema: str = "public",
        cluster: str | None = None,
        mz_service: str | None = None,
    ) -> None:
        self.num_transactions = 0
        self.ports = ports
        self.fields = fields
        self.database = database
        self.schema = schema
        self.cluster = cluster
        self.mz_service = mz_service
        self.logging_exe = None
        self.reconnect()

    def reconnect(self) -> None:
        mz_service = self.mz_service
        if not mz_service:
            mz_service = (
                random.choice(["materialized", "materialized2"])
                if "materialized2" in self.ports
                else "materialized"
            )
        self.mz_conn = psycopg.connect(
            host="localhost",
            port=self.ports[mz_service],
            user="materialize",
            dbname=self.database,
        )
        self.mz_conn.autocommit = True

    def create(self, logging_exe: Any | None = None) -> None:
        raise NotImplementedError

    def run(self, transaction: Transaction, logging_exe: Any | None = None) -> None:
        raise NotImplementedError

    def execute(
        self, cur: psycopg.Cursor | pymysql.cursors.Cursor, query: str
    ) -> None:
        if self.logging_exe is not None:
            self.logging_exe.log(query)
        try:
            if isinstance(cur, psycopg.Cursor):
                cur.execute(query.encode())
            else:
                cur.execute(query)
        except OperationalError:
            # Can happen after Mz disruptions if we are running queries against Mz
            print("Network error, retrying")
            time.sleep(0.01)
            self.reconnect()
            with self.mz_conn.cursor() as cur:
                self.execute(cur, query)
        except Exception as e:
            print(f"Query failed: {query} {e}")
            raise QueryError(str(e), query)

    def execute_with_retry_on_error(
        self,
        cur: psycopg.Cursor,
        query: str,
        required_error_message_substrs: list[str],
        max_tries: int = 5,
        wait_time_in_sec: int = 1,
    ) -> None:
        for try_count in range(1, max_tries + 1):
            try:
                self.execute(cur, query)
                return
            except Exception as e:
                if not any(s in str(e) for s in required_error_message_substrs):
                    raise
                elif try_count == max_tries:
                    raise
                else:
                    time.sleep(wait_time_in_sec)
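

# Debugging executor: does not ingest anywhere, just prints each
# transaction to stdout.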
class PrintExecutor(Executor):
    def create(self, logging_exe: Any | None = None) -> None:
        pass

    def run(self, transaction: Transaction, logging_exe: Any | None = None) -> None:
        print("Transaction:")
        print(" ", transaction.row_lists)
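

# Delivery callback passed to confluent_kafka.Producer.produce();
# fails immediately if a message could not be delivered.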
def delivery_report(err: str, msg: Any) -> None:
    assert err is None, f"Delivery failed for record {msg.key()}: {err}"
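

# Produces Avro-encoded records (with schemas registered in the Confluent
# Schema Registry) into a Kafka topic and creates an ENVELOPE UPSERT
# Kafka source in Materialize reading from that topic.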
class KafkaExecutor(Executor):
    producer: confluent_kafka.Producer
    avro_serializer: AvroSerializer
    key_avro_serializer: AvroSerializer
    serialization_context: SerializationContext
    key_serialization_context: SerializationContext
    topic: str
    table: str

    def __init__(
        self,
        num: int,
        ports: dict[str, int],
        fields: list[Field],
        database: str,
        schema: str = "public",
        cluster: str | None = None,
        mz_service: str | None = None,
    ):
        super().__init__(ports, fields, database, schema, cluster, mz_service)
        self.topic = f"data-ingest-{num}"
        self.table = f"kafka_table{num}"

    def create(self, logging_exe: Any | None = None) -> None:
        self.logging_exe = logging_exe
        schema = {
            "type": "record",
            "name": "value",
            "fields": [
                {
                    "name": field.name,
                    "type": str(field.data_type.name(Backend.AVRO)).lower(),
                }
                for field in self.fields
                if not field.is_key
            ],
        }
        key_schema = {
            "type": "record",
            "name": "key",
            "fields": [
                {
                    "name": field.name,
                    "type": str(field.data_type.name(Backend.AVRO)).lower(),
                }
                for field in self.fields
                if field.is_key
            ],
        }

        kafka_conf = {"bootstrap.servers": f"localhost:{self.ports['kafka']}"}

        a = AdminClient(kafka_conf)
        fs = a.create_topics(
            [
                confluent_kafka.admin.NewTopic(  # type: ignore
                    self.topic, num_partitions=1, replication_factor=1
                )
            ]
        )
        for topic, f in fs.items():
            f.result()
        # NOTE: this _could_ be refactored, but since we are fairly certain at
        # this point there will be exactly one topic it should be fine.
        topic = list(fs.keys())[0]

        schema_registry_conf = {
            "url": f"http://localhost:{self.ports['schema-registry']}"
        }
        registry = SchemaRegistryClient(schema_registry_conf)

        self.avro_serializer = AvroSerializer(
            registry, json.dumps(schema), lambda d, ctx: d
        )
        self.key_avro_serializer = AvroSerializer(
            registry, json.dumps(key_schema), lambda d, ctx: d
        )

        if logging_exe is not None:
            logging_exe.log(f"{topic}-value: {json.dumps(schema)}")
            logging_exe.log(f"{topic}-key: {json.dumps(key_schema)}")
        registry.register_schema(
            f"{topic}-value", Schema(json.dumps(schema), schema_type="AVRO")
        )
        registry.register_schema(
            f"{topic}-key", Schema(json.dumps(key_schema), schema_type="AVRO")
        )

        self.serialization_context = SerializationContext(
            self.topic, MessageField.VALUE
        )
        self.key_serialization_context = SerializationContext(
            self.topic, MessageField.KEY
        )

        self.producer = confluent_kafka.Producer(kafka_conf)

        with self.mz_conn.cursor() as cur:
            self.execute_with_retry_on_error(
                cur,
                f"""CREATE SOURCE {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.table)}
                    {f"IN CLUSTER {identifier(self.cluster)}" if self.cluster else ""}
                    FROM KAFKA CONNECTION materialize.public.kafka_conn (TOPIC '{self.topic}')
                    FORMAT AVRO
                    USING CONFLUENT SCHEMA REGISTRY CONNECTION materialize.public.csr_conn
                    ENVELOPE UPSERT""",
                required_error_message_substrs=[
                    "Topic does not exist",
                ],
            )

    def run(self, transaction: Transaction, logging_exe: Any | None = None) -> None:
        self.logging_exe = logging_exe
        for row_list in transaction.row_lists:
            for row in row_list.rows:
                if (
                    row.operation == Operation.INSERT
                    or row.operation == Operation.UPSERT
                ):
                    self.producer.produce(
                        topic=self.topic,
                        key=self.key_avro_serializer(
                            {
                                field.name: value
                                for field, value in zip(row.fields, row.values)
                                if field.is_key
                            },
                            self.key_serialization_context,
                        ),
                        value=self.avro_serializer(
                            {
                                field.name: value
                                for field, value in zip(row.fields, row.values)
                                if not field.is_key
                            },
                            self.serialization_context,
                        ),
                        on_delivery=delivery_report,
                    )
                elif row.operation == Operation.DELETE:
                    # The upsert envelope encodes a delete as a record with a
                    # key and a null value.
                    self.producer.produce(
                        topic=self.topic,
                        key=self.key_avro_serializer(
                            {
                                field.name: value
                                for field, value in zip(row.fields, row.values)
                                if field.is_key
                            },
                            self.key_serialization_context,
                        ),
                        value=None,
                        on_delivery=delivery_report,
                    )
                else:
                    raise ValueError(f"Unexpected operation {row.operation}")
        self.producer.flush()
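

# Writes rows into a MySQL table and ingests them into Materialize
# through a MySQL source.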
class MySqlExecutor(Executor):
    mysql_conn: pymysql.Connection
    table: str
    source: str
    num: int

    def __init__(
        self,
        num: int,
        ports: dict[str, int],
        fields: list[Field],
        database: str,
        schema: str = "public",
        cluster: str | None = None,
        mz_service: str | None = None,
    ):
        super().__init__(ports, fields, database, schema, cluster, mz_service)
        self.table = f"mytable{num}"
        self.source = f"mysql_source{num}"
        self.num = num

    def create(self, logging_exe: Any | None = None) -> None:
        self.logging_exe = logging_exe
        self.mysql_conn = pymysql.connect(
            host="localhost",
            user="root",
            password=MySql.DEFAULT_ROOT_PASSWORD,
            database="mysql",
            port=self.ports["mysql"],
        )
        values = [
            f"`{field.name}` {str(field.data_type.name(Backend.MYSQL)).lower()}"
            for field in self.fields
        ]
        keys = [field.name for field in self.fields if field.is_key]

        self.mysql_conn.autocommit(True)
        with self.mysql_conn.cursor() as cur:
            self.execute(cur, f"DROP TABLE IF EXISTS `{self.table}`;")
            primary_key = (
                f", PRIMARY KEY ({', '.join([f'`{key}`' for key in keys])})"
                if keys
                else ""
            )
            self.execute(
                cur,
                f"CREATE TABLE `{self.table}` ({', '.join(values)} {primary_key});",
            )
        self.mysql_conn.autocommit(False)

        with self.mz_conn.cursor() as cur:
            self.execute(
                cur,
                f"CREATE SECRET IF NOT EXISTS mypass AS '{MySql.DEFAULT_ROOT_PASSWORD}'",
            )
            self.execute(
                cur,
                f"""CREATE CONNECTION mysql{self.num} FOR MYSQL
                    HOST 'mysql',
                    USER root,
                    PASSWORD SECRET mypass""",
            )
            self.execute(
                cur,
                f"""CREATE SOURCE {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.source)}
                    {f"IN CLUSTER {identifier(self.cluster)}" if self.cluster else ""}
                    FROM MYSQL CONNECTION mysql{self.num}
                    """,
            )
            self.execute(
                cur,
                f"""CREATE TABLE {identifier(self.table)}
                    FROM SOURCE {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.source)}
                    (REFERENCE mysql.{identifier(self.table)})""",
            )

    def run(self, transaction: Transaction, logging_exe: Any | None = None) -> None:
        self.logging_exe = logging_exe
        with self.mysql_conn.cursor() as cur:
            for row_list in transaction.row_lists:
                for row in row_list.rows:
                    if row.operation == Operation.INSERT:
                        values_str = ", ".join(
                            str(formatted_value(value)) for value in row.values
                        )
                        self.execute(
                            cur,
                            f"""INSERT INTO `{self.table}`
                                VALUES ({values_str})
                            """,
                        )
                    elif row.operation == Operation.UPSERT:
                        values_str = ", ".join(
                            str(formatted_value(value)) for value in row.values
                        )
                        update_str = ", ".join(
                            f"`{field.name}` = VALUES(`{field.name}`)"
                            for field in row.fields
                        )
                        self.execute(
                            cur,
                            f"""INSERT INTO `{self.table}`
                                VALUES ({values_str})
                                ON DUPLICATE KEY
                                UPDATE {update_str}
                            """,
                        )
                    elif row.operation == Operation.DELETE:
                        cond_str = " AND ".join(
                            f"`{field.name}` = {formatted_value(value)}"
                            for field, value in zip(row.fields, row.values)
                            if field.is_key
                        )
                        self.execute(
                            cur,
                            f"""DELETE FROM `{self.table}`
                                WHERE {cond_str}
                            """,
                        )
                    else:
                        raise ValueError(f"Unexpected operation {row.operation}")
            self.mysql_conn.commit()
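

# Writes rows into a Postgres table and ingests them into Materialize
# through a Postgres source (logical replication via a publication).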
class PgExecutor(Executor):
    pg_conn: psycopg.Connection
    table: str
    source: str
    num: int

    def __init__(
        self,
        num: int,
        ports: dict[str, int],
        fields: list[Field],
        database: str,
        schema: str = "public",
        cluster: str | None = None,
        mz_service: str | None = None,
    ):
        super().__init__(ports, fields, database, schema, cluster, mz_service)
        self.table = f"table{num}"
        self.source = f"postgres_source{num}"
        self.num = num

    def create(self, logging_exe: Any | None = None) -> None:
        self.logging_exe = logging_exe
        self.pg_conn = psycopg.connect(
            host="localhost",
            user="postgres",
            password="postgres",
            port=self.ports["postgres"],
        )
        values = [
            f"{identifier(field.name)} {str(field.data_type.name(Backend.POSTGRES)).lower()}"
            for field in self.fields
        ]
        keys = [field.name for field in self.fields if field.is_key]

        self.pg_conn.autocommit = True
        with self.pg_conn.cursor() as cur:
            self.execute(
                cur,
                f"""DROP TABLE IF EXISTS {identifier(self.table)};
                    CREATE TABLE {identifier(self.table)} (
                        {", ".join(values)},
                        PRIMARY KEY ({", ".join([identifier(key) for key in keys])}));
                    ALTER TABLE {identifier(self.table)} REPLICA IDENTITY FULL;
                    CREATE USER postgres{self.num} WITH SUPERUSER PASSWORD 'postgres';
                    ALTER USER postgres{self.num} WITH replication;
                    DROP PUBLICATION IF EXISTS {self.source};
                    CREATE PUBLICATION {self.source} FOR ALL TABLES;""",
            )
        self.pg_conn.autocommit = False

        with self.mz_conn.cursor() as cur:
            self.execute(cur, f"CREATE SECRET pgpass{self.num} AS 'postgres'")
            self.execute(
                cur,
                f"""CREATE CONNECTION pg{self.num} FOR POSTGRES
                    HOST 'postgres',
                    DATABASE postgres,
                    USER postgres{self.num},
                    PASSWORD SECRET pgpass{self.num}""",
            )
            self.execute(
                cur,
                f"""CREATE SOURCE {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.source)}
                    {f"IN CLUSTER {identifier(self.cluster)}" if self.cluster else ""}
                    FROM POSTGRES CONNECTION pg{self.num} (PUBLICATION '{self.source}')
                    """,
            )
            self.execute(
                cur,
                f"""CREATE TABLE {identifier(self.table)}
                    FROM SOURCE {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.source)}
                    (REFERENCE {identifier(self.table)})""",
            )

    def run(self, transaction: Transaction, logging_exe: Any | None = None) -> None:
        self.logging_exe = logging_exe
        with self.pg_conn.cursor() as cur:
            for row_list in transaction.row_lists:
                for row in row_list.rows:
                    if row.operation == Operation.INSERT:
                        values_str = ", ".join(
                            str(formatted_value(value)) for value in row.values
                        )
                        self.execute(
                            cur,
                            f"""INSERT INTO {identifier(self.table)}
                                VALUES ({values_str})
                            """,
                        )
                    elif row.operation == Operation.UPSERT:
                        values_str = ", ".join(
                            str(formatted_value(value)) for value in row.values
                        )
                        keys_str = ", ".join(
                            identifier(field.name)
                            for field in row.fields
                            if field.is_key
                        )
                        update_str = ", ".join(
                            f"{identifier(field.name)} = EXCLUDED.{identifier(field.name)}"
                            for field in row.fields
                        )
                        self.execute(
                            cur,
                            f"""INSERT INTO {identifier(self.table)}
                                VALUES ({values_str})
                                ON CONFLICT ({keys_str})
                                DO UPDATE SET {update_str}
                            """,
                        )
                    elif row.operation == Operation.DELETE:
                        cond_str = " AND ".join(
                            f"{identifier(field.name)} = {formatted_value(value)}"
                            for field, value in zip(row.fields, row.values)
                            if field.is_key
                        )
                        self.execute(
                            cur,
                            f"""DELETE FROM {identifier(self.table)}
                                WHERE {cond_str}
                            """,
                        )
                    else:
                        raise ValueError(f"Unexpected operation {row.operation}")
            self.pg_conn.commit()
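

# Round-trip test: inserts rows into a Materialize table, sinks them to
# Kafka with ENVELOPE DEBEZIUM, and reads them back in through a Kafka
# source on the same topic.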
class KafkaRoundtripExecutor(Executor):
    table: str
    table_original: str
    topic: str
    known_keys: set[tuple[str]]
    num: int

    def __init__(
        self,
        num: int,
        ports: dict[str, int],
        fields: list[Field],
        database: str,
        schema: str = "public",
        cluster: str | None = None,
        mz_service: str | None = None,
    ):
        super().__init__(ports, fields, database, schema, cluster, mz_service)
        self.table_original = f"table_rt_source{num}"
        self.table = f"table_rt{num}"
        self.topic = f"data-ingest-rt-{num}"
        self.num = num
        self.known_keys = set()

    def create(self, logging_exe: Any | None = None) -> None:
        self.logging_exe = logging_exe
        values = [
            f"{field.name} {str(field.data_type.name(Backend.MATERIALIZE)).lower()}"
            for field in self.fields
        ]
        keys = [field.name for field in self.fields if field.is_key]

        with self.mz_conn.cursor() as cur:
            self.execute(
                cur, f"DROP TABLE IF EXISTS {identifier(self.table_original)}"
            )
            self.execute(
                cur,
                f"""CREATE TABLE {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.table_original)} (
                    {", ".join(values)},
                    PRIMARY KEY ({", ".join(keys)}));""",
            )
            self.execute(
                cur,
                f"""CREATE SINK {identifier(self.database)}.{identifier(self.schema)}.sink{self.num}
                    {f"IN CLUSTER {identifier(self.cluster)}" if self.cluster else ""}
                    FROM {identifier(self.table_original)}
                    INTO KAFKA CONNECTION kafka_conn (TOPIC '{self.topic}')
                    KEY ({", ".join([identifier(key) for key in keys])})
                    FORMAT AVRO
                    USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                    ENVELOPE DEBEZIUM""",
            )
            # The sink creates the topic and registers its schemas
            # asynchronously, so creating the source may need a few retries.
            self.execute_with_retry_on_error(
                cur,
                f"""CREATE SOURCE {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.table)}
                    {f"IN CLUSTER {identifier(self.cluster)}" if self.cluster else ""}
                    FROM KAFKA CONNECTION kafka_conn (TOPIC '{self.topic}')
                    FORMAT AVRO
                    USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                    ENVELOPE DEBEZIUM""",
                wait_time_in_sec=1,
                max_tries=15,
                required_error_message_substrs=[
                    "No value schema found",
                    "Key schema is required for ENVELOPE DEBEZIUM",
                    "Topic does not exist",
                ],
            )

    def run(self, transaction: Transaction, logging_exe: Any | None = None) -> None:
        self.logging_exe = logging_exe
        with self.mz_conn.cursor() as cur:
            for row_list in transaction.row_lists:
                for row in row_list.rows:
                    key_values = tuple(
                        value
                        for field, value in zip(row.fields, row.values)
                        if field.is_key
                    )
                    if row.operation == Operation.INSERT:
                        values_str = ", ".join(
                            str(formatted_value(value)) for value in row.values
                        )
                        self.execute(
                            cur,
                            f"""INSERT INTO {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.table_original)}
                                VALUES ({values_str})
                            """,
                        )
                        self.known_keys.add(key_values)
                    elif row.operation == Operation.UPSERT:
                        # Tables have no native upsert, so emulate it: UPDATE
                        # if the key was seen before, INSERT otherwise.
                        if key_values in self.known_keys:
                            non_key_values = tuple(
                                (field, value)
                                for field, value in zip(row.fields, row.values)
                                if not field.is_key
                            )
                            # Can't update anything if there are no values,
                            # only a key, and the key is already in the table
                            if non_key_values:
                                cond_str = " AND ".join(
                                    f"{identifier(field.name)} = {formatted_value(value)}"
                                    for field, value in zip(row.fields, row.values)
                                    if field.is_key
                                )
                                set_str = ", ".join(
                                    f"{identifier(field.name)} = {formatted_value(value)}"
                                    for field, value in non_key_values
                                )
                                self.execute(
                                    cur,
                                    f"""UPDATE {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.table_original)}
                                        SET {set_str}
                                        WHERE {cond_str}
                                    """,
                                )
                        else:
                            values_str = ", ".join(
                                str(formatted_value(value)) for value in row.values
                            )
                            self.execute(
                                cur,
                                f"""INSERT INTO {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.table_original)}
                                    VALUES ({values_str})
                                """,
                            )
                            self.known_keys.add(key_values)
                    elif row.operation == Operation.DELETE:
                        cond_str = " AND ".join(
                            f"{identifier(field.name)} = {formatted_value(value)}"
                            for field, value in zip(row.fields, row.values)
                            if field.is_key
                        )
                        self.execute(
                            cur,
                            f"""DELETE FROM {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.table_original)}
                                WHERE {cond_str}
                            """,
                        )
                        self.known_keys.discard(key_values)
                    else:
                        raise ValueError(f"Unexpected operation {row.operation}")