From c09ba25ed00f47452dfcffc29cfd61832ccf6471 Mon Sep 17 00:00:00 2001 From: Alex Newman Date: Sat, 18 Apr 2026 14:22:05 -0700 Subject: [PATCH 1/2] fix: viewer model dropdown, SSE cleanup, OpenRouter URL, zh-TW mode, alias fix, restart guard, FTS5 fallback, httpcore dep - #1958: Map precise model IDs in viewer settings dropdown, preserve on save - #1959: Clean up dead SSE clients on disconnect/error, periodic 30s sweep - #1960: Make OpenRouter base URL configurable via CLAUDE_MEM_OPENROUTER_BASE_URL - #1961: Add Traditional Chinese (zh-TW) mode file with proper translations - #2054: Use version-agnostic alias path, always update existing aliases - #2053: Replace flat restart counter with time-windowed counter (60s window, cap 10, 5m decay) - #2048: Fall back to FTS5 full-text search when Chroma disabled for text queries - #2046: Add httpcore and httpx to chroma-mcp uvx dependencies Co-Authored-By: Claude Opus 4.6 (1M context) --- plugin/modes/code--zh-tw.json | 24 +++++ plugin/scripts/smart-install.js | 36 +++++--- src/services/sync/ChromaMcpManager.ts | 4 + src/services/worker-service.ts | 20 ++-- src/services/worker-types.ts | 1 + src/services/worker/OpenRouterAgent.ts | 27 ++++-- src/services/worker/RestartGuard.ts | 92 +++++++++++++++++++ src/services/worker/SSEBroadcaster.ts | 73 ++++++++++++++- src/services/worker/SearchManager.ts | 24 +++-- src/services/worker/SessionManager.ts | 1 + .../worker/http/routes/SessionRoutes.ts | 21 ++--- .../components/ContextSettingsModal.tsx | 31 ++++++- 12 files changed, 288 insertions(+), 66 deletions(-) create mode 100644 plugin/modes/code--zh-tw.json create mode 100644 src/services/worker/RestartGuard.ts diff --git a/plugin/modes/code--zh-tw.json b/plugin/modes/code--zh-tw.json new file mode 100644 index 0000000000..03c307edc4 --- /dev/null +++ b/plugin/modes/code--zh-tw.json @@ -0,0 +1,24 @@ +{ + "name": "Code Development (Traditional Chinese)", + "prompts": { + "footer": "IMPORTANT! DO NOT do any work right now other than generating this OBSERVATIONS from tool use messages - and remember that you are a memory agent designed to summarize a DIFFERENT claude code session, not this one.\n\nNever reference yourself or your own actions. Do not output anything other than the observation content formatted in the XML structure above. All other output is ignored by the system, and the system has been designed to be smart about token usage. Please spend your tokens wisely on useful observations.\n\nRemember that we record these observations as a way of helping us stay on track with our progress, and to help us keep important decisions and changes at the forefront of our minds! :) Thank you so much for your help!\n\nLANGUAGE REQUIREMENTS: Please write the observation data in 繁體中文", + + "xml_title_placeholder": "[**title**: 捕捉核心行動或主題的簡短標題]", + "xml_subtitle_placeholder": "[**subtitle**: 一句話解釋(最多24個單詞)]", + "xml_fact_placeholder": "[簡潔、獨立的陳述]", + "xml_narrative_placeholder": "[**narrative**: 完整背景:做了什麼、如何運作、為什麼重要]", + "xml_concept_placeholder": "[知識類型類別]", + "xml_file_placeholder": "[檔案路徑]", + + "xml_summary_request_placeholder": "[捕捉使用者請求和討論/完成內容實質的簡短標題]", + "xml_summary_investigated_placeholder": "[到目前為止探索了什麼?檢查了什麼?]", + "xml_summary_learned_placeholder": "[你了解到了什麼運作原理?]", + "xml_summary_completed_placeholder": "[到目前為止完成了什麼工作?發佈或更改了什麼?]", + "xml_summary_next_steps_placeholder": "[在此會話中,你正在積極處理或計劃接下來處理什麼?]", + "xml_summary_notes_placeholder": "[關於當前進度的其他見解或觀察]", + + "continuation_instruction": "IMPORTANT: Continue generating observations from tool use messages using the XML structure below.\n\nLANGUAGE REQUIREMENTS: Please write the observation data in 繁體中文", + + "summary_footer": "IMPORTANT! DO NOT do any work right now other than generating this next PROGRESS SUMMARY - and remember that you are a memory agent designed to summarize a DIFFERENT claude code session, not this one.\n\nNever reference yourself or your own actions. Do not output anything other than the summary content formatted in the XML structure above. All other output is ignored by the system, and the system has been designed to be smart about token usage. Please spend your tokens wisely on useful summary content.\n\nThank you, this summary will be very useful for keeping track of our progress!\n\nLANGUAGE REQUIREMENTS: Please write ALL summary content (request, investigated, learned, completed, next_steps, notes) in 繁體中文" + } +} diff --git a/plugin/scripts/smart-install.js b/plugin/scripts/smart-install.js index 4d2f1a3767..cbeda385e4 100644 --- a/plugin/scripts/smart-install.js +++ b/plugin/scripts/smart-install.js @@ -340,20 +340,20 @@ function installUv() { } /** - * Add shell alias for claude-mem command + * Add shell alias for claude-mem command. + * Uses a version-agnostic path that resolves the latest installed version at runtime, + * so the alias survives plugin upgrades without needing to re-source shell config. */ function installCLI() { - const WORKER_CLI = join(ROOT, 'scripts', 'worker-service.cjs'); const bunPath = getBunPath() || 'bun'; - const aliasLine = `alias claude-mem='${bunPath} "${WORKER_CLI}"'`; - const markerPath = join(ROOT, '.cli-installed'); - - // Skip if already installed - if (existsSync(markerPath)) return; + // Version-agnostic: resolve the latest installed version at runtime + const versionAgnosticScript = `$(ls -d ~/.claude/plugins/cache/thedotmack/claude-mem/*/scripts/worker-service.cjs 2>/dev/null | sort -V | tail -1 || echo "${join(ROOT, 'scripts', 'worker-service.cjs')}")`; + const aliasLine = `alias claude-mem='${bunPath} "${versionAgnosticScript}"'`; try { if (IS_WINDOWS) { // Windows: Add to PATH via PowerShell profile + const WORKER_CLI = join(ROOT, 'scripts', 'worker-service.cjs'); const profilePath = join(process.env.USERPROFILE || homedir(), 'Documents', 'PowerShell', 'Microsoft.PowerShell_profile.ps1'); const profileDir = join(process.env.USERPROFILE || homedir(), 'Documents', 'PowerShell'); const functionDef = `function claude-mem { & "${bunPath}" "${WORKER_CLI}" $args }\n`; @@ -363,13 +363,18 @@ function installCLI() { } const existingContent = existsSync(profilePath) ? readFileSync(profilePath, 'utf-8') : ''; - if (!existingContent.includes('function claude-mem')) { + if (existingContent.includes('function claude-mem')) { + // Update existing function definition + const updated = existingContent.replace(/function claude-mem \{[^\n]*\}\n?/, functionDef); + writeFileSync(profilePath, updated); + console.error(`✅ PowerShell function updated in profile`); + } else { writeFileSync(profilePath, existingContent + '\n' + functionDef); console.error(`✅ PowerShell function added to profile`); - console.error(' Restart your terminal to use: claude-mem '); } + console.error(' Restart your terminal to use: claude-mem '); } else { - // Unix: Add alias to shell configs + // Unix: Add or update alias in shell configs const shellConfigs = [ join(homedir(), '.bashrc'), join(homedir(), '.zshrc') @@ -378,7 +383,12 @@ function installCLI() { for (const config of shellConfigs) { if (existsSync(config)) { const content = readFileSync(config, 'utf-8'); - if (!content.includes('alias claude-mem=')) { + if (content.includes('alias claude-mem=')) { + // Update existing alias to use version-agnostic path + const updated = content.replace(/alias claude-mem='[^']*'/g, aliasLine); + writeFileSync(config, updated); + console.error(`✅ Alias updated in ${config}`); + } else { writeFileSync(config, content + '\n' + aliasLine + '\n'); console.error(`✅ Alias added to ${config}`); } @@ -386,11 +396,9 @@ function installCLI() { } console.error(' Restart your terminal to use: claude-mem '); } - - writeFileSync(markerPath, new Date().toISOString()); } catch (error) { console.error(`⚠️ Could not add shell alias: ${error.message}`); - console.error(` Use directly: ${bunPath} "${WORKER_CLI}" `); + console.error(` Use directly: ${bunPath} "${join(ROOT, 'scripts', 'worker-service.cjs')}" `); } } diff --git a/src/services/sync/ChromaMcpManager.ts b/src/services/sync/ChromaMcpManager.ts index c293cbf08d..37afab4719 100644 --- a/src/services/sync/ChromaMcpManager.ts +++ b/src/services/sync/ChromaMcpManager.ts @@ -207,6 +207,8 @@ export class ChromaMcpManager { const args = [ '--python', pythonVersion, + '--with', 'httpcore', + '--with', 'httpx', 'chroma-mcp', '--client-type', 'http', '--host', chromaHost, @@ -233,6 +235,8 @@ export class ChromaMcpManager { // Local mode: persistent client with data directory return [ '--python', pythonVersion, + '--with', 'httpcore', + '--with', 'httpx', 'chroma-mcp', '--client-type', 'persistent', '--data-dir', DEFAULT_CHROMA_DATA_DIR.replace(/\\/g, '/') diff --git a/src/services/worker-service.ts b/src/services/worker-service.ts index 6d37cab8ac..18e1eadbf7 100644 --- a/src/services/worker-service.ts +++ b/src/services/worker-service.ts @@ -759,19 +759,12 @@ export class WorkerService { } // Fall through to pending-work restart below } - const MAX_PENDING_RESTARTS = 3; - if (pendingCount > 0) { - // Track consecutive pending-work restarts to prevent infinite loops (e.g. FK errors) - session.consecutiveRestarts = (session.consecutiveRestarts || 0) + 1; - - if (session.consecutiveRestarts > MAX_PENDING_RESTARTS) { - logger.error('SYSTEM', 'Exceeded max pending-work restarts, stopping to prevent infinite loop', { - sessionId: session.sessionDbId, - pendingCount, - consecutiveRestarts: session.consecutiveRestarts - }); - session.consecutiveRestarts = 0; + // Time-windowed restart guard: only count restarts within last 60s, cap at 10 + const { recordRestartAndCheckAllowed, resetRestartCounter } = await import('./worker/RestartGuard.js'); + + if (!recordRestartAndCheckAllowed(session, 'Pending-work restart')) { + resetRestartCounter(session); this.terminateSession(session.sessionDbId, 'max_restarts_exceeded'); return; } @@ -789,7 +782,8 @@ export class WorkerService { } else { // Successful completion with no pending work — clean up session // removeSessionImmediate fires onSessionDeletedCallback → broadcastProcessingStatus() - session.consecutiveRestarts = 0; + const { resetRestartCounter: resetCounter } = await import('./worker/RestartGuard.js'); + resetCounter(session); this.sessionManager.removeSessionImmediate(session.sessionDbId); } }); diff --git a/src/services/worker-types.ts b/src/services/worker-types.ts index 79dc9199a5..b8489c9325 100644 --- a/src/services/worker-types.ts +++ b/src/services/worker-types.ts @@ -35,6 +35,7 @@ export interface ActiveSession { conversationHistory: ConversationMessage[]; // Shared conversation history for provider switching currentProvider: 'claude' | 'gemini' | 'openrouter' | null; // Track which provider is currently running consecutiveRestarts: number; // Track consecutive restart attempts to prevent infinite loops + restartTimestamps: number[]; // Timestamps of recent restarts for time-windowed counting forceInit?: boolean; // Force fresh SDK session (skip resume) idleTimedOut?: boolean; // Set when session exits due to idle timeout (prevents restart loop) lastGeneratorActivity: number; // Timestamp of last generator progress (for stale detection, Issue #1099) diff --git a/src/services/worker/OpenRouterAgent.ts b/src/services/worker/OpenRouterAgent.ts index f034987f3b..e71701bd56 100644 --- a/src/services/worker/OpenRouterAgent.ts +++ b/src/services/worker/OpenRouterAgent.ts @@ -28,8 +28,8 @@ import { type WorkerRef } from './agents/index.js'; -// OpenRouter API endpoint -const OPENROUTER_API_URL = 'https://openrouter.ai/api/v1/chat/completions'; +// OpenRouter API endpoint (configurable via env or settings) +const DEFAULT_OPENROUTER_API_URL = 'https://openrouter.ai/api/v1/chat/completions'; // Context window management constants (defaults, overridable via settings) const DEFAULT_MAX_CONTEXT_MESSAGES = 20; // Maximum messages to keep in conversation history @@ -86,7 +86,7 @@ export class OpenRouterAgent { async startSession(session: ActiveSession, worker?: WorkerRef): Promise { try { // Get OpenRouter configuration - const { apiKey, model, siteUrl, appName } = this.getOpenRouterConfig(); + const { apiKey, model, siteUrl, appName, baseUrl } = this.getOpenRouterConfig(); if (!apiKey) { throw new Error('OpenRouter API key not configured. Set CLAUDE_MEM_OPENROUTER_API_KEY in settings or OPENROUTER_API_KEY environment variable.'); @@ -110,7 +110,7 @@ export class OpenRouterAgent { // Add to conversation history and query OpenRouter with full context session.conversationHistory.push({ role: 'user', content: initPrompt }); - const initResponse = await this.queryOpenRouterMultiTurn(session.conversationHistory, apiKey, model, siteUrl, appName); + const initResponse = await this.queryOpenRouterMultiTurn(session.conversationHistory, apiKey, model, siteUrl, appName, baseUrl); if (initResponse.content) { // Add response to conversation history @@ -181,7 +181,7 @@ export class OpenRouterAgent { // Add to conversation history and query OpenRouter with full context session.conversationHistory.push({ role: 'user', content: obsPrompt }); - const obsResponse = await this.queryOpenRouterMultiTurn(session.conversationHistory, apiKey, model, siteUrl, appName); + const obsResponse = await this.queryOpenRouterMultiTurn(session.conversationHistory, apiKey, model, siteUrl, appName, baseUrl); let tokensUsed = 0; if (obsResponse.content) { @@ -224,7 +224,7 @@ export class OpenRouterAgent { // Add to conversation history and query OpenRouter with full context session.conversationHistory.push({ role: 'user', content: summaryPrompt }); - const summaryResponse = await this.queryOpenRouterMultiTurn(session.conversationHistory, apiKey, model, siteUrl, appName); + const summaryResponse = await this.queryOpenRouterMultiTurn(session.conversationHistory, apiKey, model, siteUrl, appName, baseUrl); let tokensUsed = 0; if (summaryResponse.content) { @@ -356,7 +356,8 @@ export class OpenRouterAgent { apiKey: string, model: string, siteUrl?: string, - appName?: string + appName?: string, + apiUrl?: string ): Promise<{ content: string; tokensUsed?: number }> { // Truncate history to prevent runaway costs const truncatedHistory = this.truncateHistory(history); @@ -370,7 +371,8 @@ export class OpenRouterAgent { estimatedTokens }); - const response = await fetch(OPENROUTER_API_URL, { + const resolvedApiUrl = apiUrl || DEFAULT_OPENROUTER_API_URL; + const response = await fetch(resolvedApiUrl, { method: 'POST', headers: { 'Authorization': `Bearer ${apiKey}`, @@ -438,7 +440,7 @@ export class OpenRouterAgent { * Get OpenRouter configuration from settings or environment * Issue #733: Uses centralized ~/.claude-mem/.env for credentials, not random project .env files */ - private getOpenRouterConfig(): { apiKey: string; model: string; siteUrl?: string; appName?: string } { + private getOpenRouterConfig(): { apiKey: string; model: string; siteUrl?: string; appName?: string; baseUrl: string } { const settingsPath = USER_SETTINGS_PATH; const settings = SettingsDefaultsManager.loadFromFile(settingsPath); @@ -449,11 +451,16 @@ export class OpenRouterAgent { // Model: from settings or default const model = settings.CLAUDE_MEM_OPENROUTER_MODEL || 'xiaomi/mimo-v2-flash:free'; + // Base URL: configurable for proxies or alternative endpoints + const baseUrl = process.env.CLAUDE_MEM_OPENROUTER_BASE_URL || + settings.CLAUDE_MEM_OPENROUTER_BASE_URL || + DEFAULT_OPENROUTER_API_URL; + // Optional analytics headers const siteUrl = settings.CLAUDE_MEM_OPENROUTER_SITE_URL || ''; const appName = settings.CLAUDE_MEM_OPENROUTER_APP_NAME || 'claude-mem'; - return { apiKey, model, siteUrl, appName }; + return { apiKey, model, siteUrl, appName, baseUrl }; } } diff --git a/src/services/worker/RestartGuard.ts b/src/services/worker/RestartGuard.ts new file mode 100644 index 0000000000..6edf258457 --- /dev/null +++ b/src/services/worker/RestartGuard.ts @@ -0,0 +1,92 @@ +/** + * RestartGuard: Time-windowed restart counter for session generators. + * + * Replaces the flat consecutiveRestarts counter with a windowed approach: + * - Only counts restarts within the last RESTART_WINDOW_MS (60 seconds) + * - Higher raw cap (10) to accommodate legitimate long sessions + * - Resets after RESTART_DECAY_MS (5 minutes) of successful processing + * + * Shared between worker-service.ts and SessionRoutes.ts to prevent + * inconsistent restart guard logic. + */ + +import type { ActiveSession } from '../worker-types.js'; +import { logger } from '../../utils/logger.js'; + +/** Only count restarts within this window */ +const RESTART_WINDOW_MS = 60_000; // 60 seconds + +/** Reset counter after this much successful processing */ +const RESTART_DECAY_MS = 5 * 60_000; // 5 minutes + +/** Maximum restarts allowed within the window */ +const MAX_WINDOWED_RESTARTS = 10; + +/** + * Record a restart attempt and check whether the session has exceeded the limit. + * + * @returns true if the restart is allowed, false if it should be blocked + */ +export function recordRestartAndCheckAllowed(session: ActiveSession, logContext: string): boolean { + const now = Date.now(); + + // Initialize restartTimestamps if missing (backward compat) + if (!session.restartTimestamps) { + session.restartTimestamps = []; + } + + // Add current restart timestamp + session.restartTimestamps.push(now); + + // Prune timestamps outside the window + session.restartTimestamps = session.restartTimestamps.filter( + ts => (now - ts) < RESTART_WINDOW_MS + ); + + // Also maintain the legacy counter for logging + session.consecutiveRestarts = (session.consecutiveRestarts || 0) + 1; + + const restartsInWindow = session.restartTimestamps.length; + + if (restartsInWindow > MAX_WINDOWED_RESTARTS) { + logger.error('SYSTEM', `${logContext}: Exceeded max windowed restarts (${restartsInWindow}/${MAX_WINDOWED_RESTARTS} in ${RESTART_WINDOW_MS / 1000}s)`, { + sessionId: session.sessionDbId, + restartsInWindow, + maxRestarts: MAX_WINDOWED_RESTARTS, + windowMs: RESTART_WINDOW_MS + }); + return false; + } + + return true; +} + +/** + * Reset the restart counter after successful processing. + * Called when a session completes with no pending work, or after + * sustained successful processing (decay). + */ +export function resetRestartCounter(session: ActiveSession): void { + session.consecutiveRestarts = 0; + session.restartTimestamps = []; +} + +/** + * Apply time decay: if enough time has passed since the last restart, + * clear the restart history. Call this periodically during successful processing. + */ +export function applyRestartDecay(session: ActiveSession): void { + if (!session.restartTimestamps || session.restartTimestamps.length === 0) return; + + const now = Date.now(); + const mostRecentRestart = Math.max(...session.restartTimestamps); + + if (now - mostRecentRestart > RESTART_DECAY_MS) { + logger.debug('SYSTEM', 'Restart counter decayed after sustained success', { + sessionId: session.sessionDbId, + previousRestarts: session.restartTimestamps.length, + decayMs: RESTART_DECAY_MS + }); + resetRestartCounter(session); + } +} diff --git a/src/services/worker/SSEBroadcaster.ts b/src/services/worker/SSEBroadcaster.ts index da2cb4bffc..5f9659ed15 100644 --- a/src/services/worker/SSEBroadcaster.ts +++ b/src/services/worker/SSEBroadcaster.ts @@ -14,6 +14,14 @@ import type { SSEEvent, SSEClient } from '../worker-types.js'; export class SSEBroadcaster { private sseClients: Set = new Set(); + private cleanupInterval: ReturnType | null = null; + + constructor() { + // Periodic cleanup of dead/disconnected SSE clients every 30 seconds + this.cleanupInterval = setInterval(() => { + this.cleanupDeadClients(); + }, 30_000); + } /** * Add a new SSE client connection @@ -27,6 +35,11 @@ export class SSEBroadcaster { this.removeClient(res); }); + // Also handle error events (e.g., broken pipe) + res.on('error', () => { + this.removeClient(res); + }); + // Send initial event this.sendToClient(res, { type: 'connected', timestamp: Date.now() }); } @@ -40,7 +53,35 @@ export class SSEBroadcaster { } /** - * Broadcast an event to all connected clients (single-pass) + * Remove all dead/disconnected clients from the active set. + * Checks each client's underlying socket for writability. + */ + private cleanupDeadClients(): void { + const initialSize = this.sseClients.size; + if (initialSize === 0) return; + + const deadClients: SSEClient[] = []; + for (const client of this.sseClients) { + // Check if the underlying socket is destroyed or not writable + if (client.writableEnded || client.writableFinished || client.destroyed) { + deadClients.push(client); + } + } + + for (const dead of deadClients) { + this.sseClients.delete(dead); + } + + if (deadClients.length > 0) { + logger.debug('WORKER', 'Cleaned up dead SSE clients', { + removed: deadClients.length, + remaining: this.sseClients.size + }); + } + } + + /** + * Broadcast an event to all connected clients (single-pass with dead client cleanup) */ broadcast(event: SSEEvent): void { if (this.sseClients.size === 0) { @@ -53,9 +94,23 @@ export class SSEBroadcaster { logger.debug('WORKER', 'SSE broadcast sent', { eventType: event.type, clients: this.sseClients.size }); - // Single-pass write + // Single-pass write with inline dead client cleanup + const deadClients: SSEClient[] = []; for (const client of this.sseClients) { - client.write(data); + try { + if (client.writableEnded || client.destroyed) { + deadClients.push(client); + } else { + client.write(data); + } + } catch { + deadClients.push(client); + } + } + + // Remove any dead clients discovered during broadcast + for (const dead of deadClients) { + this.sseClients.delete(dead); } } @@ -66,6 +121,18 @@ export class SSEBroadcaster { return this.sseClients.size; } + /** + * Stop the periodic cleanup interval. + * Call this during graceful shutdown. + */ + dispose(): void { + if (this.cleanupInterval) { + clearInterval(this.cleanupInterval); + this.cleanupInterval = null; + } + this.sseClients.clear(); + } + /** * Send event to a specific client */ diff --git a/src/services/worker/SearchManager.ts b/src/services/worker/SearchManager.ts index ba64fb0903..918e61050b 100644 --- a/src/services/worker/SearchManager.ts +++ b/src/services/worker/SearchManager.ts @@ -260,14 +260,24 @@ export class SearchManager { logger.debug('SEARCH', 'ChromaDB found no matches (final result, no FTS5 fallback)', {}); } } - // ChromaDB not initialized - mark as failed to show proper error message + // ChromaDB not initialized - fall back to FTS5 full-text search else if (query) { - chromaFailed = true; - logger.debug('SEARCH', 'ChromaDB not initialized - semantic search unavailable', {}); - logger.debug('SEARCH', 'Install UVX/Python to enable vector search', { url: 'https://docs.astral.sh/uv/getting-started/installation/' }); - observations = []; - sessions = []; - prompts = []; + logger.debug('SEARCH', 'ChromaDB not initialized - falling back to FTS5 full-text search', {}); + const obsOptions = { ...options, type: obs_type, concepts, files }; + if (searchObservations) { + observations = this.sessionSearch.searchObservations(query, obsOptions); + } + if (searchSessions) { + sessions = this.sessionSearch.searchSessions(query, options); + } + if (searchPrompts) { + prompts = this.sessionSearch.searchUserPrompts(query, options); + } + // If FTS5 also returned nothing, mark as failed so we show the install message + if (observations.length === 0 && sessions.length === 0 && prompts.length === 0) { + chromaFailed = true; + logger.debug('SEARCH', 'FTS5 fallback returned no results, suggesting Chroma install', {}); + } } const totalResults = observations.length + sessions.length + prompts.length; diff --git a/src/services/worker/SessionManager.ts b/src/services/worker/SessionManager.ts index 5ef91aaf42..6f5c62d9c5 100644 --- a/src/services/worker/SessionManager.ts +++ b/src/services/worker/SessionManager.ts @@ -218,6 +218,7 @@ export class SessionManager { conversationHistory: [], // Initialize empty - will be populated by agents currentProvider: null, // Will be set when generator starts consecutiveRestarts: 0, // Track consecutive restart attempts to prevent infinite loops + restartTimestamps: [], // Timestamps of recent restarts for time-windowed counting processingMessageIds: [], // CLAIM-CONFIRM: Track message IDs for confirmProcessed() lastGeneratorActivity: Date.now() // Initialize for stale detection (Issue #1099) }; diff --git a/src/services/worker/http/routes/SessionRoutes.ts b/src/services/worker/http/routes/SessionRoutes.ts index 21ffbc3393..0e9f68128e 100644 --- a/src/services/worker/http/routes/SessionRoutes.ts +++ b/src/services/worker/http/routes/SessionRoutes.ts @@ -289,10 +289,6 @@ export class SessionRoutes extends BaseRouteHandler { const pendingStore = this.sessionManager.getPendingMessageStore(); const pendingCount = pendingStore.getPendingCount(sessionDbId); - // CRITICAL: Limit consecutive restarts to prevent infinite loops - // This prevents runaway API costs when there's a persistent error (e.g., memorySessionId not captured) - const MAX_CONSECUTIVE_RESTARTS = 3; - if (pendingCount > 0) { // GUARD: Prevent duplicate crash recovery spawns if (this.crashRecoveryScheduled.has(sessionDbId)) { @@ -300,16 +296,10 @@ export class SessionRoutes extends BaseRouteHandler { return; } - session.consecutiveRestarts = (session.consecutiveRestarts || 0) + 1; + // Time-windowed restart guard: only count restarts within last 60s, cap at 10 + const { recordRestartAndCheckAllowed } = await import('../../RestartGuard.js'); - if (session.consecutiveRestarts > MAX_CONSECUTIVE_RESTARTS) { - logger.error('SESSION', `CRITICAL: Generator restart limit exceeded - stopping to prevent runaway costs`, { - sessionId: sessionDbId, - pendingCount, - consecutiveRestarts: session.consecutiveRestarts, - maxRestarts: MAX_CONSECUTIVE_RESTARTS, - action: 'Generator will NOT restart. Check logs for root cause. Messages remain in pending state.' - }); + if (!recordRestartAndCheckAllowed(session, 'Crash-recovery restart')) { // Don't restart - abort to prevent further API calls session.abortController.abort(); return; @@ -319,7 +309,7 @@ export class SessionRoutes extends BaseRouteHandler { sessionId: sessionDbId, pendingCount, consecutiveRestarts: session.consecutiveRestarts, - maxRestarts: MAX_CONSECUTIVE_RESTARTS + restartsInWindow: session.restartTimestamps?.length ?? 0 }); // Abort OLD controller before replacing to prevent child process leaks @@ -345,7 +335,8 @@ export class SessionRoutes extends BaseRouteHandler { // No pending work - abort to kill the child process session.abortController.abort(); // Reset restart counter on successful completion - session.consecutiveRestarts = 0; + const { resetRestartCounter } = await import('../../RestartGuard.js'); + resetRestartCounter(session); logger.debug('SESSION', 'Aborted controller after natural completion', { sessionId: sessionDbId }); diff --git a/src/ui/viewer/components/ContextSettingsModal.tsx b/src/ui/viewer/components/ContextSettingsModal.tsx index e7f0caf614..b2079e13a0 100644 --- a/src/ui/viewer/components/ContextSettingsModal.tsx +++ b/src/ui/viewer/components/ContextSettingsModal.tsx @@ -355,12 +355,35 @@ export function ContextSettingsModal({ tooltip="Claude model used for generating observations" > )} From b9f6046ccaac81d5c0e74c4891bffa5b43ab08ca Mon Sep 17 00:00:00 2001 From: Alex Newman Date: Sat, 18 Apr 2026 14:32:28 -0700 Subject: [PATCH 2/2] fix: wire SSE dispose to shutdown, call applyRestartDecay after successful processing Co-Authored-By: Claude Opus 4.6 (1M context) --- src/services/worker-service.ts | 10 ++++++++++ src/services/worker/SessionManager.ts | 9 +++++++++ 2 files changed, 19 insertions(+) diff --git a/src/services/worker-service.ts b/src/services/worker-service.ts index 18e1eadbf7..fd1df781d3 100644 --- a/src/services/worker-service.ts +++ b/src/services/worker-service.ts @@ -533,6 +533,13 @@ export class WorkerService { if (reaped > 0) { logger.info('SYSTEM', `Reaped ${reaped} stale sessions`); } + + // Apply restart decay: if a session has been processing successfully + // for 5+ minutes since its last restart, clear the restart history + const { applyRestartDecay } = await import('./worker/RestartGuard.js'); + this.sessionManager.forEachActiveSession((session) => { + applyRestartDecay(session); + }); } catch (e) { logger.error('SYSTEM', 'Stale session reaper error', { error: e instanceof Error ? e.message : String(e) }); } @@ -1001,6 +1008,9 @@ export class WorkerService { this.staleSessionReaperInterval = null; } + // Stop SSE broadcaster cleanup interval to prevent timer leak + this.sseBroadcaster.dispose(); + await performGracefulShutdown({ server: this.server.getHttpServer(), sessionManager: this.sessionManager, diff --git a/src/services/worker/SessionManager.ts b/src/services/worker/SessionManager.ts index 6f5c62d9c5..614ad78e71 100644 --- a/src/services/worker/SessionManager.ts +++ b/src/services/worker/SessionManager.ts @@ -255,6 +255,15 @@ export class SessionManager { return this.sessions.get(sessionDbId); } + /** + * Iterate over all active sessions (for periodic maintenance tasks) + */ + forEachActiveSession(callback: (session: ActiveSession) => void): void { + for (const session of this.sessions.values()) { + callback(session); + } + } + /** * Queue an observation for processing (zero-latency notification) * Auto-initializes session if not in memory but exists in database