diff --git a/docs/docs/api/Dispatcher.md b/docs/docs/api/Dispatcher.md index 4ceba0122d5..21105b22e90 100644 --- a/docs/docs/api/Dispatcher.md +++ b/docs/docs/api/Dispatcher.md @@ -193,7 +193,7 @@ Returns: `Boolean` - `false` if dispatcher is busy and further dispatch calls wo * **path** `string` * **method** `string` * **reset** `boolean` (optional) - Default: `false` - If `false`, the request will attempt to create a long-living connection by sending the `connection: keep-alive` header,otherwise will attempt to close it immediately after response by sending `connection: close` within the request and closing the socket afterwards. -* **body** `string | Buffer | Uint8Array | stream.Readable | Iterable | AsyncIterable | null` (optional) - Default: `null` +* **body** `string | Buffer | Uint8Array | stream.Readable | Iterable | AsyncIterable | (() => string | Buffer | Uint8Array | stream.Readable | Iterable | AsyncIterable) | null` (optional) - Default: `null`. A factory function can be used to create a new body for each retry when used with the [`retry`](./Retry.md) interceptor. * **headers** `UndiciHeaders` (optional) - Default: `null`. * **query** `Record | null` (optional) - Default: `null` - Query string params to be embedded in the request URL. Note that both keys and values of query are encoded using `encodeURIComponent`. If for some reason you need to send them unencoded, embed query params into path directly instead. * **idempotent** `boolean` (optional) - Default: `true` if `method` is `'HEAD'` or `'GET'` - Whether the requests can be safely retried or not. If `false` the request won't be sent until all preceding requests in the pipeline has completed. diff --git a/docs/docs/api/RetryAgent.md b/docs/docs/api/RetryAgent.md index fdf394fbad4..39f2b7a6137 100644 --- a/docs/docs/api/RetryAgent.md +++ b/docs/docs/api/RetryAgent.md @@ -23,7 +23,7 @@ Returns: `ProxyAgent` - **minTimeout** `number` (optional) - Minimum number of milliseconds to wait before retrying. Default: `500` (half a second) - **timeoutFactor** `number` (optional) - Factor to multiply the timeout by for each retry attempt. Default: `2` - **retryAfter** `boolean` (optional) - It enables automatic retry after the `Retry-After` header is received. Default: `true` -- **methods** `string[]` (optional) - Array of HTTP methods to retry. Default: `['GET', 'PUT', 'HEAD', 'OPTIONS', 'DELETE']` +- **methods** `string[]` (optional) - Array of HTTP methods to retry. Default: `['GET', 'HEAD', 'OPTIONS', 'PUT', 'DELETE', 'TRACE']` - **statusCodes** `number[]` (optional) - Array of HTTP status codes to retry. Default: `[429, 500, 502, 503, 504]` - **errorCodes** `string[]` (optional) - Array of Error codes to retry. Default: `['ECONNRESET', 'ECONNREFUSED', 'ENOTFOUND', 'ENETDOWN','ENETUNREACH', 'EHOSTDOWN', 'UND_ERR_SOCKET']` diff --git a/docs/docs/api/RetryHandler.md b/docs/docs/api/RetryHandler.md index 07e7a2dac33..0b6af9c444d 100644 --- a/docs/docs/api/RetryHandler.md +++ b/docs/docs/api/RetryHandler.md @@ -26,7 +26,7 @@ Extends: [`Dispatch.DispatchOptions`](/docs/docs/api/Dispatcher.md#parameter-dis - **minTimeout** `number` (optional) - Minimum number of milliseconds to wait before retrying. Default: `500` (half a second) - **timeoutFactor** `number` (optional) - Factor to multiply the timeout by for each retry attempt. Default: `2` - **retryAfter** `boolean` (optional) - It enables automatic retry after the `Retry-After` header is received. Default: `true` -- **methods** `string[]` (optional) - Array of HTTP methods to retry. Default: `['GET', 'PUT', 'HEAD', 'OPTIONS', 'DELETE']` +- **methods** `string[]` (optional) - Array of HTTP methods to retry. Default: `['GET', 'HEAD', 'OPTIONS', 'PUT', 'DELETE', 'TRACE']` - **statusCodes** `number[]` (optional) - Array of HTTP status codes to retry. Default: `[429, 500, 502, 503, 504]` - **errorCodes** `string[]` (optional) - Array of Error codes to retry. Default: `['ECONNRESET', 'ECONNREFUSED', 'ENOTFOUND', 'ENETDOWN','ENETUNREACH', 'EHOSTDOWN', 'UND_ERR_SOCKET']` @@ -47,7 +47,7 @@ It represents the retry state for a given request. - **handler** Extends [`Dispatch.DispatchHandler`](/docs/docs/api/Dispatcher.md#dispatcherdispatchoptions-handler) (required) - Handler function to be called after the request is successful or the retries are exhausted. >__Note__: The `RetryHandler` does not retry over stateful bodies (e.g. streams, AsyncIterable) as those, once consumed, are left in a state that cannot be reutilized. For these situations the `RetryHandler` will identify ->the body as stateful and will not retry the request rejecting with the error `UND_ERR_REQ_RETRY`. +>the body as stateful and will not retry the request rejecting with the error `UND_ERR_REQ_RETRY`. If you need to retry with a stateful body, pass a factory function that creates a new body for each retry: `body: () => fs.createReadStream('file.txt')`. Examples: diff --git a/lib/core/util.js b/lib/core/util.js index 0c72e5d598a..408ce4ca303 100644 --- a/lib/core/util.js +++ b/lib/core/util.js @@ -25,6 +25,65 @@ class BodyAsyncIterable { } } +/** + * A factory that creates a new body instance for each dispatch. + * This enables retryable requests with bodies that can be recreated, + * such as file streams or async generators. + * @private + */ +class BodyFactory { + /** + * @param {() => (string|Buffer|Uint8Array|Readable|null|FormData|AsyncIterable)} factory + */ + constructor (factory) { + this[kBody] = factory + /** @type {any} */ + this.body = null + } + + /** + * Creates a new body instance from the factory function. + * Called by the retry handler before each dispatch. + * @param {any} existingBody + */ + create (existingBody) { + const newBody = this[kBody]() + // Store the newly created body + this.body = wrapRequestBody(newBody) + // If the existing body is still referenced, update it too + if (existingBody) { + Object.setPrototypeOf(existingBody, Object.getPrototypeOf(this.body)) + Object.assign(existingBody, this.body) + } + return this.body + } + + /** + * @returns {AsyncIterableIterator} + */ + async * [Symbol.asyncIterator] () { + // If body hasn't been created yet, create it now + if (!this.body) { + this.create(null) + } + // Yield from the wrapped body + if (isStream(this.body)) { + for await (const chunk of this.body) { + yield chunk + } + } else if (this.body && typeof this.body[Symbol.asyncIterator] === 'function') { + for await (const chunk of this.body) { + yield chunk + } + } else if (typeof this.body === 'string') { + yield Buffer.from(this.body) + } else if (this.body instanceof Buffer || ArrayBuffer.isView(this.body) || this.body instanceof ArrayBuffer) { + yield this.body instanceof Buffer ? this.body : Buffer.from(this.body) + } + // For FormData and Blob, the body will be handled by the Request class + } +} + function noop () {} /** @@ -32,7 +91,12 @@ function noop () {} * @returns {*} */ function wrapRequestBody (body) { - if (isStream(body)) { + // Support body as a factory function: body: () => createReadStream('file') + if (typeof body === 'function') { + const factory = new BodyFactory(body) + factory.create(null) + return factory + } else if (isStream(body)) { // TODO (fix): Provide some way for the user to cache the file to e.g. /tmp // so that it can be dispatched again? // TODO (fix): Do we need 100-expect support to provide a way to do this properly? @@ -586,6 +650,10 @@ function assertRequestHandler (handler, method, upgrade) { * @returns {boolean} */ function isDisturbed (body) { + // BodyFactory instances can be recreated, so they're never "disturbed" + if (body instanceof BodyFactory) { + return false + } // TODO (fix): Why is body[kBodyUsed] needed? return !!(body && (stream.isDisturbed(body) || body[kBodyUsed])) } @@ -990,5 +1058,6 @@ module.exports = { safeHTTPMethods: Object.freeze(['GET', 'HEAD', 'OPTIONS', 'TRACE']), wrapRequestBody, setupConnectTimeout, - getProtocolFromUrlString + getProtocolFromUrlString, + BodyFactory } diff --git a/lib/dispatcher/client.js b/lib/dispatcher/client.js index f40fbb2c75b..daf8d142d96 100644 --- a/lib/dispatcher/client.js +++ b/lib/dispatcher/client.js @@ -336,7 +336,7 @@ class Client extends DispatcherBase { } [kDispatch] (opts, handler) { - const request = new Request(this[kUrl].origin, opts, handler) + const request = new Request(this[kUrl].origin, { ...opts, body: util.wrapRequestBody(opts.body) }, handler) this[kQueue].push(request) if (this[kResuming]) { diff --git a/lib/handler/retry-handler.js b/lib/handler/retry-handler.js index ee2f69a2043..ccabc99f405 100644 --- a/lib/handler/retry-handler.js +++ b/lib/handler/retry-handler.js @@ -6,7 +6,8 @@ const { RequestRetryError } = require('../core/errors') const { isDisturbed, parseRangeHeader, - wrapRequestBody + wrapRequestBody, + BodyFactory } = require('../core/util') function calculateRetryAfterHeader (retryAfter) { @@ -345,6 +346,11 @@ class RetryHandler { } } + // If body is a factory, recreate it for this retry attempt + if (this.opts.body instanceof BodyFactory) { + this.opts.body.create(this.opts.body) + } + try { this.retryCountCheckpoint = this.retryCount this.dispatch(this.opts, this) diff --git a/test/retry-body-factory.js b/test/retry-body-factory.js new file mode 100644 index 00000000000..7b906664ffb --- /dev/null +++ b/test/retry-body-factory.js @@ -0,0 +1,148 @@ +'use strict' + +const { createServer } = require('node:http') +const { test, after } = require('node:test') +const { tspl } = require('@matteo.collina/tspl') +const { Client, interceptors } = require('..') +const { Readable } = require('node:stream') +const { once } = require('node:events') + +test('retry with body factory function - stream', async t => { + t = tspl(t, { plan: 2 }) + + let requestCount = 0 + const server = createServer((req, res) => { + requestCount++ + req.on('data', () => {}) + req.on('end', () => { + if (requestCount < 2) { + res.writeHead(500, { 'content-type': 'application/json' }) + res.end('{"message": "failed"}') + } else { + res.writeHead(200, { 'content-type': 'application/json' }) + res.end('{"message": "success"}') + } + }) + }) + + server.listen(0) + + await once(server, 'listening') + + const client = new Client( + `http://localhost:${server.address().port}` + ).compose(interceptors.retry({ + minTimeout: 100, + maxTimeout: 100, + methods: ['POST'] + })) + + after(async () => { + await client.close() + server.close() + await once(server, 'close') + }) + + const response = await client.request({ + method: 'POST', + path: '/', + headers: { 'content-type': 'application/json' }, + // Body factory function - creates a new stream for each retry + body: () => Readable.from(Buffer.from(JSON.stringify({ hello: 'world' }))) + }) + + t.equal(response.statusCode, 200) + t.equal(requestCount, 2, 'server received 2 requests') +}) + +test('retry with body factory function - async generator', async t => { + t = tspl(t, { plan: 2 }) + + let requestCount = 0 + const server = createServer((req, res) => { + requestCount++ + req.on('data', () => {}) + req.on('end', () => { + if (requestCount < 2) { + res.writeHead(500, { 'content-type': 'application/json' }) + res.end('{"message": "failed"}') + } else { + res.writeHead(200, { 'content-type': 'application/json' }) + res.end('{"message": "success"}') + } + }) + }) + + server.listen(0) + + await once(server, 'listening') + + const client = new Client( + `http://localhost:${server.address().port}` + ).compose(interceptors.retry({ + minTimeout: 100, + maxTimeout: 100, + methods: ['POST'] + })) + + after(async () => { + await client.close() + server.close() + await once(server, 'close') + }) + + const response = await client.request({ + method: 'POST', + path: '/', + headers: { 'content-type': 'application/json' }, + // Body factory function returning async generator + body: () => (async function * () { + yield '{"hello": "world"}' + })() + }) + + t.equal(response.statusCode, 200) + t.equal(requestCount, 2, 'server received 2 requests') +}) + +test('non-retryable body (regular stream) fails on retry', async t => { + t = tspl(t, { plan: 2 }) + + let requestCount = 0 + const server = createServer((req, res) => { + requestCount++ + res.writeHead(500, { 'content-type': 'application/json' }) + res.end('{"message": "failed"}') + }) + + server.listen(0) + + await once(server, 'listening') + + const client = new Client( + `http://localhost:${server.address().port}` + ).compose(interceptors.retry({ + minTimeout: 100, + maxTimeout: 100, + methods: ['POST'], + throwOnError: false + })) + + after(async () => { + await client.close() + server.close() + await once(server, 'close') + }) + + const response = await client.request({ + method: 'POST', + path: '/', + headers: { 'content-type': 'application/json' }, + body: Readable.from(Buffer.from(JSON.stringify({ hello: 'world' }))) + }) + + // The retry should not happen because the stream was consumed, + // so we should get the 500 response directly + t.equal(response.statusCode, 500) + t.equal(requestCount, 1, 'only 1 request sent (stream consumed)') +}) diff --git a/types/dispatcher.d.ts b/types/dispatcher.d.ts index de2545bc84d..857096f590e 100644 --- a/types/dispatcher.d.ts +++ b/types/dispatcher.d.ts @@ -102,7 +102,7 @@ declare namespace Dispatcher { path: string; method: HttpMethod; /** Default: `null` */ - body?: string | Buffer | Uint8Array | Readable | null | FormData; + body?: string | Buffer | Uint8Array | Readable | null | FormData | (() => string | Buffer | Uint8Array | Readable | null | FormData); /** Default: `null` */ headers?: UndiciHeaders; /** Query string params to be embedded in the request URL. Default: `null` */