test_managed_cluster.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import time
  10. from textwrap import dedent
  11. from threading import Thread
  12. from typing import Any
  13. from pg8000 import Connection
  14. from pg8000.exceptions import DatabaseError
  15. from materialize.cloudtest.app.materialize_application import (
  16. LOGGER,
  17. MaterializeApplication,
  18. )
  19. from materialize.cloudtest.util.cluster import cluster_pod_name
  20. from materialize.cloudtest.util.wait import wait
  21. def test_managed_cluster_sizing(mz: MaterializeApplication) -> None:
  22. """Test that a SIZE N cluster indeed creates N clusterd instances."""
  23. SIZE = 2
  24. mz.environmentd.sql(f"CREATE CLUSTER sized1 SIZE '{SIZE}-1', REPLICATION FACTOR 2")
  25. cluster_id = mz.environmentd.sql_query(
  26. "SELECT id FROM mz_clusters WHERE name = 'sized1'"
  27. )[0][0]
  28. assert cluster_id is not None
  29. check = mz.environmentd.sql_query(
  30. "SELECT availability_zones IS NULL FROM mz_clusters WHERE name = 'sized1'"
  31. )[0][0]
  32. assert check is not None
  33. assert check == True
  34. mz.environmentd.sql("ALTER CLUSTER sized1 SET (AVAILABILITY ZONES ('1', '2', '3'))")
  35. check = mz.environmentd.sql_query(
  36. "SELECT list_length(availability_zones) = 3 FROM mz_clusters WHERE name = 'sized1'"
  37. )[0][0]
  38. assert check is not None
  39. assert check == True
  40. mz.testdrive.run(
  41. input=dedent(
  42. """
  43. ! ALTER CLUSTER sized1 SET (AVAILABILITY ZONES ('4'))
  44. exact:unknown cluster replica availability zone 4
  45. """
  46. ),
  47. no_reset=True,
  48. )
  49. replicas = mz.environmentd.sql_query(
  50. "SELECT mz_cluster_replicas.name, mz_cluster_replicas.id FROM mz_cluster_replicas JOIN mz_clusters ON mz_cluster_replicas.cluster_id = mz_clusters.id WHERE mz_clusters.name = 'sized1' ORDER BY 1"
  51. )
  52. assert [replica[0] for replica in replicas] == ["r1", "r2"]
  53. for compute_id in range(0, SIZE):
  54. for replica in replicas:
  55. compute_pod = cluster_pod_name(cluster_id, replica[1], compute_id)
  56. wait(condition="condition=Ready", resource=compute_pod)
  57. mz.environmentd.sql("ALTER CLUSTER sized1 SET (REPLICATION FACTOR 1)")
  58. replicas = mz.environmentd.sql_query(
  59. "SELECT mz_cluster_replicas.name, mz_cluster_replicas.id FROM mz_cluster_replicas JOIN mz_clusters ON mz_cluster_replicas.cluster_id = mz_clusters.id WHERE mz_clusters.name = 'sized1' ORDER BY 1"
  60. )
  61. assert [replica[0] for replica in replicas] == ["r1"]
  62. for compute_id in range(0, SIZE):
  63. for replica in replicas:
  64. compute_pod = cluster_pod_name(cluster_id, replica[1], compute_id)
  65. wait(condition="condition=Ready", resource=compute_pod)
  66. mz.environmentd.sql("DROP CLUSTER sized1 CASCADE")
  67. mz.testdrive.run(
  68. input=dedent(
  69. """
  70. ! CREATE CLUSTER sizedbad (SIZE="badsize")
  71. contains:unknown cluster replica size badsize
  72. """
  73. ),
  74. no_reset=True,
  75. )
  76. mz.environmentd.sql(
  77. 'ALTER SYSTEM SET ALLOWED_CLUSTER_REPLICA_SIZES="1"',
  78. port="internal",
  79. user="mz_system",
  80. )
  81. try:
  82. mz.environmentd.sql(
  83. 'CREATE CLUSTER mzsizetest (SIZE="2")',
  84. port="internal",
  85. user="mz_system",
  86. )
  87. mz.environmentd.sql(
  88. "DROP CLUSTER mzsizetest CASCADE",
  89. port="internal",
  90. user="mz_system",
  91. )
  92. finally:
  93. mz.environmentd.sql(
  94. "ALTER SYSTEM RESET ALLOWED_CLUSTER_REPLICA_SIZES",
  95. port="internal",
  96. user="mz_system",
  97. )
  98. def test_zero_downtime_reconfiguration(mz: MaterializeApplication) -> None:
  99. mz.environmentd.sql(
  100. """
  101. ALTER SYSTEM SET enable_zero_downtime_cluster_reconfiguration = true;
  102. ALTER SYSTEM SET enable_multi_replica_sources = true;
  103. """,
  104. port="internal",
  105. user="mz_system",
  106. )
  107. def assert_replica_names(names, allow_pending=False):
  108. replicas = mz.environmentd.sql_query(
  109. """
  110. SELECT mz_cluster_replicas.name
  111. FROM mz_cluster_replicas, mz_clusters
  112. WHERE mz_cluster_replicas.cluster_id = mz_clusters.id
  113. AND mz_clusters.name = 'zdtaltertest';
  114. """
  115. )
  116. assert [replica[0] for replica in replicas] == names
  117. if not allow_pending:
  118. assert (
  119. len(
  120. mz.environmentd.sql_query(
  121. """
  122. SELECT cr.name
  123. FROM mz_internal.mz_pending_cluster_replicas ur
  124. INNER join mz_cluster_replicas cr ON cr.id=ur.id
  125. INNER join mz_clusters c ON c.id=cr.cluster_id
  126. WHERE c.name = 'zdtaltertest';
  127. """
  128. )
  129. )
  130. == 0
  131. ), "There should be no pending replicas"
  132. # Basic zero-downtime reconfig test cases matrix
  133. # - size change, no replica change
  134. # - replica size up, no other change
  135. # - replica size down, with size change
  136. # - replica size down, no other change
  137. # - replica size up, with size change
  138. # Other assertions
  139. # - no pending replicas after alter finishes
  140. # - names should match r# patter, not end with `-pending`
  141. # - cancelled statements correctly roll back
  142. # - timedout until ready queries take the appropriate action
  143. # - Fails to zero-downtime alter cluster with source
  144. mz.environmentd.sql(
  145. 'CREATE CLUSTER zdtaltertest ( SIZE = "1" )',
  146. port="internal",
  147. user="mz_system",
  148. )
  149. mz.environmentd.sql(
  150. """
  151. ALTER CLUSTER zdtaltertest SET ( SIZE = '2' ) WITH ( WAIT FOR '1ms' )
  152. """,
  153. port="internal",
  154. user="mz_system",
  155. )
  156. assert_replica_names(["r1"])
  157. mz.environmentd.sql(
  158. """
  159. ALTER CLUSTER zdtaltertest SET ( SIZE = '1', REPLICATION FACTOR 2 ) WITH ( WAIT FOR '1ms' )
  160. """,
  161. port="internal",
  162. user="mz_system",
  163. )
  164. assert_replica_names(["r1", "r2"])
  165. mz.environmentd.sql(
  166. """
  167. ALTER CLUSTER zdtaltertest SET ( SIZE = '1', REPLICATION FACTOR 1 ) WITH ( WAIT FOR '1ms' )
  168. """,
  169. port="internal",
  170. user="mz_system",
  171. )
  172. assert_replica_names(["r1"])
  173. mz.environmentd.sql(
  174. """
  175. ALTER CLUSTER zdtaltertest SET ( SIZE = '2', REPLICATION FACTOR 2 ) WITH ( WAIT FOR '1ms' )
  176. """,
  177. port="internal",
  178. user="mz_system",
  179. )
  180. assert_replica_names(["r1", "r2"])
  181. mz.environmentd.sql(
  182. """
  183. ALTER CLUSTER zdtaltertest SET ( SIZE = '1', REPLICATION FACTOR 1 ) WITH ( WAIT FOR '1ms' )
  184. """,
  185. port="internal",
  186. user="mz_system",
  187. )
  188. assert_replica_names(["r1"])
  189. # Setup for validating cancelation and
  190. # replica checks during alter
  191. mz.testdrive.run(
  192. no_reset=True,
  193. input=dedent(
  194. """
  195. $ kafka-create-topic topic=zdt-reconfig
  196. $ kafka-ingest topic=zdt-reconfig format=bytes key-format=bytes key-terminator=: repeat=1000
  197. key${kafka-ingest.iteration}:value${kafka-ingest.iteration}
  198. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  199. DROP CLUSTER IF EXISTS zdtaltertest CASCADE;
  200. DROP TABLE IF EXISTS t CASCADE;
  201. CREATE CLUSTER zdtaltertest ( SIZE = '1');
  202. GRANT ALL ON CLUSTER zdtaltertest TO materialize;
  203. SET CLUSTER = zdtaltertest;
  204. > CREATE TABLE t (a int);
  205. > CREATE DEFAULT INDEX ON t;
  206. > INSERT INTO t VALUES (42);
  207. > CREATE CONNECTION kafka_conn
  208. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
  209. > CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (
  210. URL '${testdrive.schema-registry-url}'
  211. )
  212. > CREATE SOURCE kafka_src
  213. IN CLUSTER zdtaltertest
  214. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-zdt-reconfig-${testdrive.seed}')
  215. > CREATE TABLE kafka_tbl
  216. FROM SOURCE kafka_src (REFERENCE "testdrive-zdt-reconfig-${testdrive.seed}")
  217. KEY FORMAT TEXT
  218. VALUE FORMAT TEXT
  219. ENVELOPE UPSERT
  220. """
  221. ),
  222. )
  223. # Valudate replicas are correct during an ongoing alter
  224. def zero_downtime_alter():
  225. mz.environmentd.sql(
  226. """
  227. ALTER CLUSTER zdtaltertest SET (SIZE = '2') WITH ( WAIT FOR '5s')
  228. """,
  229. port="internal",
  230. user="mz_system",
  231. )
  232. thread = Thread(target=zero_downtime_alter)
  233. thread.start()
  234. time.sleep(1)
  235. assert_replica_names(["r1", "r1-pending"], allow_pending=True)
  236. assert (
  237. mz.environmentd.sql_query(
  238. """
  239. SELECT size FROM mz_clusters WHERE name='zdtaltertest';
  240. """
  241. )
  242. == (["1"],)
  243. ), "Cluster should use original config during alter"
  244. thread.join()
  245. assert_replica_names(["r1"], allow_pending=False)
  246. assert (
  247. mz.environmentd.sql_query(
  248. """
  249. SELECT size FROM mz_clusters WHERE name='zdtaltertest';
  250. """
  251. )
  252. == (["2"],)
  253. ), "Cluster should use new config after alter completes"
  254. # Validate cancelation of alter cluster..with
  255. mz.environmentd.sql(
  256. """
  257. DROP CLUSTER IF EXISTS cluster1 CASCADE;
  258. CREATE CLUSTER cluster1 ( SIZE = '1');
  259. """,
  260. port="internal",
  261. user="mz_system",
  262. )
  263. # We need persistent connection that we can later issue a cancel backend to
  264. conn = mz.environmentd.sql_conn(
  265. port="internal",
  266. user="mz_system",
  267. )
  268. conn.autocommit = True
  269. def query_with_conn(
  270. sql: str, conn: Connection, ignore_pg_exception=False
  271. ) -> list[list[Any]]:
  272. """Execute a SQL query against the service and return results."""
  273. try:
  274. with conn.cursor() as cursor:
  275. LOGGER.info(f"> {sql}")
  276. cursor.execute(sql)
  277. return cursor.fetchall()
  278. except DatabaseError:
  279. if ignore_pg_exception:
  280. return []
  281. else:
  282. raise
  283. pid = query_with_conn("select pg_backend_pid();", conn)[0][0]
  284. thread = Thread(
  285. target=query_with_conn,
  286. args=[
  287. """
  288. ALTER CLUSTER cluster1 SET (SIZE = '2') WITH ( WAIT FOR '5s')
  289. """,
  290. conn,
  291. True,
  292. ],
  293. )
  294. thread.start()
  295. time.sleep(1)
  296. mz.environmentd.sql(
  297. f"select pg_cancel_backend({pid});",
  298. port="internal",
  299. user="mz_system",
  300. )
  301. time.sleep(1)
  302. assert_replica_names(["r1"], allow_pending=False)
  303. assert (
  304. mz.environmentd.sql_query(
  305. """
  306. SELECT size FROM mz_clusters WHERE name='cluster1';
  307. """
  308. )
  309. == (["1"],)
  310. ), "Cluster should not have updated if canceled during alter"
  311. # Test zero-downtime reconfig wait until ready
  312. mz.environmentd.sql(
  313. """
  314. DROP CLUSTER IF EXISTS cluster1 CASCADE;
  315. DROP CLUSTER IF EXISTS zdtaltertest CASCADE;
  316. """,
  317. port="internal",
  318. user="mz_system",
  319. )
  320. mz.environmentd.sql(
  321. """
  322. CREATE CLUSTER slow_hydration( SIZE = "1" );
  323. SET CLUSTER TO slow_hydration;
  324. SET DATABASE TO materialize;
  325. CREATE TABLE test_table (id int);
  326. -- this view will take a loong time to run/hydrate
  327. -- we'll use it to validate timeouts
  328. CREATE VIEW test_view AS WITH
  329. a AS (SELECT generate_series(0,10000) AS a),
  330. b AS (SELECT generate_series(0,1000) AS b)
  331. SELECT * FROM a,b,test_table;
  332. CREATE INDEX test_view_idx ON test_view(id);
  333. """
  334. )
  335. mz.testdrive.run(
  336. input=dedent(
  337. """
  338. ! ALTER CLUSTER slow_hydration set (size='4') WITH (WAIT UNTIL READY (TIMEOUT='1s', ON TIMEOUT ROLLBACK))
  339. contains: canceling statement, provided timeout lapsed
  340. """
  341. ),
  342. no_reset=True,
  343. )
  344. # Test fails to alter with source
  345. mz.environmentd.sql(
  346. """
  347. CREATE CLUSTER cluster_with_source( SIZE = "1" );
  348. SET CLUSTER TO cluster_with_source;
  349. SET DATABASE TO materialize;
  350. CREATE SOURCE counter
  351. FROM LOAD GENERATOR COUNTER
  352. (TICK INTERVAL '500ms');
  353. """
  354. )
  355. mz.testdrive.run(
  356. input=dedent(
  357. """
  358. ! ALTER CLUSTER cluster_with_source set (replication factor 2000) WITH (WAIT UNTIL READY (TIMEOUT='10s', ON TIMEOUT ROLLBACK))
  359. contains: creating cluster replica would violate max_replicas_per_cluster limit
  360. """
  361. ),
  362. no_reset=True,
  363. )