Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions crates/weaver-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ default = []
# which is now removed; the feature now activates the corresponding
# weaver-spu features instead.
inference = ["weaver-spu/cuda", "weaver-spu/gguf"]
# `embedder-rust` opts the daemon into the in-process Rust embedder
# backend (`weaver_spu::encoder::client::EmbedderClient`). Pulls in
# `weaver-spu/flash-attn` (which transitively enables `candle` +
# `candle-cuda`) so the daemon can construct a `JinaV4Embedder` at
# boot. With this feature OFF, `[embedder].backend = "rust"` is a
# hard boot failure rather than a silent fallback to the Python path.
# Default builds keep the Python path; flip via `--features
# embedder-rust` (combine with `inference` for the full daemon).
embedder-rust = ["weaver-spu/flash-attn", "dep:candle-core"]
# `viewer` pulls in weaver-analysis (manifest schema + load) so the
# trace viewer can ingest the bench-emitted manifest.json companion.
viewer = ["dep:ratatui", "dep:crossterm", "dep:weaver-analysis"]
Expand All @@ -44,6 +53,10 @@ futures = { workspace = true }
libc = "0.2"
ratatui = { workspace = true, optional = true }
weaver-analysis = { path = "../weaver-analysis", optional = true }
# Candle is reached for under `embedder-rust` to construct the CUDA
# device + dtype the in-process EmbedderClient needs at boot. Workspace-
# pinned (same fork as weaver-spu) so versions can't drift.
candle-core = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
Expand Down
227 changes: 173 additions & 54 deletions crates/weaver-interface/src/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use std::time::Duration;

use anyhow::{Result, bail};
use clap::Args;
use weaver_core::embedder::EmbedderInfo;
use weaver_spu::decoder::multi_model::EmbedderBackend;
use weaver_spu::encoder::grpc_client_legacy::{
DEFAULT_EMBEDDER_SOCKET, EmbeddingClient, EmbeddingClientConfig, EmbeddingEndpoint,
};
Expand Down Expand Up @@ -63,40 +65,11 @@ pub async fn handle_serve(args: ServeArgs) -> Result<()> {
config.server.socket_path = "/run/weaver/inference.sock".to_string();
}

// Build server state (loads any models declared in config)
let (state, loaded_names) = weaver_spu::decoder::startup::build_server_state(&config)?;

let model_count = loaded_names.len();
if model_count > 0 {
println!(
"Loaded {} model(s): {}",
model_count,
loaded_names.join(", ")
);
}

if !config.gpus.is_empty() {
println!(
"GPU policies: {}",
config
.gpus
.iter()
.map(|g| format!("GPU {}={}", g.id, g.policy))
.collect::<Vec<_>>()
.join(", ")
);
}

let socket_path = PathBuf::from(&config.server.socket_path);
let listen_addr = config.server.listen_addr.clone();

println!(
"Inference server starting (socket={}, tcp={})",
socket_path.display(),
listen_addr.as_deref().unwrap_or("none"),
);
println!("Models loaded on demand via 'weaver model load' or agent pre-flight.");

