Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions crates/weaver-demo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,28 @@ edition.workspace = true
version.workspace = true
license.workspace = true

[features]
default = []
# `embedder-rust` opts the herobench dedup gate into the in-process
# `EmbedderClient` (candle-backed) when the operator sets
# `WEAVER_SPU_JINA_V4_SNAPSHOT` at runtime. With this feature off,
# herobench falls back to the legacy gRPC `EmbeddingClient` against
# the Python `weaver-embedder.service` (or degrades to no-dedup if
# neither is reachable). Mirrors the same-named feature on
# `weaver-interface` (PR-1.G) so the daemon binary forwards it.
embedder-rust = ["weaver-spu/flash-attn", "dep:candle-core"]

[dependencies]
weaver-core = { workspace = true }
weaver-database = { workspace = true }
# `weaver-embedding` removed in PR-0.5.E. Demo uses the relocated
# encoder types via `weaver-spu` directly + the relocated `Embedder`
# trait via `weaver-core::embedder`.
weaver-spu = { workspace = true }
# Candle reached for under `embedder-rust` to construct the CUDA
# device + dtype the in-process `EmbedderClient` needs at boot.
# Workspace-pinned (same fork as weaver-spu) so versions can't drift.
candle-core = { workspace = true, optional = true }
weaver-trace = { workspace = true }
weaver-analysis = { workspace = true }
async-trait = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/weaver-demo/src/herobench/belief.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1629,7 +1629,7 @@ pub enum HypothesisDedupError {
/// consume attempt numbers.
pub async fn dedup_and_upsert_attempt_hypothesis(
pool: &ArangoPool,
embedder: &weaver_spu::encoder::grpc_client_legacy::EmbeddingClient,
embedder: &dyn weaver_core::embedder::Embedder,
task_name: &str,
attempt_num: usize,
hypothesis: &str,
Expand Down
136 changes: 108 additions & 28 deletions crates/weaver-demo/src/herobench/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,96 @@ fn generate_task_ids(difficulty: u32, count: usize) -> Vec<(String, String)> {
.collect()
}

/// Construct an embedder for the PR-10 dedup gate. Tries in-process
/// (`EmbedderClient::from_snapshot`) first when the `embedder-rust`
/// feature is enabled and `WEAVER_SPU_JINA_V4_SNAPSHOT` is set; falls
/// back to the legacy gRPC `EmbeddingClient` against the Python
/// `weaver-embedder.service`. Returns `None` if both paths fail —
/// dedup degrades to unconditional writes in that case.
/// Construct an embedder for the PR-10 dedup gate.
///
/// Resolution order:
/// 1. In-process `EmbedderClient::from_snapshot` — only when the
///    `embedder-rust` feature is compiled in AND the operator set
///    `WEAVER_SPU_JINA_V4_SNAPSHOT` to a non-empty path.
/// 2. Legacy gRPC `EmbeddingClient` against the Python
///    `weaver-embedder.service` (migration-window fallback).
/// 3. `None` — dedup degrades to unconditional writes; callers treat a
///    missing embedder as a soft condition, not a run-abort.
async fn try_construct_embedder() -> Option<Arc<dyn weaver_core::embedder::Embedder>> {
    #[cfg(feature = "embedder-rust")]
    {
        // let-chain: env var present AND non-blank. A set-but-empty var
        // falls through to the gRPC path rather than erroring.
        if let Ok(snapshot) = std::env::var("WEAVER_SPU_JINA_V4_SNAPSHOT")
            && !snapshot.trim().is_empty()
        {
            // GPU ordinal default: 0. Operators wanting a different
            // card override via `WEAVER_SPU_CUDA_DEVICE` (matches the
            // `jina_embed` smoke binary's env contract). Unparseable
            // values silently fall back to 0.
            let gpu_ordinal: usize = std::env::var("WEAVER_SPU_CUDA_DEVICE")
                .ok()
                .and_then(|s| s.parse().ok())
                .unwrap_or(0);
            tracing::info!(
                snapshot = %snapshot,
                gpu = gpu_ordinal,
                "constructing in-process EmbedderClient for dedup gate"
            );
            // Heavy load is sync + GPU-bound; spawn_blocking keeps the
            // tokio runtime healthy across the seconds-long mmap +
            // weights faulting in.
            let snapshot_path = std::path::PathBuf::from(&snapshot);
            let result = tokio::task::spawn_blocking(move || {
                let device = candle_core::Device::new_cuda(gpu_ordinal)
                    .map_err(|e| anyhow::anyhow!("CUDA cuda:{gpu_ordinal} unavailable: {e}"))?;
                weaver_spu::encoder::client::EmbedderClient::from_snapshot(
                    &snapshot_path,
                    candle_core::DType::BF16,
                    &device,
                )
                .map_err(|e| anyhow::anyhow!("EmbedderClient::from_snapshot: {e}"))
            })
            .await;
            // Outer Err = task panic (JoinError); inner Err = CUDA or
            // snapshot-load failure. Both are non-fatal: warn and fall
            // through to the gRPC fallback below.
            match result {
                Ok(Ok(client)) => {
                    tracing::info!(
                        "in-process EmbedderClient ready; dedup gate live"
                    );
                    let arc: Arc<dyn weaver_core::embedder::Embedder> = Arc::new(client);
                    return Some(arc);
                }
                Ok(Err(e)) => {
                    tracing::warn!(
                        "in-process EmbedderClient construction failed ({e}); \
                        falling back to legacy gRPC embedder"
                    );
                }
                Err(e) => {
                    tracing::warn!(
                        "in-process EmbedderClient task panicked ({e}); \
                        falling back to legacy gRPC embedder"
                    );
                }
            }
        }
    }

    // Fallback: legacy gRPC client against the Python
    // `weaver-embedder.service`. Retires alongside the gRPC client in
    // Phase 3.
    match weaver_spu::encoder::grpc_client_legacy::EmbeddingClient::connect_default().await {
        Ok(client) => match client.ensure_ready().await {
            // Connected AND ready-probe succeeded: dedup gate is live.
            Ok(_) => {
                let arc: Arc<dyn weaver_core::embedder::Embedder> = Arc::new(client);
                Some(arc)
            }
            Err(e) => {
                tracing::warn!(
                    "embedder reached but not ready ({e}); \
                    hypothesis dedup degraded to unconditional writes"
                );
                None
            }
        },
        Err(e) => {
            tracing::warn!(
                "embedder not reachable ({e}); \
                hypothesis dedup degraded to unconditional writes"
            );
            None
        }
    }
}

/// Run the full benchmark, returning structured results.
pub async fn run_benchmark(config: BenchmarkConfig) -> Result<HeroBenchResult> {
let total_start = Instant::now();
Expand Down Expand Up @@ -908,37 +998,27 @@ pub async fn run_benchmark(config: BenchmarkConfig) -> Result<HeroBenchResult> {
None
};

// PR-10 dedup gate: attempt to connect to the Persephone embedder. A
// missing embedder is NOT a run-abort condition — dedup degrades to the
// PR-10 dedup gate: attempt to obtain an embedder backend. A missing
// embedder is NOT a run-abort condition — dedup degrades to the
// pre-PR-10 unconditional write path (warn once, every later reflection
// stays on the degraded path). This preserves benchmark viability on
// machines without the embedder while still wiring the gate for any run
// that has one up.
let embedder: Option<Arc<weaver_spu::encoder::grpc_client_legacy::EmbeddingClient>> =
if db_pool.is_some() {
match weaver_spu::encoder::grpc_client_legacy::EmbeddingClient::connect_default().await
{
Ok(client) => match client.ensure_ready().await {
Ok(_) => Some(Arc::new(client)),
Err(e) => {
tracing::warn!(
"embedder reached but not ready ({e}); \
hypothesis dedup degraded to unconditional writes"
);
None
}
},
Err(e) => {
tracing::warn!(
"embedder not reachable ({e}); \
hypothesis dedup degraded to unconditional writes"
);
None
}
}
} else {
None
};
//
// Trait-typed (`Arc<dyn Embedder>`) so dedup is backend-agnostic.
// Construction tries paths in this order (PR-1.I):
// 1. In-process Rust `EmbedderClient` (`embedder-rust` feature)
// when `WEAVER_SPU_JINA_V4_SNAPSHOT` is set in the environment
// — the unified-SPU production path.
// 2. Legacy gRPC `EmbeddingClient::connect_default()` against the
// Python `weaver-embedder.service` — the migration-window
// fallback. Retires alongside the gRPC client in Phase 3.
// 3. None (no-dedup degradation) if both fail.
let embedder: Option<Arc<dyn weaver_core::embedder::Embedder>> = if db_pool.is_some() {
try_construct_embedder().await
} else {
None
};

// Ensure belief graph collections exist on every belief-graph run. Idempotent.
// Previously gated behind risk_profiles — that was wrong: belief_nodes etc.
Expand Down Expand Up @@ -1107,7 +1187,7 @@ pub async fn run_benchmark(config: BenchmarkConfig) -> Result<HeroBenchResult> {
Some(emb) => {
match belief::dedup_and_upsert_attempt_hypothesis(
pool,
emb,
emb.as_ref(),
task_name,
attempt_num,
&hypothesis,
Expand Down
2 changes: 1 addition & 1 deletion crates/weaver-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ inference = ["weaver-spu/cuda", "weaver-spu/gguf"]
# hard boot failure rather than a silent fallback to the Python path.
# Default builds keep the Python path; flip via `--features
# embedder-rust` (combine with `inference` for the full daemon).
embedder-rust = ["weaver-spu/flash-attn", "dep:candle-core"]
embedder-rust = ["weaver-spu/flash-attn", "dep:candle-core", "weaver-demo/embedder-rust"]
# `viewer` pulls in weaver-analysis (manifest schema + load) so the
# trace viewer can ingest the bench-emitted manifest.json companion.
viewer = ["dep:ratatui", "dep:crossterm", "dep:weaver-analysis"]
Expand Down