Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions crates/weaver-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ default = []
# which is now removed; the feature now activates the corresponding
# weaver-spu features instead.
inference = ["weaver-spu/cuda", "weaver-spu/gguf"]
# `embedder-rust` opts the daemon into the in-process Rust embedder
# backend (`weaver_spu::encoder::client::EmbedderClient`). Pulls in
# `weaver-spu/flash-attn` (which transitively enables `candle` +
# `candle-cuda`) so the daemon can construct a `JinaV4Embedder` at
# boot. With this feature OFF, `[embedder].backend = "rust"` is a
# hard boot failure rather than a silent fallback to the Python path.
# Default builds keep the Python path; flip via `--features
# embedder-rust` (combine with `inference` for the full daemon).
embedder-rust = ["weaver-spu/flash-attn", "dep:candle-core"]
# `viewer` pulls in weaver-analysis (manifest schema + load) so the
# trace viewer can ingest the bench-emitted manifest.json companion.
viewer = ["dep:ratatui", "dep:crossterm", "dep:weaver-analysis"]
Expand All @@ -44,6 +53,10 @@ futures = { workspace = true }
libc = "0.2"
ratatui = { workspace = true, optional = true }
weaver-analysis = { path = "../weaver-analysis", optional = true }
# Candle is reached for under `embedder-rust` to construct the CUDA
# device + dtype the in-process EmbedderClient needs at boot. Workspace-
# pinned (same fork as weaver-spu) so versions can't drift.
candle-core = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
Expand Down
154 changes: 134 additions & 20 deletions crates/weaver-interface/src/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use std::time::Duration;

