protobuf-evolution.td 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ set-arg-default single-replica-cluster=quickstart
  10. # Test that Protobuf message evolution is handled sensibly.
  11. # Ingest one message with a very simple schema.
  12. $ set schema
  13. syntax = "proto3";
  14. message Message {
  15. int32 i = 1;
  16. }
  17. $ file-append path=evolution.proto
  18. \${schema}
  19. $ protobuf-compile-descriptors inputs=evolution.proto output=evolution.pb set-var=evolution-schema
  20. $ schema-registry-publish subject=testdrive-evolution-${testdrive.seed}-value schema-type=protobuf
  21. \${schema}
  22. $ kafka-create-topic topic=evolution partitions=1
  23. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  24. URL '${testdrive.schema-registry-url}'
  25. );
  26. > CREATE CONNECTION kafka_conn
  27. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  28. > CREATE SOURCE evolution
  29. IN CLUSTER ${arg.single-replica-cluster}
  30. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-evolution-${testdrive.seed}')
  31. > CREATE TABLE evolution_tbl FROM SOURCE evolution (REFERENCE "testdrive-evolution-${testdrive.seed}")
  32. FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  33. $ kafka-ingest topic=evolution format=protobuf descriptor-file=evolution.pb message=Message confluent-wire-format=true
  34. {"i": 1}
  35. > SELECT * FROM evolution_tbl
  36. i
  37. ---
  38. 1
  39. # Adding new fields and renaming existing fields is safe. New fields won't be
  40. # reflected in the source, though.
  41. $ set schema
  42. syntax = "proto3";
  43. message Message {
  44. int32 i_renamed = 1;
  45. bool extra = 2;
  46. }
  47. $ file-delete path=evolution.proto
  48. $ file-append path=evolution.proto
  49. \${schema}
  50. $ protobuf-compile-descriptors inputs=evolution.proto output=evolution.pb set-var=evolution-schema
  51. $ schema-registry-publish subject=testdrive-evolution-${testdrive.seed}-value schema-type=protobuf
  52. \${schema}
  53. $ kafka-ingest topic=evolution format=protobuf descriptor-file=evolution.pb message=Message confluent-wire-format=true
  54. {"i_renamed": 2, "extra": false}
  55. > SELECT * FROM evolution_tbl
  56. i
  57. ---
  58. 1
  59. 2
  60. # Changing the size of an integer type is allowed; values are truncated/promoted
  61. # as in C.
  62. $ set schema
  63. syntax = "proto3";
  64. message Message {
  65. bool i_demoted = 1;
  66. }
  67. $ file-delete path=evolution.proto
  68. $ file-append path=evolution.proto
  69. \${schema}
  70. $ protobuf-compile-descriptors inputs=evolution.proto output=evolution.pb set-var=evolution-schema
  71. $ schema-registry-publish subject=testdrive-evolution-${testdrive.seed}-value schema-type=protobuf
  72. \${schema}
  73. $ kafka-ingest topic=evolution format=protobuf descriptor-file=evolution.pb message=Message confluent-wire-format=true
  74. {"i_demoted": false}
  75. > SELECT * FROM evolution_tbl
  76. i
  77. ---
  78. 1
  79. 2
  80. 0
  81. # Expect that ingesting an incompatible message will brick the topic. We have to
  82. # explicitly disable the schema registry's protection against this.
  83. $ http-request method=PUT content-type=application/json
  84. url=${testdrive.schema-registry-url}config/testdrive-evolution-${testdrive.seed}-value
  85. {"compatibility": "NONE"}
  86. $ set schema
  87. syntax = "proto3";
  88. message Message {
  89. string wrong = 1;
  90. }
  91. $ file-delete path=evolution.proto
  92. $ file-append path=evolution.proto
  93. \${schema}
  94. $ protobuf-compile-descriptors inputs=evolution.proto output=evolution.pb set-var=evolution-schema
  95. $ schema-registry-publish subject=testdrive-evolution-${testdrive.seed}-value schema-type=protobuf
  96. \${schema}
  97. $ kafka-ingest topic=evolution format=protobuf descriptor-file=evolution.pb message=Message confluent-wire-format=true
  98. {"wrong": "i'm not an int!"}
  99. ! SELECT * FROM evolution_tbl
  100. contains:Decode error: protobuf deserialization error: failed to decode Protobuf message: invalid wire type: LengthDelimited (expected Varint)