Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 9 additions & 29 deletions packages/ddp-client/common/connection_stream_handlers.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,37 +157,17 @@ export class ConnectionStreamHandlers {
* @private
*/
_handleOutstandingMethodsOnReset() {
const blocks = this._connection._outstandingMethodBlocks;
if (blocks.length === 0) return;

const currentMethodBlock = blocks[0].methods;
blocks[0].methods = currentMethodBlock.filter(
methodInvoker => {
// Methods with 'noRetry' option set are not allowed to re-send after
// recovering dropped connection.
if (methodInvoker.sentMessage && methodInvoker.noRetry) {
methodInvoker.receiveResult(
new Meteor.Error(
'invocation-failed',
'Method invocation might have failed due to dropped connection. ' +
'Failing because `noRetry` option was passed to Meteor.apply.'
)
);
}

// Only keep a method if it wasn't sent or it's allowed to retry.
return !(methodInvoker.sentMessage && methodInvoker.noRetry);
}
);
const queue = this._connection._methodQueue;
if (queue.length === 0) return;

// Clear empty blocks
if (blocks.length > 0 && blocks[0].methods.length === 0) {
blocks.shift();
}

// Reset all method invokers as unsent
// Notify all invokers and groups about the reconnect.
// Invokers transition IN_FLIGHT → WAITING_FOR_RESEND.
// Groups reset wire-level state (updated/dataVisible) while preserving results.
Object.values(this._connection._methodInvokers).forEach(invoker => {
invoker.sentMessage = false;
invoker.onReconnect();
});
queue.forEach(group => {
group.resetForReconnect();
});
}

Expand Down
16 changes: 7 additions & 9 deletions packages/ddp-client/common/document_processors.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,14 @@ export class DocumentProcessors {
});
delete self._documentsWrittenByStub[methodId];

// We want to call the data-written callback, but we can't do so until all
// currently buffered messages are flushed.
const callbackInvoker = self._methodInvokers[methodId];
if (!callbackInvoker) {
throw new Error('No callback invoker for method ' + methodId);
// Data for this method is now visible in the local cache. Tell the
// execution group, which will complete the invoker if it also has the result.
const group = self._methodQueue[0];
if (group) {
self._runWhenAllServerDocsAreFlushed(() => {
group.markDataVisible(methodId);
});
}

self._runWhenAllServerDocsAreFlushed(
(...args) => callbackInvoker.dataVisible(...args)
);
});
}

Expand Down
169 changes: 169 additions & 0 deletions packages/ddp-client/common/execution_group.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// An ExecutionGroup manages a set of method calls that execute together.
// Groups are processed sequentially by the connection — all methods in a
// group must complete before the next group is sent.
//
// The group owns the DDP two-phase completion protocol:
// - Tracks 'result' wire messages per method
// - Tracks 'updated' wire messages per method (for quiescence)
// - Tracks data visibility per method (for callback firing)
// - Tells each MethodInvoker when to fire its callback
//
// The group does NOT handle transport (sending/retrying) — that's the
// invoker's job. The group does NOT handle scheduling (which group runs
// next) — that's the connection's job.
export class ExecutionGroup {
// atomic: if true, data messages are buffered while this group runs,
// and the group cannot be merged with adjacent groups.
// onComplete: callback(invoker) when an invoker reaches a terminal state.
// The connection handles its own bookkeeping in this callback.
constructor({ atomic = false, onComplete }) {
this.atomic = atomic;
this._onComplete = onComplete;
this._methods = [];

// Per-method two-phase tracking. Keyed by methodId.
this._completion = new Map();
}

get methods() {
return this._methods;
}

// -- Group management ------------------------------------------------------

addMethod(invoker) {
this._methods.push(invoker);
this._completion.set(invoker.methodId, {
gotResult: false,
gotUpdated: false, // wire: 'updated' message received
dataVisible: false, // local: data flushed to minimongo
err: undefined,
result: undefined,
});
}

removeMethod(invoker) {
const idx = this._methods.indexOf(invoker);
if (idx !== -1) {
this._methods.splice(idx, 1);
}
this._completion.delete(invoker.methodId);
}

// Transfer a method from another group, preserving its completion state.
// Used during reconnect when merging old groups into new ones.
transferMethod(invoker, fromGroup) {
this._methods.push(invoker);
const existingEntry = fromGroup._completion.get(invoker.methodId);
if (existingEntry) {
this._completion.set(invoker.methodId, existingEntry);
fromGroup._completion.delete(invoker.methodId);
} else {
this._completion.set(invoker.methodId, {
gotResult: false,
gotUpdated: false,
dataVisible: false,
err: undefined,
result: undefined,
});
}
}

hasMethod(invoker) {
return this._completion.has(invoker.methodId);
}

isEmpty() {
return this._methods.length === 0;
}

// Reset wire-level state on reconnect. Preserves result (which was already
// received and won't be re-sent), but clears updated/dataVisible since those
// need to arrive fresh on the new connection.
resetForReconnect() {
for (const [, entry] of this._completion) {
entry.gotUpdated = false;
entry.dataVisible = false;
}
}

// -- Two-phase completion --------------------------------------------------

// Called when the server sends a 'result' message for a method in this group.
receiveResult(methodId, err, result) {
const entry = this._completion.get(methodId);
if (!entry) return;
entry.gotResult = true;
entry.err = err;
entry.result = result;

// Fire onResultReceived callback if provided (eager result access).
const invoker = this._methods.find(m => m.methodId === methodId);
if (invoker && invoker._onResultReceived) {
invoker._onResultReceived(err, result);
}

this._maybeComplete(methodId);
}

// Called when the 'updated' wire message arrives for this method ID.
// This is used for quiescence tracking — it does NOT mean data is visible.
// For non-atomic groups, this also marks data as visible (they're the same
// moment since there's no buffering).
receiveUpdated(methodId) {
const entry = this._completion.get(methodId);
if (!entry) return;
entry.gotUpdated = true;
}

// Called when data written by this method is actually visible in minimongo.
// For non-atomic groups, this is called in the same tick as receiveUpdated.
// For atomic groups, this is called after quiescence ends and data is flushed.
markDataVisible(methodId) {
const entry = this._completion.get(methodId);
if (!entry) return;
entry.dataVisible = true;
this._maybeComplete(methodId);
}

// For atomic groups: called after quiescence ends and buffered data has been
// flushed to minimongo. Marks all updated methods as data-visible and
// completes any that also have their result.
flushCompleted() {
for (const [methodId, entry] of this._completion) {
if (entry.gotUpdated) {
entry.dataVisible = true;
this._maybeComplete(methodId);
}
}
}

// True if all methods in this group have received their 'updated' wire message.
// Used by the connection to decide when to stop buffering data (quiescence).
isQuiesced() {
for (const [, entry] of this._completion) {
if (!entry.gotUpdated) return false;
}
return true;
}

// True if a specific method has received its result.
hasResult(methodId) {
const entry = this._completion.get(methodId);
return entry ? entry.gotResult : false;
}

// -- Internal --------------------------------------------------------------

// Complete the invoker if it has both result AND data visible.
_maybeComplete(methodId) {
const entry = this._completion.get(methodId);
if (!entry || !entry.gotResult || !entry.dataVisible) return;

const invoker = this._methods.find(m => m.methodId === methodId);
if (!invoker || invoker.isDone()) return;

invoker.complete(entry.err, entry.result);
this._onComplete(invoker);
}
}
Loading
Loading