diff --git a/crates/lib/src/core/db/merkle_node.rs b/crates/lib/src/core/db/merkle_node.rs index 62d67660a..f9c4ce10e 100644 --- a/crates/lib/src/core/db/merkle_node.rs +++ b/crates/lib/src/core/db/merkle_node.rs @@ -1,3 +1,4 @@ +pub mod file_backend; pub mod merkle_node_db; pub(crate) use merkle_node_db::MerkleNodeDB; diff --git a/crates/lib/src/core/db/merkle_node/file_backend.rs b/crates/lib/src/core/db/merkle_node/file_backend.rs new file mode 100644 index 000000000..79965eafd --- /dev/null +++ b/crates/lib/src/core/db/merkle_node/file_backend.rs @@ -0,0 +1,326 @@ +use std::path::{Path, PathBuf}; + +use crate::core::db::merkle_node::merkle_node_db::{MerkleDbError, MerkleNodeDB}; +use crate::error::OxenError; +use crate::model::merkle_tree::merkle_reader::{MerkleEntry, MerkleReader}; +use crate::model::merkle_tree::merkle_writer::{ + MerkleWriteSession, MerkleWriter, NodeWriteSession, +}; +use crate::model::merkle_tree::node::MerkleTreeNode; +use crate::model::{LocalRepository, MerkleHash, TMerkleTreeNode}; + +/// File-based Merkle node store backend. Implements the [`MerkleReader`] and +/// [`MerkleWriter`] traits. +/// +/// Borrows the path to a local repository (via the [`LocalRepository`]'s [`PathBuf`]) so it +/// can delegate straight to [`MerkleNodeDB`]'s existing repository-based methods without any +/// modification. +#[derive(Debug)] +pub struct FileBackend { + /// Location to the repository's root in the filesystem. Must be an absolute path. + pub(crate) repo_path: PathBuf, +} + +impl FileBackend { + pub fn new(repo: &LocalRepository) -> Self { + Self { + repo_path: repo.path.clone(), + } + } +} + +/// Merkle reader implementation for the [`FileBackend`]. +/// +/// NOTE: Uses MerkleDbError internally, but is unfortunately forced to convert into +/// an [`OxenError`] at the trait boundary. It is a bug if any of these methods do +/// not return a wrapped `MerkleDbError`. +impl MerkleReader for FileBackend { + /// Checks if a node with the given `hash` exists in the store. + /// + /// Alias for [`MerkleNodeDB::exists`]. + fn exists(&self, hash: &MerkleHash) -> Result { + Ok(MerkleNodeDB::exists(&self.repo_path, hash)) + } + + /// Retrieves the node with the given `hash` from the store. `None` means no such node exists. + /// + /// Alias for [`MerkleNodeDB::open_read_only`]. + fn get_node(&self, hash: &MerkleHash) -> Result, OxenError> { + if !MerkleNodeDB::exists(&self.repo_path, hash) { + return Ok(None); + } + let db = MerkleNodeDB::open_read_only(&self.repo_path, hash)?; + let node = db.node()?; + Ok(Some(MerkleEntry { + node, + parent_id: db.parent_id, + })) + } + + /// Retrieves the children of the node with the given `hash` from the store. + /// An empty vec means that either the node is a not a directory or virtual node or it is one + /// but has no files. + /// + /// Alias for [`MerkleNodeDB::open_read_only`] & a `.map()` call on it. + fn get_children( + &self, + hash: &MerkleHash, + ) -> Result, OxenError> { + if !MerkleNodeDB::exists(&self.repo_path, hash) { + return Ok(Vec::with_capacity(0)); + } + let mut db = MerkleNodeDB::open_read_only(&self.repo_path, hash)?; + let children = db.map()?; + Ok(children) + } + + /// Load a [`MerkleTreeNode`] with full node info and 1-level (aka direct) children for any non-file node. + /// More efficient than doing a [`get_node`] and a [`get_children`] call. + fn read_full_node(&self, hash: &MerkleHash) -> Result, OxenError> { + if !MerkleNodeDB::exists(&self.repo_path, hash) { + return Ok(None); + } + let mut db = MerkleNodeDB::open_read_only(&self.repo_path, hash)?; + let node = db.node()?; + let children = db.map()?; + Ok(Some(MerkleTreeNode { + hash: *hash, + node, + parent_id: db.parent_id, + children: children.into_iter().map(|(_, c)| c).collect(), + })) + } +} + +/// Merkle writer implementation for the [`FileBackend`]. +/// +/// NOTE: Uses MerkleDbError internally, but is unfortunately forced to convert into +/// an [`OxenError`] at the trait boundary. It is a bug if any of these methods do +/// not return a wrapped `MerkleDbError`. +impl MerkleWriter for FileBackend { + /// Returns a new [`FileWriteSession`] for writing Merkle tree nodes to the store. + fn begin<'a>(&'a self) -> Result, OxenError> { + Ok(Box::new(FileWriteSession { + repo_path: &self.repo_path, + })) + } +} + +/// Write session for the file backend. Used to write multiple nodes & their children. +/// +/// Writes happen eagerly through each [`FileNodeSession`]; this session's +/// [`finish`] is a no-op. +pub struct FileWriteSession<'repo> { + repo_path: &'repo Path, +} + +/// Merkle write session implementation that the [`FileBackend`] uses. +/// +/// NOTE: Uses MerkleDbError internally, but is unfortunately forced to convert into +/// an [`OxenError`] at the trait boundary. It is a bug if any of these methods do +/// not return a wrapped `MerkleDbError`. +impl<'repo> MerkleWriteSession for FileWriteSession<'repo> { + /// Creates a new session for writing a `node` and `children` file. + /// Calls [`MerkleNodeDB::open_read_write`] internally. + fn create_node<'a>( + &'a self, + node: &dyn TMerkleTreeNode, + parent_id: Option, + ) -> Result, OxenError> { + let session = FileNodeSession::new(self.repo_path, node, parent_id)?; + Ok(Box::new(session)) + } + + /// A no-op -- the node write session from [`create_node`] eagerly writes its files. + /// The [`FileNodeSession::finish`] method flushes and closes open file handles. + fn finish(self: Box) -> Result<(), OxenError> { + Ok(()) + } +} + +/// Per-node write handle for the file backend. Writes exactly 1 `node` and 1 `children` file. +/// +/// Acts as a newtype around [`MerkleNodeDB`] with a `finished` sentinel that guards [`Drop`] +/// against double-closing the underlying file handles. When required, the drop implementation +/// will call [`FileNodeSession::finish`]. +pub struct FileNodeSession { + db: MerkleNodeDB, + finished: bool, +} + +impl FileNodeSession { + /// Opens a new [`MerkleNodeDB`] in read-write mode. + fn new( + repo_path: &Path, + node: &dyn TMerkleTreeNode, + parent_id: Option, + ) -> Result { + Ok(Self { + db: MerkleNodeDB::open_read_write(repo_path, node, parent_id)?, + finished: false, + }) + } + + /// The `finish` implementation, but using `&mut self` so that it can be used in `Drop`. + fn idempotent_finish(&mut self) -> Result<(), MerkleDbError> { + if self.finished { + Ok(()) + } else { + self.finished = true; + MerkleNodeDB::close(&mut self.db) + } + } +} + +/// Ensure that the `node` and `children` file handles are flushed and closed when dropped. +impl Drop for FileNodeSession { + fn drop(&mut self) { + self.idempotent_finish() + .expect("Did not explicitly call finish() and encountered an error."); + } +} + +/// Merkle node write session that the [`FileBackend`] uses. +/// +/// NOTE: Uses MerkleDbError internally, but is unfortunately forced to convert into +/// an [`OxenError`] at the trait boundary. It is a bug if any of these methods do +/// not return a wrapped `MerkleDbError`. +impl NodeWriteSession for FileNodeSession { + /// The node currently being written. + fn node_id(&self) -> &MerkleHash { + &self.db.node_id + } + + /// Adds an entry to the `children` file for the current node. Alias for [`MerkleNodeDB::add_child`]. + fn add_child(&mut self, child: &dyn TMerkleTreeNode) -> Result<(), OxenError> { + MerkleNodeDB::add_child(&mut self.db, child)?; + Ok(()) + } + + /// Flushes the open `node` and `children` file handles, closes them, then calls `fsync` on them. + /// Consumes the boxed session; [`Drop`] becomes a no-op after this returns `Ok` because the + /// `finished` sentinel guards `idempotent_finish`. + fn finish(mut self: Box) -> Result<(), OxenError> { + self.idempotent_finish()?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::error::OxenError; + use crate::model::merkle_tree::node::{CommitNode, DirNode}; + use crate::test; + + /// Dropping a `FileNodeSession` without calling `finish()` must still + /// flush+sync the underlying files. This is to match the implicit-drop semantics + /// that `MerkleNodeDB` has before these `MerkleStore` traits were introduced. + #[test] + fn test_drop_finishes_file_node_session() -> Result<(), OxenError> { + test::run_empty_local_repo_test(|repo| { + let commit = CommitNode::default(); + let dir = DirNode::default(); + let commit_hash = commit.hash(); + + // Scope the session so Drop runs at its end. + { + let store = FileBackend::new(&repo); + let session = store.begin().expect("Could not begin session"); + let mut ns = session + .create_node(&commit, None) + .expect("Could not begin node session"); + ns.add_child(&dir) + .expect("Could not add a child to the node session"); + // Deliberately DO NOT call ns.finish() or session.finish(). + } + + let store = FileBackend::new(&repo); + assert!( + store + .exists(commit_hash) + .expect("commit to exist after being written") + ); + let children = store + .get_children(commit_hash) + .expect("children to exist after being written"); + assert_eq!(children.len(), 1, "expected the single dir child"); + Ok(()) + }) + } + + /// `exists` on a hash that was never written returns `Ok(false)`. + #[test] + fn test_exists_returns_false_for_missing_hash() -> Result<(), OxenError> { + test::run_empty_local_repo_test(|repo| { + let store = FileBackend::new(&repo); + let missing = MerkleHash::new(0xDEAD_BEEF_DEAD_BEEF_DEAD_BEEF_DEAD_BEEF_u128); + assert!( + !store.exists(&missing).expect("exists must not error"), + "expected exists() to return false for an unwritten hash" + ); + Ok(()) + }) + } + + /// `get_node` on a hash that was never written returns `Ok(None)`. + #[test] + fn test_get_node_returns_none_for_missing_hash() -> Result<(), OxenError> { + test::run_empty_local_repo_test(|repo| { + let store = FileBackend::new(&repo); + let missing = MerkleHash::new(0xDEAD_BEEF_DEAD_BEEF_DEAD_BEEF_DEAD_BEEF_u128); + assert!( + store + .get_node(&missing) + .expect("get_node must not error") + .is_none(), + "expected get_node() to return None for an unwritten hash" + ); + Ok(()) + }) + } + + /// `get_children` on a node that was written but never had children added returns + /// `Ok(empty vec)`. Documents the leaf-children-are-empty contract from + /// [`MerkleReader::get_children`]. + #[test] + fn test_get_children_returns_empty_for_node_without_children() -> Result<(), OxenError> { + test::run_empty_local_repo_test(|repo| { + let commit = CommitNode::default(); + let commit_hash = *commit.hash(); + { + let store = FileBackend::new(&repo); + let session = store.begin().expect("begin failed"); + let ns = session + .create_node(&commit, None) + .expect("create_node failed"); + ns.finish().expect("finish node session failed"); + session.finish().expect("finish session failed"); + } + let store = FileBackend::new(&repo); + let children = store + .get_children(&commit_hash) + .expect("get_children must not error"); + assert!( + children.is_empty(), + "expected an empty children list for a node with no add_child calls; got {} entries", + children.len() + ); + Ok(()) + }) + } + + /// A write session that begins and finishes without creating any node sessions + /// should round-trip cleanly with no error. + #[test] + fn test_writer_session_with_no_nodes() -> Result<(), OxenError> { + test::run_empty_local_repo_test(|repo| { + let store = FileBackend::new(&repo); + let session = store.begin().expect("begin failed"); + session + .finish() + .expect("finish must not error on empty session"); + Ok(()) + }) + } +} diff --git a/crates/lib/src/core/db/merkle_node/merkle_node_db.rs b/crates/lib/src/core/db/merkle_node/merkle_node_db.rs index 4e81933a3..63e047ab6 100644 --- a/crates/lib/src/core/db/merkle_node/merkle_node_db.rs +++ b/crates/lib/src/core/db/merkle_node/merkle_node_db.rs @@ -60,8 +60,7 @@ use crate::model::merkle_tree::node_type::InvalidMerkleTreeNodeType; use crate::util; use crate::model::merkle_tree::node::{ - CommitNode, DirNode, EMerkleTreeNode, FileChunkNode, FileNode, MerkleTreeNode, - MerkleTreeNodeType, TMerkleTreeNode, VNode, + EMerkleTreeNode, MerkleTreeNode, MerkleTreeNodeType, TMerkleTreeNode, }; const NODE_FILE: &str = "node"; @@ -126,16 +125,20 @@ struct MerkleNodeLookup { } impl MerkleNodeLookup { - pub fn load(node_table_file: &mut File) -> Result { + pub(crate) fn load(node_table_file: &mut File) -> Result { // log::debug!("MerkleNodeLookup.load() {:?}", node_table_file); - // Read the whole node into memory let mut file_data = Vec::new(); node_table_file.read_to_end(&mut file_data)?; // log::debug!( // "MerkleNodeLookup.load() read file_data: {}", // file_data.len() // ); + Self::deserialize(file_data) + } + /// Takes the on-disk bytes of a `node` file and deserializes it into a [`MerkleNodeLookup`]. + #[inline(always)] + pub(crate) fn deserialize(file_data: Vec) -> Result { // Create a cursor to iterate over data let mut cursor = std::io::Cursor::new(file_data); @@ -144,7 +147,7 @@ impl MerkleNodeLookup { cursor.read_exact(&mut buffer)?; let node_data_type = u8::from_le_bytes(buffer); // log::debug!( - // "MerkleNodeLookup.load() data_type: {:?}", + // "MerkleNodeLookup.deserialize() data_type: {:?}", // MerkleTreeNodeType::from_u8(node_data_type) // ); @@ -152,19 +155,19 @@ impl MerkleNodeLookup { let mut buffer = [0u8; 16]; // u128 is 16 bytes cursor.read_exact(&mut buffer)?; let parent_id = u128::from_le_bytes(buffer); - // log::debug!("MerkleNodeLookup.load() parent_id: {:x}", parent_id); + // log::debug!("MerkleNodeLookup.deserialize() parent_id: {:x}", parent_id); // Read the length of the node data let mut buffer = [0u8; 4]; // u32 is 4 bytes cursor.read_exact(&mut buffer)?; let data_len = u32::from_le_bytes(buffer); - // log::debug!("MerkleNodeLookup.load() data_len: {}", data_len); + // log::debug!("MerkleNodeLookup.deserialize() data_len: {}", data_len); // Read the length of the data and save buffer let mut buffer = vec![0u8; data_len as usize]; cursor.read_exact(&mut buffer)?; let data = buffer; - // log::debug!("MerkleNodeLookup.load() read data: {}", data.len()); + // log::debug!("MerkleNodeLookup.deserialize() read data: {}", data.len()); // Read the map of offsets let mut offsets: Vec<(u128, (u8, u64, u64))> = Vec::new(); @@ -176,28 +179,28 @@ impl MerkleNodeLookup { // Will loop until we hit an EOF error // let mut i = 0; while cursor.read_exact(&mut dtype_buffer).is_ok() { - // log::debug!("MerkleNodeLookup.load() --reading-- {}", i); + // log::debug!("MerkleNodeLookup.deserialize() --reading-- {}", i); let data_type = u8::from_le_bytes(dtype_buffer); // log::debug!( - // "MerkleNodeLookup.load() got data_type {:?}", + // "MerkleNodeLookup.deserialize() got data_type {:?}", // MerkleTreeNodeType::from_u8(data_type) // ); // Read the hash cursor.read_exact(&mut hash_buffer)?; let hash = u128::from_le_bytes(hash_buffer); - // log::debug!("MerkleNodeLookup.load() got hash {:x}", hash); + // log::debug!("MerkleNodeLookup.deserialize() got hash {:x}", hash); // Read the offset cursor.read_exact(&mut offset_buffer)?; let data_offset = u64::from_le_bytes(offset_buffer); - // log::debug!("MerkleNodeLookup.load() got data_offset {}", data_offset); + // log::debug!("MerkleNodeLookup.deserialize() got data_offset {}", data_offset); // Read the length cursor.read_exact(&mut len_buffer)?; let data_len = u64::from_le_bytes(len_buffer); - // log::debug!("MerkleNodeLookup.load() got data_len {}", data_len); + // log::debug!("MerkleNodeLookup.deserialize() got data_len {}", data_len); offsets.push((hash, (data_type, data_offset, data_len))); // i += 1; @@ -205,7 +208,7 @@ impl MerkleNodeLookup { let num_children = offsets.len() as u64; // log::debug!( - // "MerkleNodeLookup.load() parent_id {:x} num_children {}", + // "MerkleNodeLookup.deserialize() parent_id {:x} num_children {}", // parent_id, // num_children // ); @@ -249,17 +252,7 @@ impl MerkleNodeDB { dtype: MerkleTreeNodeType, data: &[u8], ) -> Result { - match dtype { - MerkleTreeNodeType::Commit => { - Ok(EMerkleTreeNode::Commit(CommitNode::deserialize(data)?)) - } - MerkleTreeNodeType::Dir => Ok(EMerkleTreeNode::Directory(DirNode::deserialize(data)?)), - MerkleTreeNodeType::File => Ok(EMerkleTreeNode::File(FileNode::deserialize(data)?)), - MerkleTreeNodeType::VNode => Ok(EMerkleTreeNode::VNode(VNode::deserialize(data)?)), - MerkleTreeNodeType::FileChunk => Ok(EMerkleTreeNode::FileChunk( - FileChunkNode::deserialize(data)?, - )), - } + EMerkleTreeNode::from_type_and_bytes(dtype, data) } pub(crate) fn exists(repo_path: &Path, hash: &MerkleHash) -> bool { @@ -275,9 +268,9 @@ impl MerkleNodeDB { Self::open(path, true, *hash) } - pub(crate) fn open_read_write( + pub(crate) fn open_read_write( repo_path: &Path, - node: &N, + node: &dyn TMerkleTreeNode, parent_id: Option, ) -> Result { let path = node_db_path(repo_path, &node.hash()); @@ -369,9 +362,9 @@ impl MerkleNodeDB { /// Write the base node info. /// WARNING: Sets the internal dtype, node_id, parent_id of `self` to the values from `node`. - fn write_node( + fn write_node( &mut self, - node: &N, + node: &dyn TMerkleTreeNode, parent_id: Option, ) -> Result<(), MerkleDbError> { if self.read_only { @@ -417,7 +410,7 @@ impl MerkleNodeDB { Ok(()) } - pub(crate) fn add_child(&mut self, item: &N) -> Result<(), MerkleDbError> { + pub(crate) fn add_child(&mut self, item: &dyn TMerkleTreeNode) -> Result<(), MerkleDbError> { if self.read_only { return Err(MerkleDbError::ReadOnly); } diff --git a/crates/lib/src/model/merkle_tree/node.rs b/crates/lib/src/model/merkle_tree/node.rs index 5625bcede..52027bea0 100644 --- a/crates/lib/src/model/merkle_tree/node.rs +++ b/crates/lib/src/model/merkle_tree/node.rs @@ -72,4 +72,22 @@ impl EMerkleTreeNode { EMerkleTreeNode::File(_) | EMerkleTreeNode::FileChunk(_) ) } + + /// Deserialize a Merkle tree node from its on-disk type marker and msgpack-encoded body. + pub fn from_type_and_bytes( + dtype: MerkleTreeNodeType, + data: &[u8], + ) -> Result { + match dtype { + MerkleTreeNodeType::Commit => { + Ok(EMerkleTreeNode::Commit(CommitNode::deserialize(data)?)) + } + MerkleTreeNodeType::Dir => Ok(EMerkleTreeNode::Directory(DirNode::deserialize(data)?)), + MerkleTreeNodeType::File => Ok(EMerkleTreeNode::File(FileNode::deserialize(data)?)), + MerkleTreeNodeType::VNode => Ok(EMerkleTreeNode::VNode(VNode::deserialize(data)?)), + MerkleTreeNodeType::FileChunk => Ok(EMerkleTreeNode::FileChunk( + FileChunkNode::deserialize(data)?, + )), + } + } }