diff --git a/nerve/bootstrap.py b/nerve/bootstrap.py index 8ec567f..c67fe49 100644 --- a/nerve/bootstrap.py +++ b/nerve/bootstrap.py @@ -1847,6 +1847,19 @@ def _wrap_text(text: str, width: int = 51) -> list[str]: && curl -fsSL "https://github.com/steipete/gogcli/releases/download/v${GOG_VERSION}/gogcli_${GOG_VERSION}_linux_${ARCH}.tar.gz" \\ | tar xz -C /usr/local/bin gog +# Install Docker CLI (client only) so the agent can talk to the host +# docker daemon through the bind-mounted /var/run/docker.sock. Lets +# Bash run "docker compose up" / "docker run" against the host from +# inside the agent without needing a separate MCP sidecar. +RUN install -m 0755 -d /etc/apt/keyrings \\ + && curl -fsSL https://download.docker.com/linux/debian/gpg \\ + | gpg --dearmor -o /etc/apt/keyrings/docker.gpg \\ + && chmod a+r /etc/apt/keyrings/docker.gpg \\ + && echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/debian $(. /etc/os-release && echo $VERSION_CODENAME) stable" \\ + > /etc/apt/sources.list.d/docker.list \\ + && apt-get update && apt-get install -y --no-install-recommends docker-ce-cli docker-compose-plugin \\ + && rm -rf /var/lib/apt/lists/* + RUN mkdir -p /root/.nerve /root/nerve-workspace ENV NERVE_DOCKER=1 @@ -1870,29 +1883,94 @@ def _wrap_text(text: str, width: int = 51) -> list[str]: ENTRYPOINT ["/docker-entrypoint.sh"] """ +def _host_aligned_path(path: str) -> str: + """Return a YAML-safe representation of ``path`` that resolves to + the same absolute path on the host and inside the container. + + Compose substitutes ``${HOME}`` from the user's shell env at + runtime, so a ``~/foo`` workspace becomes ``${HOME}/foo`` and + expands to the host's ``$HOME``. Absolute paths are returned + untouched. Path alignment is what lets the agent pass paths to + the bind-mounted host docker daemon when launching siblings: the + same string resolves to the same files inside and out, so + ``docker run -v $PWD:$PWD ...`` from inside the agent gives the + sibling container the right host directory. + """ + if not path: + return path + if path.startswith("~/"): + return "${HOME}/" + path[2:] + if path == "~": + return "${HOME}" + return path + + def _build_docker_compose( workspace_path: str = "~/nerve-workspace", + projects_path: str = "~/projects", extra_mounts: list[str] | None = None, + docker_socket: bool = True, ) -> str: """Build docker-compose.yml content with host bind-mounts. Args: - workspace_path: Host path for the workspace (e.g. ~/nerve-workspace). - extra_mounts: Additional host:container mount pairs (e.g. ["~/code:/code"]). + workspace_path: Host path for the workspace (default ~/nerve-workspace). + projects_path: Host path for the projects directory containing + git checkouts and worktrees (default ~/projects). Mounted + with path alignment so the agent can pass the same paths to + the host daemon when starting sibling containers. + extra_mounts: Additional host:container mount pairs. + docker_socket: When True, mount the host docker socket + (``/var/run/docker.sock``) into the agent container so the + agent can run ``docker`` and ``docker compose`` directly via + Bash. Required for Grafana per-PR runs and HyperDX + ``make dev`` orchestration. Works with OrbStack and Docker + Desktop on macOS (both expose a compatibility symlink at + this path) and with stock dockerd on Linux. Disable only if + you have a reason to keep the agent isolated from the host + daemon. 
+ + Note on the previous sidecar pattern: + Earlier revisions of this file shipped a ``docker-mcp`` service + that ran ``supercorp/supergateway`` wrapping ``ckreiling/mcp- + server-docker`` to expose Docker daemon verbs as MCP tools. That + approach (a) couldn't orchestrate ``docker compose`` (no compose + verbs in the underlying server) and (b) suffered a chronic + protocol-version drift between supergateway's hardcoded + ``MCP-Protocol-Version`` allowlist and the version Claude Code + sends in headers. Mounting the socket directly solves both + problems in five lines. The agent already had unfettered daemon + access through the sidecar's MCP tools, so the blast radius is + unchanged. """ - # Required mounts (always present) + # Host-aligned paths: same absolute string inside and outside the + # container, so the agent can pass them to the bind-mounted host + # docker daemon when mounting them into siblings. + workspace_aligned = _host_aligned_path(workspace_path) + projects_aligned = _host_aligned_path(projects_path) + + # Required mounts. ~/.nerve stays at /root/.nerve because it is + # agent-only state and never passed through to siblings. + # ~/.nerve/claude:/root/.claude persists Claude Code's in-container + # state (config + per-conversation .jsonl files under projects/) + # across container restarts. Without this mount the .jsonl files + # are wiped on every recreate and the Nerve DB's stale + # sdk_session_id rows fail every --resume with "No conversation + # found" exit 1. The path is siloed under ~/.nerve so the agent's + # CLI is isolated from the host user's personal ~/.claude (where + # macOS stores OAuth tokens via the system Keychain; auth still + # comes from config.local.yaml, not from this directory). volumes = [ ".:/nerve", "~/.nerve:/root/.nerve", - f"{workspace_path}:/root/nerve-workspace", + "~/.nerve/claude:/root/.claude", + f"{workspace_aligned}:{workspace_aligned}", + f"{projects_aligned}:{projects_aligned}", ] # Optional auth mounts — only include if the host directory exists. # Docker would create missing dirs as root-owned empties, which # confuses the tools and pollutes the host filesystem. - # Note: ~/.claude is NOT mounted — macOS stores OAuth tokens in the - # system Keychain, not on disk. The entrypoint exports ANTHROPIC_API_KEY - # from config.local.yaml instead, which the claude CLI picks up. _optional_mounts = [ ("~/.config/gh", "/root/.config/gh", "gh CLI auth"), ("~/.config/gog", "/root/.config/gog", "gog CLI auth"), @@ -1902,17 +1980,57 @@ def _build_docker_compose( if os.path.isdir(expanded): volumes.append(f"{host_path}:{container_path}") + if docker_socket: + # Direct daemon access for the agent. With this in place, + # `docker` and `docker compose` work from Bash inside the + # agent. The path-aligned ${{HOME}}/projects mount above means + # the daemon resolves bind-mount paths identically inside and + # out, so `cd ~/projects/worktrees// && make dev` + # works from the agent and creates containers visible on the + # host. OrbStack and Docker Desktop both expose the daemon at + # this exact path on macOS via a compatibility symlink. + volumes.append("/var/run/docker.sock:/var/run/docker.sock") + if extra_mounts: volumes.extend(extra_mounts) - # Build YAML by hand to keep formatting clean vol_lines = "\n".join(f" - {v}" for v in volumes) - return f"""services: - nerve: + # In-agent service ports. The agent publishes ranges for dev + # servers it runs itself (docs preview, vite, storybook). 
Sibling + # containers (grafana, hyperdx-*) get their own host ports + # allocated when launched directly by the host daemon, so they are + # not listed here. + in_agent_port_ranges = [ + ("docs", 3000, 3019), + ("vite", 5173, 5189), + ("storybook", 6006, 6019), + ] + port_lines = [' - "8900:8900"'] + for label, lo, hi in in_agent_port_ranges: + port_lines.append(f' - "{lo}-{hi}:{lo}-{hi}" # {label}') + + nerve_block = f""" nerve: build: . ports: - - "8900:8900" +{chr(10).join(port_lines)} + environment: + # Path alignment: the entrypoint creates /root/* symlinks pointing + # at HOST_HOME so legacy paths still resolve, and any path passed + # to the host docker daemon resolves identically inside and out. + HOST_HOME: ${{HOME}} + # Route memU embeddings at the local Ollama sidecar (defined + # below). The OpenAI SDK respects this base URL when it talks to + # /embeddings, so memU recall + memorize work without OpenAI + # auth or network egress. Empty disables the override; memu_bridge + # then falls back to api.openai.com if openai_api_key is set, + # otherwise embeddings are simply skipped. + MEMU_EMBEDDING_BASE_URL: http://embeddings:11434/v1 + MEMU_EMBEDDING_API_KEY: placeholder + MEMU_EMBED_MODEL: nomic-embed-text + depends_on: + embeddings: + condition: service_healthy volumes: {vol_lines} restart: unless-stopped @@ -1920,7 +2038,56 @@ def _build_docker_compose( tty: true env_file: - path: .env - required: false + required: false""" + + embeddings_block = """ # Self-hosted OpenAI-compatible embeddings service. + # Ollama serves nomic-embed-text (768-dim) at /v1/embeddings, the + # same wire format the OpenAI SDK speaks. This replaces the OpenAI + # /embeddings calls memU made for routing + recall, removing the + # quota / 401 single point of failure. Native ARM64 image; no + # emulation overhead on Apple Silicon. The first start of this + # service downloads the model (~270 MB) and caches it under + # ~/.nerve/ollama; subsequent starts are instant. nomic-embed-text + # returns 768-dim vectors (vs OpenAI ada-002's 1536), so any + # existing memu.sqlite embeddings get rebuilt on next memorize. + embeddings: + image: ollama/ollama:latest + volumes: + - ~/.nerve/ollama:/root/.ollama + expose: + - "11434" + restart: unless-stopped + # Pull the embedding model on first start, then run the server. + # `ollama serve` blocks; we pull in the background, wait until the + # API responds, then `wait` keeps the server in the foreground. + entrypoint: ["/bin/sh", "-c"] + command: + - | + set -e + /bin/ollama serve & + pid=$$! + until /bin/ollama list >/dev/null 2>&1; do sleep 1; done + if ! /bin/ollama list | awk '{print $$1}' | grep -q '^nomic-embed-text'; then + echo "Pulling nomic-embed-text..." + /bin/ollama pull nomic-embed-text + fi + wait $$pid + healthcheck: + # Ready means: server up AND embedding model loaded. We grep for + # the model name so we don't mark healthy before the first-run + # model pull finishes. + test: + - CMD-SHELL + - 'ollama list 2>/dev/null | grep -q "^nomic-embed-text"' + interval: 10s + timeout: 5s + retries: 60 + start_period: 30s""" + + return f"""services: +{nerve_block} + +{embeddings_block} """ _DOCKER_ENTRYPOINT_TEMPLATE = """#!/bin/bash @@ -1937,6 +2104,33 @@ def _build_docker_compose( cd web && npm ci --quiet && npm run build && cd .. fi +# --- Path alignment --- +# HOST_HOME comes from compose (set to ${HOME} on the host). 
For each +# host-aligned mount point, drop a symlink at the legacy /root/* path +# so anything that hard-codes /root/nerve-workspace or /root/projects +# keeps working and resolves to the same files the host docker daemon +# sees. Idempotent: if the symlink already points where we want, skip. +if [ -n "${HOST_HOME:-}" ]; then + for _name in nerve-workspace projects; do + _src="$HOST_HOME/$_name" + _dst="/root/$_name" + if [ ! -d "$_src" ]; then + continue + fi + if [ -L "$_dst" ]; then + # Already a symlink; trust it. + continue + fi + if [ -d "$_dst" ] && [ -z "$(ls -A "$_dst" 2>/dev/null)" ]; then + # Empty leftover dir from the Dockerfile mkdir or a prior + # bind mount that no longer exists. Replace with the symlink. + rmdir "$_dst" && ln -s "$_src" "$_dst" + elif [ ! -e "$_dst" ]; then + ln -s "$_src" "$_dst" + fi + done +fi + # --- Credential resolution (priority waterfall) --- # Export credentials from config.local.yaml so tools (claude CLI, gh CLI) # can authenticate inside Docker. macOS stores tokens in the Keychain @@ -1960,6 +2154,14 @@ def _build_docker_compose( [ -n "$_gh" ] && export GH_TOKEN="$_gh" fi +# Ensure the persisted Claude Code state dir exists and is writable +# before any tool that touches /root/.claude runs. The bind mount in +# docker-compose creates it as a host-owned empty dir on first boot; +# we need it owned by root with 0700 so the CLI can drop its config +# file and projects/ tree there without ENOENT or EACCES. +mkdir -p /root/.claude +chmod 700 /root/.claude + # Clean up stale PID file from previous container runs rm -f ~/.nerve/nerve.pid diff --git a/nerve/config.py b/nerve/config.py index f970b74..56ef8b9 100644 --- a/nerve/config.py +++ b/nerve/config.py @@ -287,6 +287,25 @@ class MemoryConfig: memorize_model: str = "claude-sonnet-4-6" # Extraction & preprocessing fast_model: str = "claude-haiku-4-5-20251001" # Category summaries, date resolution embed_model: str = "" + # Optional override for the OpenAI-compatible /embeddings endpoint + # memU calls. When set, takes precedence over the default + # https://api.openai.com/v1. Point it at a self-hosted sidecar + # (Ollama, TEI, LocalAI, etc.) to avoid the OpenAI quota / 401 + # single point of failure. The env vars MEMU_EMBEDDING_BASE_URL, + # MEMU_EMBEDDING_API_KEY, and MEMU_EMBED_MODEL override these + # config values at runtime, which is convenient for the docker + # compose path where the sidecar URL is known to the entrypoint, + # not the YAML config. + embedding_base_url: str = "" + embedding_api_key: str = "" + # Cap on concurrent LLM chat calls during memorize / recall. memU + # fans out per memory_type (profile, event, knowledge, behavior) + # and asyncio.gathers the results, which on lower Anthropic API + # tiers reliably blows the per-minute rate limit. Bounding + # concurrency at 1 serializes the bursts; the SDK's exponential + # backoff handles the rest. Bump to 2-4 if your API tier can + # absorb the parallel load. 
+ llm_concurrency: int = 1 sqlite_dsn: str = "" semantic_dedup_threshold: float = 0.85 # Cosine similarity threshold for semantic dedup knowledge_filter: bool = False # Post-extraction LLM filter for generic knowledge (extra API call) @@ -302,6 +321,9 @@ def from_dict(cls, d: dict) -> MemoryConfig: memorize_model=d.get("memorize_model", "claude-sonnet-4-6"), fast_model=d.get("fast_model", "claude-haiku-4-5-20251001"), embed_model=d.get("embed_model", ""), + embedding_base_url=d.get("embedding_base_url", ""), + embedding_api_key=d.get("embedding_api_key", ""), + llm_concurrency=max(1, int(d.get("llm_concurrency", 1))), sqlite_dsn=d.get("sqlite_dsn", default_dsn), semantic_dedup_threshold=float(d.get("semantic_dedup_threshold", 0.85)), knowledge_filter=bool(d.get("knowledge_filter", False)), diff --git a/nerve/memory/memu_bridge.py b/nerve/memory/memu_bridge.py index c79ef71..4a6973b 100644 --- a/nerve/memory/memu_bridge.py +++ b/nerve/memory/memu_bridge.py @@ -11,6 +11,7 @@ import gc import json import logging +import os import time from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field @@ -417,6 +418,11 @@ def __init__(self, config: NerveConfig, audit_db: Any = None): # Debounce tracking for file re-indexing (path -> asyncio.Task) self._reindex_tasks: dict[str, asyncio.Task] = {} self._anthropic_client: Any | None = None # Lazy sync Anthropic + # Bounded concurrency for memU's LLM chat calls. Created lazily + # in _instrument_llm_timeouts() so it binds to the active event + # loop, then reused across _reset_llm_clients() so callers + # already queued don't double-acquire after a reset. + self._llm_semaphore: asyncio.Semaphore | None = None async def _audit(self, action: str, target_type: str, target_id: str | None = None, source: str = "bridge", details: dict | None = None) -> None: @@ -914,13 +920,48 @@ async def initialize(self) -> bool: }, } - if self.config.openai_api_key: + # Embedding profile resolution: env vars first, then YAML config, + # then OpenAI fallback. The env-var path is what docker compose + # uses to point memU at the local Ollama / TEI sidecar without + # rewriting config.yaml on every container start. Setting an + # explicit base URL implies a self-hosted endpoint; api_key is + # optional (Ollama / TEI ignore it but the OpenAI SDK requires + # a non-empty string). 
+ embedding_base_url = ( + os.environ.get("MEMU_EMBEDDING_BASE_URL") + or self.config.memory.embedding_base_url + ) + embedding_api_key = ( + os.environ.get("MEMU_EMBEDDING_API_KEY") + or self.config.memory.embedding_api_key + ) + embed_model = ( + os.environ.get("MEMU_EMBED_MODEL") + or self.config.memory.embed_model + ) + + if embedding_base_url: + llm_profiles["embedding"] = { + "base_url": embedding_base_url, + "api_key": embedding_api_key or "placeholder", + "embed_model": embed_model, + "client_backend": "sdk", + } + embeddings_configured = True + logger.info( + "memU embeddings via self-hosted endpoint %s (model=%s)", + embedding_base_url, embed_model or "", + ) + elif self.config.openai_api_key: llm_profiles["embedding"] = { "base_url": "https://api.openai.com/v1", "api_key": self.config.openai_api_key, - "embed_model": self.config.memory.embed_model, + "embed_model": embed_model, "client_backend": "sdk", } + embeddings_configured = True + else: + embeddings_configured = False resources_dir = Path("~/.nerve/memu-resources").expanduser() resources_dir.mkdir(parents=True, exist_ok=True) @@ -971,8 +1012,8 @@ async def initialize(self) -> bool: # work goes through Anthropic — use Haiku for extraction # too to avoid saturating the rate-limit budget. "memory_extract_llm_profile": ( - fast_profile if not self.config.openai_api_key - else memorize_profile + memorize_profile if embeddings_configured + else fast_profile ), "category_update_llm_profile": fast_profile, # Pass Nerve's configured categories to memU so the LLM @@ -999,14 +1040,14 @@ async def initialize(self) -> bool: }, }, retrieve_config={ - "method": "llm" if not self.config.openai_api_key else "rag", + "method": "rag" if embeddings_configured else "llm", "route_intention": False, "sufficiency_check": False, "resource": {"enabled": False}, # Use Haiku for LLM-based ranking — cheaper and avoids # sharing Sonnet's rate-limit budget with the main agent. **({"llm_ranking_llm_profile": fast_profile} - if not self.config.openai_api_key else {}), + if not embeddings_configured else {}), }, ) self._available = True @@ -1048,7 +1089,7 @@ async def initialize(self) -> bool: # memorize pipeline's "categorize_items" step with one that # stores items and resources with embedding=None. This # avoids KeyError on the missing "embedding" LLM profile. - if not self.config.openai_api_key: + if not embeddings_configured: from memu.workflow.step import WorkflowStep as _WfStep _svc = self._service @@ -1365,7 +1406,7 @@ def _inject_bedrock_clients(self) -> None: logger.info("Injected Bedrock LLM client for profile '%s' (model=%s)", name, model) def _instrument_llm_timeouts(self) -> None: - """Configure per-call timeouts on LLM clients (two layers). + """Configure per-call timeouts and bounded concurrency on LLM clients. Layer 1: httpx-level timeout on the AsyncOpenAI transport. This catches unresponsive API calls at the socket level and raises @@ -1376,14 +1417,33 @@ def _instrument_llm_timeouts(self) -> None: (e.g. the coroutine is stuck in Python code, not I/O). It works because LLMClientWrapper.chat() delegates to self._client.chat() which resolves the instance attribute we set. + + Layer 3: asyncio.Semaphore wrapper for bounded concurrency. + memU fans out per-memory-type chat calls via asyncio.gather + (memu/app/memorize.py:_generate_entries_from_text and similar). + On lower Anthropic API tiers, a 4-way fan-out reliably hits + rate_limit_error and the SDK retry/backoff can't catch up + before the pipeline gives up. 
The semaphore caps simultaneous + chat() calls across all profiles at memory.llm_concurrency, + which serializes the bursts. The slot is held only while the + actual chat is running (queueing time is unbounded), so the + timeout in Layer 2 measures real call duration, not queue wait. """ import httpx as _httpx + # Lazily create the semaphore. Reused across _reset_llm_clients() + # so any callers already waiting in the queue don't lose their + # spot when clients get re-instrumented. + if self._llm_semaphore is None: + concurrency = max(1, int(self.config.memory.llm_concurrency)) + self._llm_semaphore = asyncio.Semaphore(concurrency) + logger.info("memU LLM concurrency capped at %d", concurrency) + for profile in ("memorize", "fast", "default"): try: client = self._service._get_llm_base_client(profile) - # --- Layer 1: httpx timeout + disable SDK retries --- + # --- Layer 1: httpx timeout + bounded SDK retries --- # (Bedrock clients use their own timeout; skip Layer 1 for them) if not isinstance(client, _BedrockLLMClient): inner = getattr(client, "client", None) # OpenAISDKClient.client = AsyncOpenAI @@ -1392,11 +1452,16 @@ def _instrument_llm_timeouts(self) -> None: self._LLM_CALL_TIMEOUT, connect=10.0, ) - # The OpenAI SDK defaults to max_retries=2 and 600s timeout. - # With our 120s asyncio.wait_for wrapper, SDK retries just - # waste time inside a doomed coroutine. Disable them so the - # httpx timeout fires cleanly and propagates immediately. - inner.max_retries = 0 + # SDK default max_retries=2, with exponential backoff + # that respects the API's Retry-After header on 429. + # We keep retries enabled so rate-limit responses + # recover automatically. The Layer 3 semaphore caps + # concurrency, so burst pressure stays low and the + # retries actually drain the queue. Bumped to 4 to + # cover the worst-case Anthropic minute-window roll. + # Total retry budget ~15s in the worst case (1+2+4+8s), + # well inside the Layer 2 wait_for(120s) bound. + inner.max_retries = 4 # --- Layer 2: asyncio.wait_for wrapper --- if not callable(getattr(client, "chat", None)): @@ -1408,44 +1473,47 @@ def _instrument_llm_timeouts(self) -> None: async def _timeout_chat( prompt, *, max_tokens=None, system_prompt=None, temperature=0.2, - _orig=original_chat, _prof=profile, + _orig=original_chat, _prof=profile, _sem=self._llm_semaphore, ): # Anthropic API requires max_tokens >= 1; memU sometimes # omits it. Default to 4096 to prevent 400 errors. if max_tokens is None: max_tokens = 4096 - t0 = time.monotonic() - try: - return await asyncio.wait_for( - _orig( - prompt, - max_tokens=max_tokens, - system_prompt=system_prompt, - temperature=temperature, - ), - timeout=self._LLM_CALL_TIMEOUT, - ) - except asyncio.TimeoutError: - elapsed = time.monotonic() - t0 - in_flight = len(self._metrics.in_flight) - # Dump httpx connection pool state for diagnosis - pool_info = "unknown" + # Acquire the concurrency slot first so the timeout + # only measures actual call duration, not queue wait. 
+ async with _sem: + t0 = time.monotonic() try: - base = self._service._llm_clients.get(_prof) - sdk = getattr(base, "client", None) - transport = getattr(sdk, "_client", None) - pool = getattr(transport, "_pool", None) or getattr(transport, "_transport", None) - if pool: - pool_info = repr(pool) - except Exception: - pass - logger.error( - "memU LLM HUNG [%s]: no response after %.0fs " - "(prompt=%d chars, in_flight=%d, pool=%s)", - _prof, elapsed, len(prompt), - in_flight, pool_info, - ) - raise + return await asyncio.wait_for( + _orig( + prompt, + max_tokens=max_tokens, + system_prompt=system_prompt, + temperature=temperature, + ), + timeout=self._LLM_CALL_TIMEOUT, + ) + except asyncio.TimeoutError: + elapsed = time.monotonic() - t0 + in_flight = len(self._metrics.in_flight) + # Dump httpx connection pool state for diagnosis + pool_info = "unknown" + try: + base = self._service._llm_clients.get(_prof) + sdk = getattr(base, "client", None) + transport = getattr(sdk, "_client", None) + pool = getattr(transport, "_pool", None) or getattr(transport, "_transport", None) + if pool: + pool_info = repr(pool) + except Exception: + pass + logger.error( + "memU LLM HUNG [%s]: no response after %.0fs " + "(prompt=%d chars, in_flight=%d, pool=%s)", + _prof, elapsed, len(prompt), + in_flight, pool_info, + ) + raise _timeout_chat._nerve_timeout_wrapped = True # type: ignore[attr-defined] client.chat = _timeout_chat # type: ignore[method-assign] @@ -2448,5 +2516,9 @@ def available(self) -> bool: @property def _has_embeddings(self) -> bool: - """Whether an embedding provider (e.g. OpenAI) is configured.""" - return bool(self.config.openai_api_key) + """Whether an embedding provider (OpenAI or self-hosted) is configured.""" + return bool( + os.environ.get("MEMU_EMBEDDING_BASE_URL") + or self.config.memory.embedding_base_url + or self.config.openai_api_key + ) diff --git a/scripts/backfill_memu_embeddings.py b/scripts/backfill_memu_embeddings.py new file mode 100755 index 0000000..dbee755 --- /dev/null +++ b/scripts/backfill_memu_embeddings.py @@ -0,0 +1,328 @@ +#!/usr/bin/env python3 +"""Backfill missing embedding_json values in the memU sqlite store. + +Why this exists +--------------- +memU writes ``embedding_json = NULL`` whenever the embedding profile +is unconfigured at memorize time. From 2026-05-06 to 2026-05-09 the +docker compose env var ``MEMU_EMBEDDING_BASE_URL`` was set on the +agent container, but the in-process code in ``nerve.memory.memu_bridge`` +only consulted ``self.config.openai_api_key`` when deciding whether +to register the embedding profile, so memU saw no embedding provider +and stored every new memory with a NULL vector. The result: vector +search at recall time was disabled for those rows, and queries that +should have hit recent memories returned "No relevant memories +found." See ``notes/lessons/2026-05-09-memu-embeddings-not-wired.md`` +for the full RCA. + +The companion change in this PR teaches ``memu_bridge`` to read +``MEMU_EMBEDDING_BASE_URL`` first; this script catches up the rows +that were already written with NULL. + +Targets +------- +Two tables get backfilled: + +- ``memu_memory_items``: text source is the ``summary`` column. +- ``memu_resources``: text source is the ``caption`` column. Rows + with NULL ``caption`` are skipped (nothing to embed). + +``memu_memory_categories`` does NOT need backfill: those embeddings +were already populated on 2026-05-05 when the categories were first +created and never wiped. 
+ +Endpoint +-------- +The script reads the same env vars memU does: + +- ``MEMU_EMBEDDING_BASE_URL`` (e.g. ``http://embeddings:11434/v1``) +- ``MEMU_EMBEDDING_API_KEY`` (Ollama ignores this; OpenAI requires it) +- ``MEMU_EMBED_MODEL`` (e.g. ``nomic-embed-text``) + +It POSTs to ``{base_url}/embeddings`` with the OpenAI-compatible +payload ``{"model": ..., "input": [text1, text2, ...]}``. Ollama and +OpenAI both accept this format. + +Idempotent +---------- +The script only selects rows where ``embedding_json IS NULL OR +embedding_json = ''``. Re-running it picks up exactly the rows that +still need work (e.g. if a previous run was interrupted or hit a +transient HTTP error). + +Usage +----- +:: + + # See what would happen, no DB writes: + python3 scripts/backfill_memu_embeddings.py --dry-run + + # Backfill the first 100 rows (incremental): + python3 scripts/backfill_memu_embeddings.py --limit 100 + + # Backfill everything: + python3 scripts/backfill_memu_embeddings.py + + # Custom DB path: + python3 scripts/backfill_memu_embeddings.py \\ + --db /path/to/memu.sqlite + + # Backfill only one table: + python3 scripts/backfill_memu_embeddings.py --table memu_memory_items +""" + +from __future__ import annotations + +import argparse +import json +import logging +import os +import sqlite3 +import sys +import time +from pathlib import Path +from typing import Iterable + +import httpx + +logger = logging.getLogger("backfill_memu_embeddings") + +# Per-table backfill spec: (table name, text source column, log label) +TABLES = [ + ("memu_memory_items", "summary", "memory items"), + ("memu_resources", "caption", "resources"), +] + +DEFAULT_BATCH_SIZE = 32 +DEFAULT_DB = Path("~/.nerve/memu.sqlite").expanduser() +DEFAULT_BASE_URL = "http://embeddings:11434/v1" +DEFAULT_MODEL = "nomic-embed-text" +DEFAULT_API_KEY = "placeholder" +DEFAULT_TIMEOUT = 60.0 # seconds; nomic-embed on CPU is plenty fast within this + + +def _resolve_endpoint() -> tuple[str, str, str]: + base_url = os.environ.get("MEMU_EMBEDDING_BASE_URL", DEFAULT_BASE_URL).rstrip("/") + api_key = os.environ.get("MEMU_EMBEDDING_API_KEY", DEFAULT_API_KEY) or DEFAULT_API_KEY + model = os.environ.get("MEMU_EMBED_MODEL", DEFAULT_MODEL) or DEFAULT_MODEL + return base_url, api_key, model + + +def _fetch_pending( + cur: sqlite3.Cursor, + table: str, + text_col: str, + limit: int | None, +) -> list[tuple[str, str]]: + """Return rows that still need embeddings, as ``(id, text)`` tuples. + + Filters out rows where the text source is NULL or empty since + there's nothing to embed for those, and the embedding endpoint + rejects empty strings. + """ + sql = ( + f"SELECT id, {text_col} FROM {table} " + f"WHERE (embedding_json IS NULL OR embedding_json = '') " + f" AND {text_col} IS NOT NULL " + f" AND {text_col} != '' " + ) + if limit is not None: + sql += f"LIMIT {int(limit)}" + return list(cur.execute(sql)) + + +def _embed_batch( + client: httpx.Client, + base_url: str, + api_key: str, + model: str, + texts: list[str], +) -> list[list[float]]: + """POST a batch to ``/embeddings`` and return a list of vectors. + + Raises on non-2xx HTTP status. The caller controls retry policy. 
+ """ + response = client.post( + f"{base_url}/embeddings", + headers={"Authorization": f"Bearer {api_key}"}, + json={"model": model, "input": texts}, + ) + response.raise_for_status() + payload = response.json() + data = payload.get("data") or [] + if len(data) != len(texts): + raise RuntimeError( + f"Embedding endpoint returned {len(data)} vectors for " + f"{len(texts)} inputs (model={model})" + ) + # Order is documented to match input order; sort by index defensively. + by_index = sorted(data, key=lambda d: d.get("index", 0)) + return [d["embedding"] for d in by_index] + + +def _backfill_table( + conn: sqlite3.Connection, + client: httpx.Client, + base_url: str, + api_key: str, + model: str, + table: str, + text_col: str, + label: str, + batch_size: int, + limit: int | None, + dry_run: bool, +) -> int: + """Backfill one table. Returns the number of rows written.""" + cur = conn.cursor() + pending = _fetch_pending(cur, table, text_col, limit) + total = len(pending) + if total == 0: + logger.info("%s: nothing to backfill", label) + return 0 + + # Also report the truly-empty-text count so dry-run is honest. + dropped = cur.execute( + f"SELECT COUNT(*) FROM {table} " + f"WHERE (embedding_json IS NULL OR embedding_json = '') " + f" AND ({text_col} IS NULL OR {text_col} = '')" + ).fetchone()[0] + if dropped: + logger.info( + "%s: skipping %d rows with NULL/empty %s (nothing to embed)", + label, dropped, text_col, + ) + + logger.info("%s: %d rows pending (batch=%d)", label, total, batch_size) + if dry_run: + return 0 + + written = 0 + start = time.monotonic() + for offset in range(0, total, batch_size): + chunk = pending[offset : offset + batch_size] + ids = [row[0] for row in chunk] + texts = [row[1] for row in chunk] + try: + vectors = _embed_batch(client, base_url, api_key, model, texts) + except (httpx.HTTPError, RuntimeError) as exc: + logger.error( + "%s: batch %d-%d failed (%s); skipping and continuing", + label, offset, offset + len(chunk), exc, + ) + continue + + # Single transaction per batch so an interrupt loses at most + # one batch of work. + with conn: + for row_id, vector in zip(ids, vectors): + conn.execute( + f"UPDATE {table} SET embedding_json = ? 
WHERE id = ?", + (json.dumps(vector), row_id), + ) + written += len(chunk) + + if written % 100 < batch_size: + elapsed = time.monotonic() - start + rate = written / elapsed if elapsed > 0 else 0 + logger.info( + "%s: %d/%d (%.1f rows/s, ~%.0fs remaining)", + label, written, total, rate, + (total - written) / rate if rate > 0 else 0, + ) + + elapsed = time.monotonic() - start + logger.info( + "%s: wrote %d embeddings in %.1fs", + label, written, elapsed, + ) + return written + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(description=__doc__.split("\n")[0]) + parser.add_argument( + "--db", + type=Path, + default=DEFAULT_DB, + help=f"Path to memu.sqlite (default: {DEFAULT_DB})", + ) + parser.add_argument( + "--batch-size", + type=int, + default=DEFAULT_BATCH_SIZE, + help=f"Rows per embedding request (default: {DEFAULT_BATCH_SIZE})", + ) + parser.add_argument( + "--limit", + type=int, + default=None, + help="Stop after this many rows per table (default: no limit)", + ) + parser.add_argument( + "--table", + choices=[t[0] for t in TABLES], + default=None, + help="Only backfill this table (default: all)", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Count pending rows; do not embed or write", + ) + parser.add_argument( + "--verbose", + "-v", + action="store_true", + help="Enable DEBUG logging", + ) + args = parser.parse_args(argv) + + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(asctime)s %(levelname)s %(message)s", + ) + + base_url, api_key, model = _resolve_endpoint() + logger.info( + "endpoint: %s, model: %s%s", + base_url, model, + " (DRY RUN)" if args.dry_run else "", + ) + + db_path = args.db.expanduser() + if not db_path.exists(): + logger.error("memu.sqlite not found at %s", db_path) + return 2 + + targets: Iterable[tuple[str, str, str]] = ( + [t for t in TABLES if t[0] == args.table] if args.table else TABLES + ) + + conn = sqlite3.connect(str(db_path)) + try: + with httpx.Client(timeout=DEFAULT_TIMEOUT) as client: + grand_total = 0 + for table, text_col, label in targets: + grand_total += _backfill_table( + conn, + client, + base_url, + api_key, + model, + table, + text_col, + label, + args.batch_size, + args.limit, + args.dry_run, + ) + logger.info("%s%d rows", "WOULD WRITE " if args.dry_run else "wrote ", grand_total) + finally: + conn.close() + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/test_bootstrap.py b/tests/test_bootstrap.py index 9172fbe..5825e0a 100644 --- a/tests/test_bootstrap.py +++ b/tests/test_bootstrap.py @@ -433,6 +433,10 @@ def test_no_docker_env_defaults_to_server(self, tmp_path: Path) -> None: "ANTHROPIC_API_KEY": "sk-ant-api03-test", "NERVE_MODE": "personal", "NERVE_WORKSPACE": str(tmp_path / "ws"), + # Explicitly clear NERVE_DOCKER so the test runs correctly + # when executed inside a Docker container where NERVE_DOCKER=1 + # is already in the environment. 
+ "NERVE_DOCKER": "", } with patch.dict(os.environ, env, clear=False): choices = run_non_interactive(tmp_path) @@ -478,7 +482,13 @@ def test_compose_bind_mounts(self) -> None: assert "~/.nerve:/root/.nerve" in volumes assert "~/.config/gh:/root/.config/gh" in volumes assert "~/.config/gog:/root/.config/gog" in volumes - assert "~/my-workspace:/root/nerve-workspace" in volumes + # Workspace and projects are mounted host-aligned: same path + # inside and outside the container so the agent can pass paths + # through to the host docker daemon (via the mounted socket) + # without translation. Legacy /root/* paths are restored via + # symlinks in the entrypoint. + assert "${HOME}/my-workspace:${HOME}/my-workspace" in volumes + assert "${HOME}/projects:${HOME}/projects" in volumes # ~/.claude is NOT mounted (macOS Keychain, not filesystem) assert "~/.claude:/root/.claude" not in volumes # No named volumes section @@ -494,7 +504,8 @@ def test_compose_skips_missing_auth_dirs(self) -> None: # Required mounts still present assert ".:/nerve" in volumes assert "~/.nerve:/root/.nerve" in volumes - assert "~/ws:/root/nerve-workspace" in volumes + assert "${HOME}/ws:${HOME}/ws" in volumes + assert "${HOME}/projects:${HOME}/projects" in volumes # Optional auth mounts absent assert "~/.config/gh:/root/.config/gh" not in volumes assert "~/.config/gog:/root/.config/gog" not in volumes diff --git a/tests/test_memu_bridge.py b/tests/test_memu_bridge.py index 396c065..4573e8c 100644 --- a/tests/test_memu_bridge.py +++ b/tests/test_memu_bridge.py @@ -354,6 +354,170 @@ def test_semantic_dedup_threshold_from_dict_default(self): config = MemoryConfig.from_dict({}) assert config.semantic_dedup_threshold == 0.85 + # --- Self-hosted embedding endpoint --- + + def test_embedding_base_url_default_empty(self): + config = MemoryConfig() + assert config.embedding_base_url == "" + assert config.embedding_api_key == "" + + def test_embedding_base_url_from_dict(self): + config = MemoryConfig.from_dict({ + "embedding_base_url": "http://embeddings:11434/v1", + "embedding_api_key": "secret", + }) + assert config.embedding_base_url == "http://embeddings:11434/v1" + assert config.embedding_api_key == "secret" + + def test_embedding_base_url_from_dict_default(self): + config = MemoryConfig.from_dict({}) + assert config.embedding_base_url == "" + assert config.embedding_api_key == "" + + # --- LLM concurrency cap --- + + def test_llm_concurrency_default_is_one(self): + config = MemoryConfig() + assert config.llm_concurrency == 1 + + def test_llm_concurrency_from_dict(self): + config = MemoryConfig.from_dict({"llm_concurrency": 4}) + assert config.llm_concurrency == 4 + + def test_llm_concurrency_from_dict_default_one(self): + config = MemoryConfig.from_dict({}) + assert config.llm_concurrency == 1 + + def test_llm_concurrency_zero_clamped_to_one(self): + # Zero would deadlock the wrapper. Clamp to 1. 
+ config = MemoryConfig.from_dict({"llm_concurrency": 0}) + assert config.llm_concurrency == 1 + + def test_llm_concurrency_negative_clamped_to_one(self): + config = MemoryConfig.from_dict({"llm_concurrency": -3}) + assert config.llm_concurrency == 1 + + +class TestLlmConcurrencyWrapper: + """Verify _instrument_llm_timeouts wraps chat calls with the configured semaphore.""" + + @pytest.mark.asyncio + async def test_semaphore_serializes_concurrent_chat_calls(self, tmp_path): + """With llm_concurrency=1, four asyncio.gather'd chat calls run sequentially.""" + config = _make_config(tmp_path) + config.memory.llm_concurrency = 1 + bridge = MemUBridge(config) + + # Simulate memU's LLM clients with a slow-but-trackable chat method + in_flight = 0 + max_in_flight = 0 + call_order: list[int] = [] + + class _FakeBaseClient: + async def chat(self, prompt, *, max_tokens=None, system_prompt=None, temperature=0.2): + nonlocal in_flight, max_in_flight + in_flight += 1 + max_in_flight = max(max_in_flight, in_flight) + # Yield control so other gather'd tasks can race here + await asyncio.sleep(0.01) + call_order.append(int(prompt)) + in_flight -= 1 + return "ok" + + fake_inner_clients = { + "memorize": _FakeBaseClient(), + "fast": _FakeBaseClient(), + "default": _FakeBaseClient(), + } + # _instrument_llm_timeouts iterates ("memorize","fast","default") and + # calls _service._get_llm_base_client(profile). Stub the service. + bridge._service = MagicMock() + bridge._service._get_llm_base_client = lambda p: fake_inner_clients[p] + bridge._service._llm_clients = fake_inner_clients + + bridge._instrument_llm_timeouts() + + # Fan out 4 simultaneous calls on the memorize profile (matches the + # memU/extract_items pattern: one gather per memory_type). + wrapped = fake_inner_clients["memorize"].chat + await asyncio.gather(*(wrapped(str(i)) for i in range(4))) + + assert max_in_flight == 1, f"semaphore should serialize, got max_in_flight={max_in_flight}" + assert sorted(call_order) == [0, 1, 2, 3] + + @pytest.mark.asyncio + async def test_semaphore_respects_higher_concurrency(self, tmp_path): + """With llm_concurrency=3, four gather'd calls peak at 3 in-flight, not 4.""" + config = _make_config(tmp_path) + config.memory.llm_concurrency = 3 + bridge = MemUBridge(config) + + in_flight = 0 + max_in_flight = 0 + + class _FakeBaseClient: + async def chat(self, prompt, *, max_tokens=None, system_prompt=None, temperature=0.2): + nonlocal in_flight, max_in_flight + in_flight += 1 + max_in_flight = max(max_in_flight, in_flight) + await asyncio.sleep(0.02) + in_flight -= 1 + return "ok" + + fake_inner_clients = { + "memorize": _FakeBaseClient(), + "fast": _FakeBaseClient(), + "default": _FakeBaseClient(), + } + bridge._service = MagicMock() + bridge._service._get_llm_base_client = lambda p: fake_inner_clients[p] + bridge._service._llm_clients = fake_inner_clients + + bridge._instrument_llm_timeouts() + + wrapped = fake_inner_clients["memorize"].chat + await asyncio.gather(*(wrapped(str(i)) for i in range(4))) + + assert max_in_flight == 3, f"expected peak 3, got {max_in_flight}" + + @pytest.mark.asyncio + async def test_semaphore_survives_reset_and_reuses_same_instance(self, tmp_path): + """Re-running _instrument_llm_timeouts must not recreate the semaphore. + + _reset_llm_clients evicts caches and re-instruments. If we created a + fresh semaphore each time, callers waiting on the old one would lose + their slot and a parallel re-instrumentation could double the + effective concurrency. 
+ """ + config = _make_config(tmp_path) + config.memory.llm_concurrency = 1 + bridge = MemUBridge(config) + + class _FakeBaseClient: + async def chat(self, *a, **kw): + return "ok" + + fake = {p: _FakeBaseClient() for p in ("memorize", "fast", "default")} + bridge._service = MagicMock() + bridge._service._get_llm_base_client = lambda p: fake[p] + bridge._service._llm_clients = fake + + bridge._instrument_llm_timeouts() + first_semaphore = bridge._llm_semaphore + assert first_semaphore is not None + + # Simulate a reset: drop the timeout-wrapper marker so re-instrumentation + # actually rewraps the clients. + for p in fake: + fake[p] = _FakeBaseClient() + bridge._service._get_llm_base_client = lambda p: fake[p] + bridge._service._llm_clients = fake + + bridge._instrument_llm_timeouts() + assert bridge._llm_semaphore is first_semaphore, ( + "semaphore must be the same instance across resets" + ) + class TestKnowledgeCustomPrompts: """Test that custom knowledge extraction prompts are defined correctly."""
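
A quick way to exercise the new embeddings sidecar end to end (an illustrative sketch, not part of the patch): the snippet below mirrors the request shape that memU and `_embed_batch` use, reading the same `MEMU_EMBEDDING_*` env vars. Run it from inside the agent container, where the compose-network hostname `embeddings` resolves (the service only `expose`s 11434, so it is not published on the host); the script name and input string are arbitrary.

    # check_embeddings.py -- hypothetical helper, not included in this diff
    import os

    import httpx

    base_url = os.environ.get("MEMU_EMBEDDING_BASE_URL", "http://embeddings:11434/v1").rstrip("/")
    api_key = os.environ.get("MEMU_EMBEDDING_API_KEY", "placeholder")
    model = os.environ.get("MEMU_EMBED_MODEL", "nomic-embed-text")

    resp = httpx.post(
        f"{base_url}/embeddings",
        headers={"Authorization": f"Bearer {api_key}"},
        json={"model": model, "input": ["hello from the nerve agent"]},
        timeout=60.0,
    )
    resp.raise_for_status()
    vector = resp.json()["data"][0]["embedding"]
    assert len(vector) == 768, len(vector)  # nomic-embed-text is 768-dim (ada-002 is 1536)
    print(f"ok: {model} returned a {len(vector)}-dim vector")

If this passes but recall still misses recent memories, the rows were likely written while the embedding profile was unconfigured, which is exactly what scripts/backfill_memu_embeddings.py repairs.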
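
A second sketch (also illustrative, not part of the patch) of what the socket mount plus path alignment buys: a sibling container launched from inside the agent can be handed the agent's current directory verbatim, because the same string resolves to the same files for the host daemon. The `alpine` image is just a stand-in.

    # sibling_check.py -- hypothetical; assumes the compose file above with
    # docker_socket=True and a cwd under the path-aligned ${HOME}/projects
    # or ${HOME}/nerve-workspace mounts.
    import os
    import subprocess

    # _host_aligned_path keeps these strings identical inside and out:
    #   "~/projects" -> "${HOME}/projects",  "~" -> "${HOME}",  "/abs" -> "/abs"
    cwd = os.getcwd()
    subprocess.run(
        ["docker", "run", "--rm", "-v", f"{cwd}:{cwd}", "-w", cwd, "alpine", "ls"],
        check=True,
    )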