// Embedder probe + pin verify runs BEFORE model loading so the daemon
// fails fast on a missing snapshot, busted CUDA init, or a wedged
// Python service rather than burning seconds on `build_server_state()`
// first. (PR #296 review.)
//
// Probe+verify is gated on pin presence:
// probe Some + pin any → verify (first-boot writes, match OK, mismatch aborts)
// probe Some + no pin → verify (writes the first-boot pin)
Expand All @@ -106,15 +79,27 @@ pub async fn handle_serve(args: ServeArgs) -> Result<()> {
// staying down until the embedder is reachable. Operators see a clear
// refusal instead of silent drift.
let pin_path = Path::new(DEFAULT_PIN_PATH);
// Treat an empty string in [embedder].socket as unset — otherwise a blank
// override silently falls through and the probe hits the empty path.
let embedder_socket = config
let backend = config
.embedder
.as_ref()
.map(|e| e.socket.trim())
.filter(|s| !s.is_empty())
.unwrap_or(DEFAULT_EMBEDDER_SOCKET);
match probe_embedder(embedder_socket).await {
.map(|e| e.backend)
.unwrap_or_default();
let probe_result = match backend {
EmbedderBackend::Python => {
// Treat an empty string in [embedder].socket as unset — otherwise a
// blank override silently falls through and the probe hits the empty
// path.
let embedder_socket = config
.embedder
.as_ref()
.map(|e| e.socket.trim())
.filter(|s| !s.is_empty())
.unwrap_or(DEFAULT_EMBEDDER_SOCKET);
probe_embedder_python(embedder_socket).await
}
EmbedderBackend::Rust => probe_embedder_rust(config.embedder.as_ref()).await?,
};
Comment thread
coderabbitai[bot] marked this conversation as resolved.
match probe_result {
Some(info) => verify_embedder_pin(pin_path, &info)?,
None => {
// Path::exists() collapses "file missing" and "can't check"
Expand Down Expand Up @@ -143,6 +128,40 @@ pub async fn handle_serve(args: ServeArgs) -> Result<()> {
}
}

// Build server state (loads any models declared in config)
let (state, loaded_names) = weaver_spu::decoder::startup::build_server_state(&config)?;

let model_count = loaded_names.len();
if model_count > 0 {
println!(
"Loaded {} model(s): {}",
model_count,
loaded_names.join(", ")
);
}

if !config.gpus.is_empty() {
println!(
"GPU policies: {}",
config
.gpus
.iter()
.map(|g| format!("GPU {}={}", g.id, g.policy))
.collect::<Vec<_>>()
.join(", ")
);
}

let socket_path = PathBuf::from(&config.server.socket_path);
let listen_addr = config.server.listen_addr.clone();

println!(
"Inference server starting (socket={}, tcp={})",
socket_path.display(),
listen_addr.as_deref().unwrap_or("none"),
);
println!("Models loaded on demand via 'weaver model load' or agent pre-flight.");

weaver_spu::decoder::server::serve(
state,
Some(socket_path.as_path()),
Expand All @@ -152,7 +171,7 @@ pub async fn handle_serve(args: ServeArgs) -> Result<()> {
.await
}

/// Probe the embedder socket with bounded retry and log its readiness state.
/// Probe the Python embedder socket with bounded retry and log its readiness state.
///
/// Systemd ordering (`After=weaver-embedder.service` on weaver-infer) puts
/// the embedder first, but `Type=simple` considers a service "active" the
Expand All @@ -166,7 +185,7 @@ pub async fn handle_serve(args: ServeArgs) -> Result<()> {
/// Once we're past startup, the refuse-to-start-when-pin-exists check in
/// `handle_serve` catches a truly-wedged embedder: probe returns None →
/// refuse, rather than silently running without the pin verified.
async fn probe_embedder(socket: &str) -> Option<weaver_spu::proto::embedding::InfoResponse> {
async fn probe_embedder_python(socket: &str) -> Option<EmbedderInfo> {
let client_config = EmbeddingClientConfig {
endpoint: EmbeddingEndpoint::Unix(PathBuf::from(socket)),
connect_timeout: EMBEDDER_PROBE_TIMEOUT,
Expand All @@ -185,7 +204,21 @@ async fn probe_embedder(socket: &str) -> Option<weaver_spu::proto::embedding::In
"Embedder ready at {} after {} probe attempt(s): {} (dim={}, max_seq={})",
socket, attempts, info.model_name, info.dimension, info.max_seq_length,
);
return Some(info);
// Convert proto InfoResponse → backend-agnostic
// EmbedderInfo so the pin-verifier downstream is
// backend-shape-independent. The Python service's
// `info()` already returns the proto type via the
// gRPC client's inherent `info` method (not the
// trait one) — this conversion mirrors what the
// Embedder trait impl does internally on
// `EmbeddingClient`.
return Some(EmbedderInfo {
model_name: info.model_name,
model_loaded: info.model_loaded,
dimension: info.dimension,
max_seq_length: info.max_seq_length,
weight_revision: info.weight_revision,
});
}
Err(e) => last_err = Some(format!("ensure_ready: {e}")),
},
Expand All @@ -205,6 +238,99 @@ async fn probe_embedder(socket: &str) -> Option<weaver_spu::proto::embedding::In
None
}

/// Construct the in-process Rust embedder backend
/// ([`weaver_spu::encoder::client::EmbedderClient`]) and return its
/// identity for the cohort-pin verifier.
///
/// Unlike the Python path, there's no socket-readiness retry loop —
/// the embedder either constructs successfully (model loaded, ready
/// to serve) or it doesn't. If construction fails (snapshot path
/// missing, GPU unavailable, weights corrupted), this is a hard boot
/// failure rather than a transient that we'd retry through; callers
/// see the error directly.
///
/// **Feature gating**: requires `embedder-rust` (which transitively
/// enables `weaver-spu/flash-attn`). When the feature is OFF, this
/// function rejects `backend = "rust"` with a configuration error
/// rather than silently falling through to the Python path.
async fn probe_embedder_rust(
    embedder_config: Option<&weaver_spu::decoder::multi_model::EmbedderConfig>,
) -> Result<Option<EmbedderInfo>> {
    #[cfg(not(feature = "embedder-rust"))]
    {
        let _ = embedder_config;
        bail!(
            "[embedder].backend = \"rust\" requires the daemon to be built with \
            the `embedder-rust` feature; rebuild with \
            `cargo build -p weaver-interface --features inference,embedder-rust` \
            (or omit `embedder-rust` and use `backend = \"python\"`)"
        );
    }

    #[cfg(feature = "embedder-rust")]
    {
        use weaver_core::embedder::Embedder;
        use weaver_spu::encoder::client::EmbedderClient;

        let embedder_config = embedder_config.ok_or_else(|| {
            anyhow::anyhow!(
                "[embedder].backend = \"rust\" but no [embedder] block in server.toml; \
                add the block with `snapshot = \"/path/to/jina-v4/snapshot/<sha>/\"`"
            )
        })?;
        let snapshot = embedder_config.snapshot.as_ref().ok_or_else(|| {
            anyhow::anyhow!(
                "[embedder].backend = \"rust\" requires [embedder].snapshot to point at \
                the Jina V4 HF snapshot directory (e.g. \
                `snapshot = \"/opt/weaver/huggingface/.../snapshots/<sha>/\"`)"
            )
        })?;
        // GPU ordinal from the (now-deprecated-but-still-load-bearing) [embedder].gpu
        // field. Per agent-spu-schema-Spec.md §8 this becomes per-agent post-cutover;
        // for now the boot-time pin probe uses the fleet-wide value. (The field is
        // Copy — the original call sites pass it by value through `&EmbedderConfig`.)
        let gpu = embedder_config.gpu;
        // BF16 is the production target per docs/specs/weaver-spu-Spec.md §3.
        // Pin probe doesn't depend on the dtype choice; identity is invariant
        // under dtype.
        let dtype = candle_core::DType::BF16;

        println!(
            "Constructing in-process Rust embedder from {} on cuda:{} bf16...",
            snapshot.display(),
            gpu
        );
        let started = std::time::Instant::now();
        // Run the CUDA device construction AND the snapshot load on a
        // blocking thread: both are sync, IO+GPU-bound operations — CUDA
        // context init alone can take seconds — and doing either inline
        // would stall the tokio runtime. (Previously Device::new_cuda ran
        // on the async thread before the spawn_blocking call.)
        let snapshot_clone = snapshot.clone();
        let client = tokio::task::spawn_blocking(move || -> Result<EmbedderClient> {
            let device = candle_core::Device::new_cuda(gpu)
                .map_err(|e| anyhow::anyhow!("CUDA device {} unavailable: {e}", gpu))?;
            EmbedderClient::from_snapshot(&snapshot_clone, dtype, &device)
                .map_err(|e| anyhow::anyhow!("EmbedderClient::from_snapshot: {e}"))
        })
        .await
        .map_err(|e| anyhow::anyhow!("EmbedderClient construction task panicked: {e}"))??;
        let info = client.info().await.map_err(|e| {
            anyhow::anyhow!("in-process EmbedderClient::info() failed: {e}")
        })?;
        println!(
            "Embedder ready (in-process) in {:.1}s: {} (dim={}, max_seq={}, rev={})",
            started.elapsed().as_secs_f32(),
            info.model_name,
            info.dimension,
            info.max_seq_length,
            info.weight_revision,
        );
        // The constructed client gets dropped here. Phase 1 PR-G/H
        // (task #118) wires it into daemon state for use by runtime
        // consumers (surfacing, notepad, dedup). For Phase 1 PR-1.G
        // scope the only goal is proving construction + identity
        // probe work end-to-end; consumer wiring is the next PR.
        Ok(Some(info))
    }
}

/// Compare the live embedder identity against the on-disk pin and refuse
/// to start on any drift. See `embedder_pin` module for the why.
///
Expand All @@ -213,10 +339,7 @@ async fn probe_embedder(socket: &str) -> Option<weaver_spu::proto::embedding::In
/// operator recovery is `weaver harness embedder reset --force` followed
/// by a corpus re-embed, because every vector in HADES memory was produced
/// by the pinned embedder and mixing families silently corrupts similarity.
fn verify_embedder_pin(
path: &Path,
info: &weaver_spu::proto::embedding::InfoResponse,
) -> Result<()> {
fn verify_embedder_pin(path: &Path, info: &EmbedderInfo) -> Result<()> {
let live = EmbedderPin::now(
&info.model_name,
info.dimension,
Expand Down Expand Up @@ -256,7 +379,6 @@ mod tests {
use super::*;
use clap::Parser;
use tempfile::TempDir;
use weaver_spu::proto::embedding::InfoResponse;

#[derive(Parser, Debug)]
struct TestCli {
Expand Down Expand Up @@ -294,15 +416,12 @@ mod tests {
assert_eq!(cli.args.listen.as_deref(), Some("0.0.0.0:8080"));
}

fn info(model: &str, dim: u32, seq: u32, rev: &str) -> InfoResponse {
InfoResponse {
/// Build a synthetic embedder-identity fixture for the pin-verify
/// tests; mirrors the shape the live probe hands to
/// `verify_embedder_pin`.
fn info(model: &str, dim: u32, seq: u32, rev: &str) -> EmbedderInfo {
    EmbedderInfo {
        model_name: String::from(model),
        dimension: dim,
        max_seq_length: seq,
        // Fixtures always model a loaded embedder — the verifier only
        // inspects identity fields.
        model_loaded: true,
        weight_revision: String::from(rev),
    }
}
Expand Down
24 changes: 23 additions & 1 deletion crates/weaver-spu/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,29 @@ use std::path::PathBuf;

fn main() -> Result<(), Box<dyn std::error::Error>> {
if std::env::var("CARGO_FEATURE_CUDA").is_ok() {
compile_cuda_kernels();
// The `cuda` feature gates the legacy cudarc-based decoder path
// (slated for retirement in Phase 3 cleanup per Cargo.toml). The
// hand-written CUDA kernels for that path live at
// `kernels/transformer.cu`; they were lost in the PR-0.5.E
// shell-crate consolidation. Skip the compile gracefully when
// the file is absent so `--features inference` builds aren't
// broken on main pending a separate kernel-restore PR.
let manifest_dir = std::env::var("CARGO_MANIFEST_DIR")
.expect("CARGO_MANIFEST_DIR not set");
let kernel = PathBuf::from(&manifest_dir).join("kernels/transformer.cu");
if kernel.exists() {
compile_cuda_kernels();
} else {
println!(
"cargo:warning=weaver-spu: skipping CUDA kernel compile — \
kernels/transformer.cu absent (pre-existing regression \
from PR-0.5.E; cudarc decoder path will fail at link \
if its symbols are referenced)"
);
// Still emit the rerun-if-changed so a future kernel restore
// triggers a rebuild.
println!("cargo:rerun-if-changed=kernels/transformer.cu");
}
}
// Persephone proto is always compiled — `encoder::grpc_client_legacy`
// is the production embedder backend during the migration window
Expand Down
Loading