/**
 * Feed a raw socket chunk to the frame decoder and settle the pending
 * request that matches each fully decoded message.
 *
 * Messages arrive as length-prefixed binary frames (see `./socketFraming`).
 * A decode failure is logged and the decoder is reset; nothing is rejected
 * here in that case, so affected requests are left to their own timeout.
 * Decoded values that are not objects carrying an `id` are logged and
 * skipped, and messages whose `id` has no pending request are dropped
 * silently.
 *
 * @param data Raw bytes received from the socket.
 */
private handleData(data: Buffer): void {
  let decoded: unknown[];
  try {
    decoded = this.decoder.push(data);
  } catch (error) {
    console.error('[SocketClient] Failed to decode frame:', error);
    // Start from a clean decoder so the failed frame is not retried on the
    // next chunk.
    this.decoder.reset();
    return;
  }

  for (const candidate of decoded) {
    if (typeof candidate === 'object' && candidate !== null && 'id' in candidate) {
      const reply = candidate as {
        id: string;
        type: 'success' | 'error';
        data: WorkerResponse | { error?: string };
      };
      const waiter = this.pendingRequests.get(reply.id);
      if (waiter) {
        // Remove the entry before settling so the request is settled once.
        this.pendingRequests.delete(reply.id);
        if (reply.type !== 'success') {
          const reason = (reply.data as { error?: string })?.error ?? 'Unknown error';
          waiter.reject(new Error(reason));
        } else {
          waiter.resolve(reply.data as WorkerResponse);
        }
      }
    } else {
      console.error('[SocketClient] Ignoring malformed message:', candidate);
    }
  }
}
import { describe, it, expect } from 'vitest';
import { encodeFrame, FrameDecoder } from './socketFraming';

/**
 * Tests for the length-prefixed framing helpers: single and batched frames,
 * frames split across chunk boundaries, a large binary payload, and
 * `reset()`.
 */
