123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- import random
- from textwrap import dedent
- from materialize.mzcompose.composition import Composition
- from materialize.mzcompose.services.mysql import MySql
- from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
- from materialize.zippy.framework import Action, Capabilities, Capability, State
- from materialize.zippy.mysql_capabilities import MySqlRunning, MySqlTableExists
- from materialize.zippy.mz_capabilities import MzIsRunning
- class MySqlStart(Action):
- """Start a MySQL instance."""
- def provides(self) -> list[Capability]:
- return [MySqlRunning()]
- def run(self, c: Composition, state: State) -> None:
- c.up("mysql")
- c.testdrive(
- dedent(
- f"""
- $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
- $ mysql-execute name=mysql
- DROP DATABASE IF EXISTS public;
- CREATE DATABASE public;
- USE public;
- """
- ),
- mz_service=state.mz_service,
- )
- class MySqlStop(Action):
- """Stop the MySQL instance."""
- @classmethod
- def requires(cls) -> set[type[Capability]]:
- return {MySqlRunning}
- def withholds(self) -> set[type[Capability]]:
- return {MySqlRunning}
- def run(self, c: Composition, state: State) -> None:
- c.kill("mysql")
- class MySqlRestart(Action):
- """Restart the MySql instance."""
- def run(self, c: Composition, state: State) -> None:
- c.kill("mysql")
- c.up("mysql")
- class CreateMySqlTable(Action):
- """Creates a table on the MySql instance. 50% of the tables have a PK."""
- @classmethod
- def requires(cls) -> set[type[Capability]]:
- return {BalancerdIsRunning, MzIsRunning, MySqlRunning}
- def __init__(self, capabilities: Capabilities) -> None:
- this_mysql_table = MySqlTableExists(name="table" + str(random.randint(1, 10)))
- existing_mysql_tables = [
- t
- for t in capabilities.get(MySqlTableExists)
- if t.name == this_mysql_table.name
- ]
- if len(existing_mysql_tables) == 0:
- self.new_mysql_table = True
- # A PK is now required for Debezium
- this_mysql_table.has_pk = True
- self.mysql_table = this_mysql_table
- elif len(existing_mysql_tables) == 1:
- self.new_mysql_table = False
- self.mysql_table = existing_mysql_tables[0]
- else:
- raise RuntimeError("More than one table exists")
- super().__init__(capabilities)
- def run(self, c: Composition, state: State) -> None:
- if self.new_mysql_table:
- primary_key = "PRIMARY KEY" if self.mysql_table.has_pk else ""
- c.testdrive(
- dedent(
- f"""
- $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
- $ mysql-execute name=mysql
- USE public;
- CREATE TABLE {self.mysql_table.name} (f1 INTEGER {primary_key});
- INSERT INTO {self.mysql_table.name} VALUES ({self.mysql_table.watermarks.max});
- """
- ),
- mz_service=state.mz_service,
- )
- def provides(self) -> list[Capability]:
- return [self.mysql_table] if self.new_mysql_table else []
- class MySqlDML(Action):
- """Performs an INSERT, DELETE or UPDATE against a MySQL table."""
- # We use smaller batches in Pg then in Mz because Pg will fill up much faster
- MAX_BATCH_SIZE = 10000
- @classmethod
- def requires(cls) -> set[type[Capability]]:
- return {BalancerdIsRunning, MzIsRunning, MySqlRunning, MySqlTableExists}
- def __init__(self, capabilities: Capabilities) -> None:
- self.mysql_table = random.choice(capabilities.get(MySqlTableExists))
- self.delta = random.randint(1, MySqlDML.MAX_BATCH_SIZE)
- super().__init__(capabilities)
- def __str__(self) -> str:
- return f"{Action.__str__(self)} {self.mysql_table.name}"
- class MySqlInsert(MySqlDML):
- """Inserts rows into a MySQL table."""
- def run(self, c: Composition, state: State) -> None:
- prev_max = self.mysql_table.watermarks.max
- self.mysql_table.watermarks.max = prev_max + self.delta
- c.testdrive(
- dedent(
- f"""
- $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
- $ mysql-execute name=mysql
- USE public;
- SET @i:={prev_max};
- INSERT INTO {self.mysql_table.name} SELECT @i:=@i+1 FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {self.mysql_table.watermarks.max - prev_max};
- """
- ),
- mz_service=state.mz_service,
- )
- class MySqlShiftForward(MySqlDML):
- """Update all rows from a MySQL table by incrementing their values by a constant (tables without a PK only)"""
- def run(self, c: Composition, state: State) -> None:
- if not self.mysql_table.has_pk:
- self.mysql_table.watermarks.shift(self.delta)
- c.testdrive(
- dedent(
- f"""
- $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
- $ mysql-execute name=mysql
- USE public;
- UPDATE {self.mysql_table.name} SET f1 = f1 + {self.delta};
- """
- ),
- mz_service=state.mz_service,
- )
- class MySqlShiftBackward(MySqlDML):
- """Update all rows from a MySQL table by decrementing their values by a constant (tables without a PK only)"""
- def run(self, c: Composition, state: State) -> None:
- if not self.mysql_table.has_pk:
- self.mysql_table.watermarks.shift(-self.delta)
- c.testdrive(
- dedent(
- f"""
- $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
- $ mysql-execute name=mysql
- USE public;
- UPDATE {self.mysql_table.name} SET f1 = f1 - {self.delta};
- """
- ),
- mz_service=state.mz_service,
- )
- class MySqlDeleteFromHead(MySqlDML):
- """Delete the largest values from a MySQL table"""
- def run(self, c: Composition, state: State) -> None:
- self.mysql_table.watermarks.max = max(
- self.mysql_table.watermarks.max - self.delta,
- self.mysql_table.watermarks.min,
- )
- c.testdrive(
- dedent(
- f"""
- $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
- $ mysql-execute name=mysql
- USE public;
- DELETE FROM {self.mysql_table.name} WHERE f1 > {self.mysql_table.watermarks.max};
- """
- ),
- mz_service=state.mz_service,
- )
- class MySqlDeleteFromTail(MySqlDML):
- """Delete the smallest values from a MySQL table"""
- def run(self, c: Composition, state: State) -> None:
- self.mysql_table.watermarks.min = min(
- self.mysql_table.watermarks.min + self.delta,
- self.mysql_table.watermarks.max,
- )
- c.testdrive(
- dedent(
- f"""
- $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
- $ mysql-execute name=mysql
- USE public;
- DELETE FROM {self.mysql_table.name} WHERE f1 < {self.mysql_table.watermarks.min};
- """
- ),
- mz_service=state.mz_service,
- )
|