Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 220 additions & 0 deletions crates/weaver-core/src/embed_write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
//! Shared embed-on-write helper for `belief_nodes` producers.
//!
//! Per `docs/specs/belief-nodes-embedding-Spec.md` §4 + §13.1 (universal
//! embed-on-write principle), every producer of cognitive content embeds
//! the text before insert. This module provides the common helper that
//! Pen (`tools/notes.rs`) and engine-nap (`engine/runtime.rs`) both call,
//! plus future single-doc producers added under §13.7.
//!
//! The Spec's failure-mode escape (§4.1.2, §13.1) is encoded here: on
//! embedder unavailability or call failure, the helper logs a warning
//! and returns without mutating the document. The caller proceeds with
//! the insert; the doc lands with no embedding field; sleep stage A's
//! retroactive pass (§7) populates it on the next sleep cycle. Producers
//! never block on embedder availability.
//!
//! Batched producers (preseed materializer per §4.3) do not use this
//! helper — they call `embedder.embed(&[texts], ...)` directly so the
//! encoder batches and amortizes per-call overhead.
//!
//! ## Why a shared module instead of inline in each producer
//!
//! - Single point of contract enforcement for the embedding field
//! shape (`embedding` / `embedding_model` / `embedding_dim` /
//! `embedding_task`).
//! - Single point for the failure-mode telemetry surface — operators
//! inspecting logs see one canonical warning shape across producers.
//! - When the substrate-gateway pattern lands per Spec §13.8, the
//! refactor touches one helper rather than every producer.

use serde_json::{Value, json};
use std::sync::Arc;

use crate::embedder::Embedder;
use weaver_database::graph::belief::{
BELIEF_EMBEDDING_DIM_FIELD, BELIEF_EMBEDDING_FIELD, BELIEF_EMBEDDING_MODEL_FIELD,
BELIEF_EMBEDDING_TASK_FIELD,
};

/// Jina V4 task hint for stored documents per Spec §2.1 — selects the
/// `retrieval` adapter with `"Passage: "` prefix at embed time.
/// Also stamped verbatim onto the `embedding_task` field on success.
pub const PASSAGE_TASK: &str = "retrieval.passage";

/// Model identifier stamped on the `embedding_model` field. Matches
/// the canonical name used by Spec §4.1.2's example and by the Spec's
/// migration-drift filter (`FILTER doc.embedding_model != "jina-v4"`).
pub const JINA_V4_MODEL_NAME: &str = "jina-v4";

