Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { mkdir, stat } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import lockfile from 'proper-lockfile';
import { FrameDecoder, encodeFrame } from './socketFraming';
import type { WorkerRequest, WorkerResponse } from './worker';

const isWindows = process.platform === 'win32';
Expand Down Expand Up @@ -255,7 +256,7 @@ export class SocketClient {
}
>();

private buffer = '';
private decoder = new FrameDecoder();

constructor(socketDir?: string) {
this.socketDir = socketDir;
Expand Down Expand Up @@ -317,42 +318,45 @@ export class SocketClient {

/**
* Handle incoming data from socket.
* Optimized to avoid O(n²) behavior on large messages: only split the buffer
* when the incoming chunk actually contains a newline delimiter.
*
* Uses length-prefixed binary framing via `v8.serialize`/`v8.deserialize`
* (see `./socketFraming`). Binary framing removes Node's ~500 MB UTF-8
* string-length ceiling that the old `JSON.parse` + newline decoder hit on
* large consumer projects, and avoids the O(n²) `buffer += chunk.toString()`
* reallocation on deep extractor outputs.
*/
private handleData(data: Buffer): void {
const chunk = data.toString();
this.buffer += chunk;

// Fast path: skip expensive split if this chunk has no message boundary
if (!chunk.includes('\n')) {
let messages: unknown[];
try {
messages = this.decoder.push(data);
} catch (error) {
console.error('[SocketClient] Failed to decode frame:', error);
// Recovery: reset decoder so a corrupt frame doesn't stall subsequent
// messages. Pending requests time out on their own 5-minute window.
this.decoder.reset();
return;
}

// Process complete messages (delimited by newlines)
const messages = this.buffer.split('\n');
this.buffer = messages.pop() || '';

for (const messageStr of messages) {
if (!messageStr.trim()) {
for (const message of messages) {
if (!message || typeof message !== 'object' || !('id' in message)) {
console.error('[SocketClient] Ignoring malformed message:', message);
continue;
}

try {
const message = JSON.parse(messageStr);
const pending = this.pendingRequests.get(message.id);

if (pending) {
this.pendingRequests.delete(message.id);

if (message.type === 'success') {
pending.resolve(message.data);
} else {
pending.reject(new Error(message.data?.error || 'Unknown error'));
}
}
} catch (error) {
console.error('[SocketClient] Failed to parse message:', error);
const msg = message as {
id: string;
type: 'success' | 'error';
data: WorkerResponse | { error?: string };
};
const pending = this.pendingRequests.get(msg.id);
if (!pending) {
continue;
}
this.pendingRequests.delete(msg.id);
if (msg.type === 'success') {
pending.resolve(msg.data as WorkerResponse);
} else {
const err = (msg.data as { error?: string })?.error ?? 'Unknown error';
pending.reject(new Error(err));
}
}
}
Expand All @@ -373,11 +377,17 @@ export class SocketClient {

const message = {
id,
type: 'process-types',
type: 'process-types' as const,
data: request,
};

this.socket!.write(`${JSON.stringify(message)}\n`);
try {
this.socket!.write(encodeFrame(message));
} catch (err) {
this.pendingRequests.delete(id);
reject(err instanceof Error ? err : new Error(String(err)));
return;
}

// Timeout after 5 minutes
setTimeout(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { describe, it, expect } from 'vitest';
import { encodeFrame, FrameDecoder } from './socketFraming';

// Protocol tests for the length-prefixed binary framing layer:
// [ 4-byte big-endian uint32 body length ][ v8-serialized body ].
// Covers round-trips, frame coalescing/splitting across chunk boundaries,
// oversized payloads, and decoder reset.
describe('socketFraming', () => {
it('round-trips a simple object', () => {
const decoder = new FrameDecoder();
const msg = { id: 'req-1', type: 'success', data: { hello: 'world' } };
const out = decoder.push(encodeFrame(msg));
expect(out).toHaveLength(1);
expect(out[0]).toEqual(msg);
});

it('round-trips a structure JSON would reject (Map, BigInt)', () => {
const decoder = new FrameDecoder();
const msg = {
id: 'req-2',
data: new Map<string, unknown>([
['a', 1n],
['b', new Uint8Array([1, 2, 3])],
]),
};
const [out] = decoder.push(encodeFrame(msg)) as [typeof msg];
expect(out.id).toBe('req-2');
const map = out.data as Map<string, unknown>;
expect(map.get('a')).toBe(1n);
expect(map.get('b')).toEqual(new Uint8Array([1, 2, 3]));
});

it('handles multiple frames written as one Buffer', () => {
const decoder = new FrameDecoder();
const a = encodeFrame({ id: 'a' });
const b = encodeFrame({ id: 'b' });
const c = encodeFrame({ id: 'c' });
const out = decoder.push(Buffer.concat([a, b, c]));
// Order must be preserved: frames decode in arrival order.
expect(out.map((m) => (m as { id: string }).id)).toEqual(['a', 'b', 'c']);
});

it('handles partial frames split across arbitrary chunk boundaries', () => {
const decoder = new FrameDecoder();
const original = { id: 'split', payload: 'x'.repeat(10_000) };
const frame = encodeFrame(original);

// Deliver the frame one byte at a time — the absolute worst case.
const received: unknown[] = [];
for (let i = 0; i < frame.byteLength; i += 1) {
received.push(...decoder.push(frame.subarray(i, i + 1)));
}
expect(received).toHaveLength(1);
expect(received[0]).toEqual(original);
});

it('keeps a trailing partial frame buffered until the rest arrives', () => {
const decoder = new FrameDecoder();
const complete = encodeFrame({ id: 'complete' });
const partial = encodeFrame({ id: 'partial', body: 'hello world' });

// First push: one complete frame + first half of the next.
const halfPoint = Math.floor(partial.byteLength / 2);
const firstChunk = Buffer.concat([complete, partial.subarray(0, halfPoint)]);
const round1 = decoder.push(firstChunk);
expect(round1).toHaveLength(1);
expect((round1[0] as { id: string }).id).toBe('complete');

// Second push: the tail of the partial frame.
const round2 = decoder.push(partial.subarray(halfPoint));
expect(round2).toHaveLength(1);
expect(round2[0]).toEqual({ id: 'partial', body: 'hello world' });
});

it('handles a payload larger than Node\u0027s string-length cap would allow for JSON', () => {
// ~200 MB of data — well under v8.serialize\u0027s own ceiling but
// impossible for a JSON string round-trip on most Node versions. Using a
// Uint8Array keeps the allocation bounded to the raw byte count without
// materializing a giant JS string.
// NOTE(review): this test allocates 200 MB per run; if CI memory budgets
// are tight, consider gating it behind an env flag. Also, JSON could not
// represent a Uint8Array directly anyway — presumably the cap claim refers
// to an equivalent array/base64 encoding; verify the wording.
const decoder = new FrameDecoder();
const payload = new Uint8Array(200 * 1024 * 1024);
// Fill with a non-zero pattern so v8.deserialize can\u0027t shortcut.
for (let i = 0; i < payload.byteLength; i += 4096) {
payload[i] = i & 0xff;
}
const frame = encodeFrame({ id: 'big', data: payload });
const out = decoder.push(frame);
expect(out).toHaveLength(1);
const decoded = (out[0] as { id: string; data: Uint8Array }).data;
expect(decoded.byteLength).toBe(payload.byteLength);
expect(decoded[0]).toBe(0);
expect(decoded[4096]).toBe(4096 & 0xff);
});

it('reset() clears the internal buffer', () => {
const decoder = new FrameDecoder();
// 3 bytes is less than even the 4-byte length header — a pure partial.
const partial = encodeFrame({ id: 'will-be-dropped' }).subarray(0, 3);
decoder.push(partial);
decoder.reset();
// Now push a fresh frame — it should decode cleanly.
const out = decoder.push(encodeFrame({ id: 'fresh' }));
expect(out).toEqual([{ id: 'fresh' }]);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* Length-prefixed binary framing for the SocketServer/SocketClient protocol.
*
* The previous protocol used newline-delimited JSON (NDJSON): each message was
* `JSON.stringify(message) + '\n'`. Two problems:
*
* 1. `JSON.stringify` fails with `RangeError: Invalid string length` on
* payloads whose UTF-8 size exceeds Node's ~500 MB string cap. Large
* consumer projects (e.g. mui-x `DataGridProps`, 131 direct props with a
* fully expanded generic chain) hit this and crash the worker pool.
* 2. The client-side decoder concatenated into a JS string (`this.buffer += chunk`)
* which shared the same cap and paid O(n) UTF-8 decoding on every chunk.
*
* The new framing layer uses `v8.serialize` / `v8.deserialize` (the same
* structured-clone algorithm used internally by `worker_threads.postMessage`).
* It preserves binary structure exactly, has no UTF-8 string limit, and is
* faster than JSON for deeply nested objects.
*
* Wire format:
* [ 4-byte big-endian uint32 body length ][ body bytes ]
*
* Max message size per frame: 2^32 − 1 bytes (~4 GB), far beyond anything the
* type extractor currently produces.
*/

// eslint-disable-next-line n/prefer-node-protocol
import v8 from 'v8';

/**
 * Encode a message as a length-prefixed binary frame:
 * [ 4-byte big-endian uint32 body length ][ v8-serialized body ].
 *
 * Does NOT catch serialization errors — callers that want to recover should
 * wrap their own try/catch. Failures here mean the message contains something
 * structured-clone can't handle (functions, host objects, etc.), which is a
 * programming bug, not a size issue.
 */
export function encodeFrame(message: unknown): Buffer {
  const payload = v8.serialize(message);
  // Single allocation for header + body; every byte is overwritten below,
  // so allocUnsafe is safe here.
  const frame = Buffer.allocUnsafe(4 + payload.byteLength);
  frame.writeUInt32BE(payload.byteLength, 0);
  payload.copy(frame, 4);
  return frame;
}

/**
 * Stateful decoder that accepts raw socket chunks and yields complete decoded
 * messages. Keeps a single pending Buffer (no string concatenation), so it
 * avoids both Node's string-length cap and the O(n²) re-concat the old
 * newline decoder suffered from on large payloads.
 */
export class FrameDecoder {
  /** Bytes received but not yet consumed by a complete frame. */
  private pending: Buffer = Buffer.alloc(0);

  /**
   * Push a new chunk from the socket and return any complete messages it
   * completes. Any trailing partial frame is retained for the next call.
   */
  push(chunk: Buffer): unknown[] {
    // Append the chunk; skip the copy entirely when nothing is buffered.
    this.pending =
      this.pending.byteLength === 0 ? chunk : Buffer.concat([this.pending, chunk]);

    const decoded: unknown[] = [];
    let cursor = 0;
    for (;;) {
      const available = this.pending.byteLength - cursor;
      if (available < 4) {
        break; // Not even a full length header yet.
      }
      const bodyLength = this.pending.readUInt32BE(cursor);
      if (available < 4 + bodyLength) {
        break; // Header present but body still incomplete.
      }
      const bodyStart = cursor + 4;
      // v8.deserialize copies into its own V8-managed structure, so the
      // source bytes can be discarded afterwards.
      decoded.push(v8.deserialize(this.pending.subarray(bodyStart, bodyStart + bodyLength)));
      cursor = bodyStart + bodyLength;
    }

    // Drop fully-consumed bytes from the front of the buffer.
    if (cursor > 0) {
      this.pending = this.pending.subarray(cursor);
    }
    return decoded;
  }

  /** Reset state — useful for connection resets. */
  reset(): void {
    this.pending = Buffer.alloc(0);
  }
}
Loading
Loading