protobuf-evolution.td 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ set-arg-default single-replica-cluster=quickstart
  10. # Test that Protobuf message evolution is handled sensibly.
  11. # Ingest one message with a very simple schema.
  12. $ set schema
  13. syntax = "proto3";
  14. message Message {
  15. int32 i = 1;
  16. }
  17. $ file-append path=evolution.proto
  18. \${schema}
  19. $ protobuf-compile-descriptors inputs=evolution.proto output=evolution.pb set-var=evolution-schema
  20. $ schema-registry-publish subject=testdrive-evolution-${testdrive.seed}-value schema-type=protobuf
  21. \${schema}
  22. $ kafka-create-topic topic=evolution partitions=1
  23. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  24. URL '${testdrive.schema-registry-url}'
  25. );
  26. > CREATE CONNECTION kafka_conn
  27. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  28. > CREATE SOURCE evolution
  29. IN CLUSTER ${arg.single-replica-cluster}
  30. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-evolution-${testdrive.seed}')
  31. FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  32. $ kafka-ingest topic=evolution format=protobuf descriptor-file=evolution.pb message=Message confluent-wire-format=true
  33. {"i": 1}
  34. > SELECT * FROM evolution
  35. i
  36. ---
  37. 1
  38. # Adding new fields and renaming existing fields is safe. New fields won't be
  39. # reflected in the source, though.
  40. $ set schema
  41. syntax = "proto3";
  42. message Message {
  43. int32 i_renamed = 1;
  44. bool extra = 2;
  45. }
  46. $ file-delete path=evolution.proto
  47. $ file-append path=evolution.proto
  48. \${schema}
  49. $ protobuf-compile-descriptors inputs=evolution.proto output=evolution.pb set-var=evolution-schema
  50. $ schema-registry-publish subject=testdrive-evolution-${testdrive.seed}-value schema-type=protobuf
  51. \${schema}
  52. $ kafka-ingest topic=evolution format=protobuf descriptor-file=evolution.pb message=Message confluent-wire-format=true
  53. {"i_renamed": 2, "extra": false}
  54. > SELECT * FROM evolution
  55. i
  56. ---
  57. 1
  58. 2
  59. # Changing the size of an integer type is allowed; values are truncated/promoted
  60. # as in C.
  61. $ set schema
  62. syntax = "proto3";
  63. message Message {
  64. bool i_demoted = 1;
  65. }
  66. $ file-delete path=evolution.proto
  67. $ file-append path=evolution.proto
  68. \${schema}
  69. $ protobuf-compile-descriptors inputs=evolution.proto output=evolution.pb set-var=evolution-schema
  70. $ schema-registry-publish subject=testdrive-evolution-${testdrive.seed}-value schema-type=protobuf
  71. \${schema}
  72. $ kafka-ingest topic=evolution format=protobuf descriptor-file=evolution.pb message=Message confluent-wire-format=true
  73. {"i_demoted": false}
  74. > SELECT * FROM evolution
  75. i
  76. ---
  77. 1
  78. 2
  79. 0
  80. # Expect that ingesting an incompatible message will brick the topic. We have to
  81. # explicitly disable the schema registry's protection against this.
  82. $ http-request method=PUT content-type=application/json
  83. url=${testdrive.schema-registry-url}config/testdrive-evolution-${testdrive.seed}-value
  84. {"compatibility": "NONE"}
  85. $ set schema
  86. syntax = "proto3";
  87. message Message {
  88. string wrong = 1;
  89. }
  90. $ file-delete path=evolution.proto
  91. $ file-append path=evolution.proto
  92. \${schema}
  93. $ protobuf-compile-descriptors inputs=evolution.proto output=evolution.pb set-var=evolution-schema
  94. $ schema-registry-publish subject=testdrive-evolution-${testdrive.seed}-value schema-type=protobuf
  95. \${schema}
  96. $ kafka-ingest topic=evolution format=protobuf descriptor-file=evolution.pb message=Message confluent-wire-format=true
  97. {"wrong": "i'm not an int!"}
  98. ! SELECT * FROM evolution
  99. contains:Decode error: protobuf deserialization error: failed to decode Protobuf message: invalid wire type: LengthDelimited (expected Varint)