Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion docs/issues/replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,20 @@ Make `Orchestrator::replicate_and_cutover()` the single canonical implementation

---

## 🚧 Issue 5 — `AbortSignal` is not an abort signal; it is a coordinator-gone detector
## ✅ Issue 5 — `AbortSignal` is not an abort signal; it is a coordinator-gone detector (resolved)

> **Resolved.** `AbortSignal` is gone. The api task's `CancellationToken`
> (`ctx.cancellation_token()` in [`copy_data.rs`](../../pgdog/src/api/copy_data.rs))
> is now threaded straight down the call chain —
> `Orchestrator::data_sync(&token)` → `Publisher::data_sync(.., cancel)` →
> `ParallelSyncManager::run(cancel)` → each `ParallelSync` worker →
> `Table::data_sync(.., cancel, ..)` — where the COPY loop `select!`s on
> `cancel.cancelled()` and the post-permit pre-flight is `cancel.is_cancelled()`.
> Cancellation is now an explicit, level-triggered signal rather than a side
> effect of the result channel's receiver being dropped: an external caller (the
> task framework, a timeout) can cancel directly, and per-table cancellation is
> available via `cancel.child_token()` if needed. The misleadingly-named
> `AbortSignal` type and its `tx.closed()` mechanism were removed.

### Description

Expand Down
16 changes: 7 additions & 9 deletions pgdog/src/api/copy_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
//! indexes) and replication around it are composed by
//! [`ReshardTask`](crate::api::resharding::ReshardTask).

use tokio::select;

use crate::api::Task;
use crate::api::async_task::{AsyncTaskContext, Empty};
use crate::backend::replication::logical::Error;
Expand All @@ -28,15 +26,15 @@ impl Task for CopyDataTask {
let token = ctx.cancellation_token();
let orchestrator = self.orchestrator;

select! {
res = orchestrator.data_sync() => res?,
// Cancellation drops the `data_sync()` future, whose internal
// `JoinSet` aborts every in-flight shard copy; closing those
// connections releases the temporary data-sync slots. The
// composing task drops the persistent replication slots afterward.
_ = token.cancelled() => return Err(Error::DataSyncAborted),
// Don't start a sync that's already cancelled. Once it's running, the
// token is threaded into the copy workers, which abort their COPY loops
// on cancellation; the composing task drops the slots afterward.
if token.is_cancelled() {
return Err(Error::DataSyncAborted);
}

orchestrator.data_sync(&token).await?;

Ok(orchestrator)
}
}
8 changes: 5 additions & 3 deletions pgdog/src/backend/replication/logical/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,15 @@ impl Orchestrator {
Ok(())
}

