Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 82 additions & 30 deletions crates/lib/src/core/db/merkle_node/merkle_node_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ use crate::constants;
use crate::error::OxenError;
use crate::model::LocalRepository;
use crate::model::MerkleHash;
use crate::model::merkle_tree::merkle_hash::HexHash;
use crate::model::merkle_tree::node_type::InvalidMerkleTreeNodeType;
use crate::util;

use crate::model::merkle_tree::node::{
Expand All @@ -67,14 +69,19 @@ use crate::model::merkle_tree::node::{
const NODE_FILE: &str = "node";
const CHILDREN_FILE: &str = "children";

/// Produces a relative path for the 2-level directory structure used to store Merkle nodes.
/// The first directory name is the first 3 characters of the hex-encoded hash. The second
/// is the remaining characters.
pub fn node_db_prefix(hash: &MerkleHash) -> PathBuf {
let hash_str = hash.to_string();
let dir_prefix_len = 3;
let dir_prefix = hash_str.chars().take(dir_prefix_len).collect::<String>();
let dir_suffix = hash_str.chars().skip(dir_prefix_len).collect::<String>();
Path::new(&dir_prefix).join(&dir_suffix)
let as_hex = HexHash::new(hash);
let hash_str = as_hex.as_str();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple things you could do for ergonomics if you wanted to:

  • Sometimes it is difficult to remember what separate type/function to use to convert something, but easy to look for conversion methods on the type you already have. We could add a method like this to MerkleHash:
    pub fn to_hex_hash(&self) -> HexHash {
        HexHash(format!("{self}"))
    }
  • If you make a reference to HexHash act like a &str, then you don't need a method that does an explicit conversion anymore:
impl Deref for HexHash {
    type Target = str;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

If you do both above, then this code becomes:

Suggested change
let as_hex = HexHash::new(hash);
let hash_str = as_hex.as_str();
let hash_str = hash.to_hex_hash();

and then you could remove all of this:

impl HexHash {
    #[inline(always)]
    pub fn new(hash: &MerkleHash) -> Self {
        Self(format!("{hash}"))
    }

    pub(crate) fn as_str(&self) -> &str {
        &self.0
    }
}

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like to_hex_hash for MerkleHash -- I'll add that one.

For &str, I want to be careful with this. In general, the intention is to not treat a HexHash as a string. I want there to be a little bit of friction so that a string can't be confused for a hexadecimal formatted MerkleHash value. With that context, I think having a Deref implementation for HexHash will make it too easy to break the type safety of the HexHash wrapper.

I added the as_str to make this prefix-suffix split function avoid two String allocations. 🤔 I could instead move this logic into HexHash. That way I can avoid having as_str altogether and have the function return the 2-layered nested directories directly. I think this makes sense since this directory structure is a primary use case of hex-formatting the hash value.

const DIR_PREFIX_LEN: usize = 3;
let dir_prefix = &hash_str[0..DIR_PREFIX_LEN];
let dir_suffix = &hash_str[DIR_PREFIX_LEN..];
Path::new(dir_prefix).join(dir_suffix)
}

/// An absolute path to the directory for the Merkle node's `node` and `children` files.
pub fn node_db_path(repo: &LocalRepository, hash: &MerkleHash) -> PathBuf {
let dir_prefix = node_db_prefix(hash);
repo.path
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Expand All @@ -84,6 +91,45 @@ pub fn node_db_path(repo: &LocalRepository, hash: &MerkleHash) -> PathBuf {
.join(dir_prefix)
}

/// Errors that the Merkle node database can encounter when reading and writing nodes.
#[derive(Debug, thiserror::Error)]
pub enum MerkleDbError {
// Errors encountered in the operation of the custom file format based Merkle tree store.
#[error("Must call open before closing")]
CloseBeforeOpen,
#[error("Cannot write to read-only db")]
ReadOnly,
#[error("Cannot write size after writing data")]
IllegalOperationWriteSizeFirst,
#[error("Must call open before writing")]
WriteBeforeOpen,
#[error("Must call open before reading")]
ReadBeforeOpen,
// wrappers
#[error("Error writing to a node or children file: {0}")]
Io(#[from] std::io::Error),
#[error("Cannot encode a Merkle node: {0}")]
Encode(#[from] rmp_serde::encode::Error),
#[error("Cannot decode a Merkle node: {0}")]
Decode(#[from] rmp_serde::decode::Error),
#[error("{0}")]
TypeMismatch(#[from] InvalidMerkleTreeNodeType),
#[error("Failed to create directory: {0}")]
DirCreate(Box<OxenError>), // TODO: replace with FsError from upcoming refactoring PR
#[error("Failed to open file: {0}")]
Open(Box<OxenError>), // TODO: replace with FsError from upcoming refactoring PR
Comment thread
malcolmgreaves marked this conversation as resolved.
}

impl MerkleDbError {
    /// Wraps an `OxenError` from a failed directory creation into
    /// `MerkleDbError::DirCreate`, boxing the payload.
    fn dir_create(err: OxenError) -> Self {
        Self::DirCreate(Box::new(err))
    }

    /// Wraps an `OxenError` from a failed file open into
    /// `MerkleDbError::Open`, boxing the payload.
    fn open(err: OxenError) -> Self {
        Self::Open(Box::new(err))
    }
}

pub struct MerkleNodeLookup {
pub data_type: u8,
pub parent_id: u128,
Expand All @@ -94,7 +140,7 @@ pub struct MerkleNodeLookup {
}

impl MerkleNodeLookup {
pub fn load(node_table_file: &mut File) -> Result<Self, OxenError> {
pub fn load(node_table_file: &mut File) -> Result<Self, MerkleDbError> {
// log::debug!("MerkleNodeLookup.load() {:?}", node_table_file);
// Read the whole node into memory
let mut file_data = Vec::new();
Expand Down Expand Up @@ -218,7 +264,7 @@ impl MerkleNodeDB {
self.data.to_owned()
}

pub fn node(&self) -> Result<EMerkleTreeNode, OxenError> {
pub fn node(&self) -> Result<EMerkleTreeNode, MerkleDbError> {
let node = Self::to_node(self.dtype, &self.data())?;
Ok(node)
}
Expand Down Expand Up @@ -249,7 +295,10 @@ impl MerkleNodeDB {
db_path.join(NODE_FILE).exists() && db_path.join(CHILDREN_FILE).exists()
}

pub fn open_read_only(repo: &LocalRepository, hash: &MerkleHash) -> Result<Self, OxenError> {
pub fn open_read_only(
repo: &LocalRepository,
hash: &MerkleHash,
) -> Result<Self, MerkleDbError> {
let path = node_db_path(repo, hash);
Self::open(path, true)
}
Expand All @@ -258,23 +307,23 @@ impl MerkleNodeDB {
repo: &LocalRepository,
node: &N,
parent_id: Option<MerkleHash>,
) -> Result<Self, OxenError> {
) -> Result<Self, MerkleDbError> {
let path = node_db_path(repo, &node.hash());
if !path.exists() {
util::fs::create_dir_all(&path)?;
util::fs::create_dir_all(&path).map_err(MerkleDbError::dir_create)?;
}
log::debug!("open_read_write merkle node db at {}", path.display());
let mut db = Self::open(path, false)?;
db.write_node(node, parent_id)?;
Ok(db)
}

pub fn open(path: impl AsRef<Path>, read_only: bool) -> Result<Self, OxenError> {
pub fn open(path: impl AsRef<Path>, read_only: bool) -> Result<Self, MerkleDbError> {
let path = path.as_ref();

// mkdir if not exists
if !path.exists() {
util::fs::create_dir_all(path)?;
util::fs::create_dir_all(path).map_err(MerkleDbError::dir_create)?;
}

let node_path = path.join(NODE_FILE);
Expand All @@ -290,8 +339,8 @@ impl MerkleNodeDB {
Option<File>,
Option<File>,
) = if read_only {
let mut node_file = util::fs::open_file(node_path)?;
let children_file = util::fs::open_file(children_path)?;
let mut node_file = util::fs::open_file(node_path).map_err(MerkleDbError::open)?;
let children_file = util::fs::open_file(children_path).map_err(MerkleDbError::open)?;
// log::debug!("Opened merkle node db read_only at {}", path.display());
(
Some(MerkleNodeLookup::load(&mut node_file)?),
Expand Down Expand Up @@ -326,19 +375,21 @@ impl MerkleNodeDB {
})
}

pub fn close(&mut self) -> Result<(), OxenError> {
/// Closes the open node and children file handles.
/// WARNING: Sets the internal node_file, children_file, and lookup to None.
pub fn close(&mut self) -> Result<(), MerkleDbError> {
if let Some(node_file) = &mut self.node_file {
node_file.flush()?;
node_file.sync_data()?;
} else {
return Err(OxenError::basic_str("Must call open before closing"));
return Err(MerkleDbError::CloseBeforeOpen);
}

if let Some(children_file) = &mut self.children_file {
children_file.flush()?;
children_file.sync_data()?;
} else {
return Err(OxenError::basic_str("Must call open before closing"));
return Err(MerkleDbError::CloseBeforeOpen);
}

self.node_file = None;
Expand All @@ -348,21 +399,22 @@ impl MerkleNodeDB {
}

/// Write the base node info.
/// WARNING: Sets the internal dtype, node_id, parent_id of `self` to the values from `node`.
fn write_node<N: TMerkleTreeNode>(
&mut self,
node: &N,
parent_id: Option<MerkleHash>,
) -> Result<(), OxenError> {
) -> Result<(), MerkleDbError> {
if self.read_only {
return Err(OxenError::basic_str("Cannot write to read-only db"));
return Err(MerkleDbError::ReadOnly);
}

if self.data_offset > 0 {
return Err(OxenError::basic_str("Cannot write size after writing data"));
return Err(MerkleDbError::IllegalOperationWriteSizeFirst);
}

let Some(node_file) = self.node_file.as_mut() else {
return Err(OxenError::basic_str("Must call open before writing"));
return Err(MerkleDbError::WriteBeforeOpen);
};
// log::debug!("write_node node: {}", node);

Expand Down Expand Up @@ -396,16 +448,16 @@ impl MerkleNodeDB {
Ok(())
}

pub fn add_child<N: TMerkleTreeNode>(&mut self, item: &N) -> Result<(), OxenError> {
pub fn add_child<N: TMerkleTreeNode>(&mut self, item: &N) -> Result<(), MerkleDbError> {
if self.read_only {
return Err(OxenError::basic_str("Cannot write to read-only db"));
return Err(MerkleDbError::ReadOnly);
}

let Some(node_file) = self.node_file.as_mut() else {
return Err(OxenError::basic_str("Must call open() before writing"));
return Err(MerkleDbError::WriteBeforeOpen);
};
let Some(children_file) = self.children_file.as_mut() else {
return Err(OxenError::basic_str("Must call open() before writing"));
return Err(MerkleDbError::WriteBeforeOpen);
};

let buf = item.to_msgpack_bytes()?;
Expand Down Expand Up @@ -436,11 +488,11 @@ impl MerkleNodeDB {
D: TMerkleTreeNode + de::DeserializeOwned,
{
let Some(lookup) = self.lookup.as_ref() else {
return Err(OxenError::basic_str("Must call open before reading"));
return Err(MerkleError::ReadBeforeOpen);
};

let Some(mut children_file) = self.children_file.as_ref() else {
return Err(OxenError::basic_str("Must call open before writing"));
return Err(MerkleError::WriteBeforeOpen);
};

// Find the offset and length of the data
Expand Down Expand Up @@ -469,13 +521,13 @@ impl MerkleNodeDB {
}
*/

pub fn map(&mut self) -> Result<Vec<(MerkleHash, MerkleTreeNode)>, OxenError> {
pub fn map(&mut self) -> Result<Vec<(MerkleHash, MerkleTreeNode)>, MerkleDbError> {
// log::debug!("Loading merkle node db map");
let Some(lookup) = self.lookup.as_ref() else {
return Err(OxenError::basic_str("Must call open before reading"));
return Err(MerkleDbError::ReadBeforeOpen);
};
let Some(children_file) = self.children_file.as_mut() else {
return Err(OxenError::basic_str("Must call open before writing"));
return Err(MerkleDbError::WriteBeforeOpen);
};

// Parse the node parent id
Expand Down
4 changes: 4 additions & 0 deletions crates/lib/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::path::Path;
use std::path::PathBuf;
use tokio::task::JoinError;

use crate::core::db::merkle_node::merkle_node_db::MerkleDbError;
use crate::model::ParsedResource;
use crate::model::RepoNew;
use crate::model::Schema;
Expand Down Expand Up @@ -176,6 +177,9 @@ pub enum OxenError {
#[error("{0}")]
MerkleTreeError(#[from] InvalidMerkleTreeNodeType),

#[error("{0}")]
MerkleDbError(#[from] MerkleDbError),

//
// Schema (dataframes)
//
Expand Down
35 changes: 35 additions & 0 deletions crates/lib/src/model/merkle_tree/merkle_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@ use crate::error::OxenError;
pub struct MerkleHash(u128);

impl MerkleHash {
#[inline(always)]
pub fn new(hash: u128) -> Self {
Self(hash)
}

#[inline(always)]
pub fn to_le_bytes(&self) -> [u8; 16] {
self.0.to_le_bytes()
}

#[inline(always)]
pub fn to_u128(&self) -> u128 {
self.0
}
Expand All @@ -39,6 +42,7 @@ impl MerkleHash {
}
}

/// Parses a hexadecimal string into a `MerkleHash`.
impl FromStr for MerkleHash {
type Err = OxenError;

Expand All @@ -48,6 +52,7 @@ impl FromStr for MerkleHash {
}
}

/// Parses a hexadecimal string into a `MerkleHash`.
impl TryFrom<String> for MerkleHash {
type Error = OxenError;

Expand All @@ -56,6 +61,7 @@ impl TryFrom<String> for MerkleHash {
}
}

/// Writes the hash value in hexadecimal format.
impl fmt::Display for MerkleHash {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:x}", self.0)
Expand Down Expand Up @@ -85,3 +91,32 @@ serde_with::serde_conv!(
|hash: &MerkleHash| hash.to_string(),
|s: String| MerkleHash::try_from(s)
);

/// A hexadecimal representation of a `MerkleHash`. Can only be created from a `MerkleHash`.
#[derive(Debug, PartialEq, Eq)]
pub(crate) struct HexHash(String);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

impl HexHash {
#[inline(always)]
pub fn new(hash: &MerkleHash) -> Self {
Self(format!("{hash}"))
}

pub(crate) fn as_str(&self) -> &str {
&self.0
}
}

/// Converts a `MerkleHash` into a `HexHash`.
impl From<MerkleHash> for HexHash {
fn from(value: MerkleHash) -> Self {
Self::new(&value)
}
}

/// Convert a reference to a `MerkleHash` into a `HexHash`.
impl<'a> From<&'a MerkleHash> for HexHash {
fn from(value: &'a MerkleHash) -> Self {
Self::new(value)
}
}
2 changes: 1 addition & 1 deletion crates/lib/src/repositories/commits/commit_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ fn r_create_dir_node(
match &entry.node.node {
EMerkleTreeNode::Directory(node) => {
// If the dir has updates, we need a new dir db
let dir_path = entry.node.maybe_path().unwrap();
let dir_path = entry.node.maybe_path()?;
// log::debug!("Processing dir node {:?}", dir_path);
let dir_node = if entries.contains_key(&dir_path) {
let dir_node =
Expand Down
Loading