diff --git a/docs/issues/replication.md b/docs/issues/replication.md index b726963d4..738de819f 100644 --- a/docs/issues/replication.md +++ b/docs/issues/replication.md @@ -319,7 +319,20 @@ Make `Orchestrator::replicate_and_cutover()` the single canonical implementation --- -## 🚧 Issue 5 — `AbortSignal` is not an abort signal; it is a coordinator-gone detector +## ✅ Issue 5 — `AbortSignal` is not an abort signal; it is a coordinator-gone detector (resolved) + +> **Resolved.** `AbortSignal` is gone. The api task's `CancellationToken` +> (`ctx.cancellation_token()` in [`copy_data.rs`](../../pgdog/src/api/copy_data.rs)) +> is now threaded straight down the call chain — +> `Orchestrator::data_sync(&token)` → `Publisher::data_sync(.., cancel)` → +> `ParallelSyncManager::run(cancel)` → each `ParallelSync` worker → +> `Table::data_sync(.., cancel, ..)` — where the COPY loop `select!`s on +> `cancel.cancelled()` and the post-permit pre-flight is `cancel.is_cancelled()`. +> Cancellation is now an explicit, level-triggered signal rather than a side +> effect of the result channel's receiver being dropped: an external caller (the +> task framework, a timeout) can cancel directly, and per-table cancellation is +> available via `cancel.child_token()` if needed. The misleadingly-named +> `AbortSignal` type and its `tx.closed()` mechanism were removed. ### Description diff --git a/pgdog/src/api/copy_data.rs b/pgdog/src/api/copy_data.rs index 9dbbe337f..07539bb12 100644 --- a/pgdog/src/api/copy_data.rs +++ b/pgdog/src/api/copy_data.rs @@ -4,8 +4,6 @@ //! indexes) and replication around it are composed by //! [`ReshardTask`](crate::api::resharding::ReshardTask). -use tokio::select; - use crate::api::Task; use crate::api::async_task::{AsyncTaskContext, Empty}; use crate::backend::replication::logical::Error; @@ -28,15 +26,15 @@ impl Task for CopyDataTask { let token = ctx.cancellation_token(); let orchestrator = self.orchestrator; - select! 
{ - res = orchestrator.data_sync() => res?, - // Cancellation drops the `data_sync()` future, whose internal - // `JoinSet` aborts every in-flight shard copy; closing those - // connections releases the temporary data-sync slots. The - // composing task drops the persistent replication slots afterward. - _ = token.cancelled() => return Err(Error::DataSyncAborted), + // Don't start a sync that's already cancelled. Once it's running, the + // token is threaded into the copy workers, which abort their COPY loops + // on cancellation; the composing task drops the slots afterward. + if token.is_cancelled() { + return Err(Error::DataSyncAborted); } + orchestrator.data_sync(&token).await?; + Ok(orchestrator) } } diff --git a/pgdog/src/backend/replication/logical/orchestrator.rs b/pgdog/src/backend/replication/logical/orchestrator.rs index c972923ca..0bb59aa6b 100644 --- a/pgdog/src/backend/replication/logical/orchestrator.rs +++ b/pgdog/src/backend/replication/logical/orchestrator.rs @@ -150,13 +150,15 @@ impl Orchestrator { Ok(()) } - pub(crate) async fn data_sync(&self) -> Result<(), Error> { + pub(crate) async fn data_sync(&self, cancel: &CancellationToken) -> Result<(), Error> { let mut publisher = self.publisher.lock().await; orchestrator_state(OrchestratorState::DataSync); // Run data sync for all tables in parallel using multiple replicas, // if available. - publisher.data_sync(&self.source, &self.destination).await?; + publisher + .data_sync(&self.source, &self.destination, cancel) + .await?; Ok(()) } @@ -473,7 +475,7 @@ impl ReplicationWaiter { } // We're going, point of no return. - self.orchestrator.publisher.lock().await.request_stop(); + self.waiter.stop(); ok_or_abort!(self.waiter.wait().await); ok_or_abort!(self.orchestrator.schema_sync_cutover(true).await); // Traffic is about to go to the new cluster. 
diff --git a/pgdog/src/backend/replication/logical/publisher/abort.rs b/pgdog/src/backend/replication/logical/publisher/abort.rs deleted file mode 100644 index 6b0e40c7f..000000000 --- a/pgdog/src/backend/replication/logical/publisher/abort.rs +++ /dev/null @@ -1,18 +0,0 @@ -use tokio::sync::mpsc::UnboundedSender; - -use super::super::Error; -use super::*; - -pub struct AbortSignal { - tx: UnboundedSender>, -} - -impl AbortSignal { - pub fn new(tx: UnboundedSender>) -> Self { - Self { tx } - } - - pub async fn aborted(&self) { - self.tx.closed().await - } -} diff --git a/pgdog/src/backend/replication/logical/publisher/mod.rs b/pgdog/src/backend/replication/logical/publisher/mod.rs index cb9b249ba..af6b7f433 100644 --- a/pgdog/src/backend/replication/logical/publisher/mod.rs +++ b/pgdog/src/backend/replication/logical/publisher/mod.rs @@ -3,14 +3,12 @@ pub use non_identity_columns_presence::*; pub mod slot; pub use slot::*; -pub mod abort; pub mod copy; pub mod parallel_sync; pub mod progress; pub mod publisher_impl; pub mod queries; pub mod table; -pub use abort::*; pub use copy::*; pub use parallel_sync::ParallelSyncManager; pub use queries::*; diff --git a/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs b/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs index 8b8f90b21..29e499d49 100644 --- a/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs +++ b/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs @@ -5,19 +5,10 @@ //! 
use std::sync::Arc; -use tokio::{ - spawn, - sync::{ - Semaphore, - mpsc::{UnboundedSender, unbounded_channel}, - }, - task::JoinHandle, - time::sleep, -}; +use tokio::{spawn, sync::Semaphore, task::JoinHandle, time::sleep}; use tracing::{info, warn}; use super::super::Error; -use super::AbortSignal; use crate::backend::{ Cluster, Pool, pool::{Address, Request}, @@ -26,18 +17,20 @@ use crate::backend::{ use crate::frontend::client::query_engine::two_pc::Manager; use crate::net::messages::Protocol; use crate::util::escape_identifier; +use futures::{StreamExt, stream::FuturesUnordered}; +use tokio_util::sync::CancellationToken; struct ParallelSync { table: Table, addr: Address, dest: Cluster, - tx: UnboundedSender>, permit: Arc, + cancel: CancellationToken, } impl ParallelSync { // Run parallel sync. - pub fn run(mut self) -> JoinHandle> { + pub fn run(self) -> JoinHandle> { spawn(async move { // Record copy in queue before waiting for permit. let tracker = TableCopy::new(&self.table.table.schema, &self.table.table.name); @@ -51,7 +44,7 @@ impl ParallelSync { .await .map_err(|_| Error::ParallelConnection)?; - if self.tx.is_closed() { + if self.cancel.is_cancelled() { return Err(Error::DataSyncAborted); } @@ -61,25 +54,18 @@ impl ParallelSync { /// Retry loop: attempt the table copy up to `max_retries` times. /// Abort signals and schema errors are not retried. 
- async fn run_with_retry(&mut self, tracker: &TableCopy) -> Result<(), Error> { + async fn run_with_retry(mut self, tracker: &TableCopy) -> Result { let max_retries = self.dest.resharding_copy_retry_max_attempts(); let base_delay = *self.dest.resharding_copy_retry_min_delay(); let mut attempt = 0usize; loop { - let abort = AbortSignal::new(self.tx.clone()); - match self .table - .data_sync(&self.addr, &self.dest, abort, tracker) + .data_sync(&self.addr, &self.dest, &self.cancel, tracker) .await { - Ok(_) => { - self.tx - .send(Ok(self.table.clone())) - .map_err(|_| Error::ParallelConnection)?; - return Ok(()); - } + Ok(_) => return Ok(self.table), Err(err) if !err.is_retryable() || attempt >= max_retries => { tracker.error(&err); // Terminal failure: warn if rows remain so the operator can truncate. @@ -202,20 +188,24 @@ impl ParallelSyncManager { } /// Run parallel table sync and return table LSNs when everything is done. - pub async fn run(self) -> Result, Error> { + pub async fn run(self, cancel: CancellationToken) -> Result, Error> { info!( "starting parallel table copy using {} replicas and {} parallel copies", self.replicas.len(), self.permit.available_permits() / self.replicas.len(), ); + // Create a child cancel token with a drop guard: if any worker fails, the guard + // cancels the remaining workers without cancelling the parent task's token. + // If every worker succeeds, the guard only cancels already-finished work. + let cancel = cancel.child_token(); + let _guard = cancel.clone().drop_guard(); + // cycle() is the idiomatic "rewind": it restarts the iterator from the // beginning once exhausted, giving round-robin distribution across replicas. let mut replicas_iter = self.replicas.iter().cycle(); - let (tx, mut rx) = unbounded_channel(); - let mut tables = vec![]; - let mut handles = vec![]; + let mut handles = FuturesUnordered::new(); for table in self.tables { // SAFETY: cycle() on a non-empty slice never returns None. 
@@ -227,21 +217,19 @@ impl ParallelSyncManager { table, addr: replica.addr().clone(), dest: self.dest.clone(), - tx: tx.clone(), permit: self.permit.clone(), + cancel: cancel.clone(), } .run(), ); } - drop(tx); - - while let Some(table) = rx.recv().await { - tables.push(table?); - } + let mut tables = Vec::with_capacity(handles.len()); - for handle in handles { - handle.await??; + // Short-circuit on the first error. Dropping a JoinHandle does not abort its task, + // so the remaining workers are stopped by the cancel guard instead. + while let Some(joined) = handles.next().await { + tables.push(joined??); } Ok(tables) diff --git a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs index 2935cc1c8..d774c5b01 100644 --- a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs +++ b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs @@ -2,9 +2,10 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use futures::future::try_join_all; use parking_lot::Mutex; use pgdog_config::QueryParserEngine; -use tokio::task::{JoinHandle, JoinSet}; +use tokio::task::JoinHandle; use tokio::time::{Instant, sleep}; use tokio::try_join; use tokio::{select, spawn, time::interval}; @@ -54,8 +55,6 @@ pub struct Publisher { replication_lag: Arc>>, /// Last transaction. last_transaction: Arc>>, - /// Stop signal. - stop: CancellationToken, /// Slot name. slot_name: String, } @@ -72,7 +71,6 @@ impl Publisher { slots: HashMap::new(), query_parser_engine, replication_lag: Arc::new(Mutex::new(HashMap::new())), - stop: CancellationToken::new(), last_transaction: Arc::new(Mutex::new(None)), slot_name, } @@ -164,8 +162,19 @@ impl Publisher { /// If you're doing a cross-shard transaction, parts of it can be lost. /// /// TODO: Add support for 2-phase commit. 
- async fn create_slots(&mut self, source: &Cluster) -> Result<(), Error> { + async fn create_slots( + &mut self, + source: &Cluster, + cancel: &CancellationToken, + ) -> Result<(), Error> { for (number, shard) in source.shards().iter().enumerate() { + // Cancel at slot boundaries so we never tear down an in-flight + // CREATE_REPLICATION_SLOT: the current slot completes, the next is + // not started. Slots already created are dropped by the caller. + if cancel.is_cancelled() { + return Err(Error::DataSyncAborted); + } + let addr = shard.primary(&Request::default()).await?.addr().clone(); let mut slot = ReplicationSlot::replication( @@ -190,12 +199,14 @@ impl Publisher { // Replicate shards in parallel. let mut streams = vec![]; + let stop = CancellationToken::new(); + // Synchronize tables from publication. self.sync_tables(false, source, dest).await?; // Create replication slots if we haven't already. if self.slots.is_empty() { - self.create_slots(source).await?; + self.create_slots(source, &stop).await?; } let n_sources = source.shards().len(); @@ -221,7 +232,7 @@ impl Publisher { let mut check_lag = interval(Duration::from_secs(1)); let replication_lag = self.replication_lag.clone(); - let stop = self.stop.clone(); + let stop = stop.clone(); let last_transaction = self.last_transaction.clone(); let source_cluster = source.clone(); @@ -345,15 +356,7 @@ impl Publisher { streams.push(handle); } - Ok(Waiter { - streams, - stop: self.stop.clone(), - }) - } - - /// Request the publisher to stop replication. - pub fn request_stop(&self) { - self.stop.cancel(); + Ok(Waiter { streams, stop }) } /// Get current replication lag. @@ -370,7 +373,12 @@ impl Publisher { /// re-sharding the cluster in the process. /// /// TODO: Parallelize shard syncs. 
- pub async fn data_sync(&mut self, source: &Cluster, dest: &Cluster) -> Result<(), Error> { + pub async fn data_sync( + &mut self, + source: &Cluster, + dest: &Cluster, + cancel: &CancellationToken, + ) -> Result<(), Error> { // Fetch schema and column metadata first — valid() depends on it. self.sync_tables(true, source, dest).await?; @@ -388,13 +396,13 @@ impl Publisher { // Create replication slots only after validation passes — a slot // created before valid() would be orphaned on validation errors. - self.create_slots(source).await?; + self.create_slots(source, cancel).await?; - // A JoinSet aborts every in-flight shard copy when this future is - // dropped (e.g. the owning task is cancelled), so cancellation actually - // stops the copy instead of leaving detached syncs running in the - // background. - let mut set: JoinSet), Error>> = JoinSet::new(); + // Each manager coordinates its own spawned per-table workers, so run the + // per-shard managers concurrently on this task. Cancellation reaches the + // workers through `cancel` (threaded down into the COPY loop), so no + // JoinSet is needed to stop the copy. + let mut syncs = Vec::new(); for (number, shard) in source.shards().iter().enumerate() { let tables = self @@ -431,17 +439,16 @@ impl Publisher { }; let dest = dest.clone(); - set.spawn(async move { + let cancel = cancel.clone(); + syncs.push(async move { let manager = ParallelSyncManager::new(tables, replicas, dest)?; - let tables = manager.run().await?; + let tables = manager.run(cancel).await?; Ok::<(usize, Vec), Error>((number, tables)) }); } - while let Some(joined) = set.join_next().await { - let (number, tables) = joined??; - + for (number, tables) in try_join_all(syncs).await? { info!( "table sync for {} tables complete [{}, shard: {}]", tables.len(), @@ -628,7 +635,9 @@ mod test { ); // Validation must fire before the copy begins. 
- let result = publisher.data_sync(&source, &dest).await; + let result = publisher + .data_sync(&source, &dest, &CancellationToken::new()) + .await; let err = result.expect_err("data_sync must fail for a publication with no-pk tables"); @@ -684,7 +693,9 @@ mod test { "pub_full_identity_nothing_slot".into(), ); - let result = publisher.data_sync(&source, &dest).await; + let result = publisher + .data_sync(&source, &dest, &CancellationToken::new()) + .await; let err = result.expect_err("data_sync must fail for REPLICA IDENTITY NOTHING table"); assert!( diff --git a/pgdog/src/backend/replication/logical/publisher/table.rs b/pgdog/src/backend/replication/logical/publisher/table.rs index 101a0f971..e4218df03 100644 --- a/pgdog/src/backend/replication/logical/publisher/table.rs +++ b/pgdog/src/backend/replication/logical/publisher/table.rs @@ -21,9 +21,8 @@ use super::super::{ Error, TableValidationError, TableValidationErrorKind, subscriber::CopySubscriber, }; use super::non_identity_columns_presence::NonIdentityColumnsPresence; -use super::{ - AbortSignal, Copy, PublicationTable, PublicationTableColumn, ReplicaIdentity, ReplicationSlot, -}; +use super::{Copy, PublicationTable, PublicationTableColumn, ReplicaIdentity, ReplicationSlot}; +use tokio_util::sync::CancellationToken; use tracing::info; @@ -441,7 +440,7 @@ impl Table { &mut self, source: &Address, dest: &Cluster, - abort: AbortSignal, + cancel: &CancellationToken, tracker: &TableCopy, ) -> Result { info!( @@ -476,12 +475,11 @@ impl Table { while let Some(data_row) = copy.data(slot.server()?).await? { select! { - _ = abort.aborted() => { + _ = cancel.cancelled() => { error!("aborting data sync for table {}", self.table); return Err(Error::CopyAborted(self.table.clone())) }, - result = copy_sub.copy_data(data_row) => { let (rows, bytes) = result?; progress.update(copy_sub.bytes_sharded(), slot.lsn().lsn);