Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.StringUtils;
Expand Down Expand Up @@ -115,7 +116,6 @@
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DifferSnapshotVersion;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.NodeComparator;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.tag.Flaky;
import org.apache.ratis.util.UncheckedAutoCloseable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -968,12 +968,17 @@ public void testGetSSTDiffListWithoutDB(String description,
* Does actual DB write, flush, compaction.
*/
@Test
@Flaky("HDDS-15209")
void testDifferWithDB() throws Exception {
writeKeysAndCheckpointing();
readRocksDBInstance(ACTIVE_DB_DIR_NAME, activeRocksDB, null,
rocksDBCheckpointDiffer);

// Wait until compaction listeners have finished updating the DAG; otherwise
// diffAllSnapshots can NPE when falling back to snapshot metadata for a file
// no longer present in the latest snapshot (HDDS-15209).
GenericTestUtils.waitFor(() -> rocksDBCheckpointDiffer.getInflightCompactions().isEmpty(), 1000,
10000);

if (LOG.isDebugEnabled()) {
printAllSnapshots();
}
Expand All @@ -984,20 +989,23 @@ void testDifferWithDB() throws Exception {

diffAllSnapshots(rocksDBCheckpointDiffer);

// Confirm correct links created
// SST backup hard-links use whatever file numbers RocksDB assigns for compaction inputs.
try (Stream<Path> sstPathStream = Files.list(sstBackUpDir.toPath())) {
List<String> expectedLinks = sstPathStream.map(Path::getFileName)
.map(Object::toString).sorted().collect(Collectors.toList());
assertEquals(expectedLinks, asList(
"000017.sst", "000019.sst", "000021.sst", "000023.sst",
"000024.sst", "000026.sst", "000029.sst"));
List<String> backupFiles =
sstPathStream.map(p -> p.getFileName().toString()).sorted().collect(Collectors.toList());
assertEquals(7, backupFiles.size(), "Unexpected compaction SST backup count");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Asserting the count here may still be flaky depending on how RocksDB compacts on different platforms. A more concrete check can be

  • Get the diff SST files between the last snapshot and the first snapshot using differ.getSSTDiffList(). (The correctness of getSSTDiffList() should already be verified in diffAllSnapshots(). See comment below)
  • Check whether these diff list of files are in the last snapshot.
  • Otherwise they should be in SST backup directory (they should not be pruned as they will correspond to the leaf node in the compaction DAG).

ConcurrentMap<String, CompactionNode> compactionNodes =
rocksDBCheckpointDiffer.getCompactionNodeMap();
for (String name : backupFiles) {
assertTrue(name.endsWith(SST_FILE_EXTENSION), () -> "Not an SST: " + name);
assertTrue(compactionNodes.containsKey(FilenameUtils.getBaseName(name)),
() -> "Backup " + name + " should match a tracked compaction SST");
}
}
rocksDBCheckpointDiffer.getForwardCompactionDAG().nodes().stream().forEach(compactionNode -> {
Assertions.assertNotNull(compactionNode.getStartKey());
Assertions.assertNotNull(compactionNode.getEndKey());
});
GenericTestUtils.waitFor(() -> rocksDBCheckpointDiffer.getInflightCompactions().isEmpty(), 1000,
10000);
if (LOG.isDebugEnabled()) {
rocksDBCheckpointDiffer.dumpCompactionNodeTable();
}
Expand All @@ -1009,33 +1017,60 @@ private static List<ColumnFamilyDescriptor> getColumnFamilyDescriptors() {
.map(ColumnFamilyDescriptor::new).collect(Collectors.toList());
}

/**
* Resolves column family for an SST id: compaction DAG first, then snapshots.
*
* @return (true, cf) when the SST is known for this run (cf may be null);
* (false, ignored) when the id does not appear (e.g. different SST numbering than the hard-coded list).
*/
private Pair<Boolean, String> resolveColumnFamilyForDiffFile(RocksDBCheckpointDiffer differ, String diffFile,
DifferSnapshotInfo srcSnap, DifferSnapshotInfo destSnap) {
CompactionNode node = differ.getCompactionNodeMap().get(diffFile);
if (node != null) {
return Pair.of(true, node.getColumnFamily());
}
SstFileInfo meta = srcSnap.getSstFile(0, diffFile);
if (meta == null) {
meta = destSnap.getSstFile(0, diffFile);
}
if (meta == null) {
for (DifferSnapshotInfo s : snapshots) {
meta = s.getSstFile(0, diffFile);
if (meta != null) {
break;
}
}
}
if (meta == null) {
return Pair.of(false, null);
}
return Pair.of(true, meta.getColumnFamily());
}

/**
* Test SST differ.
*/
void diffAllSnapshots(RocksDBCheckpointDiffer differ)
throws IOException {
final DifferSnapshotInfo src = snapshots.get(snapshots.size() - 1);

// Hard-coded expected output.
// The results are deterministic. Retrieved from a successful run.
final List<List<String>> expectedDifferResult = asList(
asList("000023", "000029", "000026", "000019", "000021", "000031"),
asList("000023", "000029", "000026", "000021", "000031"),
asList("000023", "000029", "000026", "000031"),
asList("000029", "000026", "000031"),
asList("000029", "000031"),
Collections.singletonList("000031"),
Collections.emptyList()
);
assertEquals(snapshots.size(), expectedDifferResult.size());

int index = 0;
List<String> expectedDiffFiles = new ArrayList<>();
for (DifferSnapshotInfo snap : snapshots) {
// Returns a list of SST files to be fed into RocksCheckpointDiffer Dag.
List<String> tablesToTrack = new ArrayList<>(COLUMN_FAMILIES_TO_TRACK_IN_DAG);
// Add some invalid index.
tablesToTrack.add("compactionLogTable");
Set<String> fullTableToLookUp = new HashSet<>(tablesToTrack);
List<String> baselineDiffFileNames =
differ.getSSTDiffList(
new DifferSnapshotVersion(src, 0, fullTableToLookUp),
new DifferSnapshotVersion(snap, 0, fullTableToLookUp),
null, fullTableToLookUp, true)
.orElse(Collections.emptyList())
.stream()
.map(SstFileInfo::getFileName)
.collect(Collectors.toList());
Comment on lines +1064 to +1072
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch @arunsarin85. Constructing the baselineDiffFileNames(from which expectedDiffFiles are derived) using the same method differ.getSSTDiffList() that the actualNames is constructed changes the intention of the test.
This test is meant to verify the correctness of differ.getSSTDiffList() so the expected diff list needs to be built independently of differ.getSSTDiffList()

Maybe a better approach would be:
Pseudocode:

setup:
  open active DB with auto-compactions disabled. cfOpts.setDisableAutoCompactions(true);

write_and_checkpoint():
    // First snapshot
    Insert keys
    snap-1 = create_checkpoint()
    snap_1_files = sst_file_list(snap_1) // grouped by key, file, directorytable column families

     // For the rest of the snapshots
     Insert keys
     // Before taking the snapshot flush all txns,  track all uncompacted SST files, run manual compaction then take a snapshot
     activeRocksDB.get().flush()
     snap_n_files = sst_file_list(activeRocksDB) // grouped by key, file, directorytable column families
     compactOptions.setBottommostLevelCompaction(kForce)
     activeRocksDB.get().compactRange(key/file/directoryTable, null, null, compactOptions)
     snap-n = create_checkpoint()
  
build_expected_diffs_between_adjacent_snaps():
  expected_diff_between_snap-2_snap-1 = snap_2_files - snap_1_files
  expected_diff_between_snap-3_snap-2 = snap_3_files - snap_2_files
  ...
  expected_diff_between_snap-n_snap-n-1 = snap_n_files - snap_n-1_files

diffAllSnapshots: 
    build_expected_diffs_between_adjacent_snaps()
    
    for each snap_n and snap_prev from (snap_1 .. snap_n-1);
         actual_diff = differ.getSSTDiffList(snap_n, snap_prev)
         expected_diff = expected_diff_between_snap-n_snap-n-1 + expected_diff_between_snap-n-1_snap-n-2 + .... + expected_diff_between_snap-prev+1_expected_diff_between_snap-prev
         assert expected_diff == actual_diff


Set<String> tableToLookUp = new HashSet<>();
for (int i = 0; i < Math.pow(2, tablesToTrack.size()); i++) {
tableToLookUp.clear();
Expand All @@ -1046,13 +1081,13 @@ void diffAllSnapshots(RocksDBCheckpointDiffer differ)
tableToLookUp.add(tablesToTrack.get(firstSetBitIndex));
mask &= mask - 1;
}
for (String diffFile : expectedDifferResult.get(index)) {
String columnFamily;
if (rocksDBCheckpointDiffer.getCompactionNodeMap().containsKey(diffFile)) {
columnFamily = rocksDBCheckpointDiffer.getCompactionNodeMap().get(diffFile).getColumnFamily();
} else {
columnFamily = src.getSstFile(0, diffFile).getColumnFamily();
for (String diffFile : baselineDiffFileNames) {
Pair<Boolean, String> resolved =
resolveColumnFamilyForDiffFile(differ, diffFile, src, snap);
if (!resolved.getLeft()) {
continue;
}
String columnFamily = resolved.getRight();
if (columnFamily == null || tableToLookUp.contains(columnFamily)) {
expectedDiffFiles.add(diffFile);
}
Expand All @@ -1064,11 +1099,12 @@ void diffAllSnapshots(RocksDBCheckpointDiffer differ)
LOG.info("SST diff list from '{}' to '{}': {} tables: {}",
src.getDbPath(0), snap.getDbPath(0), sstDiffList, tableToLookUp);

assertEquals(expectedDiffFiles, sstDiffList.stream().map(SstFileInfo::getFileName)
.collect(Collectors.toList()));
List<String> actualNames =
sstDiffList.stream().map(SstFileInfo::getFileName).collect(Collectors.toList());
expectedDiffFiles.sort(Comparator.naturalOrder());
actualNames.sort(Comparator.naturalOrder());
assertEquals(expectedDiffFiles, actualNames);
}

++index;
}
}

Expand Down