diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index ae0ded37211..830763aa022 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -628,6 +628,10 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour
private final LastLogMark lastLogMark = new LastLogMark(0, 0);
+ // Ensures lastMark only advances forward across concurrent checkpointComplete calls.
+ private final Object checkpointLock = new Object();
+ private final LogMark lastPersistedMark = new LogMark(0, 0);
+
private static final String LAST_MARK_DEFAULT_NAME = "lastMark";
private final String lastMarkFileName;
@@ -705,6 +709,8 @@ public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf
lastMarkFileName = LAST_MARK_DEFAULT_NAME + "." + journalIndex;
}
lastLogMark.readLog();
+ lastPersistedMark.setLogMark(
+ lastLogMark.getCurMark().getLogFileId(), lastLogMark.getCurMark().getLogFileOffset());
log.debug().attr("lastMark", () -> lastLogMark.getCurMark()).log("Last Log Mark");
try {
@@ -771,6 +777,9 @@ public Checkpoint newCheckpoint() {
/**
* Telling journal a checkpoint is finished.
*
+ *
Skips if the checkpoint is older than the last persisted mark,
+ * preventing lastMark from regressing backwards.
+ *
* @throws IOException
*/
@Override
@@ -781,7 +790,16 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IO
LogMarkCheckpoint lmcheckpoint = (LogMarkCheckpoint) checkpoint;
LastLogMark mark = lmcheckpoint.mark;
- mark.rollLog(mark);
+ // See #4105: keep the monotonic decision and lastMark persistence together.
+ synchronized (checkpointLock) {
+ if (mark.getCurMark().compare(lastPersistedMark) < 0) {
+ return;
+ }
+ persistLastLogMark(mark);
+ lastPersistedMark.setLogMark(
+ mark.getCurMark().getLogFileId(), mark.getCurMark().getLogFileOffset());
+ }
+
if (compact) {
// list the journals that have been marked
List logs = listJournalIds(journalDirectory, new JournalRollingFilter(mark));
@@ -803,6 +821,16 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IO
}
}
+ @VisibleForTesting
+ protected void persistLastLogMark(LastLogMark mark) throws NoWritableLedgerDirException {
+ mark.rollLog(mark);
+ }
+
+ @VisibleForTesting
+ protected boolean isCheckpointCompleteLockHeldByCurrentThread() {
+ return Thread.holdsLock(checkpointLock);
+ }
+
/**
* Scan the journal.
*
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
index dfc2459678b..a569a638ccf 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
@@ -32,11 +32,17 @@
import io.netty.util.ReferenceCountUtil;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
import org.apache.bookkeeper.bookie.BookieException;
@@ -45,7 +51,9 @@
import org.apache.bookkeeper.bookie.CheckpointSourceList;
import org.apache.bookkeeper.bookie.DefaultEntryLogger;
import org.apache.bookkeeper.bookie.EntryLocation;
+import org.apache.bookkeeper.bookie.Journal;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.LogMark;
import org.apache.bookkeeper.bookie.TestBookieImpl;
@@ -53,6 +61,7 @@
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.util.DiskChecker;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -881,4 +890,235 @@ public void testSingleLedgerDirectoryCheckpointTriggerRemovePendingDeletedLedger
bookie.getLedgerStorage().flush();
Assert.assertEquals(pendingDeletedLedgers.size(), 0);
}
+
+ @Test
+ public void testCheckpointCompleteSkipsStaleCheckpointAfterNewerCheckpoint() throws Exception {
+ File baseDir = new File(tmpDir, "journalStaleCheckpointTest");
+ File ledgerDir = new File(baseDir, "ledger");
+ File journalBaseDir = new File(baseDir, "journal");
+ File journalDir = BookieImpl.getCurrentDirectory(journalBaseDir);
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+ conf.setGcWaitTime(1000);
+ conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
+ conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
+ conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+ conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() });
+ conf.setJournalDirName(journalBaseDir.getCanonicalPath());
+ conf.setMaxBackupJournals(0);
+
+ BookieImpl.checkDirectoryStructure(journalDir);
+ LedgerDirsManager dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+ new DiskChecker(0.95f, 0.90f));
+ Journal journal = new Journal(0, journalDir, conf, dirsManager);
+ File ledgerDirMark = new File(BookieImpl.getCurrentDirectory(ledgerDir), "lastMark");
+ createJournalFiles(journalDir, 3, 10);
+
+ CheckpointSource checkpointSource = new CheckpointSourceList(Lists.newArrayList(journal));
+
+ journal.getLastLogMark().getCurMark().setLogMark(7, 100);
+ CheckpointSource.Checkpoint staleCheckpoint = checkpointSource.newCheckpoint();
+
+ journal.getLastLogMark().getCurMark().setLogMark(9, 300);
+ CheckpointSource.Checkpoint newerCheckpoint = checkpointSource.newCheckpoint();
+
+ checkpointSource.checkpointComplete(newerCheckpoint, true);
+ checkpointSource.checkpointComplete(staleCheckpoint, true);
+
+ LogMark markAfterStaleCheckpoint = readLogMark(ledgerDirMark);
+ assertEquals("lastMark must stay at newer checkpoint",
+ 9, markAfterStaleCheckpoint.getLogFileId());
+ assertEquals(300, markAfterStaleCheckpoint.getLogFileOffset());
+
+ for (long id = 3; id <= 8; id++) {
+ File journalFile = new File(journalDir, Long.toHexString(id) + ".txn");
+ assertFalse("Journal " + id + " should have been garbage collected", journalFile.exists());
+ }
+ for (long id = 9; id <= 10; id++) {
+ File journalFile = new File(journalDir, Long.toHexString(id) + ".txn");
+ assertTrue("Journal " + id + " should still exist", journalFile.exists());
+ }
+ }
+
+ @Test
+ public void testCheckpointCompleteUsesReloadedLastMarkAsPersistedMark() throws Exception {
+ File baseDir = new File(tmpDir, "journalReloadedLastMarkTest");
+ File ledgerDir = new File(baseDir, "ledger");
+ File journalBaseDir = new File(baseDir, "journal");
+ File journalDir = BookieImpl.getCurrentDirectory(journalBaseDir);
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+ conf.setGcWaitTime(1000);
+ conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
+ conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
+ conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+ conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() });
+ conf.setJournalDirName(journalBaseDir.getCanonicalPath());
+ conf.setMaxBackupJournals(0);
+
+ BookieImpl.checkDirectoryStructure(journalDir);
+ LedgerDirsManager dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+ new DiskChecker(0.95f, 0.90f));
+ File ledgerDirMark = new File(BookieImpl.getCurrentDirectory(ledgerDir), "lastMark");
+ writeLogMark(ledgerDirMark, 9, 300);
+ createJournalFiles(journalDir, 7, 10);
+
+ Journal journal = new Journal(0, journalDir, conf, dirsManager);
+ CheckpointSource checkpointSource = new CheckpointSourceList(Lists.newArrayList(journal));
+
+ journal.getLastLogMark().getCurMark().setLogMark(7, 100);
+ CheckpointSource.Checkpoint staleCheckpoint = checkpointSource.newCheckpoint();
+ checkpointSource.checkpointComplete(staleCheckpoint, true);
+
+ LogMark markAfterStaleCheckpoint = readLogMark(ledgerDirMark);
+ assertEquals("Reloaded lastMark must seed the monotonic guard",
+ 9, markAfterStaleCheckpoint.getLogFileId());
+ assertEquals(300, markAfterStaleCheckpoint.getLogFileOffset());
+ }
+
+ @Test
+ public void testConcurrentCheckpointCompleteLastMarkRegression() throws Exception {
+ File baseDir = new File(tmpDir, "journalMissingTest");
+ File ledgerDir = new File(baseDir, "ledger");
+ File journalBaseDir = new File(baseDir, "journal");
+ File journalDir = BookieImpl.getCurrentDirectory(journalBaseDir);
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+ conf.setGcWaitTime(1000);
+ conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
+ conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
+ conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+ conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() });
+ conf.setJournalDirName(journalBaseDir.getCanonicalPath());
+ conf.setMaxBackupJournals(0);
+
+ BookieImpl.checkDirectoryStructure(journalDir);
+ LedgerDirsManager dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+ new DiskChecker(0.95f, 0.90f));
+ CountDownLatch stalePersistStarted = new CountDownLatch(1);
+ CountDownLatch releaseStalePersist = new CountDownLatch(1);
+ CountDownLatch newerPersisted = new CountDownLatch(1);
+ BlockingJournal journal = new BlockingJournal(0, journalDir, conf, dirsManager,
+ stalePersistStarted, releaseStalePersist, newerPersisted);
+ File ledgerDirMark = new File(BookieImpl.getCurrentDirectory(ledgerDir), "lastMark");
+ createJournalFiles(journalDir, 3, 10);
+
+ CheckpointSource checkpointSource = new CheckpointSourceList(Lists.newArrayList(journal));
+
+ journal.getLastLogMark().getCurMark().setLogMark(7, 100);
+ CheckpointSource.Checkpoint staleCheckpoint = checkpointSource.newCheckpoint();
+
+ journal.getLastLogMark().getCurMark().setLogMark(9, 300);
+ CheckpointSource.Checkpoint newerCheckpoint = checkpointSource.newCheckpoint();
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ try {
+ Future> staleFuture = executor.submit(() -> {
+ checkpointSource.checkpointComplete(staleCheckpoint, true);
+ return null;
+ });
+ assertTrue("Stale checkpoint should reach lastMark persistence",
+ stalePersistStarted.await(10, TimeUnit.SECONDS));
+
+ Future> newerFuture = executor.submit(() -> {
+ checkpointSource.checkpointComplete(newerCheckpoint, true);
+ return null;
+ });
+ releaseStalePersist.countDown();
+
+ newerFuture.get(10, TimeUnit.SECONDS);
+ staleFuture.get(10, TimeUnit.SECONDS);
+ } finally {
+ executor.shutdownNow();
+ }
+
+ LogMark markAfterFlush = readLogMark(ledgerDirMark);
+ assertEquals("lastMark must not regress after concurrent checkpoints",
+ 9, markAfterFlush.getLogFileId());
+ assertEquals(300, markAfterFlush.getLogFileOffset());
+
+ for (long id = 3; id <= 8; id++) {
+ File journalFile = new File(journalDir, Long.toHexString(id) + ".txn");
+ assertFalse("Journal " + id + " should have been garbage collected", journalFile.exists());
+ }
+ for (long id = 9; id <= 10; id++) {
+ File journalFile = new File(journalDir, Long.toHexString(id) + ".txn");
+ assertTrue("Journal " + id + " should still exist", journalFile.exists());
+ }
+
+ journal.getLastLogMark().getCurMark().setLogMark(0, 0);
+ journal.getLastLogMark().readLog();
+ LogMark restartMark = journal.getLastLogMark().getCurMark();
+ assertEquals("Reloaded lastMark should be 9", 9, restartMark.getLogFileId());
+ assertEquals("Reloaded lastMark offset should be restored from disk",
+ 300, restartMark.getLogFileOffset());
+
+ File markedJournal = new File(journalDir,
+ Long.toHexString(restartMark.getLogFileId()) + ".txn");
+ assertTrue("Journal file " + restartMark.getLogFileId() + " must exist for recovery",
+ markedJournal.exists());
+
+ List replayLogs = Journal.listJournalIds(journalDir,
+ journalId -> journalId >= restartMark.getLogFileId());
+ assertTrue("Replay journal list must contain the marked journal",
+ replayLogs.size() > 0 && replayLogs.get(0) == restartMark.getLogFileId());
+ }
+
+ private static void createJournalFiles(File journalDir, long startId, long endId) throws IOException {
+ for (long id = startId; id <= endId; id++) {
+ File journalFile = new File(journalDir, Long.toHexString(id) + ".txn");
+ assertTrue("Failed to create journal file " + id, journalFile.createNewFile());
+ }
+ }
+
+ private static void writeLogMark(File file, long logFileId, long logFileOffset) throws IOException {
+ byte[] buff = new byte[16];
+ ByteBuffer bb = ByteBuffer.wrap(buff);
+ LogMark mark = new LogMark(logFileId, logFileOffset);
+ mark.writeLogMark(bb);
+ try (FileOutputStream fos = new FileOutputStream(file)) {
+ fos.write(buff);
+ fos.getChannel().force(true);
+ }
+ }
+
+ private static void awaitLatch(CountDownLatch latch, String message) {
+ try {
+ assertTrue(message, latch.await(10, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new AssertionError(message, e);
+ }
+ }
+
+ private static final class BlockingJournal extends Journal {
+ private static final long STALE_LOG_ID = 7;
+ private static final long NEWER_LOG_ID = 9;
+
+ private final CountDownLatch stalePersistStarted;
+ private final CountDownLatch releaseStalePersist;
+ private final CountDownLatch newerPersisted;
+
+ BlockingJournal(int journalIndex, File journalDirectory, ServerConfiguration conf,
+ LedgerDirsManager ledgerDirsManager, CountDownLatch stalePersistStarted,
+ CountDownLatch releaseStalePersist, CountDownLatch newerPersisted) {
+ super(journalIndex, journalDirectory, conf, ledgerDirsManager);
+ this.stalePersistStarted = stalePersistStarted;
+ this.releaseStalePersist = releaseStalePersist;
+ this.newerPersisted = newerPersisted;
+ }
+
+ @Override
+ protected void persistLastLogMark(LastLogMark mark) throws NoWritableLedgerDirException {
+ long logFileId = mark.getCurMark().getLogFileId();
+ if (logFileId == STALE_LOG_ID) {
+ stalePersistStarted.countDown();
+ awaitLatch(releaseStalePersist, "Timed out waiting to release stale checkpoint");
+ if (!isCheckpointCompleteLockHeldByCurrentThread()) {
+ awaitLatch(newerPersisted, "Timed out waiting for newer checkpoint to persist");
+ }
+ }
+ super.persistLastLogMark(mark);
+ if (logFileId == NEWER_LOG_ID) {
+ newerPersisted.countDown();
+ }
+ }
+ }
}