source_tables.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from textwrap import dedent
  10. from materialize.checks.actions import Testdrive
  11. from materialize.checks.checks import Check, externally_idempotent
  12. from materialize.mzcompose.services.mysql import MySql
  13. class TableFromSourceBase(Check):
  14. def generic_setup(self) -> str:
  15. return dedent(
  16. """
  17. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  18. ALTER SYSTEM SET enable_create_table_from_source = true
  19. """
  20. )
  21. @externally_idempotent(False)
  22. class TableFromPgSource(TableFromSourceBase):
  23. suffix = "tbl_from_pg_source"
  24. def initialize(self) -> Testdrive:
  25. return Testdrive(
  26. self.generic_setup()
  27. + dedent(
  28. f"""
  29. > CREATE SECRET pgpass_{self.suffix} AS 'postgres'
  30. > CREATE CONNECTION pg_conn_{self.suffix} TO POSTGRES (
  31. HOST postgres,
  32. DATABASE postgres,
  33. USER postgres,
  34. PASSWORD SECRET pgpass_{self.suffix}
  35. )
  36. $ postgres-execute connection=postgres://postgres:postgres@postgres
  37. ALTER USER postgres WITH replication;
  38. DROP SCHEMA IF EXISTS public_{self.suffix} CASCADE;
  39. CREATE SCHEMA public_{self.suffix};
  40. DROP PUBLICATION IF EXISTS mz_source_{self.suffix};
  41. CREATE PUBLICATION mz_source_{self.suffix} FOR ALL TABLES;
  42. CREATE TYPE an_enum AS ENUM ('x1', 'x2');
  43. CREATE TABLE pg_table_1 (a INTEGER, b INTEGER, c an_enum);
  44. INSERT INTO pg_table_1 VALUES (1, 1234, NULL), (2, 0, 'x1');
  45. ALTER TABLE pg_table_1 REPLICA IDENTITY FULL;
  46. CREATE TABLE pg_table_2 (a INTEGER);
  47. INSERT INTO pg_table_2 VALUES (1000), (2000);
  48. ALTER TABLE pg_table_2 REPLICA IDENTITY FULL;
  49. > CREATE SOURCE pg_source_{self.suffix} FROM POSTGRES CONNECTION pg_conn_{self.suffix} (PUBLICATION 'mz_source_{self.suffix}');
  50. > CREATE SOURCE old_pg_source_{self.suffix} FROM POSTGRES CONNECTION pg_conn_{self.suffix}
  51. (PUBLICATION 'mz_source_{self.suffix}', TEXT COLUMNS = (pg_table_1.c))
  52. FOR TABLES (pg_table_1 AS pg_table_1_old_syntax);
  53. """
  54. )
  55. )
  56. def manipulate(self) -> list[Testdrive]:
  57. return [
  58. Testdrive(dedent(s))
  59. for s in [
  60. f"""
  61. > CREATE TABLE pg_table_1 FROM SOURCE pg_source_{self.suffix}
  62. (REFERENCE "pg_table_1")
  63. WITH (TEXT COLUMNS = (c));
  64. $ postgres-execute connection=postgres://postgres:postgres@postgres
  65. INSERT INTO pg_table_1 VALUES (3, 2345, 'x2');
  66. """,
  67. f"""
  68. > CREATE TABLE pg_table_1b FROM SOURCE pg_source_{self.suffix}
  69. (REFERENCE "pg_table_1")
  70. WITH (TEXT COLUMNS = (c));
  71. > CREATE TABLE pg_table_2 FROM SOURCE pg_source_{self.suffix} (REFERENCE "pg_table_2");
  72. $ postgres-execute connection=postgres://postgres:postgres@postgres
  73. INSERT INTO pg_table_1 VALUES (4, 3456, 'x2');
  74. INSERT INTO pg_table_2 VALUES (3000);
  75. """,
  76. ]
  77. ]
  78. def validate(self) -> Testdrive:
  79. return Testdrive(
  80. dedent(
  81. """
  82. > SELECT * FROM pg_table_1;
  83. 1 1234 <null>
  84. 2 0 x1
  85. 3 2345 x2
  86. 4 3456 x2
  87. > SELECT * FROM pg_table_1b;
  88. 1 1234 <null>
  89. 2 0 x1
  90. 3 2345 x2
  91. 4 3456 x2
  92. > SELECT * FROM pg_table_2;
  93. 1000
  94. 2000
  95. 3000
  96. > SELECT * FROM pg_table_1_old_syntax;
  97. 1 1234 <null>
  98. 2 0 x1
  99. 3 2345 x2
  100. 4 3456 x2
  101. """
  102. )
  103. )
  104. @externally_idempotent(False)
  105. class TableFromMySqlSource(TableFromSourceBase):
  106. suffix = "tbl_from_mysql_source"
  107. def initialize(self) -> Testdrive:
  108. return Testdrive(
  109. self.generic_setup()
  110. + dedent(
  111. f"""
  112. $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
  113. > CREATE SECRET mysqlpass_{self.suffix} AS 'p@ssw0rd';
  114. > CREATE CONNECTION mysql_conn_{self.suffix} TO MYSQL (
  115. HOST mysql,
  116. USER root,
  117. PASSWORD SECRET mysqlpass_{self.suffix}
  118. )
  119. $ mysql-execute name=mysql
  120. DROP DATABASE IF EXISTS public_{self.suffix};
  121. CREATE DATABASE public_{self.suffix};
  122. USE public_{self.suffix};
  123. CREATE TABLE mysql_source_table_1 (a INTEGER, b INTEGER, y YEAR);
  124. INSERT INTO mysql_source_table_1 VALUES (1, 1234, 2024), (2, 0, 2001);
  125. CREATE TABLE mysql_source_table_2 (a INTEGER);
  126. INSERT INTO mysql_source_table_2 VALUES (1000), (2000);
  127. > CREATE SOURCE mysql_source_{self.suffix} FROM MYSQL CONNECTION mysql_conn_{self.suffix};
  128. > CREATE SOURCE old_mysql_source_{self.suffix} FROM MYSQL CONNECTION mysql_conn_{self.suffix}
  129. (TEXT COLUMNS = (public_{self.suffix}.mysql_source_table_1.y))
  130. FOR TABLES (public_{self.suffix}.mysql_source_table_1 AS mysql_table_1_old_syntax);
  131. """
  132. )
  133. )
  134. def manipulate(self) -> list[Testdrive]:
  135. return [
  136. Testdrive(dedent(s))
  137. for s in [
  138. f"""
  139. > CREATE TABLE mysql_table_1 FROM SOURCE mysql_source_{self.suffix}
  140. (REFERENCE "public_{self.suffix}"."mysql_source_table_1")
  141. WITH (TEXT COLUMNS = (y));
  142. $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
  143. $ mysql-execute name=mysql
  144. USE public_{self.suffix};
  145. INSERT INTO mysql_source_table_1 VALUES (3, 2345, 2000);
  146. """,
  147. f"""
  148. > CREATE TABLE mysql_table_1b FROM SOURCE mysql_source_{self.suffix}
  149. (REFERENCE "public_{self.suffix}"."mysql_source_table_1")
  150. WITH (IGNORE COLUMNS = (y));
  151. > CREATE TABLE mysql_table_2 FROM SOURCE mysql_source_{self.suffix} (REFERENCE "public_{self.suffix}"."mysql_source_table_2");
  152. $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
  153. $ mysql-execute name=mysql
  154. USE public_{self.suffix};
  155. INSERT INTO mysql_source_table_1 VALUES (4, 3456, NULL);
  156. INSERT INTO mysql_source_table_2 VALUES (3000);
  157. """,
  158. ]
  159. ]
  160. def validate(self) -> Testdrive:
  161. return Testdrive(
  162. dedent(
  163. """
  164. > SELECT * FROM mysql_table_1;
  165. 1 1234 2024
  166. 2 0 2001
  167. 3 2345 2000
  168. 4 3456 <null>
  169. > SELECT * FROM mysql_table_1b;
  170. 1 1234
  171. 2 0
  172. 3 2345
  173. 4 3456
  174. > SELECT * FROM mysql_table_2;
  175. 1000
  176. 2000
  177. 3000
  178. # old source syntax still working
  179. > SELECT * FROM mysql_table_1_old_syntax;
  180. 1 1234 2024
  181. 2 0 2001
  182. 3 2345 2000
  183. 4 3456 <null>
  184. """
  185. )
  186. )
  187. class TableFromLoadGenSource(TableFromSourceBase):
  188. suffix = "tbl_from_lg_source"
  189. def initialize(self) -> Testdrive:
  190. return Testdrive(
  191. self.generic_setup()
  192. + dedent(
  193. f"""
  194. > CREATE SOURCE auction_house_{self.suffix} FROM LOAD GENERATOR AUCTION (AS OF 300, UP TO 301);
  195. """
  196. )
  197. )
  198. def manipulate(self) -> list[Testdrive]:
  199. return [
  200. Testdrive(dedent(s))
  201. for s in [
  202. f"""
  203. > CREATE TABLE bids_1 FROM SOURCE auction_house_{self.suffix} (REFERENCE "auction"."bids");
  204. """,
  205. f"""
  206. > CREATE TABLE bids_2 FROM SOURCE auction_house_{self.suffix} (REFERENCE "bids");
  207. """,
  208. ]
  209. ]
  210. def validate(self) -> Testdrive:
  211. return Testdrive(
  212. dedent(
  213. """
  214. > SELECT count(*) FROM bids_1;
  215. 255
  216. > SELECT count(*) FROM bids_2;
  217. 255
  218. """
  219. )
  220. )