# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# Tests for Kafka sinks that emit JSON: envelope handling (DEBEZIUM and
# UPSERT), mixed key/value formats, and the JSON rendering of scalar,
# special-character, record, and complex (list/map/array) values.
#
# Formatting note: continuation lines of a multi-line command are indented
# by two spaces, and commands are separated by blank lines.

$ set-arg-default default-storage-size=1

# Shared fixtures used by the sections below.

> CREATE MATERIALIZED VIEW simple_view AS SELECT 1 AS a, 2 AS b, 3 AS c;

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

# Debezium envelope

> CREATE CLUSTER simple_view_sink_cluster SIZE '${arg.default-storage-size}';

# FORMAT JSON ARRAY is expected to be rejected for sinks.
! CREATE SINK simple_view_sink
  IN CLUSTER simple_view_sink_cluster
  FROM simple_view
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'unnamed-cols-sink-${testdrive.seed}')
  FORMAT JSON ARRAY
  ENVELOPE DEBEZIUM
contains:JSON ARRAY format in sinks not yet supported

> CREATE SINK simple_view_sink
  IN CLUSTER simple_view_sink_cluster
  FROM simple_view
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'unnamed-cols-sink-${testdrive.seed}')
  FORMAT JSON
  ENVELOPE DEBEZIUM

$ kafka-verify-data format=json sink=materialize.public.simple_view_sink key=false
{"before": null, "after": {"a": 1, "b": 2, "c": 3}}

# Upsert envelope with a JSON key

> CREATE CLUSTER simple_view_upsert_cluster SIZE '${arg.default-storage-size}';

> CREATE SINK simple_view_upsert
  IN CLUSTER simple_view_upsert_cluster
  FROM simple_view
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-unnamed-upsert-${testdrive.seed}')
  KEY (b)
  FORMAT JSON
  ENVELOPE UPSERT

$ kafka-verify-data format=json sink=materialize.public.simple_view_upsert key=true
{"b": 2} {"a": 1, "b": 2, "c": 3}

# Mixed key/value formats

> CREATE MATERIALIZED VIEW complex_view AS SELECT LIST[1,3] AS a, 2 AS b, 3 AS c;

> CREATE CLUSTER mixed_types_cluster SIZE '${arg.default-storage-size}';

# This should error because the BYTES key format cannot encode complex types
# such as lists.
! CREATE SINK mixed_types
  IN CLUSTER mixed_types_cluster
  FROM complex_view
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-unnamed-upsert-${testdrive.seed}')
  KEY (a)
  KEY FORMAT BYTES
  VALUE FORMAT JSON
  ENVELOPE UPSERT
contains:BYTES format with non-encodable type

# This should error because compound keys cannot use the TEXT or BYTES
# encoding.
! CREATE SINK mixed_types
  IN CLUSTER mixed_types_cluster
  FROM simple_view
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-unnamed-upsert-${testdrive.seed}')
  KEY (a, b)
  KEY FORMAT TEXT
  VALUE FORMAT JSON
  ENVELOPE UPSERT
contains:BYTES or TEXT format with multiple columns not yet supported

# A single-column TEXT key combined with a JSON value is accepted.
# NOTE(review): this sink targets the same topic name as simple_view_upsert
# above -- confirm that sharing the topic is intended.
> CREATE SINK mixed_types
  IN CLUSTER mixed_types_cluster
  FROM simple_view
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-unnamed-upsert-${testdrive.seed}')
  KEY (b)
  KEY FORMAT TEXT
  VALUE FORMAT JSON
  ENVELOPE UPSERT

$ kafka-verify-data key-format=text value-format=json sink=materialize.public.mixed_types
2 {"a": 1, "b": 2, "c": 3}

# Standard types

> CREATE MATERIALIZED VIEW types_view AS
  SELECT TRUE::boolean c1,
  FALSE::boolean c2,
  NULL c3,
  123456789::bigint c4,
  1234.5678::double c5,
  1234.5678::decimal c6,
  '2011-11-11 11:11:11.12345'::timestamp c7,
  '2011-11-11 11:11:11.12345+12'::timestamptz c8,
  '2011-11-11'::date c9,
  '11:11:11.123456'::time c10,
  INTERVAL '1 year' c11,
  '324373a5-7718-46b1-a7ea-4a7c9981fc4e'::uuid c12,
  'текст'::bytea c13,
  '{"a": 2}'::jsonb c14

> CREATE CLUSTER types_sink_cluster SIZE '${arg.default-storage-size}';

> CREATE SINK types_sink
  IN CLUSTER types_sink_cluster
  FROM types_view
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-types-sink-${testdrive.seed}')
  FORMAT JSON
  ENVELOPE DEBEZIUM

