diff --git a/README.md b/README.md index 951dea2..e3e8126 100644 --- a/README.md +++ b/README.md @@ -163,6 +163,37 @@ Disclosure is a *pure* read that follows the pointer already stored on each hit pointer. Keeping the agent's context append-only (to protect the prompt cache) is then the caller's discipline — `ir` hands back additive payloads. +### Graph & traverse (opt-in) + +Artifacts refer to each other — a package depends on packages, a skill has a +parent. `ir` models those as a **semantic link graph**: a typed-edge `links` +view on the store, populated at build time by an `EdgeExtractor`. + +```python +corpus = ir.build(source, edge_extractor=ir.default_edge_extractor) # deps→REF, parent→PARENT +graph = ir.CorpusGraph(corpus) +graph.neighbors("contaix", edge_type="REF") # the package's dependencies +``` + +`ir.traverse` walks that structure at query time under a pluggable `WalkPolicy` +(*score frontier → select → expand → stop*). **Safety is the operator's**: a +visited-set, depth cap, and node budget live in `traverse` itself, so even a +cyclic graph and a never-stopping policy terminate. The shipped +`collapsed_tree_policy` is pure-vector — it routes a query that matches an +artifact's *summary* down to that artifact's best *chunk*: + +```python +hits = ir.traverse(query, corpus, policy=ir.collapsed_tree_policy()) +``` + +**Flat top-k stays the default** — `traverse` is opt-in, and a policy earns its +keep only by beating flat+rerank on your eval set (a strong flat retriever wins +simple lookup; graph methods cost far more). Results are ordinary `SearchHit`s +with additive `metadata["walk_depth"]` / `["seed"]` provenance, so `select` / +`disclose` compose unchanged. This is the **semantic link graph** (cyclic, +query-time) — distinct from `ef.artifact_graph` (the acyclic build-time +derivation DAG). + ## Evaluation `ir.eval` scores discovery quality offline (reusing diff --git a/ir/__init__.py b/ir/__init__.py index 64e1592..6abe7e2 100644 --- a/ir/__init__.py +++ b/ir/__init__.py @@ -59,6 +59,7 @@ from .sources import CorpusSource from .store import CorpusStore from .strategy import Chunked, IndexingStrategy, Package, Skill, WholeText +from .traverse import WalkPolicy, WalkState, collapsed_tree_policy, traverse __all__ = [ "Artifact", @@ -88,6 +89,10 @@ "EdgeExtractor", "default_edge_extractor", "canonical_node_id", + "traverse", + "WalkPolicy", + "WalkState", + "collapsed_tree_policy", "expand", "Passage", "NeighborhoodPolicy", diff --git a/ir/traverse.py b/ir/traverse.py new file mode 100644 index 0000000..238f65b --- /dev/null +++ b/ir/traverse.py @@ -0,0 +1,327 @@ +"""Query-time graph traversal — the ``traverse`` operator (report 12). + +Recursive retrieval, summary-routing, RAPTOR collapsed-tree, PPR / +spreading-activation, and beam walks are **one operator** — *score frontier → +select → expand → test stop* — parameterized by a pluggable +:class:`WalkPolicy`. The engineering substance is termination, split in two: + +- **safety** ("*will* it stop?") — structural, and **enforced by the operator**: + a visited-set (keyed on the policy's node id), a depth cap, and a node + budget live in :class:`WalkState` and are checked by :func:`traverse` itself, + so a buggy or adversarial policy *cannot* loop forever — the highest-leverage + robustness decision when the graph may contain directed cycles; +- **sufficiency** ("*should* it stop?") — injected and fallible: the policy's + ``stop`` (default: never — run to budget). + +The policy owns graph semantics; the operator owns the loop and the safety +primitives. ``store`` is passed to the policy verbatim and never interpreted by +the operator — collapsed-tree takes a :class:`~ir.index.Corpus` (search to +seed, ledger to expand to chunks), an artifact-link policy a +:class:`~ir.graph.CorpusGraph` (``neighbors``). + +**Flat top-k + rerank stays the default** (report 12: a strong flat retriever +beats most graph methods on simple lookup, and global graph methods cost +orders of magnitude more); :func:`traverse` is opt-in, and a policy earns +promotion only by beating flat on the eval set. The shipped +:func:`collapsed_tree_policy` is pure-vector (no LLM in the query loop): it +routes a query that matches an artifact's *summary* surface down to that +artifact's best *chunk*. + +Returned :class:`~ir.base.SearchHit`\\s carry additive, JSON-clean walk +provenance — ``metadata["walk_depth"]`` and ``metadata["seed"]`` (the routing +node) — and compose with :func:`ir.select` / :func:`ir.disclose` unchanged. +""" + +from __future__ import annotations + +from collections.abc import Hashable, Iterable, Sequence +from dataclasses import dataclass, field +from typing import Any, Protocol, runtime_checkable + +import numpy as np + +from .base import Record, SearchHit +from .retrieve import records_for_artifact + +#: Default summary/router surface kinds a collapsed-tree walk seeds on. +DFLT_SUMMARY_KINDS = ("description", "synopsis", "capability", "document") +#: Default leaf surface kinds a collapsed-tree walk descends to (the results). +DFLT_LEAF_KINDS = ("chunk", "readme_chunk") +#: Default traversal bounds (operator-enforced safety). +DFLT_MAX_DEPTH = 2 +DFLT_NODE_BUDGET = 64 +#: Default number of summary surfaces a collapsed-tree walk routes from. +DFLT_SEED_K = 10 + + +@dataclass +class WalkState: + """The operator-owned state of one :func:`traverse` call — the safety home. + + ``visited`` (node ids already committed), ``budget``, and ``max_depth`` are + the structural safety primitives the operator enforces; ``results`` are the + emitted hits; ``cache`` is scratch space a policy may use (e.g. to embed the + query once). A policy reads this but the *operator* enforces the bounds — + a policy cannot opt out of termination. + """ + + query: str + max_depth: int + budget: int + visited: set = field(default_factory=set) + results: list = field(default_factory=list) + cache: dict = field(default_factory=dict) + + +@runtime_checkable +class WalkPolicy(Protocol): + """The pluggable strategy of a walk — graph semantics, not safety. + + ``seed`` produces the initial frontier; ``score`` ranks a node against the + query; ``select`` chooses which scored frontier nodes to commit/expand this + step (beam/greedy — default: all, best-first); ``expand`` yields a node's + neighbors; ``node_id`` is the hashable visited-set key; ``stop`` is the + injected sufficiency check; ``to_hit`` materializes a committed node as a + :class:`~ir.base.SearchHit` — or ``None`` for a *router-only* node (a + summary that routes but is not itself a result). + """ + + def seed(self, state: WalkState, store: Any) -> Iterable: ... + def score(self, state: WalkState, node: Any, store: Any) -> float: ... + def select(self, state: WalkState, scored: Sequence) -> Sequence: ... + def expand(self, state: WalkState, node: Any, store: Any) -> Iterable: ... + def node_id(self, node: Any) -> Hashable: ... + def stop(self, state: WalkState) -> bool: ... + def to_hit( + self, state: WalkState, node: Any, score: float, depth: int + ) -> "SearchHit | None": ... + + +def _finalize(results: list[SearchHit], k: int) -> list[SearchHit]: + """Best-first, top-*k*. Visited-set dedup already guarantees one per node.""" + return sorted(results, key=lambda h: h.score, reverse=True)[:k] + + +def traverse( + query: str, + store: Any, + *, + policy: WalkPolicy, + max_depth: int = DFLT_MAX_DEPTH, + node_budget: int = DFLT_NODE_BUDGET, + k: int = 10, +) -> list[SearchHit]: + """Walk *store* from *query* under *policy*, returning the top-*k* hits. + + The loop — *score the frontier → select → commit → expand* — is the + operator's; the **safety primitives are non-negotiable and enforced here**: + a node id is committed at most once (the visited-set), expansion stops at + ``max_depth``, and no more than ``node_budget`` nodes are ever committed. + A policy whose ``expand`` cycles forever and whose ``stop`` never fires + still terminates. + + Args: + query: the user intent. + store: passed to *policy* verbatim — a :class:`~ir.index.Corpus` for + :func:`collapsed_tree_policy`, a :class:`~ir.graph.CorpusGraph` for + an artifact-link policy. The operator never inspects it. + policy: the :class:`WalkPolicy` (e.g. :func:`collapsed_tree_policy`). + max_depth: maximum expansion depth from a seed (safety). + node_budget: maximum nodes committed (safety). + k: number of hits to return. + + Returns: + the committed hits, best-first, top-*k* — each a + :class:`~ir.base.SearchHit` with ``metadata["walk_depth"]`` / ``["seed"]``. + """ + state = WalkState(query=query, max_depth=max_depth, budget=node_budget) + frontier = [(node, 0) for node in policy.seed(state, store)] + while frontier and len(state.visited) < node_budget: + scored = [ + (node, depth, policy.score(state, node, store)) + for node, depth in frontier + if policy.node_id(node) not in state.visited + ] + if not scored: + break + next_frontier: list[tuple[Any, int]] = [] + for node, depth, score in policy.select(state, scored): + nid = policy.node_id(node) + if nid in state.visited: + continue + if len(state.visited) >= node_budget: + break + state.visited.add(nid) + hit = policy.to_hit(state, node, score, depth) + if hit is not None: + state.results.append(hit) + if policy.stop(state): + return _finalize(state.results, k) + if depth < max_depth: + for nb in policy.expand(state, node, store): + if policy.node_id(nb) not in state.visited: + next_frontier.append((nb, depth + 1)) + frontier = next_frontier + return _finalize(state.results, k) + + +# =========================================================================== # +# Collapsed-tree / summary-routing — the first shipped policy (pure-vector) +# =========================================================================== # + + +@dataclass(frozen=True) +class _WalkNode: + """A traversal node: one surface record + the artifact that routed to it. + + ``is_router`` is structural, not kind-based: a summary surface is a router + (suppressed from results, expands to its leaves) **only when its artifact + actually has leaf surfaces**. A summary surface whose artifact has no leaves + — a WholeText ``document``, a Skill ``capability`` — is *not* a router: it + is emitted directly, so a single-surface corpus degrades to flat-over- + summaries instead of silently returning nothing. + """ + + record: Record + seed: str | None = None + is_router: bool = False + + +def _cosine(query_vec: np.ndarray, vec: np.ndarray) -> float: + """Cosine of a unit *query_vec* against a raw record *vec*.""" + norm = float(np.linalg.norm(vec)) + return float(query_vec @ (vec / norm)) if norm else 0.0 + + +class _CollapsedTreePolicy: + """Summary-routing: seed on summary surfaces, descend to leaf chunks. + + The document-summary-index / RAPTOR collapsed-tree pattern, surface-grained + over ir's heterogeneous surfaces: a query that matches an artifact's + ``description`` / ``synopsis`` surface (the *router*, never itself a result) + surfaces that artifact's best leaf chunk (the *result*). Node id is the + record id, so within-artifact descent (summary and chunks share an + ``artifact_id``) is not blocked by the visited-set. + + Routing is **structural**: a seeded summary is a router only if its artifact + has leaf surfaces to descend to. On a corpus whose artifacts are a *single* + surface (WholeText ``document``, Skill ``capability``), there is no tree to + collapse — every seeded summary is leaf-less, so it is emitted directly and + the walk degrades to flat-over-summaries rather than silently returning + nothing. (A pure-chunk corpus, with no summary surface to seed from at all, + is the wrong corpus for this policy — use :func:`ir.search`.) + """ + + def __init__(self, *, summary_kinds, leaf_kinds, seed_k): + self.summary_kinds = tuple(summary_kinds) + self.leaf_kinds = tuple(leaf_kinds) + self.seed_k = seed_k + + def _query_vec(self, state: WalkState, store: Any) -> np.ndarray: + qv = state.cache.get("query_vec") + if qv is None: + from .retrieve import _embed_query + + embedder = getattr(store, "embedder", None) + qv = _embed_query(embedder, state.query) + state.cache["query_vec"] = qv + state.cache["source"] = getattr(store, "name", None) + return qv + + def seed(self, state: WalkState, store: Any) -> list[_WalkNode]: + from .retrieve import search + + self._query_vec(state, store) # warm the cache (query_vec + source) + hits = search( + store, + state.query, + surfaces=self.summary_kinds, + k=self.seed_k, + per_artifact=True, + ) + nodes: list[_WalkNode] = [] + for h in hits: + records = records_for_artifact(store, h.artifact_id) + summary = next( + (r for r in records if r.surface_kind in self.summary_kinds), None + ) + if summary is None: + continue + # A router only if there is a leaf to route to; otherwise emit it. + has_leaves = any(r.surface_kind in self.leaf_kinds for r in records) + nodes.append(_WalkNode(record=summary, seed=None, is_router=has_leaves)) + return nodes + + def score(self, state: WalkState, node: _WalkNode, store: Any) -> float: + return _cosine(self._query_vec(state, store), node.record.vector) + + def select(self, state: WalkState, scored: Sequence) -> list: + return sorted(scored, key=lambda t: t[2], reverse=True) + + def expand(self, state: WalkState, node: _WalkNode, store: Any) -> list[_WalkNode]: + # Only routers expand (to their artifact's leaves); a leaf — and a + # leaf-less summary, emitted directly — is terminal, so a query that + # matches a leaf directly doesn't fan out. + if not node.is_router: + return [] + aid = node.record.artifact_id + return [ + _WalkNode(record=r, seed=aid) + for r in records_for_artifact(store, aid) + if r.surface_kind in self.leaf_kinds + ] + + def node_id(self, node: _WalkNode) -> Hashable: + return node.record.id + + def stop(self, state: WalkState) -> bool: + return False + + def to_hit( + self, state: WalkState, node: _WalkNode, score: float, depth: int + ) -> "SearchHit | None": + # Routers route but are not results; a leaf-less summary is not a + # router (no leaves to route to), so it is emitted. + if node.is_router: + return None + meta = dict(node.record.metadata) + meta["walk_depth"] = depth + if node.seed is not None: + meta["seed"] = node.seed + return SearchHit( + artifact_id=node.record.artifact_id, + surface_kind=node.record.surface_kind, + score=float(score), + text=node.record.text, + metadata=meta, + source=state.cache.get("source"), + surface_index=node.record.surface_index, + ) + + +def collapsed_tree_policy( + *, + summary_kinds: Iterable[str] = DFLT_SUMMARY_KINDS, + leaf_kinds: Iterable[str] = DFLT_LEAF_KINDS, + seed_k: int = DFLT_SEED_K, +) -> WalkPolicy: + """The pure-vector summary-routing / collapsed-tree :class:`WalkPolicy`. + + Seeds on the top *seed_k* matches among ``summary_kinds`` surfaces and + descends to each routed artifact's ``leaf_kinds`` surfaces (the emitted + results), scored by cosine to the query. No LLM in the loop. A summary + surface is a *router* (suppressed from results) only when its artifact has + leaf surfaces; on a single-surface corpus (WholeText ``document``, Skill + ``capability``) the summaries are leaf-less and emitted directly, so the + walk degrades to flat-over-summaries instead of returning nothing. + + The defaults keep ``document`` / ``capability`` in ``summary_kinds`` *on + purpose* — that is what lets a WholeText / Skill corpus seed at all; the + structural router check (above) is what keeps those seeds from being + silently swallowed. + + >>> hits = traverse(q, corpus, policy=collapsed_tree_policy()) # doctest: +SKIP + """ + return _CollapsedTreePolicy( + summary_kinds=summary_kinds, leaf_kinds=leaf_kinds, seed_k=seed_k + ) diff --git a/tests/test_traverse.py b/tests/test_traverse.py new file mode 100644 index 0000000..2b38235 --- /dev/null +++ b/tests/test_traverse.py @@ -0,0 +1,420 @@ +"""Tests for the traversal operator (#47) — safety, routing, composition. + +Pins the #47 acceptance: operator-enforced termination on cyclic / never-stop +policies (the safety primitives are the operator's, not the policy's), the +collapsed-tree policy routing a summary match down to chunks and beating flat +top-k on a constructed routing case, PPR expressible as a degenerate policy +(shape only), and downstream select/disclose composition. Hermetic: light +embedder + memory store. +""" + +import json + +import ir +from ir.base import SearchHit +from ir.store import CorpusStore +from ir.traverse import WalkPolicy, WalkState, traverse + +# --------------------------------------------------------------------------- # +# A constructed routing case: A is gold (summary matches, answer in its chunk); +# D is the flat trap (summary does NOT match, but a chunk matches routing terms +# strongly). Routing excludes D before its chunk can compete; flat does not. +# --------------------------------------------------------------------------- # + +ROUTING_PKG = { + "A": { + "name": "A", + "description": "rtok1 rtok2 rtok3 rtok4 alpha", + "readme": "ANSTOK answer payload here.\n\nfiller one two three.", + }, + "E": { + "name": "E", + "description": "rtok1 rtok2 beta gamma", + "readme": "neutral content mu nu.\n\nmore neutral xi.", + }, + "D": { + "name": "D", + "description": "delta epsilon zeta unrelated", + "readme": "rtok1 rtok2 rtok3 rtok4 trap distractor strong.", + }, + "F": {"name": "F", "description": "omega sigma", "readme": "generic tau upsilon."}, +} +ROUTING_QUERY = "rtok1 rtok2 rtok3 rtok4 ANSTOK" + + +def _routing_corpus(): + src = ir.CorpusSource.from_mapping( + ROUTING_PKG, name="rt", strategy=ir.Package(chunk_size=120, overlap=10) + ) + return ir.build(src, store=CorpusStore.memory(), embedder="light") + + +def _first_rank(hits, needle="ANSTOK"): + return next((i for i, h in enumerate(hits) if needle in h.text), -1) + + +# --------------------------------------------------------------------------- # +# Collapsed-tree routing — beats flat on the constructed case +# --------------------------------------------------------------------------- # + + +def test_collapsed_tree_beats_flat_on_routing_case(): + corpus = _routing_corpus() + flat = ir.search(corpus, ROUTING_QUERY, k=10, per_artifact=False) + trav = ir.traverse( + ROUTING_QUERY, corpus, policy=ir.collapsed_tree_policy(seed_k=2), k=10 + ) + flat_rank = _first_rank(flat) + trav_rank = _first_rank(trav) + # Flat buries the answer chunk under A's summary + D's trap chunk; routing + # surfaces it at the top. + assert flat_rank > 0, f"expected the answer buried in flat, got rank {flat_rank}" + assert trav_rank == 0, f"expected the answer first under routing, got {trav_rank}" + assert trav[0].artifact_id == "A" + + +def test_collapsed_tree_excludes_unrouted_distractor(): + # D's summary does not match, so D is never seeded — its trap chunk (which + # flat ranks highly) cannot appear in the traversal results at all. + corpus = _routing_corpus() + trav = ir.traverse( + ROUTING_QUERY, corpus, policy=ir.collapsed_tree_policy(seed_k=2), k=10 + ) + assert all(h.artifact_id != "D" for h in trav) + + +def test_collapsed_tree_emits_only_leaves_not_routers(): + corpus = _routing_corpus() + trav = ir.traverse(ROUTING_QUERY, corpus, policy=ir.collapsed_tree_policy(), k=10) + assert trav # something came back + assert all(h.surface_kind == "readme_chunk" for h in trav) # no description + + +def test_traverse_hits_carry_walk_provenance(): + corpus = _routing_corpus() + trav = ir.traverse( + ROUTING_QUERY, corpus, policy=ir.collapsed_tree_policy(seed_k=2), k=10 + ) + top = trav[0] + assert top.metadata["walk_depth"] == 1 + assert top.metadata["seed"] == "A" + assert top.source == "rt" + json.dumps(top.to_dict()) # additive provenance stays JSON-clean + + +def test_traverse_composes_with_select_and_disclose(): + corpus = _routing_corpus() + trav = ir.traverse( + ROUTING_QUERY, corpus, policy=ir.collapsed_tree_policy(seed_k=2), k=10 + ) + selection = ir.select(trav) + results = ir.disclose(selection, level="metadata") + assert results and all(d.summary for d in results) + assert selection.selected[0].artifact_id == "A" + + +def test_traverse_respects_k(): + corpus = _routing_corpus() + assert ( + len(ir.traverse(ROUTING_QUERY, corpus, policy=ir.collapsed_tree_policy(), k=1)) + <= 1 + ) + + +def test_collapsed_tree_policy_is_a_walkpolicy(): + assert isinstance(ir.collapsed_tree_policy(), WalkPolicy) + + +# --------------------------------------------------------------------------- # +# Single-surface corpora (WholeText / Skill): every surface is a summary kind, +# none is a leaf kind. The footgun would route everything and emit nothing; +# the structural router check must instead emit the leaf-less summaries. +# --------------------------------------------------------------------------- # + + +def test_wholetext_corpus_does_not_silently_return_empty(): + # WholeText emits one "document" surface per artifact and no chunks. With + # "document" in DFLT_SUMMARY_KINDS, the naive policy would mark every + # surface a router (-> to_hit None) and find no leaves to descend to, + # returning []. The leaf-less-emit fix must surface the documents instead. + docs = { + "d1": "rtok1 rtok2 rtok3 rtok4 alpha ANSTOK answer payload here filler", + "d2": "unrelated beta gamma delta neutral content mu nu xi omicron", + "d3": "rtok1 omega sigma tau upsilon generic content here", + } + src = ir.CorpusSource.from_mapping(docs, name="wt", strategy=ir.WholeText()) + corpus = ir.build(src, store=CorpusStore.memory(), embedder="light") + trav = ir.traverse( + "rtok1 rtok2 rtok3 rtok4 ANSTOK", corpus, policy=ir.collapsed_tree_policy(), k=5 + ) + assert trav, "WholeText corpus must not silently return zero results" + assert all(h.surface_kind == "document" for h in trav) + assert trav[0].artifact_id == "d1" + # "flat-over-summaries" means *all* leaf-less seeds emit, not just the best: + # a top-1 regression (select[:1] / early stop) must not pass this. + assert {h.artifact_id for h in trav} == {"d1", "d2", "d3"} + # A leaf-less summary is emitted at its seed position (depth 0), with no + # routing parent — it IS the seed, not a descent target. + assert trav[0].metadata["walk_depth"] == 0 + assert "seed" not in trav[0].metadata + + +def test_skill_corpus_does_not_silently_return_empty(): + # Skill emits one "capability" surface per artifact and no chunks — same + # all-summary-kinds shape as WholeText, via a different summary kind. + skills = { + "s1": {"name": "deploy-app", "description": "rtok1 rtok2 rtok3 ANSTOK ship it"}, + "s2": { + "name": "other-thing", + "description": "unrelated beta gamma delta epsilon", + }, + } + src = ir.CorpusSource.from_mapping(skills, name="sk", strategy=ir.Skill()) + corpus = ir.build(src, store=CorpusStore.memory(), embedder="light") + trav = ir.traverse( + "rtok1 rtok2 rtok3 ANSTOK deploy-app", + corpus, + policy=ir.collapsed_tree_policy(), + k=5, + ) + assert trav, "Skill corpus must not silently return zero results" + assert all(h.surface_kind == "capability" for h in trav) + assert trav[0].artifact_id == "s1" + assert {h.artifact_id for h in trav} == {"s1", "s2"} # all leaf-less seeds emit + assert trav[0].metadata["walk_depth"] == 0 + + +def test_package_corpus_still_routes_summaries_not_leaf_less_emit(): + # The fix must NOT regress the genuine-tree case: a Package artifact HAS + # leaf surfaces, so its "description" summary stays a router (suppressed), + # and only readme_chunk leaves are emitted. + corpus = _routing_corpus() + trav = ir.traverse(ROUTING_QUERY, corpus, policy=ir.collapsed_tree_policy(), k=10) + assert trav + assert all(h.surface_kind == "readme_chunk" for h in trav) # no "description" + + +def test_mixed_corpus_routes_leaf_having_emits_leaf_less(): + # The decision must be PER-ARTIFACT, not corpus-global: one artifact has a + # readme (leaf-having -> routes, summary suppressed) and one has readme="" + # (leaf-less -> summary emitted directly). One traverse exercises BOTH + # has_leaves branches — a corpus-global flag would wrongly suppress no_leaf. + pkg = { + "has_leaf": { + "name": "has_leaf", + "description": "rtok1 rtok2 rtok3 rtok4 alpha", + "readme": "ANSTOK answer payload here filler one two three.", + }, + "no_leaf": { + "name": "no_leaf", + "description": "rtok1 rtok2 rtok3 rtok4 beta", + "readme": "", + }, + } + src = ir.CorpusSource.from_mapping( + pkg, name="mix", strategy=ir.Package(chunk_size=120, overlap=10) + ) + corpus = ir.build(src, store=CorpusStore.memory(), embedder="light") + trav = ir.traverse( + "rtok1 rtok2 rtok3 rtok4 ANSTOK", + corpus, + policy=ir.collapsed_tree_policy(), + k=10, + ) + by_aid = {h.artifact_id: h for h in trav} + # leaf-less artifact: emitted directly as its summary, at the seed position + assert by_aid["no_leaf"].surface_kind == "description" + assert by_aid["no_leaf"].metadata["walk_depth"] == 0 + assert "seed" not in by_aid["no_leaf"].metadata + # leaf-having artifact: summary suppressed (router), leaf emitted under it + assert by_aid["has_leaf"].surface_kind == "readme_chunk" + assert by_aid["has_leaf"].metadata["walk_depth"] == 1 + assert by_aid["has_leaf"].metadata["seed"] == "has_leaf" + + +def test_chunked_only_corpus_is_out_of_scope_documented_boundary(): + # Boundary characterization (not a footgun the fix addresses): a Chunked + # corpus has only "chunk" surfaces — a leaf kind, never a summary kind — so + # collapsed_tree has nothing to seed from and returns []. The policy + # docstring directs such corpora to ir.search; this pins that contract so an + # accidental seeding change is caught. + docs = {f"c{i}": f"rtok1 rtok2 chunk body {i} ANSTOK tail" for i in range(3)} + src = ir.CorpusSource.from_mapping(docs, name="ch", strategy=ir.Chunked()) + corpus = ir.build(src, store=CorpusStore.memory(), embedder="light") + assert ir.traverse("rtok1 ANSTOK", corpus, policy=ir.collapsed_tree_policy()) == [] + # ...and flat search is the right tool here — it does find them. + assert ir.search(corpus, "rtok1 ANSTOK", k=5) + + +# --------------------------------------------------------------------------- # +# Operator-enforced safety — termination regardless of policy behavior +# --------------------------------------------------------------------------- # + + +def _hit(node_id, score, depth): + return SearchHit( + artifact_id=str(node_id), + surface_kind="node", + score=float(score), + text=str(node_id), + metadata={"walk_depth": depth}, + ) + + +class _CyclicPolicy: + """A graph with a directed cycle and a policy that never stops.""" + + def __init__(self, adjacency): + self.adj = adjacency + + def seed(self, state, store): + return [0] + + def score(self, state, node, store): + return 0.0 + + def select(self, state, scored): + return scored + + def expand(self, state, node, store): + return self.adj.get(node, []) + + def node_id(self, node): + return node + + def stop(self, state): + return False + + def to_hit(self, state, node, score, depth): + return _hit(node, score, depth) + + +class _ExplodingPolicy: + """A never-stopping policy generating unbounded *fresh* node ids (binary).""" + + def seed(self, state, store): + return [0] + + def score(self, state, node, store): + return 0.0 + + def select(self, state, scored): + return scored + + def expand(self, state, node, store): + return [2 * node + 1, 2 * node + 2] # always-new ids; cannot be visited-deduped + + def node_id(self, node): + return node + + def stop(self, state): + return False + + def to_hit(self, state, node, score, depth): + return _hit(node, score, depth) + + +def test_cyclic_graph_terminates_via_visited_set(): + # 0 -> 1 -> 2 -> 0 : the visited-set alone must break the cycle even with + # a generous budget/depth and a policy that never stops. + policy = _CyclicPolicy({0: [1], 1: [2], 2: [0]}) + hits = traverse("q", None, policy=policy, max_depth=100, node_budget=100, k=100) + assert {h.artifact_id for h in hits} == {"0", "1", "2"} # each node once + + +def test_unbounded_fresh_nodes_terminate_via_node_budget(): + # Every node id is new (visited-set can't help), so the operator's node + # budget is the backstop that guarantees termination. + hits = traverse( + "q", None, policy=_ExplodingPolicy(), max_depth=100, node_budget=20, k=1000 + ) + assert len(hits) <= 20 + + +def test_depth_cap_bounds_expansion(): + # A linear fresh-node chain: expansion must stop at max_depth. + class _Linear(_ExplodingPolicy): + def expand(self, state, node, store): + return [node + 1] + + hits = traverse("q", None, policy=_Linear(), max_depth=3, node_budget=1000, k=1000) + assert max(h.metadata["walk_depth"] for h in hits) <= 3 + + +def test_injected_stop_halts_early(): + class _StopAfterTwo(_ExplodingPolicy): + def stop(self, state): + return len(state.results) >= 2 + + hits = traverse( + "q", None, policy=_StopAfterTwo(), max_depth=100, node_budget=100, k=100 + ) + assert len(hits) == 2 + + +# --------------------------------------------------------------------------- # +# PPR expressible as a degenerate policy (shape only — not implemented) +# --------------------------------------------------------------------------- # + + +def test_ppr_is_expressible_as_a_degenerate_policy(): + # PPR / spreading-activation as a one-shot closed-form: score is the + # personalization weight, stop() is immediately true (single relaxation), + # expand walks the artifact links. We only assert the protocol *admits* + # this shape over a CorpusGraph — we do not ship or promote it. + pkg = { + "aa": {"name": "aa", "description": "a", "readme": "", "deps": ["bb", "cc"]}, + "bb": {"name": "bb", "description": "b", "readme": "", "deps": ["cc"]}, + "cc": {"name": "cc", "description": "c", "readme": "", "deps": []}, + } + src = ir.CorpusSource.from_mapping(pkg, name="ppr", strategy=ir.Package()) + corpus = ir.build( + src, + store=CorpusStore.memory(), + embedder="light", + edge_extractor=ir.default_edge_extractor, + ) + graph = ir.CorpusGraph(corpus) + + class _PPRShape: + damping = 0.85 + + def seed(self, state, store): + return ["aa"] # the personalization vector's support + + def score(self, state, node, store): + return self.damping # closed-form weight (degenerate) + + def select(self, state, scored): + return scored + + def expand(self, state, node, store): + return [ + ir.canonical_node_id(t, source=store.source)[1] + for t in store.neighbors(node) + ] + + def node_id(self, node): + return node + + def stop(self, state): + return True # one relaxation step, then halt + + def to_hit(self, state, node, score, depth): + return _hit(node, score, depth) + + assert isinstance(_PPRShape(), WalkPolicy) + hits = traverse("q", graph, policy=_PPRShape(), max_depth=2, node_budget=10) + # stop() fires after the first committed node — the degenerate one-shot. + assert [h.artifact_id for h in hits] == ["aa"] + + +# --------------------------------------------------------------------------- # +# WalkState bounds are the operator's +# --------------------------------------------------------------------------- # + + +def test_walkstate_carries_the_safety_bounds(): + s = WalkState(query="q", max_depth=3, budget=10) + assert s.max_depth == 3 and s.budget == 10 + assert s.visited == set() and s.results == []