Skip to content
33 changes: 32 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,35 @@ jobs:
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

# ---------------------------------------------------------------------------
# E2E Memory — memory leak regression test (pipeline_sync + time_limit)
# ---------------------------------------------------------------------------
e2e_memory:
name: E2E Memory Leak
runs-on: ubuntu-24.04-arm

steps:
- uses: actions/checkout@v5

- name: Install pnpm
uses: pnpm/action-setup@v5

- name: Set up Node
uses: actions/setup-node@v6
with:
node-version-file: ./.nvmrc
cache: pnpm

- name: Install dependencies & build
run: pnpm install --frozen-lockfile && pnpm build

- name: Memory leak regression test
run: |
pnpm --filter @stripe/sync-e2e exec vitest run \
memory-leak-harness.test.ts \
memory-leak.test.ts
timeout-minutes: 10

# ---------------------------------------------------------------------------
# E2E Stripe — Stripe API + Temporal integration tests (runs on every push/PR)
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -547,7 +576,9 @@ jobs:
--exclude 'test-server-sync.test.ts' \
--exclude 'test-sync-e2e.test.ts' \
--exclude 'test-sync-engine.test.ts' \
--exclude 'test-e2e-network.test.ts' # ↑ run in e2e_test_server job
--exclude 'test-e2e-network.test.ts' \
--exclude 'memory-leak-harness.test.ts' \
--exclude 'memory-leak.test.ts' # ↑ run in dedicated jobs
env:
STRIPE_API_KEY: ${{ secrets.STRIPE_API_KEY }}
POSTGRES_URL: 'postgres://postgres:postgres@localhost:55432/postgres'
Expand Down
10 changes: 8 additions & 2 deletions apps/engine/src/lib/ndjson.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,23 @@ export async function* parseNdjson<T = unknown>(text: string): AsyncIterable<T>
/** Serialize an AsyncIterable as a streaming NDJSON ReadableStream. */
export function toNdjsonStream(iter: AsyncIterable<unknown>): ReadableStream<Uint8Array> {
const enc = new TextEncoder()
const iterator = iter[Symbol.asyncIterator]()
return new ReadableStream({
async start(controller) {
try {
for await (const item of iter) {
controller.enqueue(enc.encode(JSON.stringify(item) + '\n'))
while (true) {
const result = await iterator.next()
if (result.done) break
controller.enqueue(enc.encode(JSON.stringify(result.value) + '\n'))
}
controller.close()
} catch (err) {
controller.error(err)
}
},
cancel() {
iterator.return?.()
},
})
}

Expand Down
42 changes: 35 additions & 7 deletions apps/engine/src/lib/remote-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,22 +139,38 @@ export function createRemoteEngine(engineUrl: string): Engine {
params: { header: { 'x-source': JSON.stringify(source) } },
})
if (!response.ok) throw new Error(`source_discover failed: ${response.status}`)
yield* parseNdjsonStream<DiscoverOutput>(response.body!)
try {
yield* parseNdjsonStream<DiscoverOutput>(response.body!)
} finally {
await response.body?.cancel().catch(() => {})
}
},

async *pipeline_check(pipeline: PipelineConfig): AsyncIterable<CheckOutput> {
  // Stream check results back from the engine as NDJSON.
  const response = await post('/pipeline_check', pipeline)
  try {
    yield* parseNdjsonStream<CheckOutput>(response.body!)
  } finally {
    // Best-effort release of the HTTP body, even when the consumer exits early.
    await response.body?.cancel().catch(() => {})
  }
},

async *pipeline_setup(pipeline: PipelineConfig): AsyncIterable<SetupOutput> {
  // Stream setup progress back from the engine as NDJSON.
  const response = await post('/pipeline_setup', pipeline)
  try {
    yield* parseNdjsonStream<SetupOutput>(response.body!)
  } finally {
    // Best-effort release of the HTTP body, even when the consumer exits early.
    await response.body?.cancel().catch(() => {})
  }
},

async *pipeline_teardown(pipeline: PipelineConfig): AsyncIterable<TeardownOutput> {
  // Stream teardown progress back from the engine as NDJSON.
  const response = await post('/pipeline_teardown', pipeline)
  try {
    yield* parseNdjsonStream<TeardownOutput>(response.body!)
  } finally {
    // Best-effort release of the HTTP body, even when the consumer exits early.
    await response.body?.cancel().catch(() => {})
  }
},

