Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions crates/lib/src/api/client/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use tempfile::TempDir;

use crate::api::client;
use crate::constants::{NODES_DIR, OXEN_HIDDEN_DIR, TREE_DIR};
use crate::core::db::merkle_node::merkle_node_db::node_db_prefix;
use crate::core::progress::push_progress::PushProgress;
use crate::core::v_latest::index::CommitMerkleTree;
use crate::error::OxenError;
Expand Down Expand Up @@ -72,7 +71,7 @@ pub async fn create_nodes(
.join(NODES_DIR);

for (i, node_hash) in nodes.iter().enumerate() {
let dir_prefix = node_db_prefix(node_hash);
let dir_prefix = node_hash.to_hex_hash().node_db_prefix();
let node_dir = node_path.join(&dir_prefix);
// log::debug!(
// "create_nodes appending objects dir {:?} to tar at path {:?}",
Expand Down
3 changes: 1 addition & 2 deletions crates/lib/src/core/commit_sync_status.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::constants;
use crate::core::db::merkle_node::merkle_node_db::node_db_prefix;
use crate::error::OxenError;
use crate::model::LocalRepository;
use crate::model::MerkleHash;
Expand Down Expand Up @@ -45,7 +44,7 @@ pub fn mark_commit_as_synced(
}

fn commit_is_synced_file_path(repo: &LocalRepository, commit_hash: &MerkleHash) -> PathBuf {
let dir_prefix = node_db_prefix(commit_hash);
let dir_prefix = commit_hash.to_hex_hash().node_db_prefix();
repo.path
.join(constants::OXEN_HIDDEN_DIR)
.join(constants::TREE_DIR)
Expand Down
107 changes: 73 additions & 34 deletions crates/lib/src/core/db/merkle_node/merkle_node_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use crate::constants;
use crate::error::OxenError;
use crate::model::LocalRepository;
use crate::model::MerkleHash;
use crate::model::merkle_tree::node_type::InvalidMerkleTreeNodeType;
use crate::util;

use crate::model::merkle_tree::node::{
Expand All @@ -67,23 +68,55 @@ use crate::model::merkle_tree::node::{
const NODE_FILE: &str = "node";
const CHILDREN_FILE: &str = "children";

pub fn node_db_prefix(hash: &MerkleHash) -> PathBuf {
let hash_str = hash.to_string();
let dir_prefix_len = 3;
let dir_prefix = hash_str.chars().take(dir_prefix_len).collect::<String>();
let dir_suffix = hash_str.chars().skip(dir_prefix_len).collect::<String>();
Path::new(&dir_prefix).join(&dir_suffix)
}

/// An absolute path to the directory for the Merkle node's `node` and `children` files.
pub fn node_db_path(repo: &LocalRepository, hash: &MerkleHash) -> PathBuf {
let dir_prefix = node_db_prefix(hash);
let dir_prefix = hash.to_hex_hash().node_db_prefix();
repo.path
Comment thread
coderabbitai[bot] marked this conversation as resolved.
.join(constants::OXEN_HIDDEN_DIR)
.join(constants::TREE_DIR)
.join(constants::NODES_DIR)
.join(dir_prefix)
}

/// Errors that the Merkle node database can encounter when reading and writing nodes.
///
/// The unit variants report misuse of a `MerkleNodeDB` handle (wrong mode or wrong
/// call order); the tuple variants wrap an underlying error. `OxenError` declares a
/// `#[from]` conversion for this type, so `?` lifts it into `OxenError` at call sites.
#[derive(Debug, thiserror::Error)]
pub enum MerkleDbError {
// Misuse of the custom file-format based Merkle tree store (call order / access mode).
/// `close` was called while the node file or the children file handle is `None`.
#[error("Must call open before closing")]
CloseBeforeOpen,
/// A write (`write_node` / `add_child`) was attempted on a db whose `read_only` flag is set.
#[error("Cannot write to read-only db")]
ReadOnly,
/// `write_node` was called after `data_offset` had already advanced past zero.
#[error("Cannot write size after writing data")]
IllegalOperationWriteSizeFirst,
/// A required file handle is `None` when writing.
/// NOTE(review): `map` also returns this when the children file is missing, although
/// `map` is a read path — confirm whether `ReadBeforeOpen` is intended there.
#[error("Must call open before writing")]
WriteBeforeOpen,
/// The node lookup is `None` when reading (returned by `map`).
#[error("Must call open before reading")]
ReadBeforeOpen,
// Wrappers around errors raised by lower layers.
/// Any `std::io::Error` propagated with `?`.
/// NOTE(review): the message says "writing", but the `#[from]` conversion is not
/// limited to write operations — confirm the wording.
#[error("Error writing to a node or children file: {0}")]
Io(#[from] std::io::Error),
/// `rmp_serde` failed to serialize a node.
#[error("Cannot encode a Merkle node: {0}")]
Encode(#[from] rmp_serde::encode::Error),
/// `rmp_serde` failed to deserialize a node.
#[error("Cannot decode a Merkle node: {0}")]
Decode(#[from] rmp_serde::decode::Error),
/// An `InvalidMerkleTreeNodeType` error; its own message is forwarded unchanged.
#[error("{0}")]
TypeMismatch(#[from] InvalidMerkleTreeNodeType),
/// `util::fs::create_dir_all` failed. Boxed because `OxenError` itself contains a
/// `MerkleDbError` variant, so the two types need indirection to have a finite size.
#[error("Failed to create directory: {0}")]
DirCreate(Box<OxenError>), // TODO: replace with FsError from upcoming refactoring PR
/// `util::fs::open_file` failed. Boxed for the same reason as `DirCreate`.
#[error("Failed to open file: {0}")]
Open(Box<OxenError>), // TODO: replace with FsError from upcoming refactoring PR
Comment thread
malcolmgreaves marked this conversation as resolved.
}

impl MerkleDbError {
fn dir_create(err: OxenError) -> Self {
Self::DirCreate(Box::new(err))
}

fn open(err: OxenError) -> Self {
Self::Open(Box::new(err))
}
}

pub struct MerkleNodeLookup {
pub data_type: u8,
pub parent_id: u128,
Expand All @@ -94,7 +127,7 @@ pub struct MerkleNodeLookup {
}

impl MerkleNodeLookup {
pub fn load(node_table_file: &mut File) -> Result<Self, OxenError> {
pub fn load(node_table_file: &mut File) -> Result<Self, MerkleDbError> {
// log::debug!("MerkleNodeLookup.load() {:?}", node_table_file);
// Read the whole node into memory
let mut file_data = Vec::new();
Expand Down Expand Up @@ -218,7 +251,7 @@ impl MerkleNodeDB {
self.data.to_owned()
}

pub fn node(&self) -> Result<EMerkleTreeNode, OxenError> {
pub fn node(&self) -> Result<EMerkleTreeNode, MerkleDbError> {
let node = Self::to_node(self.dtype, &self.data())?;
Ok(node)
}
Expand Down Expand Up @@ -249,7 +282,10 @@ impl MerkleNodeDB {
db_path.join(NODE_FILE).exists() && db_path.join(CHILDREN_FILE).exists()
}

pub fn open_read_only(repo: &LocalRepository, hash: &MerkleHash) -> Result<Self, OxenError> {
pub fn open_read_only(
repo: &LocalRepository,
hash: &MerkleHash,
) -> Result<Self, MerkleDbError> {
let path = node_db_path(repo, hash);
Self::open(path, true)
}
Expand All @@ -258,23 +294,23 @@ impl MerkleNodeDB {
repo: &LocalRepository,
node: &N,
parent_id: Option<MerkleHash>,
) -> Result<Self, OxenError> {
) -> Result<Self, MerkleDbError> {
let path = node_db_path(repo, &node.hash());
if !path.exists() {
util::fs::create_dir_all(&path)?;
util::fs::create_dir_all(&path).map_err(MerkleDbError::dir_create)?;
}
log::debug!("open_read_write merkle node db at {}", path.display());
let mut db = Self::open(path, false)?;
db.write_node(node, parent_id)?;
Ok(db)
}

pub fn open(path: impl AsRef<Path>, read_only: bool) -> Result<Self, OxenError> {
pub fn open(path: impl AsRef<Path>, read_only: bool) -> Result<Self, MerkleDbError> {
let path = path.as_ref();

// mkdir if not exists
if !path.exists() {
util::fs::create_dir_all(path)?;
util::fs::create_dir_all(path).map_err(MerkleDbError::dir_create)?;
}

let node_path = path.join(NODE_FILE);
Expand All @@ -290,8 +326,8 @@ impl MerkleNodeDB {
Option<File>,
Option<File>,
) = if read_only {
let mut node_file = util::fs::open_file(node_path)?;
let children_file = util::fs::open_file(children_path)?;
let mut node_file = util::fs::open_file(node_path).map_err(MerkleDbError::open)?;
let children_file = util::fs::open_file(children_path).map_err(MerkleDbError::open)?;
// log::debug!("Opened merkle node db read_only at {}", path.display());
(
Some(MerkleNodeLookup::load(&mut node_file)?),
Expand Down Expand Up @@ -326,19 +362,21 @@ impl MerkleNodeDB {
})
}

pub fn close(&mut self) -> Result<(), OxenError> {
/// Closes the open node and children file handles.
/// WARNING: Sets the internal node_file, children_file, and lookup to None.
pub fn close(&mut self) -> Result<(), MerkleDbError> {
if let Some(node_file) = &mut self.node_file {
node_file.flush()?;
node_file.sync_data()?;
} else {
return Err(OxenError::basic_str("Must call open before closing"));
return Err(MerkleDbError::CloseBeforeOpen);
}

if let Some(children_file) = &mut self.children_file {
children_file.flush()?;
children_file.sync_data()?;
} else {
return Err(OxenError::basic_str("Must call open before closing"));
return Err(MerkleDbError::CloseBeforeOpen);
}

self.node_file = None;
Expand All @@ -348,21 +386,22 @@ impl MerkleNodeDB {
}

/// Write the base node info.
/// WARNING: Sets the internal dtype, node_id, parent_id of `self` to the values from `node`.
fn write_node<N: TMerkleTreeNode>(
&mut self,
node: &N,
parent_id: Option<MerkleHash>,
) -> Result<(), OxenError> {
) -> Result<(), MerkleDbError> {
if self.read_only {
return Err(OxenError::basic_str("Cannot write to read-only db"));
return Err(MerkleDbError::ReadOnly);
}

if self.data_offset > 0 {
return Err(OxenError::basic_str("Cannot write size after writing data"));
return Err(MerkleDbError::IllegalOperationWriteSizeFirst);
}

let Some(node_file) = self.node_file.as_mut() else {
return Err(OxenError::basic_str("Must call open before writing"));
return Err(MerkleDbError::WriteBeforeOpen);
};
// log::debug!("write_node node: {}", node);

Expand Down Expand Up @@ -396,16 +435,16 @@ impl MerkleNodeDB {
Ok(())
}

pub fn add_child<N: TMerkleTreeNode>(&mut self, item: &N) -> Result<(), OxenError> {
pub fn add_child<N: TMerkleTreeNode>(&mut self, item: &N) -> Result<(), MerkleDbError> {
if self.read_only {
return Err(OxenError::basic_str("Cannot write to read-only db"));
return Err(MerkleDbError::ReadOnly);
}

let Some(node_file) = self.node_file.as_mut() else {
return Err(OxenError::basic_str("Must call open() before writing"));
return Err(MerkleDbError::WriteBeforeOpen);
};
let Some(children_file) = self.children_file.as_mut() else {
return Err(OxenError::basic_str("Must call open() before writing"));
return Err(MerkleDbError::WriteBeforeOpen);
};

let buf = item.to_msgpack_bytes()?;
Expand Down Expand Up @@ -436,11 +475,11 @@ impl MerkleNodeDB {
D: TMerkleTreeNode + de::DeserializeOwned,
{
let Some(lookup) = self.lookup.as_ref() else {
return Err(OxenError::basic_str("Must call open before reading"));
return Err(MerkleDbError::ReadBeforeOpen);
};

let Some(mut children_file) = self.children_file.as_ref() else {
return Err(OxenError::basic_str("Must call open before writing"));
return Err(MerkleDbError::WriteBeforeOpen);
};

// Find the offset and length of the data
Expand Down Expand Up @@ -469,13 +508,13 @@ impl MerkleNodeDB {
}
*/

pub fn map(&mut self) -> Result<Vec<(MerkleHash, MerkleTreeNode)>, OxenError> {
pub fn map(&mut self) -> Result<Vec<(MerkleHash, MerkleTreeNode)>, MerkleDbError> {
// log::debug!("Loading merkle node db map");
let Some(lookup) = self.lookup.as_ref() else {
return Err(OxenError::basic_str("Must call open before reading"));
return Err(MerkleDbError::ReadBeforeOpen);
};
let Some(children_file) = self.children_file.as_mut() else {
return Err(OxenError::basic_str("Must call open before writing"));
return Err(MerkleDbError::WriteBeforeOpen);
};

// Parse the node parent id
Expand Down
3 changes: 1 addition & 2 deletions crates/lib/src/core/node_sync_status.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::constants;
use crate::core::db::merkle_node::merkle_node_db::node_db_prefix;
use crate::error::OxenError;
use crate::model::LocalRepository;
use crate::model::MerkleHash;
Expand Down Expand Up @@ -45,7 +44,7 @@ pub fn mark_node_as_synced(
}

fn node_is_synced_file_path(repo: &LocalRepository, node_hash: &MerkleHash) -> PathBuf {
let dir_prefix = node_db_prefix(node_hash);
let dir_prefix = node_hash.to_hex_hash().node_db_prefix();
repo.path
.join(constants::OXEN_HIDDEN_DIR)
.join(constants::TREE_DIR)
Expand Down
4 changes: 4 additions & 0 deletions crates/lib/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::path::Path;
use std::path::PathBuf;
use tokio::task::JoinError;

use crate::core::db::merkle_node::merkle_node_db::MerkleDbError;
use crate::model::ParsedResource;
use crate::model::RepoNew;
use crate::model::Schema;
Expand Down Expand Up @@ -176,6 +177,9 @@ pub enum OxenError {
#[error("{0}")]
MerkleTreeError(#[from] InvalidMerkleTreeNodeType),

#[error("{0}")]
MerkleDbError(#[from] MerkleDbError),

//
// Schema (dataframes)
//
Expand Down
Loading
Loading