PHOENIX-7845 ReplicationLogGroup initialization resilience to standby cluster unavailability #2466
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,6 +36,7 @@ | |
| import com.lmax.disruptor.dsl.ProducerType; | ||
| import java.io.IOException; | ||
| import java.io.InterruptedIOException; | ||
| import java.io.UncheckedIOException; | ||
| import java.net.URI; | ||
| import java.net.URISyntaxException; | ||
| import java.util.ArrayList; | ||
|
|
@@ -165,6 +166,9 @@ public class ReplicationLogGroup { | |
| public static final String REPLICATION_LOG_RETRY_DELAY_MS_KEY = | ||
| "phoenix.replication.log.retry.delay.ms"; | ||
| public static final long DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS = 100L; | ||
| public static final String REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY = | ||
| "phoenix.replication.log.peer.init.timeout.ms"; | ||
| public static final long DEFAULT_REPLICATION_LOG_PEER_INIT_TIMEOUT_MS = 10_000L; | ||
| public static final String WAL_SYNC_TIMEOUT_MS_KEY = "hbase.regionserver.wal.sync.timeout"; | ||
| public static final long DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000L; | ||
|
|
||
|
|
@@ -180,10 +184,14 @@ public class ReplicationLogGroup { | |
| protected final String haGroupName; | ||
| protected final HAGroupStoreManager haGroupStoreManager; | ||
| protected final MetricsReplicationLogGroupSource metrics; | ||
| protected ReplicationShardDirectoryManager peerShardManager; | ||
| // Cached at init time — HDFS URLs (local and peer) are fixed for the lifetime of this group. | ||
| // URL changes require RS restart. | ||
| protected HAGroupStoreRecord haGroupStoreRecord; | ||
| protected ReplicationShardDirectoryManager localShardManager; | ||
| protected volatile ReplicationShardDirectoryManager peerShardManager; | ||
| protected ReplicationLogDiscoveryForwarder logForwarder; | ||
| protected long syncTimeoutMs; | ||
| protected long peerInitTimeoutMs; | ||
| protected volatile boolean closed = false; | ||
|
|
||
| /** | ||
|
|
@@ -331,20 +339,24 @@ public Record(String tableName, long commitId, Mutation mutation) { | |
| * @param serverName The server name | ||
| * @param haGroupName The HA Group name | ||
| * @return ReplicationLogGroup instance | ||
| * @throws RuntimeException if initialization fails | ||
| * @throws IOException if initialization fails | ||
| */ | ||
| public static ReplicationLogGroup get(Configuration conf, ServerName serverName, | ||
| String haGroupName) { | ||
| return INSTANCES.computeIfAbsent(haGroupName, k -> { | ||
| try { | ||
| ReplicationLogGroup group = new ReplicationLogGroup(conf, serverName, haGroupName); | ||
| group.init(); | ||
| return group; | ||
| } catch (IOException e) { | ||
| LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", haGroupName, e); | ||
| throw new RuntimeException(e); | ||
| } | ||
| }); | ||
| String haGroupName) throws IOException { | ||
| try { | ||
| return INSTANCES.computeIfAbsent(haGroupName, k -> { | ||
| try { | ||
| ReplicationLogGroup group = new ReplicationLogGroup(conf, serverName, haGroupName); | ||
| group.init(); | ||
| return group; | ||
| } catch (IOException e) { | ||
| LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", haGroupName, e); | ||
| throw new UncheckedIOException(e); | ||
| } | ||
| }); | ||
| } catch (UncheckedIOException e) { | ||
| throw e.getCause(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -354,21 +366,25 @@ public static ReplicationLogGroup get(Configuration conf, ServerName serverName, | |
| * @param haGroupName The HA Group name | ||
| * @param haGroupStoreManager HA Group Store Manager instance | ||
| * @return ReplicationLogGroup instance | ||
| * @throws RuntimeException if initialization fails | ||
| * @throws IOException if initialization fails | ||
| */ | ||
| public static ReplicationLogGroup get(Configuration conf, ServerName serverName, | ||
| String haGroupName, HAGroupStoreManager haGroupStoreManager) { | ||
| return INSTANCES.computeIfAbsent(haGroupName, k -> { | ||
| try { | ||
| ReplicationLogGroup group = | ||
| new ReplicationLogGroup(conf, serverName, haGroupName, haGroupStoreManager); | ||
| group.init(); | ||
| return group; | ||
| } catch (IOException e) { | ||
| LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", haGroupName, e); | ||
| throw new RuntimeException(e); | ||
| } | ||
| }); | ||
| String haGroupName, HAGroupStoreManager haGroupStoreManager) throws IOException { | ||
| try { | ||
| return INSTANCES.computeIfAbsent(haGroupName, k -> { | ||
| try { | ||
| ReplicationLogGroup group = | ||
| new ReplicationLogGroup(conf, serverName, haGroupName, haGroupStoreManager); | ||
| group.init(); | ||
| return group; | ||
| } catch (IOException e) { | ||
| LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", haGroupName, e); | ||
| throw new UncheckedIOException(e); | ||
| } | ||
| }); | ||
| } catch (UncheckedIOException e) { | ||
| throw e.getCause(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -421,9 +437,10 @@ protected void init() throws IOException { | |
| throw new IOException(message); | ||
| } | ||
| HAGroupStoreRecord record = haRecord.get(); | ||
| // First initialize the shard managers | ||
| this.peerShardManager = createPeerShardManager(record); | ||
| this.localShardManager = createLocalShardManager(record); | ||
| this.haGroupStoreRecord = record; | ||
| this.localShardManager = createLocalShardManager(); | ||
| this.peerInitTimeoutMs = conf.getLong(REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY, | ||
| DEFAULT_REPLICATION_LOG_PEER_INIT_TIMEOUT_MS); | ||
| // Initialize the replication log forwarder. The log forwarder is only activated when | ||
| // we switch to STORE_AND_FORWARD or SYNC_AND_FORWARD mode | ||
| this.logForwarder = new ReplicationLogDiscoveryForwarder(this); | ||
|
|
@@ -783,21 +800,58 @@ private ReplicationShardDirectoryManager createShardManager(String uri, String l | |
| } | ||
| } | ||
|
|
||
| /** create shard manager for the standby cluster */ | ||
| protected ReplicationShardDirectoryManager createPeerShardManager(HAGroupStoreRecord record) | ||
| throws IOException { | ||
| return createShardManager(record.getPeerHdfsUrl(), STANDBY_DIR); | ||
| /** create shard manager for the fallback cluster */ | ||
| protected ReplicationShardDirectoryManager createLocalShardManager() throws IOException { | ||
| return createShardManager(haGroupStoreRecord.getHdfsUrl(), FALLBACK_DIR); | ||
| } | ||
|
|
||
| /** create shard manager for the fallback cluster */ | ||
| protected ReplicationShardDirectoryManager createLocalShardManager(HAGroupStoreRecord record) | ||
| throws IOException { | ||
| return createShardManager(record.getHdfsUrl(), FALLBACK_DIR); | ||
| /** | ||
| * Get or create the peer shard manager. Thread-safe and idempotent — the first successful | ||
| * creation is cached; subsequent calls return the cached instance. Bounded by | ||
| * {@link #REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY} to prevent blocking the disruptor handler | ||
| * thread on a peer NN outage. | ||
| */ | ||
| protected ReplicationShardDirectoryManager getOrCreatePeerShardManager() throws IOException { | ||
| ReplicationShardDirectoryManager cached = peerShardManager; | ||
| if (cached != null) { | ||
| return cached; | ||
| } | ||
| synchronized (this) { | ||
| if (peerShardManager != null) { | ||
| return peerShardManager; | ||
| } | ||
| CompletableFuture<ReplicationShardDirectoryManager> future = | ||
| CompletableFuture.supplyAsync(() -> { | ||
| try { | ||
| return createPeerShardManager(); | ||
| } catch (IOException e) { | ||
| throw new UncheckedIOException(e); | ||
| } | ||
| }); | ||
| try { | ||
| peerShardManager = future.get(peerInitTimeoutMs, TimeUnit.MILLISECONDS); | ||
| return peerShardManager; | ||
| } catch (UncheckedIOException e) { | ||
|
Contributor
Nit. future.get() won't throw UncheckedIOException. When the supplier throws unchecked, the future fails and Future.get() wraps the cause in ExecutionException. You can unwrap the UncheckedIOException from ExecutionException where you need it.
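A minimal sketch of the suggested unwrap, reusing the names from the diff above (illustrative only, not the committed change):

```java
try {
    peerShardManager = future.get(peerInitTimeoutMs, TimeUnit.MILLISECONDS);
    return peerShardManager;
} catch (ExecutionException e) {
    // The supplier's UncheckedIOException arrives wrapped in ExecutionException;
    // unwrap it back to the checked IOException this method declares.
    Throwable cause = e.getCause();
    if (cause instanceof UncheckedIOException) {
        throw ((UncheckedIOException) cause).getCause();
    }
    if (cause instanceof IOException) {
        throw (IOException) cause;
    }
    throw new IOException("Failed to create peer shard manager", cause);
} catch (TimeoutException e) {
    future.cancel(true);
    throw new IOException("Timed out creating peer shard manager after "
        + peerInitTimeoutMs + "ms for " + haGroupName, e);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new IOException("Interrupted while creating peer shard manager", e);
}
```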
||
| throw e.getCause(); | ||
| } catch (ExecutionException e) { | ||
| if (e.getCause() instanceof IOException) { | ||
| throw (IOException) e.getCause(); | ||
| } | ||
| throw new IOException("Failed to create peer shard manager", e.getCause()); | ||
| } catch (TimeoutException e) { | ||
| future.cancel(true); | ||
|
Contributor
future.cancel(true) does not actually free the worker thread. (Per the CompletableFuture Javadoc, mayInterruptIfRunning has no effect, i.e. interrupts are not used to control processing.) The stall is moved to the worker pool, which is better, but it still holds resources the entire time the underlying HDFS call is blocked. Every subsequent call to this method spawns another supplyAsync task while the previous one is still hung, so this issue can compound and potentially exhaust the worker pool. I asked the robot about this and it recommended: use a dedicated ExecutorService (single thread, bounded queue) so future.cancel(true) can actually interrupt, and so leakage is bounded.
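A sketch of that suggestion (the executor field and thread name below are illustrative, not part of the diff; the usual java.util.concurrent imports are assumed):

```java
// Dedicated single-thread executor: cancel(true) on a submitted task interrupts the
// worker, and the one-slot queue plus AbortPolicy keeps stalled submissions bounded.
private final ExecutorService peerInitExecutor = new ThreadPoolExecutor(
    1, 1, 0L, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<>(1),
    r -> {
        Thread t = new Thread(r, "peer-shard-manager-init-" + haGroupName);
        t.setDaemon(true);
        return t;
    },
    new ThreadPoolExecutor.AbortPolicy());

// In getOrCreatePeerShardManager(), replacing CompletableFuture.supplyAsync(...):
Future<ReplicationShardDirectoryManager> future =
    peerInitExecutor.submit(this::createPeerShardManager);
try {
    peerShardManager = future.get(peerInitTimeoutMs, TimeUnit.MILLISECONDS);
    return peerShardManager;
} catch (TimeoutException e) {
    future.cancel(true); // actually interrupts the worker thread here
    throw new IOException("Timed out creating peer shard manager after "
        + peerInitTimeoutMs + "ms for " + haGroupName, e);
} catch (ExecutionException e) {
    throw e.getCause() instanceof IOException ? (IOException) e.getCause()
        : new IOException("Failed to create peer shard manager", e.getCause());
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new IOException("Interrupted while creating peer shard manager", e);
}
```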
||
| throw new IOException("Timed out creating peer shard manager after " + peerInitTimeoutMs | ||
| + "ms for " + haGroupName, e); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| throw new IOException("Interrupted while creating peer shard manager", e); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** return shard manager for the standby cluster */ | ||
| protected ReplicationShardDirectoryManager getPeerShardManager() { | ||
| return peerShardManager; | ||
| /** Create a new peer shard manager for the standby cluster */ | ||
| protected ReplicationShardDirectoryManager createPeerShardManager() throws IOException { | ||
| return createShardManager(haGroupStoreRecord.getPeerHdfsUrl(), STANDBY_DIR); | ||
| } | ||
|
|
||
| /** return shard manager for the fallback cluster */ | ||
|
|
@@ -809,14 +863,10 @@ private FileSystem getFileSystem(URI uri) throws IOException { | |
| return FileSystem.get(uri, conf); | ||
| } | ||
|
|
||
| /** Create the standby(synchronous) writer */ | ||
| protected ReplicationLog createStandbyLog() throws IOException { | ||
| return new ReplicationLog(this, peerShardManager); | ||
| } | ||
|
|
||
| /** Create the fallback writer */ | ||
| protected ReplicationLog createFallbackLog() throws IOException { | ||
| return new ReplicationLog(this, localShardManager); | ||
| /** Create a replication log using the given shard manager */ | ||
| protected ReplicationLog createReplicationLog(ReplicationShardDirectoryManager shardManager) | ||
| throws IOException { | ||
| return new ReplicationLog(this, shardManager); | ||
| } | ||
|
|
||
| /** Returns the log forwarder for this replication group */ | ||
|
|
||
With both the disruptor handler and the forwarder thread involved, the mutual exclusion here may block a caller for longer than 10s.
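One possible shape, sketched under the assumption that a shared in-flight future (the peerInitFuture field below is hypothetical) is created under the lock but waited on outside it, so each caller bounds only its own wait instead of queuing behind another caller's full timeout:

```java
private CompletableFuture<ReplicationShardDirectoryManager> peerInitFuture; // guarded by "this"

protected ReplicationShardDirectoryManager getOrCreatePeerShardManager() throws IOException {
    ReplicationShardDirectoryManager cached = peerShardManager;
    if (cached != null) {
        return cached;
    }
    CompletableFuture<ReplicationShardDirectoryManager> future;
    synchronized (this) {
        if (peerShardManager != null) {
            return peerShardManager;
        }
        // Start (or reuse) a single creation attempt; the lock is held only briefly.
        if (peerInitFuture == null || peerInitFuture.isCompletedExceptionally()) {
            peerInitFuture = CompletableFuture.supplyAsync(() -> {
                try {
                    return createPeerShardManager();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
        future = peerInitFuture;
    }
    try {
        // The disruptor handler and the forwarder wait on the same attempt, each
        // bounded by its own timeout, without serializing behind the monitor.
        ReplicationShardDirectoryManager manager =
            future.get(peerInitTimeoutMs, TimeUnit.MILLISECONDS);
        peerShardManager = manager; // volatile publish
        return manager;
    } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        throw cause instanceof UncheckedIOException ? ((UncheckedIOException) cause).getCause()
            : new IOException("Failed to create peer shard manager", cause);
    } catch (TimeoutException e) {
        throw new IOException("Timed out creating peer shard manager after "
            + peerInitTimeoutMs + "ms for " + haGroupName, e);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while creating peer shard manager", e);
    }
}
```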