Skip to content

Commit 2cc7983

Browse files
authored
feat(NODE-7491): finalize client backpressure implementation for phase 1 rollout (#4920)
1 parent 16a899d commit 2cc7983

38 files changed

Lines changed: 515 additions & 2947 deletions

src/connection_string.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -643,7 +643,7 @@ interface OptionDescriptor {
643643
}
644644

645645
export const OPTIONS = {
646-
adaptiveRetries: {
646+
enableOverloadRetargeting: {
647647
default: false,
648648
type: 'boolean'
649649
},
@@ -885,6 +885,10 @@ export const OPTIONS = {
885885
default: 15,
886886
type: 'uint'
887887
},
888+
maxAdaptiveRetries: {
889+
default: 2,
890+
type: 'uint'
891+
},
888892
maxConnecting: {
889893
default: 2,
890894
transform({ name, values: [value] }): number {

src/index.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -621,7 +621,6 @@ export type {
621621
TimeoutContext,
622622
TimeoutContextOptions
623623
} from './timeout';
624-
export type { MAX_RETRIES, TokenBucket } from './token_bucket';
625624
export type { Transaction, TransactionOptions, TxnState } from './transactions';
626625
export type {
627626
BufferPool,

src/mongo_client.ts

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,18 @@ export interface MongoClientOptions extends BSONSerializeOptions, SupportedNodeC
229229
retryReads?: boolean;
230230
/** Enable retryable writes. */
231231
retryWrites?: boolean;
232-
/** Whether to enable adaptive retry rate limiting using a token bucket. Defaults to false. */
233-
adaptiveRetries?: boolean;
232+
/**
233+
* The maximum number of retries during server overload. Set to 0 to disable overload retries. Defaults to 2.
234+
* @see https://www.mongodb.com/docs/atlas/overload-errors
235+
* */
236+
maxAdaptiveRetries?: number;
237+
/**
238+
* Whether or not to enable overload retargeting. Defaults to false.
239+
* @see https://www.mongodb.com/docs/atlas/overload-errors
240+
* More information about the overload policy in drivers:
241+
* @see https://github.com/mongodb/specifications/blob/master/source/client-backpressure/client-backpressure.md#overload-retry-policy
242+
* */
243+
enableOverloadRetargeting?: boolean;
234244
/** Allow a driver to force a Single topology type with a connection string containing one host */
235245
directConnection?: boolean;
236246
/** Instruct the driver it is connecting to a load balancer fronting a mongos like service */
@@ -1043,7 +1053,8 @@ export interface MongoOptions
10431053
extends Required<
10441054
Pick<
10451055
MongoClientOptions,
1046-
| 'adaptiveRetries'
1056+
| 'maxAdaptiveRetries'
1057+
| 'enableOverloadRetargeting'
10471058
| 'autoEncryption'
10481059
| 'connectTimeoutMS'
10491060
| 'directConnection'

src/operations/execute_operation.ts

Lines changed: 8 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,6 @@ import {
2929
import type { Topology } from '../sdam/topology';
3030
import type { ClientSession } from '../sessions';
3131
import { TimeoutContext } from '../timeout';
32-
import {
33-
BASE_BACKOFF_MS,
34-
MAX_BACKOFF_MS,
35-
MAX_RETRIES,
36-
RETRY_COST,
37-
RETRY_TOKEN_RETURN_RATE
38-
} from '../token_bucket';
3932
import { abortable, maxWireVersion, supportsRetryableWrites } from '../utils';
4033
import { AggregateOperation } from './aggregate';
4134
import { AbstractOperation, Aspect } from './operation';
@@ -176,6 +169,10 @@ type RetryOptions = {
176169
topology: Topology;
177170
timeoutContext: TimeoutContext;
178171
};
172+
/** @internal The base backoff duration in milliseconds */
173+
const BASE_BACKOFF_MS = 100;
174+
/** @internal The maximum backoff duration in milliseconds */
175+
const MAX_BACKOFF_MS = 10_000;
179176

180177
/**
181178
* Executes an operation and retries as appropriate
@@ -267,13 +264,6 @@ async function executeOperationWithRetries<
267264
try {
268265
try {
269266
const result = await server.command(operation, timeoutContext);
270-
if (topology.s.options.adaptiveRetries) {
271-
topology.tokenBucket.deposit(
272-
attempt > 0
273-
? RETRY_TOKEN_RETURN_RATE + RETRY_COST // on successful retry
274-
: RETRY_TOKEN_RETURN_RATE // otherwise
275-
);
276-
}
277267
return operation.handleOk(result);
278268
} catch (error) {
279269
return operation.handleError(error);
@@ -282,15 +272,6 @@ async function executeOperationWithRetries<
282272
// Should never happen but if it does - propagate the error.
283273
if (!(operationError instanceof MongoError)) throw operationError;
284274

285-
if (
286-
topology.s.options.adaptiveRetries &&
287-
attempt > 0 &&
288-
!operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
289-
) {
290-
// if a retry attempt fails with a non-overload error, deposit 1 token.
291-
topology.tokenBucket.deposit(RETRY_COST);
292-
}
293-
294275
// Preserve the original error once a write has been performed.
295276
// Only update to the latest error if no writes were performed.
296277
if (error == null) {
@@ -317,7 +298,8 @@ async function executeOperationWithRetries<
317298
}
318299

319300
if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
320-
maxAttempts = Math.min(MAX_RETRIES + 1, operation.maxAttempts ?? MAX_RETRIES + 1);
301+
const maxOverloadAttempts = topology.s.options.maxAdaptiveRetries + 1;
302+
maxAttempts = Math.min(maxOverloadAttempts, operation.maxAttempts ?? maxOverloadAttempts);
321303
}
322304

323305
if (attempt + 1 >= maxAttempts) {
@@ -352,16 +334,13 @@ async function executeOperationWithRetries<
352334
throw error;
353335
}
354336

355-
if (topology.s.options.adaptiveRetries && !topology.tokenBucket.consume(RETRY_COST)) {
356-
throw error;
357-
}
358-
359337
await setTimeout(backoffMS);
360338
}
361339

362340
if (
363341
topology.description.type === TopologyType.Sharded ||
364-
operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
342+
(operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError) &&
343+
topology.s.options.enableOverloadRetargeting)
365344
) {
366345
deprioritizedServers.add(server.description);
367346
}

src/sdam/topology.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import { type Abortable, TypedEventEmitter } from '../mongo_types';
3535
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3636
import type { ClientSession } from '../sessions';
3737
import { Timeout, TimeoutContext, TimeoutError } from '../timeout';
38-
import { INITIAL_TOKEN_BUCKET_SIZE, TokenBucket } from '../token_bucket';
3938
import type { Transaction } from '../transactions';
4039
import {
4140
addAbortListener,
@@ -146,7 +145,8 @@ export interface TopologyOptions extends BSONSerializeOptions, ServerOptions {
146145
hosts: HostAddress[];
147146
retryWrites: boolean;
148147
retryReads: boolean;
149-
adaptiveRetries: boolean;
148+
maxAdaptiveRetries: number;
149+
enableOverloadRetargeting: boolean;
150150
/** How long to block for server selection before throwing an error */
151151
serverSelectionTimeoutMS: number;
152152
/** The name of the replica set to connect to */
@@ -214,8 +214,6 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
214214
hello?: Document;
215215
_type?: string;
216216

217-
tokenBucket = new TokenBucket(INITIAL_TOKEN_BUCKET_SIZE);
218-
219217
client!: MongoClient;
220218

221219
private connectionLock?: Promise<Topology>;

src/sessions.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import { ReadConcernLevel } from './read_concern';
3131
import { ReadPreference } from './read_preference';
3232
import { _advanceClusterTime, type ClusterTime, TopologyType } from './sdam/common';
3333
import { TimeoutContext } from './timeout';
34-
import { MAX_RETRIES } from './token_bucket';
3534
import {
3635
isTransactionCommand,
3736
Transaction,
@@ -499,7 +498,7 @@ export class ClientSession
499498
readPreference: ReadPreference.primary,
500499
bypassPinningCheck: true
501500
});
502-
operation.maxAttempts = MAX_RETRIES + 1;
501+
operation.maxAttempts = this.clientOptions.maxAdaptiveRetries + 1;
503502

504503
const timeoutContext =
505504
this.timeoutContext ??
@@ -518,7 +517,7 @@ export class ClientSession
518517
} catch (firstCommitError) {
519518
this.commitAttempted = true;
520519

521-
const remainingAttempts = MAX_RETRIES + 1 - operation.attemptsMade;
520+
const remainingAttempts = this.clientOptions.maxAdaptiveRetries + 1 - operation.attemptsMade;
522521
if (remainingAttempts <= 0) {
523522
throw firstCommitError;
524523
}

src/token_bucket.ts

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments
 (0)