# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)

> CREATE CLUSTER to_recreate SIZE '1'

# Test detection of topic deletion.

$ kafka-create-topic topic=topic0 partitions=4

> CREATE SOURCE source0 IN CLUSTER to_recreate
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-topic0-${testdrive.seed}')
  FORMAT TEXT ENVELOPE NONE

> SELECT * FROM source0

$ kafka-delete-topic-flaky topic=topic0

! SELECT * FROM source0
contains:topic was deleted

# Test detection of topic recreation.
#
# The Kafka source detects topic recreation based on regression of the upstream
# frontier. For the upstream frontier to regress, the new topic must have:
# (1) fewer partitions than the old topic, or
# (2) a lower watermark for at least one of its partitions.
# We test both cases below.

# (1) topic recreation with fewer partitions.

$ kafka-create-topic topic=topic1 partitions=4

> CREATE SOURCE source1 IN CLUSTER to_recreate
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-topic1-${testdrive.seed}')
  FORMAT TEXT ENVELOPE NONE

> SELECT * FROM source1

# Spin down the cluster, to prevent the source from observing the topic
# deletion before the new topic was created.
> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR 0)

# Recreate the topic with fewer partitions.
$ kafka-delete-topic-flaky topic=topic1

# Even though `kafka-delete-topic` ensures that the topic no longer exists in
# the broker metadata there is still work to be done asynchronously before it's
# truly gone that must complete before we attempt to recreate it. There is no
# way to observe this work completing so the only option left is sleeping for a
# while. This is the sad state of Kafka. If this test ever becomes flaky let's
# just delete it.
# See: https://github.com/confluentinc/confluent-kafka-python/issues/541
$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=2s

$ kafka-create-topic topic=topic1 partitions=2

> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR 1)

! SELECT * FROM source1
contains:topic was recreated

# (2) topic recreation with a lower watermark.

$ kafka-create-topic topic=topic2 partitions=4

$ kafka-ingest format=bytes topic=topic2
1

> CREATE SOURCE source2 IN CLUSTER to_recreate
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-topic2-${testdrive.seed}')
  FORMAT TEXT ENVELOPE NONE

> SELECT * FROM source2
1

# Spin down the cluster, to prevent the source from observing the topic
# deletion before the new topic was created.
> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR 0)

# Recreate the topic with the same number of partitions but a lower watermark.
$ kafka-delete-topic-flaky topic=topic2

# Even though `kafka-delete-topic` ensures that the topic no longer exists in
# the broker metadata there is still work to be done asynchronously before it's
# truly gone that must complete before we attempt to recreate it. There is no
# way to observe this work completing so the only option left is sleeping for a
# while. This is the sad state of Kafka. If this test ever becomes flaky let's
# just delete it.
# See: https://github.com/confluentinc/confluent-kafka-python/issues/541
$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=2s

$ kafka-create-topic topic=topic2 partitions=4

> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR 1)

! SELECT * FROM source2
contains:topic was recreated

# Ensure we don't panic after we restart due to the above finished ingestions.

$ kafka-create-topic topic=good-topic

> CREATE SOURCE good_source
  IN CLUSTER to_recreate
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-good-topic-${testdrive.seed}')
  FORMAT TEXT
  ENVELOPE NONE

> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR 0)

> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR 1)

$ kafka-ingest format=bytes topic=good-topic repeat=1
1

> SELECT * FROM good_source
text
----
1

# TODO: why are these paused and not stalled with errors?
> SELECT name, status, error FROM mz_internal.mz_source_statuses WHERE type != 'progress'
name         status   error
-------------------------------
good_source  running  <null>
source0      paused   <null>
source1      paused   <null>
source2      paused   <null>

# Testdrive expects all sources to end in a healthy state, so manufacture that
# by dropping sources.
> DROP CLUSTER to_recreate CASCADE;