# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# Fall back to the `quickstart` cluster when the caller does not pass
# `single-replica-cluster` explicitly.
$ set-arg-default single-replica-cluster=quickstart

#
# Test topic configuration options at Kafka Sink CREATE time
#

# Shared fixtures: a single-row relation to sink from, plus the Kafka and
# schema registry connections used by the sinks in this file.
> CREATE MATERIALIZED VIEW v1 (f1) AS VALUES (1);

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

# Case 1: TOPIC REPLICATION FACTOR. The sink's topic is then checked for
# replication-factor=1 together with an empty topic-config expectation.
> CREATE SINK topic_replication
  IN CLUSTER ${arg.single-replica-cluster}
  FROM v1
  INTO KAFKA CONNECTION kafka_conn (
    TOPIC 'testdrive-kafka-replication-${testdrive.seed}',
    TOPIC REPLICATION FACTOR 1
  )
  KEY(f1)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

$ kafka-verify-topic sink=materialize.public.topic_replication topic-config={} replication-factor=1

# Case 2: TOPIC PARTITION COUNT. The sink's topic must report 3 partitions.
> CREATE SINK topic_partition
  IN CLUSTER ${arg.single-replica-cluster}
  FROM v1
  INTO KAFKA CONNECTION kafka_conn (
    TOPIC 'testdrive-kafka-partition-${testdrive.seed}',
    TOPIC PARTITION COUNT 3
  )
  KEY(f1)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

$ kafka-verify-topic sink=materialize.public.topic_partition partition-count=3

# Case 3: TOPIC CONFIG with a known Kafka setting. The topic must report
# cleanup.policy=compact and a single partition.
> CREATE SINK topic_config
  IN CLUSTER ${arg.single-replica-cluster}
  FROM v1
  INTO KAFKA CONNECTION kafka_conn (
    TOPIC 'testdrive-kafka-config-${testdrive.seed}',
    TOPIC CONFIG MAP['cleanup.policy' => 'compact']
  )
  KEY(f1)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

$ kafka-verify-topic sink=materialize.public.topic_config partition-count=1 topic-config={"cleanup.policy": "compact"}

# Case 4: TOPIC CONFIG with unknown setting names (one key/value pair also
# contains an escaped single quote).
# The config map contains unknown config names, but the CREATE SINK currently still succeeds;
# the failure only shows up afterwards as a `stalled` sink status, checked below.
> CREATE SINK topic_config_unknown
  IN CLUSTER ${arg.single-replica-cluster}
  FROM v1
  INTO KAFKA CONNECTION kafka_conn (
    TOPIC 'testdrive-kafka-config-unknown-${testdrive.seed}',
    TOPIC CONFIG MAP['abc' => 'def', 'ghi''' => 'jkl''']
  )
  KEY(f1)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

# The stalled-status expectation is not checked when `uses-redpanda` is true.
# NOTE(review): skip-if appears to skip the remainder of the file, which would
# also skip the progress-topic test at the end on Redpanda - confirm that is
# intended.
$ skip-if
SELECT '${arg.uses-redpanda}'::BOOL

> SELECT status, error FROM mz_internal.mz_sink_statuses WHERE name = 'topic_config_unknown';
stalled "kafka: Error creating topic testdrive-kafka-config-unknown-${testdrive.seed} for sink: Admin operation error: InvalidConfig (Broker: Configuration is invalid)"

# Test whether MZ can alter the progress topic configuration when the relevant option is enabled
$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET storage_sink_ensure_topic_config = 'alter'

# Pre-create the progress topic with compaction disabled and confirm that it
# starts out with cleanup.policy=delete.
# NOTE(review): presumably testdrive expands `topic=kafka-progress` to
# `testdrive-kafka-progress-<seed>`, the name used by the checks below - confirm.
$ kafka-create-topic topic=kafka-progress partitions=1 compaction=false

$ kafka-verify-topic topic=testdrive-kafka-progress-${testdrive.seed} topic-config={"cleanup.policy": "delete"} replication-factor=1 partition-count=1

# Point a new connection at the pre-created topic as its PROGRESS TOPIC and
# create a sink through that connection.
> CREATE CONNECTION kafka_conn_progress
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT, PROGRESS TOPIC 'testdrive-kafka-progress-${testdrive.seed}');

> CREATE SINK topic_config_check
  IN CLUSTER ${arg.single-replica-cluster}
  FROM v1
  INTO KAFKA CONNECTION kafka_conn_progress (
    TOPIC 'testdrive-kafka-config-check-${testdrive.seed}',
    TOPIC PARTITION COUNT 3
  )
  KEY(f1)
  FORMAT JSON
  ENVELOPE UPSERT

# With the sink in place, the same progress topic must now report
# cleanup.policy=compact; replication factor and partition count are unchanged.
$ kafka-verify-topic topic=testdrive-kafka-progress-${testdrive.seed} topic-config={"cleanup.policy": "compact"} replication-factor=1 partition-count=1