PHOENIX-7845 ReplicationLogGroup initialization resilience to standby cluster unavailability #2466
Conversation
… cluster unavailability
…eption

- Replace RuntimeException wrapping in get() with UncheckedIOException to avoid misclassifying unrelated RuntimeExceptions with IOException causes
- Bound peer shard manager creation with configurable timeout (default 10s) via CompletableFuture.get() to prevent blocking the disruptor handler thread on peer NN outage; TimeoutException triggers SAF degradation
- Consolidate peer shard manager into a single lazy synchronized accessor on ReplicationLogGroup; remove per-component caching from forwarder
- Cancel the in-flight future on timeout to release the ForkJoinPool thread
- Add test for timeout-triggered SAF degradation
apurtell
left a comment
+1 with some minor comments
```java
try {
  peerShardManager = future.get(peerInitTimeoutMs, TimeUnit.MILLISECONDS);
  return peerShardManager;
} catch (UncheckedIOException e) {
```
Nit. future.get() won't throw UncheckedIOException. When the supplier throws unchecked, the future fails and Future.get() wraps the cause in ExecutionException. You can unwrap the UncheckedIOException from ExecutionException where you need it.
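A minimal, self-contained sketch of the unwrapping the reviewer describes (class and method names here are illustrative, not from the patch): the supplier's UncheckedIOException surfaces as the *cause* of the ExecutionException thrown by `Future.get()`, so the handler can recover the original IOException from it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PeerInitSketch {

    static Object createPeerShardManager() {
        // Simulate the supplier failing with an unchecked IO wrapper.
        throw new UncheckedIOException(new IOException("peer NN unreachable"));
    }

    static Object initPeer(long timeoutMs) throws IOException {
        CompletableFuture<Object> future =
            CompletableFuture.supplyAsync(PeerInitSketch::createPeerShardManager);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof UncheckedIOException) {
                // Recover the original IOException for proper classification.
                throw ((UncheckedIOException) cause).getCause();
            }
            throw new IOException("Failed to create peer shard manager", cause);
        } catch (TimeoutException | InterruptedException e) {
            throw new IOException("Peer shard manager init timed out or interrupted", e);
        }
    }

    public static void main(String[] args) {
        try {
            initPeer(1000);
            System.out.println("unexpected success");
        } catch (IOException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```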
```java
}
  throw new IOException("Failed to create peer shard manager", e.getCause());
} catch (TimeoutException e) {
  future.cancel(true);
```
future.cancel(true) does not actually free the worker thread. (Per CompletableFuture Javadoc, mayInterruptIfRunning has no effect i.e. interrupts are not used to control processing.) The stall is moved to the worker pool, which is better, but the stall still holds resources the entire time the underlying HDFS call is blocked. Every subsequent call to this method spawns another supplyAsync task while the previous one is still hung, so this issue can compound and potentially exhaust the worker pool.
I asked the robot about this and it recommended: Use a dedicated ExecutorService (single-thread, bounded queue) so future.cancel(true) can actually interrupt, and so leakage is bounded.
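A sketch of that suggestion, under assumed names (nothing here is from the patch): submit the blocking peer-init call to a dedicated single-thread executor with a bounded queue. Unlike CompletableFuture, the Future returned by `ExecutorService.submit()` honors `cancel(true)` by interrupting the worker, and the bounded queue caps how many stalled attempts can pile up.

```java
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedPeerInit {

    // One worker, at most one queued attempt; further submits are rejected.
    private static final ExecutorService PEER_INIT_POOL = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(1),
        new ThreadPoolExecutor.AbortPolicy());

    static Object initPeer(Callable<Object> createPeerShardManager, long timeoutMs)
            throws IOException {
        Future<Object> future;
        try {
            future = PEER_INIT_POOL.submit(createPeerShardManager);
        } catch (RejectedExecutionException e) {
            throw new IOException("Previous peer init attempt still pending", e);
        }
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // actually interrupts the worker thread here
            throw new IOException("Timed out creating peer shard manager", e);
        } catch (ExecutionException e) {
            throw new IOException("Failed to create peer shard manager", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted creating peer shard manager", e);
        }
    }

    public static void main(String[] args) {
        try {
            // Simulate a hung HDFS call; the 100 ms timeout triggers cancel(true).
            initPeer(() -> { Thread.sleep(60_000); return new Object(); }, 100);
        } catch (IOException e) {
            System.out.println("degraded: " + e.getMessage());
        }
        PEER_INIT_POOL.shutdownNow();
    }
}
```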
```java
if (cached != null) {
  return cached;
}
synchronized (this) {
```
With both the disruptor handler and the forwarder thread involved, the mutual exclusion here may block for longer than 10s: a thread arriving second queues behind the first thread's full peer-init attempt before starting its own.
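One way to bound that wait (purely illustrative, not something the patch does) is to replace the synchronized block with `ReentrantLock.tryLock` and a timeout, so a caller waits at most its own budget for the lock instead of stacking behind another thread's full 10s attempt:

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

public class BoundedLazyInit {

    private final ReentrantLock initLock = new ReentrantLock();
    private volatile Object peerShardManager; // cached once created

    Object getOrInit(Supplier<Object> factory, long lockWaitMs) throws IOException {
        Object cached = peerShardManager;
        if (cached != null) {
            return cached;
        }
        boolean locked = false;
        try {
            locked = initLock.tryLock(lockWaitMs, TimeUnit.MILLISECONDS);
            if (!locked) {
                // Another thread is mid-init; fail fast rather than stall.
                throw new IOException("Peer init in progress, lock not acquired");
            }
            if (peerShardManager == null) {
                peerShardManager = factory.get();
            }
            return peerShardManager;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted waiting for peer init lock", e);
        } finally {
            if (locked) {
                initLock.unlock();
            }
        }
    }
}
```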
- Remove dead catch(UncheckedIOException) block; future.get() wraps supplier exceptions in ExecutionException, not UncheckedIOException
- Unwrap UncheckedIOException inside ExecutionException handler to recover the original IOException
- Cache the in-flight CompletableFuture to prevent spawning unbounded async tasks when peer HDFS is unavailable; reuse the pending future on timeout, retry only after exceptional completion
- Remove no-op future.cancel(true), which has no effect on CompletableFuture (mayInterruptIfRunning is ignored)
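A minimal sketch of the in-flight-future caching described in this commit message, with illustrative field and method names (not the patch's actual code): keep one CompletableFuture for the peer shard manager; on timeout, keep the pending future cached and re-check it on the next call instead of spawning a new supplyAsync task; start a fresh attempt only after the cached one completes exceptionally.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class CachedPeerInit {

    private volatile CompletableFuture<Object> pending;

    synchronized Object getPeerShardManager(Supplier<Object> factory, long timeoutMs)
            throws IOException {
        CompletableFuture<Object> f = pending;
        if (f == null || f.isCompletedExceptionally()) {
            // No attempt in flight (or the last one failed): start a new one.
            f = CompletableFuture.supplyAsync(factory);
            pending = f;
        }
        try {
            return f.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Leave the pending future cached; the next call reuses it.
            throw new IOException("Peer shard manager not ready", e);
        } catch (ExecutionException e) {
            throw new IOException("Failed to create peer shard manager", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted creating peer shard manager", e);
        }
    }
}
```

With this shape, repeated timeouts while the peer is down never add a second async task; only a failed (exceptionally completed) attempt triggers a retry.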
Summary
- Moved peer shard manager initialization from `ReplicationLogGroup.init()` to individual mode consumers (`SyncModeImpl`, `SyncAndForwardModeImpl`, `ReplicationLogDiscoveryForwarder`) and the `updateModeOnFailure()` path
- Changed `ReplicationLogGroup.get()` to throw `IOException` instead of `RuntimeException`, so callers in `IndexRegionObserver` get proper error classification (not misclassified as `IndexBuildingFailureException`)
- Replaced `createStandbyLog()`/`createFallbackLog()` with a single `createReplicationLog(shardManager)` factory

Test plan
- `testInitDegradesToSafWhenPeerUnavailable` — peer unavailable at startup → SAF mode, writes succeed
- `testInitFailsWhenLocalUnavailable` — local FS unavailable → init fails with IOException
- `testForwarderRetriesPeerCreation` — forwarder retries peer shard manager on next round after initial failure
- `ReplicationLogGroupTest` tests pass
- `ReplicationLogDiscoveryForwarderTest` tests pass
- `ReplicationLogTest` tests pass