mzcompose.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Verify how much storage is being used, based on mz_storage_usage and
  11. mz_recent_storage_usage.
  12. """
  13. import time
  14. from dataclasses import dataclass
  15. from textwrap import dedent
  16. from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
  17. from materialize.mzcompose.services.materialized import Materialized
  18. from materialize.mzcompose.services.postgres import Postgres
  19. from materialize.mzcompose.services.redpanda import Redpanda
  20. from materialize.mzcompose.services.testdrive import Testdrive
  21. COLLECTION_INTERVAL_SECS = 5
  22. PG_CDC_SETUP = dedent(
  23. """
  24. > CREATE SECRET pgpass AS 'postgres'
  25. > CREATE CONNECTION pg TO POSTGRES (
  26. HOST postgres,
  27. DATABASE postgres,
  28. USER postgres,
  29. PASSWORD SECRET pgpass
  30. )
  31. $ postgres-execute connection=postgres://postgres:postgres@postgres
  32. ALTER USER postgres WITH replication;
  33. DROP SCHEMA IF EXISTS public CASCADE;
  34. CREATE SCHEMA public;
  35. DROP PUBLICATION IF EXISTS mz_source;
  36. CREATE PUBLICATION mz_source FOR ALL TABLES;
  37. """
  38. )
  39. KAFKA_SETUP = dedent(
  40. """
  41. > CREATE CONNECTION IF NOT EXISTS kafka_conn
  42. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  43. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  44. URL '${testdrive.schema-registry-url}'
  45. );
  46. $ set key-schema={"type": "string"}
  47. $ set value-schema={"type": "record", "name": "r", "fields": [{"name": "a", "type": "string"}]}
  48. """
  49. )
  50. SERVICES = [
  51. Redpanda(),
  52. Postgres(),
  53. Materialized(
  54. environment_extra=[
  55. f"MZ_STORAGE_USAGE_COLLECTION_INTERVAL={COLLECTION_INTERVAL_SECS}s"
  56. ],
  57. additional_system_parameter_defaults={"persist_rollup_threshold": "20"},
  58. ),
  59. Testdrive(default_timeout="120s", no_reset=True),
  60. ]
  61. @dataclass
  62. class DatabaseObject:
  63. name: str
  64. testdrive: str
  65. expected_size: int
  66. database_objects = [
  67. DatabaseObject(
  68. name="table_insert_unique_rows",
  69. testdrive=dedent(
  70. """
  71. > CREATE TABLE obj (f1 TEXT)
  72. > INSERT INTO obj SELECT generate_series::text || REPEAT('x', 1024) FROM generate_series(1, 1024)
  73. """
  74. ),
  75. expected_size=1024 * 1024,
  76. ),
  77. # Identical rows should cause a diff > 1 and not be stored individually
  78. DatabaseObject(
  79. name="table_insert_identical_rows",
  80. testdrive=dedent(
  81. """
  82. > CREATE TABLE obj (f1 TEXT)
  83. > INSERT INTO obj SELECT REPEAT('x', 1024 * 1024) FROM generate_series(1, 1024)
  84. """
  85. ),
  86. expected_size=1024 * 1024,
  87. ),
  88. # Deleted/updated rows should be garbage-collected
  89. # https://github.com/MaterializeInc/database-issues/issues/4313
  90. # DatabaseObject(
  91. # name="table_delete",
  92. # testdrive=dedent(
  93. # f"""
  94. # > CREATE TABLE obj (f1 TEXT)
  95. # > INSERT INTO obj SELECT generate_series::text || REPEAT('x', 1024) FROM generate_series(1, 1024)
  96. # > SELECT mz_unsafe.mz_sleep({COLLECTION_INTERVAL_SECS} + 1)
  97. # <null>
  98. # > DELETE FROM obj;
  99. # """
  100. # ),
  101. # expected_size=???,
  102. # ),
  103. # DatabaseObject(
  104. # name="upsert_update",
  105. # testdrive=KAFKA_SETUP+ dedent(
  106. # f"""
  107. # $ kafka-create-topic topic=upsert-update
  108. #
  109. # $ kafka-ingest format=avro topic=upsert-update key-format=avro key-schema=${{key-schema}} schema=${{value-schema}}
  110. # "${{kafka-ingest.iteration}}" {{"a": "0"}}
  111. #
  112. # > CREATE SOURCE obj
  113. # FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-update-${{testdrive.seed}}')
  114. #
  115. # > CREATE TABLE obj_tbl
  116. # FROM SOURCE obj (REFERENCE "testdrive-upsert-update-${{testdrive.seed}}")
  117. # FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  118. # ENVELOPE UPSERT
  119. # """) + "\n".join([dedent(
  120. # f"""
  121. # $ kafka-ingest format=avro topic=upsert-update key-format=avro key-schema=${{key-schema}} schema=${{value-schema}} repeat=5000000
  122. # "${{kafka-ingest.iteration}}" {{"a": "{i}"}}
  123. # """
  124. # ) for i in range(1,11)]) + dedent(
  125. # """
  126. # > SELECT COUNT(*) FROM obj_tbl WHERE a::integer = 10;
  127. # 5000000
  128. # """
  129. # ),
  130. # expected_size=???,
  131. # ),
  132. DatabaseObject(
  133. name="materialized_view_constant",
  134. testdrive=dedent(
  135. """
  136. > CREATE MATERIALIZED VIEW obj AS SELECT generate_series::text , REPEAT('x', 1024) FROM generate_series(1, 1024)
  137. """
  138. ),
  139. # Dictionary encoding in Persist greatly reduces the size of repeated characters.
  140. expected_size=1024 * 10,
  141. ),
  142. # If a materialized view returns a small number of rows,
  143. # it should not require storage proportional to its input
  144. DatabaseObject(
  145. name="materialized_view_small_output",
  146. testdrive=dedent(
  147. """
  148. > CREATE TABLE t1 (f1 TEXT)
  149. > INSERT INTO t1 SELECT generate_series::text || REPEAT('x', 1024) FROM generate_series(1, 1024)
  150. > CREATE MATERIALIZED VIEW obj AS SELECT COUNT(*) FROM t1;
  151. """
  152. ),
  153. expected_size=4 * 1024,
  154. ),
  155. # The pg-cdc source is expected to be empty. The data is in the sub-source
  156. DatabaseObject(
  157. name="pg_cdc_source",
  158. testdrive=PG_CDC_SETUP
  159. + dedent(
  160. """
  161. $ postgres-execute connection=postgres://postgres:postgres@postgres
  162. CREATE TABLE pg_table (f1 TEXT);
  163. INSERT INTO pg_table SELECT generate_series::text || REPEAT('x', 1024) FROM generate_series(1, 1024);
  164. ALTER TABLE pg_table REPLICA IDENTITY FULL;
  165. > CREATE SOURCE obj
  166. FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source');
  167. > CREATE TABLE pg_table FROM SOURCE obj (REFERENCE pg_table);
  168. """
  169. ),
  170. expected_size=4 * 1024,
  171. ),
  172. # The pg-cdc data is expected to be in the sub-source,
  173. # unaffected by the presence of other tables
  174. DatabaseObject(
  175. name="pg_cdc_subsource",
  176. testdrive=PG_CDC_SETUP
  177. + dedent(
  178. """
  179. $ postgres-execute connection=postgres://postgres:postgres@postgres
  180. CREATE TABLE pg_table1 (f1 TEXT);
  181. INSERT INTO pg_table1 SELECT generate_series::text || REPEAT('x', 1024) FROM generate_series(1, 1024);
  182. ALTER TABLE pg_table1 REPLICA IDENTITY FULL;
  183. CREATE TABLE pg_table2 (f1 TEXT);
  184. INSERT INTO pg_table2 SELECT generate_series::text || REPEAT('x', 1024) FROM generate_series(1, 1024)
  185. ALTER TABLE pg_table2 REPLICA IDENTITY FULL;
  186. CREATE TABLE pg_table3 (f1 TEXT);
  187. INSERT INTO pg_table3 SELECT generate_series::text || REPEAT('x', 1024) FROM generate_series(1, 1024)
  188. ALTER TABLE pg_table3 REPLICA IDENTITY FULL;
  189. > CREATE SOURCE pg_source
  190. FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source');
  191. > CREATE TABLE obj FROM SOURCE pg_source (REFERENCE pg_table1);
  192. > SELECT COUNT(*) FROM obj;
  193. 1024
  194. """
  195. ),
  196. expected_size=1024 * 1024,
  197. ),
  198. ]
  199. def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
  200. """Create various database objects and confirm that their storage
  201. as reported in the mz_storage_usage table are as expected.
  202. """
  203. parser.add_argument("tests", nargs="*", default=None, help="run specified tests")
  204. args = parser.parse_args()
  205. c.up(
  206. "redpanda",
  207. "postgres",
  208. "materialized",
  209. {"name": "testdrive", "persistent": True},
  210. )
  211. for database_object in database_objects:
  212. if (
  213. args.tests is not None
  214. and len(args.tests) > 0
  215. and database_object.name not in args.tests
  216. ):
  217. continue
  218. print(f"Running scenario {database_object.name} ...")
  219. c.testdrive(
  220. dedent(
  221. """
  222. $ postgres-execute connection=postgres://mz_system@materialized:6877/materialize
  223. DROP SCHEMA IF EXISTS public CASCADE;
  224. CREATE SCHEMA public;
  225. GRANT ALL PRIVILEGES ON SCHEMA public TO materialize;
  226. """
  227. )
  228. )
  229. c.testdrive(database_object.testdrive)
  230. # Make sure the storage is fully accounted for
  231. print(
  232. f"Sleeping for {COLLECTION_INTERVAL_SECS + 1} seconds so that collection kicks in ..."
  233. )
  234. time.sleep(COLLECTION_INTERVAL_SECS + 1)
  235. c.testdrive(
  236. dedent(
  237. f"""
  238. $ set-regex match=\\d+ replacement=<SIZE>
  239. # Select the raw size as well, so if this errors in testdrive, its easier to debug.
  240. > SELECT size_bytes, size_bytes BETWEEN {database_object.expected_size//3} AND {database_object.expected_size*3}
  241. FROM mz_storage_usage
  242. WHERE collection_timestamp = ( SELECT MAX(collection_timestamp) FROM mz_storage_usage )
  243. AND object_id = ( SELECT id FROM mz_objects WHERE name = 'obj' );
  244. <SIZE> true
  245. > SELECT size_bytes, size_bytes BETWEEN {database_object.expected_size//3} AND {database_object.expected_size*3}
  246. FROM mz_recent_storage_usage
  247. WHERE object_id = ( SELECT id FROM mz_objects WHERE name = 'obj' );
  248. <SIZE> true
  249. """
  250. )
  251. )