kafka-sink-after-ssh-failure.td 1.2 KB

1234567891011121314151617181920212223242526272829303132
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. # This test script runs after the SSH bastion host has been terminated.
  10. $ set-sql-timeout duration=300s
  11. # Ensure they all are marked as broken for ssh reasons. Sinks continuously restart, so we
  12. # instead check the history
  13. > SELECT s.name, count(*) > 0 FROM mz_internal.mz_sink_status_history st
  14. JOIN mz_sinks s ON st.sink_id = s.id
  15. WHERE error LIKE 'ssh:%'
  16. AND s.name in ('sink_fixed', 'sink_dynamic')
  17. GROUP BY s.name
  18. sink_fixed true
  19. sink_dynamic true
  20. $ kafka-ingest topic=thetopic format=bytes
  21. one
  22. # Ensure we maintain statistics even if the sink is broken.
  23. > SELECT s.name, SUM(u.messages_staged), SUM(u.messages_committed), SUM(u.bytes_staged) > 0, SUM(bytes_staged) = SUM(bytes_committed)
  24. FROM mz_sinks s
  25. JOIN mz_internal.mz_sink_statistics_raw u ON s.id = u.id
  26. WHERE s.name IN ('sink_fixed')
  27. GROUP BY s.name
  28. sink_fixed 1 1 true true