describe('socketFraming', () => {
  it('round-trips a simple object', () => {
    const decoder = new FrameDecoder();
    const msg = { id: 'req-1', type: 'success', data: { hello: 'world' } };
    const out = decoder.push(encodeFrame(msg));
    expect(out).toHaveLength(1);
    expect(out[0]).toEqual(msg);
  });

  it('round-trips a structure JSON would reject (Map, BigInt)', () => {
    const decoder = new FrameDecoder();
    const msg = {
      id: 'req-2',
      // Explicit type arguments: the entries hold values of different types.
      data: new Map<string, unknown>([
        ['a', 1n],
        ['b', new Uint8Array([1, 2, 3])],
      ]),
    };
    const [out] = decoder.push(encodeFrame(msg)) as [typeof msg];
    expect(out.id).toBe('req-2');
    const map = out.data;
    expect(map.get('a')).toBe(1n);
    expect(map.get('b')).toEqual(new Uint8Array([1, 2, 3]));
  });

  it('handles multiple frames written as one Buffer', () => {
    const decoder = new FrameDecoder();
    const a = encodeFrame({ id: 'a' });
    const b = encodeFrame({ id: 'b' });
    const c = encodeFrame({ id: 'c' });
    const out = decoder.push(Buffer.concat([a, b, c]));
    expect(out.map((m) => (m as { id: string }).id)).toEqual(['a', 'b', 'c']);
  });

  it('handles partial frames split across arbitrary chunk boundaries', () => {
    const decoder = new FrameDecoder();
    const original = { id: 'split', payload: 'x'.repeat(10_000) };
    const frame = encodeFrame(original);

    // Deliver the frame one byte at a time, the absolute worst case.
    const received: unknown[] = [];
    for (let i = 0; i < frame.byteLength; i += 1) {
      received.push(...decoder.push(frame.subarray(i, i + 1)));
    }
    expect(received).toHaveLength(1);
    expect(received[0]).toEqual(original);
  });

  it('keeps a trailing partial frame buffered until the rest arrives', () => {
    const decoder = new FrameDecoder();
    const complete = encodeFrame({ id: 'complete' });
    const partial = encodeFrame({ id: 'partial', body: 'hello world' });

    // First push: one complete frame + first half of the next.
    const halfPoint = Math.floor(partial.byteLength / 2);
    const firstChunk = Buffer.concat([complete, partial.subarray(0, halfPoint)]);
    const round1 = decoder.push(firstChunk);
    expect(round1).toHaveLength(1);
    expect((round1[0] as { id: string }).id).toBe('complete');

    // Second push: the tail of the partial frame.
    const round2 = decoder.push(partial.subarray(halfPoint));
    expect(round2).toHaveLength(1);
    expect(round2[0]).toEqual({ id: 'partial', body: 'hello world' });
  });

  it('handles a payload larger than Node\u0027s string-length cap would allow for JSON', () => {
    // ~200 MB of binary data. Using a Uint8Array keeps the allocation bounded
    // to the raw byte count without materializing a giant JS string.
    const decoder = new FrameDecoder();
    const payload = new Uint8Array(200 * 1024 * 1024);
    // Mark one byte per stride with a value in 1..255. The previous pattern,
    // `i & 0xff` at multiples of 4096, was always 0, so the payload stayed
    // all-zero and the assertions below could not detect a corrupted body.
    const stride = 4096;
    for (let i = 0; i < payload.byteLength; i += stride) {
      payload[i] = ((i / stride) % 255) + 1;
    }
    const frame = encodeFrame({ id: 'big', data: payload });
    const out = decoder.push(frame);
    expect(out).toHaveLength(1);
    const decoded = (out[0] as { id: string; data: Uint8Array }).data;
    expect(decoded.byteLength).toBe(payload.byteLength);
    expect(decoded[0]).toBe(1);
    expect(decoded[stride]).toBe(2);
    // Unmarked bytes stay zero; the last marked byte must survive too.
    expect(decoded[1]).toBe(0);
    const lastMarked = payload.byteLength - stride;
    expect(decoded[lastMarked]).toBe(payload[lastMarked]);
  });

  it('reset() clears the internal buffer', () => {
    const decoder = new FrameDecoder();
    const partial = encodeFrame({ id: 'will-be-dropped' }).subarray(0, 3);
    decoder.push(partial);
    decoder.reset();
    // Now push a fresh frame; it should decode cleanly.
    const out = decoder.push(encodeFrame({ id: 'fresh' }));
    expect(out).toEqual([{ id: 'fresh' }]);
  });
});
/**
 * Length-prefixed binary framing built on `v8.serialize` / `v8.deserialize`.
 *
 * Wire format:
 *   [ 4-byte big-endian uint32 body length ][ body bytes ]
 *
 * The length prefix caps a single frame body at 2^32 - 1 bytes.
 */

// eslint-disable-next-line n/prefer-node-protocol
import v8 from 'v8';

/** Size of the big-endian uint32 length prefix. */
const HEADER_BYTES = 4;

/** Largest body length representable in the length prefix. */
const MAX_BODY_BYTES = 0xffffffff;

/**
 * Encode a message as a length-prefixed binary frame.
 *
 * Serialization errors are not caught here; callers that want to recover
 * should wrap the call in their own try/catch.
 *
 * @param message Any value `v8.serialize` accepts.
 * @returns A single Buffer holding the 4-byte header followed by the body.
 * @throws RangeError if the serialized body does not fit the uint32 prefix.
 * @throws Whatever `v8.serialize` throws for values it cannot serialize.
 */
export function encodeFrame(message: unknown): Buffer {
  const body = v8.serialize(message);
  if (body.byteLength > MAX_BODY_BYTES) {
    throw new RangeError(
      `Frame body of ${body.byteLength} bytes exceeds the ${MAX_BODY_BYTES}-byte limit of the length prefix`,
    );
  }
  const frame = Buffer.allocUnsafe(HEADER_BYTES + body.byteLength);
  frame.writeUInt32BE(body.byteLength, 0);
  body.copy(frame, HEADER_BYTES);
  return frame;
}

/**
 * Stateful decoder that accepts raw socket chunks and yields complete decoded
 * messages.
 *
 * Incoming chunks are queued as-is and only joined once a whole frame has
 * arrived, so each byte is copied at most once. Re-concatenating the pending
 * buffer on every chunk instead would copy a quadratic number of bytes when a
 * large frame arrives in many small chunks.
 */
export class FrameDecoder {
  /** Chunks received but not yet consumed, in arrival order. */
  private chunks: Buffer[] = [];

