Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,10 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour

// In-memory last log mark for this journal; the constructor populates it via readLog().
private final LastLogMark lastLogMark = new LastLogMark(0, 0);

// Ensures lastMark only advances forward across concurrent checkpointComplete calls:
// checkpointComplete compares against and updates lastPersistedMark while holding this lock.
private final Object checkpointLock = new Object();
// Mark most recently accepted by checkpointComplete. Seeded in the constructor from the
// value lastLogMark holds after readLog(), then read and updated under checkpointLock.
private final LogMark lastPersistedMark = new LogMark(0, 0);

// Base name from which the constructor derives lastMarkFileName.
private static final String LAST_MARK_DEFAULT_NAME = "lastMark";

// Name of this journal's last-mark file; assigned in the constructor.
private final String lastMarkFileName;
Expand Down Expand Up @@ -705,6 +709,8 @@ public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf
lastMarkFileName = LAST_MARK_DEFAULT_NAME + "." + journalIndex;
}
lastLogMark.readLog();
lastPersistedMark.setLogMark(
lastLogMark.getCurMark().getLogFileId(), lastLogMark.getCurMark().getLogFileOffset());
log.debug().attr("lastMark", () -> lastLogMark.getCurMark()).log("Last Log Mark");

try {
Expand Down Expand Up @@ -771,6 +777,9 @@ public Checkpoint newCheckpoint() {
/**
* Tells the journal that a checkpoint has finished.
*
* <p>If the checkpoint's mark is older than the last persisted mark, the call returns
* without persisting or compacting anything, so lastMark never moves backwards.
*
* @throws IOException
*/
@Override
Expand All @@ -781,7 +790,16 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IO
LogMarkCheckpoint lmcheckpoint = (LogMarkCheckpoint) checkpoint;
LastLogMark mark = lmcheckpoint.mark;

mark.rollLog(mark);
// See #4105: keep the monotonic decision and lastMark persistence together.
synchronized (checkpointLock) {
if (mark.getCurMark().compare(lastPersistedMark) < 0) {
return;
}
persistLastLogMark(mark);
lastPersistedMark.setLogMark(
mark.getCurMark().getLogFileId(), mark.getCurMark().getLogFileOffset());
}

if (compact) {
// list the journals that have been marked
List<Long> logs = listJournalIds(journalDirectory, new JournalRollingFilter(mark));
Expand All @@ -803,6 +821,16 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IO
}
}

/**
 * Persists the given last log mark by delegating to {@code mark.rollLog(mark)}.
 *
 * <p>{@code checkpointComplete} calls this while holding {@code checkpointLock}. The method is
 * {@code protected} and marked {@code @VisibleForTesting} so that tests can override it to
 * intercept the persistence step.
 *
 * @param mark the mark to persist
 * @throws NoWritableLedgerDirException propagated from {@code rollLog}; presumably raised when
 *     no ledger directory is writable (rollLog is not visible here - confirm)
 */
@VisibleForTesting
protected void persistLastLogMark(LastLogMark mark) throws NoWritableLedgerDirException {
mark.rollLog(mark);
}

/**
 * Reports whether the calling thread currently holds the monitor of {@code checkpointLock}.
 *
 * <p>Test hook: lets a subclass that overrides {@link #persistLastLogMark} check whether it is
 * being invoked from inside the synchronized section of {@code checkpointComplete}.
 *
 * @return {@code true} if the current thread holds {@code checkpointLock}
 */
@VisibleForTesting
protected boolean isCheckpointCompleteLockHeldByCurrentThread() {
return Thread.holdsLock(checkpointLock);
}

/**
* Scan the journal.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,17 @@
import io.netty.util.ReferenceCountUtil;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
import org.apache.bookkeeper.bookie.BookieException;
Expand All @@ -45,14 +51,17 @@
import org.apache.bookkeeper.bookie.CheckpointSourceList;
import org.apache.bookkeeper.bookie.DefaultEntryLogger;
import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.Journal;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.LogMark;
import org.apache.bookkeeper.bookie.TestBookieImpl;
import org.apache.bookkeeper.bookie.storage.EntryLogger;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.util.DiskChecker;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -881,4 +890,235 @@ public void testSingleLedgerDirectoryCheckpointTriggerRemovePendingDeletedLedger
bookie.getLedgerStorage().flush();
Assert.assertEquals(pendingDeletedLedgers.size(), 0);
}

/**
 * Completes a newer checkpoint (9, 300) and then an older one (7, 100), and checks that the
 * lastMark file in the ledger directory still reads (9, 300). Also asserts that journal files
 * 3-8 no longer exist while journal files 9-10 are still present.
 */
@Test
public void testCheckpointCompleteSkipsStaleCheckpointAfterNewerCheckpoint() throws Exception {
    final File rootDir = new File(tmpDir, "journalStaleCheckpointTest");
    final File ledgerRoot = new File(rootDir, "ledger");
    final File journalRoot = new File(rootDir, "journal");
    final File currentJournalDir = BookieImpl.getCurrentDirectory(journalRoot);

    final ServerConfiguration serverConf = TestBKConfiguration.newServerConfiguration();
    serverConf.setGcWaitTime(1000);
    serverConf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
    serverConf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
    serverConf.setLedgerStorageClass(DbLedgerStorage.class.getName());
    final String ledgerPath = ledgerRoot.getCanonicalPath();
    serverConf.setLedgerDirNames(new String[] { ledgerPath });
    serverConf.setJournalDirName(journalRoot.getCanonicalPath());
    serverConf.setMaxBackupJournals(0);

    BookieImpl.checkDirectoryStructure(currentJournalDir);
    final LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(
            serverConf, serverConf.getLedgerDirs(), new DiskChecker(0.95f, 0.90f));
    final Journal journalUnderTest = new Journal(0, currentJournalDir, serverConf, ledgerDirsManager);
    final File lastMarkFile = new File(BookieImpl.getCurrentDirectory(ledgerRoot), "lastMark");
    createJournalFiles(currentJournalDir, 3, 10);

    final CheckpointSource source = new CheckpointSourceList(Lists.newArrayList(journalUnderTest));

    // Create the older checkpoint first, then move the mark forward and create the newer one.
    journalUnderTest.getLastLogMark().getCurMark().setLogMark(7, 100);
    final CheckpointSource.Checkpoint olderCheckpoint = source.newCheckpoint();

    journalUnderTest.getLastLogMark().getCurMark().setLogMark(9, 300);
    final CheckpointSource.Checkpoint latestCheckpoint = source.newCheckpoint();

    // Complete them out of order: newest first, stale second.
    source.checkpointComplete(latestCheckpoint, true);
    source.checkpointComplete(olderCheckpoint, true);

    final LogMark persisted = readLogMark(lastMarkFile);
    assertEquals("lastMark must stay at newer checkpoint",
            9, persisted.getLogFileId());
    assertEquals(300, persisted.getLogFileOffset());

    // Journals below the newer mark must be gone; the marked journal and later ones must remain.
    for (long journalId = 3; journalId <= 10; journalId++) {
        final File candidate = new File(currentJournalDir, Long.toHexString(journalId) + ".txn");
        if (journalId <= 8) {
            assertFalse("Journal " + journalId + " should have been garbage collected", candidate.exists());
        } else {
            assertTrue("Journal " + journalId + " should still exist", candidate.exists());
        }
    }
}

/**
 * Writes a lastMark of (9, 300) into the ledger directory before the journal is constructed,
 * then completes a checkpoint at (7, 100) and checks that the lastMark file still reads
 * (9, 300).
 */
@Test
public void testCheckpointCompleteUsesReloadedLastMarkAsPersistedMark() throws Exception {
    final File rootDir = new File(tmpDir, "journalReloadedLastMarkTest");
    final File ledgerRoot = new File(rootDir, "ledger");
    final File journalRoot = new File(rootDir, "journal");
    final File currentJournalDir = BookieImpl.getCurrentDirectory(journalRoot);

    final ServerConfiguration serverConf = TestBKConfiguration.newServerConfiguration();
    serverConf.setGcWaitTime(1000);
    serverConf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
    serverConf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
    serverConf.setLedgerStorageClass(DbLedgerStorage.class.getName());
    final String ledgerPath = ledgerRoot.getCanonicalPath();
    serverConf.setLedgerDirNames(new String[] { ledgerPath });
    serverConf.setJournalDirName(journalRoot.getCanonicalPath());
    serverConf.setMaxBackupJournals(0);

    BookieImpl.checkDirectoryStructure(currentJournalDir);
    final LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(
            serverConf, serverConf.getLedgerDirs(), new DiskChecker(0.95f, 0.90f));

    // The on-disk mark and journal files must exist before the Journal is constructed.
    final File lastMarkFile = new File(BookieImpl.getCurrentDirectory(ledgerRoot), "lastMark");
    writeLogMark(lastMarkFile, 9, 300);
    createJournalFiles(currentJournalDir, 7, 10);

    final Journal journalUnderTest = new Journal(0, currentJournalDir, serverConf, ledgerDirsManager);
    final CheckpointSource source = new CheckpointSourceList(Lists.newArrayList(journalUnderTest));

    // Move the in-memory mark behind the on-disk one and complete a checkpoint taken there.
    journalUnderTest.getLastLogMark().getCurMark().setLogMark(7, 100);
    final CheckpointSource.Checkpoint olderCheckpoint = source.newCheckpoint();
    source.checkpointComplete(olderCheckpoint, true);

    final LogMark persisted = readLogMark(lastMarkFile);
    assertEquals("Reloaded lastMark must seed the monotonic guard",
            9, persisted.getLogFileId());
    assertEquals(300, persisted.getLogFileOffset());
}

/**
 * Runs a stale (7, 100) and a newer (9, 300) checkpointComplete call on two threads and
 * checks that the lastMark file ends at (9, 300).
 *
 * <p>A {@code BlockingJournal} parks the stale call inside {@code persistLastLogMark} until the
 * newer call has been submitted. If the stale thread does not hold the checkpoint lock at that
 * point, BlockingJournal additionally makes it wait until the newer mark has been persisted,
 * so the stale write lands last.
 */
@Test
public void testConcurrentCheckpointCompleteLastMarkRegression() throws Exception {
File baseDir = new File(tmpDir, "journalMissingTest");
File ledgerDir = new File(baseDir, "ledger");
File journalBaseDir = new File(baseDir, "journal");
File journalDir = BookieImpl.getCurrentDirectory(journalBaseDir);
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setGcWaitTime(1000);
conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() });
conf.setJournalDirName(journalBaseDir.getCanonicalPath());
conf.setMaxBackupJournals(0);

