diff --git a/crates/weaver-core/src/embed_write.rs b/crates/weaver-core/src/embed_write.rs new file mode 100644 index 00000000..3b4148fd --- /dev/null +++ b/crates/weaver-core/src/embed_write.rs @@ -0,0 +1,372 @@ +//! Shared embed-on-write helper for `belief_nodes` producers. +//! +//! Per `docs/specs/belief-nodes-embedding-Spec.md` §4 + §13.1 (universal +//! embed-on-write principle), every producer of cognitive content embeds +//! the text before insert. This module provides the common helper that +//! Pen (`tools/notes.rs`) and engine-nap (`engine/runtime.rs`) both call, +//! plus future single-doc producers added under §13.7. +//! +//! The Spec's failure-mode escape (§4.1.2, §13.1) is encoded here: on +//! embedder unavailability or call failure, the helper logs a warning +//! and returns without mutating the document. The caller proceeds with +//! the insert; the doc lands with no embedding field; sleep stage A's +//! retroactive pass (§7) populates it on the next sleep cycle. Producers +//! never block on embedder availability. +//! +//! Batched producers (preseed materializer per §4.3) do not use this +//! helper — they call `embedder.embed(&[texts], ...)` directly so the +//! encoder batches and amortizes per-call overhead. +//! +//! ## Why a shared module instead of inline in each producer +//! +//! - Single point of contract enforcement for the embedding field +//! shape (`embedding` / `embedding_model` / `embedding_dim` / +//! `embedding_task`). +//! - Single point for the failure-mode telemetry surface — operators +//! inspecting logs see one canonical warning shape across producers. +//! - When the substrate-gateway pattern lands per Spec §13.8, the +//! refactor touches one helper rather than every producer. + +use serde_json::{Value, json}; +use std::sync::Arc; + +use crate::embedder::Embedder; +use weaver_database::graph::belief::{ + BELIEF_EMBEDDING_DIM_FIELD, BELIEF_EMBEDDING_FIELD, BELIEF_EMBEDDING_MODEL_FIELD, + BELIEF_EMBEDDING_TASK_FIELD, +}; + +/// Jina V4 task hint for stored documents per Spec §2.1 — selects the +/// `retrieval` adapter with `"Passage: "` prefix at embed time. +pub const PASSAGE_TASK: &str = "retrieval.passage"; + +/// Embed `text` via the supplied embedder and stamp the four embedding +/// fields onto `doc` per `belief-nodes-embedding-Spec.md` §4.1.2. +/// +/// `producer_label` is a short string (e.g. `"Pen"`, `"engine_nap"`) +/// used in tracing output so operators can identify which producer's +/// write degraded under embedder failure. +/// +/// **`embedding_model` is provenance, not a constant.** Per Spec §2.1 +/// the field is provenance metadata, so it must reflect what the +/// backend *actually* produced, not the Spec example's literal string. +/// `VISION.md` §Framework vs. Instantiation commits to a +/// substrate-agnostic embedder contract — hardcoding a model identifier +/// here would bake the current instantiation (Jina V4) into the +/// framework's write path and produce false negatives in the +/// migration-detection filter (`FILTER doc.embedding_model != ""`) +/// for any future agent profile that runs a different encoder. We call +/// the trait's `embed()` directly (rather than the `embed_one` +/// convenience wrapper) specifically to retain `EmbedResult.model`. +/// +/// On `embedder == None` or call failure, logs a warning and returns +/// without mutating `doc`. The caller proceeds with the insert; the +/// doc lands with no embedding field; sleep stage A's retroactive +/// pass populates it later. Producers never block on embedder +/// availability — embedder failure degrades the substrate per Spec +/// §13.1's failure-mode escape, it does not fail the write. +pub async fn try_embed_and_stamp_belief_node( + embedder: Option<&Arc>, + text: &str, + producer_label: &str, + doc: &mut Value, +) { + let Some(embedder) = embedder else { + tracing::warn!( + producer = %producer_label, + "no embedder configured; document will land with embedding: null. \ + Degraded mode — production agents should always have an embedder. \ + Sleep stage A retroactive pass will populate the embedding later." + ); + return; + }; + + // Calling `embed()` rather than `embed_one()` so we retain + // `EmbedResult.model` for the provenance stamp. See doc comment + // above for rationale. + match embedder + .embed(&[text.to_string()], PASSAGE_TASK, None) + .await + { + Ok(result) => { + let mut embeddings = result.embeddings; + match embeddings.len() { + 1 => { + // `pop()` cannot return `None` here — we just + // matched on `len() == 1`. Vector ownership moves + // into the JSON document via `json!`. + let vec = embeddings.pop().expect("len == 1 ensures Some"); + let dim = vec.len(); + doc[BELIEF_EMBEDDING_FIELD] = json!(vec); + doc[BELIEF_EMBEDDING_MODEL_FIELD] = json!(result.model); + doc[BELIEF_EMBEDDING_DIM_FIELD] = json!(dim); + doc[BELIEF_EMBEDDING_TASK_FIELD] = json!(PASSAGE_TASK); + } + n => { + // Protocol violation: a single-text `embed()` + // call must return exactly one embedding. The + // trait's `embed_one` wrapper raises this as + // `InvalidResponse`; on the substrate-write path + // we degrade to "no embedding stamped" so the + // doc still lands and sleep stage A backfills. + tracing::warn!( + producer = %producer_label, + expected = 1, + got = n, + "embedder returned unexpected embedding count for single-text call; \ + document will land with embedding: null. \ + Sleep stage A retroactive pass will populate it later." + ); + } + } + } + Err(e) => { + tracing::warn!( + error = %e, + producer = %producer_label, + "embed call failed; document will land with embedding: null. \ + Sleep stage A retroactive pass will populate it on next sleep." + ); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::embedder::{EmbedResult, Embedder, EmbedderInfo, EmbeddingError, LateChunkedResult}; + use async_trait::async_trait; + + /// Fixed-dimension fixture embedder for testing. Returns a vector of + /// the configured dimension filled with a deterministic ramp so + /// stamped values are inspectable in assertions. + struct FixtureEmbedder { + dim: usize, + } + + #[async_trait] + impl Embedder for FixtureEmbedder { + async fn embed( + &self, + texts: &[String], + _task: &str, + _batch_size: Option, + ) -> Result { + Ok(EmbedResult { + embeddings: texts + .iter() + .map(|_| (0..self.dim).map(|i| i as f32).collect()) + .collect(), + model: "fixture".into(), + dimension: self.dim as u32, + duration_ms: 0, + }) + } + async fn embed_late_chunked( + &self, + _: &str, + _: &str, + ) -> Result { + unreachable!() + } + async fn info(&self) -> Result { + unreachable!() + } + } + + /// An embedder that always fails — exercises the failure-mode escape. + struct FailingEmbedder; + + #[async_trait] + impl Embedder for FailingEmbedder { + async fn embed( + &self, + _: &[String], + _: &str, + _: Option, + ) -> Result { + Err(EmbeddingError::NotAvailable("test failure".into())) + } + async fn embed_late_chunked( + &self, + _: &str, + _: &str, + ) -> Result { + unreachable!() + } + async fn info(&self) -> Result { + unreachable!() + } + } + + #[tokio::test] + async fn stamps_four_fields_on_success() { + let embedder: Arc = Arc::new(FixtureEmbedder { dim: 8 }); + let mut doc = json!({ "topic": "t", "content": "c" }); + + try_embed_and_stamp_belief_node(Some(&embedder), "t\nc", "test", &mut doc).await; + + assert_eq!(doc[BELIEF_EMBEDDING_FIELD].as_array().unwrap().len(), 8); + // Model name is sourced from `EmbedResult.model` reported by + // the backend, not from a hardcoded constant. FixtureEmbedder + // reports `"fixture"`; the production Jina V4 backend reports + // its own identifier. Provenance must reflect what actually + // wrote the embedding. + assert_eq!(doc[BELIEF_EMBEDDING_MODEL_FIELD], json!("fixture")); + assert_eq!(doc[BELIEF_EMBEDDING_DIM_FIELD], json!(8)); + assert_eq!(doc[BELIEF_EMBEDDING_TASK_FIELD], json!(PASSAGE_TASK)); + } + + /// A backend reporting a different model identifier — exercises + /// the provenance contract that `embedding_model` reflects the + /// actual backend, not a constant. + struct AlternateModelEmbedder; + + #[async_trait] + impl Embedder for AlternateModelEmbedder { + async fn embed( + &self, + _: &[String], + _: &str, + _: Option, + ) -> Result { + Ok(EmbedResult { + embeddings: vec![vec![0.0_f32; 4]], + model: "experimental-encoder-v0".into(), + dimension: 4, + duration_ms: 0, + }) + } + async fn embed_late_chunked( + &self, + _: &str, + _: &str, + ) -> Result { + unreachable!() + } + async fn info(&self) -> Result { + unreachable!() + } + } + + #[tokio::test] + async fn stamped_model_name_reflects_backend_identifier() { + let embedder: Arc = Arc::new(AlternateModelEmbedder); + let mut doc = json!({}); + + try_embed_and_stamp_belief_node(Some(&embedder), "x", "test", &mut doc).await; + + assert_eq!( + doc[BELIEF_EMBEDDING_MODEL_FIELD], + json!("experimental-encoder-v0"), + "embedding_model must reflect the backend-reported identifier, \ + not a hardcoded constant — otherwise the migration-detection \ + filter produces false negatives across encoder swaps" + ); + } + + /// Backend returning the wrong number of embeddings: silently + /// dropping or panicking would both be wrong. The helper must + /// degrade to "no embedding stamped" so the doc still lands and + /// sleep stage A's retroactive pass can populate it. + struct WrongCountEmbedder(usize); + + #[async_trait] + impl Embedder for WrongCountEmbedder { + async fn embed( + &self, + _: &[String], + _: &str, + _: Option, + ) -> Result { + Ok(EmbedResult { + embeddings: (0..self.0).map(|_| vec![0.0_f32; 4]).collect(), + model: "broken".into(), + dimension: 4, + duration_ms: 0, + }) + } + async fn embed_late_chunked( + &self, + _: &str, + _: &str, + ) -> Result { + unreachable!() + } + async fn info(&self) -> Result { + unreachable!() + } + } + + #[tokio::test] + async fn does_not_mutate_doc_when_backend_returns_zero_embeddings() { + let embedder: Arc = Arc::new(WrongCountEmbedder(0)); + let mut doc = json!({ "topic": "t" }); + let before = doc.clone(); + + try_embed_and_stamp_belief_node(Some(&embedder), "x", "test", &mut doc).await; + + assert_eq!( + doc, before, + "protocol violation (0 embeddings) must leave doc unmodified" + ); + } + + #[tokio::test] + async fn does_not_mutate_doc_when_backend_returns_multiple_embeddings() { + let embedder: Arc = Arc::new(WrongCountEmbedder(2)); + let mut doc = json!({ "topic": "t" }); + let before = doc.clone(); + + try_embed_and_stamp_belief_node(Some(&embedder), "x", "test", &mut doc).await; + + assert_eq!( + doc, before, + "protocol violation (2 embeddings for single-text call) must leave doc unmodified" + ); + } + + #[tokio::test] + async fn does_not_mutate_doc_when_embedder_absent() { + let mut doc = json!({ "topic": "t", "content": "c" }); + let before = doc.clone(); + + try_embed_and_stamp_belief_node(None, "t\nc", "test", &mut doc).await; + + assert_eq!(doc, before, "doc must not gain embedding fields under None"); + } + + #[tokio::test] + async fn does_not_mutate_doc_when_embedder_fails() { + let embedder: Arc = Arc::new(FailingEmbedder); + let mut doc = json!({ "topic": "t", "content": "c" }); + let before = doc.clone(); + + try_embed_and_stamp_belief_node(Some(&embedder), "t\nc", "test", &mut doc).await; + + assert_eq!( + doc, before, + "doc must not gain embedding fields when embedder errors" + ); + } + + #[tokio::test] + async fn embedding_dim_reflects_actual_vector_length() { + // The dim field reports the actual vector length (Spec §2.1): + // "Redundant with embedding.len() but cheap to filter on — a + // single-field FILTER doc.embedding_dim != 2048 query identifies + // migration-stragglers without parsing the array." + // Stamping a constant 2048 would mask drift. + for dim in [4, 16, 64, 2048] { + let embedder: Arc = Arc::new(FixtureEmbedder { dim }); + let mut doc = json!({}); + try_embed_and_stamp_belief_node(Some(&embedder), "x", "test", &mut doc).await; + assert_eq!(doc[BELIEF_EMBEDDING_DIM_FIELD], json!(dim)); + assert_eq!( + doc[BELIEF_EMBEDDING_FIELD].as_array().unwrap().len(), + dim, + "actual vector length matches stamped dim" + ); + } + } +} diff --git a/crates/weaver-core/src/engine/runtime.rs b/crates/weaver-core/src/engine/runtime.rs index 8725c4ac..632fc49d 100644 --- a/crates/weaver-core/src/engine/runtime.rs +++ b/crates/weaver-core/src/engine/runtime.rs @@ -151,10 +151,16 @@ impl From for AgentEvent { // --------------------------------------------------------------------------- /// Implements `ContextNapCallback` by writing nap summaries to the agent's -/// HADES memory database. Requires an `ArangoPool` (from ExecutionContext). +/// HADES memory database. Requires an `ArangoPool` (from ExecutionContext) +/// and optionally an embedder for embed-on-write per +/// `belief-nodes-embedding-Spec.md` §4.2. struct HadesNapCallback { pool: std::sync::Arc, agent_name: String, + /// Embedder for embed-on-write. `None` is the degraded-mode escape + /// per Spec §13.1: nap docs land with `embedding: null`, sleep + /// stage A retroactively populates them. + embedder: Option>, } /// Build the document an engine-nap writes to `belief_nodes`. @@ -218,7 +224,23 @@ impl ContextNapCallback for HadesNapCallback { }; let ts = chrono::Utc::now().timestamp_millis(); - let doc = build_engine_nap_doc(&self.agent_name, summary, nap_number, ts); + let mut doc = build_engine_nap_doc(&self.agent_name, summary, nap_number, ts); + + // Embed-on-write per `belief-nodes-embedding-Spec.md` §4.2 + + // §13.1's universal-principle commitment. `topic + "\n" + + // content` matches Pen's pattern (Spec §4.1.2 rationale — + // topic carries semantically-dense signal alongside body). + // Failure-mode escape per §13.1: no embedding field on + // failure, sleep stage A retroactively populates. + let topic = format!("context_nap_{nap_number}"); + let content_to_embed = format!("{topic}\n{summary}"); + crate::embed_write::try_embed_and_stamp_belief_node( + self.embedder.as_ref(), + &content_to_embed, + "engine_nap", + &mut doc, + ) + .await; client::memory_write( &ctx, @@ -629,10 +651,7 @@ impl AgentHandle { /// re-spawning the agent. Pre-Phase-1 this is the gRPC client to /// `weaver-embedder.service`; post-Phase-1 it's the in-process /// candle backend in `weaver-spu`. - pub fn set_embedder( - &mut self, - embedder: std::sync::Arc, - ) { + pub fn set_embedder(&mut self, embedder: std::sync::Arc) { self.exec_ctx.embedder = Some(embedder); } @@ -684,10 +703,15 @@ impl AgentHandle { .take() .ok_or_else(|| RuntimeError::EngineError(anyhow::anyhow!("no pending turn")))?; - // Build context nap callback if the agent has a memory pool + // Build context nap callback if the agent has a memory pool. + // Per belief-nodes-embedding-Spec.md §4.2, the callback carries + // the embedder alongside the pool so nap writes embed before + // insert. Embedder is `None` in degraded mode (Spec §13.1 + // failure-mode escape). let nap_cb = self.exec_ctx.db_pool.as_ref().map(|pool| HadesNapCallback { pool: pool.clone(), agent_name: self.name.clone(), + embedder: self.exec_ctx.embedder.clone(), }); let nap_ref: Option<&dyn ContextNapCallback> = nap_cb.as_ref().map(|cb| cb as &dyn ContextNapCallback); @@ -1224,10 +1248,7 @@ async fn ensure_model_available( // - non-empty + missing `g` → real mismatch, request a // fresh load on the pinned GPU. Some(g) => match i32::try_from(g) { - Ok(want) => { - m.gpu_ordinals.is_empty() - || m.gpu_ordinals.iter().any(|&ord| ord == want) - } + Ok(want) => m.gpu_ordinals.is_empty() || m.gpu_ordinals.contains(&want), Err(_) => false, }, } @@ -1328,7 +1349,10 @@ async fn ensure_model_available( ))); } - debug!("Model pre-flight: '{}' loaded and verified on requested GPU", model_id); + debug!( + "Model pre-flight: '{}' loaded and verified on requested GPU", + model_id + ); Ok(()) } diff --git a/crates/weaver-core/src/lib.rs b/crates/weaver-core/src/lib.rs index ee6868df..b64f4840 100644 --- a/crates/weaver-core/src/lib.rs +++ b/crates/weaver-core/src/lib.rs @@ -3,6 +3,7 @@ pub mod blocks; pub mod channel; pub mod config; pub mod default_prompt; +pub mod embed_write; pub mod embedder; pub mod engine; pub mod error; diff --git a/crates/weaver-core/src/tools/notes.rs b/crates/weaver-core/src/tools/notes.rs index d7c47a2f..44fe7839 100644 --- a/crates/weaver-core/src/tools/notes.rs +++ b/crates/weaver-core/src/tools/notes.rs @@ -19,6 +19,7 @@ use async_trait::async_trait; use serde_json::{Value, json}; +use crate::embed_write::try_embed_and_stamp_belief_node; use crate::error::ToolError; use crate::tool::*; use crate::tools::memory_format::{MemoryKind, format_memory}; @@ -689,7 +690,7 @@ impl Tool for PenTool { let location = input.get("location").cloned(); let tag_count = tags.len(); - let doc = build_note_doc( + let mut doc = build_note_doc( &topic, &content, &tags, @@ -697,6 +698,17 @@ impl Tool for PenTool { &chrono::Utc::now().to_rfc3339(), ); + // Embed-on-write per `belief-nodes-embedding-Spec.md` §4.1 + + // §13.1's universal-principle commitment. Topic concatenated + // with content gives the encoder both the semantically-dense + // title and the body, weighted naturally (Spec §4.1.2 rationale). + // Failure-mode escape per Spec §4.1.2 + §13.1 lives in the + // shared helper — no embedding field on failure, sleep stage A + // retroactively populates. + let content_to_embed = format!("{topic}\n{content}"); + try_embed_and_stamp_belief_node(ctx.embedder.as_ref(), &content_to_embed, "Pen", &mut doc) + .await; + let result = write_note(ctx, &doc).await?; let key = result .get("_key")