diff --git a/src/services/observer/ObserverBudgetTracker.ts b/src/services/observer/ObserverBudgetTracker.ts
new file mode 100644
index 000000000..7888e1795
--- /dev/null
+++ b/src/services/observer/ObserverBudgetTracker.ts
@@ -0,0 +1,171 @@
+/**
+ * ObserverBudgetTracker
+ *
+ * Addresses Bug #1938: Observer background sessions burn excessive tokens with no budget cap.
+ *
+ * Provides:
+ * 1. Daily token budget tracking (resets at UTC midnight; day key comes from toISOString)
+ * 2. Throttling between observer runs (configurable minimum interval)
+ * 3. Budget check before processing each observation
+ *
+ * All state is in-memory (resets on worker restart, which is acceptable since
+ * it means a restart gives a fresh daily budget).
+ */
+
+import { logger } from '../../utils/logger.js';
+import { SettingsDefaultsManager } from '../../shared/SettingsDefaultsManager.js';
+import { USER_SETTINGS_PATH } from '../../shared/paths.js';
+
+export class ObserverBudgetTracker {
+  private static instance: ObserverBudgetTracker | null = null;
+
+  /** Total tokens consumed today */
+  private tokensConsumedToday: number = 0;
+
+  /** Date string (YYYY-MM-DD) for the current budget period */
+  private currentBudgetDay: string;
+
+  /** Timestamp of the last observation processing */
+  private lastObservationTimestamp: number = 0;
+
+  /** Number of observations skipped due to budget exhaustion (for logging) */
+  private skippedDueToBudget: number = 0;
+
+  /** Number of observations skipped due to throttling (for logging) */
+  private skippedDueToThrottle: number = 0;
+
+  private constructor() {
+    this.currentBudgetDay = this.getTodayString();
+  }
+
+  static getInstance(): ObserverBudgetTracker {
+    if (!ObserverBudgetTracker.instance) {
+      ObserverBudgetTracker.instance = new ObserverBudgetTracker();
+    }
+    return ObserverBudgetTracker.instance;
+  }
+
+  /**
+   * Reset the singleton (useful for testing).
+   */
+  static resetInstance(): void {
+    ObserverBudgetTracker.instance = null;
+  }
+
+  /**
+   * Check whether an observation should be processed, enforcing both
+   * the daily token budget and the throttle interval.
+   *
+   * Returns true if the observation is allowed, false if it should be skipped.
+   */
+  canProcessObservation(): boolean {
+    this.maybeResetDailyBudget();
+
+    const settings = SettingsDefaultsManager.loadFromFile(USER_SETTINGS_PATH);
+    const maxTokensPerDay = parseInt(settings.CLAUDE_MEM_OBSERVER_MAX_TOKENS_PER_DAY, 10) || 100_000;
+    const throttleMs = parseInt(settings.CLAUDE_MEM_OBSERVER_THROTTLE_MS, 10) || 5000;
+
+    // Check throttle
+    const now = Date.now();
+    const timeSinceLastObservation = now - this.lastObservationTimestamp;
+    if (this.lastObservationTimestamp > 0 && timeSinceLastObservation < throttleMs) {
+      this.skippedDueToThrottle++;
+      if (this.skippedDueToThrottle % 50 === 1) {
+        logger.debug('OBSERVER', 'Observation throttled', {
+          timeSinceLastMs: timeSinceLastObservation,
+          throttleMs,
+          totalSkippedThrottle: this.skippedDueToThrottle,
+        });
+      }
+      return false;
+    }
+
+    // Check budget
+    if (this.tokensConsumedToday >= maxTokensPerDay) {
+      this.skippedDueToBudget++;
+      if (this.skippedDueToBudget === 1 || this.skippedDueToBudget % 100 === 0) {
+        logger.warn('OBSERVER', 'Daily token budget exceeded, skipping observation', {
+          tokensConsumedToday: this.tokensConsumedToday,
+          maxTokensPerDay,
+          skippedCount: this.skippedDueToBudget,
+          budgetDay: this.currentBudgetDay,
+        });
+      }
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Record that an observation was processed and how many tokens it consumed.
+   * Call this after the observation has been successfully processed.
+   */
+  recordTokensUsed(tokenCount: number): void {
+    this.maybeResetDailyBudget();
+    this.tokensConsumedToday += tokenCount;
+    this.lastObservationTimestamp = Date.now();
+
+    logger.debug('OBSERVER', 'Token usage recorded', {
+      tokensUsed: tokenCount,
+      tokensConsumedToday: this.tokensConsumedToday,
+      budgetDay: this.currentBudgetDay,
+    });
+  }
+
+  /**
+   * Mark that an observation was processed (updates the throttle timestamp)
+   * even when no token count is available yet (e.g. for queuing).
+   */
+  markObservationProcessed(): void {
+    this.lastObservationTimestamp = Date.now();
+  }
+
+  /**
+   * Get current budget status for health/status endpoints.
+   */
+  getBudgetStatus(): {
+    tokensConsumedToday: number;
+    maxTokensPerDay: number;
+    budgetDay: string;
+    skippedDueToBudget: number;
+    skippedDueToThrottle: number;
+    budgetExhausted: boolean;
+  } {
+    this.maybeResetDailyBudget();
+    const settings = SettingsDefaultsManager.loadFromFile(USER_SETTINGS_PATH);
+    const maxTokensPerDay = parseInt(settings.CLAUDE_MEM_OBSERVER_MAX_TOKENS_PER_DAY, 10) || 100_000;
+
+    return {
+      tokensConsumedToday: this.tokensConsumedToday,
+      maxTokensPerDay,
+      budgetDay: this.currentBudgetDay,
+      skippedDueToBudget: this.skippedDueToBudget,
+      skippedDueToThrottle: this.skippedDueToThrottle,
+      budgetExhausted: this.tokensConsumedToday >= maxTokensPerDay,
+    };
+  }
+
+  /**
+   * Reset daily budget if the day has changed (UTC midnight rollover).
+   */
+  private maybeResetDailyBudget(): void {
+    const today = this.getTodayString();
+    if (today !== this.currentBudgetDay) {
+      logger.info('OBSERVER', 'Daily token budget reset', {
+        previousDay: this.currentBudgetDay,
+        previousTokens: this.tokensConsumedToday,
+        previousSkippedBudget: this.skippedDueToBudget,
+        previousSkippedThrottle: this.skippedDueToThrottle,
+      });
+      this.currentBudgetDay = today;
+      this.tokensConsumedToday = 0;
+      this.skippedDueToBudget = 0;
+      this.skippedDueToThrottle = 0;
+    }
+  }
+
+  private getTodayString(): string {
+    return new Date().toISOString().slice(0, 10);
+  }
+}
diff --git a/src/services/server/Server.ts b/src/services/server/Server.ts
index 528e19d7d..7fde6efae 100644
--- a/src/services/server/Server.ts
+++ b/src/services/server/Server.ts
@@ -20,6 +20,7 @@ import { errorHandler, notFoundHandler } from './ErrorHandler.js';
 import { getSupervisor } from '../../supervisor/index.js';
 import { isPidAlive } from '../../supervisor/process-registry.js';
 import { ENV_PREFIXES, ENV_EXACT_MATCHES } from '../../supervisor/env-sanitizer.js';
+import { ObserverBudgetTracker } from '../observer/ObserverBudgetTracker.js';
 
 // Build-time injected version constant (set by esbuild define)
 declare const __DEFAULT_PACKAGE_VERSION__: string;
@@ -175,6 +176,7 @@ export class Server {
       initialized: this.options.getInitializationComplete(),
       mcpReady: this.options.getMcpReady(),
       ai: this.options.getAiStatus(),
+      observerBudget: ObserverBudgetTracker.getInstance().getBudgetStatus(),
     });
   });
 
