Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
aa4dcc1
Change IMetaProxy API to return an ImmutableList of AbstractBackupPat…
mattl-netflix May 1, 2024
f9de9a7
Include incremental secondary index files in restore.
mattl-netflix May 1, 2024
a5a6bb0
Remove unused code from our restore tooling.
mattl-netflix Mar 9, 2024
6b19184
Remove waitForCompletion parameter to download method which is always…
mattl-netflix Mar 10, 2024
71756d1
Remove all non-javadoc comments that don't provide warnings to future…
mattl-netflix Mar 10, 2024
50b41cb
Remove redundant logging.
mattl-netflix Mar 10, 2024
515eb2f
Remove redundant vertical whitespace.
mattl-netflix Mar 10, 2024
948485a
Remove backup racs as that configuration is not used in practice and …
mattl-netflix Mar 10, 2024
78aca40
Consolidate restore status API.
mattl-netflix Apr 10, 2024
78dae29
Cease wiping the data directory in advance of restore. It is counter-…
mattl-netflix Apr 10, 2024
a7c0975
Move getBackupDirectories to BackupRestoreUtil and change its signatu…
mattl-netflix May 2, 2024
71c045b
Cease stopping Cassandra in the restore process and reveal the abilit…
mattl-netflix May 2, 2024
6b1027a
Rename file variable to be idomatic Java.
mattl-netflix Dec 27, 2024
728645b
Remove redundant File.exists() calls.
mattl-netflix Dec 27, 2024
67a1c0a
Use Path.resolve rather than switching between Path and String.
mattl-netflix Dec 27, 2024
9c3be36
Move to canonical method of moving one directory to another in Java.
mattl-netflix Dec 31, 2024
a1b7e8d
Log failed imports.
mattl-netflix Mar 17, 2025
e1ebaa5
Change policy to not replace existing files by default when doing an …
mattl-netflix Mar 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 0 additions & 40 deletions priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,7 @@
import com.netflix.priam.utils.SystemUtils;
import java.io.File;
import java.io.FileFilter;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -95,40 +89,6 @@ protected String getKeyspace(File backupDir) {
*/
protected abstract void processColumnFamily(File backupDir) throws Exception;

/**
* Get all the backup directories for Cassandra.
*
* @param config to get the location of the data folder.
* @param monitoringFolder folder where cassandra backup's are configured.
* @return Set of the path(s) containing the backup folder for each columnfamily.
* @throws Exception incase of IOException.
*/
public static Set<Path> getBackupDirectories(IConfiguration config, String monitoringFolder)
throws Exception {
HashSet<Path> backupPaths = new HashSet<>();
if (config.getDataFileLocation() == null) return backupPaths;
Path dataPath = Paths.get(config.getDataFileLocation());
if (Files.exists(dataPath) && Files.isDirectory(dataPath))
try (DirectoryStream<Path> directoryStream =
Files.newDirectoryStream(dataPath, path -> Files.isDirectory(path))) {
for (Path keyspaceDirPath : directoryStream) {
try (DirectoryStream<Path> keyspaceStream =
Files.newDirectoryStream(
keyspaceDirPath, path -> Files.isDirectory(path))) {
for (Path columnfamilyDirPath : keyspaceStream) {
Path backupDirPath =
Paths.get(columnfamilyDirPath.toString(), monitoringFolder);
if (Files.exists(backupDirPath) && Files.isDirectory(backupDirPath)) {
logger.debug("Backup folder: {}", backupDirPath);
backupPaths.add(backupDirPath);
}
}
}
}
}
return backupPaths;
}

protected static File[] getSecondaryIndexDirectories(File backupDir) {
FileFilter filter = (file) -> file.getName().startsWith(".") && isAReadableDirectory(file);
return Optional.ofNullable(backupDir.listFiles(filter)).orElse(new File[] {});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,23 +137,24 @@ protected String match(Date start, Date end) {

/** Local restore file */
public File newRestoreFile() {
File return_;
String dataDir = config.getDataFileLocation();
File file;
String dataDir = config.getRestoreDataLocation();
switch (type) {
case SECONDARY_INDEX_V2:
String restoreFileName =
PATH_JOINER.join(dataDir, keyspace, columnFamily, indexDir, fileName);
return_ = new File(restoreFileName);
file =
new File(
PATH_JOINER.join(
dataDir, keyspace, columnFamily, indexDir, fileName));
break;
case META_V2:
return_ = new File(PATH_JOINER.join(config.getDataFileLocation(), fileName));
file = new File(PATH_JOINER.join(config.getDataFileLocation(), fileName));
break;
default:
return_ = new File(PATH_JOINER.join(dataDir, keyspace, columnFamily, fileName));
file = new File(PATH_JOINER.join(dataDir, keyspace, columnFamily, fileName));
}
File parent = new File(return_.getParent());
File parent = new File(file.getParent());
if (!parent.exists()) parent.mkdirs();
return return_;
return file;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@
package com.netflix.priam.backup;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.netflix.priam.backupv2.IMetaProxy;
import com.netflix.priam.utils.DateUtil;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -130,4 +135,34 @@ public final boolean isFiltered(String keyspace, String columnFamilyDir) {
|| includeFilter.get(keyspace).contains(columnFamilyName)));
return false;
}

/**
* Get all the backup directories for Cassandra.
*
* @param dataDirectory the location of the data folder.
* @param monitoringFolder folder where cassandra backup's are configured.
* @return Set of the path(s) containing the backup folder for each columnfamily.
* @throws IOException
*/
public static ImmutableSet<Path> getBackupDirectories(
String dataDirectory, String monitoringFolder) throws IOException {
ImmutableSet.Builder<Path> backupPaths = ImmutableSet.builder();
Path dataPath = Paths.get(dataDirectory);
if (Files.isDirectory(dataPath))
try (DirectoryStream<Path> directoryStream =
Files.newDirectoryStream(dataPath, Files::isDirectory)) {
for (Path keyspaceDirPath : directoryStream) {
try (DirectoryStream<Path> keyspaceStream =
Files.newDirectoryStream(keyspaceDirPath, Files::isDirectory)) {
for (Path columnfamilyDirPath : keyspaceStream) {
Path backupDirPath = columnfamilyDirPath.resolve(monitoringFolder);
if (Files.isDirectory(backupDirPath)) {
backupPaths.add(backupDirPath);
}
}
}
}
}
return backupPaths.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public static TaskTimer getTimer(

private static void cleanOldBackups(IConfiguration configuration) throws Exception {
Set<Path> backupPaths =
AbstractBackup.getBackupDirectories(configuration, INCREMENTAL_BACKUP_FOLDER);
BackupRestoreUtil.getBackupDirectories(
configuration.getDataFileLocation(), INCREMENTAL_BACKUP_FOLDER);
for (Path backupDirPath : backupPaths) {
FileUtils.cleanDirectory(backupDirPath.toFile());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,7 @@ public BackupTTLTask(

@Override
public void execute() throws Exception {
if (instanceState.getRestoreStatus() != null
&& instanceState.getRestoreStatus().getStatus() != null
&& instanceState.getRestoreStatus().getStatus() == Status.STARTED) {
if (instanceState.isRestoring()) {
logger.info("Not executing the TTL Task for backups as Priam is in restore mode.");
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ public void execute() throws Exception {
return;
}

if (instanceState.getRestoreStatus() != null
&& instanceState.getRestoreStatus().getStatus() != null
&& instanceState.getRestoreStatus().getStatus() == Status.STARTED) {
if (instanceState.isRestoring()) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this cleanup 🙌

logger.info("Skipping backup verification. Priam is in restore mode.");
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ public static TaskTimer getTimer(String cron) throws IllegalArgumentException {

static void cleanOldBackups(IConfiguration config) throws Exception {
// Clean up all the backup directories, if any.
Set<Path> backupPaths = AbstractBackup.getBackupDirectories(config, SNAPSHOT_FOLDER);
Set<Path> backupPaths =
BackupRestoreUtil.getBackupDirectories(
config.getDataFileLocation(), SNAPSHOT_FOLDER);
for (Path backupDirPath : backupPaths)
try (DirectoryStream<Path> directoryStream =
Files.newDirectoryStream(backupDirPath, Files::isDirectory)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,6 @@ default int getBackupRetentionDays() {
return 0;
}

/** @return Get list of racs to backup. Backup all racs if empty */
default List<String> getBackupRacs() {
return Collections.EMPTY_LIST;
}

/**
* Backup location i.e. remote file system to upload backups. e.g. for S3 it will be s3 bucket
* name
Expand Down Expand Up @@ -403,6 +398,10 @@ default String getRestoreSnapshot() {
return StringUtils.EMPTY;
}

default String getRestoreDataLocation() {
return getCassandraBaseDirectory() + "/restore";

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To confirm my understanding on our systems this would be something like /mnt/data/cassandra/restore? If so, +1

}

/** @return Get the region to connect to SDB for instance identity */
default String getSDBInstanceIdentityRegion() {
return "us-east-1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,6 @@ public int getBackupRetentionDays() {
return config.get(PRIAM_PRE + ".backup.retention", 0);
}

@Override
public List<String> getBackupRacs() {
return config.getList(PRIAM_PRE + ".backup.racs");
}

@Override
public String getRestorePrefix() {
return config.get(PRIAM_PRE + ".restore.prefix");
Expand Down Expand Up @@ -283,6 +278,11 @@ public String getRestoreSnapshot() {
return config.get(PRIAM_PRE + ".restore.snapshot", "");
}

@Override
public String getRestoreDataLocation() {
return config.get(PRIAM_PRE + ".restore.data.location");
}

@Override
public boolean isRestoreEncrypted() {
return config.get(PRIAM_PRE + ".encrypted.restore.enabled", false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,23 @@
*/
package com.netflix.priam.connection;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.netflix.priam.backup.BackupRestoreUtil;
import com.netflix.priam.config.IConfiguration;
import com.netflix.priam.health.CassandraMonitor;
import com.netflix.priam.utils.RetryableCallable;
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.*;
import javax.inject.Inject;
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.nio.file.StandardCopyOption.*;

/** This class encapsulates interactions with Cassandra. Created by aagrawal on 6/19/18. */
public class CassandraOperations implements ICassandraOperations {
private static final Logger logger = LoggerFactory.getLogger(CassandraOperations.class);
Expand Down Expand Up @@ -202,4 +211,51 @@ public List<Map<String, String>> gossipInfo() throws Exception {
}
return returnPublicIpSourceIpMap;
}

@Override
public List<String> importAll(String srcDir) throws IOException {
List<String> failedImports = new ArrayList<>();
if (CassandraMonitor.hasCassadraStarted()) {
for (Path tableDir : BackupRestoreUtil.getBackupDirectories(srcDir, "")) {
String keyspace = tableDir.getParent().getFileName().toString();
String table = tableDir.getFileName().toString().split("-")[0];
failedImports.addAll(importData(keyspace, table, tableDir.toString()));
}
} else {
Path target = Paths.get(configuration.getDataFileLocation());
Path source = Paths.get(srcDir);
Files.walkFileTree(source, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
Path targetDir = target.resolve(source.relativize(dir));
Files.createDirectories(targetDir);
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.move(file, target.resolve(source.relativize(file)), ATOMIC_MOVE, COPY_ATTRIBUTES);
return FileVisitResult.CONTINUE;
}
});
}
return failedImports;
}

private List<String> importData(String keyspace, String table, String source)
throws IOException {
try (JMXNodeTool nodeTool = JMXNodeTool.instance(configuration)) {
return nodeTool.importNewSSTables(
keyspace,
table,
ImmutableSet.of(source),
false /* resetLevel */,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These defaults LGTM, do we want to add options to override them?

false /* clearRepaired */,
true /* verifySSTables */,
true /* verifyTokens */,
true /* invalidateCaches */,
false /* extendedVerify */,
false /* copyData */);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,10 @@ public interface ICassandraOperations {
void forceKeyspaceFlush(String keyspaceName) throws Exception;

List<Map<String, String>> gossipInfo() throws Exception;

/**
* import sstables from the directory at srcDir into the configured data directory. importAll
* Will just move them if Cassandra hasn't started.
*/
List<String> importAll(String srcDir) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ public static Boolean hasCassadraStarted() {
}

// Added for testing only
public static void setIsCassadraStarted() {
public static void setIsCassandraStarted(boolean newStartedState) {
// Setting cassandra flag to true
isCassandraStarted.set(true);
isCassandraStarted.set(newStartedState);
}
}
24 changes: 23 additions & 1 deletion priam/src/main/java/com/netflix/priam/health/InstanceState.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

import com.netflix.priam.backup.BackupMetadata;
import com.netflix.priam.backup.Status;
import com.netflix.priam.utils.DateUtil;
import com.netflix.priam.utils.GsonJsonSerializer;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
Expand Down Expand Up @@ -143,12 +146,31 @@ public boolean isHealthy() {
return isHealthy.get();
}

private boolean isRestoring() {
public boolean isRestoring() {
return restoreStatus != null
&& restoreStatus.getStatus() != null
&& restoreStatus.getStatus() == Status.STARTED;
}

public void startRestore(DateUtil.DateRange dateRange) {
restoreStatus.resetStatus();
restoreStatus.setStartDateRange(
LocalDateTime.ofInstant(dateRange.getStartTime(), ZoneId.of("UTC")));
Date endTime = new Date(dateRange.getEndTime().toEpochMilli());
restoreStatus.setEndDateRange(DateUtil.convert(endTime));
restoreStatus.setExecutionStartTime(LocalDateTime.now());
setRestoreStatus(Status.STARTED);
}

public void endRestore(Status status, LocalDateTime endTime) {
restoreStatus.setExecutionEndTime(endTime);
setRestoreStatus(status);
}

public void setRestoreMetaFile(String path) {
restoreStatus.setSnapshotMetaFile(path);
}

private void setHealthy() {
this.isHealthy.set(
isRestoring()
Expand Down
Loading