# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. # Test that Materialize can create a sink using a Kafka user with restricted
  10. # access to consumer groups and transactional IDs.
  11. # ==> Set up. <==
  12. $ kafka-create-topic topic=data
  13. $ kafka-ingest topic=data format=bytes
  14. banana
  15. > CREATE SECRET kafka_password AS 'sekurity'
  16. > CREATE CONNECTION kafka_bad_progress_topic TO KAFKA (
  17. BROKER 'kafka:9095',
  18. SASL MECHANISMS = 'PLAIN',
  19. SASL USERNAME = 'materialize_lockdown',
  20. SASL PASSWORD = SECRET kafka_password,
  21. SECURITY PROTOCOL SASL_PLAINTEXT
  22. );
  23. > CREATE CONNECTION kafka_good_progress_topic TO KAFKA (
  24. BROKER 'kafka:9095',
  25. SASL MECHANISMS = 'PLAIN',
  26. SASL USERNAME = 'materialize_lockdown',
  27. SASL PASSWORD = SECRET kafka_password,
  28. SECURITY PROTOCOL SASL_PLAINTEXT,
  29. PROGRESS TOPIC = 'lockdown-progress'
  30. );
  31. > CREATE TABLE t (column1 integer)
  32. > INSERT INTO t VALUES (1), (2)
  33. > CREATE MATERIALIZED VIEW mv AS SELECT DISTINCT column1 FROM t
  34. # ==> Test. <==
  35. # The default group ID prefix is not usable by the `materialize_lockdown`
  36. # user.
  37. > CREATE SOURCE broken
  38. FROM KAFKA CONNECTION kafka_good_progress_topic (
  39. TOPIC 'testdrive-data-${testdrive.seed}'
  40. )
  41. > CREATE TABLE broken_tbl FROM SOURCE broken (REFERENCE "testdrive-data-${testdrive.seed}")
  42. FORMAT TEXT
  43. > SELECT EXISTS (
  44. SELECT 1
  45. FROM mz_sources
  46. JOIN mz_internal.mz_source_status_history ON mz_sources.id = mz_source_status_history.source_id
  47. WHERE name = 'broken'
  48. AND error ILIKE '%error when polling consumer for source%Group authorization failed%'
  49. )
  50. true
  51. > DROP SOURCE broken CASCADE
  52. # The default group ID prefix *is* writeable by the `materialize_lockdown` user.
  53. # Ensure that offsets are committed.
  54. > CREATE SOURCE working_source
  55. FROM KAFKA CONNECTION kafka_good_progress_topic (
  56. TOPIC 'testdrive-data-${testdrive.seed}',
  57. GROUP ID PREFIX 'lockdown-'
  58. )
  59. > CREATE TABLE working_source_tbl FROM SOURCE working_source (REFERENCE "testdrive-data-${testdrive.seed}")
  60. FORMAT TEXT
  61. > SELECT * FROM working_source_tbl
  62. banana
  63. $ set-from-sql var=conn-id
  64. SELECT id FROM mz_connections WHERE name = 'kafka_good_progress_topic'
  65. $ set-from-sql var=source-id
  66. SELECT id FROM mz_sources WHERE name = 'working_source'
  67. $ kafka-verify-commit topic=data partition=0 consumer-group-id=lockdown-materialize-${testdrive.materialize-environment-id}-${conn-id}-${source-id}
  68. 1
  69. # A sink which uses a bad transactional ID should fail.
  70. > CREATE SINK broken1 FROM mv
  71. INTO KAFKA CONNECTION kafka_bad_progress_topic (
  72. TOPIC 'testdrive-broken-${testdrive.seed}'
  73. )
  74. KEY (column1) FORMAT JSON ENVELOPE UPSERT
  75. > SELECT EXISTS (
  76. SELECT 1
  77. FROM mz_sinks
  78. JOIN mz_internal.mz_sink_status_history ON mz_sinks.id = mz_sink_status_history.sink_id
  79. WHERE name = 'broken1'
  80. AND error ILIKE '%Transactional Id authorization failed%'
  81. )
  82. true
  83. > DROP SINK broken1
  84. # A sink which uses a good transactional ID but a bad progress topic should
  85. # fail.
  86. > CREATE SINK broken2 FROM mv
  87. INTO KAFKA CONNECTION kafka_bad_progress_topic (
  88. TOPIC 'testdrive-broken-${testdrive.seed}',
  89. TRANSACTIONAL ID PREFIX 'lockdown'
  90. )
  91. KEY (column1) FORMAT JSON ENVELOPE UPSERT
  92. > SELECT EXISTS (
  93. SELECT 1
  94. FROM mz_sinks
  95. JOIN mz_internal.mz_sink_status_history ON mz_sinks.id = mz_sink_status_history.sink_id
  96. WHERE name = 'broken2'
  97. AND error ILIKE '%kafka: error registering kafka progress topic for sink%Topic authorization failed%'
  98. )
  99. true
  100. > DROP SINK broken2
  101. # A sink which uses a good transactional ID and progress topic but a bad data
  102. # topic should fail.
  103. > CREATE SINK broken3 FROM mv
  104. INTO KAFKA CONNECTION kafka_good_progress_topic (
  105. TOPIC 'testdrive-broken-${testdrive.seed}',
  106. TRANSACTIONAL ID PREFIX 'lockdown'
  107. )
  108. KEY (column1) FORMAT JSON ENVELOPE UPSERT
  109. > SELECT EXISTS (
  110. SELECT 1
  111. FROM mz_sinks
  112. JOIN mz_internal.mz_sink_status_history ON mz_sinks.id = mz_sink_status_history.sink_id
  113. WHERE name = 'broken3'
  114. AND error ILIKE '%Error creating topic testdrive-broken-${testdrive.seed}%Topic authorization failed%'
  115. )
  116. true
  117. > DROP SINK broken3
  118. # A sink which uses a good transactional ID, progress topic, and data topic
  119. # but a bad group ID prefix will fail, but only after restart when the progress
  120. # topic contains entries.
  121. > CREATE CLUSTER c (SIZE = '1')
  122. > CREATE SINK broken4 IN CLUSTER c FROM mv
  123. INTO KAFKA CONNECTION kafka_good_progress_topic (
  124. TOPIC 'lockdown-data1',
  125. TRANSACTIONAL ID PREFIX 'lockdown'
  126. )
  127. KEY (column1) FORMAT JSON ENVELOPE UPSERT
  128. $ kafka-verify-data format=json key=false sink=materialize.public.broken4 sort-messages=true
  129. {"column1": 1}
  130. {"column1": 2}
  131. # Resize the cluster on which the sink is running to force the sink to restart.
  132. > ALTER CLUSTER c SET (SIZE = '2')
  133. > SELECT EXISTS (
  134. SELECT 1
  135. FROM mz_sinks
  136. JOIN mz_internal.mz_sink_status_history ON mz_sinks.id = mz_sink_status_history.sink_id
  137. WHERE name = 'broken4'
  138. AND error ILIKE '%failed to fetch progress message%Group authorization failed%'
  139. )
  140. true
  141. > DROP SINK broken4
  142. # A sink which uses a good transactional ID, progress topic, data topic, and
  143. # group ID prefix should work.
  144. > CREATE SINK working IN CLUSTER c FROM mv
  145. INTO KAFKA CONNECTION kafka_good_progress_topic (
  146. TOPIC 'lockdown-data2',
  147. TRANSACTIONAL ID PREFIX 'lockdown',
  148. PROGRESS GROUP ID PREFIX 'lockdown'
  149. )
  150. KEY (column1) FORMAT JSON ENVELOPE UPSERT
  151. # Validate that the sink is actually emitting data. Success of the `CREATE SINK`
  152. # command itself is not sufficient validation.
  153. $ kafka-verify-data format=json key=false sink=materialize.public.working sort-messages=true
  154. {"column1": 1}
  155. {"column1": 2}
  156. # Resize the cluster on which the sink is running to force the sink to restart.
  157. > ALTER CLUSTER c SET (SIZE = '1')
  158. # Ensure that the sink is emitting new messages.
  159. > INSERT INTO t VALUES (3)
  160. $ kafka-verify-data format=json key=false sink=materialize.public.working sort-messages=true
  161. {"column1": 3}
  162. # Ensure that the sink never entered the `stalled` status.
  163. > SELECT DISTINCT status FROM mz_sinks
  164. JOIN mz_internal.mz_sink_status_history ON mz_sinks.id = mz_sink_status_history.sink_id
  165. WHERE mz_sinks.name = 'working'
  166. starting
  167. running
  168. paused