use anyhow::{Result, bail};
use clap::Args;
use weaver_core::embedder::EmbedderInfo;
use weaver_spu::decoder::multi_model::EmbedderBackend;
use weaver_spu::encoder::grpc_client_legacy::{
DEFAULT_EMBEDDER_SOCKET, EmbeddingClient, EmbeddingClientConfig, EmbeddingEndpoint,
};
Expand Down Expand Up @@ -106,15 +108,27 @@ pub async fn handle_serve(args: ServeArgs) -> Result<()> {
// staying down until the embedder is reachable. Operators see a clear
// refusal instead of silent drift.
let pin_path = Path::new(DEFAULT_PIN_PATH);
// Treat an empty string in [embedder].socket as unset — otherwise a blank
// override silently falls through and the probe hits the empty path.
let embedder_socket = config
let backend = config
.embedder
.as_ref()
.map(|e| e.socket.trim())
.filter(|s| !s.is_empty())
.unwrap_or(DEFAULT_EMBEDDER_SOCKET);
match probe_embedder(embedder_socket).await {
.map(|e| e.backend)
.unwrap_or_default();
let probe_result = match backend {
EmbedderBackend::Python => {
// Treat an empty string in [embedder].socket as unset — otherwise a
// blank override silently falls through and the probe hits the empty
// path.
let embedder_socket = config
.embedder
.as_ref()
.map(|e| e.socket.trim())
.filter(|s| !s.is_empty())
.unwrap_or(DEFAULT_EMBEDDER_SOCKET);
probe_embedder_python(embedder_socket).await
}
EmbedderBackend::Rust => probe_embedder_rust(config.embedder.as_ref()).await?,
};
Comment thread
coderabbitai[bot] marked this conversation as resolved.
match probe_result {
Some(info) => verify_embedder_pin(pin_path, &info)?,
None => {
// Path::exists() collapses "file missing" and "can't check"
Expand Down Expand Up @@ -152,7 +166,7 @@ pub async fn handle_serve(args: ServeArgs) -> Result<()> {
.await
}

/// Probe the embedder socket with bounded retry and log its readiness state.
/// Probe the Python embedder socket with bounded retry and log its readiness state.
///
/// Systemd ordering (`After=weaver-embedder.service` on weaver-infer) puts
/// the embedder first, but `Type=simple` considers a service "active" the
Expand All @@ -166,7 +180,7 @@ pub async fn handle_serve(args: ServeArgs) -> Result<()> {
/// Once we're past startup, the refuse-to-start-when-pin-exists check in
/// `handle_serve` catches a truly-wedged embedder: probe returns None →
/// refuse, rather than silently running without the pin verified.
async fn probe_embedder(socket: &str) -> Option<weaver_spu::proto::embedding::InfoResponse> {
async fn probe_embedder_python(socket: &str) -> Option<EmbedderInfo> {
let client_config = EmbeddingClientConfig {
endpoint: EmbeddingEndpoint::Unix(PathBuf::from(socket)),
connect_timeout: EMBEDDER_PROBE_TIMEOUT,
Expand All @@ -185,7 +199,21 @@ async fn probe_embedder(socket: &str) -> Option<weaver_spu::proto::embedding::In
"Embedder ready at {} after {} probe attempt(s): {} (dim={}, max_seq={})",
socket, attempts, info.model_name, info.dimension, info.max_seq_length,
);
return Some(info);
// Convert proto InfoResponse → backend-agnostic
// EmbedderInfo so the pin-verifier downstream is
// backend-shape-independent. The Python service's
// `info()` already returns the proto type via the
// gRPC client's inherent `info` method (not the
// trait one) — this conversion mirrors what the
// Embedder trait impl does internally on
// `EmbeddingClient`.
return Some(EmbedderInfo {
model_name: info.model_name,
model_loaded: info.model_loaded,
dimension: info.dimension,
max_seq_length: info.max_seq_length,
weight_revision: info.weight_revision,
});
}
Err(e) => last_err = Some(format!("ensure_ready: {e}")),
},
Expand All @@ -205,6 +233,99 @@ async fn probe_embedder(socket: &str) -> Option<weaver_spu::proto::embedding::In
None
}

/// Construct the in-process Rust embedder backend
/// ([`weaver_spu::encoder::client::EmbedderClient`]) and return its
/// identity for the cohort-pin verifier.
///
/// There is deliberately no retry loop here, unlike the Python probe:
/// an in-process embedder either constructs fully (model loaded, ready
/// to answer `info()`) or construction errors out. Failure modes —
/// missing snapshot path, unavailable GPU, corrupted weights — are
/// configuration or hardware problems rather than transients, so they
/// surface directly to the caller as hard boot failures.
///
/// **Feature gating**: only usable when the binary was built with
/// `embedder-rust` (which transitively enables
/// `weaver-spu/flash-attn`). Without the feature, selecting
/// `backend = "rust"` is rejected with an explicit configuration
/// error — never a silent fallback to the Python path.
async fn probe_embedder_rust(
    embedder_config: Option<&weaver_spu::decoder::multi_model::EmbedderConfig>,
) -> Result<Option<EmbedderInfo>> {
    #[cfg(not(feature = "embedder-rust"))]
    {
        // Keep the parameter "used" so the feature-off build compiles
        // without an unused-variable warning.
        let _ = embedder_config;
        bail!(
            "[embedder].backend = \"rust\" requires the daemon to be built with \
            the `embedder-rust` feature; rebuild with \
            `cargo build -p weaver-interface --features inference,embedder-rust` \
            (or omit `embedder-rust` and use `backend = \"python\"`)"
        );
    }

    #[cfg(feature = "embedder-rust")]
    {
        use weaver_core::embedder::Embedder;
        use weaver_spu::encoder::client::EmbedderClient;

        // Both config lookups are hard requirements under the Rust
        // backend; either one missing is a boot-time configuration error.
        let Some(cfg) = embedder_config else {
            bail!(
                "[embedder].backend = \"rust\" but no [embedder] block in server.toml; \
                add the block with `snapshot = \"/path/to/jina-v4/snapshot/<sha>/\"`"
            );
        };
        let Some(snapshot_dir) = cfg.snapshot.as_ref() else {
            bail!(
                "[embedder].backend = \"rust\" requires [embedder].snapshot to point at \
                the Jina V4 HF snapshot directory (e.g. \
                `snapshot = \"/opt/weaver/huggingface/.../snapshots/<sha>/\"`)"
            );
        };
        // GPU ordinal still comes from the fleet-wide (deprecated but
        // load-bearing) [embedder].gpu field. Per agent-spu-schema-Spec.md
        // §8 this becomes per-agent post-cutover; the boot-time pin probe
        // uses the shared value for now.
        let cuda_device = candle_core::Device::new_cuda(cfg.gpu)
            .map_err(|e| anyhow::anyhow!("CUDA device {} unavailable: {e}", cfg.gpu))?;
        // BF16 is the production target per docs/specs/weaver-spu-Spec.md
        // §3. The identity probe is dtype-invariant, so this choice cannot
        // affect pin verification.
        let dtype = candle_core::DType::BF16;

        println!(
            "Constructing in-process Rust embedder from {} on cuda:{} bf16...",
            snapshot_dir.display(),
            cfg.gpu
        );
        let load_start = std::time::Instant::now();
        // The model load is synchronous, IO- and GPU-bound, and takes
        // seconds; run it on a blocking thread so the tokio runtime keeps
        // turning in the meantime.
        let snapshot_owned = snapshot_dir.clone();
        let embedder = tokio::task::spawn_blocking(move || {
            EmbedderClient::from_snapshot(&snapshot_owned, dtype, &cuda_device)
        })
        .await
        .map_err(|e| anyhow::anyhow!("EmbedderClient construction task panicked: {e}"))?
        .map_err(|e| anyhow::anyhow!("EmbedderClient::from_snapshot: {e}"))?;
        let info = embedder
            .info()
            .await
            .map_err(|e| anyhow::anyhow!("in-process EmbedderClient::info() failed: {e}"))?;
        println!(
            "Embedder ready (in-process) in {:.1}s: {} (dim={}, max_seq={}, rev={})",
            load_start.elapsed().as_secs_f32(),
            info.model_name,
            info.dimension,
            info.max_seq_length,
            info.weight_revision,
        );
        // The constructed client is dropped on scope exit by design:
        // Phase 1 PR-G/H (task #118) wires it into daemon state for
        // runtime consumers (surfacing, notepad, dedup). This PR's scope
        // is only proving construction + identity probe end-to-end.
        Ok(Some(info))
    }
}

/// Compare the live embedder identity against the on-disk pin and refuse
/// to start on any drift. See `embedder_pin` module for the why.
///
Expand All @@ -213,10 +334,7 @@ async fn probe_embedder(socket: &str) -> Option<weaver_spu::proto::embedding::In
/// operator recovery is `weaver harness embedder reset --force` followed
/// by a corpus re-embed, because every vector in HADES memory was produced
/// by the pinned embedder and mixing families silently corrupts similarity.
fn verify_embedder_pin(
path: &Path,
info: &weaver_spu::proto::embedding::InfoResponse,
) -> Result<()> {
fn verify_embedder_pin(path: &Path, info: &EmbedderInfo) -> Result<()> {
let live = EmbedderPin::now(
&info.model_name,
info.dimension,
Expand Down Expand Up @@ -256,7 +374,6 @@ mod tests {
use super::*;
use clap::Parser;
use tempfile::TempDir;
use weaver_spu::proto::embedding::InfoResponse;

#[derive(Parser, Debug)]
struct TestCli {
Expand Down Expand Up @@ -294,15 +411,12 @@ mod tests {
assert_eq!(cli.args.listen.as_deref(), Some("0.0.0.0:8080"));
}

fn info(model: &str, dim: u32, seq: u32, rev: &str) -> InfoResponse {
InfoResponse {
fn info(model: &str, dim: u32, seq: u32, rev: &str) -> EmbedderInfo {
EmbedderInfo {
model_name: model.into(),
dimension: dim,
max_seq_length: seq,
supported_tasks: vec!["retrieval.passage".into()],
device: "cuda:0".into(),
model_loaded: true,
uptime_seconds: 1.0,
weight_revision: rev.into(),
}
}
Expand Down
24 changes: 23 additions & 1 deletion crates/weaver-spu/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,29 @@ use std::path::PathBuf;

fn main() -> Result<(), Box<dyn std::error::Error>> {
if std::env::var("CARGO_FEATURE_CUDA").is_ok() {
compile_cuda_kernels();
// The `cuda` feature gates the legacy cudarc-based decoder path
// (slated for retirement in Phase 3 cleanup per Cargo.toml). The
// hand-written CUDA kernels for that path live at
// `kernels/transformer.cu`; they were lost in the PR-0.5.E
// shell-crate consolidation. Skip the compile gracefully when
// the file is absent so `--features inference` builds aren't
// broken on main pending a separate kernel-restore PR.
let manifest_dir = std::env::var("CARGO_MANIFEST_DIR")
.expect("CARGO_MANIFEST_DIR not set");
let kernel = PathBuf::from(&manifest_dir).join("kernels/transformer.cu");
if kernel.exists() {
compile_cuda_kernels();
} else {
println!(
"cargo:warning=weaver-spu: skipping CUDA kernel compile — \
kernels/transformer.cu absent (pre-existing regression \
from PR-0.5.E; cudarc decoder path will fail at link \
if its symbols are referenced)"
);
// Still emit the rerun-if-changed so a future kernel restore
// triggers a rebuild.
println!("cargo:rerun-if-changed=kernels/transformer.cu");
}
}
// Persephone proto is always compiled — `encoder::grpc_client_legacy`
// is the production embedder backend during the migration window
Expand Down
47 changes: 47 additions & 0 deletions crates/weaver-spu/src/decoder/multi_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,34 @@ pub struct GpuPairConfig {
pub decoder_gpu: usize,
}

/// Backend discriminator for the embedder.
///
/// Selects which implementation of [`weaver_core::embedder::Embedder`]
/// the daemon constructs at boot. Serialized lowercase, so the TOML
/// spelling is `backend = "python"` or `backend = "rust"`. Per
/// `embedder-oxidization-Spec.md` §3:
///
/// - **Python** (default during the migration window): connect to the
///   external `weaver-embedder.service` over Unix-socket gRPC. The
///   daemon's only embedder use today is the boot-time cohort-pin probe.
/// - **Rust** (Phase 1 cutover): construct an in-process `EmbedderClient`
///   that owns a `JinaV4Embedder` via candle, no RPC hop. Requires the
///   `weaver-spu/flash-attn` feature on the daemon binary; without it,
///   selecting `Rust` here is a hard fail at boot rather than a silent
///   fallback to the Python path.
///
/// Default stays `Python` until Phase 3 retires the gRPC path and the
/// default flips. See `docs/specs/embedder-oxidization-Spec.md` for the
/// cutover plan.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum EmbedderBackend {
    /// External Python `weaver-embedder.service` over Unix-socket gRPC.
    /// This is the `#[default]`, so an absent `backend` key deserializes
    /// to the Python path.
    #[default]
    Python,
    /// In-process candle-backed embedder (`weaver_spu::encoder::client::
    /// EmbedderClient`). Only functional on daemon builds carrying the
    /// `embedder-rust` feature — see the type-level docs.
    Rust,
}

/// Embedder service placement + operational defaults.
///
/// Harness-scoped singleton: `weaver-infer` up ⇒ embedder up on the
Expand All @@ -112,6 +140,25 @@ pub struct GpuPairConfig {
/// single source of truth; the Python service reads the same TOML.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmbedderConfig {
/// Which backend implementation to use. See [`EmbedderBackend`] for
/// the migration-window semantics. Defaults to `python` so existing
/// deployments keep their current path until the operator explicitly
/// flips to `rust` and provides a `snapshot` path.
#[serde(default)]
pub backend: EmbedderBackend,

/// Filesystem path to the Jina V4 HuggingFace snapshot directory
/// (e.g. `/opt/weaver/huggingface/hub/.../snapshots/<sha>/`). Read
/// by the in-process Rust backend at boot to construct the
/// `EmbedderClient` and run the cohort-pin probe.
///
/// Only required when `backend = "rust"`. Ignored under
/// `backend = "python"` (the Python service has its own snapshot
/// resolution via `model_name`). Required-but-missing under `rust`
/// is a hard boot failure.
#[serde(default)]
pub snapshot: Option<std::path::PathBuf>,
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/// Unix socket path the Python embedder service binds.
#[serde(default = "default_embedder_socket")]
pub socket: String,
Expand Down