timestamps-kafka-avro-multi.td 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ set-arg-default single-replica-cluster=quickstart
  10. $ set-sql-timeout duration=60s
  11. $ set schema={
  12. "type": "record",
  13. "name": "envelope",
  14. "fields": [
  15. {"name": "a", "type": "long"},
  16. {"name": "b", "type": "long"}
  17. ]
  18. }
  19. $ kafka-create-topic topic=data partitions=2
  20. $ kafka-create-topic topic=data2 partitions=2
  21. > CREATE CONNECTION kafka_conn
  22. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  23. > CREATE SOURCE data_empty
  24. IN CLUSTER ${arg.single-replica-cluster}
  25. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data2-${testdrive.seed}')
  26. FORMAT AVRO USING SCHEMA '${schema}'
  27. > CREATE SOURCE data_rt
  28. IN CLUSTER ${arg.single-replica-cluster}
  29. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-${testdrive.seed}')
  30. FORMAT AVRO USING SCHEMA '${schema}'
  31. > CREATE MATERIALIZED VIEW view_rt AS SELECT b, sum(a) FROM data_rt GROUP BY b
  32. > CREATE MATERIALIZED VIEW view_empty AS SELECT b, sum(a) FROM data_empty GROUP BY b
  33. > SELECT * FROM view_empty;
  34. b sum
  35. -----
  36. $ kafka-ingest partition=0 format=avro topic=data schema=${schema} timestamp=1
  37. {"a": 1, "b": 1}
  38. $ kafka-ingest partition=1 format=avro topic=data schema=${schema} timestamp=1
  39. {"a": 3, "b": 1}
  40. $ kafka-ingest partition=0 format=avro topic=data schema=${schema} timestamp=1
  41. {"a": 2, "b": 1}
  42. $ kafka-ingest partition=1 format=avro topic=data schema=${schema} timestamp=1
  43. {"a": 1, "b": 2}
  44. $ kafka-ingest partition=1 format=avro topic=data schema=${schema} timestamp=1
  45. {"a": 1, "b": 3}
  46. {"a": 1, "b": 3}
  47. {"a": 1, "b": 3}
  48. > SELECT * FROM view_rt
  49. b sum
  50. ------
  51. 1 6
  52. 2 1
  53. 3 3
  54. $ kafka-add-partitions topic=data total-partitions=3
  55. $ kafka-ingest partition=0 format=avro topic=data schema=${schema} timestamp=1
  56. {"a": 1, "b": 5}
  57. $ kafka-ingest partition=1 format=avro topic=data schema=${schema} timestamp=1
  58. {"a": 1, "b": 6}
  59. $ kafka-ingest partition=2 format=avro topic=data schema=${schema} timestamp=1
  60. {"a": 1, "b": 7}
  61. $ kafka-add-partitions topic=data total-partitions=4
  62. $ kafka-ingest partition=2 format=avro topic=data schema=${schema} timestamp=1
  63. {"a": 1, "b": 8}
  64. $ kafka-ingest partition=0 format=avro topic=data schema=${schema} timestamp=1
  65. {"a": 1, "b": 9}
  66. $ kafka-ingest partition=1 format=avro topic=data schema=${schema} timestamp=1
  67. {"a": 1, "b": 10}
  68. $ kafka-ingest partition=3 format=avro topic=data schema=${schema} timestamp=1
  69. {"a": 1, "b": 11}
  70. > SELECT * FROM view_rt
  71. b sum
  72. ------
  73. 1 6
  74. 2 1
  75. 3 3
  76. 5 1
  77. 6 1
  78. 7 1
  79. 8 1
  80. 9 1
  81. 10 1
  82. 11 1