canary-kafka-sources.td 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. # The free tier of Confluent Cloud does not alow for programatic
  10. # topic creation
  11. # $ kafka-create-topic topic=bytes
  12. # TODO(def-) Reenable when Confluent Kafka is working with Testdrive again
  13. $ skip-if
  14. SELECT true
  15. $ kafka-ingest format=bytes key-terminator=: key-format=bytes topic=bytes repeat=100
  16. abc:abc
  17. > DROP SOURCE IF EXISTS kafka_bytes CASCADE;
  18. > DROP CONNECTION IF EXISTS kafka_conn;
  19. > DROP SECRET IF EXISTS confluent_username
  20. > DROP SECRET IF EXISTS confluent_password
  21. > CREATE SECRET confluent_username AS '${arg.confluent-api-key}';
  22. > CREATE SECRET confluent_password AS '${arg.confluent-api-secret}';
  23. > CREATE CONNECTION kafka_conn TO KAFKA (
  24. BROKER '${testdrive.kafka-addr}',
  25. SASL MECHANISMS = 'PLAIN',
  26. SASL USERNAME = SECRET confluent_username,
  27. SASL PASSWORD = SECRET confluent_password
  28. );
  29. > CREATE SOURCE kafka_bytes
  30. IN CLUSTER canary_sources
  31. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-bytes-${testdrive.seed}');
  32. > CREATE TABLE kafka_bytes_tbl FROM SOURCE kafka_bytes (REFERENCE "testdrive-bytes-${testdrive.seed}")
  33. FORMAT BYTES
  34. ENVELOPE NONE;
  35. > CREATE MATERIALIZED VIEW kafka_bytes_view AS SELECT COUNT(*) AS cnt FROM kafka_bytes_tbl;
  36. > CREATE DEFAULT INDEX ON kafka_bytes_view;
  37. > SELECT cnt > 0 from kafka_bytes_view
  38. true