BookieImpl.checkDirectoryStructure(journalDir);
LedgerDirsManager dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
new DiskChecker(0.95f, 0.90f));
// Latches shared with BlockingJournal to sequence the two persistence steps.
CountDownLatch stalePersistStarted = new CountDownLatch(1);
CountDownLatch releaseStalePersist = new CountDownLatch(1);
CountDownLatch newerPersisted = new CountDownLatch(1);
BlockingJournal journal = new BlockingJournal(0, journalDir, conf, dirsManager,
stalePersistStarted, releaseStalePersist, newerPersisted);
File ledgerDirMark = new File(BookieImpl.getCurrentDirectory(ledgerDir), "lastMark");
createJournalFiles(journalDir, 3, 10);

CheckpointSource checkpointSource = new CheckpointSourceList(Lists.newArrayList(journal));

// Create the stale checkpoint at (7, 100), then the newer one at (9, 300).
journal.getLastLogMark().getCurMark().setLogMark(7, 100);
CheckpointSource.Checkpoint staleCheckpoint = checkpointSource.newCheckpoint();

journal.getLastLogMark().getCurMark().setLogMark(9, 300);
CheckpointSource.Checkpoint newerCheckpoint = checkpointSource.newCheckpoint();

ExecutorService executor = Executors.newFixedThreadPool(2);
try {
// Start the stale completion and wait until it is parked inside persistLastLogMark.
Future<?> staleFuture = executor.submit(() -> {
checkpointSource.checkpointComplete(staleCheckpoint, true);
return null;
});
assertTrue("Stale checkpoint should reach lastMark persistence",
stalePersistStarted.await(10, TimeUnit.SECONDS));

// Submit the newer completion only after the stale one is already in flight, then unpark it.
Future<?> newerFuture = executor.submit(() -> {
checkpointSource.checkpointComplete(newerCheckpoint, true);
return null;
});
releaseStalePersist.countDown();

// get() rethrows any failure from the worker threads and bounds the wait.
newerFuture.get(10, TimeUnit.SECONDS);
staleFuture.get(10, TimeUnit.SECONDS);
} finally {
executor.shutdownNow();
}

