# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
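
"""Executors that apply data-ingest transactions to Kafka, MySQL, Postgres and Materialize.

Each executor sets up its target objects (topics, tables, sources, sinks) in
``create()`` and applies a ``Transaction`` of inserts, upserts and deletes in ``run()``.
"""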

import json
import random
import time
from typing import Any

import confluent_kafka  # type: ignore
import psycopg
import pymysql
import pymysql.cursors
from confluent_kafka.admin import AdminClient  # type: ignore
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient  # type: ignore
from confluent_kafka.schema_registry.avro import AvroSerializer  # type: ignore
from confluent_kafka.serialization import (  # type: ignore
    MessageField,
    SerializationContext,
)
from pg8000.native import identifier
from psycopg.errors import OperationalError

from materialize.data_ingest.data_type import Backend
from materialize.data_ingest.field import Field, formatted_value
from materialize.data_ingest.query_error import QueryError
from materialize.data_ingest.row import Operation
from materialize.data_ingest.transaction import Transaction
from materialize.mzcompose.services.mysql import MySql


class Executor:
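    """Base class for all executors: holds the Materialize connection and shared query helpers."""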
    num_transactions: int
    ports: dict[str, int]
    mz_conn: psycopg.Connection
    fields: list[Field]
    database: str
    schema: str
    cluster: str | None
    logging_exe: Any | None
    mz_service: str | None = None

    def __init__(
        self,
        ports: dict[str, int],
        fields: list[Field] = [],
        database: str = "",
        schema: str = "public",
        cluster: str | None = None,
        mz_service: str | None = None,
    ) -> None:
        self.num_transactions = 0
        self.ports = ports
        self.fields = fields
        self.database = database
        self.schema = schema
        self.cluster = cluster
        self.mz_service = mz_service
        self.logging_exe = None
        self.reconnect()

    def reconnect(self) -> None:
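        """(Re)connect to Materialize, picking a random materialized instance when none is pinned."""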
        mz_service = self.mz_service
        if not mz_service:
            mz_service = (
                random.choice(["materialized", "materialized2"])
                if "materialized2" in self.ports
                else "materialized"
            )
        self.mz_conn = psycopg.connect(
            host="localhost",
            port=self.ports[mz_service],
            user="materialize",
            dbname=self.database,
        )
        self.mz_conn.autocommit = True

    def create(self, logging_exe: Any | None = None) -> None:
        raise NotImplementedError

    def run(self, transaction: Transaction, logging_exe: Any | None = None) -> None:
        raise NotImplementedError

    def execute(
        self, cur: psycopg.Cursor | pymysql.cursors.Cursor, query: str
    ) -> None:
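        """Execute a single query on the given cursor, logging it and retrying against a fresh Materialize connection on network errors."""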
        if self.logging_exe is not None:
            self.logging_exe.log(query)
        try:
            if isinstance(cur, psycopg.Cursor):
                cur.execute(query.encode())
            else:
                cur.execute(query)
        except OperationalError:
            # Can happen after Mz disruptions if we are running queries against Mz
            print("Network error, retrying")
            time.sleep(0.01)
            self.reconnect()
            with self.mz_conn.cursor() as cur:
                self.execute(cur, query)
        except Exception as e:
            print(f"Query failed: {query} {e}")
            raise QueryError(str(e), query)

    def execute_with_retry_on_error(
        self,
        cur: psycopg.Cursor,
        query: str,
        required_error_message_substrs: list[str],
        max_tries: int = 5,
        wait_time_in_sec: int = 1,
    ) -> None:
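        """Retry the query up to max_tries times, but only for errors containing one of the expected substrings."""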
        for try_count in range(1, max_tries + 1):
            try:
                self.execute(cur, query)
                return
            except Exception as e:
                if not any(s in str(e) for s in required_error_message_substrs):
                    raise
                elif try_count == max_tries:
                    raise
                else:
                    time.sleep(wait_time_in_sec)


class PrintExecutor(Executor):
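    """Executor that only prints transactions instead of ingesting them; useful for debugging."""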
    def create(self, logging_exe: Any | None = None) -> None:
        pass

    def run(self, transaction: Transaction, logging_exe: Any | None = None) -> None:
        print("Transaction:")
        print("  ", transaction.row_lists)


def delivery_report(err: str, msg: Any) -> None:
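    """Delivery callback for confluent_kafka.Producer: fail immediately if a record could not be produced."""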
    assert err is None, f"Delivery failed for User record {msg.key()}: {err}"


class KafkaExecutor(Executor):
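    """Produces rows to a Kafka topic as Avro messages and reads them back via an ENVELOPE UPSERT source."""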
    producer: confluent_kafka.Producer
    avro_serializer: AvroSerializer
    key_avro_serializer: AvroSerializer
    serialization_context: SerializationContext
    key_serialization_context: SerializationContext
    topic: str
    table: str

    def __init__(
        self,
        num: int,
        ports: dict[str, int],
        fields: list[Field],
        database: str,
        schema: str = "public",
        cluster: str | None = None,
        mz_service: str | None = None,
    ):
        super().__init__(ports, fields, database, schema, cluster, mz_service)
        self.topic = f"data-ingest-{num}"
        self.table = f"kafka_table{num}"

    def create(self, logging_exe: Any | None = None) -> None:
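        """Create the Kafka topic, register Avro key/value schemas, and create the matching Kafka source in Materialize."""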
        self.logging_exe = logging_exe
        schema = {
            "type": "record",
            "name": "value",
            "fields": [
                {
                    "name": field.name,
                    "type": str(field.data_type.name(Backend.AVRO)).lower(),
                }
                for field in self.fields
                if not field.is_key
            ],
        }

        key_schema = {
            "type": "record",
            "name": "key",
            "fields": [
                {
                    "name": field.name,
                    "type": str(field.data_type.name(Backend.AVRO)).lower(),
                }
                for field in self.fields
                if field.is_key
            ],
        }

        kafka_conf = {"bootstrap.servers": f"localhost:{self.ports['kafka']}"}

        a = AdminClient(kafka_conf)
        fs = a.create_topics(
            [
                confluent_kafka.admin.NewTopic(  # type: ignore
                    self.topic, num_partitions=1, replication_factor=1
                )
            ]
        )
        for topic, f in fs.items():
            f.result()

        # NOTE: this _could_ be refactored, but since we are fairly certain at
        # this point there will be exactly one topic it should be fine.
        topic = list(fs.keys())[0]

        schema_registry_conf = {
            "url": f"http://localhost:{self.ports['schema-registry']}"
        }
        registry = SchemaRegistryClient(schema_registry_conf)

        self.avro_serializer = AvroSerializer(
            registry, json.dumps(schema), lambda d, ctx: d
        )
        self.key_avro_serializer = AvroSerializer(
            registry, json.dumps(key_schema), lambda d, ctx: d
        )

        if logging_exe is not None:
            logging_exe.log(f"{topic}-value: {json.dumps(schema)}")
            logging_exe.log(f"{topic}-key: {json.dumps(key_schema)}")
        registry.register_schema(
            f"{topic}-value", Schema(json.dumps(schema), schema_type="AVRO")
        )
        registry.register_schema(
            f"{topic}-key", Schema(json.dumps(key_schema), schema_type="AVRO")
        )

        self.serialization_context = SerializationContext(
            self.topic, MessageField.VALUE
        )
        self.key_serialization_context = SerializationContext(
            self.topic, MessageField.KEY
        )

        self.producer = confluent_kafka.Producer(kafka_conf)

        with self.mz_conn.cursor() as cur:
            self.execute_with_retry_on_error(
                cur,
                f"""CREATE SOURCE {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.table)}
                    {f"IN CLUSTER {identifier(self.cluster)}" if self.cluster else ""}
                    FROM KAFKA CONNECTION materialize.public.kafka_conn (TOPIC '{self.topic}')
                    FORMAT AVRO
                    USING CONFLUENT SCHEMA REGISTRY CONNECTION materialize.public.csr_conn
                    ENVELOPE UPSERT""",
                required_error_message_substrs=[
                    "Topic does not exist",
                ],
            )

    def run(self, transaction: Transaction, logging_exe: Any | None = None) -> None:
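        """Produce one message per row: inserts and upserts carry an Avro value, deletes are tombstones (value=None)."""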
        self.logging_exe = logging_exe
        for row_list in transaction.row_lists:
            for row in row_list.rows:
                if (
                    row.operation == Operation.INSERT
                    or row.operation == Operation.UPSERT
                ):
                    self.producer.produce(
                        topic=self.topic,
                        key=self.key_avro_serializer(
                            {
                                field.name: value
                                for field, value in zip(row.fields, row.values)
                                if field.is_key
                            },
                            self.key_serialization_context,
                        ),
                        value=self.avro_serializer(
                            {
                                field.name: value
                                for field, value in zip(row.fields, row.values)
                                if not field.is_key
                            },
                            self.serialization_context,
                        ),
                        on_delivery=delivery_report,
                    )
                elif row.operation == Operation.DELETE:
                    self.producer.produce(
                        topic=self.topic,
                        key=self.key_avro_serializer(
                            {
                                field.name: value
                                for field, value in zip(row.fields, row.values)
                                if field.is_key
                            },
                            self.key_serialization_context,
                        ),
                        value=None,
                        on_delivery=delivery_report,
                    )
                else:
                    raise ValueError(f"Unexpected operation {row.operation}")
        self.producer.flush()


class MySqlExecutor(Executor):
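    """Applies transactions to a MySQL table and ingests it into Materialize through a MySQL source."""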
    mysql_conn: pymysql.Connection
    table: str
    source: str
    num: int

    def __init__(
        self,
        num: int,
        ports: dict[str, int],
        fields: list[Field],
        database: str,
        schema: str = "public",
        cluster: str | None = None,
        mz_service: str | None = None,
    ):
        super().__init__(ports, fields, database, schema, cluster, mz_service)
        self.table = f"mytable{num}"
        self.source = f"mysql_source{num}"
        self.num = num

    def create(self, logging_exe: Any | None = None) -> None:
        self.logging_exe = logging_exe
        self.mysql_conn = pymysql.connect(
            host="localhost",
            user="root",
            password=MySql.DEFAULT_ROOT_PASSWORD,
            database="mysql",
            port=self.ports["mysql"],
        )
        values = [
            f"`{field.name}` {str(field.data_type.name(Backend.MYSQL)).lower()}"
            for field in self.fields
        ]
        keys = [field.name for field in self.fields if field.is_key]

        self.mysql_conn.autocommit(True)
        with self.mysql_conn.cursor() as cur:
            self.execute(cur, f"DROP TABLE IF EXISTS `{self.table}`;")
            primary_key = (
                f", PRIMARY KEY ({', '.join([f'`{key}`' for key in keys])})"
                if keys
                else ""
            )
            self.execute(
                cur,
                f"CREATE TABLE `{self.table}` ({', '.join(values)} {primary_key});",
            )
        self.mysql_conn.autocommit(False)

        with self.mz_conn.cursor() as cur:
            self.execute(
                cur,
                f"CREATE SECRET IF NOT EXISTS mypass AS '{MySql.DEFAULT_ROOT_PASSWORD}'",
            )
            self.execute(
                cur,
                f"""CREATE CONNECTION mysql{self.num} FOR MYSQL
                    HOST 'mysql',
                    USER root,
                    PASSWORD SECRET mypass""",
            )
            self.execute(
                cur,
                f"""CREATE SOURCE {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.source)}
                    {f"IN CLUSTER {identifier(self.cluster)}" if self.cluster else ""}
                    FROM MYSQL CONNECTION mysql{self.num}
                    """,
            )
            self.execute(
                cur,
                f"""CREATE TABLE {identifier(self.table)}
                    FROM SOURCE {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.source)}
                    (REFERENCE mysql.{identifier(self.table)})""",
            )

    def run(self, transaction: Transaction, logging_exe: Any | None = None) -> None:
        self.logging_exe = logging_exe
        with self.mysql_conn.cursor() as cur:
            for row_list in transaction.row_lists:
                for row in row_list.rows:
                    if row.operation == Operation.INSERT:
                        values_str = ", ".join(
                            str(formatted_value(value)) for value in row.values
                        )
                        self.execute(
                            cur,
                            f"""INSERT INTO `{self.table}`
                                VALUES ({values_str})
                            """,
                        )
                    elif row.operation == Operation.UPSERT:
                        values_str = ", ".join(
                            str(formatted_value(value)) for value in row.values
                        )
                        update_str = ", ".join(
                            f"`{field.name}` = VALUES(`{field.name}`)"
                            for field in row.fields
                        )
                        self.execute(
                            cur,
                            f"""INSERT INTO `{self.table}`
                                VALUES ({values_str})
                                ON DUPLICATE KEY
                                UPDATE {update_str}
                            """,
                        )
                    elif row.operation == Operation.DELETE:
                        cond_str = " AND ".join(
                            f"`{field.name}` = {formatted_value(value)}"
                            for field, value in zip(row.fields, row.values)
                            if field.is_key
                        )
                        self.execute(
                            cur,
                            f"""DELETE FROM `{self.table}`
                                WHERE {cond_str}
                            """,
                        )
                    else:
                        raise ValueError(f"Unexpected operation {row.operation}")
        self.mysql_conn.commit()


class PgExecutor(Executor):
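    """Applies transactions to a Postgres table and ingests it into Materialize through a Postgres source."""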
    pg_conn: psycopg.Connection
    table: str
    source: str
    num: int

    def __init__(
        self,
        num: int,
        ports: dict[str, int],
        fields: list[Field],
        database: str,
        schema: str = "public",
        cluster: str | None = None,
        mz_service: str | None = None,
    ):
        super().__init__(ports, fields, database, schema, cluster, mz_service)
        self.table = f"table{num}"
        self.source = f"postgres_source{num}"
        self.num = num

    def create(self, logging_exe: Any | None = None) -> None:
        self.logging_exe = logging_exe
        self.pg_conn = psycopg.connect(
            host="localhost",
            user="postgres",
            password="postgres",
            port=self.ports["postgres"],
        )
        values = [
            f"{identifier(field.name)} {str(field.data_type.name(Backend.POSTGRES)).lower()}"
            for field in self.fields
        ]
        keys = [field.name for field in self.fields if field.is_key]

        self.pg_conn.autocommit = True
        with self.pg_conn.cursor() as cur:
            self.execute(
                cur,
                f"""DROP TABLE IF EXISTS {identifier(self.table)};
                    CREATE TABLE {identifier(self.table)} (
                        {", ".join(values)},
                        PRIMARY KEY ({", ".join([identifier(key) for key in keys])}));
                    ALTER TABLE {identifier(self.table)} REPLICA IDENTITY FULL;
                    CREATE USER postgres{self.num} WITH SUPERUSER PASSWORD 'postgres';
                    ALTER USER postgres{self.num} WITH replication;
                    DROP PUBLICATION IF EXISTS {self.source};
                    CREATE PUBLICATION {self.source} FOR ALL TABLES;""",
            )
        self.pg_conn.autocommit = False

        with self.mz_conn.cursor() as cur:
            self.execute(cur, f"CREATE SECRET pgpass{self.num} AS 'postgres'")
            self.execute(
                cur,
                f"""CREATE CONNECTION pg{self.num} FOR POSTGRES
                    HOST 'postgres',
                    DATABASE postgres,
                    USER postgres{self.num},
                    PASSWORD SECRET pgpass{self.num}""",
            )
            self.execute(
                cur,
                f"""CREATE SOURCE {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.source)}
                    {f"IN CLUSTER {identifier(self.cluster)}" if self.cluster else ""}
                    FROM POSTGRES CONNECTION pg{self.num} (PUBLICATION '{self.source}')
                    """,
            )
            self.execute(
                cur,
                f"""CREATE TABLE {identifier(self.table)}
                    FROM SOURCE {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.source)}
                    (REFERENCE {identifier(self.table)})""",
            )

    def run(self, transaction: Transaction, logging_exe: Any | None = None) -> None:
        self.logging_exe = logging_exe
        with self.pg_conn.cursor() as cur:
            for row_list in transaction.row_lists:
                for row in row_list.rows:
                    if row.operation == Operation.INSERT:
                        values_str = ", ".join(
                            str(formatted_value(value)) for value in row.values
                        )
                        self.execute(
                            cur,
                            f"""INSERT INTO {identifier(self.table)}
                                VALUES ({values_str})
                            """,
                        )
                    elif row.operation == Operation.UPSERT:
                        values_str = ", ".join(
                            str(formatted_value(value)) for value in row.values
                        )
                        keys_str = ", ".join(
                            identifier(field.name)
                            for field in row.fields
                            if field.is_key
                        )
                        update_str = ", ".join(
                            f"{identifier(field.name)} = EXCLUDED.{identifier(field.name)}"
                            for field in row.fields
                        )
                        self.execute(
                            cur,
                            f"""INSERT INTO {identifier(self.table)}
                                VALUES ({values_str})
                                ON CONFLICT ({keys_str})
                                DO UPDATE SET {update_str}
                            """,
                        )
                    elif row.operation == Operation.DELETE:
                        cond_str = " AND ".join(
                            f"{identifier(field.name)} = {formatted_value(value)}"
                            for field, value in zip(row.fields, row.values)
                            if field.is_key
                        )
                        self.execute(
                            cur,
                            f"""DELETE FROM {identifier(self.table)}
                                WHERE {cond_str}
                            """,
                        )
                    else:
                        raise ValueError(f"Unexpected operation {row.operation}")
        self.pg_conn.commit()


class KafkaRoundtripExecutor(Executor):
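    """Writes rows to a Materialize table, exports them to Kafka through a DEBEZIUM sink, and reads them back via a Kafka source."""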
    table: str
    table_original: str
    topic: str
    known_keys: set[tuple[str]]
    num: int

    def __init__(
        self,
        num: int,
        ports: dict[str, int],
        fields: list[Field],
        database: str,
        schema: str = "public",
        cluster: str | None = None,
        mz_service: str | None = None,
    ):
        super().__init__(ports, fields, database, schema, cluster, mz_service)
        self.table_original = f"table_rt_source{num}"
        self.table = f"table_rt{num}"
        self.topic = f"data-ingest-rt-{num}"
        self.num = num
        self.known_keys = set()

    def create(self, logging_exe: Any | None = None) -> None:
        self.logging_exe = logging_exe
        values = [
            f"{field.name} {str(field.data_type.name(Backend.MATERIALIZE)).lower()}"
            for field in self.fields
        ]
        keys = [field.name for field in self.fields if field.is_key]

        with self.mz_conn.cursor() as cur:
            self.execute(cur, f"DROP TABLE IF EXISTS {identifier(self.table_original)}")
            self.execute(
                cur,
                f"""CREATE TABLE {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.table_original)} (
                        {", ".join(values)},
                        PRIMARY KEY ({", ".join(keys)}));""",
            )
            self.execute(
                cur,
                f"""CREATE SINK {identifier(self.database)}.{identifier(self.schema)}.sink{self.num}
                    {f"IN CLUSTER {identifier(self.cluster)}" if self.cluster else ""}
                    FROM {identifier(self.table_original)}
                    INTO KAFKA CONNECTION kafka_conn (TOPIC '{self.topic}')
                    KEY ({", ".join([identifier(key) for key in keys])})
                    FORMAT AVRO
                    USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                    ENVELOPE DEBEZIUM""",
            )
            self.execute_with_retry_on_error(
                cur,
                f"""CREATE SOURCE {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.table)}
                    {f"IN CLUSTER {identifier(self.cluster)}" if self.cluster else ""}
                    FROM KAFKA CONNECTION kafka_conn (TOPIC '{self.topic}')
                    FORMAT AVRO
                    USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                    ENVELOPE DEBEZIUM""",
                wait_time_in_sec=1,
                max_tries=15,
                required_error_message_substrs=[
                    "No value schema found",
                    "Key schema is required for ENVELOPE DEBEZIUM",
                    "Topic does not exist",
                ],
            )

    def run(self, transaction: Transaction, logging_exe: Any | None = None) -> None:
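        """Apply the transaction to the original table; known_keys decides whether an UPSERT becomes an UPDATE or an INSERT."""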
        self.logging_exe = logging_exe
        with self.mz_conn.cursor() as cur:
            for row_list in transaction.row_lists:
                for row in row_list.rows:
                    key_values = tuple(
                        value
                        for field, value in zip(row.fields, row.values)
                        if field.is_key
                    )
                    if row.operation == Operation.INSERT:
                        values_str = ", ".join(
                            str(formatted_value(value)) for value in row.values
                        )
                        self.execute(
                            cur,
                            f"""INSERT INTO {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.table_original)}
                                VALUES ({values_str})
                            """,
                        )
                        self.known_keys.add(key_values)
                    elif row.operation == Operation.UPSERT:
                        if key_values in self.known_keys:
                            non_key_values = tuple(
                                (field, value)
                                for field, value in zip(row.fields, row.values)
                                if not field.is_key
                            )
                            # Can't update anything if there are no values, only a key, and the key is already in the table
                            if non_key_values:
                                cond_str = " AND ".join(
                                    f"{identifier(field.name)} = {formatted_value(value)}"
                                    for field, value in zip(row.fields, row.values)
                                    if field.is_key
                                )
                                set_str = ", ".join(
                                    f"{identifier(field.name)} = {formatted_value(value)}"
                                    for field, value in non_key_values
                                )
                                self.execute(
                                    cur,
                                    f"""UPDATE {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.table_original)}
                                        SET {set_str}
                                        WHERE {cond_str}
                                    """,
                                )
                        else:
                            values_str = ", ".join(
                                str(formatted_value(value)) for value in row.values
                            )
                            self.execute(
                                cur,
                                f"""INSERT INTO {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.table_original)}
                                    VALUES ({values_str})
                                """,
                            )
                            self.known_keys.add(key_values)
                    elif row.operation == Operation.DELETE:
                        cond_str = " AND ".join(
                            f"{identifier(field.name)} = {formatted_value(value)}"
                            for field, value in zip(row.fields, row.values)
                            if field.is_key
                        )
                        self.execute(
                            cur,
                            f"""DELETE FROM {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.table_original)}
                                WHERE {cond_str}
                            """,
                        )
                        self.known_keys.discard(key_values)
                    else:
                        raise ValueError(f"Unexpected operation {row.operation}")