Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/docs/api/Dispatcher.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ Returns: `Boolean` - `false` if dispatcher is busy and further dispatch calls wo
* **path** `string`
* **method** `string`
* **reset** `boolean` (optional) - Default: `false` - If `false`, the request will attempt to create a long-living connection by sending the `connection: keep-alive` header,otherwise will attempt to close it immediately after response by sending `connection: close` within the request and closing the socket afterwards.
* **body** `string | Buffer | Uint8Array | stream.Readable | Iterable | AsyncIterable | null` (optional) - Default: `null`
* **body** `string | Buffer | Uint8Array | stream.Readable | Iterable | AsyncIterable | (() => string | Buffer | Uint8Array | stream.Readable | Iterable | AsyncIterable) | null` (optional) - Default: `null`. A factory function can be used to create a new body for each retry when used with the [`retry`](./Retry.md) interceptor.
* **headers** `UndiciHeaders` (optional) - Default: `null`.
* **query** `Record<string, any> | null` (optional) - Default: `null` - Query string params to be embedded in the request URL. Note that both keys and values of query are encoded using `encodeURIComponent`. If for some reason you need to send them unencoded, embed query params into path directly instead.
* **idempotent** `boolean` (optional) - Default: `true` if `method` is `'HEAD'` or `'GET'` - Whether the requests can be safely retried or not. If `false` the request won't be sent until all preceding requests in the pipeline has completed.
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/api/RetryAgent.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Returns: `ProxyAgent`
- **minTimeout** `number` (optional) - Minimum number of milliseconds to wait before retrying. Default: `500` (half a second)
- **timeoutFactor** `number` (optional) - Factor to multiply the timeout by for each retry attempt. Default: `2`
- **retryAfter** `boolean` (optional) - It enables automatic retry after the `Retry-After` header is received. Default: `true`
- **methods** `string[]` (optional) - Array of HTTP methods to retry. Default: `['GET', 'PUT', 'HEAD', 'OPTIONS', 'DELETE']`
- **methods** `string[]` (optional) - Array of HTTP methods to retry. Default: `['GET', 'HEAD', 'OPTIONS', 'PUT', 'DELETE', 'TRACE']`
- **statusCodes** `number[]` (optional) - Array of HTTP status codes to retry. Default: `[429, 500, 502, 503, 504]`
- **errorCodes** `string[]` (optional) - Array of Error codes to retry. Default: `['ECONNRESET', 'ECONNREFUSED', 'ENOTFOUND', 'ENETDOWN','ENETUNREACH', 'EHOSTDOWN', 'UND_ERR_SOCKET']`

