Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 205 additions & 8 deletions src/js/node/worker_threads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,22 +129,215 @@

// TODO: parent port emulation is not complete
function fakeParentPort() {
// Node's `parentPort` has its own message dispatch that is independent of the
// worker's global scope. Bun's native worker runtime dispatches parent messages
// onto the global scope (`self.onmessage` / `self.addEventListener('message')`),
// which matches web-worker semantics but not Node's — and it means emscripten-
// style code that does
//
// parentPort.on('message', (m) => onmessage({ data: m }));
// self.onmessage = handleMessage;
//
// sees every message delivered TWICE: once by the automatic `self.onmessage`
// dispatch and once by the explicit forwarding inside the `parentPort.on`
// listener. See https://github.com/oven-sh/bun/issues/29211.
//
// Fix: give `parentPort` its own `EventTarget`, re-dispatch incoming messages
// on it, and stop the native dispatch from reaching `self.onmessage` /
// `self.addEventListener('message', …)` so it matches Node semantics.
const fake = Object.create(MessagePort.prototype);
const parentPortTarget = new EventTarget();

// Forwarders: installed lazily on `self` only while at least one user
// listener is registered on the parentPort. They intercept the native
// parent-message dispatch, stop immediate propagation (so `self.onmessage`
// and `self.addEventListener('message', …)` handlers on the global scope
// never see parent messages — matching Node), and re-dispatch on
// `parentPortTarget`. Installing a `message` listener on `self` keeps the
// event loop alive via `onDidChangeListenerImpl` in
// `BunWorkerGlobalScope.cpp`, so we only install while parentPort is
// actually being used — that way a worker that never touches `parentPort`
// exits cleanly when its module finishes executing.
//
// `onmessage` / `onmessageerror` are spec'd as implicit event listeners: the
// setter replaces at most one listener, firing in the order it was first
// assigned relative to other listeners on the same target.
let parentPortOnMessageWrapper: ((event: Event) => void) | null = null;
let parentPortOnMessageHandler: ((event: MessageEvent) => unknown) | null = null;
let parentPortOnMessageErrorWrapper: ((event: Event) => void) | null = null;
let parentPortOnMessageErrorHandler: ((event: MessageEvent) => unknown) | null = null;

let listenerCount = 0;
let messageForwarder: ((event: Event) => void) | null = null;
let messageErrorForwarder: ((event: Event) => void) | null = null;

function installForwarders() {
if (messageForwarder !== null) return;
const makeForwarder = (type: "message" | "messageerror") => (event: Event) => {
// Stop the native dispatch from reaching `self.onmessage` and any
Comment thread
claude[bot] marked this conversation as resolved.
Outdated
// `self.addEventListener('message', …)` handlers — in Node parent
// messages are only visible through `parentPort`, not through the
// global scope.
event.stopImmediatePropagation();
const data = (event as MessageEvent).data;
const clone =
type === "message"
? new MessageEvent("message", { data })
: new MessageEvent("messageerror", { data });
parentPortTarget.dispatchEvent(clone);
};
messageForwarder = makeForwarder("message");
messageErrorForwarder = makeForwarder("messageerror");
// Capture phase so we run before any user-installed bubbling listener
// on the global scope (if any).
self.addEventListener("message", messageForwarder, { capture: true });
self.addEventListener("messageerror", messageErrorForwarder, { capture: true });
}

function uninstallForwarders() {
if (messageForwarder === null) return;
self.removeEventListener("message", messageForwarder, { capture: true } as any);
self.removeEventListener("messageerror", messageErrorForwarder!, { capture: true } as any);
messageForwarder = null;
messageErrorForwarder = null;
}

function acquireListener() {
if (listenerCount++ === 0) {
installForwarders();
}
}

function releaseListener() {
if (listenerCount > 0 && --listenerCount === 0) {
uninstallForwarders();
}
}

// Wrap `addEventListener` / `removeEventListener` so we can track user
// listener lifetime on `parentPortTarget` and install / uninstall the
// forwarders on the global scope accordingly. Each (listener, type, capture)
// triple gets wrapped exactly once — duplicate adds are no-ops per the DOM
// spec — and the original listener object is the map key so that a
// `remove(type, original, {capture})` call finds the wrapped copy.
type TrackEntry = { wrapped: EventListener; once: boolean };
// `${type}:${capture ? 1 : 0}` — a listener registered with different
// (type, capture) combinations lives in separate slots, matching spec.
const trackedByListener = new WeakMap<object, Map<string, TrackEntry>>();

function listenerSlot(type: string, capture: boolean): string {
return capture ? type + ":1" : type + ":0";
}

function parentPortAddEventListener(
type: string,
listener: EventListener | null,
options?: boolean | AddEventListenerOptions,
): void {
Comment thread
claude[bot] marked this conversation as resolved.
if (listener === null || listener === undefined) return;
const capture = typeof options === "boolean" ? options : !!options?.capture;
const once = typeof options === "object" && options !== null && !!options.once;
const slot = listenerSlot(type, capture);
let bucket = trackedByListener.get(listener as object);
if (bucket?.has(slot)) {
// Duplicate add — EventTarget already dedupes, so no-op.
return;
}
// Wrap so we can release the loop ref when the listener is removed,
Comment thread
claude[bot] marked this conversation as resolved.
Outdated
// including the implicit removal done by `{ once: true }` after firing.
const wrapped: EventListener = function (event) {
if (once) {
const bucketNow = trackedByListener.get(listener as object);
if (bucketNow?.get(slot) === entry) {
bucketNow.delete(slot);
if (bucketNow.size === 0) trackedByListener.delete(listener as object);
releaseListener();
}
}
(listener as EventListener).$call(fake, event);
};
const entry: TrackEntry = { wrapped, once };
if (!bucket) {
bucket = new Map();
trackedByListener.set(listener as object, bucket);
}
bucket.set(slot, entry);
parentPortTarget.addEventListener(type, wrapped, options);
acquireListener();
Comment thread
claude[bot] marked this conversation as resolved.
Outdated
}

Check failure on line 267 in src/js/node/worker_threads.ts

View check run for this annotation

Claude / Claude Code Review

acquireListener triggered for non-message event types silently drops all parent messages

In parentPortAddEventListener, acquireListener() is called unconditionally for every event type — including 'close', 'error', etc. — triggering installForwarders(), which installs a capture-phase messageForwarder on self that calls stopImmediatePropagation() and re-dispatches to parentPortTarget. If a worker registers only parentPort.once('close', cleanup) while relying on receiving messages, all subsequent parent messages are silently dropped because parentPortTarget has no 'message' listener.

function parentPortRemoveEventListener(
type: string,
listener: EventListener | null,
options?: boolean | EventListenerOptions,
): void {
if (listener === null || listener === undefined) return;
const capture = typeof options === "boolean" ? options : !!options?.capture;
const bucket = trackedByListener.get(listener as object);
if (!bucket) return;
const slot = listenerSlot(type, capture);
const entry = bucket.get(slot);
if (!entry) return;
bucket.delete(slot);
if (bucket.size === 0) trackedByListener.delete(listener as object);
parentPortTarget.removeEventListener(type, entry.wrapped, options);
releaseListener();
}

Object.defineProperty(fake, "onmessage", {
get() {
return self.onmessage;
return parentPortOnMessageHandler;
},
set(value) {
self.onmessage = value;
// Replace the previously-installed wrapper, if any.
if (parentPortOnMessageWrapper !== null) {
parentPortTarget.removeEventListener("message", parentPortOnMessageWrapper);
parentPortOnMessageWrapper = null;
releaseListener();
}
parentPortOnMessageHandler = typeof value === "function" ? value : null;
if (parentPortOnMessageHandler !== null) {
const handler = parentPortOnMessageHandler;
parentPortOnMessageWrapper = (event: Event) => {
try {
handler.$call(fake, event as MessageEvent);
} catch (err) {
queueMicrotask(() => {
throw err;
});
}
};
parentPortTarget.addEventListener("message", parentPortOnMessageWrapper);
acquireListener();
}
},
});

Object.defineProperty(fake, "onmessageerror", {
get() {
return self.onmessageerror;
return parentPortOnMessageErrorHandler;
},
set(value) {
self.onmessageerror = value;
if (parentPortOnMessageErrorWrapper !== null) {
parentPortTarget.removeEventListener("messageerror", parentPortOnMessageErrorWrapper);
parentPortOnMessageErrorWrapper = null;
releaseListener();
}
parentPortOnMessageErrorHandler = typeof value === "function" ? value : null;
if (parentPortOnMessageErrorHandler !== null) {
const handler = parentPortOnMessageErrorHandler;
parentPortOnMessageErrorWrapper = (event: Event) => {
try {
handler.$call(fake, event as MessageEvent);
} catch (err) {
queueMicrotask(() => {
throw err;
});
}
};
parentPortTarget.addEventListener("messageerror", parentPortOnMessageErrorWrapper);
acquireListener();
}
},
});

Expand Down Expand Up @@ -182,20 +375,24 @@
});

Object.defineProperty(fake, "addEventListener", {
value: self.addEventListener.bind(self),
value: parentPortAddEventListener,
});

Object.defineProperty(fake, "removeEventListener", {
value: self.removeEventListener.bind(self),
value: parentPortRemoveEventListener,
});
Comment thread
claude[bot] marked this conversation as resolved.

Object.defineProperty(fake, "dispatchEvent", {
value: parentPortTarget.dispatchEvent.bind(parentPortTarget),
});

Object.defineProperty(fake, "removeListener", {
value: self.removeEventListener.bind(self),
value: parentPortRemoveEventListener,
enumerable: false,
});

Object.defineProperty(fake, "addListener", {
value: self.addEventListener.bind(self),
value: parentPortAddEventListener,
enumerable: false,
});
Comment thread
claude[bot] marked this conversation as resolved.
Outdated
Comment thread
claude[bot] marked this conversation as resolved.
Comment thread
claude[bot] marked this conversation as resolved.

Expand Down
Loading
Loading