kafka-avro-upsert-sinks.td

# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default default-storage-size=1
$ set-arg-default single-replica-cluster=quickstart
# Test Avro UPSERT sinks.

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_envelope_materialize = true
ALTER SYSTEM SET max_clusters = 20
# sinking directly from an UPSERT source with multi-part key

$ set upsert-keyschema={
    "type": "record",
    "name": "Key",
    "fields": [
        {"name": "key1", "type": "string"},
        {"name": "key2", "type": "long"}
    ]
  }

$ set upsert-schema={
    "type" : "record",
    "name" : "test",
    "fields" : [
        {"name":"f1", "type":"string"},
        {"name":"f2", "type":"long"}
    ]
  }
$ kafka-create-topic topic=upsert-avro

$ kafka-ingest format=avro topic=upsert-avro key-format=avro key-schema=${upsert-keyschema} schema=${upsert-schema}
{"key1": "fish", "key2": 2} {"f1": "fish", "f2": 1000}
{"key1": "fisch", "key2": 42} {"f1": "fish", "f2": 1000}

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );
> CREATE CLUSTER upsert_input_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE upsert_input
  IN CLUSTER upsert_input_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-avro-${testdrive.seed}')

> CREATE TABLE upsert_input_tbl FROM SOURCE upsert_input (REFERENCE "testdrive-upsert-avro-${testdrive.seed}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT

> CREATE CLUSTER upsert_input_sink_cluster SIZE '${arg.default-storage-size}';

> CREATE SINK upsert_input_sink
  IN CLUSTER upsert_input_sink_cluster
  FROM upsert_input_tbl
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-input-sink-${testdrive.seed}')
  KEY (key1, key2)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT
$ kafka-verify-data format=avro sink=materialize.public.upsert_input_sink sort-messages=true
{"key1": "fisch", "key2": 42} {"key1": "fisch", "key2": 42, "f1": "fish", "f2": 1000}
{"key1": "fish", "key2": 2} {"key1": "fish", "key2": 2, "f1": "fish", "f2": 1000}

$ kafka-ingest format=avro topic=upsert-avro key-format=avro key-schema=${upsert-keyschema} schema=${upsert-schema}
{"key1": "fisch", "key2": 42} {"f1": "richtig, fisch", "f2": 2000}

$ kafka-verify-data format=avro sink=materialize.public.upsert_input_sink
{"key1": "fisch", "key2": 42} {"key1": "fisch", "key2": 42, "f1": "richtig, fisch", "f2": 2000}
# More complicated scenarios: super keys, consistency input/output

$ set schema=[
  {
    "type": "array",
    "items": {
      "type": "record",
      "name": "update",
      "namespace": "com.materialize.cdc",
      "fields": [
        {
          "name": "data",
          "type": {
            "type": "record",
            "name": "data",
            "fields": [
              {"name": "a", "type": "long"},
              {"name": "b", "type": "long"}
            ]
          }
        },
        {
          "name": "time",
          "type": "long"
        },
        {
          "name": "diff",
          "type": "long"
        }
      ]
    }
  },
  {
    "type": "record",
    "name": "progress",
    "namespace": "com.materialize.cdc",
    "fields": [
      {
        "name": "lower",
        "type": {
          "type": "array",
          "items": "long"
        }
      },
      {
        "name": "upper",
        "type": {
          "type": "array",
          "items": "long"
        }
      },
      {
        "name": "counts",
        "type": {
          "type": "array",
          "items": {
            "type": "record",
            "name": "counts",
            "fields": [
              {
                "name": "time",
                "type": "long"
              },
              {
                "name": "count",
                "type": "long"
              }
            ]
          }
        }
      }
    ]
  }
  ]
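
# How to read this schema: ENVELOPE MATERIALIZE input is a union of two
# message shapes. An "update" message carries a (data, time, diff) triple,
# i.e. an explicit differential update, while a "progress" message closes
# out timestamps by giving the frontier interval [lower, upper) and the
# number of updates to expect at each time; a timestamp only becomes
# complete once a progress message has accounted for all of its updates.
# Illustrative messages (same shape as the ingests below, shown here only
# as a sketch):
#
#   {"array": [{"data": {"a": 1, "b": 1}, "time": 1, "diff": 1}]}
#   {"com.materialize.cdc.progress": {"lower": [0], "upper": [2], "counts": [{"time": 1, "count": 1}]}}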
$ kafka-create-topic topic=input

# (PRIMARY KEY (id) NOT ENFORCED)
> CREATE CLUSTER input_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE input
  IN CLUSTER input_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-input-${testdrive.seed}')

> CREATE TABLE input_tbl FROM SOURCE input (REFERENCE "testdrive-input-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${schema}' ENVELOPE MATERIALIZE

> CREATE MATERIALIZED VIEW input_keyed AS SELECT a, max(b) as b FROM input_tbl GROUP BY a
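
# Because input_keyed groups by a and aggregates b, Materialize can prove
# that a is a unique key of the view; that proof is what lets the sinks
# below declare KEY (a) (or any superset of it) without NOT ENFORCED. One
# way to inspect the derived keys interactively (a sketch, not part of the
# test run):
#
#   EXPLAIN OPTIMIZED PLAN WITH (keys) FOR SELECT * FROM input_keyed;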
> CREATE CLUSTER input_sink_cluster SIZE '${arg.default-storage-size}';

> CREATE SINK input_sink
  IN CLUSTER input_sink_cluster
  FROM input_keyed
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'input-sink-${testdrive.seed}') KEY (a)
  FORMAT AVRO
  USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT

# requesting to key by (b, a) is fine when (a) alone is a unique key
> CREATE CLUSTER input_sink_multiple_keys_cluster SIZE '${arg.default-storage-size}';

> CREATE SINK input_sink_multiple_keys
  IN CLUSTER input_sink_multiple_keys_cluster
  FROM input_keyed
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'input-sink-multikey-${testdrive.seed}') KEY (b, a)
  FORMAT AVRO
  USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT
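
# A consequence of keying by (b, a): when b changes for a given a, the sink
# key itself changes, so the topic sees a tombstone for the old key plus an
# upsert for the new one. A sketch of what the multikey verification further
# down asserts for the time-3 update:
#
#   key: {"a": 1, "b": 1}  value: null                 (old key retracted)
#   key: {"a": 1, "b": 7}  value: {"a": 1, "b": 7}     (new key upserted)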
$ kafka-ingest format=avro topic=input schema=${schema}
{"array":[{"data":{"a":1,"b":1},"time":1,"diff":1}]}
{"array":[{"data":{"a":2,"b":2},"time":1,"diff":1}]}
{"array":[{"data":{"a":3,"b":1},"time":2,"diff":1}]}
{"array":[{"data":{"a":4,"b":2},"time":2,"diff":1}]}
{"array":[{"data":{"a":1,"b":7},"time":3,"diff":1}]}
{"com.materialize.cdc.progress":{"lower":[0],"upper":[4],"counts":[{"time":1,"count":2},{"time":2,"count":2}, {"time": 3, "count": 1}]}}

> SELECT * FROM input_tbl;
a b
------
1 1
2 2
3 1
4 2
1 7
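
# Note that input_tbl holds both (1, 1) and (1, 7): the time-3 message
# inserted (1, 7) with diff +1 but never retracted (1, 1), and ENVELOPE
# MATERIALIZE applies diffs literally. input_keyed then collapses the two
# rows for a = 1 into a single (1, 7) via max(b).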
# Compare sorted messages within each transaction. We know that messages of one
# transaction appear together as one "bundle" in the output. But there is no
# guarantee on the order within a transaction.
$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.input_sink sort-messages=true
1 {"a": 1} {"a": 1, "b": 1}
1 {"a": 2} {"a": 2, "b": 2}

$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.input_sink sort-messages=true
2 {"a": 3} {"a": 3, "b": 1}
2 {"a": 4} {"a": 4, "b": 2}

$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.input_sink sort-messages=true
3 {"a": 1} {"a": 1, "b": 7}

# Again, compare split by transaction. See comment just above.
$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.input_sink_multiple_keys sort-messages=true
1 {"a": 1, "b": 1} {"a": 1, "b": 1}
1 {"a": 2, "b": 2} {"a": 2, "b": 2}

$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.input_sink_multiple_keys sort-messages=true
2 {"a": 3, "b": 1} {"a": 3, "b": 1}
2 {"a": 4, "b": 2} {"a": 4, "b": 2}

# missing value denotes DELETE
$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.input_sink_multiple_keys sort-messages=true
3 {"a": 1, "b": 1}
3 {"a": 1, "b": 7} {"a": 1, "b": 7}
# verify if/when input deletions are emitted to an UPSERT sink

$ kafka-create-topic topic=input-with-deletions

> CREATE CLUSTER input_with_deletions_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE input_with_deletions
  IN CLUSTER input_with_deletions_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-input-with-deletions-${testdrive.seed}')

> CREATE TABLE input_with_deletions_tbl FROM SOURCE input_with_deletions (REFERENCE "testdrive-input-with-deletions-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${schema}' ENVELOPE MATERIALIZE

> CREATE MATERIALIZED VIEW input_with_deletions_keyed AS SELECT a, max(b) as b FROM input_with_deletions_tbl GROUP BY a

> CREATE CLUSTER input_with_deletions_sink_cluster SIZE '${arg.default-storage-size}';

> CREATE SINK input_with_deletions_sink
  IN CLUSTER input_with_deletions_sink_cluster
  FROM input_with_deletions_keyed
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'input-sink-with-deletions-${testdrive.seed}') KEY (a)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT

$ kafka-ingest format=avro topic=input-with-deletions schema=${schema}
{"array":[{"data":{"a":1,"b":1},"time":1,"diff":1}]}
{"com.materialize.cdc.progress":{"lower":[0],"upper":[2],"counts":[{"time":1,"count":1}]}}

$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.input_with_deletions_sink sort-messages=true
1 {"a": 1} {"a": 1, "b": 1}

$ kafka-ingest format=avro topic=input-with-deletions schema=${schema}
{"array":[{"data":{"a":1,"b":2},"time":2,"diff":1}]}
{"com.materialize.cdc.progress":{"lower":[2],"upper":[3],"counts":[{"time":2,"count":1}]}}

$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.input_with_deletions_sink sort-messages=true
2 {"a": 1} {"a": 1, "b": 2}

# deletion of the "shadowed" input should not cause downstream updates
$ kafka-ingest format=avro topic=input-with-deletions schema=${schema}
{"array":[{"data":{"a":1,"b":1},"time":3,"diff":-1}]}
{"com.materialize.cdc.progress":{"lower":[3],"upper":[4],"counts":[{"time":3,"count":1}]}}

$ kafka-ingest format=avro topic=input-with-deletions schema=${schema}
{"array":[{"data":{"a":1,"b":2},"time":4,"diff":-1}]}
{"com.materialize.cdc.progress":{"lower":[4],"upper":[5],"counts":[{"time":4,"count":1}]}}

# now we should see a NULL update on the key, which means a DELETE
$ kafka-verify-data format=avro sink=materialize.public.input_with_deletions_sink sort-messages=true
{"a": 1}

$ kafka-ingest format=avro topic=input-with-deletions schema=${schema}
{"array":[{"data":{"a":1,"b":1},"time":5,"diff":1}]}
{"array":[{"data":{"a":1,"b":2},"time":5,"diff":1}]}
{"com.materialize.cdc.progress":{"lower":[5],"upper":[6],"counts":[{"time":5,"count":2}]}}

$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.input_with_deletions_sink sort-messages=true
5 {"a": 1} {"a": 1, "b": 2}

$ kafka-ingest format=avro topic=input-with-deletions schema=${schema}
{"array":[{"data":{"a":1,"b":2},"time":6,"diff":-1}]}
{"com.materialize.cdc.progress":{"lower":[6],"upper":[7],"counts":[{"time":6,"count":1}]}}

# removing the occluding input should "reveal" the previous input again
$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.input_with_deletions_sink sort-messages=true
6 {"a": 1} {"a": 1, "b": 1}
# NOT ENFORCED keys

$ kafka-create-topic topic=non-keyed-input

> CREATE CLUSTER non_keyed_input_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE non_keyed_input
  IN CLUSTER non_keyed_input_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-non-keyed-input-${testdrive.seed}')

> CREATE TABLE non_keyed_input_tbl FROM SOURCE non_keyed_input (REFERENCE "testdrive-non-keyed-input-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${schema}' ENVELOPE MATERIALIZE

> CREATE CLUSTER not_enforced_key_cluster SIZE '${arg.default-storage-size}';

> CREATE SINK not_enforced_key
  IN CLUSTER not_enforced_key_cluster
  FROM non_keyed_input_tbl
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'not-enforced-sink-${testdrive.seed}') KEY (a) NOT ENFORCED
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT
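
# KEY ... NOT ENFORCED skips the uniqueness validation exercised in the "Bad
# upsert keys" section below: the user asserts that a is unique. If that
# assertion is wrong the sink still runs, but consumers of the topic see
# last-write-wins per key rather than a faithful history. Without NOT
# ENFORCED, this same sink definition would be rejected with "upsert key
# could not be validated as unique".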
# Send a create, an update, and a delete for two separate keys.
$ kafka-ingest format=avro topic=non-keyed-input schema=${schema}
{"array":[{"data":{"a":1,"b":1},"time":1,"diff":1}]}
{"array":[{"data":{"a":2,"b":1},"time":1,"diff":1}]}
{"array":[{"data":{"a":1,"b":1},"time":2,"diff":-1}]}
{"array":[{"data":{"a":2,"b":1},"time":2,"diff":-1}]}
{"array":[{"data":{"a":1,"b":2},"time":2,"diff":1}]}
{"array":[{"data":{"a":2,"b":2},"time":2,"diff":1}]}
{"array":[{"data":{"a":1,"b":2},"time":3,"diff":-1}]}
{"array":[{"data":{"a":2,"b":2},"time":3,"diff":-1}]}
{"com.materialize.cdc.progress":{"lower":[0],"upper":[10],"counts":[{"time":1,"count":2}, {"time":2,"count":4}, {"time":3,"count":2}]}}

# Verify that the update appears as an upsert instead of a create + delete,
# even when keys are not enforced.
$ kafka-verify-data headers=materialize-timestamp format=avro topic=not-enforced-sink-${testdrive.seed} sort-messages=true
1 {"a": 1} {"a": 1, "b": 1}
1 {"a": 2} {"a": 2, "b": 1}

$ kafka-verify-data headers=materialize-timestamp format=avro topic=not-enforced-sink-${testdrive.seed} sort-messages=true
2 {"a": 1} {"a": 1, "b": 2}
2 {"a": 2} {"a": 2, "b": 2}

$ kafka-verify-data headers=materialize-timestamp format=avro topic=not-enforced-sink-${testdrive.seed} sort-messages=true
3 {"a": 1}
3 {"a": 2}
# Bad upsert keys

! CREATE SINK invalid_key
  IN CLUSTER ${arg.single-replica-cluster}
  FROM input_tbl
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'input-sink-${testdrive.seed}') KEY (a)
  FORMAT AVRO
  USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT
contains:upsert key could not be validated as unique

! CREATE SINK another_invalid_key
  IN CLUSTER ${arg.single-replica-cluster}
  FROM input_keyed
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'input-sink-${testdrive.seed}') KEY (b)
  FORMAT AVRO
  USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT
contains:upsert key could not be validated as unique

> CREATE MATERIALIZED VIEW input_keyed_ab AS SELECT a, b FROM input_tbl GROUP BY a, b

! CREATE SINK invalid_sub_key
  IN CLUSTER ${arg.single-replica-cluster}
  FROM input_keyed_ab
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'input-sink-${testdrive.seed}') KEY (a)
  FORMAT AVRO
  USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT
contains:upsert key could not be validated as unique

! CREATE SINK another_invalid_sub_key
  IN CLUSTER ${arg.single-replica-cluster}
  FROM input_keyed_ab
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'input-sink-${testdrive.seed}') KEY (b)
  FORMAT AVRO
  USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT
contains:upsert key could not be validated as unique

! CREATE SINK invalid_key_from_upsert_input
  IN CLUSTER ${arg.single-replica-cluster}
  FROM upsert_input_tbl
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'data-sink-${testdrive.seed}')
  KEY (key1)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT
contains:upsert key could not be validated as unique

! CREATE SINK invalid_key_from_upsert_input
  IN CLUSTER ${arg.single-replica-cluster}
  FROM upsert_input_tbl
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'data-sink-${testdrive.seed}')
  KEY (key2)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT
contains:upsert key could not be validated as unique
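
# The pattern in all of the rejections above: the requested key is not a
# superset of any key Materialize can prove unique. input_tbl has no unique
# key at all; input_keyed has key (a) but not (b); input_keyed_ab only has
# the composite key (a, b), so neither column alone qualifies; and
# upsert_input_tbl's unique key is the full pair (key1, key2), so either
# half on its own is rejected.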
# Check arrangements: new arrangements can mean a significant increase in
# memory consumption and should be understood before adapting the expected
# values below.
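# The introspection query joins arrangement-sharing data with the operators
# of each exported dataflow to list every arrangement maintained for user
# objects (export IDs starting with 'u'). The many repeated
# "MinsMaxesHierarchical" rows are expected: max(b) is computed as a
# hierarchical reduction, a tower of stages that each arrange their input so
# a retraction only has to revisit a small group at every stage.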
> SET cluster_replica = r1

> SELECT mdod.dataflow_name, mdod.name
  FROM mz_introspection.mz_arrangement_sharing mash
  JOIN mz_introspection.mz_dataflow_operator_dataflows mdod ON mash.operator_id = mdod.id
  JOIN mz_introspection.mz_compute_exports USING (dataflow_id)
  WHERE export_id LIKE 'u%'
  331. "Dataflow: materialize.public.input_keyed" "Arrange ReduceMinsMaxes"
  332. "Dataflow: materialize.public.input_keyed" "Arranged MinsMaxesHierarchical input"
  333. "Dataflow: materialize.public.input_keyed" "Arranged MinsMaxesHierarchical input"
  334. "Dataflow: materialize.public.input_keyed" "Arranged MinsMaxesHierarchical input"
  335. "Dataflow: materialize.public.input_keyed" "Arranged MinsMaxesHierarchical input"
  336. "Dataflow: materialize.public.input_keyed" "Arranged MinsMaxesHierarchical input"
  337. "Dataflow: materialize.public.input_keyed" "Arranged MinsMaxesHierarchical input"
  338. "Dataflow: materialize.public.input_keyed" "Arranged MinsMaxesHierarchical input"
  339. "Dataflow: materialize.public.input_keyed" ReduceMinsMaxes
  340. "Dataflow: materialize.public.input_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  341. "Dataflow: materialize.public.input_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  342. "Dataflow: materialize.public.input_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  343. "Dataflow: materialize.public.input_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  344. "Dataflow: materialize.public.input_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  345. "Dataflow: materialize.public.input_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  346. "Dataflow: materialize.public.input_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  347. "Dataflow: materialize.public.input_keyed_ab" "Arranged DistinctBy"
  348. "Dataflow: materialize.public.input_keyed_ab" DistinctBy
  349. "Dataflow: materialize.public.input_keyed_ab" DistinctByErrorCheck
  350. "Dataflow: materialize.public.input_with_deletions_keyed" "Arrange ReduceMinsMaxes"
  351. "Dataflow: materialize.public.input_with_deletions_keyed" "Arranged MinsMaxesHierarchical input"
  352. "Dataflow: materialize.public.input_with_deletions_keyed" "Arranged MinsMaxesHierarchical input"
  353. "Dataflow: materialize.public.input_with_deletions_keyed" "Arranged MinsMaxesHierarchical input"
  354. "Dataflow: materialize.public.input_with_deletions_keyed" "Arranged MinsMaxesHierarchical input"
  355. "Dataflow: materialize.public.input_with_deletions_keyed" "Arranged MinsMaxesHierarchical input"
  356. "Dataflow: materialize.public.input_with_deletions_keyed" "Arranged MinsMaxesHierarchical input"
  357. "Dataflow: materialize.public.input_with_deletions_keyed" "Arranged MinsMaxesHierarchical input"
  358. "Dataflow: materialize.public.input_with_deletions_keyed" ReduceMinsMaxes
  359. "Dataflow: materialize.public.input_with_deletions_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  360. "Dataflow: materialize.public.input_with_deletions_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  361. "Dataflow: materialize.public.input_with_deletions_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  362. "Dataflow: materialize.public.input_with_deletions_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  363. "Dataflow: materialize.public.input_with_deletions_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  364. "Dataflow: materialize.public.input_with_deletions_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  365. "Dataflow: materialize.public.input_with_deletions_keyed" "Reduced Fallibly MinsMaxesHierarchical"