/// Embed `text` via the supplied embedder and stamp the four embedding
/// fields onto `doc` per `belief-nodes-embedding-Spec.md` §4.1.2.
///
/// `producer_label` is a short string (e.g. `"Pen"`, `"engine_nap"`)
/// carried in the tracing output so operators can tell which producer's
/// write degraded when the embedder was unavailable or failing.
///
/// When `embedder` is `None` or the embed call errors, this logs a
/// warning and returns with `doc` untouched. The caller proceeds with
/// its insert, the document lands with no embedding field, and sleep
/// stage A's retroactive pass populates it on a later cycle. Producers
/// never block on embedder availability — per Spec §13.1's failure-mode
/// escape, embedder failure degrades the substrate, it does not fail
/// the write.
pub async fn try_embed_and_stamp_belief_node(
    embedder: Option<&Arc<dyn Embedder>>,
    text: &str,
    producer_label: &str,
    doc: &mut Value,
) {
    // Degraded-mode escape: no embedder configured at all. Warn and
    // leave `doc` unmodified so the caller's insert still proceeds.
    let active = match embedder {
        Some(e) => e,
        None => {
            tracing::warn!(
                producer = %producer_label,
                "no embedder configured; document will land with embedding: null. \
                 Degraded mode — production agents should always have an embedder. \
                 Sleep stage A retroactive pass will populate the embedding later."
            );
            return;
        }
    };

    // Single-doc producers embed one text per write; batched producers
    // bypass this helper entirely (see module docs).
    match active.embed_one(text, PASSAGE_TASK).await {
        Ok(embedding) => {
            // Stamp the four contract fields. `embedding_dim` reports
            // the actual vector length (Spec §2.1) so drift is visible
            // to a single-field FILTER without parsing the array.
            let dim = embedding.len();
            doc[BELIEF_EMBEDDING_FIELD] = json!(embedding);
            doc[BELIEF_EMBEDDING_MODEL_FIELD] = json!(JINA_V4_MODEL_NAME);
            doc[BELIEF_EMBEDDING_DIM_FIELD] = json!(dim);
            doc[BELIEF_EMBEDDING_TASK_FIELD] = json!(PASSAGE_TASK);
        }
        Err(err) => {
            // Call failed: same escape as the missing-embedder path —
            // warn, leave `doc` unmodified, let sleep stage A repair.
            tracing::warn!(
                error = %err,
                producer = %producer_label,
                "embed call failed; document will land with embedding: null. \
                 Sleep stage A retroactive pass will populate it on next sleep."
            );
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::embedder::{EmbedResult, Embedder, EmbedderInfo, EmbeddingError, LateChunkedResult};
    use async_trait::async_trait;

    /// Fixed-dimension fixture embedder for testing. Every returned
    /// vector is a deterministic ramp `[0.0, 1.0, ..]` of the
    /// configured dimension so stamped values are inspectable in
    /// assertions.
    struct FixtureEmbedder {
        dim: usize,
    }

    #[async_trait]
    impl Embedder for FixtureEmbedder {
        async fn embed(
            &self,
            texts: &[String],
            _task: &str,
            _batch_size: Option<u32>,
        ) -> Result<EmbedResult, EmbeddingError> {
            // Build the ramp once; each input text gets its own copy.
            let ramp: Vec<f32> = (0..self.dim).map(|i| i as f32).collect();
            let embeddings = texts.iter().map(|_| ramp.clone()).collect();
            Ok(EmbedResult {
                embeddings,
                model: "fixture".into(),
                dimension: self.dim as u32,
                duration_ms: 0,
            })
        }
        async fn embed_late_chunked(
            &self,
            _text: &str,
            _task: &str,
        ) -> Result<LateChunkedResult, EmbeddingError> {
            unreachable!()
        }
        async fn info(&self) -> Result<EmbedderInfo, EmbeddingError> {
            unreachable!()
        }
    }

    /// An embedder that always fails — exercises the failure-mode escape.
    struct FailingEmbedder;

    #[async_trait]
    impl Embedder for FailingEmbedder {
        async fn embed(
            &self,
            _texts: &[String],
            _task: &str,
            _batch_size: Option<u32>,
        ) -> Result<EmbedResult, EmbeddingError> {
            Err(EmbeddingError::NotAvailable("test failure".into()))
        }
        async fn embed_late_chunked(
            &self,
            _text: &str,
            _task: &str,
        ) -> Result<LateChunkedResult, EmbeddingError> {
            unreachable!()
        }
        async fn info(&self) -> Result<EmbedderInfo, EmbeddingError> {
            unreachable!()
        }
    }

    #[tokio::test]
    async fn stamps_four_fields_on_success() {
        let fixture: Arc<dyn Embedder> = Arc::new(FixtureEmbedder { dim: 8 });
        let mut node = json!({ "topic": "t", "content": "c" });

        try_embed_and_stamp_belief_node(Some(&fixture), "t\nc", "test", &mut node).await;

        let stamped = node[BELIEF_EMBEDDING_FIELD].as_array().unwrap();
        assert_eq!(stamped.len(), 8);
        assert_eq!(node[BELIEF_EMBEDDING_MODEL_FIELD], json!(JINA_V4_MODEL_NAME));
        assert_eq!(node[BELIEF_EMBEDDING_DIM_FIELD], json!(8));
        assert_eq!(node[BELIEF_EMBEDDING_TASK_FIELD], json!(PASSAGE_TASK));
    }

    #[tokio::test]
    async fn does_not_mutate_doc_when_embedder_absent() {
        let mut node = json!({ "topic": "t", "content": "c" });
        let snapshot = node.clone();

        try_embed_and_stamp_belief_node(None, "t\nc", "test", &mut node).await;

        assert_eq!(node, snapshot, "doc must not gain embedding fields under None");
    }

    #[tokio::test]
    async fn does_not_mutate_doc_when_embedder_fails() {
        let failing: Arc<dyn Embedder> = Arc::new(FailingEmbedder);
        let mut node = json!({ "topic": "t", "content": "c" });
        let snapshot = node.clone();

        try_embed_and_stamp_belief_node(Some(&failing), "t\nc", "test", &mut node).await;

        assert_eq!(
            node, snapshot,
            "doc must not gain embedding fields when embedder errors"
        );
    }

    #[tokio::test]
    async fn embedding_dim_reflects_actual_vector_length() {
        // The dim field reports the actual vector length (Spec §2.1):
        // "Redundant with embedding.len() but cheap to filter on — a
        // single-field FILTER doc.embedding_dim != 2048 query identifies
        // migration-stragglers without parsing the array."
        // Stamping a constant 2048 would mask drift.
        for dim in [4usize, 16, 64, 2048] {
            let fixture: Arc<dyn Embedder> = Arc::new(FixtureEmbedder { dim });
            let mut node = json!({});
            try_embed_and_stamp_belief_node(Some(&fixture), "x", "test", &mut node).await;
            assert_eq!(node[BELIEF_EMBEDDING_DIM_FIELD], json!(dim));
            assert_eq!(
                node[BELIEF_EMBEDDING_FIELD].as_array().unwrap().len(),
                dim,
                "actual vector length matches stamped dim"
            );
        }
    }
}
48 changes: 36 additions & 12 deletions crates/weaver-core/src/engine/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,16 @@ impl From<QueryEvent> for AgentEvent {
// ---------------------------------------------------------------------------

/// Implements `ContextNapCallback` by writing nap summaries to the agent's
/// HADES memory database. Requires an `ArangoPool` (from ExecutionContext).
/// HADES memory database. Requires an `ArangoPool` (from ExecutionContext)
/// and optionally an embedder for embed-on-write per
/// `belief-nodes-embedding-Spec.md` §4.2.
struct HadesNapCallback {
    // Connection pool for the agent's HADES Arango memory database;
    // obtained from the ExecutionContext (see struct docs above).
    pool: std::sync::Arc<weaver_database::db::ArangoPool>,
    // Agent name written into each nap document so reads can be
    // scoped per agent.
    agent_name: String,
    /// Embedder for embed-on-write. `None` is the degraded-mode escape
    /// per Spec §13.1: nap docs land with `embedding: null`, sleep
    /// stage A retroactively populates them.
    embedder: Option<std::sync::Arc<dyn crate::embedder::Embedder>>,
}

/// Build the document an engine-nap writes to `belief_nodes`.
Expand Down Expand Up @@ -218,7 +224,23 @@ impl ContextNapCallback for HadesNapCallback {
};

let ts = chrono::Utc::now().timestamp_millis();
let doc = build_engine_nap_doc(&self.agent_name, summary, nap_number, ts);
let mut doc = build_engine_nap_doc(&self.agent_name, summary, nap_number, ts);

// Embed-on-write per `belief-nodes-embedding-Spec.md` §4.2 +
// §13.1's universal-principle commitment. `topic + "\n" +
// content` matches Pen's pattern (Spec §4.1.2 rationale —
// topic carries semantically-dense signal alongside body).
// Failure-mode escape per §13.1: no embedding field on
// failure, sleep stage A retroactively populates.
let topic = format!("context_nap_{nap_number}");
let content_to_embed = format!("{topic}\n{summary}");
crate::embed_write::try_embed_and_stamp_belief_node(
self.embedder.as_ref(),
&content_to_embed,
"engine_nap",
&mut doc,
)
.await;

client::memory_write(
&ctx,
Expand Down Expand Up @@ -629,10 +651,7 @@ impl AgentHandle {
/// re-spawning the agent. Pre-Phase-1 this is the gRPC client to
/// `weaver-embedder.service`; post-Phase-1 it's the in-process
/// candle backend in `weaver-spu`.
pub fn set_embedder(
&mut self,
embedder: std::sync::Arc<dyn crate::embedder::Embedder>,
) {
pub fn set_embedder(&mut self, embedder: std::sync::Arc<dyn crate::embedder::Embedder>) {
    // Stored on the execution context; producers (Pen, nap callback)
    // read it from there at write time for embed-on-write.
    self.exec_ctx.embedder = Some(embedder);
}

Expand Down Expand Up @@ -684,10 +703,15 @@ impl AgentHandle {
.take()
.ok_or_else(|| RuntimeError::EngineError(anyhow::anyhow!("no pending turn")))?;

// Build context nap callback if the agent has a memory pool
// Build context nap callback if the agent has a memory pool.
// Per belief-nodes-embedding-Spec.md §4.2, the callback carries
// the embedder alongside the pool so nap writes embed before
// insert. Embedder is `None` in degraded mode (Spec §13.1
// failure-mode escape).
let nap_cb = self.exec_ctx.db_pool.as_ref().map(|pool| HadesNapCallback {
pool: pool.clone(),
agent_name: self.name.clone(),
embedder: self.exec_ctx.embedder.clone(),
});
let nap_ref: Option<&dyn ContextNapCallback> =
nap_cb.as_ref().map(|cb| cb as &dyn ContextNapCallback);
Expand Down Expand Up @@ -1224,10 +1248,7 @@ async fn ensure_model_available(
// - non-empty + missing `g` → real mismatch, request a
// fresh load on the pinned GPU.
Some(g) => match i32::try_from(g) {
Ok(want) => {
m.gpu_ordinals.is_empty()
|| m.gpu_ordinals.iter().any(|&ord| ord == want)
}
Ok(want) => m.gpu_ordinals.is_empty() || m.gpu_ordinals.contains(&want),
Err(_) => false,
},
}
Expand Down Expand Up @@ -1328,7 +1349,10 @@ async fn ensure_model_available(
)));
}

debug!("Model pre-flight: '{}' loaded and verified on requested GPU", model_id);
debug!(
"Model pre-flight: '{}' loaded and verified on requested GPU",
model_id
);
Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions crates/weaver-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod blocks;
pub mod channel;
pub mod config;
pub mod default_prompt;
pub mod embed_write;
pub mod embedder;
pub mod engine;
pub mod error;
Expand Down
14 changes: 13 additions & 1 deletion crates/weaver-core/src/tools/notes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use async_trait::async_trait;
use serde_json::{Value, json};

use crate::embed_write::try_embed_and_stamp_belief_node;
use crate::error::ToolError;
use crate::tool::*;
use crate::tools::memory_format::{MemoryKind, format_memory};
Expand Down Expand Up @@ -689,14 +690,25 @@ impl Tool for PenTool {
let location = input.get("location").cloned();
let tag_count = tags.len();

let doc = build_note_doc(
let mut doc = build_note_doc(
&topic,
&content,
&tags,
location,
&chrono::Utc::now().to_rfc3339(),
);

// Embed-on-write per `belief-nodes-embedding-Spec.md` §4.1 +
// §13.1's universal-principle commitment. Topic concatenated
// with content gives the encoder both the semantically-dense
// title and the body, weighted naturally (Spec §4.1.2 rationale).
// Failure-mode escape per Spec §4.1.2 + §13.1 lives in the
// shared helper — no embedding field on failure, sleep stage A
// retroactively populates.
let content_to_embed = format!("{topic}\n{content}");
try_embed_and_stamp_belief_node(ctx.embedder.as_ref(), &content_to_embed, "Pen", &mut doc)
.await;

let result = write_note(ctx, &doc).await?;
let key = result
.get("_key")
Expand Down