diff --git a/CMakeLists.txt b/CMakeLists.txt index 95819c7..ad85288 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -75,6 +75,5 @@ endif() # Add your child projects. They will inherit all settings and # dependencies (absl::*, gtest*, pybind11::*) from this file. add_subdirectory(src/ml_flashpoint/checkpoint_object_manager/buffer_object) -add_subdirectory(src/ml_flashpoint/checkpoint_object_manager/object_manager) add_subdirectory(src/ml_flashpoint/replication/transfer_service) diff --git a/src/ml_flashpoint/checkpoint_object_manager/object_manager/CMakeLists.txt b/src/ml_flashpoint/checkpoint_object_manager/object_manager/CMakeLists.txt deleted file mode 100644 index 219f5c2..0000000 --- a/src/ml_flashpoint/checkpoint_object_manager/object_manager/CMakeLists.txt +++ /dev/null @@ -1,66 +0,0 @@ -# Copyright 2025 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -cmake_minimum_required(VERSION 3.18) -project(object_manager LANGUAGES CXX) - -# Place the .so file directly in the source directory, to directly resolve the ext created below. -set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) - -add_library(object_manager_lib STATIC - object_manager.cpp -) - -target_link_libraries(object_manager_lib PUBLIC - absl::log - absl::strings -) - -target_include_directories(object_manager_lib PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) - -find_package(Python COMPONENTS Interpreter Development.Module REQUIRED) -find_package(pybind11 CONFIG REQUIRED) -pybind11_add_module(object_manager_ext bindings.cpp) - -# Link the module against the static library AND the Python::Python target. -# Linking to pybind11::module automatically adds the necessary -# include directories, compiler flags, and libraries. -target_link_libraries(object_manager_ext PUBLIC - object_manager_lib - pybind11::module -) - - -# --- Test Target --- -if(BUILD_TESTING) - # Include the GoogleTest module. - include(GoogleTest) - - # Define the test source directory for readability - set(TEST_SRC_DIR ${CMAKE_SOURCE_DIR}/tests/checkpoint_object_manager/object_manager) - - # Best Practice: Combine all related test source files into a single executable. - add_executable(object_manager_test - ${TEST_SRC_DIR}/object_manager_test.cpp - ) - - # Link all necessary libraries to this unified test target just once. - target_link_libraries(object_manager_test PRIVATE - object_manager_lib # The library we are testing - GTest::gmock_main # The GoogleTest library - ) - - # Discover all tests in the single test executable. - gtest_discover_tests(object_manager_test) -endif() diff --git a/src/ml_flashpoint/checkpoint_object_manager/object_manager/bindings.cpp b/src/ml_flashpoint/checkpoint_object_manager/object_manager/bindings.cpp deleted file mode 100644 index 2c56fba..0000000 --- a/src/ml_flashpoint/checkpoint_object_manager/object_manager/bindings.cpp +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2025 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include - -#include - -#include "object_manager.h" - -namespace py = pybind11; - -PYBIND11_MODULE(object_manager_ext, m) { - m.doc() = "Pybind11 bindings for the object_manager C++ library"; - - // Defining a basic Future-like class, called BasicFutureVoid, that exposes a - // wait() method to optionally block on the underlying std::future via its - // wait() method. The wait method releases the GIL so that other threads can - // execute while waiting. - py::class_>(m, "BasicFutureVoid") - .def("wait", &std::future::wait, "Wait for the future to complete.", - py::call_guard()); - - // Static function that deletes directories async, return a BasicFutureVoid - // that can be waited-on. - m.def("delete_directories_async", - &ml_flashpoint::checkpoint_object_manager::object_manager:: - delete_directories_async, - "Asynchronously deletes a list of directories."); -} diff --git a/src/ml_flashpoint/checkpoint_object_manager/object_manager/object_manager.cpp b/src/ml_flashpoint/checkpoint_object_manager/object_manager/object_manager.cpp deleted file mode 100644 index f92f9a8..0000000 --- a/src/ml_flashpoint/checkpoint_object_manager/object_manager/object_manager.cpp +++ /dev/null @@ -1,114 +0,0 @@ -// Copyright 2025 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "object_manager.h" - -#include -#include -#include - -#include -#include -#include -#include - -#include "absl/log/log.h" -#include "absl/strings/str_join.h" - -namespace ml_flashpoint::checkpoint_object_manager::object_manager { - -namespace fs = std::filesystem; - -namespace { -// We use a fork/exec approach calling 'rm -rf' here instead of -// std::filesystem::remove_all to address a Segmentation Fault -// observed in multi-threaded environments. This should be safer -// and avoids the crash experienced with std::filesystem operations. -// -// The actual deletion logic -void delete_directories_task(const std::vector& directories) { - for (const std::string& dir_path : directories) { - try { - if (fs::is_directory(dir_path)) { - LOG(INFO) << "Removing directory " << dir_path << " via fork/exec..."; - pid_t pid = fork(); - if (pid == 0) { - // Child process - execlp("rm", "rm", "-rf", dir_path.c_str(), (char*)NULL); - // If execlp returns, it failed - std::cerr << "Failed to exec rm -rf for " << dir_path << std::endl; - exit(1); - } else if (pid > 0) { - // Parent process - int status; - waitpid(pid, &status, 0); - if (status != 0) { - LOG(ERROR) << "rm -rf failed for " << dir_path << " with status " - << status; - } - } else { - LOG(ERROR) << "Failed to fork for deleting " << dir_path; - } - } - } catch (const fs::filesystem_error& e) { - // It's important to handle errors inside the thread, - // otherwise they will cause a std::terminate. - // For now, we'll just log to stderr. - LOG(ERROR) << "Error deleting directory " << dir_path << ": " << e.what(); - } - } -} -} // namespace - -std::future delete_directories_async( - std::vector directories) { - // 1. Create a promise to manually control the future. - auto promise = std::make_unique>(); - - // 2. Get the future, which has a _non-blocking_ destructor, - // as it just carries the result, but does not own the thread itself. - std::future future = promise->get_future(); - - if (directories.empty()) { - promise->set_value(); - return future; - } - - // 3. Launch a std::thread `t` for deleting the directories and updating the - // promise. - std::thread t([p = std::move(promise), dirs = std::move(directories)]() { - try { - delete_directories_task(dirs); - p->set_value(); // Signal success - } catch (...) { - LOG(ERROR) << "An unexpected exception occurred when trying to delete " - "directories: [" - << absl::StrJoin(dirs, ", ") << "]"; - try { - p->set_exception(std::current_exception()); // Signal failure - } catch (...) { - LOG(ERROR) << "An unexpected exception occurred when trying to set the " - "exception on the promise."; - } - } - }); - - // Detach `t` to make it a daemon thread that won't be waited on or crash when - // it is destroyed. - t.detach(); - - return future; -} - -} // namespace ml_flashpoint::checkpoint_object_manager::object_manager diff --git a/src/ml_flashpoint/checkpoint_object_manager/object_manager/object_manager.h b/src/ml_flashpoint/checkpoint_object_manager/object_manager/object_manager.h deleted file mode 100644 index aa905cd..0000000 --- a/src/ml_flashpoint/checkpoint_object_manager/object_manager/object_manager.h +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright 2025 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef ML_FLASHPOINT_OBJECT_MANAGER_H -#define ML_FLASHPOINT_OBJECT_MANAGER_H - -#include -#include -#include - -namespace ml_flashpoint::checkpoint_object_manager::object_manager { - -/** - * @brief Asynchronously deletes a list of directories. - * - * This function takes a vector of directory paths and deletes them in a - * separate thread. It returns a future that will be ready when the deletion - * is complete. - * - * @param directories A vector of strings, where each string is a path to a - * directory to be deleted. This is a deep copy of the original list for - * thread safety. - * - * @return A std::future that can be used to wait for the deletion to - * finish. - */ -std::future delete_directories_async( - std::vector directories); - -} // namespace ml_flashpoint::checkpoint_object_manager::object_manager - -#endif // ML_FLASHPOINT_OBJECT_MANAGER_H diff --git a/src/ml_flashpoint/core/checkpoint_saver.py b/src/ml_flashpoint/core/checkpoint_saver.py index c2126cf..14ea70c 100644 --- a/src/ml_flashpoint/core/checkpoint_saver.py +++ b/src/ml_flashpoint/core/checkpoint_saver.py @@ -20,8 +20,9 @@ import os import pickle import queue +import subprocess import threading -from typing import Callable, Optional, Protocol, Union +from typing import Callable, Protocol, Union import torch from torch.distributed.checkpoint import metadata as torchdistmeta @@ -31,7 +32,6 @@ from typing_extensions import override from ml_flashpoint.checkpoint_object_manager.checkpoint_object_manager import CheckpointObjectManager -from ml_flashpoint.checkpoint_object_manager.object_manager import object_manager_ext from ml_flashpoint.core.checkpoint_id_types import CheckpointContainerId, CheckpointObjectId from ml_flashpoint.core.defaults import DIRTY_MARKER_SUFFIX, CheckpointFormat, default_metadata_object_name from ml_flashpoint.core.mlf_logging import get_logger @@ -269,7 +269,7 @@ def write_metadata( pass @abc.abstractmethod - def finalize_checkpoint(self, checkpoint_id: CheckpointContainerId) -> Optional[object_manager_ext.BasicFutureVoid]: + def finalize_checkpoint(self, checkpoint_id: CheckpointContainerId) -> Union[subprocess.Popen, None]: """Finalize the checkpoint for checkpoint_id, indicating it is complete and safe to recover from. This specifically does the following: 1. Cleans up the unfinished marker created by initialize_checkpoint(). @@ -281,7 +281,7 @@ def finalize_checkpoint(self, checkpoint_id: CheckpointContainerId) -> Optional[ checkpoint_id: The CheckpointContainerId to mark as finalized. Returns: - A future that completes when deletion of older checkpoints is done, or None if no deletion was started. + The subprocess.Popen object that handles deletion of older checkpoints, or None if no deletion was started. """ pass @@ -549,7 +549,7 @@ def write_metadata( @override @log_execution_time(logger=_LOGGER, name="finalize_checkpoint") - def finalize_checkpoint(self, checkpoint_id: CheckpointContainerId) -> Optional[object_manager_ext.BasicFutureVoid]: + def finalize_checkpoint(self, checkpoint_id: CheckpointContainerId) -> Union[subprocess.Popen, None]: self._remove_dirty_checkpoint_marker(checkpoint_id) # synchronize across ranks to guarantee they all completed checkpointing before proceeding with log_execution_time(logger=_LOGGER, name="finalize_checkpoint__barrier_func", level=logging.DEBUG): @@ -707,9 +707,7 @@ def _write_to_buffer_from_queue_worker( object_write_bucket_queue.task_done() @log_execution_time(logger=_LOGGER, name="_remove_older_checkpoints") - def _remove_older_checkpoints( - self, older_than: CheckpointContainerId - ) -> Optional[object_manager_ext.BasicFutureVoid]: + def _remove_older_checkpoints(self, older_than: CheckpointContainerId) -> subprocess.Popen | None: """Scans for sibling checkpoint containers to `older_than`, by listing the children of its parent and filtering for those that match the expected format as a safety check, and then deletes all those that are considered older _async_. @@ -721,7 +719,7 @@ def _remove_older_checkpoints( older than this will be removed. Returns: - A future that completes when deletion is done, or None if no deletion was started. + The subprocess.Popen object that handles deletion of older checkpoints, or None if no deletion was started. """ parent_dir = os.path.dirname(older_than.data) older_than_step = CheckpointContainerId.parse_version_container_step(os.path.basename(older_than.data)) @@ -741,7 +739,27 @@ def _remove_older_checkpoints( if step is not None and step < older_than_step: siblings_to_delete.add(full_path) - return object_manager_ext.delete_directories_async(list(siblings_to_delete)) + if siblings_to_delete: + try: + # We use a background subprocess (rm -rf) instead of Python's shutil.rmtree + # to avoid blocking the main Python thread (GIL) during large directory deletions. + # This allows the training process to continue immediately while the OS handles + # the deletion asynchronously. + # + # start_new_session=True is used to ensure the deletion process is decoupled + # from the parent process group, preventing it from being interrupted by signals + # (like SIGINT during Ctrl+C) sent to the main training job. + p = subprocess.Popen( + ["rm", "-rf"] + list(siblings_to_delete), + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + start_new_session=True, + ) + return p + except Exception as e: + _LOGGER.exception("Background deletion of old checkpoints failed: %s", e) + return None + return None def _save_tensor_optimized(self, tensor: torch.Tensor, buffer_io_writer): """Saves a tensor to the buffer using a zero-copy approach where possible. diff --git a/tests/checkpoint_object_manager/object_manager/object_manager_test.cpp b/tests/checkpoint_object_manager/object_manager/object_manager_test.cpp deleted file mode 100644 index 0cc50fd..0000000 --- a/tests/checkpoint_object_manager/object_manager/object_manager_test.cpp +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright 2025 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "object_manager.h" - -#include - -#include -#include -#include -#include - -namespace fs = std::filesystem; - -class ObjectManagerTest : public ::testing::Test { - protected: - void SetUp() override { - test_dir_ = fs::temp_directory_path() / "object_manager_test"; - fs::create_directories(test_dir_); - } - - void TearDown() override { fs::remove_all(test_dir_); } - - fs::path test_dir_; -}; - -TEST_F(ObjectManagerTest, DeleteDirectoriesAsyncDeletesExistingDirectories) { - // Given - fs::path dir1 = test_dir_ / "dir1"; - fs::path dir2 = test_dir_ / "dir2"; - fs::create_directory(dir1); - fs::create_directory(dir2); - std::vector dirs_to_delete = {dir1.string(), dir2.string()}; - - // When - auto future = ml_flashpoint::checkpoint_object_manager::object_manager:: - delete_directories_async(dirs_to_delete); - future.get(); // Block until deletion is complete. - - // Then - ASSERT_FALSE(fs::exists(dir1)); - ASSERT_FALSE(fs::exists(dir2)); -} - -TEST_F(ObjectManagerTest, DeleteDirectoriesAsyncHandlesEmptyVector) { - // Given - std::vector dirs_to_delete; - - // When - auto future = ml_flashpoint::checkpoint_object_manager::object_manager:: - delete_directories_async(dirs_to_delete); - future.get(); // Block until deletion is complete. - - // Then: This should not throw or crash. -} - -TEST_F(ObjectManagerTest, DeleteDirectoriesAsyncDoesNotDeleteFiles) { - // Given - fs::path file1 = test_dir_ / "file1.txt"; - std::ofstream ofs{file1}; - ofs << "test"; - ofs.close(); - std::vector paths_to_delete = {file1.string()}; - - // When - auto future = ml_flashpoint::checkpoint_object_manager::object_manager:: - delete_directories_async(paths_to_delete); - future.get(); // Block until deletion is complete. - - // Then - ASSERT_TRUE(fs::exists(file1)); -} diff --git a/tests/checkpoint_object_manager/object_manager/test_object_manager_bindings.py b/tests/checkpoint_object_manager/object_manager/test_object_manager_bindings.py deleted file mode 100644 index db3d92e..0000000 --- a/tests/checkpoint_object_manager/object_manager/test_object_manager_bindings.py +++ /dev/null @@ -1,62 +0,0 @@ -# Copyright 2025 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from pathlib import Path - -import pytest - -from ml_flashpoint.checkpoint_object_manager.object_manager import object_manager_ext - - -def test_delete_directories_async(tmp_path: Path): - # Given - dir1 = tmp_path / "dir1" - dir2 = tmp_path / "dir2" - dir1.mkdir() - dir2.mkdir() - dirs_to_delete = [str(dir1), str(dir2)] - - # When - future = object_manager_ext.delete_directories_async(dirs_to_delete) - future.wait() # Block until deletion is complete. - - # Then - assert not dir1.exists() - assert not dir2.exists() - - -def test_delete_directories_async_empty_list(): - # Given - dirs_to_delete = [] - - # When/Then: This should not raise an exception. - try: - future = object_manager_ext.delete_directories_async(dirs_to_delete) - future.wait() - except Exception as e: - pytest.fail(f"delete_directories_async raised an exception with an empty list: {e}") - - -def test_delete_directories_async_with_files(tmp_path: Path): - # Given - file1_path = tmp_path / "file1.txt" - file1_path.write_text("test") - paths_to_delete = [str(file1_path)] - - # When - future = object_manager_ext.delete_directories_async(paths_to_delete) - future.wait() - - # Then - assert file1_path.exists() diff --git a/tests/core/test_checkpoint_saver.py b/tests/core/test_checkpoint_saver.py index 1659b2d..b5bb4dc 100644 --- a/tests/core/test_checkpoint_saver.py +++ b/tests/core/test_checkpoint_saver.py @@ -36,7 +36,6 @@ from ml_flashpoint.checkpoint_object_manager.buffer_io import BufferIO from ml_flashpoint.checkpoint_object_manager.checkpoint_object_manager import CheckpointObjectManager -from ml_flashpoint.checkpoint_object_manager.object_manager import object_manager_ext from ml_flashpoint.core.checkpoint_id_types import CheckpointContainerId, CheckpointObjectId from ml_flashpoint.core.checkpoint_saver import DefaultMLFlashpointCheckpointSaver, WriteItemResolver from ml_flashpoint.core.defaults import CheckpointFormat @@ -636,7 +635,7 @@ def test_finalize_checkpoint_calls_barrier_and_removes_older_in_order( checkpoint_id = CheckpointContainerId(f"{temp_dir_path}/checkpoint_finalize_barrier") # When - returned_future = saver.finalize_checkpoint(checkpoint_id) + saver.finalize_checkpoint(checkpoint_id) # Then expected_calls = [ @@ -645,7 +644,6 @@ def test_finalize_checkpoint_calls_barrier_and_removes_older_in_order( mocker.call.remove_older(older_than=checkpoint_id), ] assert manager.mock_calls == expected_calls - assert returned_future is mock_future @pytest.mark.parametrize("local_rank", [0, 1, 5]) def test_finalize_checkpoint_removes_older_dirs_only_on_local_rank_0( @@ -677,12 +675,12 @@ def test_finalize_checkpoint_removes_older_dirs_only_on_local_rank_0( checkpoint_id = CheckpointContainerId(current_ckpt_path) # When - future = saver.finalize_checkpoint(checkpoint_id) + proc = saver.finalize_checkpoint(checkpoint_id) if local_rank == 0: - assert future is not None - future.wait() + assert proc is not None + proc.wait() # Wait for background deletion to complete else: - assert future is None + assert proc is None # Then if local_rank == 0: @@ -1893,10 +1891,11 @@ def test_remove_older_checkpoints_some_older_some_newer(self, saver, temp_dir_pa os.makedirs(d) # When - future = saver._remove_older_checkpoints( + proc = saver._remove_older_checkpoints( older_than=CheckpointContainerId(os.path.join(checkpoint_base_dir, "step-300_ckpt")), ) - future.wait() # Wait for deletion to complete + assert proc is not None + proc.wait() # Then assert not os.path.exists(os.path.join(checkpoint_base_dir, "step-100_ckpt")) @@ -1917,10 +1916,10 @@ def test_remove_older_checkpoints_no_older(self, saver, temp_dir_path): os.makedirs(d) # When - future = saver._remove_older_checkpoints( + proc = saver._remove_older_checkpoints( older_than=CheckpointContainerId(os.path.join(checkpoint_base_dir, "step-100_ckpt")), ) - future.wait() # Wait for deletion to complete + assert proc is None # Then assert os.path.exists(os.path.join(checkpoint_base_dir, "step-100_ckpt")) @@ -1945,10 +1944,11 @@ def test_remove_older_checkpoints_with_other_files(self, saver, temp_dir_path): os.makedirs(other_dir) # When - future = saver._remove_older_checkpoints( + proc = saver._remove_older_checkpoints( older_than=CheckpointContainerId(os.path.join(checkpoint_base_dir, "step-300_ckpt")), ) - future.wait() + assert proc is not None + proc.wait() # Then assert not os.path.exists(os.path.join(checkpoint_base_dir, "step-100_ckpt")) @@ -1962,10 +1962,10 @@ def test_remove_older_checkpoints_empty_dir(self, saver, temp_dir_path): os.makedirs(checkpoint_base_dir) # When - future = saver._remove_older_checkpoints( + proc = saver._remove_older_checkpoints( older_than=CheckpointContainerId(os.path.join(checkpoint_base_dir, "step-100_ckpt")), ) - future.wait() + assert proc is None # Then # No directories were deleted, and no error was raised. @@ -1983,26 +1983,33 @@ def test_remove_older_checkpoints_delete_fails(self, saver, temp_dir_path, mocke for d in checkpoint_dirs: os.makedirs(d) - mocker.patch( - "ml_flashpoint.core.checkpoint_saver.object_manager_ext.delete_directories_async", - side_effect=IOError("Delete failed"), - ) + mock_popen = mocker.patch("subprocess.Popen", side_effect=IOError("Mocked deletion failure")) + mock_logger = mocker.patch("ml_flashpoint.core.checkpoint_saver._LOGGER") - # When/Then - with pytest.raises(IOError, match="Delete failed"): - # The future is returned, but the exception is raised immediately by the mock - saver._remove_older_checkpoints( - older_than=CheckpointContainerId(os.path.join(checkpoint_base_dir, "step-300_ckpt")), - ) + # When + proc = saver._remove_older_checkpoints( + older_than=CheckpointContainerId(os.path.join(checkpoint_base_dir, "step-300_ckpt")), + ) - object_manager_ext.delete_directories_async.assert_called_once() - args, _ = object_manager_ext.delete_directories_async.call_args - actual_requested_deleted = set(args[0]) + # Then + assert proc is None + mock_popen.assert_called_once() + args, _ = mock_popen.call_args + actual_cmd = args[0] + assert actual_cmd[0:2] == ["rm", "-rf"] + actual_requested_deleted = set(actual_cmd[2:]) expected_requested_deleted = { os.path.join(checkpoint_base_dir, "step-100_ckpt"), os.path.join(checkpoint_base_dir, "step-200_ckpt"), } assert actual_requested_deleted == expected_requested_deleted + for d in checkpoint_dirs: + assert os.path.exists(d) + + # Verify that the exception was caught and logged + mock_logger.exception.assert_called_once() + log_args, _ = mock_logger.exception.call_args + assert "Background deletion of old checkpoints failed" in log_args[0] def test_remove_older_checkpoints_with_various_invalid_formats(self, saver, temp_dir_path): # Given @@ -2030,10 +2037,11 @@ def test_remove_older_checkpoints_with_various_invalid_formats(self, saver, temp os.makedirs(d) # When - future = saver._remove_older_checkpoints( + proc = saver._remove_older_checkpoints( older_than=CheckpointContainerId(os.path.join(checkpoint_base_dir, "step-300_ckpt")), ) - future.wait() + assert proc is not None + proc.wait() # Then for d in checkpoint_dirs_to_delete: @@ -2054,16 +2062,15 @@ def test_remove_older_checkpoints_older_than_is_invalid(self, saver, temp_dir_pa for d in checkpoint_dirs: os.makedirs(d) - mocker.patch("ml_flashpoint.core.checkpoint_saver.object_manager_ext.delete_directories_async") + mock_popen = mocker.patch("subprocess.Popen") # When - future = saver._remove_older_checkpoints( + saver._remove_older_checkpoints( older_than=CheckpointContainerId(os.path.join(checkpoint_base_dir, "invalid-step")), ) # Then - # No deletion is requested, so the future should complete immediately without error - assert future is None + mock_popen.assert_not_called() assert os.path.exists(os.path.join(checkpoint_base_dir, "step-100_ckpt")) assert os.path.exists(os.path.join(checkpoint_base_dir, "step-200_ckpt")) @@ -2128,9 +2135,12 @@ def test_remove_older_checkpoints_digit_length_crossover( ) # When - future = saver._remove_older_checkpoints(older_than=older_than_checkpoint_id) - if future is not None: - future.wait() # Wait for deletion to complete + proc = saver._remove_older_checkpoints(older_than=older_than_checkpoint_id) + if expected_deleted_steps: + assert proc is not None + proc.wait() + else: + assert proc is None # Then expected_deleted_paths = {