diff --git a/src/services/transcripts/cleanup.ts b/src/services/transcripts/cleanup.ts
new file mode 100644
index 000000000..55fc36983
--- /dev/null
+++ b/src/services/transcripts/cleanup.ts
@@ -0,0 +1,214 @@
+/**
+ * JSONL File Cleanup Service
+ *
+ * Addresses Bug #1937: JSONL files under ~/.claude/projects/ accumulate indefinitely
+ * after their content has been extracted into SQLite. This module provides:
+ *
+ * 1. Marking JSONL files as "processed" once fully read (offset === file size)
+ * 2. Age-based cleanup: delete processed files older than 7 days
+ * 3. Size-based cleanup: if total JSONL size exceeds 1GB, delete oldest processed files first
+ * 4. Periodic execution via setInterval (every hour)
+ */
+
+import { existsSync, statSync, unlinkSync } from 'fs';
+import { extname } from 'path';
+import { logger } from '../../utils/logger.js';
+import type { TranscriptWatchState } from './state.js';
+import { saveWatchState } from './state.js';
+
+const SEVEN_DAYS_MS = 7 * 24 * 60 * 60 * 1000;
+const ONE_GB_BYTES = 1024 * 1024 * 1024;
+const CLEANUP_INTERVAL_MS = 60 * 60 * 1000; // 1 hour
+
+export interface JsonlCleanupOptions {
+  /** Maximum age in milliseconds for processed files before deletion. Default: 7 days */
+  maxAgeMs?: number;
+  /** Maximum total JSONL size in bytes before oldest processed files are deleted. Default: 1GB */
+  maxTotalSizeBytes?: number;
+}
+
+interface JsonlFileInfo {
+  filePath: string;
+  sizeBytes: number;
+  modifiedAtMs: number;
+  isProcessed: boolean;
+}
+
+/**
+ * Determine if a JSONL file has been fully processed.
+ * A file is "processed" when the watcher's tracked offset equals or exceeds the file size,
+ * meaning all content has been read and sent to SQLite.
+ */
+export function isFileFullyProcessed(filePath: string, state: TranscriptWatchState): boolean {
+  const trackedOffset = state.offsets[filePath];
+  if (trackedOffset === undefined) return false;
+
+  try {
+    const fileSize = statSync(filePath).size;
+    return trackedOffset >= fileSize;
+  } catch {
+    // File may have been deleted already
+    return false;
+  }
+}
+
+/**
+ * Gather info about all JSONL files tracked in the watcher state.
+ */
+function gatherJsonlFileInfo(state: TranscriptWatchState): JsonlFileInfo[] {
+  const files: JsonlFileInfo[] = [];
+
+  for (const filePath of Object.keys(state.offsets)) {
+    if (extname(filePath) !== '.jsonl') continue;
+    if (!existsSync(filePath)) continue;
+
+    try {
+      const stat = statSync(filePath);
+      files.push({
+        filePath,
+        sizeBytes: stat.size,
+        modifiedAtMs: stat.mtimeMs,
+        isProcessed: (state.offsets[filePath] ?? 0) >= stat.size,
+      });
+    } catch {
+      // Skip files we can't stat
+    }
+  }
+
+  return files;
+}
+
+/**
+ * Run a single cleanup pass: delete processed JSONL files that are either too old
+ * or that push the total JSONL footprint over the size cap.
+ *
+ * Returns the number of files deleted.
+ */
+export function runJsonlCleanup(
+  state: TranscriptWatchState,
+  statePath: string,
+  options: JsonlCleanupOptions = {}
+): number {
+  const maxAgeMs = options.maxAgeMs ?? SEVEN_DAYS_MS;
+  const maxTotalSizeBytes = options.maxTotalSizeBytes ?? ONE_GB_BYTES;
+  const now = Date.now();
+
+  const allFiles = gatherJsonlFileInfo(state);
+  const processedFiles = allFiles.filter(f => f.isProcessed);
+  let totalSizeBytes = allFiles.reduce((sum, f) => sum + f.sizeBytes, 0);
+
+  let deletedCount = 0;
+  const deletedPaths: string[] = [];
+
+  // Phase 1: Age-based cleanup — delete processed files older than maxAgeMs
+  for (const file of processedFiles) {
+    const ageMs = now - file.modifiedAtMs;
+    if (ageMs > maxAgeMs) {
+      if (deleteJsonlFile(file.filePath, state)) {
+        totalSizeBytes -= file.sizeBytes;
+        deletedCount++;
+        deletedPaths.push(file.filePath);
+      }
+    }
+  }
+
+  // Phase 2: Size-based cleanup — if still over cap, delete oldest processed files first
+  if (totalSizeBytes > maxTotalSizeBytes) {
+    // Re-gather after phase 1 deletions
+    const remainingProcessed = processedFiles
+      .filter(f => !deletedPaths.includes(f.filePath))
+      .sort((a, b) => a.modifiedAtMs - b.modifiedAtMs); // oldest first
+
+    for (const file of remainingProcessed) {
+      if (totalSizeBytes <= maxTotalSizeBytes) break;
+
+      if (deleteJsonlFile(file.filePath, state)) {
+        totalSizeBytes -= file.sizeBytes;
+        deletedCount++;
+        deletedPaths.push(file.filePath);
+      }
+    }
+  }
+
+  // Persist updated state (offsets for deleted files are removed)
+  if (deletedCount > 0) {
+    saveWatchState(statePath, state);
+    logger.info('TRANSCRIPT', `JSONL cleanup: deleted ${deletedCount} processed files`, {
+      deletedCount,
+      remainingTotalSizeMB: Math.round(totalSizeBytes / (1024 * 1024)),
+    });
+  }
+
+  return deletedCount;
+}
+
+/**
+ * Delete a single JSONL file and remove its offset tracking from state.
+ * Returns true if deletion succeeded.
+ */
+function deleteJsonlFile(filePath: string, state: TranscriptWatchState): boolean {
+  try {
+    unlinkSync(filePath);
+    delete state.offsets[filePath];
+    logger.debug('TRANSCRIPT', 'Deleted processed JSONL file', { filePath });
+    return true;
+  } catch (error) {
+    logger.warn('TRANSCRIPT', 'Failed to delete JSONL file', {
+      filePath,
+      error: error instanceof Error ? error.message : String(error),
+    });
+    return false;
+  }
+}
+
+/**
+ * Also clean up stale offset entries for files that no longer exist on disk.
+ * This prevents the state file from growing indefinitely with references to deleted files.
+ */
+export function cleanStaleOffsets(state: TranscriptWatchState, statePath: string): number {
+  let cleanedCount = 0;
+  for (const filePath of Object.keys(state.offsets)) {
+    if (!existsSync(filePath)) {
+      delete state.offsets[filePath];
+      cleanedCount++;
+    }
+  }
+
+  if (cleanedCount > 0) {
+    saveWatchState(statePath, state);
+    logger.debug('TRANSCRIPT', `Cleaned ${cleanedCount} stale offset entries from watch state`);
+  }
+
+  return cleanedCount;
+}
+
+/**
+ * Start periodic JSONL cleanup. Returns a stop function.
+ */
+export function startPeriodicJsonlCleanup(
+  state: TranscriptWatchState,
+  statePath: string,
+  options: JsonlCleanupOptions = {}
+): () => void {
+  // Run immediately on startup
+  try {
+    cleanStaleOffsets(state, statePath);
+    runJsonlCleanup(state, statePath, options);
+  } catch (error) {
+    logger.error('TRANSCRIPT', 'Initial JSONL cleanup failed', {}, error as Error);
+  }
+
+  // Then run every hour
+  const intervalId = setInterval(() => {
+    try {
+      cleanStaleOffsets(state, statePath);
+      runJsonlCleanup(state, statePath, options);
+    } catch (error) {
+      logger.error('TRANSCRIPT', 'Periodic JSONL cleanup failed', {}, error as Error);
+    }
+  }, CLEANUP_INTERVAL_MS);
+
+  return () => {
+    clearInterval(intervalId);
+  };
+}
diff --git a/src/services/transcripts/watcher.ts b/src/services/transcripts/watcher.ts
index 36d664fec..1b491ff9c 100644
--- a/src/services/transcripts/watcher.ts
+++ b/src/services/transcripts/watcher.ts
@@ -6,6 +6,7 @@ import { expandHomePath } from './config.js';
 import { loadWatchState, saveWatchState, type TranscriptWatchState } from './state.js';
 import type { TranscriptWatchConfig, TranscriptSchema, WatchTarget } from './types.js';
 import { TranscriptEventProcessor } from './processor.js';