async *pipeline_read(
Expand All @@ -164,15 +180,23 @@ export function createRemoteEngine(engineUrl: string): Engine {
): AsyncIterable<Message> {
const body = input ? toNdjsonStream(input) : undefined
const res = await post('/pipeline_read', pipeline, opts, body)
yield* parseNdjsonStream<Message>(res.body!)
try {
yield* parseNdjsonStream<Message>(res.body!)
} finally {
await res.body?.cancel().catch(() => {})
}
},

async *pipeline_write(
  pipeline: PipelineConfig,
  messages: AsyncIterable<Message>
): AsyncIterable<DestinationOutput> {
  // Upload messages as a streaming NDJSON request body; stream results back.
  const response = await post('/pipeline_write', pipeline, undefined, toNdjsonStream(messages))
  try {
    yield* parseNdjsonStream<DestinationOutput>(response.body!)
  } finally {
    // Best-effort release of the HTTP body, even when the consumer exits early.
    await response.body?.cancel().catch(() => {})
  }
},

async *pipeline_sync(
Expand All @@ -182,7 +206,11 @@ export function createRemoteEngine(engineUrl: string): Engine {
): AsyncIterable<SyncOutput> {
const body = input ? toNdjsonStream(input) : undefined
const res = await post('/pipeline_sync', pipeline, opts, body)
yield* parseNdjsonStream<SyncOutput>(res.body!)
try {
yield* parseNdjsonStream<SyncOutput>(res.body!)
} finally {
await res.body?.cancel().catch(() => {})
}
},
}
}
6 changes: 1 addition & 5 deletions apps/service/src/temporal/activities/_shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,12 @@ export async function drainMessages(
): Promise<{
errors: RunResult['errors']
state: SourceState
records: Message[]
sourceConfig?: Record<string, unknown>
destConfig?: Record<string, unknown>
eof?: EofPayload
}> {
const errors: RunResult['errors'] = []
let state: SourceState = initialState ?? { streams: {}, global: {} }
const records: Message[] = []
let sourceConfig: Record<string, unknown> | undefined
let destConfig: Record<string, unknown> | undefined
let eof: EofPayload | undefined
Expand All @@ -91,13 +89,11 @@ export async function drainMessages(
errors.push(error)
} else if (message.type === 'source_state') {
state = mergeStateMessage(state, message)
} else if (message.type === 'record') {
records.push(message)
}
}
if (count % 50 === 0) heartbeat({ messages: count })
}
if (count % 50 !== 0) heartbeat({ messages: count })

return { errors, state, records, sourceConfig, destConfig, eof }
return { errors, state, sourceConfig, destConfig, eof }
}
144 changes: 144 additions & 0 deletions e2e/memory-leak-harness.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import { afterEach, describe, expect, it } from 'vitest'
import { spawn, type ChildProcess } from 'node:child_process'
import {
drainNdjsonResponse,
formatMemoryLeakSummary,
hasTimeLimitEof,
runMemoryLeakDetector,
type MemoryLeakSettings,
} from './memory-leak-harness.js'

// Detector tuning for CI: short warmup/test runs keep the suite fast, and the
// thresholds are generous enough that a healthy process never trips them.
const DETECTOR_SETTINGS: MemoryLeakSettings = {
  warmupIterations: 6,
  testIterations: 12,
  settleMs: 50,
  slopeThresholdKb: 3000,
  growthThresholdMb: 300,
}

// 8 MiB retained per request — large enough to clearly exceed slopeThresholdKb
// within testIterations, so the "leaky" scenario fails deterministically.
const SYNTHETIC_LEAK_BYTES = 8 * 1024 * 1024

// Every spawned synthetic server, tracked so afterEach can SIGKILL stragglers.
const children = new Set<ChildProcess>()

/**
 * Launch a throwaway Node HTTP server in a child process for leak testing.
 *
 * The child serves three routes: /health (liveness), /pipeline_setup (one
 * NDJSON control line), and /pipeline_sync (one NDJSON eof line; optionally
 * retains `leakBytesPerRequest` bytes per request to simulate a leak).
 * Resolves once the child prints `READY:<port>`; rejects on startup timeout,
 * spawn error, or premature exit.
 */
