Skip to content

fix(replication): replace custom AbortToken with tokio::CancellationToken#1131

Open
meskill wants to merge 1 commit into
mainfrom
meskill-2026-06-30-fix-replication---rzm
Open

fix(replication): replace custom AbortToken with tokio::CancellationToken#1131
meskill wants to merge 1 commit into
mainfrom
meskill-2026-06-30-fix-replication---rzm

Conversation

@meskill

@meskill meskill commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Replace custom AbortSignal with CancellationToken to make the cancellation explicit and not relied on futures drop (that holds rx part of channel). Pass cancellation token for graceful cancellation. Early cancellation on errors

@codecov

codecov Bot commented Jul 2, 2026

Copy link
Copy Markdown


// Create a child cancel token with the guard to cancel the handles below
// in case any of it fails without affecting the parent task.
// If every handle succeed the guard token will just cancel already finished work

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// If every handle succeed the guard token will just cancel already finished work
// If every handle succeeds the guard token will just cancel already finished work

// Create a child cancel token with the guard to cancel the handles below
// in case any of it fails without affecting the parent task.
// If every handle succeed the guard token will just cancel already finished work
let cancel = cancel.child_token();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should require the caller to call .child_token() if they don't want to be cancelled.

Suggested change
let cancel = cancel.child_token();

// in case any of it fails without affecting the parent task.
// If every handle succeed the guard token will just cancel already finished work
let cancel = cancel.child_token();
let _guard = cancel.clone().drop_guard();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let _guard = cancel.clone().drop_guard();
let _guard = cancel.drop_guard_ref();

Comment on lines +442 to +445
let cancel = cancel.clone();
syncs.push(async move {
let manager = ParallelSyncManager::new(tables, replicas, dest)?;
let tables = manager.run().await?;
let tables = manager.run(cancel).await?;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let cancel = cancel.clone();
syncs.push(async move {
let manager = ParallelSyncManager::new(tables, replicas, dest)?;
let tables = manager.run().await?;
let tables = manager.run(cancel).await?;
syncs.push(async move {
let manager = ParallelSyncManager::new(tables, replicas, dest)?;
let tables = manager.run(cancel.child_token()).await?;


let dest = dest.clone();
set.spawn(async move {
let cancel = cancel.clone();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the first place we actually do anything with a cancellation token other than pass it through. Should we remove everything up the chain from here and the CancellationToken::new()s, and create it at this point?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants