# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
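
# This composition maintains the long-running QA canary environment in the
# Materialize production sandbox: `create` provisions connections and upstream
# load generators and builds the dbt models, `test` runs the dbt tests against
# them, and `clean` removes local dbt artifacts. The workflows are typically
# driven through mzcompose (e.g. `./mzcompose run create`); the exact invocation
# depends on the local setup.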

import os
from textwrap import dedent
from urllib.parse import quote

import psycopg

from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.services.dbt import Dbt
from materialize.mzcompose.services.testdrive import Testdrive

# The actual values are stored as Pulumi secrets in the i2 repository
MATERIALIZE_PROD_SANDBOX_HOSTNAME = os.getenv("MATERIALIZE_PROD_SANDBOX_HOSTNAME")
MATERIALIZE_PROD_SANDBOX_USERNAME = os.getenv("MATERIALIZE_PROD_SANDBOX_USERNAME")
MATERIALIZE_PROD_SANDBOX_APP_PASSWORD = os.getenv(
    "MATERIALIZE_PROD_SANDBOX_APP_PASSWORD"
)

MATERIALIZE_PROD_SANDBOX_RDS_HOSTNAME = os.getenv(
    "MATERIALIZE_PROD_SANDBOX_RDS_HOSTNAME"
)
MATERIALIZE_PROD_SANDBOX_RDS_PASSWORD = os.getenv(
    "MATERIALIZE_PROD_SANDBOX_RDS_PASSWORD"
)

MATERIALIZE_PROD_SANDBOX_RDS_MYSQL_HOSTNAME = os.getenv(
    "MATERIALIZE_PROD_SANDBOX_RDS_MYSQL_HOSTNAME"
)
MATERIALIZE_PROD_SANDBOX_RDS_MYSQL_PASSWORD = os.getenv(
    "MATERIALIZE_PROD_SANDBOX_RDS_MYSQL_PASSWORD"
)

CONFLUENT_CLOUD_FIELDENG_KAFKA_BROKER = os.getenv(
    "CONFLUENT_CLOUD_FIELDENG_KAFKA_BROKER"
)
CONFLUENT_CLOUD_FIELDENG_CSR_URL = os.getenv("CONFLUENT_CLOUD_FIELDENG_CSR_URL")
CONFLUENT_CLOUD_FIELDENG_CSR_USERNAME = os.getenv(
    "CONFLUENT_CLOUD_FIELDENG_CSR_USERNAME"
)
CONFLUENT_CLOUD_FIELDENG_CSR_PASSWORD = os.getenv(
    "CONFLUENT_CLOUD_FIELDENG_CSR_PASSWORD"
)
CONFLUENT_CLOUD_FIELDENG_KAFKA_USERNAME = os.getenv(
    "CONFLUENT_CLOUD_FIELDENG_KAFKA_USERNAME"
)
CONFLUENT_CLOUD_FIELDENG_KAFKA_PASSWORD = os.getenv(
    "CONFLUENT_CLOUD_FIELDENG_KAFKA_PASSWORD"
)
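
# All of the values above may be None at import time. workflow_create asserts
# that the Materialize sandbox credentials are present before building the
# connection URL; the remaining values are interpolated directly into the
# Testdrive scripts below.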

SERVICES = [
    Dbt(
        environment=[
            "MATERIALIZE_PROD_SANDBOX_HOSTNAME",
            "MATERIALIZE_PROD_SANDBOX_USERNAME",
            "MATERIALIZE_PROD_SANDBOX_APP_PASSWORD",
        ]
    ),
    Testdrive(
        no_reset=True,
        no_consistency_checks=True,  # No access to HTTP for coordinator check
    ),
]

POSTGRES_RANGE = 1024
POSTGRES_RANGE_FUNCTION = "FLOOR(RANDOM() * (SELECT MAX(id) FROM people))"

MYSQL_RANGE = 1024
MYSQL_RANGE_FUNCTION = (
    "FLOOR(RAND() * (SELECT MAX(id) FROM (SELECT * FROM people) AS p))"
)
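
# When interpolated into the statements below, the *_RANGE_FUNCTION expressions
# pick a random id below the current MAX(id) of `people`, so updates and deletes
# target ids that plausibly exist, while inserts draw ids from
# [0, POSTGRES_RANGE) / [0, MYSQL_RANGE) to keep the tables bounded. The MySQL
# variant wraps `people` in a derived table, presumably to sidestep MySQL's
# restriction on reading the table that is being modified.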


def workflow_create(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Set up the canary environment: connections, upstream load generators, and dbt models."""
    c.up({"name": "dbt", "persistent": True}, {"name": "testdrive", "persistent": True})

    assert MATERIALIZE_PROD_SANDBOX_USERNAME is not None
    assert MATERIALIZE_PROD_SANDBOX_APP_PASSWORD is not None
    assert MATERIALIZE_PROD_SANDBOX_HOSTNAME is not None
    materialize_url = f"postgres://{quote(MATERIALIZE_PROD_SANDBOX_USERNAME)}:{quote(MATERIALIZE_PROD_SANDBOX_APP_PASSWORD)}@{quote(MATERIALIZE_PROD_SANDBOX_HOSTNAME)}:6875"
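
    # The URL has the usual libpq shape; quote() percent-encodes reserved
    # characters such as '@' or '+' in the username, e.g. (hypothetical values):
    #   postgres://infra%2Bqacanary%40materialize.io:some-app-password@abc123.eu-west-1.aws.materialize.cloud:6875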

    with c.override(
        Testdrive(
            default_timeout="1200s",
            materialize_url=materialize_url,
            no_reset=True,  # Required so that admin port 6877 is not used
            no_consistency_checks=True,  # No access to HTTP for coordinator check
        )
    ):
        c.testdrive(
            input=dedent(
                f"""
                > SET DATABASE=qa_canary_environment

                > CREATE SECRET IF NOT EXISTS kafka_username AS '{CONFLUENT_CLOUD_FIELDENG_KAFKA_USERNAME}'
                > CREATE SECRET IF NOT EXISTS kafka_password AS '{CONFLUENT_CLOUD_FIELDENG_KAFKA_PASSWORD}'

                > CREATE CONNECTION IF NOT EXISTS kafka_connection TO KAFKA (
                  BROKER '{CONFLUENT_CLOUD_FIELDENG_KAFKA_BROKER}',
                  SASL MECHANISMS = 'PLAIN',
                  SASL USERNAME = SECRET kafka_username,
                  SASL PASSWORD = SECRET kafka_password
                  )

                > CREATE SECRET IF NOT EXISTS csr_username AS '{CONFLUENT_CLOUD_FIELDENG_CSR_USERNAME}'
                > CREATE SECRET IF NOT EXISTS csr_password AS '{CONFLUENT_CLOUD_FIELDENG_CSR_PASSWORD}'

                > CREATE CONNECTION IF NOT EXISTS csr_connection TO CONFLUENT SCHEMA REGISTRY (
                  URL '{CONFLUENT_CLOUD_FIELDENG_CSR_URL}',
                  USERNAME = SECRET csr_username,
                  PASSWORD = SECRET csr_password
                  )
                """
            )
        )
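
        # The fragment above only provisions the secrets and the Kafka / Schema
        # Registry connections; nothing in this file uses them again, so the
        # consuming sources and sinks are expected to come from the dbt project
        # that is run at the end of this workflow.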

        c.testdrive(
            input=dedent(
                f"""
                > SET DATABASE=qa_canary_environment

                $ mysql-connect name=mysql url=mysql://admin@{MATERIALIZE_PROD_SANDBOX_RDS_MYSQL_HOSTNAME} password={MATERIALIZE_PROD_SANDBOX_RDS_MYSQL_PASSWORD}

                $ mysql-execute name=mysql
                DROP DATABASE IF EXISTS public;
                CREATE DATABASE public;
                USE public;
                CREATE TABLE IF NOT EXISTS people (id INTEGER PRIMARY KEY, name TEXT, incarnation INTEGER DEFAULT 1);
                CREATE TABLE IF NOT EXISTS relationships (a INTEGER, b INTEGER, incarnation INTEGER DEFAULT 1, PRIMARY KEY (a,b));
                CREATE EVENT insert_people ON SCHEDULE EVERY 1 SECOND DO INSERT INTO people (id, name) VALUES (FLOOR(RAND() * {MYSQL_RANGE}), 'aaaaaaaaaaaaaaaa'), (FLOOR(RAND() * {MYSQL_RANGE}), 'aaaaaaaaaaaaaaaa') ON DUPLICATE KEY UPDATE incarnation = people.incarnation + 1;
                CREATE EVENT update_people_name ON SCHEDULE EVERY 1 SECOND DO UPDATE people SET name = REPEAT(id, 16) WHERE id = {MYSQL_RANGE_FUNCTION};
                CREATE EVENT update_people_incarnation ON SCHEDULE EVERY 1 SECOND DO UPDATE people SET incarnation = incarnation + 1 WHERE id = {MYSQL_RANGE_FUNCTION};
                CREATE EVENT delete_people ON SCHEDULE EVERY 1 SECOND DO DELETE FROM people WHERE id = {MYSQL_RANGE_FUNCTION};
                -- MOD() is used to prevent truly random relationships from being created, as this overwhelms WMR
                -- See https://materialize.com/docs/sql/recursive-ctes/#queries-with-update-locality
                CREATE EVENT insert_relationships ON SCHEDULE EVERY 1 SECOND DO INSERT INTO relationships (a, b) VALUES (MOD({MYSQL_RANGE_FUNCTION}, 10), {MYSQL_RANGE_FUNCTION}), (MOD({MYSQL_RANGE_FUNCTION}, 10), {MYSQL_RANGE_FUNCTION}) ON DUPLICATE KEY UPDATE incarnation = relationships.incarnation + 1;
                CREATE EVENT update_relationships_incarnation ON SCHEDULE EVERY 1 SECOND DO UPDATE relationships SET incarnation = incarnation + 1 WHERE a = {MYSQL_RANGE_FUNCTION} AND b = {MYSQL_RANGE_FUNCTION};
                CREATE EVENT delete_relationships ON SCHEDULE EVERY 1 SECOND DO DELETE FROM relationships WHERE a = {MYSQL_RANGE_FUNCTION} AND b = {MYSQL_RANGE_FUNCTION};

                > CREATE SECRET IF NOT EXISTS mysql_password AS '{MATERIALIZE_PROD_SANDBOX_RDS_MYSQL_PASSWORD}'

                > CREATE CONNECTION IF NOT EXISTS mysql TO MYSQL (
                  HOST '{MATERIALIZE_PROD_SANDBOX_RDS_MYSQL_HOSTNAME}',
                  USER admin,
                  PASSWORD SECRET mysql_password
                  )

                $ postgres-execute connection=postgres://postgres:{MATERIALIZE_PROD_SANDBOX_RDS_PASSWORD}@{MATERIALIZE_PROD_SANDBOX_RDS_HOSTNAME}
                -- ALTER USER postgres WITH replication;
                DROP SCHEMA IF EXISTS public CASCADE;
                CREATE SCHEMA public;
                CREATE TABLE IF NOT EXISTS people (id INTEGER PRIMARY KEY, name TEXT DEFAULT REPEAT('a', 16), incarnation INTEGER DEFAULT 1);
                ALTER TABLE people REPLICA IDENTITY FULL;
                CREATE TABLE IF NOT EXISTS relationships (a INTEGER, b INTEGER, incarnation INTEGER DEFAULT 1, PRIMARY KEY (a,b));
                ALTER TABLE relationships REPLICA IDENTITY FULL;
                DROP PUBLICATION IF EXISTS mz_source;
                CREATE PUBLICATION mz_source FOR ALL TABLES;
                CREATE EXTENSION IF NOT EXISTS pg_cron;
                SELECT cron.schedule('insert-people', '1 seconds', 'INSERT INTO people (id) SELECT FLOOR(RANDOM() * {POSTGRES_RANGE}) FROM generate_series(1,2) ON CONFLICT (id) DO UPDATE SET incarnation = people.incarnation + 1');
                SELECT cron.schedule('update-people-name', '1 seconds', 'UPDATE people SET name = REPEAT(id::text, 16) WHERE id = {POSTGRES_RANGE_FUNCTION}');
                SELECT cron.schedule('update-people-incarnation', '1 seconds', 'UPDATE people SET incarnation = incarnation + 1 WHERE id = {POSTGRES_RANGE_FUNCTION}');
                SELECT cron.schedule('delete-people', '1 seconds', 'DELETE FROM people WHERE id = {POSTGRES_RANGE_FUNCTION}');
                -- MOD() is used to prevent truly random relationships from being created, as this overwhelms WMR
                -- See https://materialize.com/docs/sql/recursive-ctes/#queries-with-update-locality
                SELECT cron.schedule('insert-relationships', '1 seconds', 'INSERT INTO relationships (a,b) SELECT MOD({POSTGRES_RANGE_FUNCTION}::INTEGER, 10), {POSTGRES_RANGE_FUNCTION} FROM generate_series(1,2) ON CONFLICT (a, b) DO UPDATE SET incarnation = relationships.incarnation + 1');
                SELECT cron.schedule('update-relationships-incarnation', '1 seconds', 'UPDATE relationships SET incarnation = incarnation + 1 WHERE a = {POSTGRES_RANGE_FUNCTION} AND b = {POSTGRES_RANGE_FUNCTION}');
                SELECT cron.schedule('delete-relationships', '1 seconds', 'DELETE FROM relationships WHERE a = {POSTGRES_RANGE_FUNCTION} AND b = {POSTGRES_RANGE_FUNCTION}');

                > CREATE SECRET IF NOT EXISTS pg_password AS '{MATERIALIZE_PROD_SANDBOX_RDS_PASSWORD}'

                > CREATE CONNECTION IF NOT EXISTS pg TO POSTGRES (
                  HOST '{MATERIALIZE_PROD_SANDBOX_RDS_HOSTNAME}',
                  DATABASE postgres,
                  USER postgres,
                  PASSWORD SECRET pg_password,
                  SSL MODE 'require'
                  )
                """
            )
        )
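
        # From here on, both upstream databases keep inserting, updating, and
        # deleting rows on a one-second cadence (MySQL via EVENTs, Postgres via
        # pg_cron jobs), and Materialize has `mysql` and `pg` connections it can
        # use for CDC sources against them.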

        c.testdrive(
            input=dedent(
                """
                > SET DATABASE=qa_canary_environment

                > CREATE SCHEMA IF NOT EXISTS public_table;

                # Create the table here because dbt would otherwise create it as a
                # materialized view, which does not allow inserts.
                > CREATE TABLE IF NOT EXISTS public_table.table (c INT);

                > GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE public_table.table TO "infra+qacanaryload@materialize.io"

                > GRANT SELECT ON TABLE public_table.table TO "dennis.felsing@materialize.com"
                """
            )
        )
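
    # Finally build the dbt models for the canary environment. `--threads 8` sets
    # dbt's own parallelism; the dbt project is expected to be available at
    # /workdir inside the dbt service container.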
    c.exec("dbt", "dbt", "run", "--threads", "8", workdir="/workdir")


def workflow_test(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Check the health of the RDS Postgres instance, then run the dbt tests."""
    c.up({"name": "dbt", "persistent": True})

    con = psycopg.connect(
        host=MATERIALIZE_PROD_SANDBOX_RDS_HOSTNAME,
        user="postgres",
        dbname="postgres",
        password=MATERIALIZE_PROD_SANDBOX_RDS_PASSWORD,
        sslmode="require",
    )
    try:
        with con.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM pg_replication_slots")
            result = cur.fetchall()
            assert (
                result[0][0] == 1
            ), f"""RDS Postgres has the wrong number of pg_replication_slots ({result[0][0]}); please fix this manually to prevent Postgres from running out of disk space due to stalled Materialize connections:
$ psql postgres://postgres:$MATERIALIZE_PROD_SANDBOX_RDS_PASSWORD@$MATERIALIZE_PROD_SANDBOX_RDS_HOSTNAME
postgres=> SELECT * FROM pg_replication_slots;
postgres=> SELECT pg_drop_replication_slot('...');
"""
    finally:
        con.close()

    c.exec("dbt", "dbt", "test", workdir="/workdir")


def workflow_clean(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Remove local dbt build artifacts from the workdir."""
    c.exec("dbt", "dbt", "clean", workdir="/workdir")
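

# Illustrative only, not part of the composition above: the assertion in
# workflow_test asks the operator to drop stale replication slots by hand. A
# minimal sketch of how that manual step could be scripted with psycopg is
# shown below. `_drop_inactive_replication_slots` is a hypothetical helper that
# no workflow calls, and slots should only be dropped once it is clear that the
# corresponding Materialize source is really gone.
def _drop_inactive_replication_slots() -> None:
    con = psycopg.connect(
        host=MATERIALIZE_PROD_SANDBOX_RDS_HOSTNAME,
        user="postgres",
        dbname="postgres",
        password=MATERIALIZE_PROD_SANDBOX_RDS_PASSWORD,
        sslmode="require",
        autocommit=True,
    )
    try:
        with con.cursor() as cur:
            # Inactive slots retain WAL on the RDS instance and can fill its disk.
            cur.execute("SELECT slot_name FROM pg_replication_slots WHERE NOT active")
            for (slot_name,) in cur.fetchall():
                cur.execute("SELECT pg_drop_replication_slot(%s)", (slot_name,))
    finally:
        con.close()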