kafka-sink-empty-progress-topic.td 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. # Test creating a sink when the progress topic is empty but has a non-zero low
  10. # water mark.
  11. #
  12. # Note that, without outside intervention, it's impossible for modern versions
  13. # of Materialize to get a progress topic into this state because:
  14. #
  15. # * Materialize always creates progress topics with compaction enabled but
  16. # time-based retention disabled.
  17. #
  18. # * Materialize never publishes tombstones to the progress topic.
  19. #
  20. # So the progress topic will either have a low water mark of zero, or it will
  21. # have at least one progress message in it.
  22. #
  23. # Still, we've observed at least one progress topic in this state in the wild.
  24. # We believe this occured because the user created the progress topic in an old
  25. # version of Materialize that did not force disable the retention policy when
  26. # creating the progress topic.
  27. #
  28. # Even though modern versions of Materialize don't leave progress topics in this
  29. # state naturally, outside intervention can still cause Materialize to observe
  30. # progress topics in this state:
  31. #
  32. # * A user could enable a retention policy on the progress topic after it's
  33. # created. While this is incorrect it won't cause problems in the steady
  34. # state in practice as long as the retention window is large enough to never
  35. # delete the latest messages. But if all sinks associated with the progress
  36. # topic are dropped, and then the progress topic is allowed to sit for a
  37. # while ... it will eventually empty out and have a nonzero low water mark.
  38. #
  39. # * A user could manually clear the progress topic out after it's created by
  40. # sending DeleteRecords requests to the topic. This is technically valid as
  41. # long as there are no sinks actively associated with the progress topic.
  42. #
  43. # So it's worth testing that we're handling these sorts of progress topics
  44. # correctly. Previous versions of Materialize used to get wedged if you created
  45. # a sink against a progress topic in this state.
  46. $ set-arg-default single-replica-cluster=quickstart
  47. > CREATE MATERIALIZED VIEW mv AS VALUES (1)
  48. # Ensure that sinks can be created against a progress topic that is empty with a
  49. # low water mark of zero.
  50. $ kafka-create-topic topic=empty-at-zero partitions=1
  51. > CREATE CONNECTION kafka_conn_empty_at_zero
  52. TO KAFKA (
  53. BROKER '${testdrive.kafka-addr}',
  54. SECURITY PROTOCOL PLAINTEXT,
  55. PROGRESS TOPIC 'testdrive-empty-at-zero-${testdrive.seed}'
  56. )
  57. > CREATE SINK empty_at_zero
  58. IN CLUSTER ${arg.single-replica-cluster}
  59. FROM mv
  60. INTO KAFKA CONNECTION kafka_conn_empty_at_zero (TOPIC 'testdrive-empty-at-zero-${testdrive.seed}')
  61. KEY (column1) NOT ENFORCED
  62. FORMAT JSON
  63. ENVELOPE UPSERT
  64. $ kafka-verify-data sink=materialize.public.empty_at_zero format=json
  65. {"column1": 1}
  66. # Ensure that sinks can be created against a progress topic that is empty with a
  67. # low water mark that is nonzero.
  68. $ kafka-create-topic topic=empty-at-nonzero partitions=1
  69. $ kafka-ingest topic=empty-at-nonzero format=bytes
  70. data
  71. data
  72. data
  73. $ kafka-delete-records topic=empty-at-nonzero partition=0 offset=3
  74. > CREATE CONNECTION kafka_conn_empty_at_nonzero
  75. TO KAFKA (
  76. BROKER '${testdrive.kafka-addr}',
  77. SECURITY PROTOCOL PLAINTEXT,
  78. PROGRESS TOPIC 'testdrive-empty-at-nonzero-${testdrive.seed}'
  79. )
  80. > CREATE SINK empty_at_nonzero
  81. IN CLUSTER ${arg.single-replica-cluster}
  82. FROM mv
  83. INTO KAFKA CONNECTION kafka_conn_empty_at_nonzero (TOPIC 'testdrive-empty-at-nonzero-${testdrive.seed}')
  84. KEY (column1) NOT ENFORCED
  85. FORMAT JSON
  86. ENVELOPE UPSERT
  87. $ kafka-verify-data sink=materialize.public.empty_at_nonzero format=json
  88. {"column1": 1}