Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
372 changes: 372 additions & 0 deletions crates/weaver-core/src/embed_write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,372 @@
//! Shared embed-on-write helper for `belief_nodes` producers.
//!
//! Per `docs/specs/belief-nodes-embedding-Spec.md` §4 + §13.1 (universal
//! embed-on-write principle), every producer of cognitive content embeds
//! the text before insert. This module provides the common helper that
//! Pen (`tools/notes.rs`) and engine-nap (`engine/runtime.rs`) both call,
//! plus future single-doc producers added under §13.7.
//!
//! The Spec's failure-mode escape (§4.1.2, §13.1) is encoded here: on
//! embedder unavailability or call failure, the helper logs a warning
//! and returns without mutating the document. The caller proceeds with
//! the insert; the doc lands with no embedding field; sleep stage A's
//! retroactive pass (§7) populates it on the next sleep cycle. Producers
//! never block on embedder availability.
//!
//! Batched producers (preseed materializer per §4.3) do not use this
//! helper — they call `embedder.embed(&[texts], ...)` directly so the
//! encoder batches and amortizes per-call overhead.
//!
//! ## Why a shared module instead of inline in each producer
//!
//! - Single point of contract enforcement for the embedding field
//! shape (`embedding` / `embedding_model` / `embedding_dim` /
//! `embedding_task`).
//! - Single point for the failure-mode telemetry surface — operators
//! inspecting logs see one canonical warning shape across producers.
//! - When the substrate-gateway pattern lands per Spec §13.8, the
//! refactor touches one helper rather than every producer.

use serde_json::{Value, json};
use std::sync::Arc;

use crate::embedder::Embedder;
use weaver_database::graph::belief::{
BELIEF_EMBEDDING_DIM_FIELD, BELIEF_EMBEDDING_FIELD, BELIEF_EMBEDDING_MODEL_FIELD,
BELIEF_EMBEDDING_TASK_FIELD,
};

/// Jina V4 task hint for stored documents per Spec §2.1 — selects the
/// `retrieval` adapter with `"Passage: "` prefix at embed time.
///
/// This value is used twice by `try_embed_and_stamp_belief_node`: it is
/// passed as the task argument of the `embed()` call, and it is stamped
/// verbatim into the document under `BELIEF_EMBEDDING_TASK_FIELD`, so the
/// stored task always names the task the vector was requested with.
pub const PASSAGE_TASK: &str = "retrieval.passage";

