Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ use std::path::Path;
use super::Migrate;

use crate::config::RepositoryConfig;
use crate::core::db::merkle_node::MerkleNodeDB;
use crate::core::versions::MinOxenVersion;
use crate::error::OxenError;
use crate::model::merkle_tree::merkle_tree_node_cache;
use crate::model::merkle_tree::merkle_writer::{
MerkleWriteSession, MerkleWriter, NodeWriteSession,
};
use crate::model::merkle_tree::node::vnode::VNodeOpts;
use crate::model::merkle_tree::node::{
CommitNode, DirNode, EMerkleTreeNode, MerkleTreeNode, VNode,
Expand Down Expand Up @@ -135,14 +137,27 @@ fn run_on_commit(repository: &LocalRepository, commit: &Commit) -> Result<(), Ox
dir_node_opts.num_entries = num_children as u64;
let dir_node = DirNode::new(&new_repo, dir_node_opts)?;

// Write a new commit db
// Write a new commit node via a session on the old repo.
let commit_node = CommitNode::from_commit(commit.clone());
let mut root_commit_db =
MerkleNodeDB::open_read_write(&old_repo, &commit_node, root_node.parent_id)?;
root_commit_db.add_child(&dir_node)?;
let old_store = old_repo.merkle_store();
let new_store = new_repo.merkle_store();
let old_session = old_store.begin()?;
let new_session = new_store.begin()?;
let mut root_commit_ns = old_session.create_node(&commit_node, root_node.parent_id)?;
root_commit_ns.add_child(&dir_node)?;
root_commit_ns.finish()?;

let current_path = Path::new("");
rewrite_nodes(&old_repo, &new_repo, &root_node, current_path)?;
rewrite_nodes(
&old_repo,
&new_repo,
&old_session,
&new_session,
&root_node,
current_path,
)?;
new_session.finish()?;
old_session.finish()?;

// println!("new tree for commit {}", commit);
// repositories::tree::print_tree(&new_repo, commit)?;
Expand All @@ -158,9 +173,12 @@ fn run_on_commit(repository: &LocalRepository, commit: &Commit) -> Result<(), Ox

// Forgive me if you are reading this for reference, we don't have great writers for the
// merkle tree yet - so there is a lot of duplicate logic with `commit_writer.rs`
fn rewrite_nodes(
#[allow(clippy::too_many_arguments)]
fn rewrite_nodes<OldS: MerkleWriteSession, NewS: MerkleWriteSession>(
old_repo: &LocalRepository,
new_repo: &LocalRepository,
old_session: &OldS,
new_session: &NewS,
node: &MerkleTreeNode,
current_dir: &Path,
) -> Result<(), OxenError> {
Expand All @@ -169,19 +187,10 @@ fn rewrite_nodes(
EMerkleTreeNode::Directory(dir) => {
// Load all the children of children (files and folders)
// Then redistribute into buckets...
// and then just use the MerkleNodeDB to write the nodes
// to the new tree
// and then just use the session to write the nodes to the new tree
let dir_children = repositories::tree::list_files_and_folders(child)?;
let current_dir = current_dir.join(dir.name());

// log::debug!(
// "rewrite_nodes {} children on current_dir {:?} DIRECTORY {} {}",
// dir_children.len(),
// current_dir,
// dir.hash(),
// dir
// );

let total_children = dir_children.len();
let vnode_size = old_repo.vnode_size();
let num_vnodes = (total_children as f32 / vnode_size as f32).ceil() as u128;
Expand All @@ -190,30 +199,14 @@ fn rewrite_nodes(
let mut dir_node_opts = dir.get_opts();
dir_node_opts.num_entries = total_children as u64;
let dir = DirNode::new(new_repo, dir_node_opts)?;
let mut dir_db = MerkleNodeDB::open_read_write(old_repo, &dir, node.parent_id)?;

// log::debug!(
// "rewrite_nodes {} VNodes for {} children in {} with vnode size {}",
// num_vnodes,
// total_children,
// dir,
// vnode_size
// );
let mut dir_ns = old_session.create_node(&dir, node.parent_id)?;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Use the current node's hash as the directory parent ID.

Line 202 passes node.parent_id when creating a rewritten directory node. That stores the grandparent's hash instead of the immediate parent, so migrated directory records get incorrect parent_id values. For example, the root dir under a commit ends up pointing at the previous commit instead of the current commit node. This should be Some(node.hash).

🛠️ Proposed fix
-                let mut dir_ns = old_session.create_node(&dir, node.parent_id)?;
+                let mut dir_ns = old_session.create_node(&dir, Some(node.hash))?;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let mut dir_ns = old_session.create_node(&dir, node.parent_id)?;
let mut dir_ns = old_session.create_node(&dir, Some(node.hash))?;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/lib/src/command/migrate/m20250111083535_add_child_counts_to_nodes.rs`
at line 202, the call to old_session.create_node currently passes
node.parent_id, which records the grandparent's hash as the rewritten
directory's parent. Change the call to pass Some(node.hash) (or equivalent) as
the parent_id argument of old_session.create_node(&dir, ...), so that each
rewritten directory record points at its immediate parent node.


// Compute buckets
let mut buckets: Vec<Vec<MerkleTreeNode>> = vec![vec![]; num_vnodes as usize];
for dir_child in dir_children {
let path = current_dir.join(dir_child.maybe_path().unwrap());
let hash = hasher::hash_buffer_128bit(path.to_str().unwrap().as_bytes());
let bucket_idx = hash % num_vnodes;
// log::debug!(
// "\trewrite_nodes dir_child {:?} bucket {} num_vnodes {} hash {} {}",
// path,
// bucket_idx,
// num_vnodes,
// hash,
// dir_child
// );
buckets[bucket_idx as usize].push(dir_child);
}

Expand Down Expand Up @@ -242,51 +235,58 @@ fn rewrite_nodes(
vnodes.push((vnode_id, bucket.clone()));
}

// log::debug!("rewrite_nodes count vnodes: {}", vnodes.len());
for (hash, entries) in vnodes.iter() {
// create a new vnode obj and add the the db
// create a new vnode obj and add to the dir session
let opts = VNodeOpts {
hash: *hash,
num_entries: entries.len() as u64,
};
let vnode_obj = VNode::new(new_repo, opts)?;
// log::debug!("rewrite_nodes adding VNode to DirNode! {:?}", vnode_obj);
dir_db.add_child(&vnode_obj)?;
dir_ns.add_child(&vnode_obj)?;

let mut vnode_db =
MerkleNodeDB::open_read_write(new_repo, &vnode_obj, Some(dir_db.node_id))?;
let dir_ns_id = *dir_ns.node_id();
let mut vnode_ns = new_session.create_node(&vnode_obj, Some(dir_ns_id))?;

// log::debug!("rewrite_nodes count entries {}", entries.len());
for entry in entries {
match &entry.node {
EMerkleTreeNode::File(f_node) => {
// log::debug!("rewrite_nodes adding FileNode to VNode! {}", f_node);
vnode_db.add_child(f_node)?;
vnode_ns.add_child(f_node)?;
}
EMerkleTreeNode::Directory(d_node) => {
let mut d_node_opts = d_node.get_opts();
let d_children = repositories::tree::list_files_and_folders(entry)?;
d_node_opts.num_entries = d_children.len() as u64;
// log::debug!(
// "rewrite_nodes adding DirNode to VNode with {} num_entries {}",
// d_node_opts.num_entries,
// d_node
// );
let d_node = DirNode::new(new_repo, d_node_opts)?;
vnode_db.add_child(&d_node)?;
vnode_ns.add_child(&d_node)?;
}
_ => {
panic!("Shouldn't reach here.")
}
}
}
vnode_ns.finish()?;
}

rewrite_nodes(old_repo, new_repo, child, &current_dir)?;
dir_ns.finish()?;

rewrite_nodes(
old_repo,
new_repo,
old_session,
new_session,
child,
&current_dir,
)?;
}
EMerkleTreeNode::VNode(_) => {
// VNode just needs to traverse to the next dirnode
rewrite_nodes(old_repo, new_repo, child, current_dir)?;
rewrite_nodes(
old_repo,
new_repo,
old_session,
new_session,
child,
current_dir,
)?;
}
_ => {
// pass, FileNode was not changed, so it is on the latest version
Expand Down
22 changes: 16 additions & 6 deletions crates/lib/src/core/v_latest/commits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ use std::str;

use crate::constants::COMMIT_COUNT_DIR;
use crate::core::db::key_val::{opts, str_val_db};
use crate::core::db::merkle_node::MerkleNodeDB;
use crate::core::v_latest::index::CommitMerkleTree;
use crate::model::merkle_tree::merkle_writer::{
MerkleWriteSession, MerkleWriter, NodeWriteSession,
};
use rocksdb::{DBWithThreadMode, MultiThreaded, SingleThreaded};

/// Configuration for commit traversal operations
Expand Down Expand Up @@ -283,10 +285,14 @@ pub fn create_empty_commit(
)?;

let parent_id = Some(existing_node.hash);
let mut commit_db = MerkleNodeDB::open_read_write(repo, &commit_node, parent_id)?;
let store = repo.merkle_store();
let session = store.begin()?;
let mut commit_ns = session.create_node(&commit_node, parent_id)?;
// There should always be one child, the root directory
let dir_node = existing_node.children.first().unwrap().dir()?;
commit_db.add_child(&dir_node)?;
commit_ns.add_child(&dir_node)?;
commit_ns.finish()?;
session.finish()?;

// Copy the dir hashes db to the new commit
repositories::tree::cp_dir_hashes_to(repo, &existing_commit_id, commit_node.hash())?;
Expand Down Expand Up @@ -362,9 +368,13 @@ pub fn create_initial_commit(
},
)?;

// Open the commit database and add the root directory
let mut commit_db = MerkleNodeDB::open_read_write(repo, &commit_node, None)?;
commit_db.add_child(&dir_node)?;
// Open the commit write session and add the root directory
let store = repo.merkle_store();
let session = store.begin()?;
let mut commit_ns = session.create_node(&commit_node, None)?;
commit_ns.add_child(&dir_node)?;
commit_ns.finish()?;
session.finish()?;
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Initialize the dir_hash_db with the root directory hash
let commit_id_string = commit_id.to_string();
Expand Down
54 changes: 31 additions & 23 deletions crates/lib/src/core/v_latest/entries.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::core;
use crate::core::db::merkle_node::MerkleNodeDB;
use crate::error::OxenError;
use crate::model::entry::metadata_entry::WorkspaceMetadataEntry;
use crate::model::merkle_tree::merkle_writer::{
MerkleWriteSession, MerkleWriter, NodeWriteSession,
};
use crate::model::merkle_tree::node::{DirNode, EMerkleTreeNode, FileNode, MerkleTreeNode};
use crate::model::metadata::MetadataDir;
use crate::model::metadata::generic_metadata::GenericMetadata;
Expand Down Expand Up @@ -545,15 +547,18 @@ pub fn update_metadata(repo: &LocalRepository, revision: impl AsRef<str>) -> Res
// Initialize data structures for aggregation
let mut num_bytes = 0;

// Start the recursive traversal
traverse_and_update_sizes_and_counts(repo, &mut node, &mut num_bytes)?;
// One merkle write session covers every node written during the traversal.
let store = repo.merkle_store();
let session = store.begin()?;
traverse_and_update_sizes_and_counts(&session, &mut node, &mut num_bytes)?;
session.finish()?;

Ok(())
}

#[allow(clippy::type_complexity)]
fn traverse_and_update_sizes_and_counts(
repo: &LocalRepository,
fn traverse_and_update_sizes_and_counts<S: MerkleWriteSession>(
session: &S,
node: &mut MerkleTreeNode,
num_bytes: &mut u64,
) -> Result<(HashMap<String, u64>, HashMap<String, u64>), OxenError> {
Expand All @@ -566,40 +571,43 @@ fn traverse_and_update_sizes_and_counts(
EMerkleTreeNode::Commit(commit_node) => {
log::debug!("Traversing node {commit_node:?}");
process_children(
repo,
session,
children,
&mut local_counts,
&mut local_sizes,
num_bytes,
)?;
let mut dir_db = MerkleNodeDB::open_read_write(repo, commit_node, node.parent_id)?;
add_children_to_db(&mut dir_db, &node.children)?;
let mut dir_ns = session.create_node(commit_node, node.parent_id)?;
add_children_to_session(&mut dir_ns, &node.children)?;
dir_ns.finish()?;
}
EMerkleTreeNode::VNode(vnode) => {
log::debug!("Traversing vnode {vnode:?}");
process_children(
repo,
session,
children,
&mut local_counts,
&mut local_sizes,
num_bytes,
)?;
let mut dir_db = MerkleNodeDB::open_read_write(repo, vnode, node.parent_id)?;
add_children_to_db(&mut dir_db, &node.children)?;
let mut dir_ns = session.create_node(vnode, node.parent_id)?;
add_children_to_session(&mut dir_ns, &node.children)?;
dir_ns.finish()?;
}
EMerkleTreeNode::Directory(dir_node) => {
log::debug!("No need to aggregate dir {}", dir_node.name());
process_children(
repo,
session,
children,
&mut local_counts,
&mut local_sizes,
num_bytes,
)?;
dir_node.set_data_type_counts(local_counts.clone());
dir_node.set_data_type_sizes(local_sizes.clone());
let mut dir_db = MerkleNodeDB::open_read_write(repo, dir_node, node.parent_id)?;
add_children_to_db(&mut dir_db, &node.children)?;
let mut dir_ns = session.create_node(dir_node, node.parent_id)?;
add_children_to_session(&mut dir_ns, &node.children)?;
dir_ns.finish()?;
}
EMerkleTreeNode::File(file_node) => {
log::debug!(
Expand All @@ -626,16 +634,16 @@ fn traverse_and_update_sizes_and_counts(
Ok((local_counts, local_sizes))
}

fn process_children(
repo: &LocalRepository,
fn process_children<S: MerkleWriteSession>(
session: &S,
children: &mut [MerkleTreeNode],
local_counts: &mut HashMap<String, u64>,
local_sizes: &mut HashMap<String, u64>,
num_bytes: &mut u64,
) -> Result<(), OxenError> {
for child in children.iter_mut() {
let (child_counts, child_sizes) =
traverse_and_update_sizes_and_counts(repo, child, num_bytes)?;
traverse_and_update_sizes_and_counts(session, child, num_bytes)?;
for (key, count) in child_counts {
*local_counts.entry(key).or_insert(0) += count;
}
Expand All @@ -646,23 +654,23 @@ fn process_children(
Ok(())
}

fn add_children_to_db(
dir_db: &mut MerkleNodeDB,
fn add_children_to_session<S: NodeWriteSession>(
ns: &mut S,
children: &[MerkleTreeNode],
) -> Result<(), OxenError> {
for child in children {
match &child.node {
EMerkleTreeNode::Commit(commit_node) => {
dir_db.add_child(commit_node)?;
ns.add_child(commit_node)?;
}
EMerkleTreeNode::Directory(dir_node) => {
dir_db.add_child(dir_node)?;
ns.add_child(dir_node)?;
}
EMerkleTreeNode::File(file_node) => {
dir_db.add_child(file_node)?;
ns.add_child(file_node)?;
}
EMerkleTreeNode::VNode(vnode) => {
dir_db.add_child(vnode)?;
ns.add_child(vnode)?;
}
_ => {
return Err(OxenError::basic_str("Unsupported node type"));
Expand Down
Loading
Loading