# mzcompose.py
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Testdrive-based tests involving restarting materialized (including its clusterd
  11. processes). See cluster tests for separate clusterds, see platform-checks for
  12. further restart scenarios.
  13. """
  14. import json
  15. import time
  16. from textwrap import dedent
  17. import requests
  18. from psycopg.errors import (
  19. InternalError_,
  20. OperationalError,
  21. )
  22. from materialize import buildkite
  23. from materialize.mzcompose.composition import Composition
  24. from materialize.mzcompose.services.kafka import Kafka
  25. from materialize.mzcompose.services.materialized import Materialized
  26. from materialize.mzcompose.services.mz import Mz
  27. from materialize.mzcompose.services.postgres import CockroachOrPostgresMetadata
  28. from materialize.mzcompose.services.schema_registry import SchemaRegistry
  29. from materialize.mzcompose.services.testdrive import Testdrive
  30. from materialize.mzcompose.services.zookeeper import Zookeeper
  31. from materialize.ui import UIError
# A Testdrive instance that does not reset state between invocations, for
# workflows that assert on catalog/system state across materialized restarts.
testdrive_no_reset = Testdrive(name="testdrive_no_reset", no_reset=True)

# Services available to the workflows in this composition.
SERVICES = [
    Zookeeper(),
    Kafka(auto_create_topics=True),
    SchemaRegistry(),
    Mz(app_password=""),
    Materialized(),
    Testdrive(
        entrypoint_extra=[
            f"--var=default-replica-size={Materialized.Size.DEFAULT_SIZE}-{Materialized.Size.DEFAULT_SIZE}",
        ],
    ),
    testdrive_no_reset,
    CockroachOrPostgresMetadata(),
]
  47. def workflow_retain_history(c: Composition) -> None:
  48. def check_retain_history(name: str):
  49. start = time.time()
  50. while True:
  51. ts = c.sql_query(
  52. f"EXPLAIN TIMESTAMP AS JSON FOR SELECT * FROM retain_{name}"
  53. )
  54. ts = ts[0][0]
  55. ts = json.loads(ts)
  56. source = ts["sources"][0]
  57. since = source["read_frontier"][0]
  58. upper = source["write_frontier"][0]
  59. if upper - since > 2000:
  60. break
  61. end = time.time()
  62. # seconds since start
  63. elapsed = end - start
  64. if elapsed > 10:
  65. raise UIError("timeout hit while waiting for retain history")
  66. time.sleep(0.5)
  67. def check_retain_history_for(names: list[str]):
  68. for name in names:
  69. check_retain_history(name)
  70. c.up("materialized")
  71. c.sql(
  72. "ALTER SYSTEM SET enable_logical_compaction_window = true",
  73. port=6877,
  74. user="mz_system",
  75. )
  76. c.sql("CREATE TABLE retain_t (i INT)")
  77. c.sql("INSERT INTO retain_t VALUES (1)")
  78. c.sql(
  79. "CREATE MATERIALIZED VIEW retain_mv WITH (RETAIN HISTORY = FOR '2s') AS SELECT * FROM retain_t"
  80. )
  81. c.sql(
  82. "CREATE SOURCE retain_s FROM LOAD GENERATOR COUNTER WITH (RETAIN HISTORY = FOR '5s')"
  83. )
  84. names = ["mv", "s"]
  85. check_retain_history_for(names)
  86. # Ensure that RETAIN HISTORY is respected on boot.
  87. c.kill("materialized")
  88. c.up("materialized")
  89. check_retain_history_for(names)
  90. c.kill("materialized")
def workflow_github_2454(c: Composition) -> None:
    """Regression test for GitHub issue 2454: the catalog state left behind
    by `github-2454.td` must not prevent materialized from booting."""
    c.up("materialized")
    c.run_testdrive_files("github-2454.td")
    # Ensure MZ can boot
    c.kill("materialized")
    c.up("materialized")
    c.kill("materialized")
# Test that `mz_internal.mz_object_dependencies` re-populates.
def workflow_github_5108(c: Composition) -> None:
    """Create a load-generator source with subsources and tables, then assert
    that `mz_internal.mz_object_dependencies` reports the same dependency
    edges before and after a materialized restart."""
    c.up("materialized", {"name": "testdrive_no_reset", "persistent": True})
    # Establish the expected dependency edges on a fresh instance.
    c.testdrive(
        service="testdrive_no_reset",
        input=dedent(
            """
            > CREATE SOURCE with_subsources FROM LOAD GENERATOR AUCTION;
            > CREATE TABLE accounts FROM SOURCE with_subsources (REFERENCE accounts);
            > CREATE TABLE auctions FROM SOURCE with_subsources (REFERENCE auctions);
            > CREATE TABLE bids FROM SOURCE with_subsources (REFERENCE bids);
            > CREATE TABLE organizations FROM SOURCE with_subsources (REFERENCE organizations);
            > CREATE TABLE users FROM SOURCE with_subsources (REFERENCE users);
            > SELECT DISTINCT
              top_level_s.name as source,
              s.name AS subsource
              FROM mz_internal.mz_object_dependencies AS d
              JOIN mz_sources AS s ON s.id = d.referenced_object_id OR s.id = d.object_id
              JOIN mz_sources AS top_level_s ON top_level_s.id = d.object_id OR top_level_s.id = d.referenced_object_id
              WHERE top_level_s.name = 'with_subsources' AND (s.type = 'progress' OR s.type = 'subsource');
            source subsource
            -------------------------
            with_subsources with_subsources_progress
            > SELECT DISTINCT
              s.name AS source,
              t.name AS table
              FROM mz_internal.mz_object_dependencies AS d
              JOIN mz_sources AS s ON s.id = d.referenced_object_id
              JOIN mz_tables AS t ON t.id = d.object_id
              WHERE s.name = 'with_subsources';
            source table
            -------------------------
            with_subsources bids
            with_subsources users
            with_subsources accounts
            with_subsources auctions
            with_subsources organizations
            """
        ),
    )
    # Restart mz
    c.kill("materialized")
    c.up("materialized")
    # The same dependency edges must be reported after the restart.
    c.testdrive(
        service="testdrive_no_reset",
        input=dedent(
            """
            > SELECT
              top_level_s.name as source,
              s.name AS subsource
              FROM mz_internal.mz_object_dependencies AS d
              JOIN mz_sources AS s ON s.id = d.referenced_object_id OR s.id = d.object_id
              JOIN mz_sources AS top_level_s ON top_level_s.id = d.object_id OR top_level_s.id = d.referenced_object_id
              WHERE top_level_s.name = 'with_subsources' AND (s.type = 'progress' OR s.type = 'subsource');
            source subsource
            -------------------------
            with_subsources with_subsources_progress
            > SELECT DISTINCT
              s.name AS source,
              t.name AS table
              FROM mz_internal.mz_object_dependencies AS d
              JOIN mz_sources AS s ON s.id = d.referenced_object_id
              JOIN mz_tables AS t ON t.id = d.object_id
              WHERE s.name = 'with_subsources';
            source table
            -------------------------
            with_subsources bids
            with_subsources users
            with_subsources accounts
            with_subsources auctions
            with_subsources organizations
            """
        ),
    )
    c.kill("materialized")
  173. def workflow_audit_log(c: Composition) -> None:
  174. c.up("materialized")
  175. # Create some audit log entries.
  176. c.sql("CREATE TABLE t (i INT)")
  177. c.sql("CREATE DEFAULT INDEX ON t")
  178. log = c.sql_query("SELECT * FROM mz_audit_events ORDER BY id")
  179. # Restart mz.
  180. c.kill("materialized")
  181. c.up("materialized")
  182. # Verify the audit log entries are still present and have not changed.
  183. restart_log = c.sql_query("SELECT * FROM mz_audit_events ORDER BY id")
  184. if log != restart_log or not log:
  185. print("initial audit log:", log)
  186. print("audit log after restart:", restart_log)
  187. raise Exception("audit logs emtpy or not equal after restart")
def workflow_stash(c: Composition) -> None:
    """Verify that materialized tolerates a restart of its external metadata
    store while a SQL session is performing DDL."""
    # Start from a clean slate: remove containers and the mzdata volume.
    c.rm(
        "testdrive",
        "materialized",
        stop=True,
        destroy_volumes=True,
    )
    c.rm_volumes("mzdata", force=True)
    with c.override(Materialized(external_metadata_store=True)):
        c.up(c.metadata_store())
        c.up("materialized")
        cursor = c.sql_cursor()
        cursor.execute("CREATE TABLE a (i INT)")
        # Bounce the metadata store out from under the live session, then
        # issue more DDL on the same cursor.
        c.stop(c.metadata_store())
        c.up(c.metadata_store())
        cursor.execute("CREATE TABLE b (i INT)")
        # No implicit restart as sanity check here, will panic:
        # https://github.com/MaterializeInc/database-issues/issues/6168
        c.down(sanity_restart_mz=False)
  207. def workflow_storage_managed_collections(c: Composition) -> None:
  208. c.down(destroy_volumes=True)
  209. c.up("materialized")
  210. # Create some storage shard entries.
  211. c.sql("CREATE TABLE t (i INT)")
  212. # Storage collections are eventually consistent, so loop to be sure updates
  213. # have made it.
  214. user_shards: list[str] = []
  215. while len(user_shards) == 0:
  216. user_shards = c.sql_query(
  217. "SELECT shard_id FROM mz_internal.mz_storage_shards WHERE object_id LIKE 'u%';"
  218. )
  219. # Restart mz.
  220. c.kill("materialized")
  221. c.up("materialized")
  222. # Verify the shard mappings are still present and have not changed.
  223. restart_user_shards: list[str] = []
  224. while len(restart_user_shards) == 0:
  225. restart_user_shards = c.sql_query(
  226. "SELECT shard_id FROM mz_internal.mz_storage_shards WHERE object_id LIKE 'u%';"
  227. )
  228. if user_shards != restart_user_shards or not user_shards:
  229. print("initial user shards:", user_shards)
  230. print("user shards after restart:", restart_user_shards)
  231. raise Exception("user shards empty or not equal after restart")
def workflow_allowed_cluster_replica_sizes(c: Composition) -> None:
    """Verify the behavior of the multi-valued allowed_cluster_replica_sizes
    system parameter across materialized restarts: existing replicas of
    disallowed sizes survive, and the parameter value itself is persisted."""
    c.up("materialized", {"name": "testdrive_no_reset", "persistent": True})
    c.testdrive(
        service="testdrive_no_reset",
        input=dedent(
            """
            $ postgres-connect name=mz_system url=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
            # We can create a cluster with sizes '1' and '2'
            > CREATE CLUSTER test REPLICAS (r1 (SIZE '1'), r2 (SIZE '2'))
            > SHOW CLUSTER REPLICAS WHERE cluster = 'test'
            test r1 1 true ""
            test r2 2 true ""
            # We cannot create replicas with size '2' after restricting allowed_cluster_replica_sizes to '1'
            $ postgres-execute connection=mz_system
            ALTER SYSTEM SET allowed_cluster_replica_sizes = '1'
            ! CREATE CLUSTER REPLICA test.r3 SIZE '2'
            contains:unknown cluster replica size 2
            """
        ),
    )
    # Assert that mz restarts successfully even in the presence of replica sizes that are not allowed
    c.kill("materialized")
    c.up("materialized")
    c.testdrive(
        service="testdrive_no_reset",
        input=dedent(
            """
            $ postgres-connect name=mz_system url=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
            # Cluster replica of disallowed sizes still exist
            > SHOW CLUSTER REPLICAS WHERE cluster = 'test'
            test r1 1 true ""
            test r2 2 true ""
            # We cannot create replicas with size '2' (system parameter value persists across restarts)
            ! CREATE CLUSTER REPLICA test.r3 SIZE '2'
            contains:unknown cluster replica size 2
            # We can create replicas with size '2' after listing that size as allowed
            $ postgres-execute connection=mz_system
            ALTER SYSTEM SET allowed_cluster_replica_sizes = '1', '2'
            > CREATE CLUSTER REPLICA test.r3 SIZE '2'
            > SHOW CLUSTER REPLICAS WHERE cluster = 'test'
            test r1 1 true ""
            test r2 2 true ""
            test r3 2 true ""
            """
        ),
    )
    # Assert that the persisted allowed_cluster_replica_sizes (a setting that
    # supports multiple values) is correctly restored on restart.
    c.kill("materialized")
    c.up("materialized")
    c.testdrive(
        service="testdrive_no_reset",
        input=dedent(
            """
            > SHOW allowed_cluster_replica_sizes
            "\\"1\\", \\"2\\""
            $ postgres-connect name=mz_system url=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
            # Reset for following tests
            $ postgres-execute connection=mz_system
            ALTER SYSTEM RESET allowed_cluster_replica_sizes
            """
        ),
    )
  295. def workflow_allow_user_sessions(c: Composition) -> None:
  296. c.up("materialized")
  297. http_port = c.port("materialized", 6876)
  298. # Ensure new user sessions are allowed.
  299. c.sql(
  300. "ALTER SYSTEM SET allow_user_sessions = true",
  301. port=6877,
  302. user="mz_system",
  303. )
  304. # SQL and HTTP user sessions should work.
  305. assert c.sql_query("SELECT 1") == [(1,)]
  306. assert requests.post(
  307. f"http://localhost:{http_port}/api/sql", json={"query": "select 1"}
  308. ).json()["results"][0]["rows"] == [["1"]]
  309. # Save a cursor for later.
  310. cursor = c.sql_cursor()
  311. # Disallow new user sessions.
  312. c.sql(
  313. "ALTER SYSTEM SET allow_user_sessions = false",
  314. port=6877,
  315. user="mz_system",
  316. )
  317. # New SQL and HTTP user sessions should now fail.
  318. try:
  319. c.sql_query("SELECT 1")
  320. except OperationalError as e:
  321. # assert e.pgcode == "MZ010" # Not exposed by psycopg
  322. assert "login blocked" in str(e)
  323. assert (
  324. "DETAIL: Your organization has been blocked. Please contact support."
  325. in e.args[0]
  326. ), e.args
  327. res = requests.post(
  328. f"http://localhost:{http_port}/api/sql", json={"query": "select 1"}
  329. )
  330. assert res.status_code == 403
  331. assert res.json() == {
  332. "message": "login blocked",
  333. "code": "MZ010",
  334. "detail": "Your organization has been blocked. Please contact support.",
  335. }
  336. # The cursor from the beginning of the test should still work.
  337. cursor.execute("SELECT 1")
  338. assert cursor.fetchall() == [(1,)]
  339. # Re-allow new user sessions.
  340. c.sql(
  341. "ALTER SYSTEM SET allow_user_sessions = true",
  342. port=6877,
  343. user="mz_system",
  344. )
  345. # SQL and HTTP user sessions should work again.
  346. assert c.sql_query("SELECT 1") == [(1,)]
  347. assert requests.post(
  348. f"http://localhost:{http_port}/api/sql", json={"query": "select 1"}
  349. ).json()["results"][0]["rows"] == [["1"]]
  350. # The cursor from the beginning of the test should still work.
  351. cursor.execute("SELECT 1")
  352. assert cursor.fetchall() == [(1,)]
  353. def workflow_network_policies(c: Composition) -> None:
  354. c.up("materialized")
  355. http_port = c.port("materialized", 6876)
  356. # ensure default network policy
  357. def assert_can_connect():
  358. assert c.sql_query("SELECT 1") == [(1,)]
  359. assert requests.post(
  360. f"http://localhost:{http_port}/api/sql", json={"query": "select 1"}
  361. ).json()["results"][0]["rows"] == [["1"]]
  362. def assert_new_connection_fails():
  363. # New SQL and HTTP user sessions should now fail.
  364. try:
  365. c.sql_query("SELECT 1")
  366. except OperationalError as e:
  367. # assert e.pgcode == "MZ010" # Not exposed by psycopg
  368. assert "session denied" in str(e)
  369. assert "DETAIL: Access denied for address" in e.args[0], e.args
  370. res = requests.post(
  371. f"http://localhost:{http_port}/api/sql", json={"query": "select 1"}
  372. )
  373. assert res.status_code == 403
  374. assert res.json()["message"] == "session denied"
  375. assert res.json()["code"] == "MZ011"
  376. assert "Access denied for address" in res.json()["detail"]
  377. # ensure default network policy
  378. assert c.sql_query("show network_policy") == [("default",)]
  379. assert_can_connect()
  380. # enable network policy management
  381. c.sql(
  382. "ALTER SYSTEM SET enable_network_policies = true",
  383. port=6877,
  384. user="mz_system",
  385. )
  386. # assert we can't change the network policy to one that doesn't exist.
  387. try:
  388. c.sql_query(
  389. "ALTER SYSTEM SET network_policy='apples'",
  390. port=6877,
  391. user="mz_system",
  392. )
  393. except InternalError_ as e:
  394. assert (
  395. e.diag.message_primary
  396. and "no network policy with such name exists" in e.diag.message_primary
  397. ), e
  398. else:
  399. raise RuntimeError(
  400. "ALTER SYSTEM SET network_policy didn't return the expected error"
  401. )
  402. # close network policies
  403. c.sql(
  404. "CREATE NETWORK POLICY closed (RULES ())",
  405. port=6877,
  406. user="mz_system",
  407. )
  408. c.sql(
  409. "ALTER SYSTEM SET network_policy='closed'",
  410. port=6877,
  411. user="mz_system",
  412. )
  413. assert_new_connection_fails()
  414. # can't drop the actively set network policy.
  415. try:
  416. c.sql_query(
  417. "DROP NETWORK POLICY closed",
  418. port=6877,
  419. user="mz_system",
  420. )
  421. except InternalError_ as e:
  422. assert (
  423. e.diag.message_primary
  424. and "network policy is currently in use" in e.diag.message_primary
  425. ), e
  426. else:
  427. raise RuntimeError("DROP NETWORK POLICY didn't return the expected error")
  428. # open the closed network policy
  429. c.sql(
  430. "ALTER NETWORK POLICY closed SET (RULES (open (ACTION='allow', DIRECTION='ingress', ADDRESS='0.0.0.0/0')))",
  431. port=6877,
  432. user="mz_system",
  433. )
  434. assert_can_connect()
  435. cursor = c.sql_cursor()
  436. # shut down the closed network policy
  437. c.sql(
  438. "ALTER NETWORK POLICY closed SET (RULES (closed (ACTION='allow', DIRECTION='ingress', ADDRESS='0.0.0.0/32')))",
  439. port=6877,
  440. user="mz_system",
  441. )
  442. assert_new_connection_fails()
  443. # validate that the cursor from the beginning of the test still works.
  444. assert cursor.execute("SELECT 1").fetchall() == [(1,)]
  445. c.sql(
  446. "ALTER SYSTEM SET network_policy='default'",
  447. port=6877,
  448. user="mz_system",
  449. )
  450. c.sql(
  451. "DROP NETWORK POLICY closed",
  452. port=6877,
  453. user="mz_system",
  454. )
  455. def workflow_drop_materialize_database(c: Composition) -> None:
  456. c.up("materialized")
  457. # Drop materialize database
  458. c.sql(
  459. "DROP DATABASE materialize",
  460. port=6877,
  461. user="mz_system",
  462. )
  463. # Restart mz.
  464. c.kill("materialized")
  465. c.up("materialized")
  466. # Verify that materialize hasn't blown up
  467. c.sql("SELECT 1")
  468. # Restore for next tests
  469. c.sql(
  470. "CREATE DATABASE materialize",
  471. port=6877,
  472. user="mz_system",
  473. )
  474. c.sql(
  475. "GRANT ALL PRIVILEGES ON SCHEMA materialize.public TO materialize",
  476. port=6877,
  477. user="mz_system",
  478. )
def workflow_bound_size_mz_status_history(c: Composition) -> None:
    """Verify that mz_source_status_history and mz_sink_status_history are
    truncated to a bounded size when materialized restarts."""
    c.up(
        "zookeeper",
        "kafka",
        "schema-registry",
        "materialized",
        {"name": "testdrive_no_reset", "persistent": True},
    )
    # Set up a Kafka source and sink whose status changes generate history events.
    c.testdrive(
        service="testdrive_no_reset",
        input=dedent(
            """
            $ kafka-create-topic topic=status-history
            > CREATE CONNECTION kafka_conn
              TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
            > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
              URL '${testdrive.schema-registry-url}'
              );
            > CREATE SOURCE kafka_source
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-status-history-${testdrive.seed}')
            > CREATE TABLE kafka_source_tbl FROM SOURCE kafka_source (REFERENCE "testdrive-status-history-${testdrive.seed}")
              FORMAT TEXT
            > CREATE SINK kafka_sink
              FROM kafka_source_tbl
              INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-sink-${testdrive.seed}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE DEBEZIUM
            $ kafka-verify-topic sink=materialize.public.kafka_sink
            """
        ),
    )
    # Fill mz_source_status_history and mz_sink_status_history up with enough events
    # by repeatedly breaking and repairing the connection's broker address.
    for i in range(5):
        c.testdrive(
            service="testdrive_no_reset",
            input=dedent(
                """
                > ALTER CONNECTION kafka_conn SET (BROKER 'dne') WITH (VALIDATE = false);
                > ALTER CONNECTION kafka_conn SET (BROKER '${testdrive.kafka-addr}') WITH (VALIDATE = true);
                """
            ),
        )
    # Verify that we have enough events so that they can be truncated
    c.testdrive(
        service="testdrive_no_reset",
        input=dedent(
            """
            > SELECT COUNT(*) > 7 FROM mz_internal.mz_source_status_history
            true
            > SELECT COUNT(*) > 7 FROM mz_internal.mz_sink_status_history
            true
            """
        ),
    )
    # Restart mz.
    c.kill("materialized")
    c.up("materialized")
    # Verify that we have fewer events now:
    # 14 resp. 7, because the truncation default is 5, and the restarted
    # objects produce a new starting and running event.
    c.testdrive(
        service="testdrive_no_reset",
        input=dedent(
            """
            > SELECT COUNT(*) FROM mz_internal.mz_source_status_history
            14
            > SELECT COUNT(*) FROM mz_internal.mz_sink_status_history
            7
            """
        ),
    )
def workflow_bound_size_mz_cluster_replica_metrics_history(c: Composition) -> None:
    """
    Test the truncation mechanism for `mz_cluster_replica_metrics_history`.
    """
    c.down(destroy_volumes=True)
    c.up("materialized", {"name": "testdrive_no_reset", "persistent": True})
    # The replica metrics are updated once per minute and on envd startup. We
    # can thus restart envd to generate metrics rows without having to block
    # for a minute.
    # Create a replica and wait for metrics data to arrive.
    c.testdrive(
        service="testdrive_no_reset",
        input=dedent(
            """
            > CREATE CLUSTER test SIZE '1'
            > SELECT count(*) >= 1
              FROM mz_internal.mz_cluster_replica_metrics_history m
              JOIN mz_cluster_replicas r ON r.id = m.replica_id
              JOIN mz_clusters c ON c.id = r.cluster_id
              WHERE c.name = 'test'
            true
            """
        ),
    )
    # The default retention interval is 30 days, so we don't expect truncation
    # after a restart.
    c.kill("materialized")
    c.up("materialized")
    c.testdrive(
        service="testdrive_no_reset",
        input=dedent(
            """
            > SELECT count(*) >= 2
              FROM mz_internal.mz_cluster_replica_metrics_history m
              JOIN mz_cluster_replicas r ON r.id = m.replica_id
              JOIN mz_clusters c ON c.id = r.cluster_id
              WHERE c.name = 'test'
            true
            """
        ),
    )
    # Reduce the retention interval to force a truncation.
    c.sql(
        "ALTER SYSTEM SET replica_metrics_history_retention_interval = '1s'",
        port=6877,
        user="mz_system",
    )
    c.kill("materialized")
    c.up("materialized")
    c.testdrive(
        service="testdrive_no_reset",
        input=dedent(
            """
            > SELECT count(*) < 2
              FROM mz_internal.mz_cluster_replica_metrics_history m
              JOIN mz_cluster_replicas r ON r.id = m.replica_id
              JOIN mz_clusters c ON c.id = r.cluster_id
              WHERE c.name = 'test'
            true
            """
        ),
    )
    # Verify that this also works a second time.
    c.kill("materialized")
    c.up("materialized")
    c.testdrive(
        service="testdrive_no_reset",
        input=dedent(
            """
            > SELECT count(*) < 2
              FROM mz_internal.mz_cluster_replica_metrics_history m
              JOIN mz_cluster_replicas r ON r.id = m.replica_id
              JOIN mz_clusters c ON c.id = r.cluster_id
              WHERE c.name = 'test'
            true
            """
        ),
    )
def workflow_index_compute_dependencies(c: Composition) -> None:
    """
    Assert that materialized views and index catalog items see and use only
    indexes created before them upon restart.

    Various parts of the optimizer internals and tooling, such as
    - `EXPLAIN REPLAN`
    - `bin/mzcompose clone defs`
    are currently depending on the fact that the `GlobalId` ordering respects
    dependency ordering. In other words, if an index `i` is created after a
    catalog item `x`, then `x` cannot use `i` even after restart.

    This test should codify this assumption so we can get an early signal if
    this is broken for some reason in the future.
    """
    c.up("materialized", {"name": "testdrive_no_reset", "persistent": True})

    def depends_on(c: Composition, obj_name: str, dep_name: str, expected: bool):
        """Check whether `(obj_name, dep_name)` is a compute dependency or not."""
        # The UNION of the two branches yields exactly one row: `true` if the
        # dependency edge exists in mz_compute_dependencies, `false` otherwise.
        c.testdrive(
            service="testdrive_no_reset",
            input=dedent(
                f"""
                > (
                    SELECT
                      true
                    FROM
                      mz_catalog.mz_objects as obj
                    WHERE
                      obj.name = '{obj_name}' AND
                      obj.id IN (
                        SELECT
                          cd.object_id
                        FROM
                          mz_internal.mz_compute_dependencies cd JOIN
                          mz_objects dep ON (cd.dependency_id = dep.id)
                        WHERE
                          dep.name = '{dep_name}'
                      )
                  ) UNION (
                    SELECT
                      false
                    FROM
                      mz_catalog.mz_objects as obj
                    WHERE
                      obj.name = '{obj_name}' AND
                      obj.id NOT IN (
                        SELECT
                          cd.object_id
                        FROM
                          mz_internal.mz_compute_dependencies cd JOIN
                          mz_objects dep ON (cd.dependency_id = dep.id)
                        WHERE
                          dep.name = '{dep_name}'
                      )
                  );
                {str(expected).lower()}
                """
            ),
        )

    # Create indexes both before and after the views/materialized views that
    # might use them.
    c.testdrive(
        service="testdrive_no_reset",
        input=dedent(
            """
            > DROP TABLE IF EXISTS t1 CASCADE;
            > DROP TABLE IF EXISTS t2 CASCADE;
            > CREATE TABLE t1(x int, y int);
            > CREATE TABLE t2(y int, z int);
            > CREATE INDEX ON t1(y);
            > CREATE VIEW v1 AS SELECT * FROM t1 JOIN t2 USING (y);
            > CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM v1;
            > CREATE INDEX ix1 ON v1(x);
            > CREATE INDEX ON t2(y);
            > CREATE VIEW v2 AS SELECT * FROM t2 JOIN t1 USING (y);
            > CREATE MATERIALIZED VIEW mv2 AS SELECT * FROM v2;
            > CREATE INDEX ix2 ON v2(x);
            """
        ),
    )
    # Verify that mv1 and ix1 depend on t1_y_idx but not on t2_y_idx.
    depends_on(c, "mv1", "t1_y_idx", True)
    depends_on(c, "mv1", "t2_y_idx", False)
    depends_on(c, "ix1", "t1_y_idx", True)
    depends_on(c, "ix1", "t2_y_idx", False)
    # Verify that mv2 and ix2 depend on both t1_y_idx and t2_y_idx.
    depends_on(c, "mv2", "t1_y_idx", True)
    depends_on(c, "mv2", "t2_y_idx", True)
    depends_on(c, "ix2", "t1_y_idx", True)
    depends_on(c, "ix2", "t2_y_idx", True)
    # Restart mz. We expect the index on t2(y) to not be visible to ix1 and mv1
    # after the restart as well.
    c.kill("materialized")
    c.up("materialized")
    # Verify that mv1 and ix1 depend on t1_y_idx but not on t2_y_idx.
    depends_on(c, "mv1", "t1_y_idx", True)
    depends_on(c, "mv1", "t2_y_idx", False)
    depends_on(c, "ix1", "t1_y_idx", True)
    depends_on(c, "ix1", "t2_y_idx", False)
    # Verify that mv2 and ix2 depend on both t1_y_idx and t2_y_idx.
    depends_on(c, "mv2", "t1_y_idx", True)
    depends_on(c, "mv2", "t2_y_idx", True)
    depends_on(c, "ix2", "t1_y_idx", True)
    depends_on(c, "ix2", "t2_y_idx", True)
  728. def workflow_default(c: Composition) -> None:
  729. def process(name: str) -> None:
  730. if name == "default":
  731. return
  732. with c.test_case(name):
  733. c.workflow(name)
  734. files = buildkite.shard_list(list(c.workflows.keys()), lambda workflow: workflow)
  735. c.test_parts(files, process)