02-source.td 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ kafka-create-topic topic=failpoint
  10. > CREATE CONNECTION conn
  11. FOR KAFKA BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT
  12. > CREATE SOURCE failpoint
  13. IN CLUSTER storage_cluster
  14. FROM KAFKA CONNECTION conn (TOPIC
  15. 'testdrive-failpoint-${testdrive.seed}'
  16. )
  17. > CREATE TABLE failpoint_tbl FROM SOURCE failpoint (REFERENCE "testdrive-failpoint-${testdrive.seed}")
  18. KEY FORMAT TEXT VALUE FORMAT TEXT
  19. ENVELOPE UPSERT
  20. $ kafka-ingest format=bytes topic=failpoint key-format=bytes key-terminator=:
  21. fish:fish
  22. bird:goose
  23. > SELECT * from failpoint_tbl
  24. key text
  25. -------------------
  26. fish fish
  27. bird goose
  28. > select st.name, st.type, st.status, st.error
  29. from mz_internal.mz_source_statuses st
  30. join mz_sources s on s.id = st.id
  31. where s.name = 'failpoint';
  32. failpoint kafka running <null>