timestamps-kafka-avro-multi.td 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ set-arg-default single-replica-cluster=quickstart
  10. $ set-sql-timeout duration=60s
  11. $ set schema={
  12. "type": "record",
  13. "name": "envelope",
  14. "fields": [
  15. {"name": "a", "type": "long"},
  16. {"name": "b", "type": "long"}
  17. ]
  18. }
  19. $ kafka-create-topic topic=data partitions=2
  20. $ kafka-create-topic topic=data2 partitions=2
  21. > CREATE CONNECTION kafka_conn
  22. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  23. > CREATE SOURCE data_empty
  24. IN CLUSTER ${arg.single-replica-cluster}
  25. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data2-${testdrive.seed}')
  26. > CREATE TABLE data_empty_tbl FROM SOURCE data_empty (REFERENCE "testdrive-data2-${testdrive.seed}")
  27. FORMAT AVRO USING SCHEMA '${schema}'
  28. > CREATE SOURCE data_rt
  29. IN CLUSTER ${arg.single-replica-cluster}
  30. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-${testdrive.seed}')
  31. > CREATE TABLE data_rt_tbl FROM SOURCE data_rt (REFERENCE "testdrive-data-${testdrive.seed}")
  32. FORMAT AVRO USING SCHEMA '${schema}'
  33. > CREATE MATERIALIZED VIEW view_rt AS SELECT b, sum(a) FROM data_rt_tbl GROUP BY b
  34. > CREATE MATERIALIZED VIEW view_empty AS SELECT b, sum(a) FROM data_empty_tbl GROUP BY b
  35. > SELECT * FROM view_empty;
  36. b sum
  37. -----
  38. $ kafka-ingest partition=0 format=avro topic=data schema=${schema} timestamp=1
  39. {"a": 1, "b": 1}
  40. $ kafka-ingest partition=1 format=avro topic=data schema=${schema} timestamp=1
  41. {"a": 3, "b": 1}
  42. $ kafka-ingest partition=0 format=avro topic=data schema=${schema} timestamp=1
  43. {"a": 2, "b": 1}
  44. $ kafka-ingest partition=1 format=avro topic=data schema=${schema} timestamp=1
  45. {"a": 1, "b": 2}
  46. $ kafka-ingest partition=1 format=avro topic=data schema=${schema} timestamp=1
  47. {"a": 1, "b": 3}
  48. {"a": 1, "b": 3}
  49. {"a": 1, "b": 3}
  50. > SELECT * FROM view_rt
  51. b sum
  52. ------
  53. 1 6
  54. 2 1
  55. 3 3
  56. $ kafka-add-partitions topic=data total-partitions=3
  57. $ kafka-ingest partition=0 format=avro topic=data schema=${schema} timestamp=1
  58. {"a": 1, "b": 5}
  59. $ kafka-ingest partition=1 format=avro topic=data schema=${schema} timestamp=1
  60. {"a": 1, "b": 6}
  61. $ kafka-ingest partition=2 format=avro topic=data schema=${schema} timestamp=1
  62. {"a": 1, "b": 7}
  63. $ kafka-add-partitions topic=data total-partitions=4
  64. $ kafka-ingest partition=2 format=avro topic=data schema=${schema} timestamp=1
  65. {"a": 1, "b": 8}
  66. $ kafka-ingest partition=0 format=avro topic=data schema=${schema} timestamp=1
  67. {"a": 1, "b": 9}
  68. $ kafka-ingest partition=1 format=avro topic=data schema=${schema} timestamp=1
  69. {"a": 1, "b": 10}
  70. $ kafka-ingest partition=3 format=avro topic=data schema=${schema} timestamp=1
  71. {"a": 1, "b": 11}
  72. > SELECT * FROM view_rt
  73. b sum
  74. ------
  75. 1 6
  76. 2 1
  77. 3 3
  78. 5 1
  79. 6 1
  80. 7 1
  81. 8 1
  82. 9 1
  83. 10 1
  84. 11 1