# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

  9. $ set-arg-default single-replica-cluster=quickstart
  10. # Test the PARTITION BY option for Kafka sinks
  11. > CREATE CONNECTION k
  12. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
  13. > CREATE TABLE input (a int, b int);
  14. # Test that `PARTITION BY` does not work with an invalid data type.
  15. ! CREATE SINK bad
  16. IN CLUSTER ${arg.single-replica-cluster}
  17. FROM input
  18. INTO KAFKA CONNECTION k (
  19. TOPIC 'testdrive-bad-${testdrive.seed}',
  20. PARTITION BY '2024-01-01'::date
  21. )
  22. KEY (a) NOT ENFORCED
  23. FORMAT JSON ENVELOPE UPSERT
  24. contains:PARTITION BY does not support casting from date to uint8
  25. # Test that `PARTITION BY` does not work with invalid column references.
  26. ! CREATE SINK bad
  27. IN CLUSTER ${arg.single-replica-cluster}
  28. FROM input
  29. INTO KAFKA CONNECTION k (
  30. TOPIC 'testdrive-bad-${testdrive.seed}',
  31. PARTITION BY noexist
  32. )
  33. KEY (a) NOT ENFORCED
  34. FORMAT JSON ENVELOPE UPSERT
  35. contains:column "noexist" does not exist
  36. ! CREATE SINK bad
  37. IN CLUSTER ${arg.single-replica-cluster}
  38. FROM input
  39. INTO KAFKA CONNECTION k (
  40. TOPIC 'testdrive-bad-${testdrive.seed}',
  41. PARTITION BY b
  42. )
  43. KEY (a) NOT ENFORCED
  44. FORMAT JSON ENVELOPE DEBEZIUM
  45. contains:PARTITION BY expression cannot refer to non-key column "b"
  46. # Test that `PARTITION BY` works for direct partition assignment.
  47. > DROP TABLE input CASCADE
  48. > CREATE MATERIALIZED VIEW input (part, value) AS
  49. VALUES (0, 'apple'), (1, 'banana'), (2, 'grape'), (3, 'orange'), (0, 'zucchini')
  50. > CREATE SINK direct_output
  51. IN CLUSTER ${arg.single-replica-cluster}
  52. FROM input
  53. INTO KAFKA CONNECTION k (
  54. TOPIC 'testdrive-direct-${testdrive.seed}',
  55. TOPIC PARTITION COUNT 2,
  56. PARTITION BY part
  57. )
  58. KEY (value) NOT ENFORCED
  59. FORMAT JSON
  60. ENVELOPE UPSERT
  61. $ kafka-verify-data format=json sink=materialize.public.direct_output sort-messages=true
  62. {"part": 0, "value": "apple"} partition=0
  63. {"part": 0, "value": "zucchini"} partition=0
  64. {"part": 1, "value": "banana"} partition=1
  65. {"part": 2, "value": "grape"} partition=0
  66. {"part": 3, "value": "orange"} partition=1
  67. # Test that `PARTITION BY` works with the standard kafka_murmur2 hash function.
  68. > DROP MATERIALIZED VIEW input CASCADE
  69. > CREATE TABLE input (value text)
  70. > INSERT INTO input VALUES ('apple'), ('banana'), ('grape'), ('orange'), ('zucchini')
  71. > CREATE SINK hashed_output
  72. IN CLUSTER ${arg.single-replica-cluster}
  73. FROM input
  74. INTO KAFKA CONNECTION k (
  75. TOPIC 'testdrive-hashed-${testdrive.seed}',
  76. TOPIC PARTITION COUNT 16,
  77. PARTITION BY kafka_murmur2(value)
  78. )
  79. KEY (value) NOT ENFORCED
  80. KEY FORMAT TEXT
  81. VALUE FORMAT JSON
  82. ENVELOPE UPSERT
  83. # These partition assignments were verified to match kcat's asssignments
  84. # when using the `murmur2_random` partitioner.
  85. #
  86. # Data was produced via:
  87. #
  88. # $ kcat -b localhost:9092 -t test-partitioning -P -K : -X topic.partitioner=murmur2_random <<EOF
  89. # apple:val
  90. # grape:val
  91. # zucchini:val
  92. # banana:val
  93. # orange:val
  94. # EOF
  95. #
  96. # And then partition assignments were read back via:
  97. #
  98. # $ kcat -b localhost:9092 -C -t test-partitioning -f '%k %p\n'
  99. #
  100. $ kafka-verify-data format=json sink=materialize.public.hashed_output sort-messages=true
  101. {"value": "apple"} partition=5
  102. {"value": "banana"} partition=13
  103. {"value": "grape"} partition=5
  104. {"value": "orange"} partition=8
  105. {"value": "zucchini"} partition=4
  106. > DELETE FROM input WHERE value IN ('banana', 'orange')
  107. $ kafka-verify-data key-format=text value-format=json sink=materialize.public.hashed_output sort-messages=true
  108. "banana" "<null>" partition=13
  109. "orange" "<null>" partition=8
  110. # Test that `PARTITION BY` sends errors and invalid values to partition 0.
  111. > DROP TABLE input CASCADE
  112. > CREATE MATERIALIZED VIEW input (a, b) AS
  113. VALUES (2::int, 1::int), (-1, 1), (1, 0), (1, 1)
  114. > CREATE SINK invalid_output
  115. IN CLUSTER ${arg.single-replica-cluster}
  116. FROM input
  117. INTO KAFKA CONNECTION k (
  118. TOPIC 'testdrive-invalid-${testdrive.seed}',
  119. TOPIC PARTITION COUNT 4,
  120. PARTITION BY a / b
  121. )
  122. KEY (a) NOT ENFORCED
  123. FORMAT JSON
  124. ENVELOPE UPSERT
  125. $ kafka-verify-data format=json sink=materialize.public.invalid_output sort-messages=true
  126. {"a": -1, "b": 1} partition=0
  127. {"a": 1, "b": 0} partition=0
  128. {"a": 1, "b": 1} partition=1
  129. {"a": 2, "b": 1} partition=2
  130. # Test that `PARTITION BY` works with `ENVELOPE DEBEZIUM`.
  131. > DROP MATERIALIZED VIEW input CASCADE
  132. > CREATE TABLE input (k int, v text);
  133. > CREATE SINK debezium_output
  134. IN CLUSTER ${arg.single-replica-cluster}
  135. FROM input
  136. INTO KAFKA CONNECTION k (
  137. TOPIC 'testdrive-debezium-${testdrive.seed}',
  138. TOPIC PARTITION COUNT 2,
  139. PARTITION BY k
  140. )
  141. KEY (k) NOT ENFORCED
  142. FORMAT JSON
  143. ENVELOPE DEBEZIUM
  144. > INSERT INTO input VALUES (0, 'apple'), (1, 'banana');
  145. $ kafka-verify-data format=json sink=materialize.public.debezium_output sort-messages=true
  146. {"before": null, "after": {"k": 0, "v": "apple"}} partition=0
  147. {"before": null, "after": {"k": 1, "v": "banana"}} partition=1
  148. > UPDATE input SET v = v || 's'
  149. $ kafka-verify-data format=json sink=materialize.public.debezium_output sort-messages=true
  150. {"before": {"k": 0, "v": "apple"}, "after": {"k": 0, "v": "apples"}} partition=0
  151. {"before": {"k": 1, "v": "banana"}, "after": {"k": 1, "v": "bananas"}} partition=1
  152. > DELETE FROM input
  153. $ kafka-verify-data format=json sink=materialize.public.debezium_output sort-messages=true
  154. {"before": {"k": 0, "v": "apples"}, "after": null} partition=0
  155. {"before": {"k": 1, "v": "bananas"}, "after": null} partition=1