kafka-sink.td

# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET kafka_transaction_timeout = '60s'

# Test creating Kafka sinks using SSH tunnels.

$ kafka-create-topic topic=thetopic

$ kafka-ingest topic=thetopic format=bytes
one

> DROP CLUSTER IF EXISTS sc;
> CREATE CLUSTER sc REPLICAS (r1 (SIZE '32'))
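
# All sources and sinks below run in the dedicated cluster `sc`.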

# Unfortunately, we currently need a source we can keep appending to (hence Kafka), but
# one that isn't itself broken by SSH, so that we are sure to be testing sinks
# specifically. This is because sinks sometimes need new values to arrive before they
# actually fall over. In the future, when sinks dynamically check the connection status,
# a simple `SELECT 1` materialized view would suffice here.
> CREATE CONNECTION kafka_conn_non_ssh
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE SOURCE non_ssh IN CLUSTER sc
  FROM KAFKA CONNECTION kafka_conn_non_ssh (TOPIC 'testdrive-thetopic-${testdrive.seed}')

> CREATE TABLE non_ssh_tbl FROM SOURCE non_ssh (REFERENCE "testdrive-thetopic-${testdrive.seed}")
  FORMAT TEXT
  ENVELOPE NONE
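
# The SSH tunnel can be attached in two ways: per broker (`USING SSH TUNNEL`) or as a
# connection-level default (`SSH TUNNEL`). The tunnel connection `thancred` is assumed
# to have been created by earlier setup in this scenario.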
> CREATE CONNECTION kafka_conn_using
  TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL thancred, SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION kafka_conn_dynamic
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT, SSH TUNNEL thancred);
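
# Create one sink over each connection flavor.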
> CREATE SINK sink_fixed
  IN CLUSTER sc
  FROM non_ssh_tbl
  INTO KAFKA CONNECTION kafka_conn_using (TOPIC 'sink_fixed-${testdrive.seed}')
  FORMAT JSON ENVELOPE DEBEZIUM

> CREATE SINK sink_dynamic
  IN CLUSTER sc
  FROM non_ssh_tbl
  INTO KAFKA CONNECTION kafka_conn_dynamic (TOPIC 'sink_dynamic-${testdrive.seed}')
  FORMAT JSON ENVELOPE DEBEZIUM
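
# Both sinks should emit the ingested row through the SSH tunnel.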
$ kafka-verify-data format=json sink=materialize.public.sink_fixed key=false sort-messages=true
{"before": null, "after": {"text": "one"}}

$ kafka-verify-data format=json sink=materialize.public.sink_dynamic key=false sort-messages=true
{"before": null, "after": {"text": "one"}}

# Ensure both sinks were marked as running correctly.
> SELECT status FROM mz_internal.mz_sink_statuses st
  JOIN mz_sinks s ON st.id = s.id
  WHERE s.name IN ('sink_fixed', 'sink_dynamic')
running
running
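
# Check sink statistics for sink_fixed: the staged message was committed, and every
# staged byte was also committed.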
> SELECT s.name, SUM(u.messages_staged), SUM(u.messages_committed), SUM(u.bytes_staged) > 0, SUM(u.bytes_staged) = SUM(u.bytes_committed)
  FROM mz_sinks s
  JOIN mz_internal.mz_sink_statistics_raw u ON s.id = u.id
  WHERE s.name IN ('sink_fixed')
  GROUP BY s.name
sink_fixed 1 1 true true