pub(crate) async fn data_sync(&self) -> Result<(), Error> {
pub(crate) async fn data_sync(&self, cancel: &CancellationToken) -> Result<(), Error> {
let mut publisher = self.publisher.lock().await;

orchestrator_state(OrchestratorState::DataSync);
// Run data sync for all tables in parallel using multiple replicas,
// if available.
publisher.data_sync(&self.source, &self.destination).await?;
publisher
.data_sync(&self.source, &self.destination, cancel)
.await?;

Ok(())
}
Expand Down Expand Up @@ -473,7 +475,7 @@ impl ReplicationWaiter {
}

// We're going, point of no return.
self.orchestrator.publisher.lock().await.request_stop();
self.waiter.stop();
ok_or_abort!(self.waiter.wait().await);
ok_or_abort!(self.orchestrator.schema_sync_cutover(true).await);
// Traffic is about to go to the new cluster.
Expand Down
18 changes: 0 additions & 18 deletions pgdog/src/backend/replication/logical/publisher/abort.rs

This file was deleted.

2 changes: 0 additions & 2 deletions pgdog/src/backend/replication/logical/publisher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@ pub use non_identity_columns_presence::*;

pub mod slot;
pub use slot::*;
pub mod abort;
pub mod copy;
pub mod parallel_sync;
pub mod progress;
pub mod publisher_impl;
pub mod queries;
pub mod table;
pub use abort::*;
pub use copy::*;
pub use parallel_sync::ParallelSyncManager;
pub use queries::*;
Expand Down
58 changes: 23 additions & 35 deletions pgdog/src/backend/replication/logical/publisher/parallel_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,10 @@
//!
use std::sync::Arc;

use tokio::{
spawn,
sync::{
Semaphore,
mpsc::{UnboundedSender, unbounded_channel},
},
task::JoinHandle,
time::sleep,
};
use tokio::{spawn, sync::Semaphore, task::JoinHandle, time::sleep};
use tracing::{info, warn};

use super::super::Error;
use super::AbortSignal;
use crate::backend::{
Cluster, Pool,
pool::{Address, Request},
Expand All @@ -26,18 +17,20 @@ use crate::backend::{
use crate::frontend::client::query_engine::two_pc::Manager;
use crate::net::messages::Protocol;
use crate::util::escape_identifier;
use futures::{StreamExt, stream::FuturesUnordered};
use tokio_util::sync::CancellationToken;

struct ParallelSync {
table: Table,
addr: Address,
dest: Cluster,
tx: UnboundedSender<Result<Table, Error>>,
permit: Arc<Semaphore>,
cancel: CancellationToken,
}

impl ParallelSync {
// Run parallel sync.
pub fn run(mut self) -> JoinHandle<Result<(), Error>> {
pub fn run(self) -> JoinHandle<Result<Table, Error>> {
spawn(async move {
// Record copy in queue before waiting for permit.
let tracker = TableCopy::new(&self.table.table.schema, &self.table.table.name);
Expand All @@ -51,7 +44,7 @@ impl ParallelSync {
.await
.map_err(|_| Error::ParallelConnection)?;

if self.tx.is_closed() {
if self.cancel.is_cancelled() {
return Err(Error::DataSyncAborted);
}

Expand All @@ -61,25 +54,18 @@ impl ParallelSync {

/// Retry loop: attempt the table copy up to `max_retries` times.
/// Abort signals and schema errors are not retried.
async fn run_with_retry(&mut self, tracker: &TableCopy) -> Result<(), Error> {
async fn run_with_retry(mut self, tracker: &TableCopy) -> Result<Table, Error> {
let max_retries = self.dest.resharding_copy_retry_max_attempts();
let base_delay = *self.dest.resharding_copy_retry_min_delay();
let mut attempt = 0usize;

loop {
let abort = AbortSignal::new(self.tx.clone());

match self
.table
.data_sync(&self.addr, &self.dest, abort, tracker)
.data_sync(&self.addr, &self.dest, &self.cancel, tracker)
.await
{
Ok(_) => {
self.tx
.send(Ok(self.table.clone()))
.map_err(|_| Error::ParallelConnection)?;
return Ok(());
}
Ok(_) => return Ok(self.table),
Err(err) if !err.is_retryable() || attempt >= max_retries => {
tracker.error(&err);
// Terminal failure: warn if rows remain so the operator can truncate.
Expand Down Expand Up @@ -202,20 +188,24 @@ impl ParallelSyncManager {
}

/// Run parallel table sync and return table LSNs when everything is done.
pub async fn run(self) -> Result<Vec<Table>, Error> {
pub async fn run(self, cancel: CancellationToken) -> Result<Vec<Table>, Error> {
info!(
"starting parallel table copy using {} replicas and {} parallel copies",
self.replicas.len(),
self.permit.available_permits() / self.replicas.len(),
);

// Create a child cancel token with a drop guard so that, if any of the handles
// below fails, the remaining ones are cancelled without affecting the parent task.
// If every handle succeeds the guard token will just cancel already finished work

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// If every handle succeed the guard token will just cancel already finished work
// If every handle succeeds the guard token will just cancel already finished work

let cancel = cancel.child_token();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should require the caller to call .child_token() if they don't want to be cancelled.

Suggested change
let cancel = cancel.child_token();

let _guard = cancel.clone().drop_guard();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let _guard = cancel.clone().drop_guard();
let _guard = cancel.drop_guard_ref();


// cycle() is the idiomatic "rewind": it restarts the iterator from the
// beginning once exhausted, giving round-robin distribution across replicas.
let mut replicas_iter = self.replicas.iter().cycle();

let (tx, mut rx) = unbounded_channel();
let mut tables = vec![];
let mut handles = vec![];
let mut handles = FuturesUnordered::new();

for table in self.tables {
// SAFETY: cycle() on a non-empty slice never returns None.
Expand All @@ -227,21 +217,19 @@ impl ParallelSyncManager {
table,
addr: replica.addr().clone(),
dest: self.dest.clone(),
tx: tx.clone(),
permit: self.permit.clone(),
cancel: cancel.clone(),
}
.run(),
);
}

drop(tx);

while let Some(table) = rx.recv().await {
tables.push(table?);
}
let mut tables = Vec::with_capacity(handles.len());

for handle in handles {
handle.await??;
// Short-circuit on the first error. The remaining futures are JoinHandles, which are
// not cancelled on drop, so they are cancelled by the cancel guard instead.
while let Some(joined) = handles.next().await {
tables.push(joined??);
}

Ok(tables)
Expand Down
Loading
Loading