load-generator-key-value.td 6.7 KB


  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. # Tests `LOAD GENERATOR KEY VALUE`: rejection of subsource clauses, snapshot-only and ticking UPSERT sources, value stability across a replication-factor 0 -> 1 cycle, source statistics, and ENVELOPE NONE with INCLUDE OFFSET / INCLUDE KEY.
  10. $ set-arg-default default-replica-size=1
  11. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  12. ALTER SYSTEM SET storage_statistics_collection_interval = 1000
  13. ALTER SYSTEM SET storage_statistics_interval = 2000
  14. ALTER SYSTEM SET enable_load_generator_key_value = true
  15. > CREATE CLUSTER lg_cluster SIZE '${arg.default-replica-size}'
  16. # Creating the source with a subsource clause (FOR ALL TABLES, FOR TABLES, FOR SCHEMAS) must error; each case checks that the error text contains the offending clause.
  17. ! CREATE SOURCE g FROM LOAD GENERATOR KEY VALUE(
  18. KEYS 8,
  19. PARTITIONS 1,
  20. SNAPSHOT ROUNDS 1,
  21. VALUE SIZE 1,
  22. SEED 42,
  23. BATCH SIZE 4
  24. ) FOR ALL TABLES;
  25. contains:FOR ALL TABLES
  26. ! CREATE SOURCE g FROM LOAD GENERATOR KEY VALUE(
  27. KEYS 8,
  28. PARTITIONS 1,
  29. SNAPSHOT ROUNDS 1,
  30. VALUE SIZE 1,
  31. SEED 42,
  32. BATCH SIZE 4
  33. ) FOR TABLES ("foo");
  34. contains:FOR TABLES
  35. ! CREATE SOURCE g FROM LOAD GENERATOR KEY VALUE(
  36. KEYS 8,
  37. PARTITIONS 1,
  38. SNAPSHOT ROUNDS 1,
  39. VALUE SIZE 1,
  40. SEED 42,
  41. BATCH SIZE 4
  42. ) FOR SCHEMAS ("foo");
  43. contains:FOR SCHEMAS
  44. # Two loadgens that only snapshot (no TICK INTERVAL): `up_no_update` leaves TRANSACTIONAL SNAPSHOT unspecified, while `up_quick` sets it to false and exposes the key as `whatever` via INCLUDE KEY AS.
  45. > CREATE SOURCE up_no_update
  46. IN CLUSTER lg_cluster
  47. FROM LOAD GENERATOR KEY VALUE (
  48. KEYS 16,
  49. PARTITIONS 4,
  50. SNAPSHOT ROUNDS 3,
  51. SEED 123,
  52. VALUE SIZE 10,
  53. BATCH SIZE 2
  54. )
  55. ENVELOPE UPSERT
  56. > CREATE SOURCE up_quick
  57. IN CLUSTER lg_cluster
  58. FROM LOAD GENERATOR KEY VALUE (
  59. KEYS 16,
  60. PARTITIONS 4,
  61. SNAPSHOT ROUNDS 3,
  62. TRANSACTIONAL SNAPSHOT false,
  63. SEED 123,
  64. VALUE SIZE 10,
  65. BATCH SIZE 2
  66. )
  67. INCLUDE KEY AS whatever
  68. ENVELOPE UPSERT
  69. # Ensure data is spread as expected: both sources should show 4 keys in each of the 4 partitions, with 15 as the largest key.
  70. > SELECT partition, count(*) FROM up_no_update GROUP BY partition
  71. 0 4
  72. 1 4
  73. 2 4
  74. 3 4
  75. > SELECT MAX(key) FROM up_no_update;
  76. 15
  77. > SELECT partition, count(*) FROM up_quick GROUP BY partition
  78. 0 4
  79. 1 4
  80. 2 4
  81. 3 4
  82. > SELECT MAX(whatever) FROM up_quick;
  83. 15
  84. # 48 values produced (3 snapshot rounds with 16 keys).
  85. # For the `TRANSACTIONAL SNAPSHOT = false` source, we produce 48 updates from the 3 rounds.
  86. # We also expect 6 quick-round offsets for it (based on the batch size), versus 3 offsets for `up_no_update`.
  87. #
  88. # NOTE: For these statistics queries, we take the MAX, because we will have
  89. # statistics per replica that is (or was) running the source.
  90. > SELECT
  91. s.name,
  92. MAX(u.offset_known),
  93. MAX(u.offset_committed),
  94. MAX(u.snapshot_records_known),
  95. MAX(u.snapshot_records_staged),
  96. MAX(u.messages_received),
  97. MAX(u.records_indexed)
  98. FROM mz_sources s
  99. JOIN mz_internal.mz_source_statistics u ON s.id = u.id
  100. WHERE s.name IN ('up_no_update', 'up_quick')
  101. GROUP BY s.name
  102. up_no_update 3 3 48 48 48 16
  103. up_quick 6 6 0 0 48 16
  104. $ set-from-sql var=pre-rehydration
  105. SELECT
  106. encode(value, 'base64')
  107. FROM up_no_update
  108. WHERE
  109. key = 14
  110. $ set-from-sql var=pre-rehydration-quick
  111. SELECT
  112. encode(value, 'base64')
  113. FROM up_quick
  114. WHERE
  115. whatever = 14
  116. > ALTER CLUSTER lg_cluster SET (REPLICATION FACTOR 0);
  117. > ALTER CLUSTER lg_cluster SET (REPLICATION FACTOR 1);
  118. # After cycling the replication factor 0 -> 1 above, ensure that we rehydrate and keep the same value for key 14 as before; the statistics below are expected to match the pre-restart numbers.
  119. > SELECT
  120. encode(value, 'base64') = '${pre-rehydration}'
  121. FROM up_no_update
  122. WHERE
  123. key = 14
  124. true
  125. > SELECT
  126. encode(value, 'base64') = '${pre-rehydration-quick}'
  127. FROM up_quick
  128. WHERE
  129. whatever = 14
  130. true
  131. > SELECT
  132. s.name,
  133. MAX(u.offset_known),
  134. MAX(u.offset_committed),
  135. MAX(u.snapshot_records_known),
  136. MAX(u.snapshot_records_staged),
  137. MAX(u.messages_received),
  138. MAX(u.records_indexed)
  139. FROM mz_sources s
  140. JOIN mz_internal.mz_source_statistics u ON s.id = u.id
  141. WHERE s.name IN ('up_no_update', 'up_quick')
  142. GROUP BY s.name
  143. up_no_update 3 3 48 48 48 16
  144. up_quick 6 6 0 0 48 16
  145. > DROP SOURCE up_no_update
  146. > DROP SOURCE up_quick
  147. # Create a source that keeps producing updates every 1s (TICK INTERVAL '1s') after snapshotting.
  148. > CREATE SOURCE up_with_update
  149. IN CLUSTER lg_cluster
  150. FROM LOAD GENERATOR KEY VALUE (
  151. KEYS 16,
  152. PARTITIONS 4,
  153. SNAPSHOT ROUNDS 3,
  154. SEED 123,
  155. VALUE SIZE 10,
  156. BATCH SIZE 2,
  157. TICK INTERVAL '1s'
  158. )
  159. ENVELOPE UPSERT
  160. # Ensure data is partitioned correctly: still 4 keys in each of the 4 partitions.
  161. > SELECT partition, count(*) FROM up_with_update GROUP BY partition
  162. 0 4
  163. 1 4
  164. 2 4
  165. 3 4
  166. # DISABLED (the query below is commented out): it doesn't work reliably under high load in CI.
  167. # It checked for higher offsets than before, as we produce more values.
  168. # > SELECT
  169. # s.name,
  170. # MAX(u.offset_known) > 3,
  171. # MAX(u.offset_committed) = MAX(u.offset_known),
  172. # MAX(u.snapshot_records_known),
  173. # MAX(u.snapshot_records_staged),
  174. # MAX(u.messages_received) > 48,
  175. # MAX(u.records_indexed)
  176. # FROM mz_sources s
  177. # JOIN mz_internal.mz_source_statistics u ON s.id = u.id
  178. # WHERE s.name IN ('up_with_update')
  179. # GROUP BY s.name
  180. # up_with_update true true 48 48 true 16
  181. # Also, despite the same seed, the value for key 14 should differ from the one captured from the snapshot-only source (`pre-rehydration`).
  182. > SELECT
  183. encode(value, 'base64') != '${pre-rehydration}'
  184. FROM up_with_update
  185. WHERE
  186. key = 14
  187. true
  188. > ALTER CLUSTER lg_cluster SET (REPLICATION FACTOR 0);
  189. > ALTER CLUSTER lg_cluster SET (REPLICATION FACTOR 1);
  190. $ set-from-sql var=pre-rehydration-with-update
  191. SELECT
  192. encode(value, 'base64')
  193. FROM up_with_update
  194. WHERE
  195. key = 14
  196. # After restarting, we should also still see new updates override values: the value captured just above is expected to differ on a later read.
  197. > SELECT
  198. encode(value, 'base64') != '${pre-rehydration-with-update}'
  199. FROM up_with_update
  200. WHERE
  201. key = 14
  202. true
  203. # Test NONE-envelope: each partition is expected to hold 12 rows rather than the 4 seen with UPSERT (presumably 4 keys x 3 snapshot rounds, since nothing is deduplicated by key -- TODO confirm).
  204. > CREATE SOURCE kv_none
  205. IN CLUSTER lg_cluster
  206. FROM LOAD GENERATOR KEY VALUE (
  207. KEYS 16,
  208. PARTITIONS 4,
  209. SNAPSHOT ROUNDS 3,
  210. TRANSACTIONAL SNAPSHOT false,
  211. SEED 123,
  212. VALUE SIZE 10,
  213. BATCH SIZE 2
  214. )
  215. ENVELOPE NONE
  216. > SELECT partition, count(*) FROM kv_none GROUP BY partition
  217. 0 12
  218. 1 12
  219. 2 12
  220. 3 12
  221. # Test INCLUDE OFFSET: first with the default `offset` column name (`kv_offset`), then renamed and combined with INCLUDE KEY AS (`kv_offset2`); the largest offset in every partition is expected to be 5.
  222. > CREATE SOURCE kv_offset
  223. IN CLUSTER lg_cluster
  224. FROM LOAD GENERATOR KEY VALUE (
  225. KEYS 16,
  226. PARTITIONS 4,
  227. SNAPSHOT ROUNDS 3,
  228. TRANSACTIONAL SNAPSHOT false,
  229. SEED 123,
  230. VALUE SIZE 10,
  231. BATCH SIZE 2
  232. )
  233. INCLUDE OFFSET
  234. ENVELOPE NONE
  235. > SELECT partition, MAX("offset") FROM kv_offset GROUP BY partition
  236. 0 5
  237. 1 5
  238. 2 5
  239. 3 5
  240. > CREATE SOURCE kv_offset2
  241. IN CLUSTER lg_cluster
  242. FROM LOAD GENERATOR KEY VALUE (
  243. KEYS 16,
  244. PARTITIONS 4,
  245. SNAPSHOT ROUNDS 3,
  246. TRANSACTIONAL SNAPSHOT false,
  247. SEED 123,
  248. VALUE SIZE 10,
  249. BATCH SIZE 2
  250. )
  251. INCLUDE
  252. OFFSET AS something_else,
  253. KEY AS whatever
  254. ENVELOPE NONE
  255. > SELECT partition, MAX(something_else) FROM kv_offset2 GROUP BY partition
  256. 0 5
  257. 1 5
  258. 2 5
  259. 3 5
  260. > SELECT MAX(whatever) FROM kv_offset2;
  261. 15
  261. 15