From 19c9d48925584846d986278844af7052da1d6b76 Mon Sep 17 00:00:00 2001 From: Nathan Stocks Date: Tue, 12 May 2026 11:25:08 -0600 Subject: [PATCH 1/2] Repo Integrity: atomic file write helpers (step 1.1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foundation for the repository-integrity plan. Today HEAD, workspace config, and version-store files are written directly to their final path, so a process killed mid-write leaves a partially-written canonical file on disk. Add a pair of helpers that always write via a sibling temp file, fsync it, then rename it over the target — a crash before the rename leaves the prior contents intact, a crash after leaves the new contents. util::fs::atomic_write_to_path(target, contents) // in-memory util::fs::atomic_write_from_reader(target, reader) // streamed Both go through a module-private `AtomicTempFile` that uses `async-tempfile` for the temp creation (its Drop impl cleans up on cancellation automatically) and runs the fsync / rename / best-effort parent-dir fsync sequence on `commit`. The struct is private on purpose — "you must commit, or the write is silently discarded" is the kind of invariant that's easier to honor when only two callers (the helpers above) in one file can construct one. Step 1.2 (atomic HEAD), 1.3 (atomic workspace config), and 1.4 (version-store atomic write) migrate onto these helpers in follow-up PRs. Also migrates `OxenError::file_create_error` / `file_rename_error` off `OxenError::basic_str` to new specific variants: FileCreate(PathBuf, #[source] io::Error) FileRename { src, dst, #[source] source: io::Error } `file_rename_error`'s third parameter is tightened from `impl Debug` to `std::io::Error`; both real callers already pass an `io::Error`. --- .claude/CLAUDE.md | 1 + crates/lib/src/error.rs | 33 ++-- crates/lib/src/util/fs.rs | 381 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 404 insertions(+), 11 deletions(-) diff --git a/.claude/CLAUDE.md b/.claude/CLAUDE.md index 600a0a1c6..6b3f644cc 100644 --- a/.claude/CLAUDE.md +++ b/.claude/CLAUDE.md @@ -136,6 +136,7 @@ oxen push origin main # Push to remote - After changing any Rust or Python code, verify that Rust tests pass with `bin/test-rust` and Python tests pass with `bin/test-rust -p` - When updating a dependency, prefer updating to the latest stable version. - Any new or changed Rust code that touches IO (file system, network, etc.) should be async code. Instead of std::io or std::fs, use equivalents from tokio. When an external dependency doesn't support async, use tokio's spawn_blocking functionality. +- Streamed IO (anything reading or writing through an `AsyncRead`/`AsyncWrite` whose total length isn't bounded ahead of time) must use a large buffer rather than rely on `tokio::io::copy`'s 8 KB default. Wrap the write side in `tokio::io::BufWriter::with_capacity(10 * 1024 * 1024, ...)` (and the read side in `BufReader` if the source isn't already buffered), and remember to explicitly `flush().await?` the `BufWriter` before any downstream `sync_all`/rename/checksum step — `BufWriter`'s `Drop` does **not** auto-flush, so unflushed bytes are silently dropped. The canonical example is the S3 store's local-cache path in `crates/lib/src/storage/s3.rs` (`store_version_to_path`). - oxen-server operations should never touch a local checkout on disk when doing operations initiated by its API. - Always use `metadata.is_dir()` instead of `path.is_dir()`. `path.is_dir()` follows symlinks, which Oxen does not track — using it risks descending into directories outside the working tree (or into cycles via cyclic links). - Oxen does not track symlinks. New code that traverses the working tree should check `metadata.is_symlink()` and skip rather than resolve, follow, or record symlinks. diff --git a/crates/lib/src/error.rs b/crates/lib/src/error.rs index 591d249dc..429cb0389 100644 --- a/crates/lib/src/error.rs +++ b/crates/lib/src/error.rs @@ -292,6 +292,21 @@ pub enum OxenError { #[error("{0}")] IO(#[from] io::Error), + /// A `create`-shaped filesystem syscall failed at the given path (e.g. opening with + /// `O_CREAT`, creating a directory tree). Carries the underlying [`io::Error`] so + /// callers can match on `ErrorKind` (e.g. `AlreadyExists` for `O_EXCL` writers). + #[error("Could not create file: {0:?}: {1}")] + FileCreate(PathBuf, #[source] io::Error), + + /// A rename syscall failed when moving `src` to `dst`. + #[error("Could not rename file from {src:?} to {dst:?}: {source}")] + FileRename { + src: PathBuf, + dst: PathBuf, + #[source] + source: io::Error, + }, + /// Encountered when authentication fails. Contains the authentication error message. #[error("Authentication failed: {0}")] Authentication(StringError), @@ -770,11 +785,7 @@ impl OxenError { } pub fn file_create_error(path: impl AsRef, error: std::io::Error) -> OxenError { - OxenError::basic_str(format!( - "Could not create file: {:?} error {:?}", - path.as_ref(), - error - )) + OxenError::FileCreate(path.as_ref().to_path_buf(), error) } pub fn file_open_error(path: impl AsRef, error: std::io::Error) -> OxenError { @@ -816,13 +827,13 @@ impl OxenError { pub fn file_rename_error( src: impl AsRef, dst: impl AsRef, - err: impl std::fmt::Debug, + source: std::io::Error, ) -> OxenError { - OxenError::basic_str(format!( - "File rename error: {err:?}\nCould not move from `{:?}` to `{:?}`", - src.as_ref(), - dst.as_ref() - )) + OxenError::FileRename { + src: src.as_ref().to_path_buf(), + dst: dst.as_ref().to_path_buf(), + source, + } } pub fn cannot_overwrite_files(paths: &[PathBuf]) -> OxenError { diff --git a/crates/lib/src/util/fs.rs b/crates/lib/src/util/fs.rs index 015aeb1f0..93d8e6543 100644 --- a/crates/lib/src/util/fs.rs +++ b/crates/lib/src/util/fs.rs @@ -19,6 +19,7 @@ use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use std::sync::OnceLock; +use tokio::io::AsyncWriteExt; use tokio_stream::Stream; use crate::constants::CHUNKS_DIR; @@ -198,6 +199,183 @@ pub fn write_to_path(path: impl AsRef, value: impl AsRef) -> Result<( } } +/// Infix used in `AtomicTempFile` scratch file names: `.oxentmp.`. +/// Private to keep all knowledge of the pattern in this module — when fsck (or any other +/// sweeper) needs to recognize these temp files, the right move is to add a helper here +/// that does the matching, rather than re-export this constant. +const ATOMIC_TEMP_INFIX: &str = ".oxentmp."; + +/// A temp file opened in `target`'s parent directory, ready to be written into and then +/// atomically renamed over `target` via `commit`. +/// +/// Private on purpose: the type carries a "you must call `commit` to publish, or the +/// write is silently discarded" invariant, and it's safer to keep the type and that +/// invariant inside this module. External callers use the wrappers below +/// ([`atomic_write_to_path`] for bytes-in-memory, [`atomic_write_from_reader`] for +/// streamed payloads), which take care of the commit step. +/// +/// The scratch filename is `.oxentmp.`. The target-basename +/// prefix associates the temp with what it'll become; the `.oxentmp.` infix is a +/// deterministic shape fsck can match on (via a future helper here) for orphans left +/// behind by hard-kills, even when atomic writes start landing under the working tree +/// where third-party files live. +/// +/// Drop semantics: if the handle is dropped without `commit` (cancellation, panic, error +/// return), `async_tempfile` cleans up the temp file. After a successful `commit`, the +/// temp path no longer exists on disk; `commit` explicitly unlinks it and asserts the +/// expected `NotFound`, so the redundant remove inside async-tempfile's `Drop` (which +/// silently swallows any error) is no longer doing meaningful work but also can't hide +/// a surprise. +/// +/// On Linux (the deploy target) the rename happens while the temp file's fd is still +/// open; that's fine since the fd tracks the inode rather than the path. +struct AtomicTempFile { + temp: async_tempfile::TempFile, + target: PathBuf, +} + +impl AtomicTempFile { + /// Open a new temp file as a sibling of `target`. Creates `target`'s parent directory + /// if needed. The temp lives in that parent so the eventual rename stays on one + /// filesystem (POSIX `rename` is only atomic when src and dst share a filesystem). + /// Scratch filename: `.oxentmp.`. + async fn create(target: &Path) -> Result { + let parent = target.parent().filter(|p| !p.as_os_str().is_empty()); + if let Some(parent) = parent { + tokio::fs::create_dir_all(parent) + .await + .map_err(|err| OxenError::file_create_error(parent, err))?; + } + let temp_dir = parent.unwrap_or_else(|| Path::new(".")); + + let target_name = target.file_name().ok_or_else(|| { + OxenError::file_create_error( + target, + std::io::Error::other("target path has no filename component"), + ) + })?; + // `to_string_lossy` only loses information for non-UTF-8 filenames; the UUID + // suffix keeps the temp name unique regardless of how the prefix renders. + let temp_name = format!( + "{}{}{}", + target_name.to_string_lossy(), + ATOMIC_TEMP_INFIX, + uuid::Uuid::new_v4(), + ); + + let temp = async_tempfile::TempFile::new_with_name_in(&temp_name, temp_dir) + .await + .map_err(|err| match err { + async_tempfile::Error::Io(e) => OxenError::file_create_error(temp_dir, e), + other => { + OxenError::file_create_error(temp_dir, std::io::Error::other(other.to_string())) + } + })?; + Ok(Self { + temp, + target: target.to_path_buf(), + }) + } + + /// Mutable reference to the underlying async writer. Use with `write_all`, or pass to + /// `tokio::io::copy(&mut reader, tmp.as_writer())` for streamed payloads. + fn as_writer(&mut self) -> &mut async_tempfile::TempFile { + &mut self.temp + } + + /// fsync the data, rename the temp file over `target`, then best-effort fsync the + /// parent directory so the rename itself survives a crash. The parent fsync may fail + /// on platforms that don't support fsync-on-directory; that only weakens crash + /// durability — the data and rename are already on disk. + async fn commit(self) -> Result<(), OxenError> { + let temp_path = self.temp.file_path(); + + self.temp + .sync_all() + .await + .map_err(|err| OxenError::file_create_error(temp_path, err))?; + + tokio::fs::rename(temp_path, &self.target) + .await + .map_err(|err| OxenError::file_rename_error(temp_path, &self.target, err))?; + + if let Some(parent) = self.target.parent().filter(|p| !p.as_os_str().is_empty()) + && let Ok(dir) = tokio::fs::File::open(parent).await + && let Err(err) = dir.sync_all().await + { + log::warn!("AtomicTempFile::commit: parent fsync failed for {parent:?}: {err}"); + } + + // Explicit post-rename unlink. We expect `NotFound` (the rename consumed the + // temp). Anything else is unexpected: `Ok(())` means something else recreated a + // file at `temp_path` between rename and now — surprising but recoverable, so + // log and continue. A different `Err` (EACCES, EIO, etc.) means the temp + // directory itself is in a bad state, which the caller deserves to know about. + // `self.temp`'s Drop will run a second `remove_file` after this returns; on the + // expected path it'll see `NotFound` and silently swallow. + match tokio::fs::remove_file(temp_path).await { + Ok(()) => log::warn!( + "AtomicTempFile::commit: temp path {temp_path:?} unexpectedly existed \ + after rename — concurrent recreation? Continuing." + ), + Err(err) if err.kind() == std::io::ErrorKind::NotFound => {} + Err(err) => { + log::error!( + "AtomicTempFile::commit: cleanup unlink failed for {temp_path:?}: {err}" + ); + return Err(err.into()); + } + } + + Ok(()) + } +} + +/// Atomically write `contents` to `target` via the write-temp-then-rename pattern. +/// +/// Use for small, fixed-size, in-memory payloads (HEAD, TOML config, small refs files). For large +/// or streamed payloads, use [`atomic_write_from_reader`] so the bytes don't have to be buffered in +/// memory. +/// +/// On the happy path: writes the bytes to a sibling temp file, fsyncs it, renames it over `target`, +/// and best-effort fsyncs the parent directory. A crash before the rename leaves the prior +/// contents at `target` intact; a crash after leaves the new contents in place. Concurrent writers +/// to the same `target` are safe — each gets its own uniquely-named temp file; whichever rename +/// lands last is what observers see. +/// +/// Cancellation or panic before commit leaves a temp file on disk that is cleaned up automatically +/// (by the temp file's Drop impl, or by fsck for the small window where the process itself dies +/// between create and Drop). +pub async fn atomic_write_to_path(target: &Path, contents: &[u8]) -> Result<(), OxenError> { + let mut tmp = AtomicTempFile::create(target).await?; + tmp.as_writer().write_all(contents).await?; + tmp.commit().await +} + +/// Atomically write everything yielded by `reader` to `target` via the write-temp-then- rename +/// pattern. Same guarantees as [`atomic_write_to_path`], without holding the whole payload in +/// memory — bytes stream from `reader` into a 10 MB `BufWriter` and out to disk in large chunks. +/// +/// Used for files whose size is unknown, or are known to be large (e.g. version-store files). +pub async fn atomic_write_from_reader(target: &Path, reader: &mut R) -> Result<(), OxenError> +where + R: tokio::io::AsyncRead + Unpin + ?Sized, +{ + const STREAMING_WRITE_BUF_SIZE: usize = 10 * 1024 * 1024; + let mut tmp = AtomicTempFile::create(target).await?; + { + let mut buf_writer = + tokio::io::BufWriter::with_capacity(STREAMING_WRITE_BUF_SIZE, tmp.as_writer()); + tokio::io::copy(reader, &mut buf_writer).await?; + // BufWriter's Drop does NOT flush — explicit flush is required so any bytes + // still in the 10 MB buffer make it to the underlying file before commit's + // fsync. Skipping this would silently truncate writes that fit entirely + // inside the buffer. + buf_writer.flush().await?; + } + tmp.commit().await +} + pub fn write_data(path: &Path, data: &[u8]) -> Result<(), OxenError> { match File::create(path) { Ok(mut file) => match file.write(data) { @@ -2015,4 +2193,207 @@ def add(a, b): let result = util::fs::list_files_in_dir(Path::new("/nonexistent/path")).await; assert!(result.is_err()); } + + #[tokio::test] + async fn test_atomic_write_to_path_round_trip() -> Result<(), OxenError> { + test::run_empty_dir_test_async(|dir| async move { + let target = dir.join("file.txt"); + util::fs::atomic_write_to_path(&target, b"hello world").await?; + + let contents = tokio::fs::read(&target).await?; + assert_eq!(contents, b"hello world"); + Ok(()) + }) + .await + } + + #[tokio::test] + async fn test_atomic_write_to_path_overwrites_existing() -> Result<(), OxenError> { + test::run_empty_dir_test_async(|dir| async move { + let target = dir.join("file.txt"); + tokio::fs::write(&target, b"old contents").await?; + + util::fs::atomic_write_to_path(&target, b"new contents").await?; + + let contents = tokio::fs::read(&target).await?; + assert_eq!(contents, b"new contents"); + Ok(()) + }) + .await + } + + #[tokio::test] + async fn test_atomic_write_to_path_creates_parent_dir() -> Result<(), OxenError> { + test::run_empty_dir_test_async(|dir| async move { + let target = dir.join("nested").join("deeper").join("file.txt"); + util::fs::atomic_write_to_path(&target, b"x").await?; + + let contents = tokio::fs::read(&target).await?; + assert_eq!(contents, b"x"); + Ok(()) + }) + .await + } + + #[tokio::test] + async fn test_atomic_write_to_path_empty_contents() -> Result<(), OxenError> { + test::run_empty_dir_test_async(|dir| async move { + let target = dir.join("file.txt"); + util::fs::atomic_write_to_path(&target, b"").await?; + + let contents = tokio::fs::read(&target).await?; + assert!(contents.is_empty()); + Ok(()) + }) + .await + } + + #[tokio::test] + async fn test_atomic_temp_file_name_pattern() -> Result<(), OxenError> { + // Verify scratch files follow `.oxentmp.` so fsck can + // match them later. The `tests` module is a child of `util::fs`, so it can + // construct `AtomicTempFile` directly to observe the on-disk name. + test::run_empty_dir_test_async(|dir| async move { + let target = dir.join("HEAD"); + let tmp = super::AtomicTempFile::create(&target).await?; + let temp_path = tmp.temp.file_path().clone(); + let temp_name = temp_path + .file_name() + .expect("temp path has a filename") + .to_string_lossy(); + + assert!( + temp_name.starts_with("HEAD"), + "temp name {temp_name:?} should start with the target basename" + ); + assert!( + temp_name.contains(super::ATOMIC_TEMP_INFIX), + "temp name {temp_name:?} should contain {:?}", + super::ATOMIC_TEMP_INFIX + ); + assert_eq!(temp_path.parent(), Some(dir.as_path())); + + drop(tmp); // async_tempfile Drop cleans up since we never committed. + assert!(!tokio::fs::try_exists(temp_path).await?); + Ok(()) + }) + .await + } + + #[tokio::test] + async fn test_atomic_write_from_reader_streams() -> Result<(), OxenError> { + test::run_empty_dir_test_async(|dir| async move { + // Stream a payload through `atomic_write_from_reader` from an in-memory + // Cursor — this is the shape the version store will use for large blobs. + let target = dir.join("blob.bin"); + let payload: Vec = (0..50_000u32).flat_map(u32::to_le_bytes).collect(); + let mut reader = std::io::Cursor::new(payload.clone()); + + util::fs::atomic_write_from_reader(&target, &mut reader).await?; + + let written = tokio::fs::read(&target).await?; + assert_eq!(written, payload); + + // Parent dir contains only the target — no stray temp files. + let mut entries = tokio::fs::read_dir(&dir).await?; + let mut names = Vec::new(); + while let Some(entry) = entries.next_entry().await? { + names.push(entry.file_name()); + } + assert_eq!(names.len(), 1, "unexpected leftover files: {names:?}"); + Ok(()) + }) + .await + } + + #[tokio::test] + async fn test_atomic_write_from_reader_cleans_up_on_read_failure() -> Result<(), OxenError> { + // A reader that succeeds for one read then errors. This exercises the error + // path through `atomic_write_from_reader`: tokio::io::copy returns Err, the + // temp file's Drop unlinks the scratch, and `commit` is never reached. The + // user-visible invariant is "no orphans on failure" — which is what we assert. + use std::pin::Pin; + use std::task::{Context, Poll}; + use tokio::io::{AsyncRead, ReadBuf}; + + struct FailingReader { + served_once: bool, + } + impl AsyncRead for FailingReader { + fn poll_read( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + if !self.served_once { + buf.put_slice(b"some-bytes"); + self.served_once = true; + Poll::Ready(Ok(())) + } else { + Poll::Ready(Err(std::io::Error::other("simulated read failure"))) + } + } + } + + test::run_empty_dir_test_async(|dir| async move { + let target = dir.join("blob.bin"); + let mut reader = FailingReader { served_once: false }; + + let result = util::fs::atomic_write_from_reader(&target, &mut reader).await; + assert!(result.is_err(), "expected the streaming write to fail"); + + // Nothing at the target; nothing left behind as scratch. + assert!(!tokio::fs::try_exists(&target).await?); + let mut entries = tokio::fs::read_dir(&dir).await?; + assert!( + entries.next_entry().await?.is_none(), + "directory should be empty after failed write" + ); + Ok(()) + }) + .await + } + + #[tokio::test] + async fn test_atomic_write_to_path_concurrent_writers() -> Result<(), OxenError> { + test::run_empty_dir_test_async(|dir| async move { + let target = dir.join("file.txt"); + + // Spawn N concurrent writes with distinguishable contents. Every one must + // succeed; the final file must equal exactly one of the writers' payloads; + // no temp files may be left behind. + let n: usize = 32; + let mut handles = Vec::with_capacity(n); + for i in 0..n { + let target = target.clone(); + let payload = format!("writer-{i}").into_bytes(); + handles.push(tokio::spawn(async move { + util::fs::atomic_write_to_path(&target, &payload).await + })); + } + for h in handles { + h.await + .expect("join should succeed") + .expect("atomic_write_to_path should succeed"); + } + + let final_contents = tokio::fs::read(&target).await?; + let final_str = std::str::from_utf8(&final_contents).expect("contents should be utf-8"); + assert!( + final_str.starts_with("writer-"), + "unexpected final contents: {final_str:?}", + ); + + // Parent dir should contain only `file.txt` — no stray `.tmp.` leftovers. + let mut entries = tokio::fs::read_dir(&dir).await?; + let mut names = Vec::new(); + while let Some(entry) = entries.next_entry().await? { + names.push(entry.file_name()); + } + assert_eq!(names.len(), 1, "unexpected leftover files: {names:?}"); + assert_eq!(names[0], std::ffi::OsStr::new("file.txt")); + Ok(()) + }) + .await + } } From 847ca70bfff177b5794104b9d542f7e98ed90065 Mon Sep 17 00:00:00 2001 From: Nathan Stocks Date: Tue, 12 May 2026 16:39:43 -0600 Subject: [PATCH 2/2] review feedback --- crates/lib/src/util/fs.rs | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/crates/lib/src/util/fs.rs b/crates/lib/src/util/fs.rs index 93d8e6543..3db355c57 100644 --- a/crates/lib/src/util/fs.rs +++ b/crates/lib/src/util/fs.rs @@ -240,6 +240,13 @@ impl AtomicTempFile { /// filesystem (POSIX `rename` is only atomic when src and dst share a filesystem). /// Scratch filename: `.oxentmp.`. async fn create(target: &Path) -> Result { + let target_name = target.file_name().ok_or_else(|| { + OxenError::file_create_error( + target, + std::io::Error::other("target path has no filename component"), + ) + })?; + let parent = target.parent().filter(|p| !p.as_os_str().is_empty()); if let Some(parent) = parent { tokio::fs::create_dir_all(parent) @@ -248,12 +255,6 @@ impl AtomicTempFile { } let temp_dir = parent.unwrap_or_else(|| Path::new(".")); - let target_name = target.file_name().ok_or_else(|| { - OxenError::file_create_error( - target, - std::io::Error::other("target path has no filename component"), - ) - })?; // `to_string_lossy` only loses information for non-UTF-8 filenames; the UUID // suffix keeps the temp name unique regardless of how the prefix renders. let temp_name = format!( @@ -299,11 +300,24 @@ impl AtomicTempFile { .await .map_err(|err| OxenError::file_rename_error(temp_path, &self.target, err))?; - if let Some(parent) = self.target.parent().filter(|p| !p.as_os_str().is_empty()) - && let Ok(dir) = tokio::fs::File::open(parent).await - && let Err(err) = dir.sync_all().await - { - log::warn!("AtomicTempFile::commit: parent fsync failed for {parent:?}: {err}"); + // Attempt to fsync the parent directory, but don't propagate errors because this is just + // an extra precaution, and there are platforms that we know this won't work on. + if let Some(parent) = self.target.parent().filter(|p| !p.as_os_str().is_empty()) { + match tokio::fs::File::open(parent).await { + Ok(dir) => { + // For platforms that don't support fsync on directories, log but don't propagate errors. + if let Err(err) = dir.sync_all().await { + log::warn!( + "AtomicTempFile::commit: parent fsync failed for {parent:?}: {err}" + ); + } + } + // Some platforms (notably Windows) won't even open a directory as a regular + // file without special flags. Log but don't propagate errors. + Err(err) => log::warn!( + "AtomicTempFile::commit: could not open parent {parent:?} for fsync: {err}" + ), + } } // Explicit post-rename unlink. We expect `NotFound` (the rename consumed the