  /** Total number of bytes held across `chunks`. */
  private bufferedBytes = 0;

  /**
   * Push a new chunk from the socket and return every message it completes.
   * Any trailing partial frame is retained for the next call.
   *
   * @param chunk Raw bytes read from the socket; may be empty.
   * @returns Decoded messages in wire order (possibly none).
   * @throws Whatever `v8.deserialize` throws for an undecodable body. The
   *   offending frame has already been consumed at that point, so the next
   *   call continues with the following frame instead of failing on the same
   *   bytes again. Messages decoded earlier in the same call are not returned.
   */
  push(chunk: Buffer): unknown[] {
    if (chunk.byteLength > 0) {
      this.chunks.push(chunk);
      this.bufferedBytes += chunk.byteLength;
    }

    const messages: unknown[] = [];
    while (this.bufferedBytes >= HEADER_BYTES) {
      const frameLength = HEADER_BYTES + this.peekBodyLength();
      if (this.bufferedBytes < frameLength) {
        // Frame still incomplete: keep the queued chunks untouched.
        break;
      }
      // Consume before decoding so a throwing body cannot wedge the decoder.
      const frame = this.take(frameLength);
      messages.push(v8.deserialize(frame.subarray(HEADER_BYTES)));
    }
    return messages;
  }

  /** Reset state, discarding any buffered bytes; useful for connection resets. */
  reset(): void {
    this.chunks = [];
    this.bufferedBytes = 0;
  }

  /** Read the next frame's body length without consuming any bytes. */
  private peekBodyLength(): number {
    const first = this.chunks[0];
    if (first !== undefined && first.byteLength >= HEADER_BYTES) {
      return first.readUInt32BE(0);
    }
    // The header straddles chunk boundaries: gather just its four bytes.
    const header = Buffer.alloc(HEADER_BYTES);
    let filled = 0;
    for (const part of this.chunks) {
      filled += part.copy(header, filled, 0, Math.min(part.byteLength, HEADER_BYTES - filled));
      if (filled === HEADER_BYTES) {
        break;
      }
    }
    return header.readUInt32BE(0);
  }

  /**
   * Remove and return the first `length` buffered bytes as one contiguous
   * Buffer. Callers must ensure at least `length` bytes are buffered.
   */
  private take(length: number): Buffer {
    const parts: Buffer[] = [];
    let remaining = length;
    let wholeChunksUsed = 0;

    while (remaining > 0) {
      const part = this.chunks[wholeChunksUsed];
      if (part === undefined) {
        throw new Error('FrameDecoder: buffered byte count is out of sync with queued chunks');
      }
      if (part.byteLength <= remaining) {
        parts.push(part);
        remaining -= part.byteLength;
        wholeChunksUsed += 1;
      } else {
        // Split the chunk: its head completes this frame, its tail stays queued.
        parts.push(part.subarray(0, remaining));
        this.chunks[wholeChunksUsed] = part.subarray(remaining);
        remaining = 0;
      }
    }

    this.chunks.splice(0, wholeChunksUsed);
    this.bufferedBytes -= length;

    const [only] = parts;
    // A frame that sits inside one chunk needs no copy at all.
    return parts.length === 1 && only !== undefined ? only : Buffer.concat(parts, length);
  }
}
/**
 * Write a response to the client as one length-prefixed binary frame (see
 * `./socketFraming`).
 *
 * If the response itself cannot be encoded, the failure is logged and a
 * minimal error response carrying the same request id is sent instead, so
 * the failure reaches the requester rather than propagating out of this
 * method.
 *
 * @param socket Connection to write the frame to.
 * @param response Response to encode and send.
 */
private sendResponse(socket: Socket, response: ServerResponse): void {
  let encoded: Buffer;
  try {
    encoded = encodeFrame(response);
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    const id = 'id' in response ? response.id : 'unknown';
    console.error(`[SocketServer] Failed to encode response for request ${id}:`, reason);

    // Plain strings only, so this replacement is expected to encode cleanly.
    const fallback = {
      id,
      type: 'error',
      data: {
        error: `Failed to encode response: ${reason}`,
      },
    } satisfies ServerResponse;
    encoded = encodeFrame(fallback);
  }
  socket.write(encoded);
}