test_cluster_rehydration.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from textwrap import dedent
  10. from materialize.cloudtest.app.materialize_application import MaterializeApplication
  11. # Test that a crashed (and restarted) cluster replica handles rehydration
  12. # correctly by not recreating dropped sources. Tests database-issues#5308.
  13. def test_create_drop_source(mz: MaterializeApplication) -> None:
  14. mz.testdrive.run(
  15. input=dedent(
  16. """
  17. > CREATE CLUSTER c REPLICAS ( r1 (SIZE = '1', INTROSPECTION INTERVAL = 0));
  18. > CREATE SOURCE counter IN CLUSTER c
  19. FROM LOAD GENERATOR COUNTER
  20. (TICK INTERVAL '500ms');
  21. > DROP SOURCE counter CASCADE;
  22. """
  23. ),
  24. no_reset=True,
  25. )
  26. # Simulate an unexpected clusterd crash.
  27. pods = mz.kubectl("get", "pods", "-o", "custom-columns=:metadata.name")
  28. podcount = 0
  29. for pod in pods.splitlines():
  30. if "cluster" in pod:
  31. try:
  32. mz.kubectl("delete", "pod", pod)
  33. podcount += 1
  34. except:
  35. # It's OK if the pod delete fails --
  36. # it probably means we raced with a previous test that
  37. # dropped resources.
  38. pass
  39. assert podcount > 0
  40. mz.testdrive.run(
  41. input=dedent(
  42. """
  43. > SELECT COUNT(*) FROM (SHOW SOURCES);
  44. 0
  45. # Ensure that there are no sources currently running from the perspective of mz_source_status_history
  46. > SELECT COUNT(*) FROM mz_internal.mz_source_status_history JOIN ( SELECT source_id AS src, max(occurred_at) AS ocr FROM mz_internal.mz_source_status_history GROUP BY source_id ) AS newest ON newest.src = source_id AND newest.ocr = occurred_at WHERE status = 'running';
  47. 0
  48. > DROP CLUSTER c CASCADE
  49. """
  50. ),
  51. no_reset=True,
  52. )