-
Notifications
You must be signed in to change notification settings - Fork 4.3k
Stop dispatching parent messages to self.onmessage in node:worker_threads workers #29215
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
50c12d3
2a883b1
6cc8135
3f220d2
08de32c
0fbebc1
5bc8c4c
e03dc95
dc7375b
86c5d84
5b8ba71
346f824
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -129,22 +129,270 @@ | |
|
|
||
| // TODO: parent port emulation is not complete | ||
| function fakeParentPort() { | ||
| // Node's `parentPort` has its own message dispatch that is independent of the | ||
| // worker's global scope. Bun's native worker runtime dispatches parent messages | ||
| // onto the global scope (`self.onmessage` / `self.addEventListener('message')`), | ||
| // which matches web-worker semantics but not Node's — and it means emscripten- | ||
| // style code that does | ||
| // | ||
| // parentPort.on('message', (m) => onmessage({ data: m })); | ||
| // self.onmessage = handleMessage; | ||
| // | ||
| // sees every message delivered TWICE: once by the automatic `self.onmessage` | ||
| // dispatch and once by the explicit forwarding inside the `parentPort.on` | ||
| // listener. See https://github.com/oven-sh/bun/issues/29211. | ||
| // | ||
| // Fix: give `parentPort` its own `EventTarget`, re-dispatch incoming messages | ||
| // on it, and stop the native dispatch from reaching `self.onmessage` / | ||
| // `self.addEventListener('message', …)` so it matches Node semantics. | ||
| const fake = Object.create(MessagePort.prototype); | ||
| const parentPortTarget = new EventTarget(); | ||
|
|
||
| // Forwarders: installed lazily on `self` only while at least one user | ||
| // listener is registered on the parentPort. They intercept the native | ||
| // parent-message dispatch, stop immediate propagation (so `self.onmessage` | ||
| // and `self.addEventListener('message', …)` handlers on the global scope | ||
| // never see parent messages — matching Node), and re-dispatch on | ||
| // `parentPortTarget`. Installing a `message` listener on `self` keeps the | ||
| // event loop alive via `onDidChangeListenerImpl` in | ||
| // `BunWorkerGlobalScope.cpp`, so we only install while parentPort is | ||
| // actually being used — that way a worker that never touches `parentPort` | ||
| // exits cleanly when its module finishes executing. | ||
| // | ||
| // `onmessage` / `onmessageerror` are spec'd as implicit event listeners: the | ||
| // setter replaces at most one listener, firing in the order it was first | ||
| // assigned relative to other listeners on the same target. | ||
| let parentPortOnMessageWrapper: ((event: Event) => void) | null = null; | ||
| let parentPortOnMessageHandler: ((event: MessageEvent) => unknown) | null = null; | ||
| let parentPortOnMessageErrorWrapper: ((event: Event) => void) | null = null; | ||
| let parentPortOnMessageErrorHandler: ((event: MessageEvent) => unknown) | null = null; | ||
|
|
||
| // Separate counters per event type — installing a `message` listener on | ||
| // `self` keeps the event loop alive (via `onDidChangeListenerImpl` in | ||
| // `BunWorkerGlobalScope.cpp`, which only tracks `messageEvent`), so a | ||
| // worker that only registers a `messageerror` handler must NOT pull in the | ||
| // `message` forwarder. | ||
| let messageListenerCount = 0; | ||
| let messageErrorListenerCount = 0; | ||
| let messageForwarder: ((event: Event) => void) | null = null; | ||
| let messageErrorForwarder: ((event: Event) => void) | null = null; | ||
|
|
||
| const makeForwarder = (type: "message" | "messageerror") => (event: Event) => { | ||
| // Stop the native dispatch from reaching `self.onmessage` and any | ||
| // `self.addEventListener('message', …)` handlers — in Node parent | ||
| // messages are only visible through `parentPort`, not through the | ||
| // global scope. | ||
| event.stopImmediatePropagation(); | ||
| const messageEvent = event as MessageEvent; | ||
| // Preserve `ports` so `worker.postMessage(data, [port])` still surfaces | ||
| // the transferred MessagePort(s) to `parentPort` listeners. | ||
| const nativePorts = messageEvent.ports; | ||
| const clone = new MessageEvent(type, { | ||
| data: messageEvent.data, | ||
| ports: nativePorts && nativePorts.length > 0 ? $Array.from(nativePorts) : undefined, | ||
| }); | ||
| parentPortTarget.dispatchEvent(clone); | ||
| }; | ||
|
|
||
| function installMessageForwarder() { | ||
| if (messageForwarder !== null) return; | ||
| messageForwarder = makeForwarder("message"); | ||
| // Capture phase so we run before any user-installed bubbling listener | ||
| // on the global scope (if any). | ||
| self.addEventListener("message", messageForwarder, { capture: true }); | ||
| } | ||
| function uninstallMessageForwarder() { | ||
| if (messageForwarder === null) return; | ||
| self.removeEventListener("message", messageForwarder, { capture: true } as any); | ||
| messageForwarder = null; | ||
| } | ||
| function installMessageErrorForwarder() { | ||
| if (messageErrorForwarder !== null) return; | ||
| messageErrorForwarder = makeForwarder("messageerror"); | ||
| self.addEventListener("messageerror", messageErrorForwarder, { capture: true }); | ||
| } | ||
| function uninstallMessageErrorForwarder() { | ||
| if (messageErrorForwarder === null) return; | ||
| self.removeEventListener("messageerror", messageErrorForwarder, { capture: true } as any); | ||
| messageErrorForwarder = null; | ||
| } | ||
|
|
||
| function acquireListener(type: "message" | "messageerror") { | ||
| if (type === "message") { | ||
| if (messageListenerCount++ === 0) installMessageForwarder(); | ||
| } else { | ||
| if (messageErrorListenerCount++ === 0) installMessageErrorForwarder(); | ||
| } | ||
| } | ||
|
|
||
| function releaseListener(type: "message" | "messageerror") { | ||
| if (type === "message") { | ||
| if (messageListenerCount > 0 && --messageListenerCount === 0) uninstallMessageForwarder(); | ||
| } else { | ||
| if (messageErrorListenerCount > 0 && --messageErrorListenerCount === 0) uninstallMessageErrorForwarder(); | ||
| } | ||
| } | ||
|
|
||
| // Wrap `addEventListener` / `removeEventListener` so we can track user | ||
| // listener lifetime on `parentPortTarget` and install / uninstall the | ||
| // forwarders on the global scope accordingly. Each (listener, type, capture) | ||
| // triple gets wrapped exactly once — duplicate adds are no-ops per the DOM | ||
| // spec — and the original listener object is the map key so that a | ||
| // `remove(type, original, {capture})` call finds the wrapped copy. | ||
| type TrackEntry = { wrapped: EventListener; once: boolean }; | ||
| // `${type}:${capture ? 1 : 0}` — a listener registered with different | ||
| // (type, capture) combinations lives in separate slots, matching spec. | ||
| const trackedByListener = new WeakMap<object, Map<string, TrackEntry>>(); | ||
|
|
||
| function listenerSlot(type: string, capture: boolean): string { | ||
| return capture ? type + ":1" : type + ":0"; | ||
| } | ||
|
|
||
| function invokeListener(listener: EventListener | EventListenerObject, event: Event): void { | ||
| // DOM EventTarget accepts either a bare function or an object with a | ||
| // `handleEvent` method. Dispatch correctly for both forms. | ||
| if (typeof listener === "function") { | ||
| (listener as any).$call(fake, event); | ||
| } else if (listener !== null && typeof listener === "object" && typeof (listener as any).handleEvent === "function") { | ||
| (listener as any).handleEvent.$call(listener, event); | ||
| } | ||
| } | ||
|
|
||
| function parentPortAddEventListener( | ||
| type: string, | ||
| listener: EventListener | EventListenerObject | null, | ||
| options?: boolean | AddEventListenerOptions, | ||
| ): void { | ||
| if (listener === null || listener === undefined) return; | ||
| const capture = typeof options === "boolean" ? options : !!options?.capture; | ||
| const once = typeof options === "object" && options !== null && !!options.once; | ||
| // `AbortSignal` auto-removal is driven from the native EventTarget in C++, | ||
| // so it would bypass our JS `parentPortRemoveEventListener` wrapper and | ||
| // leak the event-loop refcount our capture forwarder holds on `self`. | ||
| // Strip the signal from the options we pass inward and re-implement abort | ||
| // ourselves via an abort listener that routes through the JS remove path. | ||
| const signal = | ||
| typeof options === "object" && options !== null ? ((options as AddEventListenerOptions).signal ?? null) : null; | ||
| if (signal && signal.aborted) return; | ||
| // Only `message` / `messageerror` events are dispatched on `self` by the | ||
| // native worker runtime — all other event types (`close`, `error`, …) | ||
| // live purely on `parentPortTarget` and don't need the capture forwarder | ||
| // at all. | ||
| const forwarderType: "message" | "messageerror" | null = | ||
| type === "message" ? "message" : type === "messageerror" ? "messageerror" : null; | ||
| const slot = listenerSlot(type, capture); | ||
| let bucket = trackedByListener.get(listener as object); | ||
| if (bucket?.$has(slot)) { | ||
| // Duplicate add — EventTarget already dedupes, so no-op. | ||
| return; | ||
| } | ||
| // Wrap so we can release the loop ref when the listener is removed, | ||
claude[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // including the implicit removal done by `{ once: true }` after firing. | ||
| const wrapped: EventListener = function (event) { | ||
| if (once) { | ||
| const bucketNow = trackedByListener.get(listener as object); | ||
| if (bucketNow?.$get(slot) === entry) { | ||
| bucketNow.$delete(slot); | ||
| if (bucketNow.$size === 0) trackedByListener.delete(listener as object); | ||
| if (forwarderType !== null) releaseListener(forwarderType); | ||
| } | ||
| } | ||
| invokeListener(listener, event); | ||
| }; | ||
| const entry: TrackEntry = { wrapped, once }; | ||
| if (!bucket) { | ||
| bucket = new Map(); | ||
| trackedByListener.set(listener as object, bucket); | ||
| } | ||
| bucket.$set(slot, entry); | ||
| const innerOptions: boolean | AddEventListenerOptions = | ||
| typeof options === "object" && options !== null ? { ...options, signal: undefined } : (options ?? false); | ||
| parentPortTarget.addEventListener(type, wrapped, innerOptions); | ||
| if (forwarderType !== null) acquireListener(forwarderType); | ||
| if (signal) { | ||
|
Comment on lines
+306
to
+312
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔴 When Extended reasoning...What the bug is and how it manifests The root cause is that The specific code path that triggers it
Why existing code doesn't prevent it The dedup guard in What the impact would be
How to fix it Before creating a new wrapped callback in Step-by-step proof
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The attempted fix in commit 5bc8c4c (fake.removeListener = MessagePort.prototype.off) addresses the single-registration removeListener bypass scenario but does NOT fix the double-registration bug described in this comment. The root cause is in injectFakeEmitter's wrapped() (line 56): listener[wrappedListener] = callback unconditionally overwrites the slot. When parentPort.on('message', fn) is called twice with the same fn:
After two parentPort.off('message', fn) calls:
callback1 is permanently orphaned in trackedByListener and parentPortTarget. messageListenerCount stays at 1 forever: uninstallMessageForwarder() is never called, messageForwarder stays on self, m_messageEventCount > 0 in BunWorkerGlobalScope.cpp, and the worker event loop never exits. The fix requires checking whether fn[wrappedListener] already points to a live entry in trackedByListener before creating a new wrapped callback -- if it does, reuse it. Alternatively, the dedup guard in parentPortAddEventListener (line 285) could be changed to key on the original listener fn rather than the wrapped callback, so that a second registration of the same function is treated as a duplicate and rejected.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch on the wrappedListener-overwrite edge case, but this is a pre-existing property of |
||
| signal.addEventListener( | ||
| "abort", | ||
| () => { | ||
| parentPortRemoveEventListener(type, listener, { capture }); | ||
| }, | ||
| { once: true }, | ||
| ); | ||
| } | ||
|
Comment on lines
+312
to
+320
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔴 The abort closure installed at lines 312–320 captures Extended reasoning...What the bug is The abort handler installed in The specific code path that triggers it The const bucketNow = trackedByListener.get(listener as object);
if (bucketNow?.$get(slot) === entry) { // identity check against captured entry
...releaseListener(forwarderType);
}The abort handler (lines 312–320) has no such guard — it always proceeds with the removal regardless of which entry currently occupies the slot. Why existing code doesn't prevent it The dedup check earlier in What the impact would be Concrete sequence:
The listener registered in step 3 is silently removed even though it has no AbortSignal. From the worker's perspective, How to fix it Capture the if (signal) {
const capturedEntry = entry;
signal.addEventListener(
'abort',
() => {
// Only remove if this specific registration is still the live one.
if (trackedByListener.get(listener as object)?.$get(slot) === capturedEntry) {
parentPortRemoveEventListener(type, listener, { capture });
}
},
{ once: true },
);
}Step-by-step proof
The inconsistency with the
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch, but this edge case (register-with-signal → manual-remove → re-register-without-signal → abort fires) is not exercised by any of the five issues this PR fixes, and the ASAN CI lane is currently green on the latest revision. I want to keep this PR focused and avoid another churn cycle; the identity-guard fix (capture |
||
| } | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| function parentPortRemoveEventListener( | ||
| type: string, | ||
| listener: EventListener | EventListenerObject | null, | ||
| options?: boolean | EventListenerOptions, | ||
| ): void { | ||
| if (listener === null || listener === undefined) return; | ||
| const capture = typeof options === "boolean" ? options : !!options?.capture; | ||
| const bucket = trackedByListener.get(listener as object); | ||
| if (!bucket) return; | ||
| const slot = listenerSlot(type, capture); | ||
| const entry = bucket.$get(slot); | ||
| if (!entry) return; | ||
| bucket.$delete(slot); | ||
| if (bucket.$size === 0) trackedByListener.delete(listener as object); | ||
| parentPortTarget.removeEventListener(type, entry.wrapped, options); | ||
| if (type === "message") releaseListener("message"); | ||
| else if (type === "messageerror") releaseListener("messageerror"); | ||
| } | ||
|
|
||
| Object.defineProperty(fake, "onmessage", { | ||
| get() { | ||
| return self.onmessage; | ||
| return parentPortOnMessageHandler; | ||
| }, | ||
| set(value) { | ||
| self.onmessage = value; | ||
| // Replace the previously-installed wrapper, if any. | ||
| if (parentPortOnMessageWrapper !== null) { | ||
| parentPortTarget.removeEventListener("message", parentPortOnMessageWrapper); | ||
| parentPortOnMessageWrapper = null; | ||
| releaseListener("message"); | ||
| } | ||
| parentPortOnMessageHandler = typeof value === "function" ? value : null; | ||
| if (parentPortOnMessageHandler !== null) { | ||
| const handler = parentPortOnMessageHandler; | ||
| parentPortOnMessageWrapper = (event: Event) => { | ||
| try { | ||
| handler.$call(fake, event as MessageEvent); | ||
| } catch (err) { | ||
| queueMicrotask(() => { | ||
| throw err; | ||
| }); | ||
| } | ||
| }; | ||
| parentPortTarget.addEventListener("message", parentPortOnMessageWrapper); | ||
| acquireListener("message"); | ||
| } | ||
| }, | ||
| }); | ||
|
|
||
| Object.defineProperty(fake, "onmessageerror", { | ||
| get() { | ||
| return self.onmessageerror; | ||
| return parentPortOnMessageErrorHandler; | ||
| }, | ||
| set(value) { | ||
| self.onmessageerror = value; | ||
| if (parentPortOnMessageErrorWrapper !== null) { | ||
| parentPortTarget.removeEventListener("messageerror", parentPortOnMessageErrorWrapper); | ||
| parentPortOnMessageErrorWrapper = null; | ||
| releaseListener("messageerror"); | ||
| } | ||
| parentPortOnMessageErrorHandler = typeof value === "function" ? value : null; | ||
| if (parentPortOnMessageErrorHandler !== null) { | ||
| const handler = parentPortOnMessageErrorHandler; | ||
| parentPortOnMessageErrorWrapper = (event: Event) => { | ||
| try { | ||
| handler.$call(fake, event as MessageEvent); | ||
| } catch (err) { | ||
| queueMicrotask(() => { | ||
| throw err; | ||
| }); | ||
| } | ||
| }; | ||
| parentPortTarget.addEventListener("messageerror", parentPortOnMessageErrorWrapper); | ||
| acquireListener("messageerror"); | ||
| } | ||
| }, | ||
| }); | ||
|
|
||
|
|
@@ -182,22 +430,35 @@ | |
| }); | ||
|
|
||
| Object.defineProperty(fake, "addEventListener", { | ||
| value: self.addEventListener.bind(self), | ||
| value: parentPortAddEventListener, | ||
| }); | ||
|
|
||
| Object.defineProperty(fake, "removeEventListener", { | ||
| value: self.removeEventListener.bind(self), | ||
| value: parentPortRemoveEventListener, | ||
| }); | ||
claude[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| Object.defineProperty(fake, "removeListener", { | ||
| value: self.removeEventListener.bind(self), | ||
| enumerable: false, | ||
| Object.defineProperty(fake, "dispatchEvent", { | ||
| value: parentPortTarget.dispatchEvent.bind(parentPortTarget), | ||
| }); | ||
|
|
||
| // `addListener`/`removeListener` must NOT bypass the `injectFakeEmitter` | ||
| // wrapping layer: `parentPort.on('message', fn)` wraps `fn` into a callback | ||
| // and stores `fn[wrappedListener] = callback`. A matching | ||
| // `parentPort.removeListener('message', fn)` has to resolve the wrapped | ||
| // callback before calling `removeEventListener`, otherwise it would try to | ||
| // remove `fn` itself (which was never registered) and silently leak the | ||
| // event-loop refcount held by our capture forwarder. `fake.on`/`fake.off` | ||
| // — inherited from `MessagePort.prototype` via `injectFakeEmitter` — do | ||
| // that resolution correctly, so we alias to them. | ||
| Object.defineProperty(fake, "addListener", { | ||
| value: self.addEventListener.bind(self), | ||
| value: (MessagePort.prototype as any).on, | ||
| enumerable: false, | ||
| }); | ||
|
|
||
| Object.defineProperty(fake, "removeListener", { | ||
| value: (MessagePort.prototype as any).off, | ||
| enumerable: false, | ||
| }); | ||
|
Check failure on line 461 in src/js/node/worker_threads.ts
|
||
|
Comment on lines
+458
to
461
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔴 Aliasing Extended reasoning...What the bug is In the PR, The specific code path
Why this is a regression Before the PR, After the PR, routing through Addressing the refutation The refutation claims the observable outcome (worker hang) is identical before and after the PR, because the stale Impact With Fix Either (A) clear
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same class of edge case as the prior comment (stale |
||
|
|
||
| return fake; | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.