Expand Down
4 changes: 2 additions & 2 deletions docs/docs/api/RetryHandler.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Extends: [`Dispatch.DispatchOptions`](/docs/docs/api/Dispatcher.md#parameter-dis
- **minTimeout** `number` (optional) - Minimum number of milliseconds to wait before retrying. Default: `500` (half a second)
- **timeoutFactor** `number` (optional) - Factor to multiply the timeout by for each retry attempt. Default: `2`
- **retryAfter** `boolean` (optional) - It enables automatic retry after the `Retry-After` header is received. Default: `true`
- **methods** `string[]` (optional) - Array of HTTP methods to retry. Default: `['GET', 'PUT', 'HEAD', 'OPTIONS', 'DELETE']`
- **methods** `string[]` (optional) - Array of HTTP methods to retry. Default: `['GET', 'HEAD', 'OPTIONS', 'PUT', 'DELETE', 'TRACE']`
- **statusCodes** `number[]` (optional) - Array of HTTP status codes to retry. Default: `[429, 500, 502, 503, 504]`
- **errorCodes** `string[]` (optional) - Array of Error codes to retry. Default: `['ECONNRESET', 'ECONNREFUSED', 'ENOTFOUND', 'ENETDOWN','ENETUNREACH', 'EHOSTDOWN', 'UND_ERR_SOCKET']`

Expand All @@ -47,7 +47,7 @@ It represents the retry state for a given request.
- **handler** Extends [`Dispatch.DispatchHandler`](/docs/docs/api/Dispatcher.md#dispatcherdispatchoptions-handler) (required) - Handler function to be called after the request is successful or the retries are exhausted.

>__Note__: The `RetryHandler` does not retry over stateful bodies (e.g. streams, AsyncIterable) as those, once consumed, are left in a state that cannot be reutilized. For these situations the `RetryHandler` will identify
>the body as stateful and will not retry the request rejecting with the error `UND_ERR_REQ_RETRY`.
>the body as stateful and will not retry the request rejecting with the error `UND_ERR_REQ_RETRY`. If you need to retry with a stateful body, pass a factory function that creates a new body for each retry: `body: () => fs.createReadStream('file.txt')`.

Examples:

Expand Down
73 changes: 71 additions & 2 deletions lib/core/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,78 @@ class BodyAsyncIterable {
}
}

/**
* A factory that creates a new body instance for each dispatch.
* This enables retryable requests with bodies that can be recreated,
* such as file streams or async generators.
* @private
*/
class BodyFactory {
/**
* @param {() => (string|Buffer|Uint8Array|Readable|null|FormData|AsyncIterable)} factory
*/
constructor (factory) {
this[kBody] = factory
/** @type {any} */
this.body = null
}

/**
* Creates a new body instance from the factory function.
* Called by the retry handler before each dispatch.
* @param {any} existingBody
*/
create (existingBody) {
const newBody = this[kBody]()
// Store the newly created body
this.body = wrapRequestBody(newBody)
// If the existing body is still referenced, update it too
if (existingBody) {
Object.setPrototypeOf(existingBody, Object.getPrototypeOf(this.body))
Object.assign(existingBody, this.body)
}
return this.body
}

/**
* @returns {AsyncIterableIterator<Buffer|Uint8Array>}
*/
async * [Symbol.asyncIterator] () {
// If body hasn't been created yet, create it now
if (!this.body) {
this.create(null)
}
// Yield from the wrapped body
if (isStream(this.body)) {
for await (const chunk of this.body) {
yield chunk
}
} else if (this.body && typeof this.body[Symbol.asyncIterator] === 'function') {
for await (const chunk of this.body) {
yield chunk
}
} else if (typeof this.body === 'string') {
yield Buffer.from(this.body)
} else if (this.body instanceof Buffer || ArrayBuffer.isView(this.body) || this.body instanceof ArrayBuffer) {
yield this.body instanceof Buffer ? this.body : Buffer.from(this.body)
}
// For FormData and Blob, the body will be handled by the Request class
}
}

function noop () {}

/**
* @param {*} body
* @returns {*}
*/
function wrapRequestBody (body) {
if (isStream(body)) {
// Support body as a factory function: body: () => createReadStream('file')
if (typeof body === 'function') {
const factory = new BodyFactory(body)
factory.create(null)
return factory
} else if (isStream(body)) {
// TODO (fix): Provide some way for the user to cache the file to e.g. /tmp
// so that it can be dispatched again?
// TODO (fix): Do we need 100-expect support to provide a way to do this properly?
Expand Down Expand Up @@ -586,6 +650,10 @@ function assertRequestHandler (handler, method, upgrade) {
* @returns {boolean}
*/
function isDisturbed (body) {
// BodyFactory instances can be recreated, so they're never "disturbed"
if (body instanceof BodyFactory) {
return false
}
// TODO (fix): Why is body[kBodyUsed] needed?
return !!(body && (stream.isDisturbed(body) || body[kBodyUsed]))
}
Expand Down Expand Up @@ -990,5 +1058,6 @@ module.exports = {
safeHTTPMethods: Object.freeze(['GET', 'HEAD', 'OPTIONS', 'TRACE']),
wrapRequestBody,
setupConnectTimeout,
getProtocolFromUrlString
getProtocolFromUrlString,
BodyFactory
}
2 changes: 1 addition & 1 deletion lib/dispatcher/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ class Client extends DispatcherBase {
}

[kDispatch] (opts, handler) {
const request = new Request(this[kUrl].origin, opts, handler)
const request = new Request(this[kUrl].origin, { ...opts, body: util.wrapRequestBody(opts.body) }, handler)

this[kQueue].push(request)
if (this[kResuming]) {
Expand Down
8 changes: 7 additions & 1 deletion lib/handler/retry-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ const { RequestRetryError } = require('../core/errors')
const {
isDisturbed,
parseRangeHeader,
wrapRequestBody
wrapRequestBody,
BodyFactory
} = require('../core/util')

function calculateRetryAfterHeader (retryAfter) {
Expand Down Expand Up @@ -345,6 +346,11 @@ class RetryHandler {
}
}

// If body is a factory, recreate it for this retry attempt
if (this.opts.body instanceof BodyFactory) {
this.opts.body.create(this.opts.body)
}

try {
this.retryCountCheckpoint = this.retryCount
this.dispatch(this.opts, this)
Expand Down
148 changes: 148 additions & 0 deletions test/retry-body-factory.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
'use strict'

const { createServer } = require('node:http')
const { test, after } = require('node:test')
const { tspl } = require('@matteo.collina/tspl')
const { Client, interceptors } = require('..')
const { Readable } = require('node:stream')
const { once } = require('node:events')

test('retry with body factory function - stream', async t => {
t = tspl(t, { plan: 2 })

let requestCount = 0
const server = createServer((req, res) => {
requestCount++
req.on('data', () => {})
req.on('end', () => {
if (requestCount < 2) {
res.writeHead(500, { 'content-type': 'application/json' })
res.end('{"message": "failed"}')
} else {
res.writeHead(200, { 'content-type': 'application/json' })
res.end('{"message": "success"}')
}
})
})

server.listen(0)

await once(server, 'listening')

const client = new Client(
`http://localhost:${server.address().port}`
).compose(interceptors.retry({
minTimeout: 100,
maxTimeout: 100,
methods: ['POST']
}))

after(async () => {
await client.close()
server.close()
await once(server, 'close')
})

const response = await client.request({
method: 'POST',
path: '/',
headers: { 'content-type': 'application/json' },
// Body factory function - creates a new stream for each retry
body: () => Readable.from(Buffer.from(JSON.stringify({ hello: 'world' })))
})

t.equal(response.statusCode, 200)
t.equal(requestCount, 2, 'server received 2 requests')
})

test('retry with body factory function - async generator', async t => {
t = tspl(t, { plan: 2 })

let requestCount = 0
const server = createServer((req, res) => {
requestCount++
req.on('data', () => {})
req.on('end', () => {
if (requestCount < 2) {
res.writeHead(500, { 'content-type': 'application/json' })
res.end('{"message": "failed"}')
} else {
res.writeHead(200, { 'content-type': 'application/json' })
res.end('{"message": "success"}')
}
})
})

server.listen(0)

await once(server, 'listening')

const client = new Client(
`http://localhost:${server.address().port}`
).compose(interceptors.retry({
minTimeout: 100,
maxTimeout: 100,
methods: ['POST']
}))

after(async () => {
await client.close()
server.close()
await once(server, 'close')
})

const response = await client.request({
method: 'POST',
path: '/',
headers: { 'content-type': 'application/json' },
// Body factory function returning async generator
body: () => (async function * () {
yield '{"hello": "world"}'
})()
})

t.equal(response.statusCode, 200)
t.equal(requestCount, 2, 'server received 2 requests')
})

test('non-retryable body (regular stream) fails on retry', async t => {
t = tspl(t, { plan: 2 })

let requestCount = 0
const server = createServer((req, res) => {
requestCount++
res.writeHead(500, { 'content-type': 'application/json' })
res.end('{"message": "failed"}')
})

server.listen(0)

await once(server, 'listening')

const client = new Client(
`http://localhost:${server.address().port}`
).compose(interceptors.retry({
minTimeout: 100,
maxTimeout: 100,
methods: ['POST'],
throwOnError: false
}))

after(async () => {
await client.close()
server.close()
await once(server, 'close')
})

const response = await client.request({
method: 'POST',
path: '/',
headers: { 'content-type': 'application/json' },
body: Readable.from(Buffer.from(JSON.stringify({ hello: 'world' })))
})

// The retry should not happen because the stream was consumed,
// so we should get the 500 response directly
t.equal(response.statusCode, 500)
t.equal(requestCount, 1, 'only 1 request sent (stream consumed)')
})
2 changes: 1 addition & 1 deletion types/dispatcher.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ declare namespace Dispatcher {
path: string;
method: HttpMethod;
/** Default: `null` */
body?: string | Buffer | Uint8Array | Readable | null | FormData;
body?: string | Buffer | Uint8Array | Readable | null | FormData | (() => string | Buffer | Uint8Array | Readable | null | FormData);
/** Default: `null` */
headers?: UndiciHeaders;
/** Query string params to be embedded in the request URL. Default: `null` */
Expand Down
Loading