diff --git a/deps/undici/src/lib/core/diagnostics.js b/deps/undici/src/lib/core/diagnostics.js index 454ab379be3802..f7f26b2143670a 100644 --- a/deps/undici/src/lib/core/diagnostics.js +++ b/deps/undici/src/lib/core/diagnostics.js @@ -21,6 +21,7 @@ const channels = { headers: diagnosticsChannel.channel('undici:request:headers'), trailers: diagnosticsChannel.channel('undici:request:trailers'), error: diagnosticsChannel.channel('undici:request:error'), + fetchEventSourceMessage: diagnosticsChannel.channel('undici:fetch:eventsource:message'), // WebSocket open: diagnosticsChannel.channel('undici:websocket:open'), close: diagnosticsChannel.channel('undici:websocket:close'), diff --git a/deps/undici/src/lib/core/request.js b/deps/undici/src/lib/core/request.js index 2b2633f3d04224..d23b0a470bed69 100644 --- a/deps/undici/src/lib/core/request.js +++ b/deps/undici/src/lib/core/request.js @@ -96,7 +96,8 @@ class Request { servername, throwOnError, maxRedirections, - typeOfService + typeOfService, + fetchRequest }, handler) { if (typeof path !== 'string') { throw new InvalidArgumentError('path must be a string') @@ -260,7 +261,7 @@ class Request { this[kHandler] = handler if (channels.create.hasSubscribers) { - channels.create.publish({ request: this }) + channels.create.publish({ request: this, fetchRequest }) } } diff --git a/deps/undici/src/lib/web/eventsource/eventsource.js b/deps/undici/src/lib/web/eventsource/eventsource.js index 32dcf0e423e06f..26fb6ac40b330a 100644 --- a/deps/undici/src/lib/web/eventsource/eventsource.js +++ b/deps/undici/src/lib/web/eventsource/eventsource.js @@ -182,6 +182,7 @@ class EventSource extends EventTarget { // 12. Set request's initiator type to "other". initRequest.initiator = 'other' + initRequest.eventSource = true initRequest.urlList = [new URL(this.#url)] diff --git a/deps/undici/src/lib/web/fetch/index.js b/deps/undici/src/lib/web/fetch/index.js index 85d4c2e9feda0c..9d2ac7949f4693 100644 --- a/deps/undici/src/lib/web/fetch/index.js +++ b/deps/undici/src/lib/web/fetch/index.js @@ -11,8 +11,15 @@ const { getResponseState } = require('./response') const { HeadersList } = require('./headers') -const { Request, cloneRequest, getRequestDispatcher, getRequestState } = require('./request') +const { + Request, + cloneRequest, + getRequestDispatcher, + getRequestState, + kOriginalRequest +} = require('./request') const zlib = require('node:zlib') +const { channels } = require('../../core/diagnostics') const { makePolicyContainer, clonePolicyContainer, @@ -57,9 +64,10 @@ const { subresourceSet } = require('./constants') const EE = require('node:events') -const { Readable, pipeline, finished, isErrored, isReadable } = require('node:stream') +const { Readable, Transform, pipeline, finished, isErrored, isReadable } = require('node:stream') const { addAbortListener, bufferToLowerCasedHeaderName } = require('../../core/util') const { dataURLProcessor, serializeAMimeType, minimizeSupportedMimeType } = require('./data-url') +const { EventSourceStream } = require('../eventsource/eventsource-stream') const { getGlobalDispatcher } = require('../../global') const { webidl } = require('../webidl') const { STATUS_CODES } = require('node:http') @@ -75,6 +83,45 @@ const defaultUserAgent = typeof __UNDICI_IS_NODE__ !== 'undefined' || typeof esb /** @type {import('buffer').resolveObjectURL} */ let resolveObjectURL +function createEventSourceMessageTransform (request) { + const originalRequest = request[kOriginalRequest] ?? request + const parser = new EventSourceStream({ + eventSourceSettings: { lastEventId: '' }, + push (event) { + if (event === null) { + return false + } + channels.fetchEventSourceMessage.publish({ + request: originalRequest, + eventName: event.type, + eventId: event.options.lastEventId, + data: event.options.data + }) + return true + } + }) + + let transform + transform = new Transform({ + transform (chunk, encoding, callback) { + parser.write(chunk, encoding, (err) => { + callback(err, chunk) + }) + }, + flush (callback) { + parser.end(callback) + }, + destroy (err, callback) { + parser.destroy(err) + callback(err) + } + }) + + parser.on('error', (err) => transform.destroy(err)) + + return transform +} + class Fetch extends EE { constructor (dispatcher) { super() @@ -2153,6 +2200,7 @@ async function httpNetworkFetch ( method: request.method, body: agent.isMockActive ? request.body && (request.body.source || request.body.stream) : body, headers: request.headersList.entries, + fetchRequest: request[kOriginalRequest] ?? request, maxRedirections: 0, upgrade: request.mode === 'websocket' ? 'websocket' : undefined, ...(allowH2 === false ? { allowH2 } : null) @@ -2213,6 +2261,7 @@ async function httpNetworkFetch ( } } const location = headersList.get('location', true) + const mimeType = extractMimeType(headersList) this.body = new Readable({ read: () => controller.resume() }) @@ -2272,18 +2321,32 @@ async function httpNetworkFetch ( } const onError = (err) => this.onResponseError(controller, err) + const originalRequest = request[kOriginalRequest] ?? request + const shouldPublishEventSourceMessages = + channels.fetchEventSourceMessage.hasSubscribers && + originalRequest.eventSource !== true && + mimeType !== 'failure' && + mimeType.essence === 'text/event-stream' + const eventSourceMessageTransform = shouldPublishEventSourceMessages ? + createEventSourceMessageTransform(request) : + null + const responseBody = decoders.length || eventSourceMessageTransform ? + pipeline( + this.body, + ...decoders, + ...(eventSourceMessageTransform ? [eventSourceMessageTransform] : []), + (err) => { + if (err) { + this.onResponseError(controller, err) + } + }).on('error', onError) : + this.body.on('error', onError) resolve({ status, statusText, headersList, - body: decoders.length - ? pipeline(this.body, ...decoders, (err) => { - if (err) { - this.onResponseError(controller, err) - } - }).on('error', onError) - : this.body.on('error', onError) + body: responseBody }) }, diff --git a/deps/undici/src/lib/web/fetch/request.js b/deps/undici/src/lib/web/fetch/request.js index 6ef40f99920840..859f6e7406d26b 100644 --- a/deps/undici/src/lib/web/fetch/request.js +++ b/deps/undici/src/lib/web/fetch/request.js @@ -29,6 +29,7 @@ const assert = require('node:assert') const { getMaxListeners, setMaxListeners, defaultMaxListeners } = require('node:events') const kAbortController = Symbol('abortController') +const kOriginalRequest = Symbol('originalRequest') const requestFinalizer = new FinalizationRegistry(({ signal, abort }) => { signal.removeEventListener('abort', abort) @@ -934,6 +935,7 @@ function cloneRequest (request) { // 1. Let newRequest be a copy of request, except for its body. const newRequest = makeRequest({ ...request, body: null }) + newRequest[kOriginalRequest] = request[kOriginalRequest] ?? request // 2. If request’s body is non-null, set newRequest’s body to the // result of cloning request’s body. @@ -1110,6 +1112,7 @@ module.exports = { makeRequest, fromInnerRequest, cloneRequest, + kOriginalRequest, getRequestDispatcher, getRequestState } diff --git a/deps/undici/undici.js b/deps/undici/undici.js index 6be4f5b522b775..d622789567d76e 100644 --- a/deps/undici/undici.js +++ b/deps/undici/undici.js @@ -2465,6 +2465,7 @@ var require_diagnostics = __commonJS({ headers: diagnosticsChannel.channel("undici:request:headers"), trailers: diagnosticsChannel.channel("undici:request:trailers"), error: diagnosticsChannel.channel("undici:request:error"), + fetchEventSourceMessage: diagnosticsChannel.channel("undici:fetch:eventsource:message"), // WebSocket open: diagnosticsChannel.channel("undici:websocket:open"), close: diagnosticsChannel.channel("undici:websocket:close"), @@ -2748,14 +2749,15 @@ var require_request = __commonJS({ blocking, upgrade, headersTimeout, - bodyTimeout, - reset, - expectContinue, - servername, - throwOnError, - maxRedirections, - typeOfService - }, handler) { + bodyTimeout, + reset, + expectContinue, + servername, + throwOnError, + maxRedirections, + typeOfService, + fetchRequest + }, handler) { if (typeof path !== "string") { throw new InvalidArgumentError("path must be a string"); } else if (path[0] !== "/" && !(path.startsWith("http://") || path.startsWith("https://")) && method !== "CONNECT") { @@ -2874,7 +2876,7 @@ var require_request = __commonJS({ this.servername = servername || getServerName(this.host) || null; this[kHandler] = handler; if (channels.create.hasSubscribers) { - channels.create.publish({ request: this }); + channels.create.publish({ request: this, fetchRequest }); } } onBodySent(chunk) { @@ -11760,6 +11762,7 @@ var require_request2 = __commonJS({ var assert = require("node:assert"); var { getMaxListeners, setMaxListeners, defaultMaxListeners } = require("node:events"); var kAbortController = /* @__PURE__ */ Symbol("abortController"); + var kOriginalRequest = /* @__PURE__ */ Symbol("originalRequest"); var requestFinalizer = new FinalizationRegistry(({ signal, abort }) => { signal.removeEventListener("abort", abort); }); @@ -12341,6 +12344,7 @@ var require_request2 = __commonJS({ __name(makeRequest, "makeRequest"); function cloneRequest(request) { const newRequest = makeRequest({ ...request, body: null }); + newRequest[kOriginalRequest] = request[kOriginalRequest] ?? request; if (request.body != null) { newRequest.body = cloneBody(request.body); } @@ -12488,6 +12492,7 @@ var require_request2 = __commonJS({ makeRequest, fromInnerRequest, cloneRequest, + kOriginalRequest, getRequestDispatcher, getRequestState }; @@ -12649,8 +12654,9 @@ var require_fetch = __commonJS({ getResponseState } = require_response(); var { HeadersList } = require_headers(); - var { Request, cloneRequest, getRequestDispatcher, getRequestState } = require_request2(); + var { Request, cloneRequest, getRequestDispatcher, getRequestState, kOriginalRequest } = require_request2(); var zlib = require("node:zlib"); + var { channels } = require_diagnostics(); var { makePolicyContainer, clonePolicyContainer, @@ -12695,9 +12701,10 @@ var require_fetch = __commonJS({ subresourceSet } = require_constants3(); var EE = require("node:events"); - var { Readable, pipeline, finished, isErrored, isReadable } = require("node:stream"); + var { Readable, Transform, pipeline, finished, isErrored, isReadable } = require("node:stream"); var { addAbortListener, bufferToLowerCasedHeaderName } = require_util(); var { dataURLProcessor, serializeAMimeType, minimizeSupportedMimeType } = require_data_url(); + var { EventSourceStream } = require_eventsource_stream(); var { getGlobalDispatcher: getGlobalDispatcher2 } = require_global2(); var { webidl } = require_webidl(); var { STATUS_CODES } = require("node:http"); @@ -12706,6 +12713,42 @@ var require_fetch = __commonJS({ var GET_OR_HEAD = ["GET", "HEAD"]; var defaultUserAgent = typeof __UNDICI_IS_NODE__ !== "undefined" || true ? "node" : "undici"; var resolveObjectURL; + function createEventSourceMessageTransform(request) { + const originalRequest = request[kOriginalRequest] ?? request; + const parser = new EventSourceStream({ + eventSourceSettings: { lastEventId: "" }, + push(event) { + if (event === null) { + return false; + } + channels.fetchEventSourceMessage.publish({ + request: originalRequest, + eventName: event.type, + eventId: event.options.lastEventId, + data: event.options.data + }); + return true; + } + }); + let transform; + transform = new Transform({ + transform(chunk, encoding, callback) { + parser.write(chunk, encoding, (err) => { + callback(err, chunk); + }); + }, + flush(callback) { + parser.end(callback); + }, + destroy(err, callback) { + parser.destroy(err); + callback(err); + } + }); + parser.on("error", (err) => transform.destroy(err)); + return transform; + } + __name(createEventSourceMessageTransform, "createEventSourceMessageTransform"); var Fetch = class extends EE { static { __name(this, "Fetch"); @@ -13616,6 +13659,7 @@ var require_fetch = __commonJS({ method: request.method, body: agent.isMockActive ? request.body && (request.body.source || request.body.stream) : body2, headers: request.headersList.entries, + fetchRequest: request[kOriginalRequest] ?? request, maxRedirections: 0, upgrade: request.mode === "websocket" ? "websocket" : void 0, ...allowH2 === false ? { allowH2 } : null @@ -13656,6 +13700,7 @@ var require_fetch = __commonJS({ } } const location = headersList.get("location", true); + const mimeType = extractMimeType(headersList); this.body = new Readable({ read: /* @__PURE__ */ __name(() => controller.resume(), "read") }); const willFollow = location && request.redirect === "follow" && redirectStatusSet.has(status); const decoders = []; @@ -13700,15 +13745,24 @@ var require_fetch = __commonJS({ } } const onError = /* @__PURE__ */ __name((err) => this.onResponseError(controller, err), "onError"); + const originalRequest = request[kOriginalRequest] ?? request; + const shouldPublishEventSourceMessages = channels.fetchEventSourceMessage.hasSubscribers && originalRequest.eventSource !== true && mimeType !== "failure" && mimeType.essence === "text/event-stream"; + const eventSourceMessageTransform = shouldPublishEventSourceMessages ? createEventSourceMessageTransform(request) : null; + const responseBody = decoders.length || eventSourceMessageTransform ? pipeline( + this.body, + ...decoders, + ...(eventSourceMessageTransform ? [eventSourceMessageTransform] : []), + (err) => { + if (err) { + this.onResponseError(controller, err); + } + } + ).on("error", onError) : this.body.on("error", onError); resolve({ status, statusText, headersList, - body: decoders.length ? pipeline(this.body, ...decoders, (err) => { - if (err) { - this.onResponseError(controller, err); - } - }).on("error", onError) : this.body.on("error", onError) + body: responseBody }); }, onResponseData(controller, chunk) { @@ -15944,6 +15998,7 @@ var require_eventsource = __commonJS({ initRequest.headersList = [["accept", { name: "accept", value: "text/event-stream" }]]; initRequest.cache = "no-store"; initRequest.initiator = "other"; + initRequest.eventSource = true; initRequest.urlList = [new URL(this.#url)]; this.#request = makeRequest(initRequest); this.#connect(); diff --git a/doc/api/inspector.md b/doc/api/inspector.md index 4cdbeb62d0e71a..f4f8f6dea3b958 100644 --- a/doc/api/inspector.md +++ b/doc/api/inspector.md @@ -572,6 +572,20 @@ This feature is only available with the `--experimental-network-inspection` flag Broadcasts the `Network.responseReceived` event to connected frontends. This event indicates that HTTP response is available. +### `inspector.Network.eventSourceMessageReceived([params])` + + + +* `params` {Object} + +This feature is only available with the `--experimental-network-inspection` flag enabled. + +Broadcasts the `Network.eventSourceMessageReceived` event to connected frontends. +This event indicates that a parsed SSE message has been received from a network request. + ### `inspector.Network.loadingFinished([params])`