diff --git a/Cargo.toml b/Cargo.toml index 06b9a600..7a106888 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ composefs-ioctls = { version = "0.3.0", path = "crates/composefs-ioctls", defaul composefs-oci = { version = "0.3.0", path = "crates/composefs-oci", default-features = false } composefs-boot = { version = "0.3.0", path = "crates/composefs-boot", default-features = false } composefs-http = { version = "0.3.0", path = "crates/composefs-http", default-features = false } +splitfdstream = { version = "0.3.0", path = "crates/splitfdstream", default-features = false } [profile.dev.package.sha2] # this is *really* slow otherwise diff --git a/crates/cfsctl/Cargo.toml b/crates/cfsctl/Cargo.toml index bfb56522..05f7c41c 100644 --- a/crates/cfsctl/Cargo.toml +++ b/crates/cfsctl/Cargo.toml @@ -16,7 +16,7 @@ path = "src/lib.rs" [features] default = ['pre-6.15', 'oci'] http = ['composefs-http'] -oci = ['composefs-oci'] +oci = ['composefs-oci', 'splitfdstream'] rhel9 = ['composefs/rhel9'] 'pre-6.15' = ['composefs/pre-6.15'] @@ -29,6 +29,7 @@ composefs = { workspace = true } composefs-boot = { workspace = true } composefs-oci = { workspace = true, optional = true, features = ["boot"] } composefs-http = { workspace = true, optional = true } +splitfdstream = { workspace = true, optional = true } env_logger = { version = "0.11.0", default-features = false } hex = { version = "0.4.0", default-features = false } rustix = { version = "1.0.0", default-features = false, features = ["fs", "process"] } diff --git a/crates/cfsctl/src/lib.rs b/crates/cfsctl/src/lib.rs index 650924b7..197721fc 100644 --- a/crates/cfsctl/src/lib.rs +++ b/crates/cfsctl/src/lib.rs @@ -199,6 +199,16 @@ enum OciCommand { /// Optional human-readable name for the layer name: Option<String>, }, + /// Imports a complete image from a splitfdstream server into the repository. 
+ ImportImageSplitfdstream { + /// Path to the splitfdstream server socket + socket: PathBuf, + /// The image ID (manifest digest or tag) + image_id: String, + /// Tag name for imported image + #[clap(long)] + tag: Option<String>, + }, /// List the contents of a stored tar layer LsLayer { /// Layer content digest, e.g. sha256:a1b2c3... @@ -810,6 +820,27 @@ where .await?; println!("{}", object_id.to_id()); } + OciCommand::ImportImageSplitfdstream { + socket, + image_id, + tag, + } => { + let result = composefs_oci::import_complete_image_from_splitfdstream( + &Arc::new(repo), + &socket, + &image_id, + tag.as_deref(), + )?; + + println!("Imported complete image:"); + println!("  Manifest: {}", result.manifest_digest); + println!("  Config: {}", result.config_digest); + println!("  Layers: {}", result.layers_imported); + println!("  Size: {} bytes", result.total_size_bytes); + if let Some(tag_name) = tag { + println!("  Tagged: {}", tag_name); + } + } OciCommand::LsLayer { ref name } => { composefs_oci::ls_layer(&repo, name)?; } diff --git a/crates/composefs-oci/Cargo.toml b/crates/composefs-oci/Cargo.toml index 7898a68d..543cc6a4 100644 --- a/crates/composefs-oci/Cargo.toml +++ b/crates/composefs-oci/Cargo.toml @@ -21,9 +21,11 @@ async-compression = { version = "0.4.0", default-features = false, features = [" bytes = { version = "1", default-features = false } composefs = { workspace = true } composefs-boot = { workspace = true, optional = true } +splitfdstream = { workspace = true } containers-image-proxy = { version = "0.9.2", default-features = false } hex = { version = "0.4.0", default-features = false } indicatif = { version = "0.18.0", default-features = false, features = ["tokio"] } +jsonrpc-fdpass = { git = "https://github.com/bootc-dev/jsonrpc-fdpass.git" } rustix = { version = "1.0.0", features = ["fs"] } serde = { version = "1.0", default-features = false, features = ["derive"] } thiserror = { version = "2.0.0", default-features = false } diff --git 
a/crates/composefs-oci/src/lib.rs b/crates/composefs-oci/src/lib.rs index 4dfaa98d..bdbef8b1 100644 --- a/crates/composefs-oci/src/lib.rs +++ b/crates/composefs-oci/src/lib.rs @@ -14,6 +14,7 @@ pub mod boot; pub mod image; pub mod oci_image; pub mod skopeo; +pub mod splitfdstream; pub mod tar; /// Test utilities for building OCI images from dumpfile strings. @@ -25,6 +26,10 @@ pub mod test_util; // Re-export the composefs crate for consumers who only need composefs-oci pub use composefs; +pub use splitfdstream::{ + CompleteImageImportResult, import_complete_image_from_splitfdstream, import_from_splitfdstream, +}; + use std::{collections::HashMap, sync::Arc}; use anyhow::{Result, ensure}; diff --git a/crates/composefs-oci/src/oci_image.rs b/crates/composefs-oci/src/oci_image.rs index 3c2c5b9e..d56f5279 100644 --- a/crates/composefs-oci/src/oci_image.rs +++ b/crates/composefs-oci/src/oci_image.rs @@ -50,8 +50,8 @@ use serde::Serialize; use composefs::{fsverity::FsVerityHashValue, repository::Repository}; -use crate::ContentAndVerity; use crate::skopeo::{OCI_BLOB_CONTENT_TYPE, OCI_CONFIG_CONTENT_TYPE, OCI_MANIFEST_CONTENT_TYPE}; +use crate::{ContentAndVerity, sha256_content_digest}; /// Data and named refs from a splitstream with external object storage. type ExternalData<ObjectID> = (Vec<u8>, HashMap<Box<str>, ObjectID>); @@ -684,6 +684,54 @@ pub(crate) fn rewrite_manifest( Ok((manifest_digest.clone(), id)) } +/// Writes a manifest to the repository from raw JSON bytes. +/// +/// Unlike [`write_manifest`], this preserves the exact JSON bytes from the +/// original source, avoiding digest mismatches from re-serialization. 
+pub fn write_manifest_raw<ObjectID: FsVerityHashValue>( + repo: &Arc<Repository<ObjectID>>, + manifest_json: &[u8], + manifest_digest: &str, + config_verity: &ObjectID, + layer_verities: &HashMap<Box<str>, ObjectID>, + reference: Option<&str>, +) -> Result<(String, ObjectID)> { + let digest: OciDigest = manifest_digest.parse().context("parsing manifest digest")?; + let content_id = manifest_identifier(&digest); + + if let Some(verity) = repo.has_stream(&content_id)? { + if let Some(name) = reference { + tag_image(repo, &digest, name)?; + } + return Ok((manifest_digest.to_string(), verity)); + } + + let computed = sha256_content_digest(manifest_json); + ensure!( + digest == computed, + "Manifest digest mismatch: expected {digest}, got {computed}" + ); + + let manifest: ImageManifest = + serde_json::from_slice(manifest_json).context("parsing manifest JSON")?; + + let mut stream = repo.create_stream(OCI_MANIFEST_CONTENT_TYPE)?; + + let config_key = format!("config:{}", manifest.config().digest()); + stream.add_named_stream_ref(&config_key, config_verity); + + for (diff_id, verity) in layer_verities { + stream.add_named_stream_ref(diff_id, verity); + } + + stream.write_external(manifest_json)?; + + let oci_ref = reference.map(oci_ref_path); + let id = repo.write_stream(stream, &content_id, oci_ref.as_deref())?; + + Ok((manifest_digest.to_string(), id)) +} + /// Checks if a manifest exists. pub fn has_manifest( repo: &Repository, diff --git a/crates/composefs-oci/src/splitfdstream.rs b/crates/composefs-oci/src/splitfdstream.rs new file mode 100644 index 00000000..3fd60544 --- /dev/null +++ b/crates/composefs-oci/src/splitfdstream.rs @@ -0,0 +1,297 @@ +//! Client for importing container images and layers from a splitfdstream server. +//! +//! Uses the `jsonrpc-fdpass` crate for the JSON-RPC 2.0 + SCM_RIGHTS protocol. 
+ +use std::collections::HashMap; +use std::fs::File; +use std::io::{Cursor, Read}; +use std::os::fd::OwnedFd; +use std::path::Path; +use std::sync::Arc; + +use anyhow::{Context, Result, bail}; +use containers_image_proxy::oci_spec::image::{ImageConfiguration, ImageManifest}; +use jsonrpc_fdpass::{JsonRpcMessage, JsonRpcRequest, MessageWithFds, UnixSocketTransport}; +use tokio::net::UnixStream; + +use composefs::fsverity::FsVerityHashValue; +use composefs::repository::Repository; +use splitfdstream::{Chunk, SplitfdstreamReader}; + +use crate::skopeo::TAR_LAYER_CONTENT_TYPE; + +/// Send a JSON-RPC request and receive the response with file descriptors. +async fn rpc_call( + socket_path: &Path, + method: &str, + params: serde_json::Value, +) -> Result<(serde_json::Value, Vec<OwnedFd>)> { + let stream = UnixStream::connect(socket_path) + .await + .context("connecting to splitfdstream server")?; + + let transport = UnixSocketTransport::new(stream); + let (mut sender, mut receiver) = transport.split(); + + let request = JsonRpcRequest::new(method.to_string(), Some(params), serde_json::json!(1)); + let message = MessageWithFds::new(JsonRpcMessage::Request(request), Vec::new()); + sender.send(message).await.context("sending request")?; + + let response = receiver.receive().await.context("receiving response")?; + + let resp_value = response + .message + .to_json_value() + .context("converting response to value")?; + + if let Some(err) = resp_value.get("error") { + let msg = err + .get("message") + .and_then(|m| m.as_str()) + .unwrap_or("unknown error"); + bail!("splitfdstream server error: {msg}"); + } + + Ok((resp_value, response.file_descriptors)) +} + +/// Import a container layer from a splitfdstream server into the repository. +/// +/// Receives file descriptors from the server and stores each one to the +/// repository immediately (then closes it) to avoid hitting the per-process +/// file descriptor limit. 
+pub fn import_from_splitfdstream<ObjectID: FsVerityHashValue>( + repo: &Arc<Repository<ObjectID>>, + socket_path: impl AsRef<Path>, + diff_id: &str, + layer_id: Option<&str>, + parent_id: Option<&str>, + reference: Option<&str>, +) -> Result<ObjectID> { + let effective_layer_id = layer_id.unwrap_or(diff_id); + let mut params = serde_json::json!({ "layerId": effective_layer_id }); + if let Some(pid) = parent_id { + params["parentId"] = serde_json::json!(pid); + } + + let rt = tokio::runtime::Handle::try_current() + .map(|h| { + // We're inside an async runtime but called synchronously; + // spawn a blocking task context so we can block_on. + h + }) + .ok(); + + let (resp_value, mut fds) = if let Some(handle) = rt { + tokio::task::block_in_place(|| { + handle.block_on(rpc_call(socket_path.as_ref(), "GetSplitFDStream", params)) + })? + } else { + let rt = tokio::runtime::Runtime::new().context("creating tokio runtime")?; + rt.block_on(rpc_call(socket_path.as_ref(), "GetSplitFDStream", params))? + }; + + let _ = resp_value; // error checking done inside rpc_call + + // allFDs[0] is a memfd with the stream data; allFDs[1:] are content FDs. + if fds.is_empty() { + bail!("no file descriptors received in response"); + } + + // Read the stream data from the first FD (memfd). + let stream_fd = fds.remove(0); + let mut stream_file = File::from(stream_fd); + let mut stream_data = Vec::new(); + stream_file + .read_to_end(&mut stream_data) + .context("reading stream data from memfd")?; + + // Store content FDs to the repository immediately. + let mut stored_objects: Vec<(ObjectID, u64)> = Vec::new(); + let mut copy_buf = vec![0u8; 1024 * 1024]; + for fd in fds { + let stat = rustix::fs::fstat(&fd).context("fstat on received fd")?; + let size = stat.st_size as u64; + let (object_id, _method) = repo + .ensure_object_from_fd(fd, size, &mut copy_buf) + .context("storing fd to repository")?; + stored_objects.push((object_id, size)); + } + + // Parse the splitfdstream and build a composefs splitstream. 
+ let diff_digest: crate::OciDigest = diff_id.parse().context("parsing diff_id as digest")?; + let content_identifier = crate::layer_identifier(&diff_digest); + let (object_id, _) = repo.ensure_stream( + &content_identifier, + TAR_LAYER_CONTENT_TYPE, + |writer| { + let mut reader = SplitfdstreamReader::new(Cursor::new(&stream_data)); + while let Some(chunk) = reader.next_chunk().context("reading splitfdstream chunk")? { + match chunk { + Chunk::Inline(data) => { + writer.write_inline(data); + } + Chunk::External(fd_index) => { + let idx = fd_index as usize; + if idx >= stored_objects.len() { + bail!( + "splitfdstream references fd index {idx} \ + but only {} content fds received", + stored_objects.len() + ); + } + let (ref object_id, size) = stored_objects[idx]; + writer.add_external_size(size); + writer.write_reference(object_id.clone())?; + } + } + } + Ok(()) + }, + reference, + )?; + Ok(object_id) +} + +/// Result of importing a complete image via splitfdstream. +#[derive(Debug)] +pub struct CompleteImageImportResult<ObjectID: FsVerityHashValue> { + /// SHA-256 digest of the manifest. + pub manifest_digest: String, + /// fs-verity hash of the stored manifest. + pub manifest_verity: ObjectID, + /// SHA-256 digest of the config. + pub config_digest: String, + /// fs-verity hash of the stored config. + pub config_verity: ObjectID, + /// Per-layer (diff_id, fs-verity hash) pairs. + pub layer_verities: Vec<(String, ObjectID)>, + /// Number of layers imported. + pub layers_imported: usize, + /// Total compressed size in bytes across all layers. + pub total_size_bytes: u64, +} + +/// Import a complete OCI image from a splitfdstream server into the repository. +/// +/// Fetches image metadata (manifest, config, layer IDs) via the `GetImage` RPC, +/// then imports each layer individually via `GetSplitFDStream` calls. 
+pub fn import_complete_image_from_splitfdstream<ObjectID: FsVerityHashValue>( + repo: &Arc<Repository<ObjectID>>, + socket_path: impl AsRef<Path>, + image_id: &str, + reference: Option<&str>, +) -> Result<CompleteImageImportResult<ObjectID>> { + let params = serde_json::json!({ "imageId": image_id }); + + let rt = tokio::runtime::Handle::try_current().ok(); + let (resp_value, _fds) = if let Some(handle) = rt { + tokio::task::block_in_place(|| { + handle.block_on(rpc_call(socket_path.as_ref(), "GetImage", params)) + })? + } else { + let rt = tokio::runtime::Runtime::new().context("creating tokio runtime")?; + rt.block_on(rpc_call(socket_path.as_ref(), "GetImage", params))? + }; + + let result = resp_value + .get("result") + .context("missing 'result' in response")?; + + let manifest_json = result + .get("manifest") + .and_then(|v| v.as_str()) + .context("missing 'manifest' in response")?; + let config_json = result + .get("config") + .and_then(|v| v.as_str()) + .context("missing 'config' in response")?; + let storage_layer_ids: Vec<String> = result + .get("layerDigests") + .and_then(|v| v.as_array()) + .context("missing 'layerDigests' in response")? 
+ .iter() + .map(|v| v.as_str().unwrap_or("").to_string()) + .collect(); + + let manifest: ImageManifest = + serde_json::from_str(manifest_json).context("parsing image manifest")?; + let config: ImageConfiguration = + serde_json::from_str(config_json).context("parsing image configuration")?; + + let manifest_digest = crate::sha256_content_digest(manifest_json.as_bytes()); + let config_digest = crate::sha256_content_digest(config_json.as_bytes()); + + let layers = manifest.layers(); + let diff_ids = config.rootfs().diff_ids(); + + if storage_layer_ids.len() != layers.len() { + bail!( + "server returned {} storage layer IDs but manifest has {} layers", + storage_layer_ids.len(), + layers.len() + ); + } + if storage_layer_ids.len() != diff_ids.len() { + bail!( + "server returned {} storage layer IDs but config has {} diff IDs", + storage_layer_ids.len(), + diff_ids.len() + ); + } + + let mut imported_layers = Vec::new(); + let mut total_size_bytes = 0u64; + + for (i, (layer_desc, diff_id)) in layers.iter().zip(diff_ids.iter()).enumerate() { + let layer_verity = import_from_splitfdstream( + repo, + &socket_path, + diff_id, + Some(&storage_layer_ids[i]), + None, + None, + )?; + + imported_layers.push((diff_id.to_string(), layer_verity)); + total_size_bytes += layer_desc.size(); + } + + let mut layer_refs = HashMap::new(); + for (diff_id, (_, layer_verity)) in diff_ids.iter().zip(&imported_layers) { + layer_refs.insert(diff_id.clone().into_boxed_str(), layer_verity.clone()); + } + + let (_, config_verity) = crate::write_config(repo, &config, layer_refs, None, None) + .context("storing image configuration")?; + + let mut layer_digest_to_verity = HashMap::new(); + for (layer_desc, (_, layer_verity)) in layers.iter().zip(&imported_layers) { + layer_digest_to_verity.insert( + layer_desc.digest().to_string().into_boxed_str(), + layer_verity.clone(), + ); + } + + let manifest_digest_str = manifest_digest.to_string(); + let (_, manifest_verity) = 
crate::oci_image::write_manifest_raw( + repo, + manifest_json.as_bytes(), + &manifest_digest_str, + &config_verity, + &layer_digest_to_verity, + reference, + ) + .context("storing image manifest")?; + + let layers_imported = imported_layers.len(); + Ok(CompleteImageImportResult { + manifest_digest: manifest_digest_str, + manifest_verity, + config_digest: config_digest.to_string(), + config_verity, + layer_verities: imported_layers, + layers_imported, + total_size_bytes, + }) +} diff --git a/crates/composefs/src/repository.rs b/crates/composefs/src/repository.rs index d5530592..769330e6 100644 --- a/crates/composefs/src/repository.rs +++ b/crates/composefs/src/repository.rs @@ -1341,6 +1341,38 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> { Ok(id) } + /// Store an object from a file descriptor, using reflink (FICLONE) when possible. + /// + /// Tries FICLONE first for an instant copy-on-write clone (works when the + /// source fd and the repository are on the same filesystem). Falls back to + /// a buffered data copy using the caller-provided `buf`. Passing a large + /// buffer avoids the small default used by `std::io::copy`. + #[context("Ensuring object from fd exists in repository")] + pub fn ensure_object_from_fd( + &self, + fd: OwnedFd, + size: u64, + buf: &mut [u8], + ) -> Result<(ObjectID, ObjectStoreMethod)> { + let tmpfile = self.create_object_tmpfile()?; + let src = File::from(fd); + let mut dst = File::from(tmpfile); + + if rustix::fs::ioctl_ficlone(&dst, &src).is_err() { + let mut src = src; + loop { + let n = src.read(buf).context("reading from source fd")?; + if n == 0 { + break; + } + dst.write_all(&buf[..n]) + .context("writing to repository tmpfile")?; + } + } + + self.finalize_object_tmpfile(dst, size) + } + #[context("Opening file '(unknown)' with verity verification")] fn open_with_verity(&self, filename: &str, expected_verity: &ObjectID) -> Result<OwnedFd> { let fd = self