
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default single-replica-cluster=quickstart
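
# (`set-arg-default` supplies a fallback for the `single-replica-cluster`
# argument, so the test also runs when the harness does not pass one
# explicitly.)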

# Test that Protobuf imports are handled correctly.

$ set empty-schema
syntax = "proto3";

$ set importee-schema
syntax = "proto3";

import "google/protobuf/timestamp.proto";

message Importee1 {
    bool b = 1;
}

message Importee2 {
    google.protobuf.Timestamp ts = 3;
}

$ set importer-schema
syntax = "proto3";

import "empty.proto";
import "importee.proto";

message Importer {
    Importee1 importee1 = 1;
    Importee2 importee2 = 2;
}
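
# Note the shape of the import graph above: `Importer` pulls in both
# `empty.proto` (which defines no messages at all) and `importee.proto` (which
# itself imports a well-known type), so this exercises empty, direct, and
# transitive imports in one go.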

# First, test without the schema registry.

$ file-append path=empty.proto
\${empty-schema}

$ file-append path=importee.proto
\${importee-schema}

$ file-append path=importer.proto
\${importer-schema}
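
# `protobuf-compile-descriptors` compiles the three files into a single file
# descriptor set (`import.pb`), and `set-var=import-schema` captures an encoded
# copy of it, which `CREATE SOURCE` below inlines via `USING SCHEMA`.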
$ protobuf-compile-descriptors inputs=empty.proto,importee.proto,importer.proto output=import.pb set-var=import-schema

$ kafka-create-topic topic=import partitions=1

$ kafka-ingest topic=import format=protobuf descriptor-file=import.pb message=Importer
{"importee1": {"b": false}, "importee2": {"ts": "1970-01-01T00:20:34.000005678Z"}}

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE SOURCE import
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-import-${testdrive.seed}')
  FORMAT PROTOBUF MESSAGE '.Importer' USING SCHEMA '${import-schema}'
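
# The `::text` casts render each message as a record: `(f)` is `Importee1` with
# `b = false`, and the escaped nesting in the second column is `Importee2`
# wrapping the `(seconds,nanos)` record of the `Timestamp`.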
> SELECT importee1::text, importee2::text FROM import
importee1  importee2
------------------------------
(f)        "(\"(1234,5678)\")"

# Then, test again with the Confluent Schema Registry. Publishing Protobuf
# schemas to the CSR is tricky, because each Protobuf file needs to go into its
# own subject. This is handled automatically by the Java Kafka client, but it
# is complicated to duplicate that automatic logic in Testdrive. So we publish
# the schemas manually here, but keep them very simple to reduce the pain.
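# (Schema references in the CSR are resolved by subject name and version, which
# is why every imported file, including well-known types, needs its own subject
# for the importing schema to point at.)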

$ kafka-create-topic topic=import-csr partitions=1

# The Confluent toolchain publishes schemas even for well-known types, so we
# have to do the same.
# See: https://github.com/protocolbuffers/protobuf/blob/61e0395c89fe520ae7569aea6838313195e05ec5/src/google/protobuf/timestamp.proto
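# The copy published below is just that file's two fields (`seconds` and
# `nanos`), stripped of comments and options.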
$ schema-registry-publish subject=google/protobuf/timestamp.proto schema-type=protobuf
syntax = "proto3";

package google.protobuf;

message Timestamp {
    int64 seconds = 1;
    int32 nanos = 2;
}

$ schema-registry-publish subject=empty.proto schema-type=protobuf
\${empty-schema}

$ schema-registry-publish subject=importee.proto schema-type=protobuf references=google/protobuf/timestamp.proto
\${importee-schema}

$ schema-registry-publish subject=testdrive-import-csr-${testdrive.seed}-value schema-type=protobuf references=empty.proto,importee.proto
\${importer-schema}
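
# The last subject name follows the registry's default TopicNameStrategy
# (topic name plus "-value"), which is how the CSR-backed source below finds
# the schema for its topic; the `references=` lists let the registry resolve
# each `import` statement against the previously published subjects.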

$ kafka-ingest topic=import-csr format=protobuf descriptor-file=import.pb message=Importer confluent-wire-format=true
{"importee1": {"b": false}, "importee2": {"ts": "1970-01-01T00:20:34.000005678Z"}}

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );
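
# Unlike above, no message name or inline schema is given: the source fetches
# the schema from the registry through `csr_conn` instead.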
> CREATE SOURCE import_csr
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-import-csr-${testdrive.seed}')
  FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn

> SELECT importee1::text, importee2::text FROM import_csr
importee1  importee2
-------------------------------
(f)        "(\"(1234,5678)\")"

# Test that non-zero message IDs in the Confluent wire format are rejected.

$ kafka-ingest topic=import-csr format=protobuf descriptor-file=import.pb message=Importer confluent-wire-format=true schema-message-id=123
{"importee1": {"b": false}, "importee2": {"ts": "1970-01-01T00:20:34.000005678Z"}}
! SELECT importee1::text, importee2::text FROM import_csr
contains:Decode error: protobuf deserialization error: unsupported Confluent-style protobuf message descriptor id: expected 0, but found: 123