test-kafka-acl-no-describe-configs.td 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. # Test that sinks can be created even in the absence of the `DescribeConfigs`
  10. # cluster permission.
  11. # ==> Set up. <==
  12. > CREATE SECRET kafka_password AS 'sekurity'
  13. > CREATE CONNECTION kafka TO KAFKA (
  14. BROKER 'kafka:9095',
  15. SASL MECHANISMS = 'PLAIN',
  16. SASL USERNAME = 'materialize_no_describe_configs',
  17. SASL PASSWORD = SECRET kafka_password,
  18. SECURITY PROTOCOL SASL_PLAINTEXT
  19. );
  20. > CREATE TABLE t (column1 integer)
  21. > INSERT INTO t VALUES (1), (2)
  22. > CREATE MATERIALIZED VIEW mv AS SELECT DISTINCT column1 FROM t
  23. # ==> Test. <==
  24. # Creating the sink when the topic does not yet exist should work, because
  25. # Materialize will gracefully continue even if it cannot discover the default
  26. # number of partitions and replication factor to use.
  27. > CREATE SINK nonexisting FROM mv
  28. INTO KAFKA CONNECTION kafka (TOPIC 'testdrive-nonexisting-${testdrive.seed}')
  29. KEY (column1) FORMAT JSON ENVELOPE UPSERT
  30. # Validate that the sink is actually emitting data. Success of the `CREATE SINK`
  31. # command itself is not sufficient validation.
  32. $ kafka-verify-data format=json key=false sink=materialize.public.nonexisting sort-messages=true
  33. {"column1": 1}
  34. {"column1": 2}
  35. # Ensure that the sink never entered the `stalled` status. Previously we had a
  36. # glitch where the sink would stall once before becoming healthy due to a glitch
  37. # in the topic creation code path when access to `DescribeConfigs` is banned.
  38. > SELECT DISTINCT status FROM mz_sinks
  39. JOIN mz_internal.mz_sink_status_history ON mz_sinks.id = mz_sink_status_history.sink_id
  40. WHERE mz_sinks.name = 'nonexisting'
  41. starting
  42. running
  43. # Creating the sink after creating the topic outside of Materialize should
  44. # succeed, because Materialize no longer needs to run DescribeConfigs to
  45. # determine the replication factor/number of partitions to use.
  46. $ kafka-create-topic topic=no-describe-configs
  47. > CREATE SINK preexisting FROM mv
  48. INTO KAFKA CONNECTION kafka (TOPIC 'testdrive-preexisting-${testdrive.seed}')
  49. KEY (column1) FORMAT JSON ENVELOPE UPSERT
  50. # Validate that the sink is actually emitting data. Success of the `CREATE SINK`
  51. # command itself is not sufficient validation.
  52. $ kafka-verify-data format=json key=false sink=materialize.public.preexisting sort-messages=true
  53. {"column1": 1}
  54. {"column1": 2}
  55. # Ensure that the sink never entered the `stalled` status. Previously we had a
  56. # glitch where the sink would stall once before becoming healthy due to a glitch
  57. # in the topic creation code path when access to `DescribeConfigs` is banned.
  58. > SELECT DISTINCT status FROM mz_sinks
  59. JOIN mz_internal.mz_sink_status_history ON mz_sinks.id = mz_sink_status_history.sink_id
  60. WHERE mz_sinks.name = 'preexisting'
  61. starting
  62. running