function spawnSyntheticServer(leakBytesPerRequest: number): Promise<{ proc: ChildProcess; baseUrl: string }> {
  return new Promise((resolve, reject) => {
    // Inline ESM source for the child. leakBytesPerRequest is baked into the
    // script text via template interpolation below.
    const code = [
      'import http from "node:http";',
      'const retained = [];',
      `const leakBytesPerRequest = ${leakBytesPerRequest};`,
      'const server = http.createServer((req, res) => {',
      ' if (req.url === "/health") {',
      ' res.writeHead(200, { "content-type": "application/json" });',
      ' res.end(JSON.stringify({ ok: true }));',
      ' return;',
      ' }',
      ' if (req.url?.startsWith("/pipeline_setup")) {',
      ' res.writeHead(200, { "content-type": "application/x-ndjson" });',
      ' res.end(JSON.stringify({ type: "control" }) + "\\n");',
      ' return;',
      ' }',
      ' if (req.url?.startsWith("/pipeline_sync")) {',
      ' if (leakBytesPerRequest > 0) retained.push(Buffer.alloc(leakBytesPerRequest, 1));',
      ' res.writeHead(200, { "content-type": "application/x-ndjson" });',
      ' res.end(JSON.stringify({ type: "eof", eof: { reason: "time_limit" } }) + "\\n");',
      ' return;',
      ' }',
      ' res.writeHead(404);',
      ' res.end("not found");',
      '});',
      'server.listen(0, "127.0.0.1", () => {',
      ' const addr = server.address();',
      ' console.log(`READY:${addr.port}`);',
      '});',
    ].join('\n')

    const proc = spawn('node', ['--input-type=module', '-e', code], {
      stdio: ['ignore', 'pipe', 'pipe'],
    }) as ChildProcess
    // Register immediately so afterEach can reap it even if startup fails.
    children.add(proc)

    // Accumulates both stdout and stderr, for the READY probe and diagnostics.
    let output = ''
    const timeout = setTimeout(() => {
      proc.kill('SIGKILL')
      reject(new Error(`Synthetic server did not start within 5s\noutput: ${output}`))
    }, 5_000)

    proc.stdout!.on('data', (chunk: Buffer) => {
      output += chunk.toString()
      const match = output.match(/READY:(\d+)/)
      if (!match) return
      clearTimeout(timeout)
      // Extra resolve calls on later chunks are no-ops (promise already settled).
      resolve({ proc, baseUrl: `http://127.0.0.1:${match[1]}` })
    })

    proc.stderr!.on('data', (chunk: Buffer) => {
      output += chunk.toString()
    })

    proc.on('error', (err: Error) => {
      clearTimeout(timeout)
      reject(err)
    })

    // NOTE(review): this `code` parameter (exit code) shadows the outer
    // `const code` (script source) — intentional-looking but worth renaming.
    proc.on('exit', (code: number | null) => {
      clearTimeout(timeout)
      // Only reject if the server never came up; exits after READY are normal
      // (afterEach kills the child).
      if (!output.includes('READY:')) {
        reject(new Error(`Synthetic server exited with code ${code}\noutput: ${output}`))
      }
    })
  })
}

/**
 * Run the memory-leak detector against a synthetic server that retains
 * `leakBytesPerRequest` bytes on every sync request.
 */
async function runSyntheticScenario(leakBytesPerRequest: number) {
  const { proc, baseUrl } = await spawnSyntheticServer(leakBytesPerRequest)

  // Prime the server once, mirroring a real pipeline_setup round-trip.
  const setupResponse = await fetch(`${baseUrl}/pipeline_setup`, { method: 'POST' })
  expect(setupResponse.ok).toBe(true)
  await drainNdjsonResponse(setupResponse)

  // Each detector iteration performs one short, time-limited sync call and
  // reports whether the server acknowledged the time limit.
  return runMemoryLeakDetector({
    pid: proc.pid!,
    settings: DETECTOR_SETTINGS,
    iterate: async () => {
      const syncResponse = await fetch(`${baseUrl}/pipeline_sync?time_limit=0.1`, { method: 'POST' })
      expect(syncResponse.ok).toBe(true)
      const messages = await drainNdjsonResponse(syncResponse)
      return { sawTimeLimit: hasTimeLimitEof(messages) }
    },
  })
}

afterEach(async () => {
  // Kill every spawned synthetic server and wait (bounded) for it to exit, so
  // one test's child can never skew the next test's memory readings.
  for (const child of children) {
    if (child.pid) child.kill('SIGKILL')
    await new Promise<void>((resolve) => {
      // Fallback in case 'exit' never fires (e.g. the process is already
      // gone). Cleared on exit so no stray timer outlives the test.
      const fallback = setTimeout(resolve, 500)
      child.once('exit', () => {
        clearTimeout(fallback)
        resolve()
      })
    })
    children.delete(child)
  }
})

describe('memory leak harness', { timeout: 120_000 }, () => {
  it('does not flag a stable synthetic process', async () => {
    // A server that retains nothing per request must stay under thresholds.
    const stable = await runSyntheticScenario(0)

    console.log(formatMemoryLeakSummary(stable))

    expect(stable.timeLimitCount).toBe(stable.totalIterations)
    expect(stable.passesThresholds).toBe(true)
    expect(stable.slopeKbPerIteration).toBeLessThan(DETECTOR_SETTINGS.slopeThresholdKb)
  })

  it('flags an intentionally leaky synthetic process', async () => {
    // Retaining 8 MiB per request must push RSS growth past the threshold.
    const leaky = await runSyntheticScenario(SYNTHETIC_LEAK_BYTES)

    console.log(formatMemoryLeakSummary(leaky))

    expect(leaky.timeLimitCount).toBe(leaky.totalIterations)
    expect(leaky.passesThresholds).toBe(false)
    expect(leaky.slopeKbPerIteration).toBeGreaterThan(DETECTOR_SETTINGS.slopeThresholdKb)
  })
})
Loading
Loading