diff --git a/crates/lib/src/api/client/tree.rs b/crates/lib/src/api/client/tree.rs
index 238ef4cbc..06065d6d7 100644
--- a/crates/lib/src/api/client/tree.rs
+++ b/crates/lib/src/api/client/tree.rs
@@ -1,19 +1,19 @@
-use async_compression::futures::bufread::GzipDecoder;
-use async_tar::Archive;
-use flate2::Compression;
-use flate2::write::GzEncoder;
 use futures_util::TryStreamExt;
 use std::collections::HashSet;
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::time;
-use tempfile::TempDir;
+use tokio_util::io::{ReaderStream, StreamReader, SyncIoBridge};
+use crate::api;
 use crate::api::client;
-use crate::constants::{NODES_DIR, OXEN_HIDDEN_DIR, TREE_DIR};
+use crate::core::db::merkle_node::file_backend;
 use crate::core::progress::push_progress::PushProgress;
 use crate::core::v_latest::index::CommitMerkleTree;
 use crate::error::OxenError;
+use crate::model::merkle_tree::merkle_transport::{
+    MerklePacker, MerkleUnpacker, PackOptions, UnpackOptions,
+};
 use crate::model::merkle_tree::node::MerkleTreeNode;
 use crate::model::{LocalRepository, MerkleHash, RemoteRepository};
 use crate::opts::download_tree_opts::DownloadTreeOpts;
@@ -21,7 +21,6 @@ use crate::opts::fetch_opts::FetchOpts;
 use crate::view::tree::MerkleHashResponse;
 use crate::view::tree::merkle_hashes::MerkleHashes;
 use crate::view::{MerkleHashesResponse, StatusMessage};
-use crate::{api, util};
 /// Check if a node exists in the remote repository merkle tree by hash
 #[tracing::instrument(skip_all)]
@@ -50,61 +49,71 @@ pub async fn has_node(
     }
 }
-/// Upload a node to the remote repository merkle tree
+/// Upload a set of merkle nodes to the remote repository.
+///
+/// Packs the nodes into the canonical tar-gz wire format and streams the bytes straight
+/// into the HTTP upload body — no intermediate `Vec<u8>` is materialized. The pack runs
+/// on a blocking worker (`tokio::task::spawn_blocking`) that writes into one end of a
+/// `tokio::io::duplex`; the HTTP body reads from the other end through `ReaderStream`,
+/// so upload and pack progress together with back-pressure.
 pub async fn create_nodes(
     local_repo: &LocalRepository,
     remote_repo: &RemoteRepository,
     nodes: HashSet<MerkleHash>,
     progress: &Arc<PushProgress>,
 ) -> Result<(), OxenError> {
-    // Compress the node
-    log::debug!("create_nodes starting compression");
-    // OPT: Try Compression::fast();
-    let enc = GzEncoder::new(Vec::new(), Compression::default());
-    log::debug!("create_nodes compressing nodes");
-    let mut tar = tar::Builder::new(enc);
-    log::debug!("create_nodes creating tar");
-    let node_path = local_repo
-        .path
-        .join(OXEN_HIDDEN_DIR)
-        .join(TREE_DIR)
-        .join(NODES_DIR);
-
-    for (i, node_hash) in nodes.iter().enumerate() {
-        let dir_prefix = node_hash.to_hex_hash().node_db_prefix();
-        let node_dir = node_path.join(&dir_prefix);
-        // log::debug!(
-        //     "create_nodes appending objects dir {:?} to tar at path {:?}",
-        //     dir_prefix,
-        //     node_dir
-        // );
-        progress.set_message(format!("Packing {}/{} nodes", i + 1, nodes.len()));
-
-        log::debug!("create_nodes appending dir to tar");
-        tar.append_dir_all(dir_prefix, node_dir)?;
-    }
-
-    tar.finish()?;
-    log::debug!("create_nodes finished tar");
-    let buffer: Vec<u8> = tar.into_inner()?.finish()?;
+    let n = nodes.len();
+    progress.set_message(format!("Pushing {n} nodes"));
+
+    // Extend the progress bar's total length by an uncompressed-bytes estimate of the
+    // tarball so the upload phase has a known end and a meaningful ETA.
Random-ish + // merkle hash bytes compress to ~1.0×, so the uncompressed estimate is a tight + // upper bound on the bytes that will actually flow over the wire. + let estimated_upload_bytes = file_backend::pack_nodes_byte_estimate(local_repo, &nodes); + progress.inc_total_bytes(estimated_upload_bytes); + + // Pack -> duplex writer (sync) -> duplex reader (async) -> HTTP body stream. + // 64 KiB duplex buffer mirrors the server-side streaming pattern in + // `crates/server/src/controllers/versions.rs`. + let (async_writer, async_reader) = tokio::io::duplex(64 * 1024); + let repo = local_repo.clone(); + let pack_handle = tokio::task::spawn_blocking(move || -> Result<(), OxenError> { + let sync_writer = SyncIoBridge::new(async_writer); + // Legacy client-push wire format: required so older `oxen-server` deployments + // (which pre-pend `tree/nodes/` server-side at install time) install entries + // at the right paths. + repo.merkle_store() + .pack_nodes(&nodes, PackOptions::LegacyClientPush, sync_writer)?; + Ok(()) + }); + + // Tick `progress` per chunk so the user sees upload progress moving. + let progress_for_stream = Arc::clone(progress); + let body_stream = ReaderStream::new(async_reader).inspect_ok(move |chunk| { + progress_for_stream.add_bytes(chunk.len() as u64); + }); - // Upload the node let uri = "/tree/nodes".to_string(); let url = api::endpoint::url_from_repo(remote_repo, &uri)?; let client = client::builder_for_url(&url)? .timeout(time::Duration::from_secs(120)) .build()?; + log::debug!("uploading {n} nodes to {url}"); - let size = buffer.len() as u64; - log::debug!( - "uploading node of size {} to {}", - bytesize::ByteSize::b(size), - url - ); - let res = client.post(&url).body(buffer.to_owned()).send().await?; + let res = client + .post(&url) + .body(reqwest::Body::wrap_stream(body_stream)) + .send() + .await?; let body = client::parse_json_body(&url, res).await?; log::debug!("upload node complete {body}"); + // Surface any pack error after the upload completes (the duplex reader reaching EOF + // signals pack end-of-stream; panics and Result::Err come through the join handle). + pack_handle + .await + .map_err(|e| OxenError::basic_str(format!("pack task panicked: {e}")))??; + Ok(()) } @@ -330,6 +339,13 @@ pub async fn download_trees_between( Ok(()) } +/// Download a merkle-tree tarball from the remote repository and unpack it into the +/// local store. Streams the response body straight into the `MerkleUnpacker` so nothing +/// buffers the whole payload in memory. +/// +/// The VFS branch is preserved but no longer lives here: `FileBackend::unpack` handles +/// the `is_vfs` case internally (tempdir + `copy_dir_all` dance). That keeps the client +/// logic generic across backends. async fn node_download_request( local_repo: &LocalRepository, url: impl AsRef, @@ -339,41 +355,25 @@ async fn node_download_request( let client = client::builder_for_url(url)? 
.timeout(time::Duration::from_secs(12000)) .build()?; - log::debug!("node_download_request about to send request {url}"); + log::debug!("node_download_request sending request {url}"); let res = client.get(url).send().await?; let res = client::handle_non_json_response(url, res).await?; - // The remote tar packs it in TREE_DIR/NODES_DIR - // So this will unpack it in OXEN_HIDDEN_DIR/TREE_DIR/NODES_DIR - let full_unpacked_path = local_repo.path.join(OXEN_HIDDEN_DIR); - - // Create the temp path if it doesn't exist - util::fs::create_dir_all(&full_unpacked_path)?; - - let reader = res - .bytes_stream() - .map_err(futures::io::Error::other) - .into_async_read(); - - let decoder = GzipDecoder::new(futures::io::BufReader::new(reader)); - let archive = Archive::new(decoder); - - // If the repo is stored on a virtual file system, re-route the nodes through a temp dir - if local_repo.is_vfs() { - let temp_dir = TempDir::new()?; - let temp_path = temp_dir.path(); - - // Unpack the tar in a temp dir - log::debug!("node_download_request unpacking to {temp_path:?}"); - util::fs::unpack_async_tar_archive(archive, temp_path).await?; - log::debug!("Succesfully unpacked tar to temp dir"); - - // Copy to the repo - util::fs::copy_dir_all(&temp_dir, &full_unpacked_path)?; - } else { - // Else, unpack directly to the repo - util::fs::unpack_async_tar_archive(archive, &full_unpacked_path).await?; - } + // async Stream> → AsyncRead → sync Read, bridged across + // the spawn_blocking boundary so the sync trait consumes streamed bytes incrementally. + let async_reader = StreamReader::new(res.bytes_stream().map_err(std::io::Error::other)); + let sync_reader = SyncIoBridge::new(async_reader); + + let repo = local_repo.clone(); + tokio::task::spawn_blocking(move || -> Result<(), OxenError> { + // Download path: overwrite existing files on disk, matching `main`'s + // `util::fs::unpack_async_tar_archive` behaviour. + repo.merkle_store() + .unpack(sync_reader, UnpackOptions::Overwrite)?; + Ok(()) + }) + .await + .map_err(|e| OxenError::basic_str(format!("unpack task panicked: {e}")))??; Ok(()) } @@ -470,6 +470,7 @@ pub async fn mark_nodes_as_synced( mod tests { use crate::api; use crate::error::OxenError; + use crate::model::{Remote, RemoteRepository}; use crate::opts::FetchOpts; use crate::repositories; use crate::test; @@ -674,4 +675,55 @@ mod tests { }) .await } + + /// Regression: a corrupted gzip stream returned by the remote server must surface + /// as an `Err(OxenError)` from [`download_tree`] (and therefore the underlying + /// private `node_download_request`), **not** a panic on the spawn_blocking + /// `JoinHandle`. + /// + /// The `node_download_request` pipeline pipes the response body through + /// `StreamReader` → `SyncIoBridge` → [`MerkleUnpacker::unpack`] inside a + /// `tokio::task::spawn_blocking`. The unpack returns `Err` on garbage gzip; + /// that `Err` must be propagated through the join handle as an `OxenError`, + /// not lost to a panic. Mockito serves a fixed garbage body so the test + /// doesn't depend on the live oxen-server. + #[tokio::test] + async fn test_node_download_request_propagates_corrupted_gzip_error() -> Result<(), OxenError> { + test::run_empty_local_repo_test_async(|local_repo| async move { + let mut server = mockito::Server::new_async().await; + let server_url = server.url(); + let namespace = "ns"; + let name = "repo"; + // download_tree uses URI "/tree/download", joined with API_NAMESPACE + // and the remote path -> "/api/repos/{namespace}/{name}/tree/download". 
+ let path = format!("/api/repos/{namespace}/{name}/tree/download"); + let _mock = server + .mock("GET", path.as_str()) + .with_status(200) + .with_header("Content-Type", "application/gzip") + // Garbage bytes that aren't a valid gzip stream. + .with_body(b"NOT A VALID GZIP STREAM".as_slice()) + .create_async() + .await; + + let remote_repo = RemoteRepository { + namespace: namespace.to_string(), + name: name.to_string(), + remote: Remote { + name: "origin".to_string(), + url: format!("{server_url}/{namespace}/{name}"), + }, + min_version: None, + is_empty: false, + }; + + let res = api::client::tree::download_tree(&local_repo, &remote_repo).await; + assert!( + res.is_err(), + "download_tree must return Err for corrupted gzip stream, got {res:?}" + ); + Ok(()) + }) + .await + } } diff --git a/crates/lib/src/core/db/merkle_node/file_backend.rs b/crates/lib/src/core/db/merkle_node/file_backend.rs index f953538b4..bd2142924 100644 --- a/crates/lib/src/core/db/merkle_node/file_backend.rs +++ b/crates/lib/src/core/db/merkle_node/file_backend.rs @@ -22,8 +22,7 @@ use crate::model::merkle_tree::node::MerkleTreeNode; use crate::model::{MerkleHash, TMerkleTreeNode}; use crate::util; -/// File-based Merkle node store backend. Implements the [`MerkleReader`], -/// [`MerkleWriter`], [`MerklePacker`], and [`MerkleUnpacker`] traits. +/// File-based Merkle node store backend. Implements the [`MerkleStore`] trait. /// /// Holds a borrowed `&LocalRepository` so it can delegate straight to /// [`MerkleNodeDB`]'s existing repository-based methods without any modification. @@ -314,8 +313,8 @@ fn write_all_tar( /// /// Tolerates two historical tarball layouts so that either a new or legacy client can talk /// to a store built with this trait: -/// - **Server-style** (emitted by [`write_hashes_tar`] / [`write_all_tar`] and the old -/// `compress_*` helpers): entries carry the full `tree/nodes/{prefix}/{suffix}/{node,children}` +/// - **Server-style** (emitted by [`pack_nodes`] / [`pack_all`] and the old `compress_*` +/// helpers): entries carry the full `tree/nodes/{prefix}/{suffix}/{node,children}` /// prefix. Joined directly under `oxen_hidden`. /// - **Legacy client-push style** (emitted by the old `api::client::tree::create_nodes`): /// entries start at `{prefix}/{suffix}/{node,children}` with no `tree/nodes/` prefix. @@ -347,11 +346,12 @@ fn extract_tar_under( let tree_nodes_prefix = Path::new(TREE_DIR).join(NODES_DIR); for entry in entries { - let Ok(mut file) = entry else { - log::error!("Could not unpack file in merkle tar archive"); - // TODO: raise this error to the caller instead!? - continue; - }; + // Propagate per-entry read errors (e.g. truncated/corrupted gzip stream surfacing + // as the iterator yields `Err`). Matches `main`'s `unpack_async_tar_archive` + // behavior, which `?`'d the entry directly. Silent-skip would otherwise return + // `Ok` with an empty hash set on a corrupted body, hiding the failure from the + // download path's `JoinHandle`-error surface in `api::client::tree`. + let mut file = entry.map_err(MerkleDbError::CannotReadMerkle)?; let path = file.path()?.into_owned(); // Path-traversal guard: refuse any entry whose path resolves above its container. if path.components().any(|c| matches!(c, Component::ParentDir)) { @@ -551,7 +551,7 @@ mod tests { // Scope the session so Drop runs at its end. 
{ - let store = FileBackend::new(&repo); + let store = repo.merkle_store(); let session = store.begin().expect("Could not begin session"); let mut ns = session .create_node(&commit, None) @@ -561,7 +561,7 @@ mod tests { // Deliberately DO NOT call ns.finish() or session.finish(). } - let store = FileBackend::new(&repo); + let store = repo.merkle_store(); assert!( store .exists(commit_hash) @@ -844,23 +844,22 @@ mod tests { async fn test_transport_round_trip() -> Result<(), OxenError> { test::run_one_commit_local_repo_test(|repo| { let mut packed = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_all(&mut packed) .expect("pack_all failed"); assert!(!packed.is_empty(), "pack_all produced empty buffer"); let tmp = tempfile::TempDir::new()?; let clone = repositories::init(tmp.path())?; - let installed = FileBackend::new(&clone) + let installed = clone + .merkle_store() .unpack(&packed[..], UnpackOptions::Overwrite) .expect("unpack failed"); assert!(!installed.is_empty(), "unpack installed no nodes"); for hash in &installed { assert!( - FileBackend::new(&clone) - .exists(hash) - .expect("exists failed"), + clone.merkle_store().exists(hash).expect("exists failed"), "expected installed hash {hash} to be readable" ); } @@ -884,7 +883,7 @@ mod tests { let new_pack_method = { let mut via_trait = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_nodes(&hashes, PackOptions::ServerCanonical, &mut via_trait) .expect("pack_nodes failed"); via_trait @@ -909,7 +908,7 @@ mod tests { let new_pack_method = { let mut via_trait = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_all(&mut via_trait) .expect("pack_all failed"); via_trait @@ -940,7 +939,7 @@ mod tests { let new_pack_method = { let hashes = HashSet::from_iter([hash]); let mut via_trait = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_nodes(&hashes, PackOptions::ServerCanonical, &mut via_trait) .expect("pack_nodes failed"); via_trait @@ -974,7 +973,7 @@ mod tests { hashes.insert(c.hash().expect("no hash for commit")); } let mut via_trait = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_nodes(&hashes, PackOptions::ServerCanonical, &mut via_trait) .expect("pack_nodes failed"); via_trait @@ -1039,7 +1038,8 @@ mod tests { let repo_new = repositories::init(tmp_new.path())?; // Old `unpack_nodes` skipped existing files; mirror that with // `UnpackOptions::SkipExisting` so the parity check is semantically faithful. - let new_hashes = FileBackend::new(&repo_new) + let new_hashes = repo_new + .merkle_store() .unpack(&bytes[..], UnpackOptions::SkipExisting) .expect("new unpack failed"); @@ -1055,13 +1055,15 @@ mod tests { // Every installed hash must be readable through both stores. 
for h in &new_hashes { assert!( - FileBackend::new(&repo_old) + repo_old + .merkle_store() .exists(h) .expect("old repo exists check failed"), "hash {h} not readable in repo unpacked via legacy unpack_nodes" ); assert!( - FileBackend::new(&repo_new) + repo_new + .merkle_store() .exists(h) .expect("new repo exists check failed"), "hash {h} not readable in repo unpacked via trait unpack" @@ -1100,7 +1102,7 @@ mod tests { /// today), feed the **same** bytes to: /// - the old client unpack: `node_download_request_unpack_old` (the verbatim /// `unpack_async_tar_archive` install from `main`'s `node_download_request`), - /// - the new client unpack: `FileBackend::unpack(...)` (overwrite-existing + /// - the new client unpack: `merkle_store().unpack(...)` (overwrite-existing /// default, matching `unpack_async_tar_archive`'s behaviour). /// The on-disk merkle-node tree under `/tree/nodes/` must be identical /// in both target repos. The set of hashes the trait reports is also asserted to @@ -1109,7 +1111,7 @@ mod tests { async fn test_node_download_request_unpack_unchanged() -> Result<(), OxenError> { test::run_one_commit_local_repo_test_async(|repo| async move { let mut packed = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_all(&mut packed) .expect("pack_all failed"); assert!(!packed.is_empty(), "pack_all produced empty buffer"); @@ -1124,7 +1126,8 @@ mod tests { // New client install path: trait, with download-path overwrite semantics. let tmp_new = tempfile::TempDir::new()?; let repo_new = repositories::init(tmp_new.path())?; - let installed = FileBackend::new(&repo_new) + let installed = repo_new + .merkle_store() .unpack(&packed[..], UnpackOptions::Overwrite) .expect("new unpack failed"); @@ -1157,9 +1160,7 @@ mod tests { assert!(!installed.is_empty(), "trait unpack reported no hashes"); for h in &installed { assert!( - FileBackend::new(&repo_new) - .exists(h) - .expect("exists failed"), + repo_new.merkle_store().exists(h).expect("exists failed"), "hash {h} not readable in repo unpacked via trait unpack" ); } @@ -1196,7 +1197,8 @@ mod tests { let tmp = tempfile::TempDir::new()?; let repo = repositories::init(tmp.path())?; - let err = FileBackend::new(&repo) + let err = repo + .merkle_store() .unpack(&buf[..], UnpackOptions::Overwrite) .expect_err("path traversal must be rejected"); let msg = format!("{err}"); @@ -1234,7 +1236,8 @@ mod tests { let tmp = tempfile::TempDir::new()?; let repo = repositories::init(tmp.path())?; - let err = FileBackend::new(&repo) + let err = repo + .merkle_store() .unpack(&buf[..], UnpackOptions::Overwrite) .expect_err("unsupported entry type must be rejected"); let msg = format!("{err}"); @@ -1281,14 +1284,15 @@ mod tests { // Pack just this hash. let hashes = HashSet::from_iter([stripped_hash]); let mut buf = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_nodes(&hashes, PackOptions::ServerCanonical, &mut buf) .expect("pack_nodes failed"); // Unpack into a fresh repo and confirm the short hash made it out. 
let tmp = tempfile::TempDir::new()?; let target = repositories::init(tmp.path())?; - let installed = FileBackend::new(&target) + let installed = target + .merkle_store() .unpack(&buf[..], UnpackOptions::Overwrite) .expect("unpack failed"); @@ -1326,7 +1330,8 @@ mod tests { let tmp = tempfile::TempDir::new()?; let repo = repositories::init(tmp.path())?; - let err = FileBackend::new(&repo) + let err = repo + .merkle_store() .unpack(&buf[..], UnpackOptions::Overwrite) .expect_err("non-hex node id must be rejected"); let msg = format!("{err}"); @@ -1376,7 +1381,8 @@ mod tests { let tmp = tempfile::TempDir::new()?; let repo = repositories::init(tmp.path())?; - let err = FileBackend::new(&repo) + let err = repo + .merkle_store() .unpack(&buf[..], UnpackOptions::Overwrite) .expect_err("over-deep entry must be rejected"); let msg = format!("{err}"); @@ -1420,7 +1426,8 @@ mod tests { let tmp = tempfile::TempDir::new()?; let repo = repositories::init(tmp.path())?; - let err = FileBackend::new(&repo) + let err = repo + .merkle_store() .unpack(&buf[..], UnpackOptions::Overwrite) .expect_err("unknown leaf filename must be rejected"); let msg = format!("{err}"); @@ -1540,7 +1547,7 @@ mod tests { let new_pack = { let mut buf = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_nodes(&hashes, PackOptions::LegacyClientPush, &mut buf) .expect("new pack failed"); buf @@ -1560,7 +1567,7 @@ mod tests { #[test] fn test_exists_returns_false_for_missing_hash() -> Result<(), OxenError> { test::run_empty_local_repo_test(|repo| { - let store = FileBackend::new(&repo); + let store = repo.merkle_store(); let missing = MerkleHash::new(0xDEAD_BEEF_DEAD_BEEF_DEAD_BEEF_DEAD_BEEF_u128); assert!( !store.exists(&missing).expect("exists must not error"), @@ -1574,7 +1581,7 @@ mod tests { #[test] fn test_get_node_returns_none_for_missing_hash() -> Result<(), OxenError> { test::run_empty_local_repo_test(|repo| { - let store = FileBackend::new(&repo); + let store = repo.merkle_store(); let missing = MerkleHash::new(0xDEAD_BEEF_DEAD_BEEF_DEAD_BEEF_DEAD_BEEF_u128); assert!( store @@ -1596,7 +1603,7 @@ mod tests { let commit = CommitNode::default(); let commit_hash = *commit.hash(); { - let store = FileBackend::new(&repo); + let store = repo.merkle_store(); let session = store.begin().expect("begin failed"); let ns = session .create_node(&commit, None) @@ -1604,7 +1611,7 @@ mod tests { ns.finish().expect("finish node session failed"); session.finish().expect("finish session failed"); } - let store = FileBackend::new(&repo); + let store = repo.merkle_store(); let children = store .get_children(&commit_hash) .expect("get_children must not error"); @@ -1622,7 +1629,7 @@ mod tests { #[test] fn test_writer_session_with_no_nodes() -> Result<(), OxenError> { test::run_empty_local_repo_test(|repo| { - let store = FileBackend::new(&repo); + let store = repo.merkle_store(); let session = store.begin().expect("begin failed"); session .finish() @@ -1637,13 +1644,14 @@ mod tests { async fn test_unpack_empty_tarball() -> Result<(), OxenError> { test::run_one_commit_local_repo_test(|repo| { let mut buf = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_nodes(&HashSet::new(), PackOptions::ServerCanonical, &mut buf) .expect("pack_nodes(empty) must not error"); let tmp = tempfile::TempDir::new()?; let target = repositories::init(tmp.path())?; - let installed = FileBackend::new(&target) + let installed = target + .merkle_store() .unpack(&buf[..], UnpackOptions::Overwrite) .expect("unpack of empty tarball must not 
error"); assert!( @@ -1671,7 +1679,7 @@ mod tests { hashes.insert(absent); let mut buf = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_nodes(&hashes, PackOptions::ServerCanonical, &mut buf) .expect("pack_nodes failed"); @@ -1717,7 +1725,7 @@ mod tests { assert!(estimate > 0, "estimate must be non-zero for a present hash"); let mut buf = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_nodes(&hashes, PackOptions::ServerCanonical, &mut buf) .expect("pack_nodes failed"); assert!( @@ -1757,7 +1765,7 @@ mod tests { async fn test_unpack_via_vfs_branch() -> Result<(), OxenError> { test::run_one_commit_local_repo_test(|repo| { let mut packed = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_all(&mut packed) .expect("pack_all failed"); assert!(!packed.is_empty(), "pack_all produced empty buffer"); @@ -1767,7 +1775,8 @@ mod tests { clone.set_vfs(Some(true)); assert!(clone.is_vfs(), "vfs flag should be on for this test"); - let installed = FileBackend::new(&clone) + let installed = clone + .merkle_store() .unpack(&packed[..], UnpackOptions::Overwrite) .expect("unpack via vfs branch failed"); assert!( @@ -1776,7 +1785,7 @@ mod tests { ); for h in &installed { assert!( - FileBackend::new(&clone).exists(h).expect("exists failed"), + clone.merkle_store().exists(h).expect("exists failed"), "hash {h} not readable in vfs-cloned repo" ); } diff --git a/crates/lib/src/core/progress/push_progress.rs b/crates/lib/src/core/progress/push_progress.rs index 490207258..ec4bb6552 100644 --- a/crates/lib/src/core/progress/push_progress.rs +++ b/crates/lib/src/core/progress/push_progress.rs @@ -41,6 +41,12 @@ impl PushProgress { self.sync_progress.add_bytes(bytes); } + /// Extend the underlying progress bar's total length by `delta`. See + /// [`SyncProgress::inc_total_bytes`] for usage notes. + pub fn inc_total_bytes(&self, delta: u64) { + self.sync_progress.inc_total_bytes(delta); + } + pub fn get_num_files(&self) -> u64 { self.sync_progress.get_num_files() } diff --git a/crates/lib/src/core/progress/sync_progress.rs b/crates/lib/src/core/progress/sync_progress.rs index 0297c852e..fa6a7fc02 100644 --- a/crates/lib/src/core/progress/sync_progress.rs +++ b/crates/lib/src/core/progress/sync_progress.rs @@ -72,6 +72,15 @@ impl SyncProgress { self.total_bytes = Some(total_bytes); } + /// Extend the underlying progress bar's total length by `delta`. Used when an + /// additional pre-known byte count needs to be added to a bar that was already + /// constructed with an initial total (e.g., the merkle-tree upload bytes layered + /// on top of file-content upload bytes during a push). Uses `ProgressBar`'s + /// interior mutability, so this works through `&Arc`. + pub fn inc_total_bytes(&self, delta: u64) { + self.progress_bar.inc_length(delta); + } + pub fn set_message(&self, message: impl Into>) { self.progress_bar.set_message(message); } diff --git a/crates/lib/src/model/merkle_tree.rs b/crates/lib/src/model/merkle_tree.rs index 9ec546ec8..824b7d168 100644 --- a/crates/lib/src/model/merkle_tree.rs +++ b/crates/lib/src/model/merkle_tree.rs @@ -23,15 +23,20 @@ pub trait MerkleStore: MerkleReader + MerkleWriter MerkleStore for T where T: MerkleReader + MerkleWriter::Error> {} -/// A Merkle tree backend that can also be transported (packed/unpacked) over the wire. +/// A [`MerkleStore`] that can also pack and unpack the canonical tar-gz wire format. 
 ///
-/// The store-side and transport-side `Error` types are intentionally allowed to differ —
-/// transport-layer errors (tar parsing, gzip framing, path-traversal rejection) have a
-/// different shape than store-layer errors (filesystem / database faults), and pinning
-/// them together would force one surface to absorb the other's variants. See
-/// [`MerkleTransport`] for more detail.
+/// The store side ([`MerkleStore`] = [`MerkleReader`] + [`MerkleWriter`]) and the transport
+/// side ([`MerkleTransport`] = [`MerklePacker`] + [`MerkleUnpacker`]) each use a single
+/// error type, but the two sides are **not** required to share it. That gives backends
+/// room to use a transport-specific error that extends their store error with
+/// tar/gzip/parse variants, rather than having to fold every transport concern into the
+/// store error. Both sides still implement [`IntoOxenError`], so `?` at call sites converts
+/// each error directly to [`OxenError`] with no further plumbing.
+///
+/// [`IntoOxenError`]: crate::error::IntoOxenError
+/// [`OxenError`]: crate::error::OxenError
 pub trait TransportableMerkleStore: MerkleStore + MerkleTransport {}
-/// Any type that implements both [`MerkleStore`] and [`MerkleTransport`] is automatically
-/// a [`TransportableMerkleStore`].
+/// Any type that is a [`MerkleStore`] and a [`MerkleTransport`] is automatically a
+/// [`TransportableMerkleStore`]. The store-side and transport-side error types can differ.
 impl<T> TransportableMerkleStore for T where T: MerkleStore + MerkleTransport {}
diff --git a/crates/lib/src/model/merkle_tree/merkle_transport.rs b/crates/lib/src/model/merkle_tree/merkle_transport.rs
index 7faba4c3f..3f59cd1a6 100644
--- a/crates/lib/src/model/merkle_tree/merkle_transport.rs
+++ b/crates/lib/src/model/merkle_tree/merkle_transport.rs
@@ -108,18 +108,8 @@ pub trait MerkleUnpacker: Send + Sync {
 /// Marker super-trait: a backend that can both pack and unpack with a single error type.
 ///
-/// Note: [`MerkleTransport`]'s shared error type is intentionally **separate** from any
-/// store-side error type (e.g. on a [`MerkleStore`]-also implementor). Transport errors
-/// (tar parse, gzip, path traversal) and store errors (RocksDB / filesystem layout)
-/// have different shapes and decoupling them lets each surface report what it knows
-/// without leaking unrelated variants. See the
-/// `transport errors != merkle store errors` discussion in the merkle-tree refactor
-/// branch for the historical motivation.
-///
 /// The blanket impl below makes any type that implements [`MerklePacker`] and
 /// `MerkleUnpacker` with matching error types automatically a [`MerkleTransport`].
-/// -/// [`MerkleStore`]: crate::model::merkle_tree::MerkleStore pub trait MerkleTransport: MerklePacker + MerkleUnpacker::Error> { diff --git a/crates/lib/src/model/repository/local_repository.rs b/crates/lib/src/model/repository/local_repository.rs index 391995957..e9edf054c 100644 --- a/crates/lib/src/model/repository/local_repository.rs +++ b/crates/lib/src/model/repository/local_repository.rs @@ -4,7 +4,7 @@ use crate::constants::{self, DEFAULT_VNODE_SIZE, MIN_OXEN_VERSION}; use crate::core::db::merkle_node::FileBackend; use crate::core::versions::MinOxenVersion; use crate::error::OxenError; -use crate::model::merkle_tree::MerkleStore; +use crate::model::merkle_tree::TransportableMerkleStore; use crate::model::merkle_tree::node::FileNode; use crate::model::{MetadataEntry, Remote, RemoteRepository}; use crate::opts::StorageOpts; @@ -100,15 +100,22 @@ impl LocalRepository { /// Obtain the Merkle tree store for this repository. /// - /// Returns an opaque `impl MerkleStore` whose concrete type is the private + /// Returns an opaque `impl TransportableMerkleStore` whose concrete type is the private /// dispatch enum in [`merkle_store_dispatch`]. Callers use it purely through the - /// trait surface (read, write); backend selection is an implementation detail - /// of this method. + /// trait surface (read, write, pack, unpack); backend selection is an implementation + /// detail of this method. + /// + /// Wire-format and existing-file policy are per-call concerns — see + /// [`PackOptions`] and [`UnpackOptions`] for the variants each pack/unpack + /// call site picks. /// /// When new backends (e.g. LMDB) are added, they are registered in /// `merkle_store_dispatch::define_merkle_store_dispatch!`, and the /// dispatch logic for choosing among them lives here. - pub fn merkle_store(&self) -> impl MerkleStore + '_ { + /// + /// [`PackOptions`]: crate::model::merkle_tree::merkle_transport::PackOptions + /// [`UnpackOptions`]: crate::model::merkle_tree::merkle_transport::UnpackOptions + pub fn merkle_store(&self) -> impl TransportableMerkleStore + '_ { merkle_store_dispatch::StoreEnum::File(FileBackend::new(self)) } diff --git a/crates/lib/src/model/repository/local_repository/merkle_store_dispatch.rs b/crates/lib/src/model/repository/local_repository/merkle_store_dispatch.rs index f456bd0f7..fd984a450 100644 --- a/crates/lib/src/model/repository/local_repository/merkle_store_dispatch.rs +++ b/crates/lib/src/model/repository/local_repository/merkle_store_dispatch.rs @@ -18,6 +18,9 @@ use crate::error::{IntoOxenError, OxenError}; use crate::model::MerkleHash; use crate::model::TMerkleTreeNode; use crate::model::merkle_tree::merkle_reader::{MerkleNodeRecord, MerkleReader}; +use crate::model::merkle_tree::merkle_transport::{ + MerklePacker, MerkleUnpacker, PackOptions, UnpackOptions, +}; use crate::model::merkle_tree::merkle_writer::{ MerkleWriteSession, MerkleWriter, NodeWriteSession, }; @@ -170,6 +173,41 @@ macro_rules! 
define_merkle_store_dispatch { } } } + + impl<'repo> MerklePacker for StoreEnum<'repo> { + type Error = StoreError; + + fn pack_nodes( + &self, + hashes: &std::collections::HashSet, + opts: PackOptions, + out: W, + ) -> Result<(), StoreError> { + match self { + $( Self::$variant(b) => b.pack_nodes(hashes, opts, out).map_err(StoreError::$variant) ),* + } + } + + fn pack_all(&self, out: W) -> Result<(), StoreError> { + match self { + $( Self::$variant(b) => b.pack_all(out).map_err(StoreError::$variant) ),* + } + } + } + + impl<'repo> MerkleUnpacker for StoreEnum<'repo> { + type Error = StoreError; + + fn unpack( + &self, + reader: R, + opts: UnpackOptions, + ) -> Result, StoreError> { + match self { + $( Self::$variant(b) => b.unpack(reader, opts).map_err(StoreError::$variant) ),* + } + } + } }; } diff --git a/crates/lib/src/repositories/tree.rs b/crates/lib/src/repositories/tree.rs index f1444304d..7d5fb2cb0 100644 --- a/crates/lib/src/repositories/tree.rs +++ b/crates/lib/src/repositories/tree.rs @@ -1,21 +1,17 @@ use bytesize::ByteSize; -use flate2::Compression; -use flate2::read::GzDecoder; -use flate2::write::GzEncoder; use std::collections::{HashMap, HashSet}; use std::path::{Path, PathBuf}; -use std::str; -use tar::Archive; -use crate::constants::{NODES_DIR, OXEN_HIDDEN_DIR, TREE_DIR}; use crate::core::commit_sync_status; -use crate::core::db::merkle_node::merkle_node_db::node_db_path; use crate::core::node_sync_status; use crate::core::v_latest::index::CommitMerkleTree as CommitMerkleTreeLatest; use crate::core::v_latest::index::CommitMerkleTree; use crate::core::v_old::v0_19_0::index::CommitMerkleTree as CommitMerkleTreeV0_19_0; use crate::core::versions::MinOxenVersion; use crate::error::OxenError; +use crate::model::merkle_tree::merkle_transport::{ + MerklePacker, MerkleUnpacker, PackOptions, UnpackOptions, +}; use crate::model::merkle_tree::merkle_writer::{ MerkleWriteSession, MerkleWriter, NodeWriteSession, }; @@ -849,180 +845,67 @@ pub fn cp_dir_hashes_to( } pub fn compress_tree(repository: &LocalRepository) -> Result, OxenError> { - let enc = GzEncoder::new(Vec::new(), Compression::fast()); - let mut tar = tar::Builder::new(enc); - compress_full_tree(repository, &mut tar)?; - tar.finish()?; - - let buffer: Vec = tar.into_inner()?.finish()?; - let total_size: u64 = u64::try_from(buffer.len()).unwrap_or(u64::MAX); - + let mut buf = Vec::new(); + repository.merkle_store().pack_all(&mut buf)?; + let total_size: u64 = u64::try_from(buf.len()).unwrap_or(u64::MAX); log::debug!("Compressed entire tree size is {}", ByteSize::b(total_size)); - - Ok(buffer) -} - -pub fn compress_full_tree( - repository: &LocalRepository, - tar: &mut tar::Builder>>, -) -> Result<(), OxenError> { - // This will be the subdir within the tarball, - // so when we untar it, all the subdirs will be extracted to - // tree/nodes/... 
-    let tar_subdir = Path::new(TREE_DIR).join(NODES_DIR);
-    let nodes_dir = repository
-        .path
-        .join(OXEN_HIDDEN_DIR)
-        .join(TREE_DIR)
-        .join(NODES_DIR);
-
-    log::debug!("Compressing tree in dir {nodes_dir:?}");
-
-    if nodes_dir.exists() {
-        tar.append_dir_all(&tar_subdir, nodes_dir)?;
-    }
-
-    Ok(())
+    Ok(buf)
 }
 pub fn compress_nodes(
     repository: &LocalRepository,
     hashes: &HashSet<MerkleHash>,
 ) -> Result<Vec<u8>, OxenError> {
-    // zip up the node directories for each commit tree
-    let enc = GzEncoder::new(Vec::new(), Compression::fast());
-    let mut tar = tar::Builder::new(enc);
-    log::debug!("Compressing {} unique nodes...", hashes.len());
-    for hash in hashes {
-        // This will be the subdir within the tarball
-        // so when we untar it, all the subdirs will be extracted to
-        // tree/nodes/...
-        let dir_prefix = hash.to_hex_hash().node_db_prefix();
-        let tar_subdir = Path::new(TREE_DIR).join(NODES_DIR).join(dir_prefix);
-
-        let node_dir = node_db_path(repository, hash);
-        // log::debug!("Compressing node from dir {:?}", node_dir);
-        if node_dir.exists() {
-            tar.append_dir_all(&tar_subdir, node_dir)?;
-        }
-    }
-    tar.finish()?;
-
-    let buffer: Vec<u8> = tar.into_inner()?.finish()?;
-    Ok(buffer)
+    let mut buf = Vec::new();
+    repository
+        .merkle_store()
+        .pack_nodes(hashes, PackOptions::ServerCanonical, &mut buf)?;
+    Ok(buf)
 }
 pub fn compress_node(
     repository: &LocalRepository,
     hash: &MerkleHash,
 ) -> Result<Vec<u8>, OxenError> {
-    // This will be the subdir within the tarball
-    // so when we untar it, all the subdirs will be extracted to
-    // tree/nodes/...
-    let dir_prefix = hash.to_hex_hash().node_db_prefix();
-    let tar_subdir = Path::new(TREE_DIR).join(NODES_DIR).join(dir_prefix);
-
-    // zip up the node directory
-    let enc = GzEncoder::new(Vec::new(), Compression::fast());
-    let mut tar = tar::Builder::new(enc);
-    let node_dir = node_db_path(repository, hash);
-
-    // log::debug!("Compressing node {} from dir {:?}", hash, node_dir);
-    if node_dir.exists() {
-        tar.append_dir_all(&tar_subdir, node_dir)?;
-    }
-    tar.finish()?;
-
-    let buffer: Vec<u8> = tar.into_inner()?.finish()?;
-    let total_size: u64 = u64::try_from(buffer.len()).unwrap_or(u64::MAX);
+    let mut set = HashSet::with_capacity(1);
+    set.insert(*hash);
+    let mut buf = Vec::new();
+    repository
+        .merkle_store()
+        .pack_nodes(&set, PackOptions::ServerCanonical, &mut buf)?;
+    let total_size: u64 = u64::try_from(buf.len()).unwrap_or(u64::MAX);
     log::debug!(
         "Compressed node {} size is {}",
         hash,
         ByteSize::b(total_size)
     );
-
-    Ok(buffer)
+    Ok(buf)
 }
 pub fn compress_commits(
     repository: &LocalRepository,
     commits: &Vec<Commit>,
 ) -> Result<Vec<u8>, OxenError> {
-    // zip up the node directory
-    let enc = GzEncoder::new(Vec::new(), Compression::fast());
-    let mut tar = tar::Builder::new(enc);
-
+    let mut hashes = HashSet::with_capacity(commits.len());
     for commit in commits {
-        let hash = commit.hash()?;
-        // This will be the subdir within the tarball
-        // so when we untar it, all the subdirs will be extracted to
-        // tree/nodes/...
-        let dir_prefix = hash.to_hex_hash().node_db_prefix();
-        let tar_subdir = Path::new(TREE_DIR).join(NODES_DIR).join(dir_prefix);
-
-        let node_dir = node_db_path(repository, &hash);
-        log::debug!("Compressing commit from dir {node_dir:?}");
-        if node_dir.exists() {
-            tar.append_dir_all(&tar_subdir, node_dir)?;
-        }
+        hashes.insert(commit.hash()?);
     }
-    tar.finish()?;
-
-    let buffer: Vec<u8> = tar.into_inner()?.finish()?;
-    Ok(buffer)
+    let mut buf = Vec::new();
+    repository
+        .merkle_store()
+        .pack_nodes(&hashes, PackOptions::ServerCanonical, &mut buf)?;
+    Ok(buf)
 }
 pub fn unpack_nodes(
     repository: &LocalRepository,
     buffer: &[u8],
 ) -> Result<HashSet<MerkleHash>, OxenError> {
-    let mut hashes: HashSet<MerkleHash> = HashSet::new();
-    log::debug!("Unpacking nodes from buffer...");
-    let decoder = GzDecoder::new(buffer);
-    log::debug!("Decoder created");
-    let mut archive = Archive::new(decoder);
-    log::debug!("Archive created");
-    let Ok(entries) = archive.entries() else {
-        return Err(OxenError::basic_str(
-            "Could not unpack tree database from archive",
-        ));
-    };
-    log::debug!("Extracting entries...");
-    for file in entries {
-        let Ok(mut file) = file else {
-            log::error!("Could not unpack file in archive...");
-            continue;
-        };
-        let path = file.path().unwrap();
-        let oxen_hidden_path = repository.path.join(OXEN_HIDDEN_DIR);
-        let dst_path = oxen_hidden_path.join(TREE_DIR).join(NODES_DIR).join(path);
-
-        if let Some(parent) = dst_path.parent() {
-            util::fs::create_dir_all(parent).expect("Could not create parent dir");
-        }
-        // log::debug!("create_node writing {:?}", dst_path);
-        if dst_path.exists() {
-            log::debug!("Node already exists at {dst_path:?}");
-            continue;
-        }
-        file.unpack(&dst_path)?;
-
-        // the hash is the last two path components combined
-        if !dst_path.ends_with("node") && !dst_path.ends_with("children") {
-            let id = dst_path
-                .components()
-                .rev()
-                .take(2)
-                .map(|c| c.as_os_str().to_str().unwrap())
-                .collect::<Vec<_>>()
-                .into_iter()
-                .rev()
-                .collect::<String>();
-            hashes.insert(id.parse()?);
-        }
-    }
-    Ok(hashes)
+    log::debug!("Unpacking nodes from buffer ({} bytes)", buffer.len());
+    Ok(repository
+        .merkle_store()
+        .unpack(buffer, UnpackOptions::SkipExisting)?)
 }
 /// Write a node to disk
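For reference, a minimal sketch of the pack-while-uploading pattern the new `create_nodes` relies on: a blocking producer writes into one end of `tokio::io::duplex` while the async side streams the other end into the request body, so packing and uploading overlap with back-pressure. `pack_into`, the URL, and the error plumbing here are illustrative placeholders rather than APIs from this diff, and `reqwest::Body::wrap_stream` assumes reqwest's `stream` feature is enabled.

use std::io::Write;
use tokio_util::io::{ReaderStream, SyncIoBridge};

// Stand-in for the real tar-gz pack: any sync producer that writes incrementally.
fn pack_into<W: Write>(mut out: W) -> std::io::Result<()> {
    out.write_all(b"packed bytes")?;
    out.flush()
}

async fn upload(url: &str) -> Result<(), Box<dyn std::error::Error>> {
    // 64 KiB in-memory pipe: the producer blocks whenever the consumer falls behind.
    let (writer, reader) = tokio::io::duplex(64 * 1024);

    // Producer: run the sync pack on a blocking worker, bridged to the async writer.
    let pack = tokio::task::spawn_blocking(move || pack_into(SyncIoBridge::new(writer)));

    // Consumer: stream chunks straight into the HTTP body as they are produced.
    let body = reqwest::Body::wrap_stream(ReaderStream::new(reader));
    reqwest::Client::new()
        .post(url)
        .body(body)
        .send()
        .await?
        .error_for_status()?;

    // Only after the upload completes, surface any pack error (or panic) from the worker.
    pack.await??;
    Ok(())
}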