kafka-sink-headers.td 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ set-arg-default single-replica-cluster=quickstart
  10. # Test the HEADERS option for Kafka sinks, which allows attaching user-specified
  11. # headers (taken from a map-typed column) to each Kafka message emitted by the sink.
  12. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  13. ALTER SYSTEM SET enable_kafka_sink_headers = true
  14. > CREATE CONNECTION k
  15. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
  16. # Test with a nonexistent column: wrong_name_tbl has no column `h`, so CREATE SINK must fail.
  17. > CREATE TABLE wrong_name_tbl (k int);
  18. ! CREATE SINK snk
  19. IN CLUSTER ${arg.single-replica-cluster}
  20. FROM wrong_name_tbl
  21. INTO KAFKA CONNECTION k (TOPIC 'testdrive-bad-${testdrive.seed}')
  22. KEY (k) NOT ENFORCED
  23. HEADERS h
  24. FORMAT JSON ENVELOPE UPSERT
  25. contains:HEADERS column (h) is unknown
  26. # Test with wrong types: both a non-map column (h1 int) and a map with int values (h2) must be rejected.
  27. > CREATE TABLE wrong_type_tbl (k int, h1 int, h2 map[text => int]);
  28. ! CREATE SINK snk
  29. IN CLUSTER ${arg.single-replica-cluster}
  30. FROM wrong_type_tbl
  31. INTO KAFKA CONNECTION k (TOPIC 'testdrive-bad-${testdrive.seed}')
  32. KEY (k) NOT ENFORCED
  33. HEADERS h1
  34. FORMAT JSON ENVELOPE UPSERT
  35. contains:HEADERS column must have type map[text => text] or map[text => bytea]
  36. ! CREATE SINK snk
  37. IN CLUSTER ${arg.single-replica-cluster}
  38. FROM wrong_type_tbl
  39. INTO KAFKA CONNECTION k (TOPIC 'testdrive-bad-${testdrive.seed}')
  40. KEY (k) NOT ENFORCED
  41. HEADERS h2
  42. FORMAT JSON ENVELOPE UPSERT
  43. contains:HEADERS column must have type map[text => text] or map[text => bytea]
  44. # Test successful use with `map[text => text]`. Expected: a NULL or empty map gives no headers (<missing>), a NULL map value gives a <null> header, and the record checked after the DELETE has headers and key only.
  45. > CREATE TABLE text_tbl (k int, h map[text => text])
  46. > INSERT INTO text_tbl VALUES
  47. (1, NULL),
  48. (2, '{}'),
  49. (3, '{"a" => null}'),
  50. (4, '{"a" => "b"}'),
  51. (5, '{"a" => "b", "c" => "d"}')
  52. > CREATE SINK text_snk
  53. IN CLUSTER ${arg.single-replica-cluster}
  54. FROM text_tbl
  55. INTO KAFKA CONNECTION k (TOPIC 'testdrive-text-${testdrive.seed}')
  56. KEY (k) NOT ENFORCED
  57. HEADERS h
  58. FORMAT JSON ENVELOPE UPSERT
  59. $ kafka-verify-data headers=a,c format=json sink=materialize.public.text_snk key=false sort-messages=true
  60. <missing> <missing> {"k": 1, "h": null}
  61. <missing> <missing> {"k": 2, "h": {}}
  62. <null> <missing> {"k": 3, "h": {"a": null}}
  63. b <missing> {"k": 4, "h": {"a": "b"}}
  64. b d {"k": 5, "h": {"a": "b", "c": "d"}}
  65. > INSERT INTO text_tbl VALUES (6, '{"a" => "b", "c" => null}')
  66. $ kafka-verify-data headers=a,c format=json sink=materialize.public.text_snk key=true sort-messages=true
  67. b <null> {"k": 6} {"k": 6, "h": {"a": "b", "c": null}}
  68. > DELETE FROM text_tbl WHERE k = 6
  69. $ kafka-verify-data headers=a,c format=json sink=materialize.public.text_snk key=true sort-messages=true
  70. b <null> {"k": 6}
  71. # Test successful use with `map[text => bytea]`. Expected: header values appear as `b`/`d`, while the JSON-encoded value shows the same bytes as arrays ([98], [100]).
  72. > CREATE TABLE bytea_tbl (k int, h map[text => bytea])
  73. > INSERT INTO bytea_tbl VALUES
  74. (1, NULL),
  75. (2, '{}'),
  76. (3, '{"a" => null}'),
  77. (4, '{"a" => "b"}'),
  78. (5, '{"a" => "b", "c" => "d"}')
  79. > CREATE SINK bytea_snk
  80. IN CLUSTER ${arg.single-replica-cluster}
  81. FROM bytea_tbl
  82. INTO KAFKA CONNECTION k (TOPIC 'testdrive-bytea-${testdrive.seed}')
  83. KEY (k) NOT ENFORCED
  84. HEADERS h
  85. FORMAT JSON ENVELOPE UPSERT
  86. $ kafka-verify-data headers=a,c format=json sink=materialize.public.bytea_snk key=false sort-messages=true
  87. <missing> <missing> {"k": 1, "h": null}
  88. <missing> <missing> {"k": 2, "h": {}}
  89. <null> <missing> {"k": 3, "h": {"a": null}}
  90. b <missing> {"k": 4, "h": {"a": [98]}}
  91. b d {"k": 5, "h": {"a": [98], "c": [100]}}
  92. > INSERT INTO bytea_tbl VALUES (6, '{"a" => "b", "c" => null}')
  93. $ kafka-verify-data headers=a,c format=json sink=materialize.public.bytea_snk key=false sort-messages=true
  94. b <null> {"k": 6, "h": {"a": [98], "c": null}}