load-generator-key-value.td 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. # Tests `LOAD GENERATOR KEY VALUE`
  10. $ set-arg-default default-replica-size=1
  11. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  12. ALTER SYSTEM SET storage_statistics_collection_interval = 1000
  13. ALTER SYSTEM SET storage_statistics_interval = 2000
  14. ALTER SYSTEM SET enable_load_generator_key_value = true
  15. > CREATE CLUSTER lg_cluster SIZE '${arg.default-replica-size}'
  16. $ skip-if
  17. SELECT mz_version_num() < 14400;
  18. # Error if trying to create with subsources
  19. ! CREATE SOURCE g FROM LOAD GENERATOR KEY VALUE(
  20. KEYS 8,
  21. PARTITIONS 1,
  22. SNAPSHOT ROUNDS 1,
  23. VALUE SIZE 1,
  24. SEED 42,
  25. BATCH SIZE 4
  26. ) FOR ALL TABLES;
  27. contains:FOR ALL TABLES
  28. ! CREATE SOURCE g FROM LOAD GENERATOR KEY VALUE(
  29. KEYS 8,
  30. PARTITIONS 1,
  31. SNAPSHOT ROUNDS 1,
  32. VALUE SIZE 1,
  33. SEED 42,
  34. BATCH SIZE 4
  35. ) FOR TABLES ("foo");
  36. contains:FOR TABLES
  37. ! CREATE SOURCE g FROM LOAD GENERATOR KEY VALUE(
  38. KEYS 8,
  39. PARTITIONS 1,
  40. SNAPSHOT ROUNDS 1,
  41. VALUE SIZE 1,
  42. SEED 42,
  43. BATCH SIZE 4
  44. ) FOR SCHEMAS ("foo");
  45. contains:FOR SCHEMAS
  46. $ skip-end
  47. # A loadgen that only snapshots.
  48. > CREATE SOURCE up_no_update
  49. IN CLUSTER lg_cluster
  50. FROM LOAD GENERATOR KEY VALUE (
  51. KEYS 16,
  52. PARTITIONS 4,
  53. SNAPSHOT ROUNDS 3,
  54. SEED 123,
  55. VALUE SIZE 10,
  56. BATCH SIZE 2
  57. )
  58. ENVELOPE UPSERT
  59. > CREATE SOURCE up_quick
  60. IN CLUSTER lg_cluster
  61. FROM LOAD GENERATOR KEY VALUE (
  62. KEYS 16,
  63. PARTITIONS 4,
  64. SNAPSHOT ROUNDS 3,
  65. TRANSACTIONAL SNAPSHOT false,
  66. SEED 123,
  67. VALUE SIZE 10,
  68. BATCH SIZE 2
  69. )
  70. INCLUDE KEY AS whatever
  71. ENVELOPE UPSERT
  72. # Ensure data is spread as expected.
  73. > SELECT partition, count(*) FROM up_no_update GROUP BY partition
  74. 0 4
  75. 1 4
  76. 2 4
  77. 3 4
  78. > SELECT MAX(key) FROM up_no_update;
  79. 15
  80. > SELECT partition, count(*) FROM up_quick GROUP BY partition
  81. 0 4
  82. 1 4
  83. 2 4
  84. 3 4
  85. > SELECT MAX(whatever) FROM up_quick;
  86. 15
  87. # 48 values produced (3 snapshot rounds with 16 keys).
  88. # For the `TRANSACTIONAL SNAPSHOT = false` source, we produce 48 updates from the 3 rounds.
  89. # We expect and 6 quick round offsets (based on the batch size)
  90. #
  91. # NOTE: For these statistics queries, we take the MAX, because we will have
  92. # statistics per replica that is (or was) running the source.
  93. > SELECT
  94. s.name,
  95. MAX(u.offset_known),
  96. MAX(u.offset_committed),
  97. MAX(u.snapshot_records_known),
  98. MAX(u.snapshot_records_staged),
  99. MAX(u.messages_received),
  100. MAX(u.records_indexed)
  101. FROM mz_sources s
  102. JOIN mz_internal.mz_source_statistics u ON s.id = u.id
  103. WHERE s.name IN ('up_no_update', 'up_quick')
  104. GROUP BY s.name
  105. up_no_update 3 3 48 48 48 16
  106. up_quick 6 6 0 0 48 16
  107. $ set-from-sql var=pre-rehydration
  108. SELECT
  109. encode(value, 'base64')
  110. FROM up_no_update
  111. WHERE
  112. key = 14
  113. $ set-from-sql var=pre-rehydration-quick
  114. SELECT
  115. encode(value, 'base64')
  116. FROM up_quick
  117. WHERE
  118. whatever = 14
  119. > ALTER CLUSTER lg_cluster SET (REPLICATION FACTOR 0);
  120. > ALTER CLUSTER lg_cluster SET (REPLICATION FACTOR 1);
  121. # Ensure that we rehydrate and keep the same value as before.
  122. > SELECT
  123. encode(value, 'base64') = '${pre-rehydration}'
  124. FROM up_no_update
  125. WHERE
  126. key = 14
  127. true
  128. > SELECT
  129. encode(value, 'base64') = '${pre-rehydration-quick}'
  130. FROM up_quick
  131. WHERE
  132. whatever = 14
  133. true
  134. > SELECT
  135. s.name,
  136. MAX(u.offset_known),
  137. MAX(u.offset_committed),
  138. MAX(u.snapshot_records_known),
  139. MAX(u.snapshot_records_staged),
  140. MAX(u.messages_received),
  141. MAX(u.records_indexed)
  142. FROM mz_sources s
  143. JOIN mz_internal.mz_source_statistics u ON s.id = u.id
  144. WHERE s.name IN ('up_no_update', 'up_quick')
  145. GROUP BY s.name
  146. up_no_update 3 3 48 48 48 16
  147. up_quick 6 6 0 0 48 16
  148. > DROP SOURCE up_no_update
  149. > DROP SOURCE up_quick
  150. # Create a source with 1s updates after snapshotting.
  151. > CREATE SOURCE up_with_update
  152. IN CLUSTER lg_cluster
  153. FROM LOAD GENERATOR KEY VALUE (
  154. KEYS 16,
  155. PARTITIONS 4,
  156. SNAPSHOT ROUNDS 3,
  157. SEED 123,
  158. VALUE SIZE 10,
  159. BATCH SIZE 2,
  160. TICK INTERVAL '1s'
  161. )
  162. ENVELOPE UPSERT
  163. # Ensure data is partitioned correctly.
  164. > SELECT partition, count(*) FROM up_with_update GROUP BY partition
  165. 0 4
  166. 1 4
  167. 2 4
  168. 3 4
  169. # Doesn't work reliably under high load in CI
  170. # Higher offsets than before, as we produce more values.
  171. # > SELECT
  172. # s.name,
  173. # MAX(u.offset_known) > 3,
  174. # MAX(u.offset_committed) = MAX(u.offset_known),
  175. # MAX(u.snapshot_records_known),
  176. # MAX(u.snapshot_records_staged),
  177. # MAX(u.messages_received) > 48,
  178. # MAX(u.records_indexed)
  179. # FROM mz_sources s
  180. # JOIN mz_internal.mz_source_statistics u ON s.id = u.id
  181. # WHERE s.name IN ('up_with_update')
  182. # GROUP BY s.name
  183. # up_with_update true true 48 48 true 16
  184. # Also, despite the same seed, values should be different than the snapshot-only source.
  185. > SELECT
  186. encode(value, 'base64') != '${pre-rehydration}'
  187. FROM up_with_update
  188. WHERE
  189. key = 14
  190. true
  191. > ALTER CLUSTER lg_cluster SET (REPLICATION FACTOR 0);
  192. > ALTER CLUSTER lg_cluster SET (REPLICATION FACTOR 1);
  193. $ set-from-sql var=pre-rehydration-with-update
  194. SELECT
  195. encode(value, 'base64')
  196. FROM up_with_update
  197. WHERE
  198. key = 14
  199. # After restarting, we should also still see new updates override values.
  200. > SELECT
  201. encode(value, 'base64') != '${pre-rehydration-with-update}'
  202. FROM up_with_update
  203. WHERE
  204. key = 14
  205. true
  206. # Test NONE-envelope
  207. > CREATE SOURCE kv_none
  208. IN CLUSTER lg_cluster
  209. FROM LOAD GENERATOR KEY VALUE (
  210. KEYS 16,
  211. PARTITIONS 4,
  212. SNAPSHOT ROUNDS 3,
  213. TRANSACTIONAL SNAPSHOT false,
  214. SEED 123,
  215. VALUE SIZE 10,
  216. BATCH SIZE 2
  217. )
  218. ENVELOPE NONE
  219. > SELECT partition, count(*) FROM kv_none GROUP BY partition
  220. 0 12
  221. 1 12
  222. 2 12
  223. 3 12
  224. # Test INCLUDE OFFSET
  225. > CREATE SOURCE kv_offset
  226. IN CLUSTER lg_cluster
  227. FROM LOAD GENERATOR KEY VALUE (
  228. KEYS 16,
  229. PARTITIONS 4,
  230. SNAPSHOT ROUNDS 3,
  231. TRANSACTIONAL SNAPSHOT false,
  232. SEED 123,
  233. VALUE SIZE 10,
  234. BATCH SIZE 2
  235. )
  236. INCLUDE OFFSET
  237. ENVELOPE NONE
  238. > SELECT partition, MAX("offset") FROM kv_offset GROUP BY partition
  239. 0 5
  240. 1 5
  241. 2 5
  242. 3 5
  243. > CREATE SOURCE kv_offset2
  244. IN CLUSTER lg_cluster
  245. FROM LOAD GENERATOR KEY VALUE (
  246. KEYS 16,
  247. PARTITIONS 4,
  248. SNAPSHOT ROUNDS 3,
  249. TRANSACTIONAL SNAPSHOT false,
  250. SEED 123,
  251. VALUE SIZE 10,
  252. BATCH SIZE 2
  253. )
  254. INCLUDE
  255. OFFSET AS something_else,
  256. KEY AS whatever
  257. ENVELOPE NONE
  258. > SELECT partition, MAX(something_else) FROM kv_offset2 GROUP BY partition
  259. 0 5
  260. 1 5
  261. 2 5
  262. 3 5
  263. > SELECT MAX(whatever) FROM kv_offset2;
  264. 15