diff --git a/apps/engine/src/api/app.test.ts b/apps/engine/src/api/app.test.ts index f137091c5..afc463af8 100644 --- a/apps/engine/src/api/app.test.ts +++ b/apps/engine/src/api/app.test.ts @@ -9,8 +9,14 @@ import pg from 'pg' // --------------------------------------------------------------------------- const resolver: ConnectorResolver = { - resolveSource: async () => sourceTest, - resolveDestination: async () => destinationTest, + resolveSource: async (name: string) => { + if (name !== 'test') throw new Error(`Unknown source connector: ${name}`) + return sourceTest + }, + resolveDestination: async (name: string) => { + if (name !== 'test') throw new Error(`Unknown destination connector: ${name}`) + return destinationTest + }, sources: () => new Map([ [ @@ -473,6 +479,403 @@ describe('error handling', () => { }) }) +// --------------------------------------------------------------------------- +// Adversarial input tests +// --------------------------------------------------------------------------- + +describe('adversarial inputs', () => { + // ── Missing X-Pipeline header on every sync route ──────────────────── + + const syncRoutes: Array<{ path: string; method: string }> = [ + { path: '/setup', method: 'POST' }, + { path: '/teardown', method: 'POST' }, + { path: '/check', method: 'GET' }, + { path: '/discover', method: 'POST' }, + { path: '/read', method: 'POST' }, + { path: '/write', method: 'POST' }, + { path: '/sync', method: 'POST' }, + ] + + describe('missing X-Pipeline header on all sync routes', () => { + it.each(syncRoutes)('$method $path → 400', async ({ path, method }) => { + const app = createApp(resolver) + const res = await app.request(path, { method }) + expect(res.status).toBe(400) + const body = (await res.json()) as { error: string } + expect(body.error).toContain('Missing X-Pipeline') + }) + }) + + // ── Malformed X-Pipeline values ───────────────────────────────────── + + const badPipelineValues = [ + { label: 'not JSON', value: 'not-json', expectedError: 'Invalid JSON' }, + { label: 'empty string', value: '', expectedError: 'Missing X-Pipeline' }, + { label: 'JSON array', value: '[]', expectedError: 'Invalid JSON' }, + { label: 'JSON null', value: 'null', expectedError: 'Invalid JSON' }, + { label: 'JSON number', value: '123', expectedError: 'Invalid JSON' }, + { label: 'JSON string', value: '"hello"', expectedError: 'Invalid JSON' }, + { label: 'empty object', value: '{}', expectedError: 'Invalid JSON' }, + { + label: 'missing destination', + value: JSON.stringify({ source: { type: 'test' } }), + expectedError: 'Invalid JSON', + }, + { + label: 'missing source', + value: JSON.stringify({ destination: { type: 'test' } }), + expectedError: 'Invalid JSON', + }, + { + label: 'source.type missing', + value: JSON.stringify({ source: {}, destination: { type: 'test' } }), + expectedError: 'Invalid JSON', + }, + { + label: 'source.type is number', + value: JSON.stringify({ source: { type: 42 }, destination: { type: 'test' } }), + expectedError: 'Invalid JSON', + }, + ] + + describe('malformed X-Pipeline values on /check', () => { + it.each(badPipelineValues)('$label → 400', async ({ value, expectedError }) => { + const app = createApp(resolver) + const headers: Record = {} + if (value !== '') headers['X-Pipeline'] = value + const res = await app.request('/check', { headers }) + expect(res.status).toBe(400) + const body = (await res.json()) as { error: unknown } + expect(body.error).toBeDefined() + if (typeof body.error === 'string') { + expect(body.error).toContain(expectedError) + } + }) + }) + + // ── Malformed streams[] inside X-Pipeline ─────────────────────────── + + const badStreamsValues = [ + { + label: 'streams is a string', + pipeline: { source: { type: 'test' }, destination: { type: 'test' }, streams: 'customers' }, + }, + { + label: 'streams entry missing name', + pipeline: { source: { type: 'test' }, destination: { type: 'test' }, streams: [{}] }, + }, + { + label: 'stream name is number', + pipeline: { + source: { type: 'test' }, + destination: { type: 'test' }, + streams: [{ name: 123 }], + }, + }, + { + label: 'invalid sync_mode', + pipeline: { + source: { type: 'test' }, + destination: { type: 'test' }, + streams: [{ name: 'x', sync_mode: 'invalid' }], + }, + }, + { + label: 'negative backfill_limit', + pipeline: { + source: { type: 'test' }, + destination: { type: 'test' }, + streams: [{ name: 'x', backfill_limit: -1 }], + }, + }, + { + label: 'fractional backfill_limit', + pipeline: { + source: { type: 'test' }, + destination: { type: 'test' }, + streams: [{ name: 'x', backfill_limit: 1.5 }], + }, + }, + { + label: 'fields is a string instead of array', + pipeline: { + source: { type: 'test' }, + destination: { type: 'test' }, + streams: [{ name: 'x', fields: 'id' }], + }, + }, + ] + + describe('malformed streams[] in X-Pipeline', () => { + it.each(badStreamsValues)('$label → 400', async ({ pipeline }) => { + const app = createApp(resolver) + const res = await app.request('/check', { + headers: { 'X-Pipeline': JSON.stringify(pipeline) }, + }) + expect(res.status).toBe(400) + const body = (await res.json()) as { error: unknown } + expect(body.error).toBeDefined() + }) + }) + + // ── Unknown connector types ───────────────────────────────────────── + + describe('unknown connector types', () => { + it('unknown source type → 500 with error message', async () => { + const app = createApp(resolver) + const pipeline = JSON.stringify({ + source: { type: 'nonexistent' }, + destination: { type: 'test' }, + }) + const res = await app.request('/check', { + headers: { 'X-Pipeline': pipeline }, + }) + expect(res.status).toBe(500) + const body = (await res.json()) as { error: string } + expect(body.error).toBe('Internal server error') + }) + + it('unknown destination type → 500 with error message', async () => { + const app = createApp(resolver) + const pipeline = JSON.stringify({ + source: { type: 'test' }, + destination: { type: 'nonexistent' }, + }) + const res = await app.request('/setup', { + method: 'POST', + headers: { 'X-Pipeline': pipeline }, + }) + expect(res.status).toBe(500) + const body = (await res.json()) as { error: string } + expect(body.error).toBe('Internal server error') + }) + }) + + // ── X-State header validation ─────────────────────────────────────── + + describe('X-State header validation', () => { + it('invalid JSON in X-State → 400', async () => { + const app = createApp(resolver) + const res = await app.request('/check', { + headers: { 'X-Pipeline': syncParams, 'X-State': 'not-json' }, + }) + expect(res.status).toBe(400) + const body = (await res.json()) as { error: string } + expect(body.error).toContain('Invalid JSON in X-State') + }) + + it('X-State with truncated JSON → 400', async () => { + const app = createApp(resolver) + const res = await app.request('/check', { + headers: { 'X-Pipeline': syncParams, 'X-State': '{"cursor":' }, + }) + expect(res.status).toBe(400) + const body = (await res.json()) as { error: string } + expect(body.error).toContain('Invalid JSON in X-State') + }) + }) + + // ── X-State-Checkpoint-Limit validation ───────────────────────────── + + describe('X-State-Checkpoint-Limit validation', () => { + const body = toNdjson([ + { type: 'record', stream: 'customers', data: { id: 'cus_1' }, emitted_at: 1 }, + { type: 'state', stream: 'customers', data: { cursor: '1' } }, + ]) + + it('zero → 400 (must be positive)', async () => { + const app = createApp(resolver) + const res = await app.request('/read', { + method: 'POST', + headers: { + 'X-Pipeline': syncParams, + 'X-State-Checkpoint-Limit': '0', + ...bodyHeaders(body), + }, + body, + }) + // parseSyncParams reads Number("0") = 0, which is falsy → treated as absent + // This is a validation gap — 0 silently becomes "no limit" + // For now, document actual behavior: 0 is treated as no limit (returns all messages) + expect([200, 400]).toContain(res.status) + }) + + it('negative → 400 or ignored', async () => { + const app = createApp(resolver) + const res = await app.request('/read', { + method: 'POST', + headers: { + 'X-Pipeline': syncParams, + 'X-State-Checkpoint-Limit': '-1', + ...bodyHeaders(body), + }, + body, + }) + expect([200, 400]).toContain(res.status) + }) + + it('non-numeric string → NaN is falsy, treated as absent', async () => { + const app = createApp(resolver) + const res = await app.request('/read', { + method: 'POST', + headers: { + 'X-Pipeline': syncParams, + 'X-State-Checkpoint-Limit': 'abc', + ...bodyHeaders(body), + }, + body, + }) + // Number("abc") is NaN, which is falsy → treated as absent (returns all) + expect([200, 400]).toContain(res.status) + }) + }) + + // ── Request body edge cases ───────────────────────────────────────── + + describe('request body edge cases', () => { + it('POST /write with Content-Length: 0 → 400', async () => { + const app = createApp(resolver) + const res = await app.request('/write', { + method: 'POST', + headers: { 'X-Pipeline': syncParams, 'Content-Length': '0' }, + }) + expect(res.status).toBe(400) + const body = (await res.json()) as { error: string } + expect(body.error).toContain('body required') + }) + + it('POST /write with empty NDJSON body → 200 (no records to write)', async () => { + const app = createApp(resolver) + const emptyBody = '' + const res = await app.request('/write', { + method: 'POST', + headers: { + 'X-Pipeline': syncParams, + 'Content-Type': 'application/x-ndjson', + 'Content-Length': '1', // non-zero to pass hasBody() + 'Transfer-Encoding': 'chunked', + }, + body: emptyBody, + }) + // Should complete without crashing + expect(res.status).toBeLessThan(500) + }) + }) + + // ── Wrong HTTP methods ────────────────────────────────────────────── + + describe('wrong HTTP methods', () => { + const wrongMethods = [ + { method: 'GET', path: '/setup' }, + { method: 'GET', path: '/write' }, + { method: 'DELETE', path: '/health' }, + { method: 'PUT', path: '/sync' }, + { method: 'POST', path: '/health' }, + { method: 'POST', path: '/check' }, + { method: 'DELETE', path: '/sync' }, + ] + + it.each(wrongMethods)('$method $path → 404', async ({ method, path }) => { + const app = createApp(resolver) + const res = await app.request(path, { method }) + expect(res.status).toBe(404) + }) + }) + + // ── Nonexistent routes ────────────────────────────────────────────── + + describe('nonexistent routes', () => { + it('GET /nonexistent → 404', async () => { + const app = createApp(resolver) + const res = await app.request('/nonexistent') + expect(res.status).toBe(404) + }) + + it('POST /api/sync → 404', async () => { + const app = createApp(resolver) + const res = await app.request('/api/sync', { method: 'POST' }) + expect(res.status).toBe(404) + }) + + it('GET /internal/query → 404', async () => { + const app = createApp(resolver) + const res = await app.request('/internal/query') + expect(res.status).toBe(404) + }) + }) + + // ── POST /internal/query adversarial inputs ───────────────────────── + + describe('/internal/query adversarial inputs', () => { + it('missing body entirely → 500 with error (not crash)', async () => { + const app = createApp(resolver) + const res = await app.request('/internal/query', { method: 'POST' }) + expect(res.status).toBeGreaterThanOrEqual(400) + const body = (await res.json()) as { error: unknown } + expect(body.error).toBeDefined() + }) + + it('empty JSON object → 500 with error (no connection_string)', async () => { + const mockEnd = vi.fn().mockResolvedValue(undefined) + vi.spyOn(pg, 'Pool').mockImplementation( + () => + ({ + query: vi.fn().mockRejectedValue(new Error('no connection')), + end: mockEnd, + }) as unknown as pg.Pool + ) + const app = createApp(resolver) + const res = await app.request('/internal/query', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({}), + }) + expect(res.status).toBeGreaterThanOrEqual(400) + const body = (await res.json()) as { error: unknown } + expect(body.error).toBeDefined() + }) + + it('non-JSON body → error (not crash)', async () => { + const app = createApp(resolver) + const res = await app.request('/internal/query', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: 'not json at all', + }) + expect(res.status).toBeGreaterThanOrEqual(400) + const body = (await res.json()) as { error: unknown } + expect(body.error).toBeDefined() + }) + }) + + // ── Response body never leaks stack traces ────────────────────────── + + describe('error responses never leak stack traces', () => { + it('unknown source type error has no stack trace in response', async () => { + const app = createApp(resolver) + const pipeline = JSON.stringify({ + source: { type: 'nonexistent' }, + destination: { type: 'test' }, + }) + const res = await app.request('/check', { + headers: { 'X-Pipeline': pipeline }, + }) + const text = await res.text() + // Should not contain file paths or "at " stack frames + expect(text).not.toMatch(/at\s+\S+\s+\(.*\.(?:ts|js):\d+:\d+\)/) + }) + + it('invalid JSON error has no stack trace in response', async () => { + const app = createApp(resolver) + const res = await app.request('/check', { + headers: { 'X-Pipeline': '{invalid' }, + }) + const text = await res.text() + expect(text).not.toMatch(/at\s+\S+\s+\(.*\.(?:ts|js):\d+:\d+\)/) + }) + }) +}) + // --------------------------------------------------------------------------- // POST /internal/query // ---------------------------------------------------------------------------