# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
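
# Create a dedicated cluster for the sources in this test, so that we can
# suspend and resume them below by toggling the replication factor between
# 0 and 1.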
> CREATE CLUSTER to_recreate SIZE '1'

# Test detection of topic deletion.

$ kafka-create-topic topic=topic0 partitions=4

> CREATE SOURCE source0 IN CLUSTER to_recreate
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-topic0-${testdrive.seed}')

> CREATE TABLE source0_tbl FROM SOURCE source0 (REFERENCE "testdrive-topic0-${testdrive.seed}")
  FORMAT TEXT ENVELOPE NONE

> SELECT * FROM source0_tbl

$ kafka-delete-topic-flaky topic=topic0
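
# In testdrive, `!` asserts that the statement fails, and `contains:` matches
# a substring of the resulting error message.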
! SELECT * FROM source0_tbl
contains:topic was deleted

# Test detection of topic recreation.
#
# The Kafka source detects topic recreation based on regression of the upstream
# frontier. For the upstream frontier to regress, the new topic must have:
# (1) fewer partitions than the old topic, or
# (2) a lower watermark for at least one of its partitions.
# We test both cases below.
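#
# For example (hypothetical numbers): if the old 4-partition topic had high
# watermarks [5, 5, 5, 5] and is recreated with 2 empty partitions ([0, 0]),
# the frontier derived from the partition watermarks would have to move
# backwards, which the source surfaces as an unrecoverable error.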

# (1) Topic recreation with fewer partitions.

$ kafka-create-topic topic=topic1 partitions=4

> CREATE SOURCE source1 IN CLUSTER to_recreate
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-topic1-${testdrive.seed}')

> CREATE TABLE source1_tbl FROM SOURCE source1 (REFERENCE "testdrive-topic1-${testdrive.seed}")
  FORMAT TEXT ENVELOPE NONE

> SELECT * FROM source1_tbl

# Spin down the cluster to prevent the source from observing the topic
# deletion before the new topic is created.
> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR 0)

# Recreate the topic with fewer partitions.
$ kafka-delete-topic-flaky topic=topic1

# Even though `kafka-delete-topic` ensures that the topic no longer exists in
# the broker metadata, there is still asynchronous cleanup work that must
# complete before the topic is truly gone and we can attempt to recreate it.
# There is no way to observe this work completing, so the only option left is
# sleeping for a while. This is the sad state of Kafka. If this test ever
# becomes flaky, let's just delete it.
# See: https://github.com/confluentinc/confluent-kafka-python/issues/541
$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=2s

$ kafka-create-topic topic=topic1 partitions=2

> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR 1)
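
# Resuming the cluster restarts the source, which then observes the recreated
# topic.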
! SELECT * FROM source1_tbl
contains:topic was recreated

# (2) Topic recreation with a lower watermark.

$ kafka-create-topic topic=topic2 partitions=4

$ kafka-ingest format=bytes topic=topic2
1
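
# The single ingested record gives topic2 a nonzero high watermark, so the
# recreated, empty topic below will have a lower watermark.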

> CREATE SOURCE source2 IN CLUSTER to_recreate
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-topic2-${testdrive.seed}')

> CREATE TABLE source2_tbl FROM SOURCE source2 (REFERENCE "testdrive-topic2-${testdrive.seed}")
  FORMAT TEXT ENVELOPE NONE

> SELECT * FROM source2_tbl
1

# Spin down the cluster to prevent the source from observing the topic
# deletion before the new topic is created.
> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR 0)

# Recreate the topic with the same number of partitions but a lower watermark.
$ kafka-delete-topic-flaky topic=topic2

# Even though `kafka-delete-topic` ensures that the topic no longer exists in
# the broker metadata, there is still asynchronous cleanup work that must
# complete before the topic is truly gone and we can attempt to recreate it.
# There is no way to observe this work completing, so the only option left is
# sleeping for a while. This is the sad state of Kafka. If this test ever
# becomes flaky, let's just delete it.
# See: https://github.com/confluentinc/confluent-kafka-python/issues/541
$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=2s

$ kafka-create-topic topic=topic2 partitions=4

> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR 1)

! SELECT * FROM source2_tbl
contains:topic was recreated

# Ensure we don't panic after a restart due to the finished (errored)
# ingestions above.

$ kafka-create-topic topic=good-topic

> CREATE SOURCE good_source
  IN CLUSTER to_recreate
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-good-topic-${testdrive.seed}')

> CREATE TABLE good_source_tbl FROM SOURCE good_source (REFERENCE "testdrive-good-topic-${testdrive.seed}")
  FORMAT TEXT
  ENVELOPE NONE

# After suspending the cluster (by scaling the replication factor to 0),
# sources that previously reported as stalled report their latest state,
# which is paused, because their last state _is_ in fact paused. Errored
# sources don't restart, so they do not go through the source lifecycle when
# the replica restarts, and they remain paused.
#
# Before suspending the cluster, we still see the errors in the status
# collection: source0 and source1 have already gone through a couple of
# suspension cycles, so they report as paused, while source2 is still online
# and hasn't gone through a suspend-restart cycle yet.
> SELECT name, status, error FROM mz_internal.mz_source_statuses WHERE type != 'progress'
name status error
-------------------------------
good_source running <null>
good_source_tbl running <null>
source0 paused <null>
source0_tbl paused <null>
source1 paused <null>
source1_tbl paused <null>
source2 stalled "kafka: Source error: source must be dropped and recreated due to failure: topic was recreated"
source2_tbl stalled "kafka: Source error: source must be dropped and recreated due to failure: topic was recreated"
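
# Bounce the replica to force another suspend-restart cycle.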
> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR 0)
> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR 1)

$ kafka-ingest format=bytes topic=good-topic repeat=1
1

> SELECT * FROM good_source_tbl
text
----
1

# After another suspend-restart cycle, source2 also reports as paused.
> SELECT name, status, error FROM mz_internal.mz_source_statuses WHERE type != 'progress'
name status error
-------------------------------
good_source running <null>
good_source_tbl running <null>
source0 paused <null>
source0_tbl paused <null>
source1 paused <null>
source1_tbl paused <null>
source2 paused <null>
source2_tbl paused <null>

# Testdrive expects all sources to end in a healthy state, so manufacture
# that by dropping the sources.
> DROP CLUSTER to_recreate CASCADE;