upsert-modification-before.td 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. #
  10. # Ingest records before restart and then upsert them to a different value post-restart
  11. #
  12. $ set keyschema={
  13. "type": "record",
  14. "name": "Key",
  15. "fields": [
  16. {"name": "f1", "type": "long"}
  17. ]
  18. }
  19. $ set schema={
  20. "type" : "record",
  21. "name" : "test",
  22. "fields" : [
  23. {"name":"f2", "type":"string"}
  24. ]
  25. }
  26. $ kafka-create-topic topic=upsert-modification
  27. $ kafka-ingest format=avro topic=upsert-modification key-format=avro key-schema=${keyschema} schema=${schema} repeat=10000
  28. {"f1": ${kafka-ingest.iteration}} {"f2": "${kafka-ingest.iteration}"}
  29. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  30. URL '${testdrive.schema-registry-url}'
  31. );
  32. > CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  33. > CREATE SOURCE upsert_modification
  34. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-modification-${testdrive.seed}')
  35. > CREATE TABLE upsert_modification_tbl FROM SOURCE upsert_modification (REFERENCE "testdrive-upsert-modification-${testdrive.seed}")
  36. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  37. ENVELOPE UPSERT
  38. > SELECT COUNT(*) FROM upsert_modification_tbl;
  39. 10000
  40. $ kafka-create-topic topic=textbytes
  41. $ kafka-ingest format=bytes topic=textbytes key-format=bytes key-terminator=:
  42. fish:fish
  43. bìrd1:goose
  44. bírdmore:geese
  45. mammal1:moose
  46. bìrd1:
  47. > CREATE SOURCE texttext
  48. FROM KAFKA CONNECTION kafka_conn (TOPIC
  49. 'testdrive-textbytes-${testdrive.seed}')
  50. > CREATE TABLE texttext_tbl FROM SOURCE texttext (REFERENCE "testdrive-textbytes-${testdrive.seed}")
  51. KEY FORMAT TEXT VALUE FORMAT TEXT
  52. INCLUDE PARTITION AS kafka_partition, OFFSET AS mz_offset
  53. ENVELOPE UPSERT
  54. > CREATE SOURCE textbytes
  55. FROM KAFKA CONNECTION kafka_conn (TOPIC
  56. 'testdrive-textbytes-${testdrive.seed}')
  57. > CREATE TABLE textbytes_tbl FROM SOURCE textbytes (REFERENCE "testdrive-textbytes-${testdrive.seed}")
  58. KEY FORMAT TEXT VALUE FORMAT BYTES
  59. INCLUDE PARTITION AS kafka_partition, OFFSET AS mz_offset
  60. ENVELOPE UPSERT
  61. > CREATE SOURCE bytesbytes
  62. FROM KAFKA CONNECTION kafka_conn (TOPIC
  63. 'testdrive-textbytes-${testdrive.seed}')
  64. > CREATE TABLE bytesbytes_tbl FROM SOURCE bytesbytes (REFERENCE "testdrive-textbytes-${testdrive.seed}")
  65. KEY FORMAT BYTES VALUE FORMAT BYTES
  66. INCLUDE PARTITION AS kafka_partition, OFFSET AS mz_offset
  67. ENVELOPE UPSERT
  68. > CREATE SOURCE bytestext
  69. FROM KAFKA CONNECTION kafka_conn (TOPIC
  70. 'testdrive-textbytes-${testdrive.seed}')
  71. > CREATE TABLE bytestext_tbl FROM SOURCE bytestext (REFERENCE "testdrive-textbytes-${testdrive.seed}")
  72. KEY FORMAT BYTES VALUE FORMAT TEXT
  73. INCLUDE PARTITION AS kafka_partition, OFFSET AS mz_offset
  74. ENVELOPE UPSERT
  75. > select * from texttext_tbl
  76. key text kafka_partition mz_offset
  77. ----------------------------------------------
  78. fish fish 0 0
  79. bírdmore geese 0 2
  80. mammal1 moose 0 3
  81. > select * from textbytes_tbl
  82. key data kafka_partition mz_offset
  83. ----------------------------------------------
  84. fish fish 0 0
  85. bírdmore geese 0 2
  86. mammal1 moose 0 3
  87. > select * from bytestext_tbl
  88. key text kafka_partition mz_offset
  89. ------------------------------------------------
  90. fish fish 0 0
  91. b\xc3\xadrdmore geese 0 2
  92. mammal1 moose 0 3
  93. > select * from bytesbytes_tbl
  94. key data kafka_partition mz_offset
  95. ------------------------------------------------
  96. fish fish 0 0
  97. b\xc3\xadrdmore geese 0 2
  98. mammal1 moose 0 3