// The mark on disk must be the newer one regardless of which call finished last.
LogMark markAfterFlush = readLogMark(ledgerDirMark);
assertEquals("lastMark must not regress after concurrent checkpoints",
9, markAfterFlush.getLogFileId());
assertEquals(300, markAfterFlush.getLogFileOffset());

// Journal files 3-8 must have been removed; 9 and 10 must remain.
for (long id = 3; id <= 8; id++) {
File journalFile = new File(journalDir, Long.toHexString(id) + ".txn");
assertFalse("Journal " + id + " should have been garbage collected", journalFile.exists());
}
for (long id = 9; id <= 10; id++) {
File journalFile = new File(journalDir, Long.toHexString(id) + ".txn");
assertTrue("Journal " + id + " should still exist", journalFile.exists());
}

// Reset the in-memory mark and re-read it to mimic what a restart would load.
journal.getLastLogMark().getCurMark().setLogMark(0, 0);
journal.getLastLogMark().readLog();
LogMark restartMark = journal.getLastLogMark().getCurMark();
assertEquals("Reloaded lastMark should be 9", 9, restartMark.getLogFileId());
assertEquals("Reloaded lastMark offset should be restored from disk",
300, restartMark.getLogFileOffset());

// The journal file the reloaded mark points at must still be on disk.
File markedJournal = new File(journalDir,
Long.toHexString(restartMark.getLogFileId()) + ".txn");
assertTrue("Journal file " + restartMark.getLogFileId() + " must exist for recovery",
markedJournal.exists());

