diff --git a/lib/api/readable.js b/lib/api/readable.js index cdede959980..5ebd04ebe83 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -1,6 +1,7 @@ 'use strict' const assert = require('node:assert') +const { addAbortListener } = require('node:events') const { Readable } = require('node:stream') const { RequestAbortedError, NotSupportedError, InvalidArgumentError, AbortError } = require('../core/errors') const util = require('../core/util') @@ -293,10 +294,10 @@ class BodyReadable extends Readable { const onAbort = () => { this.destroy(signal.reason ?? new AbortError()) } - signal.addEventListener('abort', onAbort) + const abortListener = addAbortListener(signal, onAbort) this .on('close', function () { - signal.removeEventListener('abort', onAbort) + abortListener[Symbol.dispose]() if (signal.aborted) { reject(signal.reason ?? new AbortError()) } else { diff --git a/lib/core/util.js b/lib/core/util.js index 0c72e5d598a..68965133b93 100644 --- a/lib/core/util.js +++ b/lib/core/util.js @@ -6,7 +6,7 @@ const { IncomingMessage } = require('node:http') const stream = require('node:stream') const net = require('node:net') const { stringify } = require('node:querystring') -const { EventEmitter: EE } = require('node:events') +const { EventEmitter: EE, addAbortListener: addAbortListenerNative } = require('node:events') const timers = require('../util/timers') const { InvalidArgumentError, ConnectTimeoutError } = require('./errors') const { headerNameLowerCasedRecord } = require('./constants') @@ -678,7 +678,12 @@ function isFormDataLike (object) { } function addAbortListener (signal, listener) { - if ('addEventListener' in signal) { + if (signal instanceof AbortSignal) { + const disposable = addAbortListenerNative(signal, listener) + return () => disposable[Symbol.dispose]() + } + + if (typeof signal.addEventListener === 'function') { signal.addEventListener('abort', listener, { once: true }) return () => signal.removeEventListener('abort', listener) } diff --git a/lib/web/websocket/stream/websocketstream.js b/lib/web/websocket/stream/websocketstream.js index d8061658fd9..eca2daa5bc8 100644 --- a/lib/web/websocket/stream/websocketstream.js +++ b/lib/web/websocket/stream/websocketstream.js @@ -1,5 +1,6 @@ 'use strict' +const { addAbortListener } = require('node:events') const { environmentSettingsObject } = require('../../fetch/util') const { states, opcodes, sentCloseFrameState } = require('../constants') const { webidl } = require('../../webidl') @@ -132,7 +133,7 @@ class WebSocketStream { } // 8.3. Add the following abort steps to signal : - signal.addEventListener('abort', () => { + addAbortListener(signal, () => { // 8.3.1. If the WebSocket connection is not yet established : [WSP] if (!isEstablished(this.#handler.readyState)) { // 8.3.1.1. Fail the WebSocket connection . @@ -148,7 +149,7 @@ class WebSocketStream { // Set this 's handshake aborted to true. this.#handshakeAborted = true } - }, { once: true }) + }) } // 9. Let client be this 's relevant settings object . diff --git a/test/node-test/util.js b/test/node-test/util.js index 267486869f8..3110c743c6c 100644 --- a/test/node-test/util.js +++ b/test/node-test/util.js @@ -20,6 +20,35 @@ test('isStream', () => { assert.ok(util.isStream(ee) === false) }) +test('addAbortListener supports AbortSignal', async () => { + const ac = new AbortController() + let calls = 0 + + util.addAbortListener(ac.signal, () => { + calls++ + }) + + ac.abort() + await new Promise((resolve) => setImmediate(resolve)) + + assert.equal(calls, 1) +}) + +test('addAbortListener removes native AbortSignal listener', async () => { + const ac = new AbortController() + let calls = 0 + + const remove = util.addAbortListener(ac.signal, () => { + calls++ + }) + + remove() + ac.abort() + await new Promise((resolve) => setImmediate(resolve)) + + assert.equal(calls, 0) +}) + test('getServerName', () => { assert.equal(util.getServerName('1.1.1.1'), '') assert.equal(util.getServerName('1.1.1.1:443'), '') diff --git a/test/websocket/stream/abort-before-open.js b/test/websocket/stream/abort-before-open.js new file mode 100644 index 00000000000..88f13a2323b --- /dev/null +++ b/test/websocket/stream/abort-before-open.js @@ -0,0 +1,43 @@ +'use strict' + +const { test } = require('node:test') +const { createServer } = require('node:http') + +const { WebSocketStream } = require('../../..') + +test('WebSocketStream aborts before handshake completes', async (t) => { + const sockets = new Set() + const server = createServer() + + server.on('upgrade', (req, socket) => { + sockets.add(socket) + socket.on('close', () => sockets.delete(socket)) + }) + + await new Promise((resolve) => server.listen(0, resolve)) + + t.after(async () => { + for (const socket of sockets) { + socket.destroy() + } + + await new Promise((resolve) => server.close(resolve)) + }) + + const ac = new AbortController() + const wss = new WebSocketStream(`ws://localhost:${server.address().port}`, { + signal: ac.signal + }) + + ac.abort(new Error('abort before open')) + + const [opened, closed] = await Promise.allSettled([wss.opened, wss.closed]) + + t.assert.strictEqual(opened.status, 'rejected') + t.assert.strictEqual(opened.reason.name, 'WebSocketError') + t.assert.strictEqual(opened.reason.message, 'Socket never opened') + + t.assert.strictEqual(closed.status, 'rejected') + t.assert.strictEqual(closed.reason.name, 'WebSocketError') + t.assert.strictEqual(closed.reason.message, 'unclean close') +})