test_disk.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. from textwrap import dedent
  10. from materialize.cloudtest.app.materialize_application import MaterializeApplication
  11. from materialize.cloudtest.util.cluster import cluster_pod_name
  12. def test_disk_replica(mz: MaterializeApplication) -> None:
  13. """Testing `DISK = true` cluster replicas"""
  14. mz.testdrive.run(
  15. input=dedent(
  16. """
  17. $ kafka-create-topic topic=test
  18. $ kafka-ingest key-format=bytes format=bytes topic=test
  19. key1:val1
  20. key2:val2
  21. > CREATE CLUSTER testdrive_no_reset_disk_cluster1
  22. REPLICAS (r1 (
  23. SIZE '1-no-disk', DISK = true
  24. ))
  25. > CREATE CONNECTION IF NOT EXISTS kafka TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
  26. > CREATE SOURCE source1
  27. IN CLUSTER testdrive_no_reset_disk_cluster1
  28. FROM KAFKA CONNECTION kafka
  29. (TOPIC 'testdrive-test-${testdrive.seed}');
  30. > CREATE TABLE source1_tbl FROM SOURCE source1 (REFERENCE "testdrive-test-${testdrive.seed}")
  31. KEY FORMAT TEXT
  32. VALUE FORMAT TEXT
  33. ENVELOPE UPSERT;
  34. > SELECT * FROM source1_tbl;
  35. key text
  36. ------------------
  37. key1 val1
  38. key2 val2
  39. $ kafka-ingest key-format=bytes format=bytes topic=test
  40. key1:val3
  41. > SELECT * FROM source1_tbl;
  42. key text
  43. ------------------
  44. key1 val3
  45. key2 val2
  46. """
  47. )
  48. )
  49. cluster_id, replica_id = mz.environmentd.sql_query(
  50. "SELECT r.cluster_id, r.id as replica_id FROM mz_cluster_replicas r, mz_clusters c WHERE c.id = r.cluster_id AND c.name = 'testdrive_no_reset_disk_cluster1';"
  51. )[0]
  52. source_tbl_global_id = mz.environmentd.sql_query(
  53. "SELECT id FROM mz_tables WHERE name = 'source1_tbl';"
  54. )[0][0]
  55. # verify that the replica's scratch directory contains data files for source1
  56. on_disk_sources = mz.kubectl(
  57. "exec",
  58. cluster_pod_name(cluster_id, replica_id),
  59. "-c",
  60. "clusterd",
  61. "--",
  62. "bash",
  63. "-c",
  64. "ls /scratch/storage/upsert",
  65. )
  66. assert source_tbl_global_id in on_disk_sources
  67. def test_always_use_disk_replica(mz: MaterializeApplication) -> None:
  68. """Testing `DISK = false, cluster_always_use_disk = true` cluster replicas"""
  69. mz.environmentd.sql(
  70. "ALTER SYSTEM SET cluster_always_use_disk = true",
  71. port="internal",
  72. user="mz_system",
  73. )
  74. mz.testdrive.run(
  75. input=dedent(
  76. """
  77. $ kafka-create-topic topic=test
  78. $ kafka-ingest key-format=bytes format=bytes topic=test
  79. key1:val1
  80. key2:val2
  81. > CREATE CLUSTER disk_cluster2
  82. REPLICAS (r1 (SIZE '1-no-disk'))
  83. > CREATE CONNECTION IF NOT EXISTS kafka TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
  84. > CREATE SOURCE source1
  85. IN CLUSTER disk_cluster2
  86. FROM KAFKA CONNECTION kafka
  87. (TOPIC 'testdrive-test-${testdrive.seed}');
  88. > CREATE TABLE source1_tbl FROM SOURCE source1 (REFERENCE "testdrive-test-${testdrive.seed}")
  89. KEY FORMAT TEXT
  90. VALUE FORMAT TEXT
  91. ENVELOPE UPSERT;
  92. > SELECT * FROM source1_tbl;
  93. key text
  94. ------------------
  95. key1 val1
  96. key2 val2
  97. $ kafka-ingest key-format=bytes format=bytes topic=test
  98. key1:val3
  99. > SELECT * FROM source1_tbl;
  100. key text
  101. ------------------
  102. key1 val3
  103. key2 val2
  104. """
  105. )
  106. )
  107. cluster_id, replica_id = mz.environmentd.sql_query(
  108. "SELECT r.cluster_id, r.id as replica_id FROM mz_cluster_replicas r, mz_clusters c WHERE c.id = r.cluster_id AND c.name = 'disk_cluster2';"
  109. )[0]
  110. source_tbl_global_id = mz.environmentd.sql_query(
  111. "SELECT id FROM mz_tables WHERE name = 'source1_tbl';"
  112. )[0][0]
  113. # verify that the replica's scratch directory contains data files for source1
  114. on_disk_sources = mz.kubectl(
  115. "exec",
  116. cluster_pod_name(cluster_id, replica_id),
  117. "-c",
  118. "clusterd",
  119. "--",
  120. "bash",
  121. "-c",
  122. "ls /scratch/storage/upsert",
  123. )
  124. assert source_tbl_global_id in on_disk_sources
  125. def test_no_disk_replica(mz: MaterializeApplication) -> None:
  126. """Testing `DISK = false` cluster replicas"""
  127. mz.testdrive.run(
  128. input=dedent(
  129. """
  130. $ kafka-create-topic topic=test-no-disk
  131. $ kafka-ingest key-format=bytes format=bytes topic=test-no-disk
  132. key1:val1
  133. key2:val2
  134. > CREATE CLUSTER no_disk_cluster1
  135. REPLICAS (r1 (
  136. SIZE '1-no-disk', DISK = false
  137. ))
  138. > CREATE CONNECTION IF NOT EXISTS kafka
  139. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
  140. > CREATE SOURCE no_disk_source1
  141. IN CLUSTER no_disk_cluster1
  142. FROM KAFKA CONNECTION kafka
  143. (TOPIC 'testdrive-test-no-disk-${testdrive.seed}');
  144. > CREATE TABLE no_disk_source1_tbl FROM SOURCE no_disk_source1 (REFERENCE "testdrive-test-no-disk-${testdrive.seed}")
  145. KEY FORMAT TEXT
  146. VALUE FORMAT TEXT
  147. ENVELOPE UPSERT;
  148. > SELECT * FROM no_disk_source1_tbl;
  149. key text
  150. ------------------
  151. key1 val1
  152. key2 val2
  153. $ kafka-ingest key-format=bytes format=bytes topic=test-no-disk
  154. key1:val3
  155. > SELECT * FROM no_disk_source1_tbl;
  156. key text
  157. ------------------
  158. key1 val3
  159. key2 val2
  160. > DROP CLUSTER no_disk_cluster1 CASCADE;
  161. """
  162. )
  163. )