// Listing journals with id >= the mark must yield the marked journal as the first entry.
List<Long> replayLogs = Journal.listJournalIds(journalDir,
journalId -> journalId >= restartMark.getLogFileId());
assertTrue("Replay journal list must contain the marked journal",
replayLogs.size() > 0 && replayLogs.get(0) == restartMark.getLogFileId());
}

/**
 * Creates one empty file named {@code <hex id>.txn} in {@code journalDir} for every id in the
 * inclusive range {@code [startId, endId]}, failing the test if any file cannot be newly created.
 *
 * @param journalDir directory in which the files are created
 * @param startId first journal id (inclusive)
 * @param endId last journal id (inclusive)
 * @throws IOException if file creation fails with an I/O error
 */
private static void createJournalFiles(File journalDir, long startId, long endId) throws IOException {
    long journalId = startId;
    while (journalId <= endId) {
        String fileName = Long.toHexString(journalId) + ".txn";
        File target = new File(journalDir, fileName);
        // createNewFile() returns false when the file already exists.
        boolean created = target.createNewFile();
        assertTrue("Failed to create journal file " + journalId, created);
        journalId++;
    }
}

/**
 * Writes a log mark to {@code file}: serializes {@code (logFileId, logFileOffset)} with
 * {@code LogMark.writeLogMark} into a 16-byte buffer, writes all 16 bytes and forces them to disk.
 *
 * @param file destination file; it is overwritten
 * @param logFileId journal file id to record
 * @param logFileOffset offset within that journal file to record
 * @throws IOException if the file cannot be opened, written or forced
 */
private static void writeLogMark(File file, long logFileId, long logFileOffset) throws IOException {
    // Heap buffer, so array() exposes the same 16 bytes that writeLogMark fills in.
    ByteBuffer encoded = ByteBuffer.allocate(16);
    new LogMark(logFileId, logFileOffset).writeLogMark(encoded);
    try (FileOutputStream out = new FileOutputStream(file)) {
        out.write(encoded.array());
        out.getChannel().force(true);
    }
}

/**
 * Waits up to 10 seconds for {@code latch} to reach zero and fails with {@code message} if it
 * does not. If the wait is interrupted, restores the interrupt flag and throws an
 * {@link AssertionError} that carries {@code message} and the interruption as its cause.
 *
 * @param latch latch to wait on
 * @param message failure message used for both the timeout and the interrupt case
 */
private static void awaitLatch(CountDownLatch latch, String message) {
    final boolean released;
    try {
        released = latch.await(10, TimeUnit.SECONDS);
    } catch (InterruptedException interrupted) {
        Thread.currentThread().interrupt();
        throw new AssertionError(message, interrupted);
    }
    assertTrue(message, released);
}

/**
 * Journal whose {@code persistLastLogMark} is instrumented with latches so a test can control
 * when the mark with log file id 7 (stale) is persisted relative to the one with id 9 (newer).
 */
private static final class BlockingJournal extends Journal {
    private static final long STALE_LOG_ID = 7;
    private static final long NEWER_LOG_ID = 9;

    /** Counted down once the stale mark has entered persistLastLogMark. */
    private final CountDownLatch stalePersistStarted;
    /** The stale persistence waits on this before it continues. */
    private final CountDownLatch releaseStalePersist;
    /** Counted down after the newer mark has been persisted. */
    private final CountDownLatch newerPersisted;

    BlockingJournal(int journalIndex, File journalDirectory, ServerConfiguration conf,
                    LedgerDirsManager ledgerDirsManager, CountDownLatch stalePersistStarted,
                    CountDownLatch releaseStalePersist, CountDownLatch newerPersisted) {
        super(journalIndex, journalDirectory, conf, ledgerDirsManager);
        this.stalePersistStarted = stalePersistStarted;
        this.releaseStalePersist = releaseStalePersist;
        this.newerPersisted = newerPersisted;
    }

    @Override
    protected void persistLastLogMark(LastLogMark mark) throws NoWritableLedgerDirException {
        // Read the id before delegating so both checks use the same value.
        final long markLogId = mark.getCurMark().getLogFileId();
        if (markLogId == STALE_LOG_ID) {
            holdStalePersist();
        }
        super.persistLastLogMark(mark);
        if (markLogId == NEWER_LOG_ID) {
            newerPersisted.countDown();
        }
    }

    /**
     * Signals that the stale persistence has started and blocks until the test releases it.
     * When the calling thread does not hold the checkpoint lock, also waits for the newer mark
     * to be persisted first.
     */
    private void holdStalePersist() {
        stalePersistStarted.countDown();
        awaitLatch(releaseStalePersist, "Timed out waiting to release stale checkpoint");
        if (isCheckpointCompleteLockHeldByCurrentThread()) {
            return;
        }
        awaitLatch(newerPersisted, "Timed out waiting for newer checkpoint to persist");
    }
}
}