# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# Exercises a continual task (ct_max) defined FROM TRANSFORM over a table that
# is fed by a Kafka topic with ENVELOPE NONE. After each ingested record the
# test checks the accumulated contents of ct_max.

$ set-arg-default default-storage-size=1

$ kafka-create-topic topic=append_only partitions=1

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CLUSTER ct_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE append_only
  IN CLUSTER ct_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-append_only-${testdrive.seed}')

# Key and value are both decoded as text and the key is kept as a column.
# The continual task below reads the value column as `text`.
> CREATE TABLE append_only_tbl FROM SOURCE append_only (REFERENCE "testdrive-append_only-${testdrive.seed}")
  KEY FORMAT TEXT
  VALUE FORMAT TEXT
  INCLUDE KEY
  ENVELOPE NONE

# Seed the topic with one record (key "x", value "0").
$ kafka-ingest topic=append_only format=bytes key-format=bytes key-terminator=:
x:0

# Wait for x:0 to be ingested before creating the continual task. The expected
# results below rely on x:0 already being in the table when the task starts,
# so that 0 shows up in ct_max.
> SELECT * FROM append_only_tbl
x 0

> CREATE CONTINUAL TASK ct_max FROM TRANSFORM append_only_tbl USING
    (SELECT max(text::int) FROM append_only_tbl)

# Each record with a value above the current maximum is expected to add one
# row to ct_max; earlier maxima stay in place.
$ kafka-ingest topic=append_only format=bytes key-format=bytes key-terminator=:
a:1

> SELECT * FROM ct_max
0
1

$ kafka-ingest topic=append_only format=bytes key-format=bytes key-terminator=:
b:2

> SELECT * FROM ct_max
0
1
2

$ kafka-ingest topic=append_only format=bytes key-format=bytes key-terminator=:
c:3

> SELECT * FROM ct_max
0
1
2
3

# A value that does not exceed the current maximum (3) must leave ct_max
# unchanged.
$ kafka-ingest topic=append_only format=bytes key-format=bytes key-terminator=:
d:1

> SELECT * FROM ct_max
0
1
2
3