From 186ccb6e9a768c8c0f8bd5917d6d3105842dec1b Mon Sep 17 00:00:00 2001 From: Mark Russell Date: Mon, 6 Apr 2026 16:51:12 -0400 Subject: [PATCH 1/6] add configurable retry logic and expose explicit method incoker state machine --- .../common/connection_stream_handlers.js | 32 +-- .../ddp-client/common/livedata_connection.js | 23 ++- .../ddp-client/common/message_processors.js | 2 +- packages/ddp-client/common/method_invoker.js | 193 +++++++++++++++--- .../test/livedata_connection_tests.js | 114 +++++++++++ 5 files changed, 295 insertions(+), 69 deletions(-) diff --git a/packages/ddp-client/common/connection_stream_handlers.js b/packages/ddp-client/common/connection_stream_handlers.js index d4fa785df66..f8e046ef293 100644 --- a/packages/ddp-client/common/connection_stream_handlers.js +++ b/packages/ddp-client/common/connection_stream_handlers.js @@ -160,34 +160,12 @@ export class ConnectionStreamHandlers { const blocks = this._connection._outstandingMethodBlocks; if (blocks.length === 0) return; - const currentMethodBlock = blocks[0].methods; - blocks[0].methods = currentMethodBlock.filter( - methodInvoker => { - // Methods with 'noRetry' option set are not allowed to re-send after - // recovering dropped connection. - if (methodInvoker.sentMessage && methodInvoker.noRetry) { - methodInvoker.receiveResult( - new Meteor.Error( - 'invocation-failed', - 'Method invocation might have failed due to dropped connection. ' + - 'Failing because `noRetry` option was passed to Meteor.apply.' - ) - ); - } - - // Only keep a method if it wasn't sent or it's allowed to retry. - return !(methodInvoker.sentMessage && methodInvoker.noRetry); - } - ); - - // Clear empty blocks - if (blocks.length > 0 && blocks[0].methods.length === 0) { - blocks.shift(); - } - - // Reset all method invokers as unsent + // Notify all invokers about the reconnect. Each invoker transitions + // its own state (IN_FLIGHT → WAITING_FOR_RESEND). Retry decisions + // (noRetry, maxRetries) are handled inside the invoker when + // sendMessage() is called during reconnect. Object.values(this._connection._methodInvokers).forEach(invoker => { - invoker.sentMessage = false; + invoker.onReconnect(); }); } diff --git a/packages/ddp-client/common/livedata_connection.js b/packages/ddp-client/common/livedata_connection.js index 41ae9e3f21c..167dff74e8a 100644 --- a/packages/ddp-client/common/livedata_connection.js +++ b/packages/ddp-client/common/livedata_connection.js @@ -265,8 +265,7 @@ export class Connection { // as we mass-complete invokers during teardown. this._outstandingMethodBlocks = []; - // Abort all outstanding method invokers. If a result was already - // received, it is attached to the error for caller recovery. + // Abort all outstanding method invokers. keys(this._methodInvokers).forEach(id => { this._methodInvokers[id].abort( 'Connection closed before method completed' @@ -640,6 +639,7 @@ export class Connection { * @param {Boolean} options.wait (Client only) If true, don't send this method until all previous method calls have completed, and don't send any subsequent method calls until this one is completed. * @param {Function} options.onResultReceived (Client only) This callback is invoked with the error or result of the method (just like `asyncCallback`) as soon as the error or result is available. The local cache may not yet reflect the writes performed by the method. * @param {Boolean} options.noRetry (Client only) if true, don't send this method again on reload, simply call the callback an error with the error code 'invocation-failed'. + * @param {Number} options.maxRetries (Client only) Maximum number of times to re-send this method on reconnect before aborting with a 'disconnected' error. If not set, the method will be retried indefinitely (default Meteor behavior). * @param {Boolean} options.throwStubExceptions (Client only) If true, exceptions thrown by method stubs will be thrown instead of logged, and the method will not be invoked on the server. * @param {Boolean} options.returnStubValue (Client only) If true then in cases where we would have otherwise discarded the stub's return value and returned undefined, instead we go ahead and return it. Specifically, this is any time other than when (a) we are already inside a stub or (b) we are in Node and no callback was provided. Currently we require this flag to be explicitly passed to reduce the likelihood that stub return values will be confused with server return values; we may improve this in future. * @param {Function} [asyncCallback] Optional callback; same semantics as in [`Meteor.call`](#meteor_call). @@ -683,6 +683,7 @@ export class Connection { * @param {Boolean} options.wait (Client only) If true, don't send this method until all previous method calls have completed, and don't send any subsequent method calls until this one is completed. * @param {Function} options.onResultReceived (Client only) This callback is invoked with the error or result of the method (just like `asyncCallback`) as soon as the error or result is available. The local cache may not yet reflect the writes performed by the method. * @param {Boolean} options.noRetry (Client only) if true, don't send this method again on reload, simply call the callback an error with the error code 'invocation-failed'. + * @param {Number} options.maxRetries (Client only) Maximum number of times to re-send this method on reconnect before aborting with a 'disconnected' error. If not set, the method will be retried indefinitely (default Meteor behavior). * @param {Boolean} options.throwStubExceptions (Client only) If true, exceptions thrown by method stubs will be thrown instead of logged, and the method will not be invoked on the server. * @param {Boolean} options.returnStubValue (Client only) If true then in cases where we would have otherwise discarded the stub's return value and returned undefined, instead we go ahead and return it. Specifically, this is any time other than when (a) we are already inside a stub or (b) we are in Node and no callback was provided. Currently we require this flag to be explicitly passed to reduce the likelihood that stub return values will be confused with server return values; we may improve this in future. * @param {Boolean} options.returnServerResultPromise (Client only) If true, the promise returned by applyAsync will resolve to the server's return value, rather than the stub's return value. This is useful when you want to ensure that the server's return value is used, even if the stub returns a promise. The same behavior as `callAsync`. @@ -882,7 +883,8 @@ export class Connection { onResultReceived: options.onResultReceived, wait: !!options.wait, message: message, - noRetry: !!options.noRetry + noRetry: !!options.noRetry, + maxRetries: options.maxRetries != null ? options.maxRetries : null }); let result; @@ -1144,7 +1146,7 @@ export class Connection { // not yet invoked its user callback. _anyMethodsAreOutstanding() { const invokers = this._methodInvokers; - return Object.values(invokers).some((invoker) => !!invoker.sentMessage); + return Object.values(invokers).some((invoker) => invoker.isInFlight()); } async _processOneDataMessage(msg, updates) { @@ -1318,7 +1320,7 @@ export class Connection { const writtenByStubForAMethodWithSentMessage = keys(serverDoc.writtenByStubs).some(methodId => { const invoker = self._methodInvokers[methodId]; - return invoker && invoker.sentMessage; + return invoker && invoker.isInFlight(); }); if (writtenByStubForAMethodWithSentMessage) { @@ -1389,7 +1391,8 @@ export class Connection { } // Sends messages for all the methods in the first block in - // _outstandingMethodBlocks. + // _outstandingMethodBlocks. Methods that abort during sendMessage() + // (e.g. noRetry or maxRetries exceeded) are removed from the block. _sendOutstandingMethods() { const self = this; @@ -1397,9 +1400,11 @@ export class Connection { return; } - self._outstandingMethodBlocks[0].methods.forEach(m => { + const block = self._outstandingMethodBlocks[0]; + block.methods.forEach(m => { m.sendMessage(); }); + block.methods = block.methods.filter(m => !m.isDone()); } _sendOutstandingMethodBlocksMessages(oldOutstandingMethodBlocks) { @@ -1431,6 +1436,10 @@ export class Connection { } }); + // Remove invokers that aborted during sendMessage(). + const merged = last(self._outstandingMethodBlocks); + merged.methods = merged.methods.filter(m => !m.isDone()); + oldOutstandingMethodBlocks.shift(); } diff --git a/packages/ddp-client/common/message_processors.js b/packages/ddp-client/common/message_processors.js index 0fbe7ece981..22d3a32692b 100644 --- a/packages/ddp-client/common/message_processors.js +++ b/packages/ddp-client/common/message_processors.js @@ -95,7 +95,7 @@ export class MessageProcessors { self._afterUpdateCallbacks.push( (...args) => invoker.dataVisible(...args) ); - } else if (invoker.sentMessage) { + } else if (invoker.isInFlight()) { // This method has been sent on this connection (maybe as a resend // from the last connection, maybe from onReconnect, maybe just very // quickly before processing the connected message). diff --git a/packages/ddp-client/common/method_invoker.js b/packages/ddp-client/common/method_invoker.js index 14703c1294f..ee2ad01dbc4 100644 --- a/packages/ddp-client/common/method_invoker.js +++ b/packages/ddp-client/common/method_invoker.js @@ -1,13 +1,54 @@ +// MethodInvoker state enum. Transitions are enforced — callers go through +// public methods rather than flipping boolean flags. +const InvokerState = Object.freeze({ + // Created but not yet sent to the server. + PENDING: 'PENDING', + // Message sent on the current connection, waiting for result and updated. + IN_FLIGHT: 'IN_FLIGHT', + // Result received from the server, waiting for data visibility (updated). + RESULT_RECEIVED: 'RESULT_RECEIVED', + // Data visible (updated arrived), waiting for result. + DATA_VISIBLE: 'DATA_VISIBLE', + // Connection dropped; waiting to be re-sent on the next connection. + WAITING_FOR_RESEND: 'WAITING_FOR_RESEND', + // Fully complete — both result and data visible, callback fired. + COMPLETE: 'COMPLETE', + // Aborted — timed out, noRetry, maxRetries exceeded, or connection torn down. + ABORTED: 'ABORTED', +}); + +export { InvokerState }; + // A MethodInvoker manages sending a method to the server and calling the user's // callbacks. On construction, it registers itself in the connection's // _methodInvokers map; it removes itself once the method is fully finished and // the callback is invoked. This occurs when it has both received a result, // and the data written by it is fully visible. +// +// State machine (two-phase completion: result and updated can arrive in +// either order): +// +// PENDING ──sendMessage()──► IN_FLIGHT +// │ +// ┌─────────────┼──────────────┐ +// receiveResult() onReconnect() dataVisible() +// │ │ │ +// ▼ ▼ ▼ +// RESULT_RECEIVED WAITING_FOR DATA_VISIBLE +// │ _RESEND │ +// dataVisible() │ receiveResult() +// │ sendMessage() │ +// ▼ ┌────┴─────┐ ▼ +// COMPLETE retry? no retry COMPLETE +// │ │ +// ▼ ▼ +// IN_FLIGHT ABORTED +// +// Any state may transition to ABORTED via abort() (disconnect teardown). export class MethodInvoker { constructor(options) { - // Public (within this file) fields. this.methodId = options.methodId; - this.sentMessage = false; + this._state = InvokerState.PENDING; this._callback = options.callback; this._connection = options.connection; @@ -15,25 +56,53 @@ export class MethodInvoker { this._onResultReceived = options.onResultReceived || (() => {}); this._wait = options.wait; this.noRetry = options.noRetry; + this._maxRetries = options.maxRetries != null ? options.maxRetries : null; + this._retryCount = 0; this._methodResult = null; - this._dataVisible = false; // Register with the connection. this._connection._methodInvokers[this.methodId] = this; } + + // -- State queries (replace external flag reads) -------------------------- + + // True if the method has been sent on the current connection and has not + // yet completed or been reset for resend. This is the question every + // external caller was asking via `invoker.sentMessage`. + isInFlight() { + return this._state === InvokerState.IN_FLIGHT + || this._state === InvokerState.RESULT_RECEIVED + || this._state === InvokerState.DATA_VISIBLE; + } + + // True if the invoker has reached a terminal state (COMPLETE or ABORTED). + isDone() { + return this._state === InvokerState.COMPLETE + || this._state === InvokerState.ABORTED; + } + + // True if receiveResult has been called (result available regardless of + // data visibility). + gotResult() { + return !!this._methodResult; + } + + // -- State transitions (public API) --------------------------------------- + // Sends the method message to the server. May be called additional times if // we lose the connection and reconnect before receiving a result. sendMessage() { - // This function is called before sending a method (including resending on - // reconnect). We should only (re)send methods where we don't already have a - // result! if (this.gotResult()) throw new Error('sendingMethod is called on method with result'); - // If we're re-sending it, it doesn't matter if data was written the first - // time. - this._dataVisible = false; - this.sentMessage = true; + // On re-send (reconnect), check whether this method is allowed to retry. + if (this._state === InvokerState.WAITING_FOR_RESEND) { + if (!this._shouldRetry()) { + return; + } + } + + this._state = InvokerState.IN_FLIGHT; // If this is a wait method, make all data messages be buffered until it is // done. @@ -43,22 +112,20 @@ export class MethodInvoker { // Actually send the message. this._connection._send(this._message); } - // Invoke the callback, if we have both a result and know that all data has - // been written to the local cache. - _maybeInvokeCallback() { - if (this._methodResult && this._dataVisible) { - // Call the callback. (This won't throw: the callback was wrapped with - // bindEnvironment.) - this._callback(this._methodResult[0], this._methodResult[1]); - - // Forget about this method. - delete this._connection._methodInvokers[this.methodId]; - - // Let the connection know that this method is finished, so it can try to - // move on to the next block of methods. - this._connection._outstandingMethodFinished(); + + // Called by the connection layer when a reconnect occurs. Marks the invoker + // as needing to be re-sent on the new connection. This replaces the old + // pattern of externally setting `invoker.sentMessage = false`. + onReconnect() { + if (this._state === InvokerState.IN_FLIGHT + || this._state === InvokerState.DATA_VISIBLE) { + this._state = InvokerState.WAITING_FOR_RESEND; } + // RESULT_RECEIVED invokers keep their state — they already have the + // result and just need dataVisible() to complete. The reconnect quiescence + // logic handles them via gotResult() + dataVisible(). } + // Call with the result of the method from the server. Only may be called // once; once it is called, you should not call sendMessage again. // If the user provided an onResultReceived callback, call it immediately. @@ -67,27 +134,85 @@ export class MethodInvoker { if (this.gotResult()) throw new Error('Methods should only receive results once'); this._methodResult = [err, result]; + // If dataVisible() already arrived, go straight to COMPLETE. + this._state = this._state === InvokerState.DATA_VISIBLE + ? InvokerState.COMPLETE + : InvokerState.RESULT_RECEIVED; this._onResultReceived(err, result); - this._maybeInvokeCallback(); + this._maybeComplete(); } + // Call this when all data written by the method is visible. This means that - // the method has returns its "data is done" message *AND* all server + // the method has returned its "data is done" message *AND* all server // documents that are buffered at that time have been written to the local // cache. Invokes the main callback if the result has been received. dataVisible() { - this._dataVisible = true; - this._maybeInvokeCallback(); + if (this._state === InvokerState.RESULT_RECEIVED) { + this._state = InvokerState.COMPLETE; + } else if (this._state === InvokerState.IN_FLIGHT) { + this._state = InvokerState.DATA_VISIBLE; + } + this._maybeComplete(); } + // Force-complete this invoker with an error, regardless of current state. - // Fires the callback exactly once and cleans up. Callers who need the + // Fires the callback exactly once and cleans up. Callers who need the // result before write confirmation should use onResultReceived. abort(reason) { + if (this.isDone()) return; this._methodResult = [new Meteor.Error('disconnected', reason), undefined]; - this._dataVisible = true; - this._maybeInvokeCallback(); + this._state = InvokerState.ABORTED; + this._fireCallback(); } - // True if receiveResult has been called. - gotResult() { - return !!this._methodResult; + + // -- Internal methods ----------------------------------------------------- + + // Decides whether a method should be re-sent on reconnect. Centralizes + // noRetry and maxRetries logic so the connection layer doesn't need to + // inspect invoker internals. + _shouldRetry() { + if (this.noRetry) { + this.receiveResult( + new Meteor.Error( + 'invocation-failed', + 'Method invocation might have failed due to dropped connection. ' + + 'Failing because `noRetry` option was passed to Meteor.apply.' + ) + ); + return false; + } + + if (this._maxRetries !== null) { + this._retryCount++; + if (this._retryCount > this._maxRetries) { + this.abort( + 'Method retry limit exceeded (' + this._maxRetries + ')' + ); + return false; + } + } + + return true; + } + + // Fire the callback if we have a result and are in a terminal state. + _maybeComplete() { + if (this._methodResult + && (this._state === InvokerState.COMPLETE + || this._state === InvokerState.ABORTED)) { + this._fireCallback(); + } + } + + // Actually invoke the callback and clean up. Called at most once. + _fireCallback() { + this._callback(this._methodResult[0], this._methodResult[1]); + + // Forget about this method. + delete this._connection._methodInvokers[this.methodId]; + + // Let the connection know that this method is finished, so it can try to + // move on to the next block of methods. + this._connection._outstandingMethodFinished(); } } diff --git a/packages/ddp-client/test/livedata_connection_tests.js b/packages/ddp-client/test/livedata_connection_tests.js index 87382daa4cf..4ff0f805b20 100644 --- a/packages/ddp-client/test/livedata_connection_tests.js +++ b/packages/ddp-client/test/livedata_connection_tests.js @@ -2669,6 +2669,120 @@ Tinytest.addAsync( } ); +Tinytest.addAsync( + 'livedata connection - maxRetries aborts method after exceeding retry limit', + async function(test) { + const stream = new StubStream(); + const conn = newConnection(stream); + + await startAndConnect(test, stream); + + // Call a method with maxRetries: 2 + let callbackError = null; + conn.apply('limitedMethod', [], { maxRetries: 2 }, function(err) { + callbackError = err; + }); + testGotMessage(test, stream, { + msg: 'method', method: 'limitedMethod', params: [], id: '*' + }); + + // Callback should not have fired yet + test.isNull(callbackError); + + // Reconnect #1 — should re-send (retry 1 of 2) + stream.sent.length = 0; + await stream.reset(); + test.isNull(callbackError); + // reset sends connect + re-sends the method + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); + testGotMessage(test, stream, { + msg: 'method', method: 'limitedMethod', params: [], id: '*' + }); + await stream.receive({ msg: 'connected', session: SESSION_ID }); + + // Reconnect #2 — should re-send (retry 2 of 2) + stream.sent.length = 0; + await stream.reset(); + test.isNull(callbackError); + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); + testGotMessage(test, stream, { + msg: 'method', method: 'limitedMethod', params: [], id: '*' + }); + await stream.receive({ msg: 'connected', session: SESSION_ID }); + + // Reconnect #3 — should abort (exceeds maxRetries) + stream.sent.length = 0; + await stream.reset(); + + test.instanceOf(callbackError, Meteor.Error); + test.equal(callbackError.error, 'disconnected'); + test.equal(Object.keys(conn._methodInvokers).length, 0); + } +); + +Tinytest.addAsync( + 'livedata connection - maxRetries 0 behaves like noRetry', + async function(test) { + const stream = new StubStream(); + const conn = newConnection(stream); + + await startAndConnect(test, stream); + + let callbackError = null; + conn.apply('oneShotMethod', [], { maxRetries: 0 }, function(err) { + callbackError = err; + }); + testGotMessage(test, stream, { + msg: 'method', method: 'oneShotMethod', params: [], id: '*' + }); + + test.isNull(callbackError); + + // First reconnect should abort immediately (0 retries allowed) + stream.sent.length = 0; + await stream.reset(); + // Method should NOT appear in sent (it was aborted, not re-sent) + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); + + test.instanceOf(callbackError, Meteor.Error); + test.equal(callbackError.error, 'disconnected'); + test.equal(Object.keys(conn._methodInvokers).length, 0); + } +); + +Tinytest.addAsync( + 'livedata connection - method without maxRetries retries indefinitely', + async function(test) { + const stream = new StubStream(); + const conn = newConnection(stream); + + await startAndConnect(test, stream); + + let callbackFired = false; + conn.call('unlimitedMethod', function() { + callbackFired = true; + }); + testGotMessage(test, stream, { + msg: 'method', method: 'unlimitedMethod', params: [], id: '*' + }); + + // Reconnect 5 times — method should always be re-sent + for (let i = 0; i < 5; i++) { + stream.sent.length = 0; + await stream.reset(); + test.isFalse(callbackFired); + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); + testGotMessage(test, stream, { + msg: 'method', method: 'unlimitedMethod', params: [], id: '*' + }); + await stream.receive({ msg: 'connected', session: SESSION_ID }); + } + + test.isFalse(callbackFired); + test.equal(Object.keys(conn._methodInvokers).length, 1); + } +); + // ============================================================================ // DDP Session Resumption Tests (Client-side) // ============================================================================ From ca3533aa5848ef6f7a86eca8bac0e503eb666c14 Mon Sep 17 00:00:00 2001 From: Mark Russell Date: Mon, 6 Apr 2026 17:27:14 -0400 Subject: [PATCH 2/6] decoupling --- .../ddp-client/common/livedata_connection.js | 101 +++++++++++------- .../ddp-client/common/message_processors.js | 13 +-- packages/ddp-client/common/method_invoker.js | 62 +++++------ 3 files changed, 92 insertions(+), 84 deletions(-) diff --git a/packages/ddp-client/common/livedata_connection.js b/packages/ddp-client/common/livedata_connection.js index 167dff74e8a..94e8226a608 100644 --- a/packages/ddp-client/common/livedata_connection.js +++ b/packages/ddp-client/common/livedata_connection.js @@ -260,12 +260,8 @@ export class Connection { // the WebSocket between the two) hang forever — MethodInvoker // requires both before firing the callback. if (!this.options.retry || this._stream._forcedToDisconnect) { - // Clear method blocks before cleanup to prevent - // _outstandingMethodFinished from throwing invariant errors - // as we mass-complete invokers during teardown. - this._outstandingMethodBlocks = []; - - // Abort all outstanding method invokers. + // Abort all outstanding method invokers. _onMethodComplete handles + // removal from both _methodInvokers and _outstandingMethodBlocks. keys(this._methodInvokers).forEach(id => { this._methodInvokers[id].abort( 'Connection closed before method completed' @@ -879,14 +875,18 @@ export class Connection { const methodInvoker = new MethodInvoker({ methodId, callback: callback, - connection: self, onResultReceived: options.onResultReceived, wait: !!options.wait, message: message, noRetry: !!options.noRetry, - maxRetries: options.maxRetries != null ? options.maxRetries : null + maxRetries: options.maxRetries != null ? options.maxRetries : null, + send: (msg) => self._send(msg), + onComplete: (invoker) => self._onMethodComplete(invoker), }); + // Register in the connection's tracking map. + self._methodInvokers[methodInvoker.methodId] = methodInvoker; + let result; if (promise) { @@ -1360,39 +1360,59 @@ export class Connection { // If we added it to the first block, send it out now. if (this._outstandingMethodBlocks.length === 1) { methodInvoker.sendMessage(); + // Wait methods block quiescence — set only at send time, not at + // construction. During reconnect, message_processors rebuilds this + // for all in-flight methods. + if (methodInvoker._wait) { + this._methodsBlockingQuiescence[methodInvoker.methodId] = true; + } } } - // Called by MethodInvoker after a method's callback is invoked. If this was - // the last outstanding method in the current block, runs the next block. If - // there are no more methods, consider accepting a hot code push. - _outstandingMethodFinished() { + // Single callback invoked by MethodInvoker when it reaches a terminal state. + // Handles ALL connection-side bookkeeping in one place: + // 1. Remove from _methodInvokers map + // 2. Remove from _outstandingMethodBlocks + // 3. Advance to next method block if current block is empty + // 4. Check for hot code push readiness + _onMethodComplete(invoker) { const self = this; - if (self._anyMethodsAreOutstanding()) return; - - // No methods are outstanding. This should mean that the first block of - // methods is empty. (Or it might not exist, if this was a method that - // half-finished before disconnect/reconnect.) - if (! isEmpty(self._outstandingMethodBlocks)) { - const firstBlock = self._outstandingMethodBlocks.shift(); - if (! isEmpty(firstBlock.methods)) - throw new Error( - 'No methods outstanding but nonempty block: ' + - JSON.stringify(firstBlock) - ); - // Send the outstanding methods now in the first block. - if (! isEmpty(self._outstandingMethodBlocks)) - self._sendOutstandingMethods(); + // 1. Remove from the invoker tracking map. + delete self._methodInvokers[invoker.methodId]; + + // 2. Remove from method blocks. + for (const block of self._outstandingMethodBlocks) { + const idx = block.methods.indexOf(invoker); + if (idx !== -1) { + block.methods.splice(idx, 1); + break; + } } - // Maybe accept a hot code push. - self._maybeMigrate(); + // 3. If no methods are in-flight, advance to the next block. + if (!self._anyMethodsAreOutstanding()) { + if (!isEmpty(self._outstandingMethodBlocks)) { + const firstBlock = self._outstandingMethodBlocks.shift(); + if (!isEmpty(firstBlock.methods)) + throw new Error( + 'No methods outstanding but nonempty block: ' + + JSON.stringify(firstBlock) + ); + + // Send the outstanding methods now in the first block. + if (!isEmpty(self._outstandingMethodBlocks)) + self._sendOutstandingMethods(); + } + + // 4. Maybe accept a hot code push. + self._maybeMigrate(); + } } // Sends messages for all the methods in the first block in // _outstandingMethodBlocks. Methods that abort during sendMessage() - // (e.g. noRetry or maxRetries exceeded) are removed from the block. + // (e.g. noRetry or maxRetries exceeded) are cleaned up by _onMethodComplete. _sendOutstandingMethods() { const self = this; @@ -1400,11 +1420,15 @@ export class Connection { return; } - const block = self._outstandingMethodBlocks[0]; - block.methods.forEach(m => { + // Copy the array — sendMessage() may trigger _onMethodComplete which + // mutates the block's methods array via splice. + const methods = [...self._outstandingMethodBlocks[0].methods]; + methods.forEach(m => { m.sendMessage(); + if (m._wait && !m.isDone()) { + self._methodsBlockingQuiescence[m.methodId] = true; + } }); - block.methods = block.methods.filter(m => !m.isDone()); } _sendOutstandingMethodBlocksMessages(oldOutstandingMethodBlocks) { @@ -1427,19 +1451,20 @@ export class Connection { !last(self._outstandingMethodBlocks).wait && !oldOutstandingMethodBlocks[0].wait ) { - oldOutstandingMethodBlocks[0].methods.forEach((m) => { + // Copy — sendMessage() may trigger _onMethodComplete which splices. + const toMerge = [...oldOutstandingMethodBlocks[0].methods]; + toMerge.forEach((m) => { last(self._outstandingMethodBlocks).methods.push(m); // If this "last block" is also the first block, send the message. if (self._outstandingMethodBlocks.length === 1) { m.sendMessage(); + if (m._wait && !m.isDone()) { + self._methodsBlockingQuiescence[m.methodId] = true; + } } }); - // Remove invokers that aborted during sendMessage(). - const merged = last(self._outstandingMethodBlocks); - merged.methods = merged.methods.filter(m => !m.isDone()); - oldOutstandingMethodBlocks.shift(); } diff --git a/packages/ddp-client/common/message_processors.js b/packages/ddp-client/common/message_processors.js index 22d3a32692b..dc64df1abfc 100644 --- a/packages/ddp-client/common/message_processors.js +++ b/packages/ddp-client/common/message_processors.js @@ -255,21 +255,14 @@ export class MessageProcessors { return; } const currentMethodBlock = self._outstandingMethodBlocks[0].methods; - let i; - const m = currentMethodBlock.find((method, idx) => { - const found = method.methodId === msg.id; - if (found) i = idx; - return found; - }); + const m = currentMethodBlock.find(method => method.methodId === msg.id); if (!m) { Meteor._debug("Can't match method response to original method call", msg); return; } - // Remove from current method block. This may leave the block empty, but we - // don't move on to the next block until the callback has been delivered, in - // _outstandingMethodFinished. - currentMethodBlock.splice(i, 1); + // The invoker stays in the method block until it reaches a terminal state. + // _onMethodComplete is the single place that removes from blocks. if (hasOwn.call(msg, 'error')) { m.receiveResult( diff --git a/packages/ddp-client/common/method_invoker.js b/packages/ddp-client/common/method_invoker.js index ee2ad01dbc4..8646c3c5048 100644 --- a/packages/ddp-client/common/method_invoker.js +++ b/packages/ddp-client/common/method_invoker.js @@ -19,11 +19,18 @@ const InvokerState = Object.freeze({ export { InvokerState }; -// A MethodInvoker manages sending a method to the server and calling the user's -// callbacks. On construction, it registers itself in the connection's -// _methodInvokers map; it removes itself once the method is fully finished and -// the callback is invoked. This occurs when it has both received a result, -// and the data written by it is fully visible. +// A MethodInvoker manages sending a method to the server and calling the +// user's callbacks. It owns its own lifecycle state and communicates with +// the connection layer exclusively through delegate callbacks — it never +// directly mutates connection data structures. +// +// The connection provides two delegate functions: +// send(message) — physically send a DDP message on the wire +// onComplete(invoker) — notify the connection that this invoker is done +// (terminal state reached, callback fired). +// The connection handles all its own bookkeeping +// (_methodInvokers, _outstandingMethodBlocks, +// quiescence, migration) in this callback. // // State machine (two-phase completion: result and updated can arrive in // either order): @@ -51,7 +58,6 @@ export class MethodInvoker { this._state = InvokerState.PENDING; this._callback = options.callback; - this._connection = options.connection; this._message = options.message; this._onResultReceived = options.onResultReceived || (() => {}); this._wait = options.wait; @@ -60,15 +66,16 @@ export class MethodInvoker { this._retryCount = 0; this._methodResult = null; - // Register with the connection. - this._connection._methodInvokers[this.methodId] = this; + // Delegate callbacks provided by the connection. The invoker never + // directly accesses connection data structures. + this._send = options.send; + this._onComplete = options.onComplete; } - // -- State queries (replace external flag reads) -------------------------- + // -- State queries --------------------------------------------------------- // True if the method has been sent on the current connection and has not - // yet completed or been reset for resend. This is the question every - // external caller was asking via `invoker.sentMessage`. + // yet completed or been reset for resend. isInFlight() { return this._state === InvokerState.IN_FLIGHT || this._state === InvokerState.RESULT_RECEIVED @@ -87,7 +94,7 @@ export class MethodInvoker { return !!this._methodResult; } - // -- State transitions (public API) --------------------------------------- + // -- State transitions (public API) ---------------------------------------- // Sends the method message to the server. May be called additional times if // we lose the connection and reconnect before receiving a result. @@ -103,19 +110,11 @@ export class MethodInvoker { } this._state = InvokerState.IN_FLIGHT; - - // If this is a wait method, make all data messages be buffered until it is - // done. - if (this._wait) - this._connection._methodsBlockingQuiescence[this.methodId] = true; - - // Actually send the message. - this._connection._send(this._message); + this._send(this._message); } // Called by the connection layer when a reconnect occurs. Marks the invoker - // as needing to be re-sent on the new connection. This replaces the old - // pattern of externally setting `invoker.sentMessage = false`. + // as needing to be re-sent on the new connection. onReconnect() { if (this._state === InvokerState.IN_FLIGHT || this._state === InvokerState.DATA_VISIBLE) { @@ -128,8 +127,6 @@ export class MethodInvoker { // Call with the result of the method from the server. Only may be called // once; once it is called, you should not call sendMessage again. - // If the user provided an onResultReceived callback, call it immediately. - // Then invoke the main callback if data is also visible. receiveResult(err, result) { if (this.gotResult()) throw new Error('Methods should only receive results once'); @@ -145,7 +142,7 @@ export class MethodInvoker { // Call this when all data written by the method is visible. This means that // the method has returned its "data is done" message *AND* all server // documents that are buffered at that time have been written to the local - // cache. Invokes the main callback if the result has been received. + // cache. dataVisible() { if (this._state === InvokerState.RESULT_RECEIVED) { this._state = InvokerState.COMPLETE; @@ -165,11 +162,10 @@ export class MethodInvoker { this._fireCallback(); } - // -- Internal methods ----------------------------------------------------- + // -- Internal methods ------------------------------------------------------ // Decides whether a method should be re-sent on reconnect. Centralizes - // noRetry and maxRetries logic so the connection layer doesn't need to - // inspect invoker internals. + // noRetry and maxRetries logic. _shouldRetry() { if (this.noRetry) { this.receiveResult( @@ -204,15 +200,9 @@ export class MethodInvoker { } } - // Actually invoke the callback and clean up. Called at most once. + // Actually invoke the callback and notify the connection. Called at most once. _fireCallback() { this._callback(this._methodResult[0], this._methodResult[1]); - - // Forget about this method. - delete this._connection._methodInvokers[this.methodId]; - - // Let the connection know that this method is finished, so it can try to - // move on to the next block of methods. - this._connection._outstandingMethodFinished(); + this._onComplete(this); } } From 97b51cd509573487a3aaf0fda31f286decac7e60 Mon Sep 17 00:00:00 2001 From: Mark Russell Date: Mon, 6 Apr 2026 18:20:10 -0400 Subject: [PATCH 3/6] refactor more --- .../common/connection_stream_handlers.js | 4 +- .../ddp-client/common/livedata_connection.js | 200 +++++++++--------- .../ddp-client/common/message_processors.js | 35 ++- packages/ddp-client/common/method_invoker.js | 10 +- .../test/livedata_connection_tests.js | 12 +- 5 files changed, 125 insertions(+), 136 deletions(-) diff --git a/packages/ddp-client/common/connection_stream_handlers.js b/packages/ddp-client/common/connection_stream_handlers.js index f8e046ef293..268a129613b 100644 --- a/packages/ddp-client/common/connection_stream_handlers.js +++ b/packages/ddp-client/common/connection_stream_handlers.js @@ -157,8 +157,8 @@ export class ConnectionStreamHandlers { * @private */ _handleOutstandingMethodsOnReset() { - const blocks = this._connection._outstandingMethodBlocks; - if (blocks.length === 0) return; + const queue = this._connection._methodQueue; + if (queue.length === 0) return; // Notify all invokers about the reconnect. Each invoker transitions // its own state (IN_FLIGHT → WAITING_FOR_RESEND). Retry decisions diff --git a/packages/ddp-client/common/livedata_connection.js b/packages/ddp-client/common/livedata_connection.js index 94e8226a608..0fb506b4e8c 100644 --- a/packages/ddp-client/common/livedata_connection.js +++ b/packages/ddp-client/common/livedata_connection.js @@ -117,40 +117,33 @@ export class Connection { // Tracks methods which the user has called but whose result messages have not // arrived yet. // - // _outstandingMethodBlocks is an array of blocks of methods. Each block - // represents a set of methods that can run at the same time. The first block - // represents the methods which are currently in flight; subsequent blocks - // must wait for previous blocks to be fully finished before they can be sent - // to the server. + // _methodQueue is an array of execution groups. Each group is a set of + // methods to send in parallel. Groups are processed sequentially — all + // methods in a group must complete before the next group is sent. // - // Each block is an object with the following fields: - // - methods: a list of MethodInvoker objects - // - wait: a boolean; if true, this block had a single method invoked with - // the "wait" option + // Each group is an object with: + // - methods: array of MethodInvoker objects + // - bufferData: boolean — if true, incoming data messages are buffered + // until all methods in this group have received their 'updated' message. + // This prevents the client from seeing partial state from the group's + // writes. Set to true for barrier groups (single-method groups created + // by `wait: true`) and during reconnect (all in-flight methods buffer). + // - pendingUpdates: Set of method IDs still waiting for 'updated'. When + // empty and bufferData is true, quiescence ends for this group. // - // There will never be adjacent blocks with wait=false, because the only thing - // that makes methods need to be serialized is a wait method. - // - // Methods are removed from the first block when their "result" is - // received. The entire first block is only removed when all of the in-flight - // methods have received their results (so the "methods" list is empty) *AND* - // all of the data written by those methods are visible in the local cache. So - // it is possible for the first block's methods list to be empty, if we are - // still waiting for some objects to quiesce. + // The scheduler does not know about 'wait' — it only processes groups. + // The 'wait' flag is consumed at scheduling time (_addOutstandingMethod) + // to decide whether to append to the current group or start a new one. // // Example: - // _outstandingMethodBlocks = [ - // {wait: false, methods: []}, - // {wait: true, methods: []}, - // {wait: false, methods: [, - // ]}] - // This means that there were some methods which were sent to the server and - // which have returned their results, but some of the data written by - // the methods may not be visible in the local cache. Once all that data is - // visible, we will send a 'login' method. Once the login method has returned - // and all the data is visible (including re-running subs if userId changes), - // we will send the 'foo' and 'bar' methods in parallel. - self._outstandingMethodBlocks = []; + // _methodQueue = [ + // {bufferData: false, pendingUpdates: new Set(), methods: []}, + // {bufferData: true, pendingUpdates: new Set(['id2']), + // methods: []}, + // {bufferData: false, pendingUpdates: new Set(), + // methods: [, ]} + // ] + self._methodQueue = []; // method ID -> array of objects with keys 'collection' and 'id', listing // documents written by a given method's stub. keys are associated with @@ -190,10 +183,8 @@ export class Connection { // This buffers the messages that aren't being processed yet. self._messagesBufferedUntilQuiescence = []; - // Map from method ID -> true. Methods are removed from this when their - // "data done" message is received, and we will not quiesce until it is - // empty. - self._methodsBlockingQuiescence = {}; + // Quiescence is derived from the current execution group's pendingUpdates + // set. No separate tracking structure needed — see _waitingForQuiescence(). // map from sub ID -> true for subs that were ready (ie, called the sub // ready callback) before reconnect but haven't become ready again yet self._subsBeingRevived = {}; // map from sub._id -> true @@ -261,7 +252,7 @@ export class Connection { // requires both before firing the callback. if (!this.options.retry || this._stream._forcedToDisconnect) { // Abort all outstanding method invokers. _onMethodComplete handles - // removal from both _methodInvokers and _outstandingMethodBlocks. + // removal from both _methodInvokers and _methodQueue. keys(this._methodInvokers).forEach(id => { this._methodInvokers[id].abort( 'Connection closed before method completed' @@ -1136,10 +1127,9 @@ export class Connection { // revived or early methods to finish their data, or we are waiting for a // "wait" method to finish. _waitingForQuiescence() { - return ( - ! isEmpty(this._subsBeingRevived) || - ! isEmpty(this._methodsBlockingQuiescence) - ); + if (!isEmpty(this._subsBeingRevived)) return true; + const group = this._methodQueue[0]; + return group && group.bufferData && group.pendingUpdates.size > 0; } // Returns true if any method whose message has been sent to the server has @@ -1337,34 +1327,36 @@ export class Connection { } _addOutstandingMethod(methodInvoker, options) { - if (options?.wait) { - // It's a wait method! Wait methods go in their own block. - this._outstandingMethodBlocks.push({ - wait: true, - methods: [methodInvoker] + const isBarrier = !!options?.wait; + + if (isBarrier) { + // Barrier method — gets its own group with data buffering. + this._methodQueue.push({ + bufferData: true, + pendingUpdates: new Set(), + methods: [methodInvoker], }); } else { - // Not a wait method. Start a new block if the previous block was a wait - // block, and add it to the last block of methods. - if (isEmpty(this._outstandingMethodBlocks) || - last(this._outstandingMethodBlocks).wait) { - this._outstandingMethodBlocks.push({ - wait: false, + // Parallel method — append to current group, or start a new one + // if the queue is empty or the last group is a barrier. + if (isEmpty(this._methodQueue) || + this._methodQueue[this._methodQueue.length - 1].bufferData) { + this._methodQueue.push({ + bufferData: false, + pendingUpdates: new Set(), methods: [], }); } - last(this._outstandingMethodBlocks).methods.push(methodInvoker); + this._methodQueue[this._methodQueue.length - 1].methods.push(methodInvoker); } - // If we added it to the first block, send it out now. - if (this._outstandingMethodBlocks.length === 1) { + // If we added it to the first group, send it now. + if (this._methodQueue.length === 1) { methodInvoker.sendMessage(); - // Wait methods block quiescence — set only at send time, not at - // construction. During reconnect, message_processors rebuilds this - // for all in-flight methods. - if (methodInvoker._wait) { - this._methodsBlockingQuiescence[methodInvoker.methodId] = true; + // If this group buffers data, track this method's ID for quiescence. + if (this._methodQueue[0].bufferData) { + this._methodQueue[0].pendingUpdates.add(methodInvoker.methodId); } } } @@ -1372,8 +1364,8 @@ export class Connection { // Single callback invoked by MethodInvoker when it reaches a terminal state. // Handles ALL connection-side bookkeeping in one place: // 1. Remove from _methodInvokers map - // 2. Remove from _outstandingMethodBlocks - // 3. Advance to next method block if current block is empty + // 2. Remove from execution group in _methodQueue + // 3. Advance to next group if current group is fully complete // 4. Check for hot code push readiness _onMethodComplete(invoker) { const self = this; @@ -1381,27 +1373,27 @@ export class Connection { // 1. Remove from the invoker tracking map. delete self._methodInvokers[invoker.methodId]; - // 2. Remove from method blocks. - for (const block of self._outstandingMethodBlocks) { - const idx = block.methods.indexOf(invoker); + // 2. Remove from execution group. + for (const group of self._methodQueue) { + const idx = group.methods.indexOf(invoker); if (idx !== -1) { - block.methods.splice(idx, 1); + group.methods.splice(idx, 1); break; } } - // 3. If no methods are in-flight, advance to the next block. + // 3. If no methods are in-flight, advance to the next group. if (!self._anyMethodsAreOutstanding()) { - if (!isEmpty(self._outstandingMethodBlocks)) { - const firstBlock = self._outstandingMethodBlocks.shift(); - if (!isEmpty(firstBlock.methods)) + if (!isEmpty(self._methodQueue)) { + const firstGroup = self._methodQueue.shift(); + if (!isEmpty(firstGroup.methods)) throw new Error( - 'No methods outstanding but nonempty block: ' + - JSON.stringify(firstBlock) + 'No methods outstanding but nonempty group: ' + + JSON.stringify(firstGroup) ); - // Send the outstanding methods now in the first block. - if (!isEmpty(self._outstandingMethodBlocks)) + // Send the methods in the next group. + if (!isEmpty(self._methodQueue)) self._sendOutstandingMethods(); } @@ -1410,72 +1402,70 @@ export class Connection { } } - // Sends messages for all the methods in the first block in - // _outstandingMethodBlocks. Methods that abort during sendMessage() - // (e.g. noRetry or maxRetries exceeded) are cleaned up by _onMethodComplete. + // Sends messages for all the methods in the first group of _methodQueue. + // Methods that abort during sendMessage() are cleaned up by _onMethodComplete. _sendOutstandingMethods() { const self = this; - if (isEmpty(self._outstandingMethodBlocks)) { + if (isEmpty(self._methodQueue)) { return; } - // Copy the array — sendMessage() may trigger _onMethodComplete which - // mutates the block's methods array via splice. - const methods = [...self._outstandingMethodBlocks[0].methods]; + const group = self._methodQueue[0]; + // Copy — sendMessage() may trigger _onMethodComplete which splices. + const methods = [...group.methods]; methods.forEach(m => { m.sendMessage(); - if (m._wait && !m.isDone()) { - self._methodsBlockingQuiescence[m.methodId] = true; + // Track for quiescence if this group buffers data. + if (group.bufferData && !m.isDone()) { + group.pendingUpdates.add(m.methodId); } }); } - _sendOutstandingMethodBlocksMessages(oldOutstandingMethodBlocks) { + _mergeAndSendMethodQueue(oldQueue) { const self = this; - if (isEmpty(oldOutstandingMethodBlocks)) return; + if (isEmpty(oldQueue)) return; // We have at least one block worth of old outstanding methods to try // again. First: did onReconnect actually send anything? If not, we just - // restore all outstanding methods and run the first block. - if (isEmpty(self._outstandingMethodBlocks)) { - self._outstandingMethodBlocks = oldOutstandingMethodBlocks; + // restore all outstanding methods and run the first group. + if (isEmpty(self._methodQueue)) { + self._methodQueue = oldQueue; self._sendOutstandingMethods(); return; } - // OK, there are blocks on both sides. Special case: merge the last block of - // the reconnect methods with the first block of the original methods, if - // neither of them are "wait" blocks. - if ( - !last(self._outstandingMethodBlocks).wait && - !oldOutstandingMethodBlocks[0].wait - ) { + // OK, there are groups on both sides. Special case: merge the last group of + // the reconnect methods with the first group of the original methods, if + // neither of them buffers data (i.e. neither is a barrier group). + const lastNew = self._methodQueue[self._methodQueue.length - 1]; + if (!lastNew.bufferData && !oldQueue[0].bufferData) { // Copy — sendMessage() may trigger _onMethodComplete which splices. - const toMerge = [...oldOutstandingMethodBlocks[0].methods]; + const toMerge = [...oldQueue[0].methods]; toMerge.forEach((m) => { - last(self._outstandingMethodBlocks).methods.push(m); + lastNew.methods.push(m); - // If this "last block" is also the first block, send the message. - if (self._outstandingMethodBlocks.length === 1) { + // If this "last group" is also the first group, send the message. + if (self._methodQueue.length === 1) { m.sendMessage(); - if (m._wait && !m.isDone()) { - self._methodsBlockingQuiescence[m.methodId] = true; + if (lastNew.bufferData && !m.isDone()) { + lastNew.pendingUpdates.add(m.methodId); } } }); - oldOutstandingMethodBlocks.shift(); + oldQueue.shift(); } - // Now add the rest of the original blocks on. - self._outstandingMethodBlocks.push(...oldOutstandingMethodBlocks); + // Now add the rest of the original groups. + self._methodQueue.push(...oldQueue); } _callOnReconnectAndSendAppropriateOutstandingMethods() { const self = this; - const oldOutstandingMethodBlocks = self._outstandingMethodBlocks; - self._outstandingMethodBlocks = []; + const oldQueue = self._methodQueue; + self._methodQueue = []; self.onReconnect && self.onReconnect(); DDP._reconnectHook.each((callback) => { @@ -1483,7 +1473,7 @@ export class Connection { return true; }); - self._sendOutstandingMethodBlocksMessages(oldOutstandingMethodBlocks); + self._mergeAndSendMethodQueue(oldQueue); } // We can accept a hot code push if there are no methods in flight. diff --git a/packages/ddp-client/common/message_processors.js b/packages/ddp-client/common/message_processors.js index dc64df1abfc..a0cf654d476 100644 --- a/packages/ddp-client/common/message_processors.js +++ b/packages/ddp-client/common/message_processors.js @@ -79,11 +79,14 @@ export class MessageProcessors { // track methods that were sent on this connection so that we don't // quiesce until they are all done. // - // Start by clearing _methodsBlockingQuiescence: methods sent before - // reconnect don't matter, and any "wait" methods sent on the new connection - // that we drop here will be restored by the loop below. - self._methodsBlockingQuiescence = Object.create(null); - if (self._resetStores) { + // During reconnect, the first execution group buffers data until all its + // methods settle. Mark it as bufferData and populate pendingUpdates with + // all in-flight method IDs. + if (self._resetStores && !isEmpty(self._methodQueue)) { + const firstGroup = self._methodQueue[0]; + firstGroup.bufferData = true; + firstGroup.pendingUpdates = new Set(); + const invokers = self._methodInvokers; Object.keys(invokers).forEach(id => { const invoker = invokers[id]; @@ -96,16 +99,9 @@ export class MessageProcessors { (...args) => invoker.dataVisible(...args) ); } else if (invoker.isInFlight()) { - // This method has been sent on this connection (maybe as a resend - // from the last connection, maybe from onReconnect, maybe just very - // quickly before processing the connected message). - // - // We don't need to do anything special to ensure its callbacks get - // called, but we'll count it as a method which is preventing - // reconnect quiescence. (eg, it might be a login method that was run - // from onReconnect, and we don't want to see flicker by seeing a - // logged-out state.) - self._methodsBlockingQuiescence[invoker.methodId] = true; + // In-flight method blocks reconnect quiescence (e.g. a login method + // from onReconnect — we don't want UI flicker). + firstGroup.pendingUpdates.add(invoker.methodId); } }); } @@ -146,9 +142,10 @@ export class MessageProcessors { }); } - if (msg.methods) { + if (msg.methods && !isEmpty(self._methodQueue)) { + const group = self._methodQueue[0]; msg.methods.forEach(methodId => { - delete self._methodsBlockingQuiescence[methodId]; + group.pendingUpdates.delete(methodId); }); } @@ -250,11 +247,11 @@ export class MessageProcessors { // find the outstanding request // should be O(1) in nearly all realistic use cases - if (isEmpty(self._outstandingMethodBlocks)) { + if (isEmpty(self._methodQueue)) { Meteor._debug('Received method result but no methods outstanding'); return; } - const currentMethodBlock = self._outstandingMethodBlocks[0].methods; + const currentMethodBlock = self._methodQueue[0].methods; const m = currentMethodBlock.find(method => method.methodId === msg.id); if (!m) { Meteor._debug("Can't match method response to original method call", msg); diff --git a/packages/ddp-client/common/method_invoker.js b/packages/ddp-client/common/method_invoker.js index 8646c3c5048..59dfc09ead8 100644 --- a/packages/ddp-client/common/method_invoker.js +++ b/packages/ddp-client/common/method_invoker.js @@ -29,8 +29,8 @@ export { InvokerState }; // onComplete(invoker) — notify the connection that this invoker is done // (terminal state reached, callback fired). // The connection handles all its own bookkeeping -// (_methodInvokers, _outstandingMethodBlocks, -// quiescence, migration) in this callback. +// (_methodInvokers, _methodQueue, quiescence, +// migration) in this callback. // // State machine (two-phase completion: result and updated can arrive in // either order): @@ -99,8 +99,10 @@ export class MethodInvoker { // Sends the method message to the server. May be called additional times if // we lose the connection and reconnect before receiving a result. sendMessage() { - if (this.gotResult()) - throw new Error('sendingMethod is called on method with result'); + // Already have a result — nothing to send. This can happen when + // _sendOutstandingMethods iterates a group that contains an invoker + // which received its result before a reconnect. + if (this.gotResult()) return; // On re-send (reconnect), check whether this method is allowed to retry. if (this._state === InvokerState.WAITING_FOR_RESEND) { diff --git a/packages/ddp-client/test/livedata_connection_tests.js b/packages/ddp-client/test/livedata_connection_tests.js index 4ff0f805b20..6e9c8610118 100644 --- a/packages/ddp-client/test/livedata_connection_tests.js +++ b/packages/ddp-client/test/livedata_connection_tests.js @@ -1823,9 +1823,9 @@ addReconnectTests( // white-box test: test.equal( - conn._outstandingMethodBlocks.map(function(block) { + conn._methodQueue.map(function(block) { return [ - block.wait, + block.bufferData, block.methods.map(function(method) { return method._message.params[0]; }) @@ -2049,9 +2049,9 @@ addReconnectTests( // white-box test: test.equal( - conn._outstandingMethodBlocks.map(function(block) { + conn._methodQueue.map(function(block) { return [ - block.wait, + block.bufferData, block.methods.map(function(method) { return method._message.params[0]; }) @@ -2184,7 +2184,7 @@ addReconnectTests('livedata stub - reconnect double wait method', async function // Call another method. It should be delivered immediately. This is a // regression test for a case where it never got delivered because there was - // an empty block in _outstandingMethodBlocks blocking it from being sent. + // an empty group in _methodQueue blocking it from being sent. conn.call('lastMethod', identity); testGotMessage(test, stream, { msg: 'method', @@ -2639,7 +2639,7 @@ Tinytest.addAsync( // All invokers should be cleaned up test.equal(Object.keys(conn._methodInvokers).length, 0); - test.equal(conn._outstandingMethodBlocks.length, 0); + test.equal(conn._methodQueue.length, 0); } ); From 24e9c9b6f7ca8677ec99f3a9c3cd93c6f5721d50 Mon Sep 17 00:00:00 2001 From: Mark Russell Date: Mon, 6 Apr 2026 18:21:15 -0400 Subject: [PATCH 4/6] clean dead code --- packages/ddp-client/common/livedata_connection.js | 1 - packages/ddp-client/common/method_invoker.js | 1 - 2 files changed, 2 deletions(-) diff --git a/packages/ddp-client/common/livedata_connection.js b/packages/ddp-client/common/livedata_connection.js index 0fb506b4e8c..7c43593ca82 100644 --- a/packages/ddp-client/common/livedata_connection.js +++ b/packages/ddp-client/common/livedata_connection.js @@ -867,7 +867,6 @@ export class Connection { methodId, callback: callback, onResultReceived: options.onResultReceived, - wait: !!options.wait, message: message, noRetry: !!options.noRetry, maxRetries: options.maxRetries != null ? options.maxRetries : null, diff --git a/packages/ddp-client/common/method_invoker.js b/packages/ddp-client/common/method_invoker.js index 59dfc09ead8..c76bb4c64b8 100644 --- a/packages/ddp-client/common/method_invoker.js +++ b/packages/ddp-client/common/method_invoker.js @@ -60,7 +60,6 @@ export class MethodInvoker { this._callback = options.callback; this._message = options.message; this._onResultReceived = options.onResultReceived || (() => {}); - this._wait = options.wait; this.noRetry = options.noRetry; this._maxRetries = options.maxRetries != null ? options.maxRetries : null; this._retryCount = 0; From a3f00e7396c5f981b8ee73577e3500b7966a1545 Mon Sep 17 00:00:00 2001 From: Mark Russell Date: Tue, 7 Apr 2026 00:31:25 -0400 Subject: [PATCH 5/6] refactor and fix --- .../common/connection_stream_handlers.js | 10 +- .../ddp-client/common/document_processors.js | 16 +- packages/ddp-client/common/execution_group.js | 150 ++++++++++++++++ .../ddp-client/common/livedata_connection.js | 127 ++++++------- .../ddp-client/common/message_processors.js | 53 +++--- packages/ddp-client/common/method_invoker.js | 169 +++++------------- .../test/livedata_connection_tests.js | 4 +- 7 files changed, 289 insertions(+), 240 deletions(-) create mode 100644 packages/ddp-client/common/execution_group.js diff --git a/packages/ddp-client/common/connection_stream_handlers.js b/packages/ddp-client/common/connection_stream_handlers.js index 268a129613b..b53df2d490b 100644 --- a/packages/ddp-client/common/connection_stream_handlers.js +++ b/packages/ddp-client/common/connection_stream_handlers.js @@ -160,13 +160,15 @@ export class ConnectionStreamHandlers { const queue = this._connection._methodQueue; if (queue.length === 0) return; - // Notify all invokers about the reconnect. Each invoker transitions - // its own state (IN_FLIGHT → WAITING_FOR_RESEND). Retry decisions - // (noRetry, maxRetries) are handled inside the invoker when - // sendMessage() is called during reconnect. + // Notify all invokers and groups about the reconnect. + // Invokers transition IN_FLIGHT → WAITING_FOR_RESEND. + // Groups reset wire-level state (updated/dataVisible) while preserving results. Object.values(this._connection._methodInvokers).forEach(invoker => { invoker.onReconnect(); }); + queue.forEach(group => { + group.resetForReconnect(); + }); } /** diff --git a/packages/ddp-client/common/document_processors.js b/packages/ddp-client/common/document_processors.js index 11c4b2f8c0b..41cfc5c851f 100644 --- a/packages/ddp-client/common/document_processors.js +++ b/packages/ddp-client/common/document_processors.js @@ -161,16 +161,14 @@ export class DocumentProcessors { }); delete self._documentsWrittenByStub[methodId]; - // We want to call the data-written callback, but we can't do so until all - // currently buffered messages are flushed. - const callbackInvoker = self._methodInvokers[methodId]; - if (!callbackInvoker) { - throw new Error('No callback invoker for method ' + methodId); + // Data for this method is now visible in the local cache. Tell the + // execution group, which will complete the invoker if it also has the result. + const group = self._methodQueue[0]; + if (group) { + self._runWhenAllServerDocsAreFlushed(() => { + group.markDataVisible(methodId); + }); } - - self._runWhenAllServerDocsAreFlushed( - (...args) => callbackInvoker.dataVisible(...args) - ); }); } diff --git a/packages/ddp-client/common/execution_group.js b/packages/ddp-client/common/execution_group.js new file mode 100644 index 00000000000..662fae249eb --- /dev/null +++ b/packages/ddp-client/common/execution_group.js @@ -0,0 +1,150 @@ +// An ExecutionGroup manages a set of method calls that execute together. +// Groups are processed sequentially by the connection — all methods in a +// group must complete before the next group is sent. +// +// The group owns the DDP two-phase completion protocol: +// - Tracks 'result' wire messages per method +// - Tracks 'updated' wire messages per method (for quiescence) +// - Tracks data visibility per method (for callback firing) +// - Tells each MethodInvoker when to fire its callback +// +// The group does NOT handle transport (sending/retrying) — that's the +// invoker's job. The group does NOT handle scheduling (which group runs +// next) — that's the connection's job. +export class ExecutionGroup { + // atomic: if true, data messages are buffered while this group runs, + // and the group cannot be merged with adjacent groups. + // onComplete: callback(invoker) when an invoker reaches a terminal state. + // The connection handles its own bookkeeping in this callback. + constructor({ atomic = false, onComplete }) { + this.atomic = atomic; + this._onComplete = onComplete; + this._methods = []; + + // Per-method two-phase tracking. Keyed by methodId. + this._completion = new Map(); + } + + get methods() { + return this._methods; + } + + // -- Group management ------------------------------------------------------ + + addMethod(invoker) { + this._methods.push(invoker); + this._completion.set(invoker.methodId, { + gotResult: false, + gotUpdated: false, // wire: 'updated' message received + dataVisible: false, // local: data flushed to minimongo + err: undefined, + result: undefined, + }); + } + + removeMethod(invoker) { + const idx = this._methods.indexOf(invoker); + if (idx !== -1) { + this._methods.splice(idx, 1); + } + this._completion.delete(invoker.methodId); + } + + hasMethod(invoker) { + return this._completion.has(invoker.methodId); + } + + isEmpty() { + return this._methods.length === 0; + } + + // Reset wire-level state on reconnect. Preserves result (which was already + // received and won't be re-sent), but clears updated/dataVisible since those + // need to arrive fresh on the new connection. + resetForReconnect() { + for (const [, entry] of this._completion) { + entry.gotUpdated = false; + entry.dataVisible = false; + } + } + + // -- Two-phase completion -------------------------------------------------- + + // Called when the server sends a 'result' message for a method in this group. + receiveResult(methodId, err, result) { + const entry = this._completion.get(methodId); + if (!entry) return; + entry.gotResult = true; + entry.err = err; + entry.result = result; + + // Fire onResultReceived callback if provided (eager result access). + const invoker = this._methods.find(m => m.methodId === methodId); + if (invoker && invoker._onResultReceived) { + invoker._onResultReceived(err, result); + } + + this._maybeComplete(methodId); + } + + // Called when the 'updated' wire message arrives for this method ID. + // This is used for quiescence tracking — it does NOT mean data is visible. + // For non-atomic groups, this also marks data as visible (they're the same + // moment since there's no buffering). + receiveUpdated(methodId) { + const entry = this._completion.get(methodId); + if (!entry) return; + entry.gotUpdated = true; + } + + // Called when data written by this method is actually visible in minimongo. + // For non-atomic groups, this is called in the same tick as receiveUpdated. + // For atomic groups, this is called after quiescence ends and data is flushed. + markDataVisible(methodId) { + const entry = this._completion.get(methodId); + if (!entry) return; + entry.dataVisible = true; + this._maybeComplete(methodId); + } + + // For atomic groups: called after quiescence ends and buffered data has been + // flushed to minimongo. Marks all updated methods as data-visible and + // completes any that also have their result. + flushCompleted() { + for (const [methodId, entry] of this._completion) { + if (entry.gotUpdated) { + entry.dataVisible = true; + this._maybeComplete(methodId); + } + } + } + + // True if all methods in this group have received their 'updated' wire message. + // Used by the connection to decide when to stop buffering data (quiescence). + isQuiesced() { + for (const [, entry] of this._completion) { + if (!entry.gotUpdated) return false; + } + return true; + } + + // True if a specific method has received its result. + hasResult(methodId) { + const entry = this._completion.get(methodId); + return entry ? entry.gotResult : false; + } + + // -- Internal -------------------------------------------------------------- + + // Complete the invoker if it has both result AND data visible. + _maybeComplete(methodId) { + const entry = this._completion.get(methodId); + if (!entry || !entry.gotResult || !entry.dataVisible) return; + + const invoker = this._methods.find(m => m.methodId === methodId); + if (!invoker || invoker.isDone()) return; + + invoker.complete(entry.err, entry.result); + this._onComplete(invoker); + } +} diff --git a/packages/ddp-client/common/livedata_connection.js b/packages/ddp-client/common/livedata_connection.js index 7c43593ca82..6771e6654d5 100644 --- a/packages/ddp-client/common/livedata_connection.js +++ b/packages/ddp-client/common/livedata_connection.js @@ -6,6 +6,7 @@ import { Random } from 'meteor/random'; import { MongoID } from 'meteor/mongo-id'; import { DDP } from './namespace.js'; import { MethodInvoker } from './method_invoker'; +import { ExecutionGroup } from './execution_group'; import { hasOwn, slice, @@ -117,32 +118,15 @@ export class Connection { // Tracks methods which the user has called but whose result messages have not // arrived yet. // - // _methodQueue is an array of execution groups. Each group is a set of - // methods to send in parallel. Groups are processed sequentially — all - // methods in a group must complete before the next group is sent. + // _methodQueue is an array of ExecutionGroup instances. Each group manages + // a set of methods to send in parallel and tracks their two-phase completion + // (result + updated). Groups are processed sequentially — all methods in a + // group must complete before the next group is sent. // - // Each group is an object with: - // - methods: array of MethodInvoker objects - // - bufferData: boolean — if true, incoming data messages are buffered - // until all methods in this group have received their 'updated' message. - // This prevents the client from seeing partial state from the group's - // writes. Set to true for barrier groups (single-method groups created - // by `wait: true`) and during reconnect (all in-flight methods buffer). - // - pendingUpdates: Set of method IDs still waiting for 'updated'. When - // empty and bufferData is true, quiescence ends for this group. - // - // The scheduler does not know about 'wait' — it only processes groups. - // The 'wait' flag is consumed at scheduling time (_addOutstandingMethod) - // to decide whether to append to the current group or start a new one. - // - // Example: - // _methodQueue = [ - // {bufferData: false, pendingUpdates: new Set(), methods: []}, - // {bufferData: true, pendingUpdates: new Set(['id2']), - // methods: []}, - // {bufferData: false, pendingUpdates: new Set(), - // methods: [, ]} - // ] + // See execution_group.js for the group API. The scheduler does not know + // about 'wait' — it only processes groups. The 'wait' flag is consumed at + // scheduling time (_addOutstandingMethod) to decide whether to append to + // the current group or create a new atomic one. self._methodQueue = []; // method ID -> array of objects with keys 'collection' and 'id', listing @@ -251,12 +235,11 @@ export class Connection { // the WebSocket between the two) hang forever — MethodInvoker // requires both before firing the callback. if (!this.options.retry || this._stream._forcedToDisconnect) { - // Abort all outstanding method invokers. _onMethodComplete handles - // removal from both _methodInvokers and _methodQueue. + // Abort all outstanding method invokers and clean up. keys(this._methodInvokers).forEach(id => { - this._methodInvokers[id].abort( - 'Connection closed before method completed' - ); + const invoker = this._methodInvokers[id]; + invoker.abort('Connection closed before method completed'); + this._onMethodComplete(invoker); }); } }; @@ -1128,14 +1111,17 @@ export class Connection { _waitingForQuiescence() { if (!isEmpty(this._subsBeingRevived)) return true; const group = this._methodQueue[0]; - return group && group.bufferData && group.pendingUpdates.size > 0; + return group && group.atomic && !group.isQuiesced(); } // Returns true if any method whose message has been sent to the server has // not yet invoked its user callback. _anyMethodsAreOutstanding() { - const invokers = this._methodInvokers; - return Object.values(invokers).some((invoker) => invoker.isInFlight()); + // Check only the first group — methods in later groups are PENDING + // and shouldn't block advancement. + const group = this._methodQueue[0]; + if (!group) return false; + return group.methods.some((invoker) => !invoker.isDone()); } async _processOneDataMessage(msg, updates) { @@ -1325,38 +1311,36 @@ export class Connection { } } + _createGroup(atomic) { + return new ExecutionGroup({ + atomic, + onComplete: (invoker) => this._onMethodComplete(invoker), + }); + } + _addOutstandingMethod(methodInvoker, options) { - const isBarrier = !!options?.wait; - - if (isBarrier) { - // Barrier method — gets its own group with data buffering. - this._methodQueue.push({ - bufferData: true, - pendingUpdates: new Set(), - methods: [methodInvoker], - }); + if (options?.wait) { + // Atomic method — gets its own group. Data is buffered until it completes, + // and no other methods can be appended to this group. + const group = this._createGroup(true); + group.addMethod(methodInvoker); + this._methodQueue.push(group); } else { // Parallel method — append to current group, or start a new one - // if the queue is empty or the last group is a barrier. - if (isEmpty(this._methodQueue) || - this._methodQueue[this._methodQueue.length - 1].bufferData) { - this._methodQueue.push({ - bufferData: false, - pendingUpdates: new Set(), - methods: [], - }); + // if the queue is empty or the last group is atomic. + const lastGroup = this._methodQueue[this._methodQueue.length - 1]; + if (isEmpty(this._methodQueue) || lastGroup.atomic) { + const group = this._createGroup(false); + group.addMethod(methodInvoker); + this._methodQueue.push(group); + } else { + lastGroup.addMethod(methodInvoker); } - - this._methodQueue[this._methodQueue.length - 1].methods.push(methodInvoker); } // If we added it to the first group, send it now. if (this._methodQueue.length === 1) { methodInvoker.sendMessage(); - // If this group buffers data, track this method's ID for quiescence. - if (this._methodQueue[0].bufferData) { - this._methodQueue[0].pendingUpdates.add(methodInvoker.methodId); - } } } @@ -1374,9 +1358,8 @@ export class Connection { // 2. Remove from execution group. for (const group of self._methodQueue) { - const idx = group.methods.indexOf(invoker); - if (idx !== -1) { - group.methods.splice(idx, 1); + if (group.hasMethod(invoker)) { + group.removeMethod(invoker); break; } } @@ -1385,10 +1368,9 @@ export class Connection { if (!self._anyMethodsAreOutstanding()) { if (!isEmpty(self._methodQueue)) { const firstGroup = self._methodQueue.shift(); - if (!isEmpty(firstGroup.methods)) + if (!firstGroup.isEmpty()) throw new Error( - 'No methods outstanding but nonempty group: ' + - JSON.stringify(firstGroup) + 'No methods outstanding but nonempty group' ); // Send the methods in the next group. @@ -1411,13 +1393,14 @@ export class Connection { } const group = self._methodQueue[0]; - // Copy — sendMessage() may trigger _onMethodComplete which splices. + // Copy — sendMessage() may abort (noRetry/maxRetries) which modifies the group. const methods = [...group.methods]; methods.forEach(m => { m.sendMessage(); - // Track for quiescence if this group buffers data. - if (group.bufferData && !m.isDone()) { - group.pendingUpdates.add(m.methodId); + // If the invoker aborted during sendMessage (noRetry, maxRetries), + // clean up now. + if (m.isDone()) { + self._onMethodComplete(m); } }); } @@ -1437,19 +1420,19 @@ export class Connection { // OK, there are groups on both sides. Special case: merge the last group of // the reconnect methods with the first group of the original methods, if - // neither of them buffers data (i.e. neither is a barrier group). + // neither of them is atomic (atomic groups can't be merged). const lastNew = self._methodQueue[self._methodQueue.length - 1]; - if (!lastNew.bufferData && !oldQueue[0].bufferData) { - // Copy — sendMessage() may trigger _onMethodComplete which splices. + if (!lastNew.atomic && !oldQueue[0].atomic) { + // Move methods from the old first group into the new last group. const toMerge = [...oldQueue[0].methods]; toMerge.forEach((m) => { - lastNew.methods.push(m); + lastNew.addMethod(m); // If this "last group" is also the first group, send the message. if (self._methodQueue.length === 1) { m.sendMessage(); - if (lastNew.bufferData && !m.isDone()) { - lastNew.pendingUpdates.add(m.methodId); + if (m.isDone()) { + self._onMethodComplete(m); } } }); diff --git a/packages/ddp-client/common/message_processors.js b/packages/ddp-client/common/message_processors.js index a0cf654d476..4fe6e0a629f 100644 --- a/packages/ddp-client/common/message_processors.js +++ b/packages/ddp-client/common/message_processors.js @@ -79,30 +79,24 @@ export class MessageProcessors { // track methods that were sent on this connection so that we don't // quiesce until they are all done. // - // During reconnect, the first execution group buffers data until all its - // methods settle. Mark it as bufferData and populate pendingUpdates with - // all in-flight method IDs. + // During reconnect, upgrade the first execution group to atomic so the UI + // doesn't flicker. The group tracks 'updated' wire messages via + // receiveUpdated() and reports quiescence via isQuiesced(). if (self._resetStores && !isEmpty(self._methodQueue)) { const firstGroup = self._methodQueue[0]; - firstGroup.bufferData = true; - firstGroup.pendingUpdates = new Set(); - - const invokers = self._methodInvokers; - Object.keys(invokers).forEach(id => { - const invoker = invokers[id]; - if (invoker.gotResult()) { - // This method already got its result, but it didn't call its callback - // because its data didn't become visible. We did not resend the - // method RPC. We'll call its callback when we get a full quiesce, - // since that's as close as we'll get to "data must be visible". - self._afterUpdateCallbacks.push( - (...args) => invoker.dataVisible(...args) - ); - } else if (invoker.isInFlight()) { - // In-flight method blocks reconnect quiescence (e.g. a login method - // from onReconnect — we don't want UI flicker). - firstGroup.pendingUpdates.add(invoker.methodId); + firstGroup.atomic = true; + + // Methods that already got their result before reconnect were NOT + // re-sent, so they won't get a new 'updated' message. Force-complete + // them after quiescence ends — mark their updated as received so + // they don't block quiescence, and schedule a flush. + for (const invoker of firstGroup.methods) { + if (firstGroup.hasResult(invoker.methodId)) { + firstGroup.receiveUpdated(invoker.methodId); } + } + self._afterUpdateCallbacks.push(() => { + firstGroup.flushCompleted(); }); } @@ -145,7 +139,7 @@ export class MessageProcessors { if (msg.methods && !isEmpty(self._methodQueue)) { const group = self._methodQueue[0]; msg.methods.forEach(methodId => { - group.pendingUpdates.delete(methodId); + group.receiveUpdated(methodId); }); } @@ -251,23 +245,22 @@ export class MessageProcessors { Meteor._debug('Received method result but no methods outstanding'); return; } - const currentMethodBlock = self._methodQueue[0].methods; - const m = currentMethodBlock.find(method => method.methodId === msg.id); + const group = self._methodQueue[0]; + const m = group.methods.find(method => method.methodId === msg.id); if (!m) { Meteor._debug("Can't match method response to original method call", msg); return; } - // The invoker stays in the method block until it reaches a terminal state. - // _onMethodComplete is the single place that removes from blocks. - + // Deliver the result to the group, which tracks two-phase completion + // and will call invoker.complete() when both result and updated arrive. if (hasOwn.call(msg, 'error')) { - m.receiveResult( + group.receiveResult( + msg.id, new Meteor.Error(msg.error.error, msg.error.reason, msg.error.details) ); } else { - // msg.result may be undefined if the method didn't return a value - m.receiveResult(undefined, msg.result); + group.receiveResult(msg.id, undefined, msg.result); } } diff --git a/packages/ddp-client/common/method_invoker.js b/packages/ddp-client/common/method_invoker.js index c76bb4c64b8..2fb3a94bbac 100644 --- a/packages/ddp-client/common/method_invoker.js +++ b/packages/ddp-client/common/method_invoker.js @@ -1,57 +1,40 @@ -// MethodInvoker state enum. Transitions are enforced — callers go through -// public methods rather than flipping boolean flags. +// MethodInvoker state enum. const InvokerState = Object.freeze({ - // Created but not yet sent to the server. PENDING: 'PENDING', - // Message sent on the current connection, waiting for result and updated. IN_FLIGHT: 'IN_FLIGHT', - // Result received from the server, waiting for data visibility (updated). - RESULT_RECEIVED: 'RESULT_RECEIVED', - // Data visible (updated arrived), waiting for result. - DATA_VISIBLE: 'DATA_VISIBLE', - // Connection dropped; waiting to be re-sent on the next connection. WAITING_FOR_RESEND: 'WAITING_FOR_RESEND', - // Fully complete — both result and data visible, callback fired. COMPLETE: 'COMPLETE', - // Aborted — timed out, noRetry, maxRetries exceeded, or connection torn down. ABORTED: 'ABORTED', }); export { InvokerState }; -// A MethodInvoker manages sending a method to the server and calling the -// user's callbacks. It owns its own lifecycle state and communicates with -// the connection layer exclusively through delegate callbacks — it never -// directly mutates connection data structures. +// A MethodInvoker manages the transport lifecycle of a single DDP method call: +// sending the message, handling retries on reconnect, and firing the user +// callback when told to by the ExecutionGroup. // -// The connection provides two delegate functions: -// send(message) — physically send a DDP message on the wire -// onComplete(invoker) — notify the connection that this invoker is done -// (terminal state reached, callback fired). -// The connection handles all its own bookkeeping -// (_methodInvokers, _methodQueue, quiescence, -// migration) in this callback. +// The invoker does NOT track two-phase completion (result + updated) — that's +// the ExecutionGroup's responsibility. The invoker just sends messages and +// fires callbacks. // -// State machine (two-phase completion: result and updated can arrive in -// either order): +// State machine: // -// PENDING ──sendMessage()──► IN_FLIGHT +// PENDING ──sendMessage()──► IN_FLIGHT ──complete()──► COMPLETE // │ -// ┌─────────────┼──────────────┐ -// receiveResult() onReconnect() dataVisible() -// │ │ │ -// ▼ ▼ ▼ -// RESULT_RECEIVED WAITING_FOR DATA_VISIBLE -// │ _RESEND │ -// dataVisible() │ receiveResult() -// │ sendMessage() │ -// ▼ ┌────┴─────┐ ▼ -// COMPLETE retry? no retry COMPLETE -// │ │ -// ▼ ▼ -// IN_FLIGHT ABORTED +// onReconnect() +// │ +// ▼ +// WAITING_FOR_RESEND +// │ +// sendMessage() +// ┌───────┴────────┐ +// _shouldRetry() _shouldRetry() +// returns true returns false +// │ │ +// ▼ ▼ +// IN_FLIGHT ABORTED // -// Any state may transition to ABORTED via abort() (disconnect teardown). +// Any state may transition to ABORTED via abort(). export class MethodInvoker { constructor(options) { this.methodId = options.methodId; @@ -59,49 +42,32 @@ export class MethodInvoker { this._callback = options.callback; this._message = options.message; - this._onResultReceived = options.onResultReceived || (() => {}); + this._onResultReceived = options.onResultReceived || null; this.noRetry = options.noRetry; this._maxRetries = options.maxRetries != null ? options.maxRetries : null; this._retryCount = 0; - this._methodResult = null; - // Delegate callbacks provided by the connection. The invoker never - // directly accesses connection data structures. + // Delegate: physically send a DDP message on the wire. this._send = options.send; - this._onComplete = options.onComplete; } // -- State queries --------------------------------------------------------- - // True if the method has been sent on the current connection and has not - // yet completed or been reset for resend. isInFlight() { - return this._state === InvokerState.IN_FLIGHT - || this._state === InvokerState.RESULT_RECEIVED - || this._state === InvokerState.DATA_VISIBLE; + return this._state === InvokerState.IN_FLIGHT; } - // True if the invoker has reached a terminal state (COMPLETE or ABORTED). isDone() { return this._state === InvokerState.COMPLETE || this._state === InvokerState.ABORTED; } - // True if receiveResult has been called (result available regardless of - // data visibility). - gotResult() { - return !!this._methodResult; - } - - // -- State transitions (public API) ---------------------------------------- + // -- State transitions ----------------------------------------------------- - // Sends the method message to the server. May be called additional times if - // we lose the connection and reconnect before receiving a result. + // Sends the method message to the server. May be called additional times + // on reconnect. sendMessage() { - // Already have a result — nothing to send. This can happen when - // _sendOutstandingMethods iterates a group that contains an invoker - // which received its result before a reconnect. - if (this.gotResult()) return; + if (this.isDone()) return; // On re-send (reconnect), check whether this method is allowed to retry. if (this._state === InvokerState.WAITING_FOR_RESEND) { @@ -114,67 +80,41 @@ export class MethodInvoker { this._send(this._message); } - // Called by the connection layer when a reconnect occurs. Marks the invoker - // as needing to be re-sent on the new connection. + // Called by the connection layer when a reconnect occurs. onReconnect() { - if (this._state === InvokerState.IN_FLIGHT - || this._state === InvokerState.DATA_VISIBLE) { + if (this._state === InvokerState.IN_FLIGHT) { this._state = InvokerState.WAITING_FOR_RESEND; } - // RESULT_RECEIVED invokers keep their state — they already have the - // result and just need dataVisible() to complete. The reconnect quiescence - // logic handles them via gotResult() + dataVisible(). - } - - // Call with the result of the method from the server. Only may be called - // once; once it is called, you should not call sendMessage again. - receiveResult(err, result) { - if (this.gotResult()) - throw new Error('Methods should only receive results once'); - this._methodResult = [err, result]; - // If dataVisible() already arrived, go straight to COMPLETE. - this._state = this._state === InvokerState.DATA_VISIBLE - ? InvokerState.COMPLETE - : InvokerState.RESULT_RECEIVED; - this._onResultReceived(err, result); - this._maybeComplete(); } - // Call this when all data written by the method is visible. This means that - // the method has returned its "data is done" message *AND* all server - // documents that are buffered at that time have been written to the local - // cache. - dataVisible() { - if (this._state === InvokerState.RESULT_RECEIVED) { - this._state = InvokerState.COMPLETE; - } else if (this._state === InvokerState.IN_FLIGHT) { - this._state = InvokerState.DATA_VISIBLE; - } - this._maybeComplete(); + // Called by the ExecutionGroup when both result and data visibility + // conditions are met. Fires the user callback. + complete(err, result) { + if (this.isDone()) return; + this._state = InvokerState.COMPLETE; + this._callback(err, result); } - // Force-complete this invoker with an error, regardless of current state. - // Fires the callback exactly once and cleans up. Callers who need the - // result before write confirmation should use onResultReceived. + // Force-complete with an error. Used on disconnect teardown and when + // retry limits are exceeded. abort(reason) { if (this.isDone()) return; - this._methodResult = [new Meteor.Error('disconnected', reason), undefined]; this._state = InvokerState.ABORTED; - this._fireCallback(); + this._callback(new Meteor.Error('disconnected', reason), undefined); } - // -- Internal methods ------------------------------------------------------ + // -- Internal -------------------------------------------------------------- - // Decides whether a method should be re-sent on reconnect. Centralizes - // noRetry and maxRetries logic. _shouldRetry() { if (this.noRetry) { - this.receiveResult( + this._state = InvokerState.ABORTED; + this._callback( new Meteor.Error( 'invocation-failed', 'Method invocation might have failed due to dropped connection. ' + 'Failing because `noRetry` option was passed to Meteor.apply.' - ) + ), + undefined ); return false; } @@ -182,28 +122,11 @@ export class MethodInvoker { if (this._maxRetries !== null) { this._retryCount++; if (this._retryCount > this._maxRetries) { - this.abort( - 'Method retry limit exceeded (' + this._maxRetries + ')' - ); + this.abort('Method retry limit exceeded (' + this._maxRetries + ')'); return false; } } return true; } - - // Fire the callback if we have a result and are in a terminal state. - _maybeComplete() { - if (this._methodResult - && (this._state === InvokerState.COMPLETE - || this._state === InvokerState.ABORTED)) { - this._fireCallback(); - } - } - - // Actually invoke the callback and notify the connection. Called at most once. - _fireCallback() { - this._callback(this._methodResult[0], this._methodResult[1]); - this._onComplete(this); - } } diff --git a/packages/ddp-client/test/livedata_connection_tests.js b/packages/ddp-client/test/livedata_connection_tests.js index 6e9c8610118..90417ed9cf7 100644 --- a/packages/ddp-client/test/livedata_connection_tests.js +++ b/packages/ddp-client/test/livedata_connection_tests.js @@ -1825,7 +1825,7 @@ addReconnectTests( test.equal( conn._methodQueue.map(function(block) { return [ - block.bufferData, + block.atomic, block.methods.map(function(method) { return method._message.params[0]; }) @@ -2051,7 +2051,7 @@ addReconnectTests( test.equal( conn._methodQueue.map(function(block) { return [ - block.bufferData, + block.atomic, block.methods.map(function(method) { return method._message.params[0]; }) From e3271c77985e7c6d6694fa87c831d01823852587 Mon Sep 17 00:00:00 2001 From: Mark Russell Date: Tue, 7 Apr 2026 00:54:06 -0400 Subject: [PATCH 6/6] fixed test failures --- packages/ddp-client/common/execution_group.js | 19 +++++++++++++++++++ .../ddp-client/common/livedata_connection.js | 17 ++++++++++++----- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/packages/ddp-client/common/execution_group.js b/packages/ddp-client/common/execution_group.js index 662fae249eb..885a1744ac1 100644 --- a/packages/ddp-client/common/execution_group.js +++ b/packages/ddp-client/common/execution_group.js @@ -50,6 +50,25 @@ export class ExecutionGroup { this._completion.delete(invoker.methodId); } + // Transfer a method from another group, preserving its completion state. + // Used during reconnect when merging old groups into new ones. + transferMethod(invoker, fromGroup) { + this._methods.push(invoker); + const existingEntry = fromGroup._completion.get(invoker.methodId); + if (existingEntry) { + this._completion.set(invoker.methodId, existingEntry); + fromGroup._completion.delete(invoker.methodId); + } else { + this._completion.set(invoker.methodId, { + gotResult: false, + gotUpdated: false, + dataVisible: false, + err: undefined, + result: undefined, + }); + } + } + hasMethod(invoker) { return this._completion.has(invoker.methodId); } diff --git a/packages/ddp-client/common/livedata_connection.js b/packages/ddp-client/common/livedata_connection.js index 6771e6654d5..0748cc38876 100644 --- a/packages/ddp-client/common/livedata_connection.js +++ b/packages/ddp-client/common/livedata_connection.js @@ -1396,6 +1396,10 @@ export class Connection { // Copy — sendMessage() may abort (noRetry/maxRetries) which modifies the group. const methods = [...group.methods]; methods.forEach(m => { + // Don't re-send methods that already have their result — they won't + // get a new result from the server. They're waiting for dataVisible. + if (group.hasResult(m.methodId)) return; + m.sendMessage(); // If the invoker aborted during sendMessage (noRetry, maxRetries), // clean up now. @@ -1423,13 +1427,16 @@ export class Connection { // neither of them is atomic (atomic groups can't be merged). const lastNew = self._methodQueue[self._methodQueue.length - 1]; if (!lastNew.atomic && !oldQueue[0].atomic) { - // Move methods from the old first group into the new last group. - const toMerge = [...oldQueue[0].methods]; + // Transfer methods from the old first group into the new last group, + // preserving completion state (e.g. result received before reconnect). + const oldGroup = oldQueue[0]; + const toMerge = [...oldGroup.methods]; toMerge.forEach((m) => { - lastNew.addMethod(m); + lastNew.transferMethod(m, oldGroup); - // If this "last group" is also the first group, send the message. - if (self._methodQueue.length === 1) { + // If this "last group" is also the first group, send the message + // (unless it already has its result from before reconnect). + if (self._methodQueue.length === 1 && !lastNew.hasResult(m.methodId)) { m.sendMessage(); if (m.isDone()) { self._onMethodComplete(m);