# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default single-replica-cluster=quickstart

# Test that Protobuf imports are handled correctly.
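
# The setup below uses three schemas: empty.proto (no messages at all),
# importee.proto (which imports a well-known type), and importer.proto (which
# imports both of the others).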

$ set empty-schema
syntax = "proto3";

$ set importee-schema
syntax = "proto3";

import "google/protobuf/timestamp.proto";

message Importee1 {
    bool b = 1;
}

message Importee2 {
    google.protobuf.Timestamp ts = 3;
}

$ set importer-schema
syntax = "proto3";

import "empty.proto";
import "importee.proto";

message Importer {
    Importee1 importee1 = 1;
    Importee2 importee2 = 2;
}

# First, test without the schema registry.

$ file-append path=empty.proto
\${empty-schema}

$ file-append path=importee.proto
\${importee-schema}

$ file-append path=importer.proto
\${importer-schema}
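
# protobuf-compile-descriptors compiles the three files into a single file
# descriptor set at import.pb; set-var additionally captures the descriptors
# in the import-schema variable so they can be inlined in the USING SCHEMA
# clause below.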

$ protobuf-compile-descriptors inputs=empty.proto,importee.proto,importer.proto output=import.pb set-var=import-schema

$ kafka-create-topic topic=import partitions=1
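
# The timestamp below is 1234 seconds (20m34s) and 5678 nanoseconds past the
# Unix epoch, which is why the decoded rows render the pair (1234,5678).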

$ kafka-ingest topic=import format=protobuf descriptor-file=import.pb message=Importer
{"importee1": {"b": false}, "importee2": {"ts": "1970-01-01T00:20:34.000005678Z"}}

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE SOURCE import
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-import-${testdrive.seed}')
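
# '.Importer' is the fully qualified name of the top-level message; the
# leading dot anchors the lookup at the root of the descriptor set's
# namespace.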

> CREATE TABLE import_tbl FROM SOURCE import (REFERENCE "testdrive-import-${testdrive.seed}")
  FORMAT PROTOBUF MESSAGE '.Importer' USING SCHEMA '${import-schema}'

> SELECT importee1::text, importee2::text FROM import_tbl
importee1  importee2
--------------------------------
(f)        "(\"(1234,5678)\")"
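
# importee1 is a record containing a single false boolean, rendered as (f);
# importee2 nests the (1234,5678) timestamp record one level deeper, hence
# the additional quoting and escaping.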

# Then, test again with the Confluent Schema Registry. Publishing Protobuf
# schemas to the CSR is tricky, because each Protobuf file needs to go into
# its own subject. The Java Kafka client handles this automatically, but
# duplicating that logic in Testdrive would be complicated, so we publish the
# schemas manually here and keep them very simple to reduce the pain.
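
# Note the subject names used below: each imported file is published under a
# subject matching its import path, while the top-level schema goes in the
# usual '<topic>-value' subject.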

$ kafka-create-topic topic=import-csr partitions=1

# The Confluent toolchain publishes schemas even for well-known types, so we
# have to do the same.
# See: https://github.com/protocolbuffers/protobuf/blob/61e0395c89fe520ae7569aea6838313195e05ec5/src/google/protobuf/timestamp.proto
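
# A trimmed copy of the upstream definition (just the two fields) is
# published below; that is sufficient to resolve the schema reference here.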

$ schema-registry-publish subject=google/protobuf/timestamp.proto schema-type=protobuf
syntax = "proto3";

package google.protobuf;

message Timestamp {
    int64 seconds = 1;
    int32 nanos = 2;
}

$ schema-registry-publish subject=empty.proto schema-type=protobuf
\${empty-schema}

$ schema-registry-publish subject=importee.proto schema-type=protobuf references=google/protobuf/timestamp.proto
\${importee-schema}

$ schema-registry-publish subject=testdrive-import-csr-${testdrive.seed}-value schema-type=protobuf references=empty.proto,importee.proto
\${importer-schema}

$ kafka-ingest topic=import-csr format=protobuf descriptor-file=import.pb message=Importer confluent-wire-format=true
{"importee1": {"b": false}, "importee2": {"ts": "1970-01-01T00:20:34.000005678Z"}}

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

> CREATE SOURCE import_csr
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-import-csr-${testdrive.seed}')
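
# No inline descriptor set this time: the schema and its references are
# fetched from the registry via csr_conn.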

> CREATE TABLE import_csr_tbl FROM SOURCE import_csr (REFERENCE "testdrive-import-csr-${testdrive.seed}")
  FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn

> SELECT importee1::text, importee2::text FROM import_csr_tbl
importee1  importee2
--------------------------------
(f)        "(\"(1234,5678)\")"

# Test that non-zero message IDs in the Confluent wire format are rejected.
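# In the Confluent wire format, the schema ID is followed by a list of message
# indexes that picks out a message within the published schema. Materialize
# only accepts index 0, i.e. the first top-level message, so the 123 injected
# by schema-message-id below should fail to decode.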

$ kafka-ingest topic=import-csr format=protobuf descriptor-file=import.pb message=Importer confluent-wire-format=true schema-message-id=123
{"importee1": {"b": false}, "importee2": {"ts": "1970-01-01T00:20:34.000005678Z"}}

! SELECT importee1::text, importee2::text FROM import_csr_tbl
contains:Decode error: protobuf deserialization error: unsupported Confluent-style protobuf message descriptor id: expected 0, but found: 123