# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default default-storage-size=1
$ set-arg-default single-replica-cluster=quickstart

# Test Avro UPSERT sinks.

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_envelope_materialize = true
ALTER SYSTEM SET max_clusters = 20
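
# Note: enable_envelope_materialize gates the ENVELOPE MATERIALIZE sources
# created further down, and max_clusters is raised because each source and
# sink in this file runs in its own cluster.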

# sinking directly from an UPSERT source with a multi-part key

$ set upsert-keyschema={
    "type": "record",
    "name": "Key",
    "fields": [
        {"name": "key1", "type": "string"},
        {"name": "key2", "type": "long"}
    ]
  }

$ set upsert-schema={
    "type" : "record",
    "name" : "test",
    "fields" : [
        {"name":"f1", "type":"string"},
        {"name":"f2", "type":"long"}
    ]
  }

$ kafka-create-topic topic=upsert-avro

$ kafka-ingest format=avro topic=upsert-avro key-format=avro key-schema=${upsert-keyschema} schema=${upsert-schema}
{"key1": "fish", "key2": 2} {"f1": "fish", "f2": 1000}
{"key1": "fisch", "key2": 42} {"f1": "fish", "f2": 1000}

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

> CREATE CLUSTER upsert_input_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE upsert_input
  IN CLUSTER upsert_input_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-avro-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT

> CREATE CLUSTER upsert_input_sink_cluster SIZE '${arg.default-storage-size}';
> CREATE SINK upsert_input_sink
  IN CLUSTER upsert_input_sink_cluster
  FROM upsert_input
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-input-sink-${testdrive.seed}')
  KEY (key1, key2)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT
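
# The sink keys by (key1, key2), which is exactly the upsert key of the
# source, so Materialize can validate the key as unique; keying by a subset
# of it is rejected (see the error cases at the end of this file).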

$ kafka-verify-data format=avro sink=materialize.public.upsert_input_sink sort-messages=true
{"key1": "fisch", "key2": 42} {"key1": "fisch", "key2": 42, "f1": "fish", "f2": 1000}
{"key1": "fish", "key2": 2} {"key1": "fish", "key2": 2, "f1": "fish", "f2": 1000}

$ kafka-ingest format=avro topic=upsert-avro key-format=avro key-schema=${upsert-keyschema} schema=${upsert-schema}
{"key1": "fisch", "key2": 42} {"f1": "richtig, fisch", "f2": 2000}

$ kafka-verify-data format=avro sink=materialize.public.upsert_input_sink
{"key1": "fisch", "key2": 42} {"key1": "fisch", "key2": 42, "f1": "richtig, fisch", "f2": 2000}

# More complicated scenarios: super keys, consistency input/output

$ set schema=[
  {
    "type": "array",
    "items": {
      "type": "record",
      "name": "update",
      "namespace": "com.materialize.cdc",
      "fields": [
        {
          "name": "data",
          "type": {
            "type": "record",
            "name": "data",
            "fields": [
              {"name": "a", "type": "long"},
              {"name": "b", "type": "long"}
            ]
          }
        },
        {
          "name": "time",
          "type": "long"
        },
        {
          "name": "diff",
          "type": "long"
        }
      ]
    }
  },
  {
    "type": "record",
    "name": "progress",
    "namespace": "com.materialize.cdc",
    "fields": [
      {
        "name": "lower",
        "type": {
          "type": "array",
          "items": "long"
        }
      },
      {
        "name": "upper",
        "type": {
          "type": "array",
          "items": "long"
        }
      },
      {
        "name": "counts",
        "type": {
          "type": "array",
          "items": {
            "type": "record",
            "name": "counts",
            "fields": [
              {
                "name": "time",
                "type": "long"
              },
              {
                "name": "count",
                "type": "long"
              }
            ]
          }
        }
      }
    ]
  }
  ]
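
# In this CDC schema, each "update" record carries a row ("data"), a
# timestamp ("time"), and a "diff" (+1 for insert, -1 for delete), while
# "progress" records state how many updates to expect per timestamp, which
# lets the source know when a time is complete.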

$ kafka-create-topic topic=input

# (PRIMARY KEY (id) NOT ENFORCED)
> CREATE CLUSTER input_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE input
  IN CLUSTER input_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-input-${testdrive.seed}')
  FORMAT AVRO USING SCHEMA '${schema}' ENVELOPE MATERIALIZE

> CREATE MATERIALIZED VIEW input_keyed AS SELECT a, max(b) as b FROM input GROUP BY a

> CREATE CLUSTER input_sink_cluster SIZE '${arg.default-storage-size}';
> CREATE SINK input_sink
  IN CLUSTER input_sink_cluster
  FROM input_keyed
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'input-sink-${testdrive.seed}') KEY (a)
  FORMAT AVRO
  USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT

# requesting to key by (a, b) is fine when (a) is a unique key
> CREATE CLUSTER input_sink_multiple_keys_cluster SIZE '${arg.default-storage-size}';
> CREATE SINK input_sink_multiple_keys
  IN CLUSTER input_sink_multiple_keys_cluster
  FROM input_keyed
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'input-sink-multikey-${testdrive.seed}') KEY (b, a)
  FORMAT AVRO
  USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT
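
# (b, a) is a superset of the unique key (a) and is therefore itself unique;
# keying by (b) alone is rejected further below.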

$ kafka-ingest format=avro topic=input schema=${schema}
{"array":[{"data":{"a":1,"b":1},"time":1,"diff":1}]}
{"array":[{"data":{"a":2,"b":2},"time":1,"diff":1}]}
{"array":[{"data":{"a":3,"b":1},"time":2,"diff":1}]}
{"array":[{"data":{"a":4,"b":2},"time":2,"diff":1}]}
{"array":[{"data":{"a":1,"b":7},"time":3,"diff":1}]}
{"com.materialize.cdc.progress":{"lower":[0],"upper":[4],"counts":[{"time":1,"count":2},{"time":2,"count":2},{"time":3,"count":1}]}}

> SELECT * FROM input;
a b
------
1 1
2 2
3 1
4 2
1 7

# Compare sorted messages within each transaction. We know that messages of one
# transaction appear together as one "bundle" in the output. But there is no
# guarantee on the order within a transaction.
$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.input_sink sort-messages=true
1 {"a": 1} {"a": 1, "b": 1}
1 {"a": 2} {"a": 2, "b": 2}

$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.input_sink sort-messages=true
2 {"a": 3} {"a": 3, "b": 1}
2 {"a": 4} {"a": 4, "b": 2}

$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.input_sink sort-messages=true
3 {"a": 1} {"a": 1, "b": 7}

# Again, compare split by transaction. See comment just above.
$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.input_sink_multiple_keys sort-messages=true
1 {"a": 1, "b": 1} {"a": 1, "b": 1}
1 {"a": 2, "b": 2} {"a": 2, "b": 2}

$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.input_sink_multiple_keys sort-messages=true
2 {"a": 3, "b": 1} {"a": 3, "b": 1}
2 {"a": 4, "b": 2} {"a": 4, "b": 2}

# missing value denotes DELETE
$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.input_sink_multiple_keys sort-messages=true
3 {"a": 1, "b": 1}
3 {"a": 1, "b": 7} {"a": 1, "b": 7}
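
# Under the (b, a) key, the update at time 3 changes the key itself, so the
# sink emits a tombstone for the old key (b = 1) and an upsert for the new
# key (b = 7) instead of a single in-place update.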

# verify if/when input deletions are emitted to an UPSERT sink

$ kafka-create-topic topic=input-with-deletions

> CREATE CLUSTER input_with_deletions_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE input_with_deletions
  IN CLUSTER input_with_deletions_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-input-with-deletions-${testdrive.seed}')
  FORMAT AVRO USING SCHEMA '${schema}' ENVELOPE MATERIALIZE

> CREATE MATERIALIZED VIEW input_with_deletions_keyed AS SELECT a, max(b) as b FROM input_with_deletions GROUP BY a

> CREATE CLUSTER input_with_deletions_sink_cluster SIZE '${arg.default-storage-size}';
> CREATE SINK input_with_deletions_sink
  IN CLUSTER input_with_deletions_sink_cluster
  FROM input_with_deletions_keyed
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'input-sink-with-deletions-${testdrive.seed}') KEY (a)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT

$ kafka-ingest format=avro topic=input-with-deletions schema=${schema}
{"array":[{"data":{"a":1,"b":1},"time":1,"diff":1}]}
{"com.materialize.cdc.progress":{"lower":[0],"upper":[2],"counts":[{"time":1,"count":1}]}}

$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.input_with_deletions_sink sort-messages=true
1 {"a": 1} {"a": 1, "b": 1}

$ kafka-ingest format=avro topic=input-with-deletions schema=${schema}
{"array":[{"data":{"a":1,"b":2},"time":2,"diff":1}]}
{"com.materialize.cdc.progress":{"lower":[2],"upper":[3],"counts":[{"time":2,"count":1}]}}

$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.input_with_deletions_sink sort-messages=true
2 {"a": 1} {"a": 1, "b": 2}

# deletion of the "shadowed" input should not cause downstream updates

$ kafka-ingest format=avro topic=input-with-deletions schema=${schema}
{"array":[{"data":{"a":1,"b":1},"time":3,"diff":-1}]}
{"com.materialize.cdc.progress":{"lower":[3],"upper":[4],"counts":[{"time":3,"count":1}]}}

$ kafka-ingest format=avro topic=input-with-deletions schema=${schema}
{"array":[{"data":{"a":1,"b":2},"time":4,"diff":-1}]}
{"com.materialize.cdc.progress":{"lower":[4],"upper":[5],"counts":[{"time":4,"count":1}]}}

# now we should see a NULL update on the key, which means a DELETE
$ kafka-verify-data format=avro sink=materialize.public.input_with_deletions_sink sort-messages=true
{"a": 1}
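
# Deleting the shadowed row (b = 1) at time 3 leaves max(b) = 2 unchanged, so
# nothing is emitted; only the deletion at time 4, which removes the last row
# for a = 1, produces the tombstone above.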

$ kafka-ingest format=avro topic=input-with-deletions schema=${schema}
{"array":[{"data":{"a":1,"b":1},"time":5,"diff":1}]}
{"array":[{"data":{"a":1,"b":2},"time":5,"diff":1}]}
{"com.materialize.cdc.progress":{"lower":[5],"upper":[6],"counts":[{"time":5,"count":2}]}}

$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.input_with_deletions_sink sort-messages=true
5 {"a": 1} {"a": 1, "b": 2}

$ kafka-ingest format=avro topic=input-with-deletions schema=${schema}
{"array":[{"data":{"a":1,"b":2},"time":6,"diff":-1}]}
{"com.materialize.cdc.progress":{"lower":[6],"upper":[7],"counts":[{"time":6,"count":1}]}}

# removing the occluding input should "reveal" the previous input again
$ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.input_with_deletions_sink sort-messages=true
6 {"a": 1} {"a": 1, "b": 1}

# NOT ENFORCED Keys

$ kafka-create-topic topic=non-keyed-input

> CREATE CLUSTER non_keyed_input_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE non_keyed_input
  IN CLUSTER non_keyed_input_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-non-keyed-input-${testdrive.seed}')
  FORMAT AVRO USING SCHEMA '${schema}' ENVELOPE MATERIALIZE

> CREATE CLUSTER not_enforced_key_cluster SIZE '${arg.default-storage-size}';
> CREATE SINK not_enforced_key
  IN CLUSTER not_enforced_key_cluster
  FROM non_keyed_input
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'not-enforced-sink-${testdrive.seed}') KEY (a) NOT ENFORCED
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT
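
# NOT ENFORCED instructs Materialize to skip key-uniqueness validation and
# trust the declaration that (a) is a key; without it this sink would be
# rejected, since non_keyed_input has no unique key.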

# Send a create, an update, and a delete for two separate keys.
$ kafka-ingest format=avro topic=non-keyed-input schema=${schema}
{"array":[{"data":{"a":1,"b":1},"time":1,"diff":1}]}
{"array":[{"data":{"a":2,"b":1},"time":1,"diff":1}]}
{"array":[{"data":{"a":1,"b":1},"time":2,"diff":-1}]}
{"array":[{"data":{"a":2,"b":1},"time":2,"diff":-1}]}
{"array":[{"data":{"a":1,"b":2},"time":2,"diff":1}]}
{"array":[{"data":{"a":2,"b":2},"time":2,"diff":1}]}
{"array":[{"data":{"a":1,"b":2},"time":3,"diff":-1}]}
{"array":[{"data":{"a":2,"b":2},"time":3,"diff":-1}]}
{"com.materialize.cdc.progress":{"lower":[0],"upper":[10],"counts":[{"time":1,"count":2},{"time":2,"count":4},{"time":3,"count":2}]}}

# Verify that the update appears as an upsert instead of a create + delete,
# even when keys are not enforced.
$ kafka-verify-data headers=materialize-timestamp format=avro topic=not-enforced-sink-${testdrive.seed} sort-messages=true
1 {"a": 1} {"a": 1, "b": 1}
1 {"a": 2} {"a": 2, "b": 1}

$ kafka-verify-data headers=materialize-timestamp format=avro topic=not-enforced-sink-${testdrive.seed} sort-messages=true
2 {"a": 1} {"a": 1, "b": 2}
2 {"a": 2} {"a": 2, "b": 2}

$ kafka-verify-data headers=materialize-timestamp format=avro topic=not-enforced-sink-${testdrive.seed} sort-messages=true
3 {"a": 1}
3 {"a": 2}

# Bad upsert keys
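# All of the following are rejected: input has no unique key at all, b is
# not a unique key of input_keyed (only a is), GROUP BY a, b makes only the
# pair (a, b) unique in input_keyed_ab, and for upsert_input only the full
# key (key1, key2) is known to be unique.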

! CREATE SINK invalid_key
  IN CLUSTER ${arg.single-replica-cluster}
  FROM input
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'input-sink-${testdrive.seed}') KEY (a)
  FORMAT AVRO
  USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT
contains:upsert key could not be validated as unique

! CREATE SINK another_invalid_key
  IN CLUSTER ${arg.single-replica-cluster}
  FROM input_keyed
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'input-sink-${testdrive.seed}') KEY (b)
  FORMAT AVRO
  USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT
contains:upsert key could not be validated as unique

> CREATE MATERIALIZED VIEW input_keyed_ab AS SELECT a, b FROM input GROUP BY a, b

! CREATE SINK invalid_sub_key
  IN CLUSTER ${arg.single-replica-cluster}
  FROM input_keyed_ab
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'input-sink-${testdrive.seed}') KEY (a)
  FORMAT AVRO
  USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT
contains:upsert key could not be validated as unique

! CREATE SINK another_invalid_sub_key
  IN CLUSTER ${arg.single-replica-cluster}
  FROM input_keyed_ab
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'input-sink-${testdrive.seed}') KEY (b)
  FORMAT AVRO
  USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT
contains:upsert key could not be validated as unique

! CREATE SINK invalid_key_from_upsert_input
  IN CLUSTER ${arg.single-replica-cluster}
  FROM upsert_input
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'data-sink-${testdrive.seed}')
  KEY (key1)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT
contains:upsert key could not be validated as unique

! CREATE SINK invalid_key_from_upsert_input
  IN CLUSTER ${arg.single-replica-cluster}
  FROM upsert_input
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'data-sink-${testdrive.seed}')
  KEY (key2)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT
contains:upsert key could not be validated as unique

# Check the arrangements; seeing new arrangements can mean a significant
# increase in memory consumption and should be understood before adapting
# the values below.
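# The query joins the arrangement-sharing introspection relation with the
# dataflow operators, listing every arrangement maintained by a user dataflow
# (export_id LIKE 'u%').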

> SET cluster_replica = r1

> SELECT mdod.dataflow_name, mdod.name
  FROM mz_introspection.mz_arrangement_sharing mash
  JOIN mz_introspection.mz_dataflow_operator_dataflows mdod ON mash.operator_id = mdod.id
  JOIN mz_introspection.mz_compute_exports USING (dataflow_id)
  WHERE export_id LIKE 'u%'
  327. "Dataflow: materialize.public.input_keyed" "Arrange ReduceMinsMaxes"
  328. "Dataflow: materialize.public.input_keyed" "Arranged MinsMaxesHierarchical input"
  329. "Dataflow: materialize.public.input_keyed" "Arranged MinsMaxesHierarchical input"
  330. "Dataflow: materialize.public.input_keyed" "Arranged MinsMaxesHierarchical input"
  331. "Dataflow: materialize.public.input_keyed" "Arranged MinsMaxesHierarchical input"
  332. "Dataflow: materialize.public.input_keyed" "Arranged MinsMaxesHierarchical input"
  333. "Dataflow: materialize.public.input_keyed" "Arranged MinsMaxesHierarchical input"
  334. "Dataflow: materialize.public.input_keyed" "Arranged MinsMaxesHierarchical input"
  335. "Dataflow: materialize.public.input_keyed" ReduceMinsMaxes
  336. "Dataflow: materialize.public.input_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  337. "Dataflow: materialize.public.input_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  338. "Dataflow: materialize.public.input_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  339. "Dataflow: materialize.public.input_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  340. "Dataflow: materialize.public.input_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  341. "Dataflow: materialize.public.input_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  342. "Dataflow: materialize.public.input_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  343. "Dataflow: materialize.public.input_keyed_ab" "Arranged DistinctBy"
  344. "Dataflow: materialize.public.input_keyed_ab" DistinctBy
  345. "Dataflow: materialize.public.input_keyed_ab" DistinctByErrorCheck
  346. "Dataflow: materialize.public.input_with_deletions_keyed" "Arrange ReduceMinsMaxes"
  347. "Dataflow: materialize.public.input_with_deletions_keyed" "Arranged MinsMaxesHierarchical input"
  348. "Dataflow: materialize.public.input_with_deletions_keyed" "Arranged MinsMaxesHierarchical input"
  349. "Dataflow: materialize.public.input_with_deletions_keyed" "Arranged MinsMaxesHierarchical input"
  350. "Dataflow: materialize.public.input_with_deletions_keyed" "Arranged MinsMaxesHierarchical input"
  351. "Dataflow: materialize.public.input_with_deletions_keyed" "Arranged MinsMaxesHierarchical input"
  352. "Dataflow: materialize.public.input_with_deletions_keyed" "Arranged MinsMaxesHierarchical input"
  353. "Dataflow: materialize.public.input_with_deletions_keyed" "Arranged MinsMaxesHierarchical input"
  354. "Dataflow: materialize.public.input_with_deletions_keyed" ReduceMinsMaxes
  355. "Dataflow: materialize.public.input_with_deletions_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  356. "Dataflow: materialize.public.input_with_deletions_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  357. "Dataflow: materialize.public.input_with_deletions_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  358. "Dataflow: materialize.public.input_with_deletions_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  359. "Dataflow: materialize.public.input_with_deletions_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  360. "Dataflow: materialize.public.input_with_deletions_keyed" "Reduced Fallibly MinsMaxesHierarchical"
  361. "Dataflow: materialize.public.input_with_deletions_keyed" "Reduced Fallibly MinsMaxesHierarchical"