# Due to limitations in $ kafka-verify-data, the entire expected JSON output
# needs to be provided on a single line.
$ kafka-verify-data format=json sink=materialize.public.types_sink key=false
{"before":null,"after":{"c1":true,"c2":false,"c3":null,"c4":123456789,"c5":1234.5678,"c6":"1234.5678","c7":"1321009871123.450","c8":"1320966671123.450","c9":"2011-11-11","c10":"11:11:11.123456","c11":"1 year","c12":"324373a5-7718-46b1-a7ea-4a7c9981fc4e","c13":[209,130,208,181,208,186,209,129,209,130],"c14":{"a":2}}}

# Special characters

> CREATE MATERIALIZED VIEW special_characters_view AS
  SELECT 'текст' c1, '"' c2, '''' c3, '\' c4, E'a\n\tb' c5

> CREATE CLUSTER special_characters_sink_cluster SIZE '${arg.default-storage-size}';

> CREATE SINK special_characters_sink
  IN CLUSTER special_characters_sink_cluster
  FROM special_characters_view
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-special-characters-sink-${testdrive.seed}')
  FORMAT JSON
  ENVELOPE DEBEZIUM

$ kafka-verify-data format=json sink=materialize.public.special_characters_sink key=false
{"before":null,"after":{"c1":"текст","c2":"\"","c3":"'","c4":"\\","c5":"a\n\tb"}}

# Record

> CREATE MATERIALIZED VIEW record_view AS SELECT simple_view FROM simple_view;

> CREATE CLUSTER record_sink_cluster SIZE '${arg.default-storage-size}';

> CREATE SINK record_sink
  IN CLUSTER record_sink_cluster
  FROM record_view
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-record-sink-${testdrive.seed}')
  FORMAT JSON
  ENVELOPE DEBEZIUM

$ kafka-verify-data format=json sink=materialize.public.record_sink key=false
{"before":null,"after":{"simple_view":{"a":1,"b":2,"c":3}}}

# Duplicate column names

! CREATE VIEW duplicate_cols AS SELECT 'a1' AS a, 'a1' AS a;
contains:column "a" specified more than once

! CREATE MATERIALIZED VIEW duplicate_cols AS SELECT 'a1' AS a, 'a1' AS a;
contains:column "a" specified more than once

# Complex types

> CREATE TYPE int4_list AS LIST (ELEMENT TYPE = int4);

> CREATE TYPE int4_list_list AS LIST (ELEMENT TYPE = int4_list);

> CREATE TYPE int4_map AS MAP (KEY TYPE = text, VALUE TYPE = int4);

> CREATE TYPE int4_map_map AS MAP (KEY TYPE = text, VALUE TYPE = int4_map);

# We do this dance here to work around database-issues#7544: look up the
# catalog IDs of the built-in _int4 and _text types so that the casts in the
# view below can reference those types by ID.
$ set-from-sql var=int4_id
SELECT id FROM mz_objects WHERE name = '_int4';

$ set-from-sql var=text_id
SELECT id FROM mz_objects WHERE name = '_text';

> CREATE MATERIALIZED VIEW complex_type_view AS
  SELECT
  '{{1,2},{3,4}}'::int4_list_list c1,
  '{a=>{b=>1, c=>2}, d=> {e=>3, f=>4}}'::int4_map_map c2,
  ARRAY[ARRAY[1, 2], ARRAY[3, 4], ARRAY[5, 6]]::[${int4_id} AS "pg_catalog"."_int4"] c3,
  ARRAY[]::[${int4_id} AS "pg_catalog"."_int4"] c4,
  ARRAY[ARRAY[ARRAY['a'], ARRAY['b']], ARRAY[ARRAY['c'], ARRAY['d']]]::[${text_id} AS "pg_catalog"."_text"] c5;

> CREATE CLUSTER complex_type_sink_cluster SIZE '${arg.default-storage-size}';

> CREATE SINK complex_type_sink
  IN CLUSTER complex_type_sink_cluster
  FROM complex_type_view
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-complex-type-sink-${testdrive.seed}')
  FORMAT JSON
  ENVELOPE DEBEZIUM

$ kafka-verify-data format=json sink=materialize.public.complex_type_sink key=false
{"before": null, "after": {"c1": [[1,2],[3,4]], "c2": {"a":{"b":1, "c":2}, "d": {"e":3, "f":4}}, "c3": [[1, 2], [3, 4], [5,6]], "c4": [], "c5": [[["a"], ["b"]], [["c"], ["d"]]]}}

# testdrive will not automatically clean up types, so we need to do that
# ourselves. The view that uses the types is dropped first, and each nested
# type is dropped before the type it is built from.
> DROP MATERIALIZED VIEW complex_type_view CASCADE;

> DROP TYPE int4_list_list;

> DROP TYPE int4_list;

> DROP TYPE int4_map_map;

> DROP TYPE int4_map;