/**
 * Serialize an AsyncIterable as a streaming NDJSON ReadableStream.
 *
 * Each item becomes one `JSON.stringify(item) + '\n'` chunk, UTF-8 encoded.
 *
 * The source is advanced from `pull()`, i.e. only when the stream's internal
 * queue has room. Pumping the whole iterable from `start()` would buffer every
 * item in the queue no matter how slowly the consumer reads.
 *
 * @param iter Source of JSON-serializable items.
 * @returns A byte stream that closes when the source completes, errors when
 *   the source (or serialization) throws, and finalizes the source iterator
 *   when the consumer cancels.
 */
export function toNdjsonStream(iter: AsyncIterable<unknown>): ReadableStream<Uint8Array> {
  const enc = new TextEncoder()
  const iterator = iter[Symbol.asyncIterator]()
  let cancelled = false

  // Best-effort release after a failure: the primary error has already been
  // reported through the stream, so a secondary failure here is dropped.
  const releaseQuietly = async (): Promise<void> => {
    try {
      await iterator.return?.()
    } catch {
      // intentionally ignored
    }
  }

  return new ReadableStream<Uint8Array>({
    async pull(controller) {
      try {
        const result = await iterator.next()
        // A cancel may have landed while next() was in flight; the stream is
        // already closed, so there is nothing left to enqueue or close.
        if (cancelled) return
        if (result.done) {
          controller.close()
          return
        }
        controller.enqueue(enc.encode(JSON.stringify(result.value) + '\n'))
      } catch (err) {
        if (cancelled) return
        controller.error(err)
        // e.g. JSON.stringify threw: the source is still suspended, release it.
        await releaseQuietly()
      }
    },
    async cancel() {
      cancelled = true
      // Awaited so a failing finalizer rejects the caller's cancel() promise
      // instead of surfacing as an unhandled rejection.
      await iterator.return?.()
    },
  })
}
+ } finally { + await res.body?.cancel().catch(() => {}) + } }, async *pipeline_read( @@ -164,7 +180,11 @@ export function createRemoteEngine(engineUrl: string): Engine { ): AsyncIterable { const body = input ? toNdjsonStream(input) : undefined const res = await post('/pipeline_read', pipeline, opts, body) - yield* parseNdjsonStream(res.body!) + try { + yield* parseNdjsonStream(res.body!) + } finally { + await res.body?.cancel().catch(() => {}) + } }, async *pipeline_write( @@ -172,7 +192,11 @@ export function createRemoteEngine(engineUrl: string): Engine { messages: AsyncIterable ): AsyncIterable { const res = await post('/pipeline_write', pipeline, undefined, toNdjsonStream(messages)) - yield* parseNdjsonStream(res.body!) + try { + yield* parseNdjsonStream(res.body!) + } finally { + await res.body?.cancel().catch(() => {}) + } }, async *pipeline_sync( @@ -182,7 +206,11 @@ export function createRemoteEngine(engineUrl: string): Engine { ): AsyncIterable { const body = input ? toNdjsonStream(input) : undefined const res = await post('/pipeline_sync', pipeline, opts, body) - yield* parseNdjsonStream(res.body!) + try { + yield* parseNdjsonStream(res.body!) + } finally { + await res.body?.cancel().catch(() => {}) + } }, } } diff --git a/apps/service/src/temporal/activities/_shared.ts b/apps/service/src/temporal/activities/_shared.ts index 993c1ac06..b246c544f 100644 --- a/apps/service/src/temporal/activities/_shared.ts +++ b/apps/service/src/temporal/activities/_shared.ts @@ -62,14 +62,12 @@ export async function drainMessages( ): Promise<{ errors: RunResult['errors'] state: SourceState - records: Message[] sourceConfig?: Record destConfig?: Record eof?: EofPayload }> { const errors: RunResult['errors'] = [] let state: SourceState = initialState ?? 
/**
 * Start a throwaway `node -e` HTTP server used to validate the leak detector.
 *
 * The child answers `/health`, `/pipeline_setup` and `/pipeline_sync`; on every
 * `/pipeline_sync` it retains a `leakBytesPerRequest`-byte Buffer (when > 0),
 * which gives the detector a process with a known growth rate. The child is
 * registered in the file-level `children` set so the suite can reap it.
 *
 * @param leakBytesPerRequest Bytes to retain per `/pipeline_sync` request; 0 for a stable process.
 * @returns The child process and its base URL, once it has printed `READY:<port>`.
 *   Rejects if the child errors, exits before READY, or is not ready within 5s.
 */
function spawnSyntheticServer(
  leakBytesPerRequest: number
): Promise<{ proc: ChildProcess; baseUrl: string }> {
  // Program text handed to `node --input-type=module -e`.
  const serverSource = [
    'import http from "node:http";',
    'const retained = [];',
    `const leakBytesPerRequest = ${leakBytesPerRequest};`,
    'const server = http.createServer((req, res) => {',
    ' if (req.url === "/health") {',
    ' res.writeHead(200, { "content-type": "application/json" });',
    ' res.end(JSON.stringify({ ok: true }));',
    ' return;',
    ' }',
    ' if (req.url?.startsWith("/pipeline_setup")) {',
    ' res.writeHead(200, { "content-type": "application/x-ndjson" });',
    ' res.end(JSON.stringify({ type: "control" }) + "\\n");',
    ' return;',
    ' }',
    ' if (req.url?.startsWith("/pipeline_sync")) {',
    ' if (leakBytesPerRequest > 0) retained.push(Buffer.alloc(leakBytesPerRequest, 1));',
    ' res.writeHead(200, { "content-type": "application/x-ndjson" });',
    ' res.end(JSON.stringify({ type: "eof", eof: { reason: "time_limit" } }) + "\\n");',
    ' return;',
    ' }',
    ' res.writeHead(404);',
    ' res.end("not found");',
    '});',
    'server.listen(0, "127.0.0.1", () => {',
    ' const addr = server.address();',
    ' console.log(`READY:${addr.port}`);',
    '});',
  ].join('\n')

  return new Promise((resolve, reject) => {
    const proc = spawn('node', ['--input-type=module', '-e', serverSource], {
      stdio: ['ignore', 'pipe', 'pipe'],
    }) as ChildProcess
    children.add(proc)

    // Combined stdout + stderr, kept for diagnostics in the failure messages.
    let output = ''
    const startupTimer = setTimeout(() => {
      proc.kill('SIGKILL')
      reject(new Error(`Synthetic server did not start within 5s\noutput: ${output}`))
    }, 5_000)

    proc.stdout!.on('data', (chunk: Buffer) => {
      output += chunk.toString()
      // Match against the accumulated text so a READY line split across chunks is still found.
      const port = /READY:(\d+)/.exec(output)?.[1]
      if (port === undefined) return
      clearTimeout(startupTimer)
      resolve({ proc, baseUrl: `http://127.0.0.1:${port}` })
    })

    proc.stderr!.on('data', (chunk: Buffer) => {
      output += chunk.toString()
    })

    proc.on('error', (err: Error) => {
      clearTimeout(startupTimer)
      reject(err)
    })

    proc.on('exit', (code: number | null) => {
      clearTimeout(startupTimer)
      // An exit after READY is normal teardown; only an early exit is a failure.
      if (output.includes('READY:')) return
      reject(new Error(`Synthetic server exited with code ${code}\noutput: ${output}`))
    })
  })
}
import { execFileSync } from 'node:child_process'

/** Tunables for one detector run. */
export type MemoryLeakSettings = {
  /** Iterations whose RSS samples are excluded from the analysis. */
  warmupIterations: number
  /** Iterations whose RSS samples are analysed. */
  testIterations: number
  /** Pause after each iteration before RSS is sampled, in milliseconds. */
  settleMs: number
  /** `passesThresholds` requires the regression slope (KB/iteration) to be below this. */
  slopeThresholdKb: number
  /** `passesThresholds` requires first-to-last post-warmup growth (MB) to be below this. */
  growthThresholdMb: number
}

/** What the caller's `iterate` callback reports for one iteration. */
export type MemoryLeakIterationResult = {
  sawTimeLimit: boolean
}

/** Everything measured and derived by `runMemoryLeakDetector`. */
export type MemoryLeakResult = {
  settings: MemoryLeakSettings
  totalIterations: number
  /** Number of iterations that reported `sawTimeLimit`. */
  timeLimitCount: number
  /** One entry per iteration; `null` where RSS could not be read. */
  rssSamplesByIterationKb: Array<number | null>
  /** Non-null samples taken after the warmup iterations. */
  postWarmupSamplesKb: number[]
  slopeKbPerIteration: number
  totalGrowthMb: number
  passesThresholds: boolean
}

/** Narrow an unknown value to a non-null object whose properties can be read. */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null
}

/**
 * Read the resident set size of a process via `ps -o rss= -p <pid>`.
 *
 * @returns RSS in the unit `ps` reports (treated as KB by this module), or
 *   `null` when the pid is not a positive integer, `ps` fails, or its output
 *   is not a number.
 */
export function getRssKb(pid: number): number | null {
  if (!Number.isInteger(pid) || pid <= 0) return null
  try {
    // execFileSync passes argv directly; no shell is involved.
    const raw = execFileSync('ps', ['-o', 'rss=', '-p', String(pid)], { encoding: 'utf8' }).trim()
    const kb = Number.parseInt(raw, 10)
    return Number.isFinite(kb) ? kb : null
  } catch {
    return null
  }
}

/**
 * Least-squares slope of `ys` against its indices 0..n-1 (KB growth per
 * iteration when `ys` holds RSS samples in KB).
 *
 * @returns 0 when fewer than two points are given.
 */
export function linearRegressionSlope(ys: number[]): number {
  const n = ys.length
  if (n < 2) return 0

  let sumX = 0
  let sumY = 0
  let sumXY = 0
  let sumXX = 0
  ys.forEach((y, x) => {
    sumX += x
    sumY += y
    sumXY += x * y
    sumXX += x * x
  })
  return (n * sumXY - sumX * sumY) / (n * sumXX - sumX * sumX)
}

/**
 * Read a response body to the end and parse it as NDJSON.
 *
 * @returns One parsed value per non-blank line; `[]` when the response has no body.
 * @throws SyntaxError (from `JSON.parse`) if a non-blank line is not valid JSON.
 */
export async function drainNdjsonResponse(res: Response): Promise<unknown[]> {
  const reader = res.body?.getReader()
  if (!reader) return []

  // A streaming TextDecoder carries partial multi-byte state between calls,
  // so each drain gets its own instance rather than sharing one per module.
  const decoder = new TextDecoder()
  let buffer = ''
  for (;;) {
    const { done, value } = await reader.read()
    if (done) break
    buffer += decoder.decode(value, { stream: true })
  }
  buffer += decoder.decode()

  return buffer
    .split('\n')
    .map((line) => line.trim())
    .filter((line) => line.length > 0)
    .map((line): unknown => JSON.parse(line))
}

/** True when any message is `{ type: 'eof', eof: { reason: 'time_limit' } }` (extra fields allowed). */
export function hasTimeLimitEof(messages: unknown[]): boolean {
  return messages.some((message) => {
    if (!isRecord(message) || message.type !== 'eof') return false
    const eof = message.eof
    return isRecord(eof) && eof.reason === 'time_limit'
  })
}

/**
 * Run `iterate` for `warmupIterations + testIterations` rounds, sampling the
 * RSS of `pid` after each round (following a `settleMs` pause), then fit a
 * line through the post-warmup samples.
 *
 * @param opts.pid Process whose RSS is sampled.
 * @param opts.settings Iteration counts, settle delay and pass/fail thresholds.
 * @param opts.iterate Workload for one round; receives the 0-based iteration index.
 * @returns Raw samples plus slope, total growth and the threshold verdict.
 *   `passesThresholds` is false when fewer than two post-warmup samples were
 *   collected: with no usable data, slope and growth both default to 0 and
 *   would otherwise pass vacuously.
 */
export async function runMemoryLeakDetector(opts: {
  pid: number
  settings: MemoryLeakSettings
  iterate: (iteration: number) => Promise<MemoryLeakIterationResult>
}): Promise<MemoryLeakResult> {
  const { pid, settings, iterate } = opts
  const totalIterations = settings.warmupIterations + settings.testIterations
  const rssSamplesByIterationKb: Array<number | null> = []
  let timeLimitCount = 0

  for (let i = 0; i < totalIterations; i++) {
    const { sawTimeLimit } = await iterate(i)
    if (sawTimeLimit) timeLimitCount++

    await new Promise<void>((resolve) => setTimeout(resolve, settings.settleMs))

    rssSamplesByIterationKb.push(getRssKb(pid))
  }

  const postWarmupSamplesKb = rssSamplesByIterationKb
    .slice(settings.warmupIterations)
    .filter((value): value is number => value !== null)

  const hasEnoughSamples = postWarmupSamplesKb.length >= 2
  const slopeKbPerIteration = linearRegressionSlope(postWarmupSamplesKb)
  const totalGrowthMb = hasEnoughSamples
    ? (postWarmupSamplesKb[postWarmupSamplesKb.length - 1] - postWarmupSamplesKb[0]) / 1024
    : 0

  return {
    settings,
    totalIterations,
    timeLimitCount,
    rssSamplesByIterationKb,
    postWarmupSamplesKb,
    slopeKbPerIteration,
    totalGrowthMb,
    passesThresholds:
      hasEnoughSamples &&
      slopeKbPerIteration < settings.slopeThresholdKb &&
      totalGrowthMb < settings.growthThresholdMb,
  }
}

/**
 * Render every RSS sample as a table row: 1-based iteration, RSS in MB and
 * the delta to the previous sample. Missing samples print as `n/a`, padded to
 * the same width as numeric cells so the columns stay aligned. The row at
 * index `warmupIterations` carries the warmup marker.
 */
export function formatRssSamplesTable(result: MemoryLeakResult): string {
  const MB_WIDTH = 8
  const DELTA_WIDTH = 6
  const samples = result.rssSamplesByIterationKb
  const lines = [' RSS samples (MB):', ' iter │ RSS (MB) │ delta', ' ──────┼────────────┼────────']

  samples.forEach((current, i) => {
    const previous = i > 0 ? samples[i - 1] : null
    const mb = current === null ? 'n/a'.padStart(MB_WIDTH) : (current / 1024).toFixed(1).padStart(MB_WIDTH)
    const delta =
      current === null || previous === null
        ? 'n/a'.padStart(DELTA_WIDTH)
        : ((current - previous) / 1024).toFixed(1).padStart(DELTA_WIDTH)
    const marker = i === result.settings.warmupIterations ? ' ← warmup end' : ''
    lines.push(` ${String(i + 1).padStart(4)} │ ${mb} │ ${delta}${marker}`)
  })

  return lines.join('\n')
}

/**
 * Render the headline numbers of a run: the time_limit canary ratio, baseline
 * and final post-warmup RSS, total growth and slope.
 */
export function formatMemoryLeakSummary(result: MemoryLeakResult): string {
  const baseline = result.postWarmupSamplesKb[0]
  const final = result.postWarmupSamplesKb[result.postWarmupSamplesKb.length - 1]
  const asMb = (kb: number | undefined): string =>
    kb === undefined ? 'n/a' : (kb / 1024).toFixed(1) + ' MB'
  // Guard the 0/0 case so an empty run prints 0% rather than NaN%.
  const canaryPercent =
    result.totalIterations > 0
      ? ((result.timeLimitCount / result.totalIterations) * 100).toFixed(0)
      : '0'

  return [
    ` Canary: ${result.timeLimitCount}/${result.totalIterations} windows ended by time_limit (${canaryPercent}%)`,
    '',
    ' Post-warmup analysis:',
    ` Baseline RSS: ${asMb(baseline)}`,
    ` Final RSS: ${asMb(final)}`,
    ` Total growth: ${result.totalGrowthMb.toFixed(1)} MB`,
    ` Slope: ${result.slopeKbPerIteration.toFixed(1)} KB/iteration`,
  ].join('\n')
}
/**
 * Spawn the built engine API (`apps/engine/dist/api/index.js`) as a child
 * process listening on `port`.
 *
 * @param port Value passed to the child as the `PORT` environment variable.
 * @returns The child process and a `ready` promise that resolves once the
 *   child's stdout contains `Sync Engine API listening`, and rejects if the
 *   child errors, exits first, or does not become ready within 30s.
 */
function spawnEngine(port: number): { proc: ChildProcess; ready: Promise<void> } {
  // Cast needed: ChildProcessByStdio omits EventEmitter
  // methods in Node 24 types, but they exist at runtime.
  const proc = spawn('node', ['apps/engine/dist/api/index.js'], {
    cwd: REPO_ROOT,
    env: { ...process.env, PORT: String(port), NODE_ENV: 'test' },
    stdio: ['ignore', 'pipe', 'pipe'],
  }) as ChildProcess

  const ready = new Promise<void>((resolve, reject) => {
    // Startup output, kept only until the promise settles. The engine keeps
    // logging for the whole test; buffering and re-scanning all of it on every
    // chunk would grow the test runner's own memory during a memory test.
    let output = ''
    let settled = false

    function settle(action: () => void): void {
      if (settled) return
      settled = true
      clearTimeout(timeout)
      action()
      output = ''
    }

    const timeout = setTimeout(() => {
      settle(() => reject(new Error(`Engine did not start within 30s\noutput: ${output}`)))
    }, 30_000)

    // Pino logs to stdout by default
    proc.stdout!.on('data', (chunk: Buffer) => {
      // The listener stays attached so the pipe keeps draining after startup.
      if (settled) return
      output += chunk.toString()
      if (output.includes('Sync Engine API listening')) settle(resolve)
    })

    proc.stderr!.on('data', (chunk: Buffer) => {
      if (settled) return
      output += chunk.toString()
    })

    proc.on('error', (err: Error) => {
      settle(() => reject(err))
    })

    proc.on('exit', (code: number | null) => {
      settle(() =>
        reject(new Error(`Engine exited with code ${code} before ready.\noutput: ${output}`))
      )
    })
  })

  return { proc, ready }
}
// End-to-end guard: seeds a source Postgres, serves it through a Stripe-like
// list server, runs the real engine as a subprocess, and checks that the
// engine's RSS stays bounded across many time-limited /pipeline_sync calls.
describe('memory leak regression', { timeout: 600_000 }, () => {
  let sourceDocker: DockerPostgres18Handle
  let destDocker: DockerPostgres18Handle
  let sourcePool: pg.Pool
  let testServer: StripeListServer
  let engineProc: ChildProcess
  let enginePort: number
  let pipelineHeader: string

  beforeAll(async () => {
    // 1. Start two Postgres containers (source for test server, dest for sync)
    const [src, dst, spec] = await Promise.all([
      startDockerPostgres18(),
      startDockerPostgres18(),
      resolveOpenApiSpec({ apiVersion: BUNDLED_API_VERSION }, fetch).then((r) => r.spec),
    ])
    sourceDocker = src
    destDocker = dst

    // 2. Seed source Postgres with customers
    sourcePool = new pg.Pool({ connectionString: sourceDocker.connectionString })
    sourcePool.on('error', () => {})
    await ensureSchema(sourcePool, SOURCE_SCHEMA)
    await ensureObjectTable(sourcePool, SOURCE_SCHEMA, 'customers')

    const template = generateObjectsFromSchema(spec, 'customer', 1, {
      tableName: 'customers',
    })[0]
    const objects = applyCreatedTimestampRange(
      Array.from({ length: CUSTOMER_COUNT }, (_, i) => ({
        ...template,
        id: `cus_leak_${String(i).padStart(5, '0')}`,
        created: 0,
      })),
      { startUnix: RANGE_START, endUnix: RANGE_END }
    )
    for (let i = 0; i < objects.length; i += SEED_BATCH) {
      await upsertObjects(sourcePool, SOURCE_SCHEMA, 'customers', objects.slice(i, i + SEED_BATCH))
    }
    console.log(` Seeded ${CUSTOMER_COUNT} customers`)

    // 3. Start custom Stripe list server
    testServer = await createStripeListServer({
      postgresUrl: sourceDocker.connectionString,
      host: '127.0.0.1',
      port: 0,
      accountCreated: RANGE_START,
    })
    console.log(` Test server: http://127.0.0.1:${testServer.port}`)

    // 4. Spawn engine subprocess
    // NOTE(review): the port is picked at random and not checked for availability;
    // a collision would surface as the engine failing to become ready.
    enginePort = 30000 + Math.floor(Math.random() * 10000)
    const engine = spawnEngine(enginePort)
    engineProc = engine.proc
    await engine.ready
    console.log(` Engine: http://localhost:${enginePort} (PID ${engineProc.pid})`)

    // 5. Build pipeline config and run setup
    const pipeline = {
      source: {
        type: 'stripe',
        stripe: {
          api_key: 'sk_test_fake',
          api_version: BUNDLED_API_VERSION,
          base_url: `http://127.0.0.1:${testServer.port}`,
          rate_limit: 1000,
        },
      },
      destination: {
        type: 'postgres',
        postgres: {
          connection_string: destDocker.connectionString,
          schema: DEST_SCHEMA,
          batch_size: 100,
        },
      },
      streams: [{ name: 'customers', sync_mode: 'full_refresh' }],
    }
    pipelineHeader = JSON.stringify(pipeline)

    const setupRes = await fetch(`http://localhost:${enginePort}/pipeline_setup`, {
      method: 'POST',
      headers: { 'X-Pipeline': pipelineHeader },
    })
    expect(setupRes.ok, `pipeline_setup failed: ${setupRes.status}`).toBe(true)
    await drainNdjsonResponse(setupRes)
    console.log(` Pipeline setup complete`)
  }, 120_000)

  afterAll(async () => {
    // Each teardown step runs even if an earlier one fails, so a failing
    // container stop cannot leave the other container (or the pool) behind.
    const attempt = async (label: string, step: () => unknown): Promise<void> => {
      try {
        await step()
      } catch (err) {
        console.warn(` Teardown step failed: ${label}`, err)
      }
    }

    // Only wait for an exit event if the engine is actually still running;
    // a process that already exited never emits 'exit' again.
    const engineRunning =
      engineProc?.pid !== undefined && engineProc.exitCode === null && engineProc.signalCode === null
    if (engineRunning) {
      await new Promise<void>((resolve) => {
        const timer = setTimeout(() => {
          // Escalate if SIGTERM was not honoured within 5s.
          engineProc.kill('SIGKILL')
          resolve()
        }, 5_000)
        engineProc.once('exit', () => {
          clearTimeout(timer)
          resolve()
        })
        engineProc.kill('SIGTERM')
      })
    }

    await attempt('test server close', () => testServer?.close())
    await attempt('source pool end', () => sourcePool?.end())
    await attempt('dest container stop', () => destDocker?.stop())
    await attempt('source container stop', () => sourceDocker?.stop())
  })

  it('RSS does not grow unboundedly during repeated time-limited syncs', { timeout: 300_000 }, async () => {
    const pid = engineProc.pid!
    const result = await runMemoryLeakDetector({
      pid,
      settings: DETECTOR_SETTINGS,
      iterate: async () => {
        const res = await fetch(
          `http://localhost:${enginePort}/pipeline_sync?time_limit=${TIME_LIMIT_SECONDS}`,
          { method: 'POST', headers: { 'X-Pipeline': pipelineHeader } }
        )
        expect(res.ok, `pipeline_sync failed: ${res.status}`).toBe(true)
        const messages = await drainNdjsonResponse(res)
        return { sawTimeLimit: hasTimeLimitEof(messages) }
      },
    })

    console.log(`\n${formatRssSamplesTable(result)}`)
    console.log(`\n${formatMemoryLeakSummary(result)}`)

    // Canary: if time_limit never fires, the leak path is never exercised.
    expect(
      result.timeLimitCount,
      `Only ${result.timeLimitCount}/${result.totalIterations} syncs hit time_limit — ` +
        `the test is not exercising the leak path. Reduce TIME_LIMIT_SECONDS or add more data.`
    ).toBeGreaterThanOrEqual(result.totalIterations * 0.8)

    // Guards against a run where RSS sampling mostly failed (null samples are dropped).
    expect(
      result.postWarmupSamplesKb.length,
      'Not enough post-warmup samples'
    ).toBeGreaterThanOrEqual(TEST_ITERATIONS * 0.8)

    // The detector itself is validated separately in memory-leak-harness.test.ts
    // against a stable synthetic child and an intentionally leaky one.
    // This engine-facing assertion is a broader smoke guard over a noisy
    // real pipeline workload on shared CI runners.
    expect(
      result.slopeKbPerIteration,
      `RSS slope ${result.slopeKbPerIteration.toFixed(0)} KB/iter exceeds threshold`
    ).toBeLessThan(DETECTOR_SETTINGS.slopeThresholdKb)

    expect(
      result.totalGrowthMb,
      `Total RSS growth ${result.totalGrowthMb.toFixed(0)} MB exceeds ${DETECTOR_SETTINGS.growthThresholdMb} MB`
    ).toBeLessThan(DETECTOR_SETTINGS.growthThresholdMb)
  })
})
@@ -12,10 +12,12 @@ export function channel(): AsyncIterable & { push(value: T): void close(): void + onReturn?: () => void } { let resolve: ((result: IteratorResult) => void) | null = null let done = false const pending: T[] = [] // only used when push() is called before next() + let onReturn: (() => void) | undefined const iter: AsyncIterableIterator = { [Symbol.asyncIterator]() { @@ -30,6 +32,17 @@ export function channel(): AsyncIterable & { resolve = r }) }, + return() { + done = true + pending.length = 0 + if (resolve) { + const r = resolve + resolve = null + r({ value: undefined as any, done: true }) + } + onReturn?.() + return Promise.resolve({ value: undefined as any, done: true }) + }, } return Object.assign(iter, { @@ -51,6 +64,9 @@ export function channel(): AsyncIterable & { r({ value: undefined as any, done: true }) } }, + set onReturn(fn: (() => void) | undefined) { + onReturn = fn + }, }) } @@ -87,13 +103,19 @@ export async function* merge( enqueue(i) } - while (pending.size > 0) { - const { index, result } = await Promise.race(pending.values()) - if (result.done) { - pending.delete(index) - } else { - yield result.value - enqueue(index) + try { + while (pending.size > 0) { + const { index, result } = await Promise.race(pending.values()) + if (result.done) { + pending.delete(index) + } else { + yield result.value + enqueue(index) + } + } + } finally { + for (const it of iterators) { + it.return?.() } } } @@ -115,16 +137,30 @@ export function split( iterable: AsyncIterable, predicate: (item: T) => item is U ): [AsyncIterable, AsyncIterable>] { + const sourceIterator = iterable[Symbol.asyncIterator]() const matches = channel() const rest = channel>() + let aborted = false + const abort = () => { + if (aborted) return + aborted = true + matches.close() + rest.close() + sourceIterator.return?.() + } + matches.onReturn = abort + rest.onReturn = abort + ;(async () => { try { - for await (const item of iterable) { - if (predicate(item)) { - 
matches.push(item) + while (true) { + const result = await sourceIterator.next() + if (result.done) break + if (predicate(result.value)) { + matches.push(result.value) } else { - rest.push(item as Exclude) + rest.push(result.value as Exclude) } } } finally { diff --git a/packages/ts-cli/src/ndjson.ts b/packages/ts-cli/src/ndjson.ts index 2c497a01e..b2d3bd345 100644 --- a/packages/ts-cli/src/ndjson.ts +++ b/packages/ts-cli/src/ndjson.ts @@ -23,12 +23,15 @@ export function ndjsonResponse( onError?: (err: unknown) => T ): Response { const encoder = new TextEncoder() + const iterator = iterable[Symbol.asyncIterator]() const stream = new ReadableStream({ async start(controller) { try { - for await (const item of iterable) { - controller.enqueue(encoder.encode(JSON.stringify(item) + '\n')) + while (true) { + const result = await iterator.next() + if (result.done) break + controller.enqueue(encoder.encode(JSON.stringify(result.value) + '\n')) } } catch (err) { if (onError) { @@ -38,6 +41,9 @@ export function ndjsonResponse( controller.close() } }, + cancel() { + iterator.return?.() + }, }) return new Response(stream, {