/// Embed `text` via the supplied embedder and stamp the four embedding
/// fields onto `doc` per `belief-nodes-embedding-Spec.md` §4.1.2.
///
/// # Parameters
///
/// - `embedder`: the backend to call, or `None` when the agent runs
///   without one (degraded mode).
/// - `text`: the content to embed; sent as a single-element batch with
///   the [`PASSAGE_TASK`] task hint.
/// - `producer_label`: a short string (e.g. `"Pen"`, `"engine_nap"`)
///   used in tracing output so operators can identify which producer's
///   write degraded under embedder failure.
/// - `doc`: the JSON document about to be inserted. On success it gains
///   the embedding vector, the backend-reported model identifier, the
///   vector length, and the task hint. A `null` document is promoted to
///   an object holding just those four fields.
///
/// # Provenance
///
/// **`embedding_model` is provenance, not a constant.** Per Spec §2.1
/// the field must reflect what the backend *actually* produced, not the
/// Spec example's literal string. `VISION.md` §Framework vs.
/// Instantiation commits to a substrate-agnostic embedder contract —
/// hardcoding a model identifier here would bake the current
/// instantiation (Jina V4) into the framework's write path and produce
/// false negatives in the migration-detection filter
/// (`FILTER doc.embedding_model != "<id>"`) for any future agent profile
/// that runs a different encoder. We call the trait's `embed()` directly
/// (rather than the `embed_one` convenience wrapper) specifically to
/// retain `EmbedResult.model`.
///
/// # Failure modes
///
/// This function never returns an error and never panics on bad input.
/// In each of the cases below it logs a warning and returns without
/// mutating `doc`:
///
/// - `embedder` is `None`;
/// - `doc` is neither a JSON object nor `null`, so there is nowhere to
///   stamp the fields (the embed call is skipped entirely);
/// - the `embed()` call returns an error;
/// - the backend returns anything other than exactly one embedding for
///   the single input text.
///
/// The caller proceeds with the insert; the doc lands without embedding
/// fields; sleep stage A's retroactive pass populates them later.
/// Producers never block on embedder availability — embedder failure
/// degrades the substrate per Spec §13.1's failure-mode escape, it does
/// not fail the write.
pub async fn try_embed_and_stamp_belief_node(
    embedder: Option<&Arc<dyn Embedder>>,
    text: &str,
    producer_label: &str,
    doc: &mut Value,
) {
    let Some(embedder) = embedder else {
        tracing::warn!(
            producer = %producer_label,
            "no embedder configured; document will land with embedding: null. \
             Degraded mode — production agents should always have an embedder. \
             Sleep stage A retroactive pass will populate the embedding later."
        );
        return;
    };

    // Stamping goes through `Value`'s string `IndexMut`, which panics on
    // anything that is not an object or `null`. A panic here would fail
    // the producer's write, which is exactly what this helper promises
    // not to do, so reject unsupported shapes up front. Checking before
    // the embed call also avoids paying for an embedding we cannot store.
    if !(doc.is_object() || doc.is_null()) {
        tracing::warn!(
            producer = %producer_label,
            "document is not a JSON object; embedding fields cannot be stamped. \
             Document is left unmodified."
        );
        return;
    }

    // Calling `embed()` rather than `embed_one()` so we retain
    // `EmbedResult.model` for the provenance stamp.
    let result = match embedder
        .embed(&[text.to_string()], PASSAGE_TASK, None)
        .await
    {
        Ok(result) => result,
        Err(e) => {
            tracing::warn!(
                error = %e,
                producer = %producer_label,
                "embed call failed; document will land with embedding: null. \
                 Sleep stage A retroactive pass will populate it on next sleep."
            );
            return;
        }
    };

    let mut embeddings = result.embeddings;
    match embeddings.as_mut_slice() {
        // Exactly one vector for the one input text: the slice pattern
        // proves the invariant, so no `expect` is needed. `take` moves
        // the vector out without copying its elements.
        [only] => {
            let vec = std::mem::take(only);
            // Report the real vector length rather than a configured
            // constant so dimension drift stays visible to filters.
            let dim = vec.len();
            doc[BELIEF_EMBEDDING_FIELD] = json!(vec);
            doc[BELIEF_EMBEDDING_MODEL_FIELD] = json!(result.model);
            doc[BELIEF_EMBEDDING_DIM_FIELD] = json!(dim);
            doc[BELIEF_EMBEDDING_TASK_FIELD] = json!(PASSAGE_TASK);
        }
        // Protocol violation: a single-text `embed()` call must return
        // exactly one embedding. The trait's `embed_one` wrapper raises
        // this as `InvalidResponse`; on the substrate-write path we
        // degrade to "no embedding stamped" so the doc still lands and
        // sleep stage A backfills.
        other => {
            let count = other.len();
            tracing::warn!(
                producer = %producer_label,
                expected = 1,
                got = count,
                "embedder returned unexpected embedding count for single-text call; \
                 document will land with embedding: null. \
                 Sleep stage A retroactive pass will populate it later."
            );
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::embedder::{EmbedResult, Embedder, EmbedderInfo, EmbeddingError, LateChunkedResult};
    use async_trait::async_trait;

    // ---------------------------------------------------------------
    // Shared helpers
    // ---------------------------------------------------------------

    /// Type-erase a concrete test double into the `Arc<dyn Embedder>`
    /// shape the helper under test accepts.
    fn shared(backend: impl Embedder + 'static) -> Arc<dyn Embedder> {
        Arc::new(backend)
    }

    /// Run the helper under test against `doc` and hand back the
    /// (possibly stamped) document for inspection.
    async fn stamp(embedder: Option<Arc<dyn Embedder>>, text: &str, mut doc: Value) -> Value {
        try_embed_and_stamp_belief_node(embedder.as_ref(), text, "test", &mut doc).await;
        doc
    }

    /// Length of the stamped embedding array; panics if the field is
    /// missing or is not an array.
    fn stamped_vector_len(doc: &Value) -> usize {
        doc[BELIEF_EMBEDDING_FIELD].as_array().unwrap().len()
    }

    // ---------------------------------------------------------------
    // Test doubles
    // ---------------------------------------------------------------

    /// Well-behaved backend: answers every input text with the ramp
    /// `0.0, 1.0, …, dim - 1` so stamped values are predictable.
    struct FixtureEmbedder {
        dim: usize,
    }

    #[async_trait]
    impl Embedder for FixtureEmbedder {
        async fn embed(
            &self,
            texts: &[String],
            _task: &str,
            _batch_size: Option<u32>,
        ) -> Result<EmbedResult, EmbeddingError> {
            let ramp: Vec<f32> = (0..self.dim).map(|i| i as f32).collect();
            Ok(EmbedResult {
                embeddings: vec![ramp; texts.len()],
                model: "fixture".into(),
                dimension: self.dim as u32,
                duration_ms: 0,
            })
        }

        async fn embed_late_chunked(
            &self,
            _: &str,
            _: &str,
        ) -> Result<LateChunkedResult, EmbeddingError> {
            unreachable!()
        }

        async fn info(&self) -> Result<EmbedderInfo, EmbeddingError> {
            unreachable!()
        }
    }

    /// Backend whose `embed` always errors — drives the failure-mode
    /// escape path.
    struct FailingEmbedder;

    #[async_trait]
    impl Embedder for FailingEmbedder {
        async fn embed(
            &self,
            _: &[String],
            _: &str,
            _: Option<u32>,
        ) -> Result<EmbedResult, EmbeddingError> {
            Err(EmbeddingError::NotAvailable("test failure".into()))
        }

        async fn embed_late_chunked(
            &self,
            _: &str,
            _: &str,
        ) -> Result<LateChunkedResult, EmbeddingError> {
            unreachable!()
        }

        async fn info(&self) -> Result<EmbedderInfo, EmbeddingError> {
            unreachable!()
        }
    }

    /// Backend that reports a non-default model identifier, used to pin
    /// the rule that `embedding_model` mirrors the backend rather than
    /// a constant.
    struct AlternateModelEmbedder;

    #[async_trait]
    impl Embedder for AlternateModelEmbedder {
        async fn embed(
            &self,
            _: &[String],
            _: &str,
            _: Option<u32>,
        ) -> Result<EmbedResult, EmbeddingError> {
            let only_vector = vec![0.0_f32; 4];
            Ok(EmbedResult {
                embeddings: vec![only_vector],
                model: "experimental-encoder-v0".into(),
                dimension: 4,
                duration_ms: 0,
            })
        }

        async fn embed_late_chunked(
            &self,
            _: &str,
            _: &str,
        ) -> Result<LateChunkedResult, EmbeddingError> {
            unreachable!()
        }

        async fn info(&self) -> Result<EmbedderInfo, EmbeddingError> {
            unreachable!()
        }
    }

    /// Misbehaving backend that returns a fixed number of embeddings
    /// (the tuple field) regardless of how many texts were submitted.
    /// Neither panicking nor stamping a partial result is acceptable:
    /// the helper has to leave the doc alone so it still lands and the
    /// retroactive pass can fill it in.
    struct WrongCountEmbedder(usize);

    #[async_trait]
    impl Embedder for WrongCountEmbedder {
        async fn embed(
            &self,
            _: &[String],
            _: &str,
            _: Option<u32>,
        ) -> Result<EmbedResult, EmbeddingError> {
            Ok(EmbedResult {
                embeddings: vec![vec![0.0_f32; 4]; self.0],
                model: "broken".into(),
                dimension: 4,
                duration_ms: 0,
            })
        }

        async fn embed_late_chunked(
            &self,
            _: &str,
            _: &str,
        ) -> Result<LateChunkedResult, EmbeddingError> {
            unreachable!()
        }

        async fn info(&self) -> Result<EmbedderInfo, EmbeddingError> {
            unreachable!()
        }
    }

    // ---------------------------------------------------------------
    // Success path
    // ---------------------------------------------------------------

    #[tokio::test]
    async fn stamps_four_fields_on_success() {
        let seed = json!({ "topic": "t", "content": "c" });

        let doc = stamp(Some(shared(FixtureEmbedder { dim: 8 })), "t\nc", seed).await;

        assert_eq!(stamped_vector_len(&doc), 8);
        // The model name comes from the `EmbedResult.model` the backend
        // returned (here `"fixture"`), never from a hardcoded constant:
        // provenance has to name whatever actually produced the vector.
        assert_eq!(doc[BELIEF_EMBEDDING_MODEL_FIELD], json!("fixture"));
        assert_eq!(doc[BELIEF_EMBEDDING_DIM_FIELD], json!(8));
        assert_eq!(doc[BELIEF_EMBEDDING_TASK_FIELD], json!(PASSAGE_TASK));
    }

    #[tokio::test]
    async fn stamped_model_name_reflects_backend_identifier() {
        let doc = stamp(Some(shared(AlternateModelEmbedder)), "x", json!({})).await;

        assert_eq!(
            doc[BELIEF_EMBEDDING_MODEL_FIELD],
            json!("experimental-encoder-v0"),
            "embedding_model must reflect the backend-reported identifier, \
             not a hardcoded constant — otherwise the migration-detection \
             filter produces false negatives across encoder swaps"
        );
    }

    #[tokio::test]
    async fn embedding_dim_reflects_actual_vector_length() {
        // Spec §2.1 describes the dim field as redundant with
        // `embedding.len()` but cheap to filter on: a single-field
        // `FILTER doc.embedding_dim != 2048` finds migration stragglers
        // without parsing the array. That only works if the stamped
        // value is the real length — a constant 2048 would hide drift.
        for dim in [4, 16, 64, 2048] {
            let doc = stamp(Some(shared(FixtureEmbedder { dim })), "x", json!({})).await;

            assert_eq!(doc[BELIEF_EMBEDDING_DIM_FIELD], json!(dim));
            assert_eq!(
                stamped_vector_len(&doc),
                dim,
                "actual vector length matches stamped dim"
            );
        }
    }

    // ---------------------------------------------------------------
    // Degraded paths: the document must come back untouched
    // ---------------------------------------------------------------

    #[tokio::test]
    async fn does_not_mutate_doc_when_backend_returns_zero_embeddings() {
        let before = json!({ "topic": "t" });

        let doc = stamp(Some(shared(WrongCountEmbedder(0))), "x", before.clone()).await;

        assert_eq!(
            doc, before,
            "protocol violation (0 embeddings) must leave doc unmodified"
        );
    }

    #[tokio::test]
    async fn does_not_mutate_doc_when_backend_returns_multiple_embeddings() {
        let before = json!({ "topic": "t" });

        let doc = stamp(Some(shared(WrongCountEmbedder(2))), "x", before.clone()).await;

        assert_eq!(
            doc, before,
            "protocol violation (2 embeddings for single-text call) must leave doc unmodified"
        );
    }

    #[tokio::test]
    async fn does_not_mutate_doc_when_embedder_absent() {
        let before = json!({ "topic": "t", "content": "c" });

        let doc = stamp(None, "t\nc", before.clone()).await;

        assert_eq!(doc, before, "doc must not gain embedding fields under None");
    }

    #[tokio::test]
    async fn does_not_mutate_doc_when_embedder_fails() {
        let before = json!({ "topic": "t", "content": "c" });

        let doc = stamp(Some(shared(FailingEmbedder)), "t\nc", before.clone()).await;

        assert_eq!(
            doc, before,
            "doc must not gain embedding fields when embedder errors"
        );
    }
}
Loading