Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 37 additions & 39 deletions lib/dispatcher/agent.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const { InvalidArgumentError, MaxOriginsReachedError } = require('../core/errors')
const { kClients, kRunning, kClose, kDestroy, kDispatch, kUrl } = require('../core/symbols')
const { kBusy, kClients, kConnected, kRunning, kClose, kDestroy, kDispatch, kUrl } = require('../core/symbols')
const DispatcherBase = require('./dispatcher-base')
const Pool = require('./pool')
const Client = require('./client')
Expand Down Expand Up @@ -65,7 +65,7 @@ class Agent extends DispatcherBase {

get [kRunning] () {
let ret = 0
for (const { dispatcher } of this[kClients].values()) {
for (const dispatcher of this[kClients].values()) {
ret += dispatcher[kRunning]
}
return ret
Expand All @@ -86,54 +86,52 @@ class Agent extends DispatcherBase {
throw new MaxOriginsReachedError()
}

const result = this[kClients].get(key)
let dispatcher = result && result.dispatcher
let dispatcher = this[kClients].get(key)
if (!dispatcher) {
const closeClientIfUnused = (connected) => {
const result = this[kClients].get(key)
if (result) {
if (connected) result.count -= 1
if (result.count <= 0) {
this[kClients].delete(key)
if (!result.dispatcher.destroyed) {
result.dispatcher.close()
}
}
dispatcher = this[kFactory](opts.origin, allowH2 === false
? { ...this[kOptions], allowH2: false }
: this[kOptions])

let hasOrigin = false
for (const entry of this[kClients].values()) {
if (entry.origin === origin) {
hasOrigin = true
break
}
}
const closeClientIfUnused = () => {
if (this[kClients].get(key) !== dispatcher) {
return
}

if (dispatcher[kConnected] > 0 || dispatcher[kBusy]) {
return
}
Comment on lines +100 to +102
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The key difference is here - we check active connections as well as kBusy:

get [kBusy] () {
return Boolean(
this[kHTTPContext]?.busy(null) ||
(this[kSize] >= (getPipelining(this) || 1)) ||
this[kPending] > 0
)
}

If we have either active connections or the dispatcher is busy, we don't close it.

This lets us clean up the connection-tracking bookkeeping and just use information already in the dispatcher.


this[kClients].delete(key)
if (!dispatcher.destroyed) {
dispatcher.close()
}

if (!hasOrigin) {
this[kOrigins].delete(origin)
let hasOrigin = false
for (const client of this[kClients].values()) {
if (client[kUrl].origin === dispatcher[kUrl].origin) {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slightly updated logic here, again to reduce bookkeeping -- access origin from client[kUrl].origin instead of storing it

hasOrigin = true
break
}
}

if (!hasOrigin) {
this[kOrigins].delete(dispatcher[kUrl].origin)
}
}
dispatcher = this[kFactory](opts.origin, allowH2 === false
? { ...this[kOptions], allowH2: false }
: this[kOptions])

dispatcher
.on('drain', this[kOnDrain])
.on('connect', (origin, targets) => {
const result = this[kClients].get(key)
if (result) {
result.count += 1
}
this[kOnConnect](origin, targets)
})
.on('connect', this[kOnConnect])
.on('disconnect', (origin, targets, err) => {
closeClientIfUnused(true)
closeClientIfUnused()
this[kOnDisconnect](origin, targets, err)
})
.on('connectionError', (origin, targets, err) => {
closeClientIfUnused(false)
closeClientIfUnused()
this[kOnConnectionError](origin, targets, err)
})

this[kClients].set(key, { count: 0, dispatcher, origin })
this[kClients].set(key, dispatcher)
this[kOrigins].add(origin)
}

Expand All @@ -142,7 +140,7 @@ class Agent extends DispatcherBase {

[kClose] () {
const closePromises = []
for (const { dispatcher } of this[kClients].values()) {
for (const dispatcher of this[kClients].values()) {
closePromises.push(dispatcher.close())
}
this[kClients].clear()
Expand All @@ -152,7 +150,7 @@ class Agent extends DispatcherBase {

[kDestroy] (err) {
const destroyPromises = []
for (const { dispatcher } of this[kClients].values()) {
for (const dispatcher of this[kClients].values()) {
destroyPromises.push(dispatcher.destroy(err))
}
this[kClients].clear()
Expand All @@ -162,7 +160,7 @@ class Agent extends DispatcherBase {

get stats () {
const allClientStats = {}
for (const { dispatcher } of this[kClients].values()) {
for (const dispatcher of this[kClients].values()) {
if (dispatcher.stats) {
allClientStats[dispatcher[kUrl].origin] = dispatcher.stats
}
Expand Down
2 changes: 1 addition & 1 deletion lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,7 @@ function writeH1 (client, request) {
socket[kReset] = reset
}

if (client[kMaxRequests] && socket[kCounter]++ >= client[kMaxRequests]) {
if (client[kMaxRequests] && ++socket[kCounter] >= client[kMaxRequests]) {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slightly unrelated to this PR, but I cherry-picked f68bd53 from #5033 so that my tests pass (otherwise, the number of requests per connection would be off by one).

socket[kReset] = true
}

Expand Down
16 changes: 8 additions & 8 deletions lib/mock/mock-agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class MockAgent extends Dispatcher {
}

[kMockAgentSet] (origin, dispatcher) {
this[kClients].set(origin, { count: 0, dispatcher })
this[kClients].set(origin, dispatcher)
}

[kFactory] (origin) {
Expand All @@ -179,9 +179,9 @@ class MockAgent extends Dispatcher {

[kMockAgentGet] (origin) {
// First check if we can immediately find it
const result = this[kClients].get(origin)
if (result?.dispatcher) {
return result.dispatcher
const dispatcher = this[kClients].get(origin)
if (dispatcher) {
return dispatcher
}

// If the origin is not a string create a dummy parent pool and return to user
Expand All @@ -192,11 +192,11 @@ class MockAgent extends Dispatcher {
}

// If we match, create a pool and assign the same dispatches
for (const [keyMatcher, result] of Array.from(this[kClients])) {
if (result && typeof keyMatcher !== 'string' && matchValue(keyMatcher, origin)) {
for (const [keyMatcher, nonExplicitDispatcher] of Array.from(this[kClients])) {
if (nonExplicitDispatcher && typeof keyMatcher !== 'string' && matchValue(keyMatcher, origin)) {
const dispatcher = this[kFactory](origin)
this[kMockAgentSet](origin, dispatcher)
dispatcher[kDispatches] = result.dispatcher[kDispatches]
dispatcher[kDispatches] = nonExplicitDispatcher[kDispatches]
return dispatcher
}
}
Expand All @@ -210,7 +210,7 @@ class MockAgent extends Dispatcher {
const mockAgentClients = this[kClients]

return Array.from(mockAgentClients.entries())
.flatMap(([origin, result]) => result.dispatcher[kDispatches].map(dispatch => ({ ...dispatch, origin })))
.flatMap(([origin, dispatcher]) => dispatcher[kDispatches].map(dispatch => ({ ...dispatch, origin })))
.filter(({ pending }) => pending)
}

Expand Down
92 changes: 92 additions & 0 deletions test/issue-4244.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,96 @@ describe('Agent should close inactive clients', () => {

await p
})

test('should reuse replacement keep-alive connection after server closes the previous one', async (t) => {
let nextSocketId = 0
const socketIds = new Map()
const requestsPerSocket = new Map()

const server = createServer((req, res) => {
const socket = req.socket
if (!socketIds.has(socket)) {
socketIds.set(socket, ++nextSocketId)
}

const count = (requestsPerSocket.get(socket) || 0) + 1
requestsPerSocket.set(socket, count)

const remaining = 3 - count
res.setHeader('x-socket-id', String(socketIds.get(socket)))

if (remaining > 0) {
res.setHeader('connection', 'Keep-Alive')
res.setHeader('keep-alive', `timeout=30, max=${remaining}`)
} else {
res.setHeader('connection', 'close')
}

res.writeHead(200)
res.end('ok')
}).listen(0)

t.after(() => {
server.closeAllConnections?.()
server.close()
})

const agent = new Agent({ connections: 1 })
t.after(() => agent.close())

const socketSequence = []
for (let i = 0; i < 5; i++) {
const { statusCode, headers, body } = await request(`http://localhost:${server.address().port}`, {
dispatcher: agent
})

assert.equal(statusCode, 200)
await body.dump()
socketSequence.push(headers['x-socket-id'])
}

assert.deepEqual(socketSequence.slice(0, 3), ['1', '1', '1'])
assert.deepEqual(socketSequence.slice(3), ['2', '2'])
})

test('should reuse replacement connection after keep-alive max closes the previous one', async (t) => {
let nextSocketId = 0
const socketIds = new Map()

const server = createServer((req, res) => {
const socket = req.socket
if (!socketIds.has(socket)) {
socketIds.set(socket, ++nextSocketId)
}

res.setHeader('x-socket-id', String(socketIds.get(socket)))
res.setHeader('connection', 'Keep-Alive')
res.setHeader('keep-alive', 'timeout=30')

res.writeHead(200)
res.end('ok')
}).listen(0)

t.after(() => {
server.closeAllConnections?.()
server.close()
})

const agent = new Agent({ connections: 1, maxRequestsPerClient: 3 })
t.after(() => agent.close())

const socketSequence = []
for (let i = 0; i < 5; i++) {
const { statusCode, headers, body } = await request(`http://localhost:${server.address().port}`, {
dispatcher: agent
})

assert.equal(statusCode, 200)
await body.dump()
socketSequence.push(headers['x-socket-id'])
}

assert.deepEqual(socketSequence.slice(0, 3), ['1', '1', '1'])
assert.deepEqual(socketSequence.slice(3), ['2', '2'])
})
})
Loading