+import { startPeriodicJsonlCleanup } from './cleanup.js';
 
 interface TailState {
   offset: number;
@@ -84,6 +85,7 @@ export class TranscriptWatcher {
  private tailers = new Map<string, TailState>();
  private state: TranscriptWatchState;
  private rescanTimers: Array<NodeJS.Timeout> = [];
+  private stopJsonlCleanup: (() => void) | null = null;
  constructor(private config: TranscriptWatchConfig, private statePath: string) {
    this.state = loadWatchState(statePath);
  }
@@ -93,9 +95,21 @@ async start(): Promise<void> {
     for (const watch of this.config.watches) {
       await this.setupWatch(watch);
     }
+
+    // Start periodic JSONL cleanup (Issue #1937)
+    // Runs immediately then every hour: deletes processed files older than 7 days,
+    // and enforces a 1GB total size cap on JSONL files.
+    this.stopJsonlCleanup = startPeriodicJsonlCleanup(this.state, this.statePath);
+    logger.info('TRANSCRIPT', 'Started periodic JSONL cleanup (hourly, 7-day retention, 1GB cap)');
   }
 
   stop(): void {
+    // Stop JSONL cleanup timer
+    if (this.stopJsonlCleanup) {
+      this.stopJsonlCleanup();
+      this.stopJsonlCleanup = null;
+    }
+
     for (const tailer of this.tailers.values()) {
       tailer.close();
     }
diff --git a/src/services/worker/agents/ResponseProcessor.ts b/src/services/worker/agents/ResponseProcessor.ts
index d7f446c3b..a6ba703c2 100644
--- a/src/services/worker/agents/ResponseProcessor.ts
+++ b/src/services/worker/agents/ResponseProcessor.ts
@@ -24,6 +24,7 @@ import type { SessionManager } from '../SessionManager.js';
 import type { WorkerRef, StorageResult } from './types.js';
 import { broadcastObservation, broadcastSummary } from './ObservationBroadcaster.js';
 import { cleanupProcessedMessages } from './SessionCleanupHelper.js';
+import { ObserverBudgetTracker } from '../../observer/ObserverBudgetTracker.js';
 
 /**
  * Process agent response text (parse XML, save to database, sync to Chroma, broadcast SSE)
@@ -130,6 +131,11 @@ export async function processAgentResponse(
   // to the Stop hook for silent-summary-loss detection (#1633)
   session.lastSummaryStored = result.summaryId !== null;
 
+  // Record token usage against the daily budget (Issue #1938)
+  if (discoveryTokens > 0) {
+    ObserverBudgetTracker.getInstance().recordTokensUsed(discoveryTokens);
+  }
+
   // CLAIM-CONFIRM: Now that storage succeeded, confirm all processing messages (delete from queue)
   // This is the critical step that prevents message loss on generator crash
   const pendingStore = sessionManager.getPendingMessageStore();
diff --git a/src/services/worker/http/routes/SessionRoutes.ts b/src/services/worker/http/routes/SessionRoutes.ts
index 21ffbc339..143f29304 100644
--- a/src/services/worker/http/routes/SessionRoutes.ts
+++ b/src/services/worker/http/routes/SessionRoutes.ts
@@ -24,8 +24,9 @@ import {
   USER_SETTINGS_PATH
 } from '../../../../shared/paths.js';
 import { getProcessBySession, ensureProcessExit } from '../../ProcessRegistry.js';
 import { getProjectContext } from '../../../../utils/project-name.js';
 import { normalizePlatformSource } from '../../../../shared/platform-source.js';
+import { ObserverBudgetTracker } from '../../../observer/ObserverBudgetTracker.js';
 
 export class SessionRoutes extends BaseRouteHandler {
   private completionHandler: SessionCompletionHandler;
@@ -455,6 +456,13 @@ export class SessionRoutes extends BaseRouteHandler {
     const sessionDbId = this.parseIntParam(req, res, 'sessionDbId');
     if (sessionDbId === null) return;
 
+    // Budget & throttle gate (Issue #1938)
+    const budgetTracker = ObserverBudgetTracker.getInstance();
+    if (!budgetTracker.canProcessObservation()) {
+      res.json({ status: 'skipped', reason: 'budget_or_throttle' });
+      return;
+    }
+
     const { tool_name, tool_input, tool_response, prompt_number, cwd } = req.body;
 
     this.sessionManager.queueObservation(sessionDbId, {
@@ -465,6 +473,8 @@ export class SessionRoutes extends BaseRouteHandler {
       cwd
     });
 
+    budgetTracker.markObservationProcessed();
+
     // CRITICAL: Ensure SDK agent is running to consume the queue
     this.ensureGeneratorRunning(sessionDbId, 'observation');
 
@@ -561,6 +571,13 @@ export class SessionRoutes extends BaseRouteHandler {
       return this.badRequest(res, 'Missing contentSessionId');
     }
 
+    // Budget & throttle gate (Issue #1938)
+    const budgetTracker = ObserverBudgetTracker.getInstance();
+    if (!budgetTracker.canProcessObservation()) {
+      res.json({ status: 'skipped', reason: 'budget_or_throttle' });
+      return;
+    }
+
     // Load skip tools from settings
     const settings = SettingsDefaultsManager.loadFromFile(USER_SETTINGS_PATH);
     const skipTools = new Set(settings.CLAUDE_MEM_SKIP_TOOLS.split(',').map(t => t.trim()).filter(Boolean));
@@ -631,6 +648,8 @@ export class SessionRoutes extends BaseRouteHandler {
       })()
     });
 
+    budgetTracker.markObservationProcessed();
+
     // Ensure SDK agent is running
     this.ensureGeneratorRunning(sessionDbId, 'observation');
 
diff --git a/src/shared/SettingsDefaultsManager.ts b/src/shared/SettingsDefaultsManager.ts
index 99cf40131..7e9e4837b 100644
--- a/src/shared/SettingsDefaultsManager.ts
+++ b/src/shared/SettingsDefaultsManager.ts
@@ -76,6 +76,9 @@ export interface SettingsDefaults {
   CLAUDE_MEM_CHROMA_API_KEY: string;
   CLAUDE_MEM_CHROMA_TENANT: string;
   CLAUDE_MEM_CHROMA_DATABASE: string;
+  // Observer Budget & Throttling (Issue #1938)
+  CLAUDE_MEM_OBSERVER_MAX_TOKENS_PER_DAY: string; // Max tokens observer can consume per day (default: 100000)
+  CLAUDE_MEM_OBSERVER_THROTTLE_MS: string; // Min milliseconds between observer runs (default: 5000)
 }
 
 export class SettingsDefaultsManager {
@@ -147,6 +150,9 @@ export class SettingsDefaultsManager {
     CLAUDE_MEM_CHROMA_API_KEY: '',
     CLAUDE_MEM_CHROMA_TENANT: 'default_tenant',
     CLAUDE_MEM_CHROMA_DATABASE: 'default_database',
+    // Observer Budget & Throttling (Issue #1938)
+    CLAUDE_MEM_OBSERVER_MAX_TOKENS_PER_DAY: '100000', // 100k tokens/day default cap
+    CLAUDE_MEM_OBSERVER_THROTTLE_MS: '5000', // 5 seconds between observer runs
   };
 
   /**
diff --git a/src/utils/logger.ts b/src/utils/logger.ts
index 021ccdc50..193c4c0e9 100644
--- a/src/utils/logger.ts
+++ b/src/utils/logger.ts
@@ -15,7 +15,7 @@ export enum LogLevel {
   SILENT = 4
 }
 
-export type Component = 'HOOK' | 'WORKER' | 'SDK' | 'PARSER' | 'DB' | 'SYSTEM' | 'HTTP' | 'SESSION' | 'CHROMA' | 'CHROMA_MCP' | 'CHROMA_SYNC' | 'FOLDER_INDEX' | 'CLAUDE_MD' | 'QUEUE';
+export type Component = 'HOOK' | 'WORKER' | 'SDK' | 'PARSER' | 'DB' | 'SYSTEM' | 'HTTP' | 'SESSION' | 'CHROMA' | 'CHROMA_MCP' | 'CHROMA_SYNC' | 'FOLDER_INDEX' | 'CLAUDE_MD' | 'QUEUE' | 'TRANSCRIPT' | 'OBSERVER';
 
 interface LogContext {
   sessionId?: number;