diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 60e972e561be2..269cbafc20799 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -19,7 +19,7 @@ package org.apache.bookkeeper.mledger.impl; import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; -import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.NULL_OFFLOAD_PROMISE; +import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.AUTOMATIC_OFFLOAD_TRIGGER; import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicates; @@ -496,7 +496,7 @@ public void initializeComplete() { future.complete(newledger); // May need to trigger offloading if (config.isTriggerOffloadOnTopicLoad()) { - newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE); + newledger.maybeOffloadInBackground(AUTOMATIC_OFFLOAD_TRIGGER); } }); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 4a1a3d12ab075..dc8a83db4cdb4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -231,8 +231,19 @@ public Logger getLogger() { protected final CallbackMutex trimmerMutex = new CallbackMutex(); protected final CallbackMutex offloadMutex = new CallbackMutex(); - public static final CompletableFuture NULL_OFFLOAD_PROMISE = CompletableFuture + // Automatic offload has no caller-visible future. Coalesce concurrent automatic triggers into at most one + // running offload and one follow-up run. + private final AtomicBoolean automaticOffloadInProgress = new AtomicBoolean(false); + private final AtomicBoolean automaticOffloadRerunRequested = new AtomicBoolean(false); + // Identity sentinel for automatic offload requests. The completed Position value is not used. + public static final CompletableFuture AUTOMATIC_OFFLOAD_TRIGGER = CompletableFuture .completedFuture(PositionFactory.LATEST); + + private enum OffloadRequestSource { + AUTOMATIC, + EXPLICIT + } + @VisibleForTesting @Getter protected volatile LedgerHandle currentLedger; @@ -1968,7 +1979,7 @@ synchronized void ledgerClosed(final LedgerHandle lh, Long lastAddConfirmed) { trimConsumedLedgersInBackground(); - maybeOffloadInBackground(NULL_OFFLOAD_PROMISE); + maybeOffloadInBackground(AUTOMATIC_OFFLOAD_TRIGGER); createLedgerAfterClosed(); } @@ -2804,22 +2815,66 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture p } public void maybeOffloadInBackground(CompletableFuture promise) { - if (getOffloadPoliciesIfAppendable().isEmpty()) { + if (promise == AUTOMATIC_OFFLOAD_TRIGGER) { + if (!automaticOffloadInProgress.compareAndSet(false, true)) { + automaticOffloadRerunRequested.set(true); + return; + } + CompletableFuture automaticOffloadCompletion = new CompletableFuture<>(); + automaticOffloadCompletion.whenComplete((res, ex) -> finishAutomaticOffload(ex)); + maybeOffloadInBackground(automaticOffloadCompletion, OffloadRequestSource.AUTOMATIC); + return; + } + + maybeOffloadInBackground(promise, OffloadRequestSource.EXPLICIT); + } + + private void maybeOffloadInBackground(CompletableFuture promise, OffloadRequestSource source) { + Optional> offloadThresholds = getOffloadThresholds(); + if (offloadThresholds.isEmpty()) { + // Explicit callers keep the previous no-completion behavior. The internal automatic completion must be + // finished so automaticOffloadInProgress can be cleared. + if (source == OffloadRequestSource.AUTOMATIC) { + promise.complete(PositionFactory.LATEST); + } return; } - final OffloadPolicies policies = config.getLedgerOffloader().getOffloadPolicies(); + Pair thresholds = offloadThresholds.get(); + executor.execute(() -> maybeOffload(thresholds.getLeft(), thresholds.getRight(), promise, + source)); + } + + private Optional> getOffloadThresholds() { + Optional optionalOffloadPolicies = getOffloadPoliciesIfAppendable(); + if (optionalOffloadPolicies.isEmpty()) { + return Optional.empty(); + } + + final OffloadPolicies policies = optionalOffloadPolicies.get(); final long offloadThresholdInBytes = Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L); final long offloadThresholdInSeconds = Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L); if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) { - executor.execute(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)); + return Optional.of(Pair.of(offloadThresholdInBytes, offloadThresholdInSeconds)); + } + + return Optional.empty(); + } + + private void finishAutomaticOffload(Throwable exception) { + if (exception != null) { + log.warn().exception(exception).log("Failed to automatically offload ledgers"); + } + automaticOffloadInProgress.set(false); + if (automaticOffloadRerunRequested.getAndSet(false)) { + maybeOffloadInBackground(AUTOMATIC_OFFLOAD_TRIGGER); } } private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds, - CompletableFuture finalPromise) { + CompletableFuture finalPromise, OffloadRequestSource source) { if (getOffloadPoliciesIfAppendable().isEmpty()) { String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name); finalPromise.completeExceptionally(new IllegalArgumentException(msg)); @@ -2834,7 +2889,7 @@ private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInS } if (!offloadMutex.tryLock()) { - scheduledExecutor.schedule(() -> maybeOffloadInBackground(finalPromise), + scheduledExecutor.schedule(() -> maybeOffloadInBackground(finalPromise, source), 100, TimeUnit.MILLISECONDS); return; } @@ -2926,12 +2981,11 @@ void internalTrimConsumedLedgers(CompletableFuture promise) { private Optional getOffloadPoliciesIfAppendable() { LedgerOffloader ledgerOffloader = config.getLedgerOffloader(); - if (ledgerOffloader == null - || !ledgerOffloader.isAppendable() - || ledgerOffloader.getOffloadPolicies() == null) { + if (ledgerOffloader == null || !ledgerOffloader.isAppendable()) { return Optional.empty(); } - return Optional.ofNullable(ledgerOffloader.getOffloadPolicies()); + OffloadPolicies offloadPolicies = ledgerOffloader.getOffloadPolicies(); + return Optional.ofNullable(offloadPolicies); } @VisibleForTesting diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java index e991ceabf456a..5385f642c5713 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java @@ -1184,6 +1184,128 @@ public CompletableFuture offload(ReadHandle ledger, } } + @Test + public void automaticOffloadTriggersAreCoalescedWhileOffloadInProgress() throws Exception { + CompletableFuture slowOffload = new CompletableFuture<>(); + CountDownLatch offloadRunning = new CountDownLatch(1); + AtomicInteger offloadPolicyCalls = new AtomicInteger(); + MockLedgerOffloader offloader = new MockLedgerOffloader() { + @Override + public CompletableFuture offload(ReadHandle ledger, + UUID uuid, + Map extraMetadata) { + offloadRunning.countDown(); + return slowOffload.thenCompose((res) -> super.offload(ledger, uuid, extraMetadata)); + } + + @Override + public OffloadPoliciesImpl getOffloadPolicies() { + offloadPolicyCalls.incrementAndGet(); + return super.getOffloadPolicies(); + } + }; + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setRetentionTime(10, TimeUnit.MINUTES); + config.setRetentionSizeInMB(10); + offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(0L); + offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(null); + config.setLedgerOffloader(offloader); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger" + UUID.randomUUID(), config); + + for (int i = 0; i < 25; i++) { + ledger.addEntry(buildEntry(10, "entry-" + i)); + } + assertTrue(offloadRunning.await(5, TimeUnit.SECONDS)); + + int callsBeforeRepeatedTriggers = offloadPolicyCalls.get(); + for (int i = 0; i < 20; i++) { + ledger.maybeOffloadInBackground(ManagedLedgerImpl.AUTOMATIC_OFFLOAD_TRIGGER); + } + + Thread.sleep(300); + assertTrue(offloadPolicyCalls.get() < callsBeforeRepeatedTriggers + 5, + "Repeated automatic triggers should not create independent retry loops"); + + slowOffload.complete(null); + + assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2); + List allLedgerIds = ledger.getLedgersInfoAsList().stream().map(LedgerInfo::getLedgerId).toList(); + assertEquals(offloader.offloadedLedgers(), Set.of(allLedgerIds.get(0), allLedgerIds.get(1))); + } + + @Test + public void automaticOffloadRunsAgainForCoalescedTrigger() throws Exception { + CompletableFuture slowOffload = new CompletableFuture<>(); + CountDownLatch offloadRunning = new CountDownLatch(1); + MockLedgerOffloader offloader = new MockLedgerOffloader() { + @Override + public CompletableFuture offload(ReadHandle ledger, + UUID uuid, + Map extraMetadata) { + offloadRunning.countDown(); + return slowOffload.thenCompose((res) -> super.offload(ledger, uuid, extraMetadata)); + } + }; + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setRetentionTime(10, TimeUnit.MINUTES); + config.setRetentionSizeInMB(10); + offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(0L); + offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(null); + config.setLedgerOffloader(offloader); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger" + UUID.randomUUID(), config); + + for (int i = 0; i < 11; i++) { + ledger.addEntry(buildEntry(10, "entry-" + i)); + } + assertTrue(offloadRunning.await(5, TimeUnit.SECONDS)); + + // The next ledger closes after the first automatic scan, so it depends on the coalesced rerun. + for (int i = 11; i < 21; i++) { + ledger.addEntry(buildEntry(10, "entry-" + i)); + } + assertEquals(offloader.offloadedLedgers().size(), 0); + + slowOffload.complete(null); + + assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2); + List allLedgerIds = ledger.getLedgersInfoAsList().stream().map(LedgerInfo::getLedgerId).toList(); + assertEquals(offloader.offloadedLedgers(), Set.of(allLedgerIds.get(0), allLedgerIds.get(1))); + } + + @Test + public void automaticOffloadWithoutThresholdDoesNotBlockLaterTriggers() throws Exception { + MockLedgerOffloader offloader = new MockLedgerOffloader(); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setRetentionTime(10, TimeUnit.MINUTES); + config.setRetentionSizeInMB(10); + offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(-1L); + offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(null); + config.setLedgerOffloader(offloader); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger" + UUID.randomUUID(), config); + + for (int i = 0; i < 25; i++) { + ledger.addEntry(buildEntry(10, "entry-" + i)); + } + ledger.maybeOffloadInBackground(ManagedLedgerImpl.AUTOMATIC_OFFLOAD_TRIGGER); + assertEquals(offloader.offloadedLedgers().size(), 0); + + // A disabled automatic trigger must complete internally so a later enabled trigger can run. + offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(0L); + ledger.maybeOffloadInBackground(ManagedLedgerImpl.AUTOMATIC_OFFLOAD_TRIGGER); + + assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2); + List allLedgerIds = ledger.getLedgersInfoAsList().stream().map(LedgerInfo::getLedgerId).toList(); + assertEquals(offloader.offloadedLedgers(), Set.of(allLedgerIds.get(0), allLedgerIds.get(1))); + } + @DataProvider(name = "offloadAsSoonAsClosed") public Object[][] offloadAsSoonAsClosedProvider() { return new Object[][]{