diff --git a/packages/ddp-client/common/connection_stream_handlers.js b/packages/ddp-client/common/connection_stream_handlers.js index d4fa785df66..b53df2d490b 100644 --- a/packages/ddp-client/common/connection_stream_handlers.js +++ b/packages/ddp-client/common/connection_stream_handlers.js @@ -157,37 +157,17 @@ export class ConnectionStreamHandlers { * @private */ _handleOutstandingMethodsOnReset() { - const blocks = this._connection._outstandingMethodBlocks; - if (blocks.length === 0) return; - - const currentMethodBlock = blocks[0].methods; - blocks[0].methods = currentMethodBlock.filter( - methodInvoker => { - // Methods with 'noRetry' option set are not allowed to re-send after - // recovering dropped connection. - if (methodInvoker.sentMessage && methodInvoker.noRetry) { - methodInvoker.receiveResult( - new Meteor.Error( - 'invocation-failed', - 'Method invocation might have failed due to dropped connection. ' + - 'Failing because `noRetry` option was passed to Meteor.apply.' - ) - ); - } - - // Only keep a method if it wasn't sent or it's allowed to retry. - return !(methodInvoker.sentMessage && methodInvoker.noRetry); - } - ); + const queue = this._connection._methodQueue; + if (queue.length === 0) return; - // Clear empty blocks - if (blocks.length > 0 && blocks[0].methods.length === 0) { - blocks.shift(); - } - - // Reset all method invokers as unsent + // Notify all invokers and groups about the reconnect. + // Invokers transition IN_FLIGHT → WAITING_FOR_RESEND. + // Groups reset wire-level state (updated/dataVisible) while preserving results. Object.values(this._connection._methodInvokers).forEach(invoker => { - invoker.sentMessage = false; + invoker.onReconnect(); + }); + queue.forEach(group => { + group.resetForReconnect(); }); } diff --git a/packages/ddp-client/common/document_processors.js b/packages/ddp-client/common/document_processors.js index 11c4b2f8c0b..41cfc5c851f 100644 --- a/packages/ddp-client/common/document_processors.js +++ b/packages/ddp-client/common/document_processors.js @@ -161,16 +161,14 @@ export class DocumentProcessors { }); delete self._documentsWrittenByStub[methodId]; - // We want to call the data-written callback, but we can't do so until all - // currently buffered messages are flushed. - const callbackInvoker = self._methodInvokers[methodId]; - if (!callbackInvoker) { - throw new Error('No callback invoker for method ' + methodId); + // Data for this method is now visible in the local cache. Tell the + // execution group, which will complete the invoker if it also has the result. + const group = self._methodQueue[0]; + if (group) { + self._runWhenAllServerDocsAreFlushed(() => { + group.markDataVisible(methodId); + }); } - - self._runWhenAllServerDocsAreFlushed( - (...args) => callbackInvoker.dataVisible(...args) - ); }); } diff --git a/packages/ddp-client/common/execution_group.js b/packages/ddp-client/common/execution_group.js new file mode 100644 index 00000000000..885a1744ac1 --- /dev/null +++ b/packages/ddp-client/common/execution_group.js @@ -0,0 +1,169 @@ +// An ExecutionGroup manages a set of method calls that execute together. +// Groups are processed sequentially by the connection — all methods in a +// group must complete before the next group is sent. +// +// The group owns the DDP two-phase completion protocol: +// - Tracks 'result' wire messages per method +// - Tracks 'updated' wire messages per method (for quiescence) +// - Tracks data visibility per method (for callback firing) +// - Tells each MethodInvoker when to fire its callback +// +// The group does NOT handle transport (sending/retrying) — that's the +// invoker's job. The group does NOT handle scheduling (which group runs +// next) — that's the connection's job. +export class ExecutionGroup { + // atomic: if true, data messages are buffered while this group runs, + // and the group cannot be merged with adjacent groups. + // onComplete: callback(invoker) when an invoker reaches a terminal state. + // The connection handles its own bookkeeping in this callback. + constructor({ atomic = false, onComplete }) { + this.atomic = atomic; + this._onComplete = onComplete; + this._methods = []; + + // Per-method two-phase tracking. Keyed by methodId. + this._completion = new Map(); + } + + get methods() { + return this._methods; + } + + // -- Group management ------------------------------------------------------ + + addMethod(invoker) { + this._methods.push(invoker); + this._completion.set(invoker.methodId, { + gotResult: false, + gotUpdated: false, // wire: 'updated' message received + dataVisible: false, // local: data flushed to minimongo + err: undefined, + result: undefined, + }); + } + + removeMethod(invoker) { + const idx = this._methods.indexOf(invoker); + if (idx !== -1) { + this._methods.splice(idx, 1); + } + this._completion.delete(invoker.methodId); + } + + // Transfer a method from another group, preserving its completion state. + // Used during reconnect when merging old groups into new ones. + transferMethod(invoker, fromGroup) { + this._methods.push(invoker); + const existingEntry = fromGroup._completion.get(invoker.methodId); + if (existingEntry) { + this._completion.set(invoker.methodId, existingEntry); + fromGroup._completion.delete(invoker.methodId); + } else { + this._completion.set(invoker.methodId, { + gotResult: false, + gotUpdated: false, + dataVisible: false, + err: undefined, + result: undefined, + }); + } + } + + hasMethod(invoker) { + return this._completion.has(invoker.methodId); + } + + isEmpty() { + return this._methods.length === 0; + } + + // Reset wire-level state on reconnect. Preserves result (which was already + // received and won't be re-sent), but clears updated/dataVisible since those + // need to arrive fresh on the new connection. + resetForReconnect() { + for (const [, entry] of this._completion) { + entry.gotUpdated = false; + entry.dataVisible = false; + } + } + + // -- Two-phase completion -------------------------------------------------- + + // Called when the server sends a 'result' message for a method in this group. + receiveResult(methodId, err, result) { + const entry = this._completion.get(methodId); + if (!entry) return; + entry.gotResult = true; + entry.err = err; + entry.result = result; + + // Fire onResultReceived callback if provided (eager result access). + const invoker = this._methods.find(m => m.methodId === methodId); + if (invoker && invoker._onResultReceived) { + invoker._onResultReceived(err, result); + } + + this._maybeComplete(methodId); + } + + // Called when the 'updated' wire message arrives for this method ID. + // This is used for quiescence tracking — it does NOT mean data is visible. + // For non-atomic groups, this also marks data as visible (they're the same + // moment since there's no buffering). + receiveUpdated(methodId) { + const entry = this._completion.get(methodId); + if (!entry) return; + entry.gotUpdated = true; + } + + // Called when data written by this method is actually visible in minimongo. + // For non-atomic groups, this is called in the same tick as receiveUpdated. + // For atomic groups, this is called after quiescence ends and data is flushed. + markDataVisible(methodId) { + const entry = this._completion.get(methodId); + if (!entry) return; + entry.dataVisible = true; + this._maybeComplete(methodId); + } + + // For atomic groups: called after quiescence ends and buffered data has been + // flushed to minimongo. Marks all updated methods as data-visible and + // completes any that also have their result. + flushCompleted() { + for (const [methodId, entry] of this._completion) { + if (entry.gotUpdated) { + entry.dataVisible = true; + this._maybeComplete(methodId); + } + } + } + + // True if all methods in this group have received their 'updated' wire message. + // Used by the connection to decide when to stop buffering data (quiescence). + isQuiesced() { + for (const [, entry] of this._completion) { + if (!entry.gotUpdated) return false; + } + return true; + } + + // True if a specific method has received its result. + hasResult(methodId) { + const entry = this._completion.get(methodId); + return entry ? entry.gotResult : false; + } + + // -- Internal -------------------------------------------------------------- + + // Complete the invoker if it has both result AND data visible. + _maybeComplete(methodId) { + const entry = this._completion.get(methodId); + if (!entry || !entry.gotResult || !entry.dataVisible) return; + + const invoker = this._methods.find(m => m.methodId === methodId); + if (!invoker || invoker.isDone()) return; + + invoker.complete(entry.err, entry.result); + this._onComplete(invoker); + } +} diff --git a/packages/ddp-client/common/livedata_connection.js b/packages/ddp-client/common/livedata_connection.js index 41ae9e3f21c..0748cc38876 100644 --- a/packages/ddp-client/common/livedata_connection.js +++ b/packages/ddp-client/common/livedata_connection.js @@ -6,6 +6,7 @@ import { Random } from 'meteor/random'; import { MongoID } from 'meteor/mongo-id'; import { DDP } from './namespace.js'; import { MethodInvoker } from './method_invoker'; +import { ExecutionGroup } from './execution_group'; import { hasOwn, slice, @@ -117,40 +118,16 @@ export class Connection { // Tracks methods which the user has called but whose result messages have not // arrived yet. // - // _outstandingMethodBlocks is an array of blocks of methods. Each block - // represents a set of methods that can run at the same time. The first block - // represents the methods which are currently in flight; subsequent blocks - // must wait for previous blocks to be fully finished before they can be sent - // to the server. + // _methodQueue is an array of ExecutionGroup instances. Each group manages + // a set of methods to send in parallel and tracks their two-phase completion + // (result + updated). Groups are processed sequentially — all methods in a + // group must complete before the next group is sent. // - // Each block is an object with the following fields: - // - methods: a list of MethodInvoker objects - // - wait: a boolean; if true, this block had a single method invoked with - // the "wait" option - // - // There will never be adjacent blocks with wait=false, because the only thing - // that makes methods need to be serialized is a wait method. - // - // Methods are removed from the first block when their "result" is - // received. The entire first block is only removed when all of the in-flight - // methods have received their results (so the "methods" list is empty) *AND* - // all of the data written by those methods are visible in the local cache. So - // it is possible for the first block's methods list to be empty, if we are - // still waiting for some objects to quiesce. - // - // Example: - // _outstandingMethodBlocks = [ - // {wait: false, methods: []}, - // {wait: true, methods: []}, - // {wait: false, methods: [, - // ]}] - // This means that there were some methods which were sent to the server and - // which have returned their results, but some of the data written by - // the methods may not be visible in the local cache. Once all that data is - // visible, we will send a 'login' method. Once the login method has returned - // and all the data is visible (including re-running subs if userId changes), - // we will send the 'foo' and 'bar' methods in parallel. - self._outstandingMethodBlocks = []; + // See execution_group.js for the group API. The scheduler does not know + // about 'wait' — it only processes groups. The 'wait' flag is consumed at + // scheduling time (_addOutstandingMethod) to decide whether to append to + // the current group or create a new atomic one. + self._methodQueue = []; // method ID -> array of objects with keys 'collection' and 'id', listing // documents written by a given method's stub. keys are associated with @@ -190,10 +167,8 @@ export class Connection { // This buffers the messages that aren't being processed yet. self._messagesBufferedUntilQuiescence = []; - // Map from method ID -> true. Methods are removed from this when their - // "data done" message is received, and we will not quiesce until it is - // empty. - self._methodsBlockingQuiescence = {}; + // Quiescence is derived from the current execution group's pendingUpdates + // set. No separate tracking structure needed — see _waitingForQuiescence(). // map from sub ID -> true for subs that were ready (ie, called the sub // ready callback) before reconnect but haven't become ready again yet self._subsBeingRevived = {}; // map from sub._id -> true @@ -260,17 +235,11 @@ export class Connection { // the WebSocket between the two) hang forever — MethodInvoker // requires both before firing the callback. if (!this.options.retry || this._stream._forcedToDisconnect) { - // Clear method blocks before cleanup to prevent - // _outstandingMethodFinished from throwing invariant errors - // as we mass-complete invokers during teardown. - this._outstandingMethodBlocks = []; - - // Abort all outstanding method invokers. If a result was already - // received, it is attached to the error for caller recovery. + // Abort all outstanding method invokers and clean up. keys(this._methodInvokers).forEach(id => { - this._methodInvokers[id].abort( - 'Connection closed before method completed' - ); + const invoker = this._methodInvokers[id]; + invoker.abort('Connection closed before method completed'); + this._onMethodComplete(invoker); }); } }; @@ -640,6 +609,7 @@ export class Connection { * @param {Boolean} options.wait (Client only) If true, don't send this method until all previous method calls have completed, and don't send any subsequent method calls until this one is completed. * @param {Function} options.onResultReceived (Client only) This callback is invoked with the error or result of the method (just like `asyncCallback`) as soon as the error or result is available. The local cache may not yet reflect the writes performed by the method. * @param {Boolean} options.noRetry (Client only) if true, don't send this method again on reload, simply call the callback an error with the error code 'invocation-failed'. + * @param {Number} options.maxRetries (Client only) Maximum number of times to re-send this method on reconnect before aborting with a 'disconnected' error. If not set, the method will be retried indefinitely (default Meteor behavior). * @param {Boolean} options.throwStubExceptions (Client only) If true, exceptions thrown by method stubs will be thrown instead of logged, and the method will not be invoked on the server. * @param {Boolean} options.returnStubValue (Client only) If true then in cases where we would have otherwise discarded the stub's return value and returned undefined, instead we go ahead and return it. Specifically, this is any time other than when (a) we are already inside a stub or (b) we are in Node and no callback was provided. Currently we require this flag to be explicitly passed to reduce the likelihood that stub return values will be confused with server return values; we may improve this in future. * @param {Function} [asyncCallback] Optional callback; same semantics as in [`Meteor.call`](#meteor_call). @@ -683,6 +653,7 @@ export class Connection { * @param {Boolean} options.wait (Client only) If true, don't send this method until all previous method calls have completed, and don't send any subsequent method calls until this one is completed. * @param {Function} options.onResultReceived (Client only) This callback is invoked with the error or result of the method (just like `asyncCallback`) as soon as the error or result is available. The local cache may not yet reflect the writes performed by the method. * @param {Boolean} options.noRetry (Client only) if true, don't send this method again on reload, simply call the callback an error with the error code 'invocation-failed'. + * @param {Number} options.maxRetries (Client only) Maximum number of times to re-send this method on reconnect before aborting with a 'disconnected' error. If not set, the method will be retried indefinitely (default Meteor behavior). * @param {Boolean} options.throwStubExceptions (Client only) If true, exceptions thrown by method stubs will be thrown instead of logged, and the method will not be invoked on the server. * @param {Boolean} options.returnStubValue (Client only) If true then in cases where we would have otherwise discarded the stub's return value and returned undefined, instead we go ahead and return it. Specifically, this is any time other than when (a) we are already inside a stub or (b) we are in Node and no callback was provided. Currently we require this flag to be explicitly passed to reduce the likelihood that stub return values will be confused with server return values; we may improve this in future. * @param {Boolean} options.returnServerResultPromise (Client only) If true, the promise returned by applyAsync will resolve to the server's return value, rather than the stub's return value. This is useful when you want to ensure that the server's return value is used, even if the stub returns a promise. The same behavior as `callAsync`. @@ -878,13 +849,17 @@ export class Connection { const methodInvoker = new MethodInvoker({ methodId, callback: callback, - connection: self, onResultReceived: options.onResultReceived, - wait: !!options.wait, message: message, - noRetry: !!options.noRetry + noRetry: !!options.noRetry, + maxRetries: options.maxRetries != null ? options.maxRetries : null, + send: (msg) => self._send(msg), + onComplete: (invoker) => self._onMethodComplete(invoker), }); + // Register in the connection's tracking map. + self._methodInvokers[methodInvoker.methodId] = methodInvoker; + let result; if (promise) { @@ -1134,17 +1109,19 @@ export class Connection { // revived or early methods to finish their data, or we are waiting for a // "wait" method to finish. _waitingForQuiescence() { - return ( - ! isEmpty(this._subsBeingRevived) || - ! isEmpty(this._methodsBlockingQuiescence) - ); + if (!isEmpty(this._subsBeingRevived)) return true; + const group = this._methodQueue[0]; + return group && group.atomic && !group.isQuiesced(); } // Returns true if any method whose message has been sent to the server has // not yet invoked its user callback. _anyMethodsAreOutstanding() { - const invokers = this._methodInvokers; - return Object.values(invokers).some((invoker) => !!invoker.sentMessage); + // Check only the first group — methods in later groups are PENDING + // and shouldn't block advancement. + const group = this._methodQueue[0]; + if (!group) return false; + return group.methods.some((invoker) => !invoker.isDone()); } async _processOneDataMessage(msg, updates) { @@ -1318,7 +1295,7 @@ export class Connection { const writtenByStubForAMethodWithSentMessage = keys(serverDoc.writtenByStubs).some(methodId => { const invoker = self._methodInvokers[methodId]; - return invoker && invoker.sentMessage; + return invoker && invoker.isInFlight(); }); if (writtenByStubForAMethodWithSentMessage) { @@ -1334,114 +1311,150 @@ export class Connection { } } + _createGroup(atomic) { + return new ExecutionGroup({ + atomic, + onComplete: (invoker) => this._onMethodComplete(invoker), + }); + } + _addOutstandingMethod(methodInvoker, options) { if (options?.wait) { - // It's a wait method! Wait methods go in their own block. - this._outstandingMethodBlocks.push({ - wait: true, - methods: [methodInvoker] - }); + // Atomic method — gets its own group. Data is buffered until it completes, + // and no other methods can be appended to this group. + const group = this._createGroup(true); + group.addMethod(methodInvoker); + this._methodQueue.push(group); } else { - // Not a wait method. Start a new block if the previous block was a wait - // block, and add it to the last block of methods. - if (isEmpty(this._outstandingMethodBlocks) || - last(this._outstandingMethodBlocks).wait) { - this._outstandingMethodBlocks.push({ - wait: false, - methods: [], - }); + // Parallel method — append to current group, or start a new one + // if the queue is empty or the last group is atomic. + const lastGroup = this._methodQueue[this._methodQueue.length - 1]; + if (isEmpty(this._methodQueue) || lastGroup.atomic) { + const group = this._createGroup(false); + group.addMethod(methodInvoker); + this._methodQueue.push(group); + } else { + lastGroup.addMethod(methodInvoker); } - - last(this._outstandingMethodBlocks).methods.push(methodInvoker); } - // If we added it to the first block, send it out now. - if (this._outstandingMethodBlocks.length === 1) { + // If we added it to the first group, send it now. + if (this._methodQueue.length === 1) { methodInvoker.sendMessage(); } } - // Called by MethodInvoker after a method's callback is invoked. If this was - // the last outstanding method in the current block, runs the next block. If - // there are no more methods, consider accepting a hot code push. - _outstandingMethodFinished() { + // Single callback invoked by MethodInvoker when it reaches a terminal state. + // Handles ALL connection-side bookkeeping in one place: + // 1. Remove from _methodInvokers map + // 2. Remove from execution group in _methodQueue + // 3. Advance to next group if current group is fully complete + // 4. Check for hot code push readiness + _onMethodComplete(invoker) { const self = this; - if (self._anyMethodsAreOutstanding()) return; - - // No methods are outstanding. This should mean that the first block of - // methods is empty. (Or it might not exist, if this was a method that - // half-finished before disconnect/reconnect.) - if (! isEmpty(self._outstandingMethodBlocks)) { - const firstBlock = self._outstandingMethodBlocks.shift(); - if (! isEmpty(firstBlock.methods)) - throw new Error( - 'No methods outstanding but nonempty block: ' + - JSON.stringify(firstBlock) - ); - // Send the outstanding methods now in the first block. - if (! isEmpty(self._outstandingMethodBlocks)) - self._sendOutstandingMethods(); + // 1. Remove from the invoker tracking map. + delete self._methodInvokers[invoker.methodId]; + + // 2. Remove from execution group. + for (const group of self._methodQueue) { + if (group.hasMethod(invoker)) { + group.removeMethod(invoker); + break; + } } - // Maybe accept a hot code push. - self._maybeMigrate(); + // 3. If no methods are in-flight, advance to the next group. + if (!self._anyMethodsAreOutstanding()) { + if (!isEmpty(self._methodQueue)) { + const firstGroup = self._methodQueue.shift(); + if (!firstGroup.isEmpty()) + throw new Error( + 'No methods outstanding but nonempty group' + ); + + // Send the methods in the next group. + if (!isEmpty(self._methodQueue)) + self._sendOutstandingMethods(); + } + + // 4. Maybe accept a hot code push. + self._maybeMigrate(); + } } - // Sends messages for all the methods in the first block in - // _outstandingMethodBlocks. + // Sends messages for all the methods in the first group of _methodQueue. + // Methods that abort during sendMessage() are cleaned up by _onMethodComplete. _sendOutstandingMethods() { const self = this; - if (isEmpty(self._outstandingMethodBlocks)) { + if (isEmpty(self._methodQueue)) { return; } - self._outstandingMethodBlocks[0].methods.forEach(m => { + const group = self._methodQueue[0]; + // Copy — sendMessage() may abort (noRetry/maxRetries) which modifies the group. + const methods = [...group.methods]; + methods.forEach(m => { + // Don't re-send methods that already have their result — they won't + // get a new result from the server. They're waiting for dataVisible. + if (group.hasResult(m.methodId)) return; + m.sendMessage(); + // If the invoker aborted during sendMessage (noRetry, maxRetries), + // clean up now. + if (m.isDone()) { + self._onMethodComplete(m); + } }); } - _sendOutstandingMethodBlocksMessages(oldOutstandingMethodBlocks) { + _mergeAndSendMethodQueue(oldQueue) { const self = this; - if (isEmpty(oldOutstandingMethodBlocks)) return; + if (isEmpty(oldQueue)) return; // We have at least one block worth of old outstanding methods to try // again. First: did onReconnect actually send anything? If not, we just - // restore all outstanding methods and run the first block. - if (isEmpty(self._outstandingMethodBlocks)) { - self._outstandingMethodBlocks = oldOutstandingMethodBlocks; + // restore all outstanding methods and run the first group. + if (isEmpty(self._methodQueue)) { + self._methodQueue = oldQueue; self._sendOutstandingMethods(); return; } - // OK, there are blocks on both sides. Special case: merge the last block of - // the reconnect methods with the first block of the original methods, if - // neither of them are "wait" blocks. - if ( - !last(self._outstandingMethodBlocks).wait && - !oldOutstandingMethodBlocks[0].wait - ) { - oldOutstandingMethodBlocks[0].methods.forEach((m) => { - last(self._outstandingMethodBlocks).methods.push(m); - - // If this "last block" is also the first block, send the message. - if (self._outstandingMethodBlocks.length === 1) { + // OK, there are groups on both sides. Special case: merge the last group of + // the reconnect methods with the first group of the original methods, if + // neither of them is atomic (atomic groups can't be merged). + const lastNew = self._methodQueue[self._methodQueue.length - 1]; + if (!lastNew.atomic && !oldQueue[0].atomic) { + // Transfer methods from the old first group into the new last group, + // preserving completion state (e.g. result received before reconnect). + const oldGroup = oldQueue[0]; + const toMerge = [...oldGroup.methods]; + toMerge.forEach((m) => { + lastNew.transferMethod(m, oldGroup); + + // If this "last group" is also the first group, send the message + // (unless it already has its result from before reconnect). + if (self._methodQueue.length === 1 && !lastNew.hasResult(m.methodId)) { m.sendMessage(); + if (m.isDone()) { + self._onMethodComplete(m); + } } }); - oldOutstandingMethodBlocks.shift(); + oldQueue.shift(); } - // Now add the rest of the original blocks on. - self._outstandingMethodBlocks.push(...oldOutstandingMethodBlocks); + // Now add the rest of the original groups. + self._methodQueue.push(...oldQueue); } _callOnReconnectAndSendAppropriateOutstandingMethods() { const self = this; - const oldOutstandingMethodBlocks = self._outstandingMethodBlocks; - self._outstandingMethodBlocks = []; + const oldQueue = self._methodQueue; + self._methodQueue = []; self.onReconnect && self.onReconnect(); DDP._reconnectHook.each((callback) => { @@ -1449,7 +1462,7 @@ export class Connection { return true; }); - self._sendOutstandingMethodBlocksMessages(oldOutstandingMethodBlocks); + self._mergeAndSendMethodQueue(oldQueue); } // We can accept a hot code push if there are no methods in flight. diff --git a/packages/ddp-client/common/message_processors.js b/packages/ddp-client/common/message_processors.js index 0fbe7ece981..4fe6e0a629f 100644 --- a/packages/ddp-client/common/message_processors.js +++ b/packages/ddp-client/common/message_processors.js @@ -79,34 +79,24 @@ export class MessageProcessors { // track methods that were sent on this connection so that we don't // quiesce until they are all done. // - // Start by clearing _methodsBlockingQuiescence: methods sent before - // reconnect don't matter, and any "wait" methods sent on the new connection - // that we drop here will be restored by the loop below. - self._methodsBlockingQuiescence = Object.create(null); - if (self._resetStores) { - const invokers = self._methodInvokers; - Object.keys(invokers).forEach(id => { - const invoker = invokers[id]; - if (invoker.gotResult()) { - // This method already got its result, but it didn't call its callback - // because its data didn't become visible. We did not resend the - // method RPC. We'll call its callback when we get a full quiesce, - // since that's as close as we'll get to "data must be visible". - self._afterUpdateCallbacks.push( - (...args) => invoker.dataVisible(...args) - ); - } else if (invoker.sentMessage) { - // This method has been sent on this connection (maybe as a resend - // from the last connection, maybe from onReconnect, maybe just very - // quickly before processing the connected message). - // - // We don't need to do anything special to ensure its callbacks get - // called, but we'll count it as a method which is preventing - // reconnect quiescence. (eg, it might be a login method that was run - // from onReconnect, and we don't want to see flicker by seeing a - // logged-out state.) - self._methodsBlockingQuiescence[invoker.methodId] = true; + // During reconnect, upgrade the first execution group to atomic so the UI + // doesn't flicker. The group tracks 'updated' wire messages via + // receiveUpdated() and reports quiescence via isQuiesced(). + if (self._resetStores && !isEmpty(self._methodQueue)) { + const firstGroup = self._methodQueue[0]; + firstGroup.atomic = true; + + // Methods that already got their result before reconnect were NOT + // re-sent, so they won't get a new 'updated' message. Force-complete + // them after quiescence ends — mark their updated as received so + // they don't block quiescence, and schedule a flush. + for (const invoker of firstGroup.methods) { + if (firstGroup.hasResult(invoker.methodId)) { + firstGroup.receiveUpdated(invoker.methodId); } + } + self._afterUpdateCallbacks.push(() => { + firstGroup.flushCompleted(); }); } @@ -146,9 +136,10 @@ export class MessageProcessors { }); } - if (msg.methods) { + if (msg.methods && !isEmpty(self._methodQueue)) { + const group = self._methodQueue[0]; msg.methods.forEach(methodId => { - delete self._methodsBlockingQuiescence[methodId]; + group.receiveUpdated(methodId); }); } @@ -250,34 +241,26 @@ export class MessageProcessors { // find the outstanding request // should be O(1) in nearly all realistic use cases - if (isEmpty(self._outstandingMethodBlocks)) { + if (isEmpty(self._methodQueue)) { Meteor._debug('Received method result but no methods outstanding'); return; } - const currentMethodBlock = self._outstandingMethodBlocks[0].methods; - let i; - const m = currentMethodBlock.find((method, idx) => { - const found = method.methodId === msg.id; - if (found) i = idx; - return found; - }); + const group = self._methodQueue[0]; + const m = group.methods.find(method => method.methodId === msg.id); if (!m) { Meteor._debug("Can't match method response to original method call", msg); return; } - // Remove from current method block. This may leave the block empty, but we - // don't move on to the next block until the callback has been delivered, in - // _outstandingMethodFinished. - currentMethodBlock.splice(i, 1); - + // Deliver the result to the group, which tracks two-phase completion + // and will call invoker.complete() when both result and updated arrive. if (hasOwn.call(msg, 'error')) { - m.receiveResult( + group.receiveResult( + msg.id, new Meteor.Error(msg.error.error, msg.error.reason, msg.error.details) ); } else { - // msg.result may be undefined if the method didn't return a value - m.receiveResult(undefined, msg.result); + group.receiveResult(msg.id, undefined, msg.result); } } diff --git a/packages/ddp-client/common/method_invoker.js b/packages/ddp-client/common/method_invoker.js index 14703c1294f..2fb3a94bbac 100644 --- a/packages/ddp-client/common/method_invoker.js +++ b/packages/ddp-client/common/method_invoker.js @@ -1,93 +1,132 @@ -// A MethodInvoker manages sending a method to the server and calling the user's -// callbacks. On construction, it registers itself in the connection's -// _methodInvokers map; it removes itself once the method is fully finished and -// the callback is invoked. This occurs when it has both received a result, -// and the data written by it is fully visible. +// MethodInvoker state enum. +const InvokerState = Object.freeze({ + PENDING: 'PENDING', + IN_FLIGHT: 'IN_FLIGHT', + WAITING_FOR_RESEND: 'WAITING_FOR_RESEND', + COMPLETE: 'COMPLETE', + ABORTED: 'ABORTED', +}); + +export { InvokerState }; + +// A MethodInvoker manages the transport lifecycle of a single DDP method call: +// sending the message, handling retries on reconnect, and firing the user +// callback when told to by the ExecutionGroup. +// +// The invoker does NOT track two-phase completion (result + updated) — that's +// the ExecutionGroup's responsibility. The invoker just sends messages and +// fires callbacks. +// +// State machine: +// +// PENDING ──sendMessage()──► IN_FLIGHT ──complete()──► COMPLETE +// │ +// onReconnect() +// │ +// ▼ +// WAITING_FOR_RESEND +// │ +// sendMessage() +// ┌───────┴────────┐ +// _shouldRetry() _shouldRetry() +// returns true returns false +// │ │ +// ▼ ▼ +// IN_FLIGHT ABORTED +// +// Any state may transition to ABORTED via abort(). export class MethodInvoker { constructor(options) { - // Public (within this file) fields. this.methodId = options.methodId; - this.sentMessage = false; + this._state = InvokerState.PENDING; this._callback = options.callback; - this._connection = options.connection; this._message = options.message; - this._onResultReceived = options.onResultReceived || (() => {}); - this._wait = options.wait; + this._onResultReceived = options.onResultReceived || null; this.noRetry = options.noRetry; - this._methodResult = null; - this._dataVisible = false; + this._maxRetries = options.maxRetries != null ? options.maxRetries : null; + this._retryCount = 0; - // Register with the connection. - this._connection._methodInvokers[this.methodId] = this; + // Delegate: physically send a DDP message on the wire. + this._send = options.send; } - // Sends the method message to the server. May be called additional times if - // we lose the connection and reconnect before receiving a result. - sendMessage() { - // This function is called before sending a method (including resending on - // reconnect). We should only (re)send methods where we don't already have a - // result! - if (this.gotResult()) - throw new Error('sendingMethod is called on method with result'); - - // If we're re-sending it, it doesn't matter if data was written the first - // time. - this._dataVisible = false; - this.sentMessage = true; - - // If this is a wait method, make all data messages be buffered until it is - // done. - if (this._wait) - this._connection._methodsBlockingQuiescence[this.methodId] = true; - - // Actually send the message. - this._connection._send(this._message); + + // -- State queries --------------------------------------------------------- + + isInFlight() { + return this._state === InvokerState.IN_FLIGHT; } - // Invoke the callback, if we have both a result and know that all data has - // been written to the local cache. - _maybeInvokeCallback() { - if (this._methodResult && this._dataVisible) { - // Call the callback. (This won't throw: the callback was wrapped with - // bindEnvironment.) - this._callback(this._methodResult[0], this._methodResult[1]); - - // Forget about this method. - delete this._connection._methodInvokers[this.methodId]; - - // Let the connection know that this method is finished, so it can try to - // move on to the next block of methods. - this._connection._outstandingMethodFinished(); + + isDone() { + return this._state === InvokerState.COMPLETE + || this._state === InvokerState.ABORTED; + } + + // -- State transitions ----------------------------------------------------- + + // Sends the method message to the server. May be called additional times + // on reconnect. + sendMessage() { + if (this.isDone()) return; + + // On re-send (reconnect), check whether this method is allowed to retry. + if (this._state === InvokerState.WAITING_FOR_RESEND) { + if (!this._shouldRetry()) { + return; + } } + + this._state = InvokerState.IN_FLIGHT; + this._send(this._message); } - // Call with the result of the method from the server. Only may be called - // once; once it is called, you should not call sendMessage again. - // If the user provided an onResultReceived callback, call it immediately. - // Then invoke the main callback if data is also visible. - receiveResult(err, result) { - if (this.gotResult()) - throw new Error('Methods should only receive results once'); - this._methodResult = [err, result]; - this._onResultReceived(err, result); - this._maybeInvokeCallback(); + + // Called by the connection layer when a reconnect occurs. + onReconnect() { + if (this._state === InvokerState.IN_FLIGHT) { + this._state = InvokerState.WAITING_FOR_RESEND; + } } - // Call this when all data written by the method is visible. This means that - // the method has returns its "data is done" message *AND* all server - // documents that are buffered at that time have been written to the local - // cache. Invokes the main callback if the result has been received. - dataVisible() { - this._dataVisible = true; - this._maybeInvokeCallback(); + + // Called by the ExecutionGroup when both result and data visibility + // conditions are met. Fires the user callback. + complete(err, result) { + if (this.isDone()) return; + this._state = InvokerState.COMPLETE; + this._callback(err, result); } - // Force-complete this invoker with an error, regardless of current state. - // Fires the callback exactly once and cleans up. Callers who need the - // result before write confirmation should use onResultReceived. + + // Force-complete with an error. Used on disconnect teardown and when + // retry limits are exceeded. abort(reason) { - this._methodResult = [new Meteor.Error('disconnected', reason), undefined]; - this._dataVisible = true; - this._maybeInvokeCallback(); + if (this.isDone()) return; + this._state = InvokerState.ABORTED; + this._callback(new Meteor.Error('disconnected', reason), undefined); } - // True if receiveResult has been called. - gotResult() { - return !!this._methodResult; + + // -- Internal -------------------------------------------------------------- + + _shouldRetry() { + if (this.noRetry) { + this._state = InvokerState.ABORTED; + this._callback( + new Meteor.Error( + 'invocation-failed', + 'Method invocation might have failed due to dropped connection. ' + + 'Failing because `noRetry` option was passed to Meteor.apply.' + ), + undefined + ); + return false; + } + + if (this._maxRetries !== null) { + this._retryCount++; + if (this._retryCount > this._maxRetries) { + this.abort('Method retry limit exceeded (' + this._maxRetries + ')'); + return false; + } + } + + return true; } } diff --git a/packages/ddp-client/test/livedata_connection_tests.js b/packages/ddp-client/test/livedata_connection_tests.js index 87382daa4cf..90417ed9cf7 100644 --- a/packages/ddp-client/test/livedata_connection_tests.js +++ b/packages/ddp-client/test/livedata_connection_tests.js @@ -1823,9 +1823,9 @@ addReconnectTests( // white-box test: test.equal( - conn._outstandingMethodBlocks.map(function(block) { + conn._methodQueue.map(function(block) { return [ - block.wait, + block.atomic, block.methods.map(function(method) { return method._message.params[0]; }) @@ -2049,9 +2049,9 @@ addReconnectTests( // white-box test: test.equal( - conn._outstandingMethodBlocks.map(function(block) { + conn._methodQueue.map(function(block) { return [ - block.wait, + block.atomic, block.methods.map(function(method) { return method._message.params[0]; }) @@ -2184,7 +2184,7 @@ addReconnectTests('livedata stub - reconnect double wait method', async function // Call another method. It should be delivered immediately. This is a // regression test for a case where it never got delivered because there was - // an empty block in _outstandingMethodBlocks blocking it from being sent. + // an empty group in _methodQueue blocking it from being sent. conn.call('lastMethod', identity); testGotMessage(test, stream, { msg: 'method', @@ -2639,7 +2639,7 @@ Tinytest.addAsync( // All invokers should be cleaned up test.equal(Object.keys(conn._methodInvokers).length, 0); - test.equal(conn._outstandingMethodBlocks.length, 0); + test.equal(conn._methodQueue.length, 0); } ); @@ -2669,6 +2669,120 @@ Tinytest.addAsync( } ); +Tinytest.addAsync( + 'livedata connection - maxRetries aborts method after exceeding retry limit', + async function(test) { + const stream = new StubStream(); + const conn = newConnection(stream); + + await startAndConnect(test, stream); + + // Call a method with maxRetries: 2 + let callbackError = null; + conn.apply('limitedMethod', [], { maxRetries: 2 }, function(err) { + callbackError = err; + }); + testGotMessage(test, stream, { + msg: 'method', method: 'limitedMethod', params: [], id: '*' + }); + + // Callback should not have fired yet + test.isNull(callbackError); + + // Reconnect #1 — should re-send (retry 1 of 2) + stream.sent.length = 0; + await stream.reset(); + test.isNull(callbackError); + // reset sends connect + re-sends the method + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); + testGotMessage(test, stream, { + msg: 'method', method: 'limitedMethod', params: [], id: '*' + }); + await stream.receive({ msg: 'connected', session: SESSION_ID }); + + // Reconnect #2 — should re-send (retry 2 of 2) + stream.sent.length = 0; + await stream.reset(); + test.isNull(callbackError); + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); + testGotMessage(test, stream, { + msg: 'method', method: 'limitedMethod', params: [], id: '*' + }); + await stream.receive({ msg: 'connected', session: SESSION_ID }); + + // Reconnect #3 — should abort (exceeds maxRetries) + stream.sent.length = 0; + await stream.reset(); + + test.instanceOf(callbackError, Meteor.Error); + test.equal(callbackError.error, 'disconnected'); + test.equal(Object.keys(conn._methodInvokers).length, 0); + } +); + +Tinytest.addAsync( + 'livedata connection - maxRetries 0 behaves like noRetry', + async function(test) { + const stream = new StubStream(); + const conn = newConnection(stream); + + await startAndConnect(test, stream); + + let callbackError = null; + conn.apply('oneShotMethod', [], { maxRetries: 0 }, function(err) { + callbackError = err; + }); + testGotMessage(test, stream, { + msg: 'method', method: 'oneShotMethod', params: [], id: '*' + }); + + test.isNull(callbackError); + + // First reconnect should abort immediately (0 retries allowed) + stream.sent.length = 0; + await stream.reset(); + // Method should NOT appear in sent (it was aborted, not re-sent) + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); + + test.instanceOf(callbackError, Meteor.Error); + test.equal(callbackError.error, 'disconnected'); + test.equal(Object.keys(conn._methodInvokers).length, 0); + } +); + +Tinytest.addAsync( + 'livedata connection - method without maxRetries retries indefinitely', + async function(test) { + const stream = new StubStream(); + const conn = newConnection(stream); + + await startAndConnect(test, stream); + + let callbackFired = false; + conn.call('unlimitedMethod', function() { + callbackFired = true; + }); + testGotMessage(test, stream, { + msg: 'method', method: 'unlimitedMethod', params: [], id: '*' + }); + + // Reconnect 5 times — method should always be re-sent + for (let i = 0; i < 5; i++) { + stream.sent.length = 0; + await stream.reset(); + test.isFalse(callbackFired); + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); + testGotMessage(test, stream, { + msg: 'method', method: 'unlimitedMethod', params: [], id: '*' + }); + await stream.receive({ msg: 'connected', session: SESSION_ID }); + } + + test.isFalse(callbackFired); + test.equal(Object.keys(conn._methodInvokers).length, 1); + } +); + // ============================================================================ // DDP Session Resumption Tests (Client-side) // ============================================================================