diff --git a/src/js/node/worker_threads.ts b/src/js/node/worker_threads.ts
index c6d0633dde9..530279cfcf0 100644
--- a/src/js/node/worker_threads.ts
+++ b/src/js/node/worker_threads.ts
@@ -129,22 +129,270 @@ function receiveMessageOnPort(port: MessagePort) {
 // TODO: parent port emulation is not complete
 function fakeParentPort() {
+  // Node's `parentPort` has its own message dispatch that is independent of the
+  // worker's global scope. Bun's native worker runtime dispatches parent messages
+  // onto the global scope (`self.onmessage` / `self.addEventListener('message')`),
+  // which matches web-worker semantics but not Node's — and it means emscripten-
+  // style code that does
+  //
+  //   parentPort.on('message', (m) => onmessage({ data: m }));
+  //   self.onmessage = handleMessage;
+  //
+  // sees every message delivered TWICE: once by the automatic `self.onmessage`
+  // dispatch and once by the explicit forwarding inside the `parentPort.on`
+  // listener. See https://github.com/oven-sh/bun/issues/29211.
+  //
+  // Fix: give `parentPort` its own `EventTarget`, re-dispatch incoming messages
+  // on it, and stop the native dispatch from reaching `self.onmessage` /
+  // `self.addEventListener('message', …)` so it matches Node semantics.
   const fake = Object.create(MessagePort.prototype);
+  const parentPortTarget = new EventTarget();
+
+  // Forwarders: installed lazily on `self` only while at least one user
+  // listener is registered on the parentPort. They intercept the native
+  // parent-message dispatch, stop immediate propagation (so `self.onmessage`
+  // and `self.addEventListener('message', …)` handlers on the global scope
+  // never see parent messages — matching Node), and re-dispatch on
+  // `parentPortTarget`. Installing a `message` listener on `self` keeps the
+  // event loop alive via `onDidChangeListenerImpl` in
+  // `BunWorkerGlobalScope.cpp`, so we only install while parentPort is
+  // actually being used — that way a worker that never touches `parentPort`
+  // exits cleanly when its module finishes executing.
+  //
+  // `onmessage` / `onmessageerror` are spec'd as implicit event listeners: the
+  // setter replaces at most one listener, firing in the order it was first
+  // assigned relative to other listeners on the same target.
+  let parentPortOnMessageWrapper: ((event: Event) => void) | null = null;
+  let parentPortOnMessageHandler: ((event: MessageEvent) => unknown) | null = null;
+  let parentPortOnMessageErrorWrapper: ((event: Event) => void) | null = null;
+  let parentPortOnMessageErrorHandler: ((event: MessageEvent) => unknown) | null = null;
+
+  // Separate counters per event type — installing a `message` listener on
+  // `self` keeps the event loop alive (via `onDidChangeListenerImpl` in
+  // `BunWorkerGlobalScope.cpp`, which only tracks `messageEvent`), so a
+  // worker that only registers a `messageerror` handler must NOT pull in the
+  // `message` forwarder.
+  let messageListenerCount = 0;
+  let messageErrorListenerCount = 0;
+  let messageForwarder: ((event: Event) => void) | null = null;
+  let messageErrorForwarder: ((event: Event) => void) | null = null;
+
+  const makeForwarder = (type: "message" | "messageerror") => (event: Event) => {
+    // Stop the native dispatch from reaching `self.onmessage` and any
+    // `self.addEventListener('message', …)` handlers — in Node parent
+    // messages are only visible through `parentPort`, not through the
+    // global scope.
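+    //
+    // A sketch of the Node behavior being matched here (hypothetical worker
+    // code, for illustration only):
+    //
+    //   parentPort.on('message', (m) => console.log('parentPort', m)); // fires
+    //   globalThis.onmessage = (e) => console.log('global', e.data);   // stays silent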
+    event.stopImmediatePropagation();
+    const messageEvent = event as MessageEvent;
+    // Preserve `ports` so `worker.postMessage(data, [port])` still surfaces
+    // the transferred MessagePort(s) to `parentPort` listeners.
+    const nativePorts = messageEvent.ports;
+    const clone = new MessageEvent(type, {
+      data: messageEvent.data,
+      ports: nativePorts && nativePorts.length > 0 ? $Array.from(nativePorts) : undefined,
+    });
+    parentPortTarget.dispatchEvent(clone);
+  };
+
+  function installMessageForwarder() {
+    if (messageForwarder !== null) return;
+    messageForwarder = makeForwarder("message");
+    // Capture phase so we run before any user-installed bubbling listener
+    // on the global scope (if any).
+    self.addEventListener("message", messageForwarder, { capture: true });
+  }
+  function uninstallMessageForwarder() {
+    if (messageForwarder === null) return;
+    self.removeEventListener("message", messageForwarder, { capture: true } as any);
+    messageForwarder = null;
+  }
+  function installMessageErrorForwarder() {
+    if (messageErrorForwarder !== null) return;
+    messageErrorForwarder = makeForwarder("messageerror");
+    self.addEventListener("messageerror", messageErrorForwarder, { capture: true });
+  }
+  function uninstallMessageErrorForwarder() {
+    if (messageErrorForwarder === null) return;
+    self.removeEventListener("messageerror", messageErrorForwarder, { capture: true } as any);
+    messageErrorForwarder = null;
+  }
+
+  function acquireListener(type: "message" | "messageerror") {
+    if (type === "message") {
+      if (messageListenerCount++ === 0) installMessageForwarder();
+    } else {
+      if (messageErrorListenerCount++ === 0) installMessageErrorForwarder();
+    }
+  }
+
+  function releaseListener(type: "message" | "messageerror") {
+    if (type === "message") {
+      if (messageListenerCount > 0 && --messageListenerCount === 0) uninstallMessageForwarder();
+    } else {
+      if (messageErrorListenerCount > 0 && --messageErrorListenerCount === 0) uninstallMessageErrorForwarder();
+    }
+  }
+
+  // Wrap `addEventListener` / `removeEventListener` so we can track user
+  // listener lifetime on `parentPortTarget` and install / uninstall the
+  // forwarders on the global scope accordingly. Each (listener, type, capture)
+  // triple gets wrapped exactly once — duplicate adds are no-ops per the DOM
+  // spec — and the original listener object is the map key so that a
+  // `remove(type, original, {capture})` call finds the wrapped copy.
+  type TrackEntry = { wrapped: EventListener; once: boolean };
+  // `${type}:${capture ? 1 : 0}` — a listener registered with different
+  // (type, capture) combinations lives in separate slots, matching spec.
+  const trackedByListener = new WeakMap<object, Map<string, TrackEntry>>();
+
+  function listenerSlot(type: string, capture: boolean): string {
+    return capture ? type + ":1" : type + ":0";
+  }
+
+  function invokeListener(listener: EventListener | EventListenerObject, event: Event): void {
+    // DOM EventTarget accepts either a bare function or an object with a
+    // `handleEvent` method. Dispatch correctly for both forms.
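+    //
+    // e.g. both registration forms must end up here (hypothetical handlers,
+    // for illustration only):
+    //
+    //   parentPort.addEventListener('message', (ev) => handle(ev));
+    //   parentPort.addEventListener('message', { handleEvent(ev) { handle(ev); } });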
+    if (typeof listener === "function") {
+      (listener as any).$call(fake, event);
+    } else if (listener !== null && typeof listener === "object" && typeof (listener as any).handleEvent === "function") {
+      (listener as any).handleEvent.$call(listener, event);
+    }
+  }
+
+  function parentPortAddEventListener(
+    type: string,
+    listener: EventListener | EventListenerObject | null,
+    options?: boolean | AddEventListenerOptions,
+  ): void {
+    if (listener === null || listener === undefined) return;
+    const capture = typeof options === "boolean" ? options : !!options?.capture;
+    const once = typeof options === "object" && options !== null && !!options.once;
+    // `AbortSignal` auto-removal is driven from the native EventTarget in C++,
+    // so it would bypass our JS `parentPortRemoveEventListener` wrapper and
+    // leak the event-loop refcount our capture forwarder holds on `self`.
+    // Strip the signal from the options we pass inward and re-implement abort
+    // ourselves via an abort listener that routes through the JS remove path.
+    const signal =
+      typeof options === "object" && options !== null ? ((options as AddEventListenerOptions).signal ?? null) : null;
+    if (signal && signal.aborted) return;
+    // Only `message` / `messageerror` events are dispatched on `self` by the
+    // native worker runtime — all other event types (`close`, `error`, …)
+    // live purely on `parentPortTarget` and don't need the capture forwarder
+    // at all.
+    const forwarderType: "message" | "messageerror" | null =
+      type === "message" ? "message" : type === "messageerror" ? "messageerror" : null;
+    const slot = listenerSlot(type, capture);
+    let bucket = trackedByListener.get(listener as object);
+    if (bucket?.$has(slot)) {
+      // Duplicate add — EventTarget already dedupes, so no-op.
+      return;
+    }
+    // Wrap so we can release the loop ref when the listener is removed,
+    // including the implicit removal done by `{ once: true }` after firing.
+    const wrapped: EventListener = function (event) {
+      if (once) {
+        const bucketNow = trackedByListener.get(listener as object);
+        if (bucketNow?.$get(slot) === entry) {
+          bucketNow.$delete(slot);
+          if (bucketNow.$size === 0) trackedByListener.delete(listener as object);
+          if (forwarderType !== null) releaseListener(forwarderType);
+        }
+      }
+      invokeListener(listener, event);
+    };
+    const entry: TrackEntry = { wrapped, once };
+    if (!bucket) {
+      bucket = new Map();
+      trackedByListener.set(listener as object, bucket);
+    }
+    bucket.$set(slot, entry);
+    const innerOptions: boolean | AddEventListenerOptions =
+      typeof options === "object" && options !== null ? { ...options, signal: undefined } : (options ?? false);
+    parentPortTarget.addEventListener(type, wrapped, innerOptions);
+    if (forwarderType !== null) acquireListener(forwarderType);
+    if (signal) {
+      signal.addEventListener(
+        "abort",
+        () => {
+          parentPortRemoveEventListener(type, listener, { capture });
+        },
+        { once: true },
+      );
+    }
+  }
+
+  function parentPortRemoveEventListener(
+    type: string,
+    listener: EventListener | EventListenerObject | null,
+    options?: boolean | EventListenerOptions,
+  ): void {
+    if (listener === null || listener === undefined) return;
+    const capture = typeof options === "boolean" ? options : !!options?.capture;
+    const bucket = trackedByListener.get(listener as object);
+    if (!bucket) return;
+    const slot = listenerSlot(type, capture);
+    const entry = bucket.$get(slot);
+    if (!entry) return;
+    bucket.$delete(slot);
+    if (bucket.$size === 0) trackedByListener.delete(listener as object);
+    parentPortTarget.removeEventListener(type, entry.wrapped, options);
+    if (type === "message") releaseListener("message");
+    else if (type === "messageerror") releaseListener("messageerror");
+  }
+
   Object.defineProperty(fake, "onmessage", {
     get() {
-      return self.onmessage;
+      return parentPortOnMessageHandler;
     },
     set(value) {
-      self.onmessage = value;
+      // Replace the previously-installed wrapper, if any.
+      if (parentPortOnMessageWrapper !== null) {
+        parentPortTarget.removeEventListener("message", parentPortOnMessageWrapper);
+        parentPortOnMessageWrapper = null;
+        releaseListener("message");
+      }
+      parentPortOnMessageHandler = typeof value === "function" ? value : null;
+      if (parentPortOnMessageHandler !== null) {
+        const handler = parentPortOnMessageHandler;
+        parentPortOnMessageWrapper = (event: Event) => {
+          try {
+            handler.$call(fake, event as MessageEvent);
+          } catch (err) {
+            queueMicrotask(() => {
+              throw err;
+            });
+          }
+        };
+        parentPortTarget.addEventListener("message", parentPortOnMessageWrapper);
+        acquireListener("message");
+      }
     },
   });
 
   Object.defineProperty(fake, "onmessageerror", {
     get() {
-      return self.onmessageerror;
+      return parentPortOnMessageErrorHandler;
     },
     set(value) {
-      self.onmessageerror = value;
+      if (parentPortOnMessageErrorWrapper !== null) {
+        parentPortTarget.removeEventListener("messageerror", parentPortOnMessageErrorWrapper);
+        parentPortOnMessageErrorWrapper = null;
+        releaseListener("messageerror");
+      }
+      parentPortOnMessageErrorHandler = typeof value === "function" ? value : null;
+      if (parentPortOnMessageErrorHandler !== null) {
+        const handler = parentPortOnMessageErrorHandler;
+        parentPortOnMessageErrorWrapper = (event: Event) => {
+          try {
+            handler.$call(fake, event as MessageEvent);
+          } catch (err) {
+            queueMicrotask(() => {
+              throw err;
+            });
+          }
+        };
+        parentPortTarget.addEventListener("messageerror", parentPortOnMessageErrorWrapper);
+        acquireListener("messageerror");
+      }
     },
   });
 
@@ -182,20 +182,33 @@ function fakeParentPort() {
   });
 
   Object.defineProperty(fake, "addEventListener", {
-    value: self.addEventListener.bind(self),
+    value: parentPortAddEventListener,
   });
 
   Object.defineProperty(fake, "removeEventListener", {
-    value: self.removeEventListener.bind(self),
+    value: parentPortRemoveEventListener,
   });
 
-  Object.defineProperty(fake, "removeListener", {
-    value: self.removeEventListener.bind(self),
-    enumerable: false,
+  Object.defineProperty(fake, "dispatchEvent", {
+    value: parentPortTarget.dispatchEvent.bind(parentPortTarget),
   });
 
+  // `addListener`/`removeListener` must NOT bypass the `injectFakeEmitter`
+  // wrapping layer: `parentPort.on('message', fn)` wraps `fn` into a callback
+  // and stores `fn[wrappedListener] = callback`. A matching
+  // `parentPort.removeListener('message', fn)` has to resolve the wrapped
+  // callback before calling `removeEventListener`, otherwise it would try to
+  // remove `fn` itself (which was never registered) and silently leak the
+  // event-loop refcount held by our capture forwarder. `fake.on`/`fake.off`
+  // — inherited from `MessagePort.prototype` via `injectFakeEmitter` — do
+  // that resolution correctly, so we alias to them.
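+  //
+  // Concretely (hypothetical listener `fn`, for illustration only):
+  //
+  //   function fn(m) { /* ... */ }
+  //   parentPort.addListener('message', fn);    // registers wrapped fn[wrappedListener]
+  //   parentPort.removeListener('message', fn); // must remove that same wrapped callback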
   Object.defineProperty(fake, "addListener", {
-    value: self.addEventListener.bind(self),
+    value: (MessagePort.prototype as any).on,
+    enumerable: false,
+  });
+
+  Object.defineProperty(fake, "removeListener", {
+    value: (MessagePort.prototype as any).off,
     enumerable: false,
   });
diff --git a/test/regression/issue/29211.test.ts b/test/regression/issue/29211.test.ts
new file mode 100644
index 00000000000..5d560a1386c
--- /dev/null
+++ b/test/regression/issue/29211.test.ts
@@ -0,0 +1,542 @@
+// https://github.com/oven-sh/bun/issues/29211
+//
+// In a `node:worker_threads` worker, a message posted from the parent was
+// being delivered to BOTH `parentPort` listeners AND the worker's global
+// scope (`self.onmessage` / `self.addEventListener('message', …)`). Node only
+// delivers parent messages via `parentPort`.
+//
+// Emscripten-generated pthread code (e.g. `z3-solver`) relies on this: its
+// worker does
+//
+//   parentPort.on('message', (msg) => onmessage({ data: msg }));
+//   self.onmessage = handleMessage;
+//
+// and expects `handleMessage` to run exactly once per incoming message. Under
+// Bun it was running twice — once from the automatic `self.onmessage` dispatch
+// and once from the explicit `onmessage({data: msg})` forwarding inside the
+// parentPort listener — which tripped the `wasm-instantiate` run-dependency
+// assertion inside z3's emscripten bootstrap.
+//
+// The fix lives in `src/js/node/worker_threads.ts`'s `fakeParentPort`: it now
+// gives parentPort its own EventTarget and installs a capture-phase listener
+// on `self` that stops immediate propagation, so parent messages never reach
+// `self.onmessage` / user listeners on the global scope.
+
+import { expect, test } from "bun:test";
+import { bunEnv, bunExe, tempDir } from "harness";
+
+test.concurrent("parent messages do not fire self.onmessage in a node:worker_threads worker (#29211)", async () => {
+  using dir = tempDir("issue-29211-self-onmessage", {
+    "worker.mjs": String.raw`
+      import { parentPort } from 'node:worker_threads';
+      globalThis.self = globalThis;
+
+      let handleMessageCalls = 0;
+      let selfAddListenerCalls = 0;
+      let globalAddListenerCalls = 0;
+
+      function handleMessage(event) {
+        handleMessageCalls++;
+      }
+
+      // z3-solver's exact pattern: install a parentPort listener that
+      // manually forwards to the global onmessage, then set self.onmessage.
+      parentPort.on('message', (msg) => {
+        onmessage({ data: msg });
+      });
+      self.onmessage = handleMessage;
+
+      // Additional user-style listeners on the global scope — in Node these
+      // never fire for parent messages either, so they must be silent here.
+      self.addEventListener('message', () => { selfAddListenerCalls++; });
+      globalThis.addEventListener('message', () => { globalAddListenerCalls++; });
+
+      parentPort.on('message', (msg) => {
+        if (msg.cmd === 'report') {
+          parentPort.postMessage({
+            handleMessageCalls,
+            selfAddListenerCalls,
+            globalAddListenerCalls,
+          });
+        }
+      });
+    `,
+    "main.mjs": String.raw`
+      import { Worker } from 'node:worker_threads';
+      const w = new Worker(new URL('./worker.mjs', import.meta.url));
+      const result = await new Promise((resolve, reject) => {
+        w.on('error', reject);
+        w.on('message', (msg) => {
+          if (msg && typeof msg.handleMessageCalls === 'number') resolve(msg);
+        });
+        w.postMessage({ n: 1 });
+        w.postMessage({ n: 2 });
+        w.postMessage({ n: 3 });
+        w.postMessage({ cmd: 'report' });
+      });
+      await w.terminate();
+      console.log(JSON.stringify(result));
+    `,
+  });
+
+  await using proc = Bun.spawn({
+    cmd: [bunExe(), String(dir) + "/main.mjs"],
+    env: bunEnv,
+    stdout: "pipe",
+    stderr: "pipe",
+  });
+
+  const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);
+
+  // Four parent messages were posted (three data + one 'report'). The
+  // `parentPort.on('message', (msg) => onmessage({data:msg}))` forwarding
+  // listener runs once per incoming parent message and manually calls
+  // handleMessage, so handleMessage should run exactly 4 times — not 8
+  // (the pre-fix behavior where `self.onmessage` also auto-fired on every
+  // parent message, doubling every dispatch).
+  //
+  // Listeners on the global scope — via `self.addEventListener('message', …)`
+  // or `globalThis.addEventListener('message', …)` — must never fire for
+  // parent messages (Node semantics).
+  expect(JSON.parse(stdout.trim())).toEqual({
+    handleMessageCalls: 4,
+    selfAddListenerCalls: 0,
+    globalAddListenerCalls: 0,
+  });
+  expect(exitCode).toBe(0);
+});
+
+test.concurrent("parentPort delivers each parent message exactly once to every listener variant (#29211)", async () => {
+  using dir = tempDir("issue-29211-listener-variants", {
+    "worker.mjs": String.raw`
+      import { parentPort } from 'node:worker_threads';
+
+      const counts = {
+        on: 0,
+        addEventListener: 0,
+        onmessage: 0,
+      };
+
+      parentPort.on('message', () => { counts.on++; });
+      parentPort.addEventListener('message', () => { counts.addEventListener++; });
+      parentPort.onmessage = () => { counts.onmessage++; };
+
+      parentPort.on('message', (msg) => {
+        if (msg && msg.cmd === 'report') {
+          parentPort.postMessage(counts);
+        }
+      });
+    `,
+    "main.mjs": String.raw`
+      import { Worker } from 'node:worker_threads';
+      const w = new Worker(new URL('./worker.mjs', import.meta.url));
+      const result = await new Promise((resolve, reject) => {
+        w.on('error', reject);
+        w.on('message', (msg) => {
+          if (msg && typeof msg.on === 'number') resolve(msg);
+        });
+        // Five data messages, then a report. The report message also counts
+        // as a delivery for every 'message' listener that doesn't filter,
+        // so each counter should see 6 total deliveries.
+        for (let i = 0; i < 5; i++) w.postMessage({ n: i });
+        w.postMessage({ cmd: 'report' });
+      });
+      await w.terminate();
+      console.log(JSON.stringify(result));
+    `,
+  });
+
+  await using proc = Bun.spawn({
+    cmd: [bunExe(), String(dir) + "/main.mjs"],
+    env: bunEnv,
+    stdout: "pipe",
+    stderr: "pipe",
+  });
+
+  const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);
+
+  // Each listener variant must see exactly 6 deliveries — one per message
+  // posted. Pre-fix, `parentPort.onmessage` was 0 (never fired at all
+  // because the parentPort getter/setter aliased `self.onmessage` but the
+  // parent message only went to the global event target, not through the
+  // onmessage handler slot on the same target after an event handler was
+  // assigned via the parentPort proxy).
+  expect(JSON.parse(stdout.trim())).toEqual({
+    on: 6,
+    addEventListener: 6,
+    onmessage: 6,
+  });
+  expect(exitCode).toBe(0);
+});
+
+test.concurrent("parentPort.off removes a listener (#29211)", async () => {
+  using dir = tempDir("issue-29211-off", {
+    "worker.mjs": String.raw`
+      import { parentPort } from 'node:worker_threads';
+
+      let fired = 0;
+      function handler(msg) {
+        fired++;
+        if (fired === 1) {
+          parentPort.off('message', handler);
+        }
+        if (msg && msg.cmd === 'report') {
+          parentPort.postMessage({ fired });
+        }
+      }
+      parentPort.on('message', handler);
+
+      // A second listener that stays live so we can receive the 'report'
+      // command after 'handler' has unsubscribed.
+      parentPort.on('message', (msg) => {
+        if (msg && msg.cmd === 'report') {
+          parentPort.postMessage({ fired });
+        }
+      });
+    `,
+    "main.mjs": String.raw`
+      import { Worker } from 'node:worker_threads';
+      const w = new Worker(new URL('./worker.mjs', import.meta.url));
+      const result = await new Promise((resolve, reject) => {
+        w.on('error', reject);
+        w.on('message', (msg) => {
+          if (msg && typeof msg.fired === 'number') resolve(msg);
+        });
+        w.postMessage({ n: 1 });
+        w.postMessage({ n: 2 });
+        w.postMessage({ n: 3 });
+        w.postMessage({ cmd: 'report' });
+      });
+      await w.terminate();
+      console.log(JSON.stringify(result));
+    `,
+  });
+
+  await using proc = Bun.spawn({
+    cmd: [bunExe(), String(dir) + "/main.mjs"],
+    env: bunEnv,
+    stdout: "pipe",
+    stderr: "pipe",
+  });
+
+  const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);
+
+  // `handler` should fire exactly once — it unsubscribes itself on the
+  // first invocation — then stay silent for the remaining messages.
+  expect(JSON.parse(stdout.trim())).toEqual({ fired: 1 });
+  expect(exitCode).toBe(0);
+});
+
+test.concurrent("transferred MessagePorts are still reachable via parentPort listeners (#29211)", async () => {
+  using dir = tempDir("issue-29211-ports", {
+    "worker.mjs": String.raw`
+      import { parentPort } from 'node:worker_threads';
+      parentPort.on('message', (_msg, ports) => {
+        // Node passes ports as a second argument; the DOM-style MessageEvent
+        // also exposes them via event.ports. We test both surfaces below via
+        // addEventListener.
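+        //
+        // Hypothetical illustration of the two surfaces:
+        //   Node-style: parentPort.on('message', (msg, ports) => ports[0]);
+        //   DOM-style:  event.ports[0] inside an addEventListener handler.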
+      });
+      parentPort.addEventListener('message', (event) => {
+        const incomingPort = event.ports && event.ports[0];
+        if (!incomingPort) {
+          parentPort.postMessage({ ok: false, reason: 'no port on event' });
+          return;
+        }
+        incomingPort.start?.();
+        incomingPort.addEventListener('message', (portEvent) => {
+          parentPort.postMessage({ ok: true, echoed: portEvent.data });
+        });
+        incomingPort.postMessage('hello-from-worker');
+      });
+    `,
+    "main.mjs": String.raw`
+      import { Worker, MessageChannel } from 'node:worker_threads';
+      const w = new Worker(new URL('./worker.mjs', import.meta.url));
+      const { port1, port2 } = new MessageChannel();
+      const result = await new Promise((resolve, reject) => {
+        w.on('error', reject);
+        port1.on('message', (data) => {
+          // Echo back so the worker receives something on the transferred
+          // port and we can assert end-to-end delivery.
+          port1.postMessage('reply:' + data);
+        });
+        w.on('message', resolve);
+        w.postMessage({ cmd: 'use-port' }, [port2]);
+      });
+      port1.close();
+      await w.terminate();
+      console.log(JSON.stringify(result));
+    `,
+  });
+
+  await using proc = Bun.spawn({
+    cmd: [bunExe(), String(dir) + "/main.mjs"],
+    env: bunEnv,
+    stdout: "pipe",
+    stderr: "pipe",
+  });
+
+  const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);
+
+  // The transferred port must reach the parentPort listener via `event.ports`
+  // and must be usable for round-trip messaging. Before the port-preservation
+  // fix, `event.ports` was an empty array and the transferred MessagePort was
+  // silently dropped.
+  expect(JSON.parse(stdout.trim())).toEqual({ ok: true, echoed: "reply:hello-from-worker" });
+  expect(exitCode).toBe(0);
+});
+
+test.concurrent("parentPort.addEventListener accepts an EventListenerObject (#29211)", async () => {
+  using dir = tempDir("issue-29211-listener-object", {
+    "worker.mjs": String.raw`
+      import { parentPort } from 'node:worker_threads';
+      const listener = {
+        fired: 0,
+        handleEvent(event) {
+          this.fired++;
+          if (event.data && event.data.cmd === 'report') {
+            parentPort.postMessage({ fired: this.fired });
+          }
+        },
+      };
+      parentPort.addEventListener('message', listener);
+    `,
+    "main.mjs": String.raw`
+      import { Worker } from 'node:worker_threads';
+      const w = new Worker(new URL('./worker.mjs', import.meta.url));
+      const result = await new Promise((resolve, reject) => {
+        w.on('error', reject);
+        w.on('message', resolve);
+        w.postMessage({ n: 1 });
+        w.postMessage({ n: 2 });
+        w.postMessage({ cmd: 'report' });
+      });
+      await w.terminate();
+      console.log(JSON.stringify(result));
+    `,
+  });
+
+  await using proc = Bun.spawn({
+    cmd: [bunExe(), String(dir) + "/main.mjs"],
+    env: bunEnv,
+    stdout: "pipe",
+    stderr: "pipe",
+  });
+
+  const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);
+
+  // Three parent messages — the EventListenerObject's `handleEvent` should
+  // run once per message (3 times). Pre-fix, the wrapper unconditionally
+  // invoked `listener.$call` which throws on a non-function.
+  expect(JSON.parse(stdout.trim())).toEqual({ fired: 3 });
+  expect(exitCode).toBe(0);
+});
+
+test.concurrent("parentPort.addEventListener with AbortSignal exits cleanly after abort (#29211)", async () => {
+  using dir = tempDir("issue-29211-abort-signal", {
+    "worker.mjs": String.raw`
+      import { parentPort } from 'node:worker_threads';
+      // Add a parentPort listener tied to an AbortSignal, then abort it
+      // immediately. After abort, the worker should have zero parentPort
+      // listeners and should exit naturally when its module finishes — it
+      // must NOT hang waiting for messages. Before the AbortSignal-leak
+      // fix, the capture forwarder installed on 'self' when the listener
+      // was registered stayed attached after the native EventTarget removed
+      // the wrapped listener, keeping the event loop alive forever.
+      const ac = new AbortController();
+      parentPort.addEventListener('message', () => {}, { signal: ac.signal });
+      ac.abort();
+    `,
+    "main.mjs": String.raw`
+      import { Worker } from 'node:worker_threads';
+      const w = new Worker(new URL('./worker.mjs', import.meta.url));
+      const exitCode = await new Promise((resolve, reject) => {
+        w.on('error', reject);
+        w.on('exit', resolve);
+        // If the worker hangs, this timer fires first and we terminate it
+        // ourselves — producing a distinguishing exit code of -1.
+        setTimeout(() => { w.terminate(); resolve(-1); }, 3000).unref();
+      });
+      console.log(JSON.stringify({ exitCode }));
+    `,
+  });
+
+  await using proc = Bun.spawn({
+    cmd: [bunExe(), String(dir) + "/main.mjs"],
+    env: bunEnv,
+    stdout: "pipe",
+    stderr: "pipe",
+  });
+
+  const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);
+
+  // The worker must exit naturally (exit code 0), not be killed by the
+  // 3-second timeout (which would report -1). Pre-fix, the capture
+  // forwarder stayed installed after AbortSignal detached the wrapped
+  // listener, pinning the event loop and forcing the terminate fallback.
+  expect(JSON.parse(stdout.trim())).toEqual({ exitCode: 0 });
+  expect(exitCode).toBe(0);
+});
+
+test.concurrent("parentPort listener for non-message events does not block parent messages (#29211)", async () => {
+  // Regression for a gating bug in `parentPortAddEventListener`: registering
+  // a listener for a non-message event (e.g. via `parentPort.once('close',
+  // …)` or `.on('error', …)`) used to bump `listenerCount`, which in turn
+  // installed the capture-phase `message` forwarder on `self`. The forwarder
+  // re-dispatched every incoming message on `parentPortTarget` — but that
+  // target had no 'message' listener, so all parent messages were silently
+  // dropped. Only listeners for 'message' / 'messageerror' should install
+  // the forwarder.
+  using dir = tempDir("issue-29211-non-message-event", {
+    "worker.mjs": String.raw`
+      import { parentPort } from 'node:worker_threads';
+      // Register a non-message listener FIRST. This must not affect message
+      // delivery.
+      parentPort.on('close', () => {});
+      let received = 0;
+      parentPort.on('message', (msg) => {
+        received++;
+        if (msg && msg.cmd === 'report') {
+          parentPort.postMessage({ received });
+        }
+      });
+    `,
+    "main.mjs": String.raw`
+      import { Worker } from 'node:worker_threads';
+      const w = new Worker(new URL('./worker.mjs', import.meta.url));
+      const result = await new Promise((resolve, reject) => {
+        w.on('error', reject);
+        w.on('message', (msg) => {
+          if (msg && typeof msg.received === 'number') resolve(msg);
+        });
+        w.postMessage({ n: 1 });
+        w.postMessage({ n: 2 });
+        w.postMessage({ cmd: 'report' });
+      });
+      await w.terminate();
+      console.log(JSON.stringify(result));
+    `,
+  });
+
+  await using proc = Bun.spawn({
+    cmd: [bunExe(), String(dir) + "/main.mjs"],
+    env: bunEnv,
+    stdout: "pipe",
+    stderr: "pipe",
+  });
+
+  const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);
+
+  // All three parent messages should reach the 'message' listener — the
+  // earlier 'close' listener must not route anything into the forwarder.
+  expect(JSON.parse(stdout.trim())).toEqual({ received: 3 });
+  expect(exitCode).toBe(0);
+});
+
+test.concurrent("parentPort.removeListener unsubscribes through the wrapped-listener slot (#29211)", async () => {
+  // `parentPort.on('message', fn)` wraps `fn` into a callback (setting
+  // `fn[wrappedListener] = callback`) and registers the callback. A matching
+  // `parentPort.removeListener('message', fn)` must resolve the wrapped
+  // callback before calling `removeEventListener`, or `handler` stays
+  // subscribed and its loop-ref is never released.
+  using dir = tempDir("issue-29211-removelistener", {
+    "worker.mjs": String.raw`
+      import { parentPort } from 'node:worker_threads';
+      let fired = 0;
+      function handler(msg) {
+        fired++;
+        if (msg && msg.cmd === 'remove-me') {
+          parentPort.removeListener('message', handler);
+          // A second live listener reports the count that 'handler' saw
+          // for each subsequent 'report' command. Pre-fix, this ran AFTER
+          // handler itself had also fired again on every subsequent
+          // message, so 'fired' would keep climbing.
+          parentPort.on('message', (msg2) => {
+            if (msg2 && msg2.cmd === 'report') parentPort.postMessage({ fired });
+          });
+          parentPort.postMessage({ acked: true });
+        }
+      }
+      parentPort.on('message', handler);
+    `,
+    "main.mjs": String.raw`
+      import { Worker } from 'node:worker_threads';
+      const w = new Worker(new URL('./worker.mjs', import.meta.url));
+      w.on('error', e => { console.error('worker error', e); process.exit(2); });
+      const result = await new Promise((resolve) => {
+        let acked = false;
+        w.on('message', (msg) => {
+          if (msg && msg.acked) {
+            acked = true;
+            // After removal is confirmed, post some data messages followed
+            // by a report to see whether 'handler' keeps firing.
+            w.postMessage({ n: 1 });
+            w.postMessage({ n: 2 });
+            w.postMessage({ cmd: 'report' });
+          } else if (acked && msg && typeof msg.fired === 'number') {
+            resolve(msg);
+          }
+        });
+        w.postMessage({ cmd: 'remove-me' });
+      });
+      console.log(JSON.stringify(result));
+      process.exit(0);
+    `,
+  });
+
+  await using proc = Bun.spawn({
+    cmd: [bunExe(), String(dir) + "/main.mjs"],
+    env: bunEnv,
+    stdout: "pipe",
+    stderr: "pipe",
+  });
+
+  const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);
+
+  // `handler` fired exactly once — for the 'remove-me' command that removed
+  // it. After that it should be detached, so 'fired' must stay at 1 even
+  // though three more messages were posted. Pre-fix, `removeListener`
+  // bypassed the wrapped-listener lookup, `handler` stayed subscribed, and
+  // 'fired' reached 4.
+  expect(JSON.parse(stdout.trim())).toEqual({ fired: 1 });
+  expect(exitCode).toBe(0);
+});
+
+test.concurrent("parentPort.onmessageerror alone does not keep the event loop alive (#29211)", async () => {
+  // Registering only a `messageerror` handler on parentPort must NOT install
+  // the capture-phase `message` forwarder on `self`. Pre-fix, both forwarders
+  // were installed as a pair — so a worker that only cared about
+  // `messageerror` would pin `m_messageEventCount` on the global scope
+  // forever and hang after its module finished executing.
+  using dir = tempDir("issue-29211-onmessageerror-only", {
+    "worker.mjs": String.raw`
+      import { parentPort } from 'node:worker_threads';
+      parentPort.onmessageerror = () => {};
+    `,
+    "main.mjs": String.raw`
+      import { Worker } from 'node:worker_threads';
+      const w = new Worker(new URL('./worker.mjs', import.meta.url));
+      const exitCode = await new Promise((resolve, reject) => {
+        w.on('error', reject);
+        w.on('exit', resolve);
+        // Hard watchdog: if the worker hangs, terminate and surface -1 so
+        // the test assertion below distinguishes a natural exit from a
+        // leaked event-loop ref.
+        setTimeout(() => { w.terminate(); resolve(-1); }, 3000).unref();
+      });
+      console.log(JSON.stringify({ exitCode }));
+    `,
+  });
+
+  await using proc = Bun.spawn({
+    cmd: [bunExe(), String(dir) + "/main.mjs"],
+    env: bunEnv,
+    stdout: "pipe",
+    stderr: "pipe",
+  });
+
+  const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);
+
+  expect(JSON.parse(stdout.trim())).toEqual({ exitCode: 0 });
+  expect(exitCode).toBe(0);
+});