Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions db_stress_tool/batched_ops_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
namespace ROCKSDB_NAMESPACE {
class BatchedOpsStressTest : public StressTest {
public:
BatchedOpsStressTest() = default;
BatchedOpsStressTest(const std::string& db_path, const std::string& ev_dir,
const std::string& sec_base)
: StressTest(db_path, ev_dir, sec_base) {}
virtual ~BatchedOpsStressTest() = default;

bool IsStateTracked() const override { return false; }
Expand Down Expand Up @@ -722,7 +724,11 @@ class BatchedOpsStressTest : public StressTest {
}
};

StressTest* CreateBatchedOpsStressTest() { return new BatchedOpsStressTest(); }
StressTest* CreateBatchedOpsStressTest(const std::string& db_path,
const std::string& ev_dir,
const std::string& sec_base) {
return new BatchedOpsStressTest(db_path, ev_dir, sec_base);
}

} // namespace ROCKSDB_NAMESPACE
#endif // GFLAGS
36 changes: 19 additions & 17 deletions db_stress_tool/cf_consistency_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
namespace ROCKSDB_NAMESPACE {
class CfConsistencyStressTest : public StressTest {
public:
CfConsistencyStressTest() : batch_id_(0) {}
CfConsistencyStressTest(const std::string& db_path, const std::string& ev_dir,
const std::string& sec_base)
: StressTest(db_path, ev_dir, sec_base), batch_id_(0) {}

~CfConsistencyStressTest() override = default;

Expand Down Expand Up @@ -172,10 +174,10 @@ class CfConsistencyStressTest : public StressTest {
key, &value0);

// Temporarily disable error injection for verification
if (fault_fs_guard) {
fault_fs_guard->DisableThreadLocalErrorInjection(
if (fault_fs_) {
fault_fs_->DisableThreadLocalErrorInjection(
FaultInjectionIOType::kRead);
fault_fs_guard->DisableThreadLocalErrorInjection(
fault_fs_->DisableThreadLocalErrorInjection(
FaultInjectionIOType::kMetadataRead);
}

Expand Down Expand Up @@ -224,10 +226,9 @@ class CfConsistencyStressTest : public StressTest {
}

// Enable back error injection disabled for verification
if (fault_fs_guard) {
fault_fs_guard->EnableThreadLocalErrorInjection(
FaultInjectionIOType::kRead);
fault_fs_guard->EnableThreadLocalErrorInjection(
if (fault_fs_) {
fault_fs_->EnableThreadLocalErrorInjection(FaultInjectionIOType::kRead);
fault_fs_->EnableThreadLocalErrorInjection(
FaultInjectionIOType::kMetadataRead);
}
db_->ReleaseSnapshot(snapshot);
Expand Down Expand Up @@ -340,10 +341,10 @@ class CfConsistencyStressTest : public StressTest {
&cmp_result);

// Temporarily disable error injection for verification
if (fault_fs_guard) {
fault_fs_guard->DisableThreadLocalErrorInjection(
if (fault_fs_) {
fault_fs_->DisableThreadLocalErrorInjection(
FaultInjectionIOType::kRead);
fault_fs_guard->DisableThreadLocalErrorInjection(
fault_fs_->DisableThreadLocalErrorInjection(
FaultInjectionIOType::kMetadataRead);
}

Expand Down Expand Up @@ -484,10 +485,9 @@ class CfConsistencyStressTest : public StressTest {
}

// Enable back error injection disabled for verification
if (fault_fs_guard) {
fault_fs_guard->EnableThreadLocalErrorInjection(
FaultInjectionIOType::kRead);
fault_fs_guard->EnableThreadLocalErrorInjection(
if (fault_fs_) {
fault_fs_->EnableThreadLocalErrorInjection(FaultInjectionIOType::kRead);
fault_fs_->EnableThreadLocalErrorInjection(
FaultInjectionIOType::kMetadataRead);
}
}
Expand Down Expand Up @@ -1148,8 +1148,10 @@ class CfConsistencyStressTest : public StressTest {
std::atomic<uint32_t> batch_id_;
};

StressTest* CreateCfConsistencyStressTest() {
return new CfConsistencyStressTest();
StressTest* CreateCfConsistencyStressTest(const std::string& db_path,
const std::string& ev_dir,
const std::string& sec_base) {
return new CfConsistencyStressTest(db_path, ev_dir, sec_base);
}

} // namespace ROCKSDB_NAMESPACE
Expand Down
41 changes: 23 additions & 18 deletions db_stress_tool/db_stress_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@

ROCKSDB_NAMESPACE::Env* db_stress_listener_env = nullptr;
ROCKSDB_NAMESPACE::Env* db_stress_env = nullptr;
std::shared_ptr<ROCKSDB_NAMESPACE::FaultInjectionTestFS> fault_fs_guard;
std::shared_ptr<ROCKSDB_NAMESPACE::SecondaryCache> compressed_secondary_cache;
std::shared_ptr<ROCKSDB_NAMESPACE::Cache> block_cache;
std::shared_ptr<ROCKSDB_NAMESPACE::WriteBufferManager> shared_wbm;

Check warning on line 25 in db_stress_tool/db_stress_common.cc

View workflow job for this annotation

GitHub Actions / clang-tidy

variable 'shared_wbm' is non-const and globally accessible, consider making it const [cppcoreguidelines-avoid-non-const-global-variables]
std::shared_ptr<ROCKSDB_NAMESPACE::RateLimiter> shared_rate_limiter;

Check warning on line 26 in db_stress_tool/db_stress_common.cc

View workflow job for this annotation

GitHub Actions / clang-tidy

variable 'shared_rate_limiter' is non-const and globally accessible, consider making it const [cppcoreguidelines-avoid-non-const-global-variables]
enum ROCKSDB_NAMESPACE::CompressionType compression_type_e =
ROCKSDB_NAMESPACE::kSnappyCompression;
enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e =
Expand Down Expand Up @@ -230,37 +231,38 @@
}

#ifndef NDEBUG
static void SetupFaultInjectionForRemoteCompaction(SharedState* shared) {
if (!fault_fs_guard) {
static void SetupFaultInjectionForRemoteCompaction(
FaultInjectionTestFS* fault_fs, SharedState* shared) {
if (!fault_fs) {
return;
}

fault_fs_guard->SetThreadLocalErrorContext(
fault_fs->SetThreadLocalErrorContext(
FaultInjectionIOType::kRead, shared->GetSeed(), FLAGS_read_fault_one_in,
FLAGS_inject_error_severity == 1 /* retryable */,
FLAGS_inject_error_severity == 2 /* has_data_loss*/);
fault_fs_guard->EnableThreadLocalErrorInjection(FaultInjectionIOType::kRead);
fault_fs->EnableThreadLocalErrorInjection(FaultInjectionIOType::kRead);

fault_fs_guard->SetThreadLocalErrorContext(
fault_fs->SetThreadLocalErrorContext(
FaultInjectionIOType::kWrite, shared->GetSeed(), FLAGS_write_fault_one_in,
FLAGS_inject_error_severity == 1 /* retryable */,
FLAGS_inject_error_severity == 2 /* has_data_loss*/);
fault_fs_guard->EnableThreadLocalErrorInjection(FaultInjectionIOType::kWrite);
fault_fs->EnableThreadLocalErrorInjection(FaultInjectionIOType::kWrite);

fault_fs_guard->SetThreadLocalErrorContext(
fault_fs->SetThreadLocalErrorContext(
FaultInjectionIOType::kMetadataRead, shared->GetSeed(),
FLAGS_metadata_read_fault_one_in,
FLAGS_inject_error_severity == 1 /* retryable */,
FLAGS_inject_error_severity == 2 /* has_data_loss*/);
fault_fs_guard->EnableThreadLocalErrorInjection(
fault_fs->EnableThreadLocalErrorInjection(
FaultInjectionIOType::kMetadataRead);

fault_fs_guard->SetThreadLocalErrorContext(
fault_fs->SetThreadLocalErrorContext(
FaultInjectionIOType::kMetadataWrite, shared->GetSeed(),
FLAGS_metadata_write_fault_one_in,
FLAGS_inject_error_severity == 1 /* retryable */,
FLAGS_inject_error_severity == 2 /* has_data_loss*/);
fault_fs_guard->EnableThreadLocalErrorInjection(
fault_fs->EnableThreadLocalErrorInjection(
FaultInjectionIOType::kMetadataWrite);
}
#endif // NDEBUG
Expand Down Expand Up @@ -310,11 +312,13 @@
return override_options;
}

static Status CleanupOutputDirectory(const std::string& output_directory) {
static Status CleanupOutputDirectory(
const std::string& output_directory,
[[maybe_unused]] FaultInjectionTestFS* fault_fs) {
#ifndef NDEBUG
// Temporarily disable fault injection to ensure deletion always succeeds
if (fault_fs_guard) {
fault_fs_guard->DisableAllThreadLocalErrorInjection();
if (fault_fs) {
fault_fs->DisableAllThreadLocalErrorInjection();
}
#endif // NDEBUG

Expand All @@ -338,8 +342,8 @@

#ifndef NDEBUG
// Re-enable fault injection after deletion
if (fault_fs_guard) {
fault_fs_guard->EnableAllThreadLocalErrorInjection();
if (fault_fs) {
fault_fs->EnableAllThreadLocalErrorInjection();
}
#endif // NDEBUG

Expand Down Expand Up @@ -428,7 +432,7 @@
}

if (!open_compact_options.allow_resumption) {
CleanupOutputDirectory(output_directory);
CleanupOutputDirectory(output_directory, stress_test->GetFaultFs().get());
}

std::shared_ptr<std::atomic<bool>> canceled = nullptr;
Expand Down Expand Up @@ -460,7 +464,8 @@
assert(stress_test != nullptr);

#ifndef NDEBUG
SetupFaultInjectionForRemoteCompaction(shared);
SetupFaultInjectionForRemoteCompaction(stress_test->GetFaultFs().get(),
shared);
#endif // NDEBUG

// Tracks the duration (in microseconds) of the most recent successfully
Expand Down
22 changes: 17 additions & 5 deletions db_stress_tool/db_stress_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,9 @@ DECLARE_bool(universal_reduce_file_locking);
DECLARE_bool(use_multiscan);
DECLARE_bool(multiscan_use_async_io);

// Multi-DB stress test flags
DECLARE_int32(num_dbs);

// Compaction deletion trigger declarations for stress testing
DECLARE_bool(enable_compaction_on_deletion_trigger);
DECLARE_uint64(compaction_on_deletion_min_file_size);
Expand All @@ -469,10 +472,11 @@ constexpr uint32_t kLargePrimeForCommonFactorSkew = 1872439133;
// wrapped posix environment
extern ROCKSDB_NAMESPACE::Env* db_stress_env;
extern ROCKSDB_NAMESPACE::Env* db_stress_listener_env;
extern std::shared_ptr<ROCKSDB_NAMESPACE::FaultInjectionTestFS> fault_fs_guard;
extern std::shared_ptr<ROCKSDB_NAMESPACE::SecondaryCache>
compressed_secondary_cache;
extern std::shared_ptr<ROCKSDB_NAMESPACE::Cache> block_cache;
extern std::shared_ptr<ROCKSDB_NAMESPACE::WriteBufferManager> shared_wbm;
extern std::shared_ptr<ROCKSDB_NAMESPACE::RateLimiter> shared_rate_limiter;

extern enum ROCKSDB_NAMESPACE::CompressionType compression_type_e;
extern enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e;
Expand Down Expand Up @@ -815,10 +819,18 @@ AttributeGroups GenerateAttributeGroups(
const std::vector<ColumnFamilyHandle*>& cfhs, uint32_t value_base,
const Slice& slice);

StressTest* CreateCfConsistencyStressTest();
StressTest* CreateBatchedOpsStressTest();
StressTest* CreateNonBatchedOpsStressTest();
StressTest* CreateMultiOpsTxnsStressTest();
StressTest* CreateCfConsistencyStressTest(const std::string& db_path,
const std::string& ev_dir,
const std::string& sec_base);
StressTest* CreateBatchedOpsStressTest(const std::string& db_path,
const std::string& ev_dir,
const std::string& sec_base);
StressTest* CreateNonBatchedOpsStressTest(const std::string& db_path,
const std::string& ev_dir,
const std::string& sec_base);
StressTest* CreateMultiOpsTxnsStressTest(const std::string& db_path,
const std::string& ev_dir,
const std::string& sec_base);
void CheckAndSetOptionsForMultiOpsTxnStressTest();
void InitializeHotKeyGenerator(double alpha);
int64_t GetOneHotKeyID(double rand_seed, int64_t max_key);
Expand Down
23 changes: 13 additions & 10 deletions db_stress_tool/db_stress_driver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@ bool RunStressTestImpl(SharedState* shared) {
StressTest* stress = shared->GetStressTest();

if (shared->ShouldVerifyAtBeginning() && FLAGS_preserve_unverified_changes) {
Status s = InitUnverifiedSubdir(FLAGS_db);
if (s.ok() && !FLAGS_expected_values_dir.empty()) {
s = InitUnverifiedSubdir(FLAGS_expected_values_dir);
const std::string& db_path = stress->GetDbPath();
const std::string& expected_values_dir = stress->GetExpectedValuesDir();
Status s = InitUnverifiedSubdir(db_path);
if (s.ok() && !expected_values_dir.empty()) {
s = InitUnverifiedSubdir(expected_values_dir);
}
if (!s.ok()) {
fprintf(stderr, "Failed to setup unverified state dir: %s\n",
Expand Down Expand Up @@ -159,9 +161,9 @@ bool RunStressTestImpl(SharedState* shared) {
fprintf(stderr, "Crash-recovery verification failed :(\n");
} else {
fprintf(stdout, "Crash-recovery verification passed :)\n");
Status s = DestroyUnverifiedSubdir(FLAGS_db);
if (s.ok() && !FLAGS_expected_values_dir.empty()) {
s = DestroyUnverifiedSubdir(FLAGS_expected_values_dir);
Status s = DestroyUnverifiedSubdir(stress->GetDbPath());
if (s.ok() && !stress->GetExpectedValuesDir().empty()) {
s = DestroyUnverifiedSubdir(stress->GetExpectedValuesDir());
}
if (!s.ok()) {
fprintf(stderr, "Failed to cleanup unverified state dir: %s\n",
Expand All @@ -174,15 +176,16 @@ bool RunStressTestImpl(SharedState* shared) {
if (!FLAGS_verification_only) {
// This is after the verification step to avoid making all those `Get()`s
// and `MultiGet()`s contend on the DB-wide trace mutex.
if (!FLAGS_expected_values_dir.empty()) {
if (!stress->GetExpectedValuesDir().empty()) {
stress->TrackExpectedState(shared);
}

if (FLAGS_sync_fault_injection || FLAGS_write_fault_one_in > 0) {
fault_fs_guard->SetFilesystemDirectWritable(false);
fault_fs_guard->SetInjectUnsyncedDataLoss(FLAGS_sync_fault_injection);
auto fault_fs = stress->GetFaultFs();
fault_fs->SetFilesystemDirectWritable(false);
fault_fs->SetInjectUnsyncedDataLoss(FLAGS_sync_fault_injection);
if (FLAGS_exclude_wal_from_write_fault_injection) {
fault_fs_guard->SetFileTypesExcludedFromWriteFaultInjection(
fault_fs->SetFileTypesExcludedFromWriteFaultInjection(
{FileType::kWalFile});
}
}
Expand Down
2 changes: 2 additions & 0 deletions db_stress_tool/db_stress_env_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ class DbStressFSWrapper : public FileSystemWrapper {
static const char* kClassName() { return "DbStressFS"; }
const char* Name() const override { return kClassName(); }

const std::shared_ptr<FileSystem>& GetInnerFS() const { return target_; }

IOStatus NewRandomAccessFile(const std::string& f,
const FileOptions& file_opts,
std::unique_ptr<FSRandomAccessFile>* r,
Expand Down
5 changes: 5 additions & 0 deletions db_stress_tool/db_stress_gflags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1633,4 +1633,9 @@ DEFINE_bool(use_multiscan, false,
DEFINE_bool(multiscan_use_async_io, false,
"If set, enable async_io for MultiScan operations.");

DEFINE_int32(num_dbs, 1,
"Number of DB instances to run in parallel. "
"When > 1, each DB uses FLAGS_db/db_<i> as its path. "
"Shared resources (Env, cache) are shared across all DBs.");

#endif // GFLAGS
16 changes: 11 additions & 5 deletions db_stress_tool/db_stress_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@ namespace ROCKSDB_NAMESPACE {

#ifdef GFLAGS

UniqueIdVerifier::UniqueIdVerifier(const std::string& dir)
: path_(dir + "/.unique_ids") {
// TODO: consider using expected_values_dir instead, but this is more
// convenient for now.
UniqueIdVerifier::UniqueIdVerifier(const std::string& db_name,
const std::string& expected_values_dir)
: path_((!expected_values_dir.empty() ? expected_values_dir : db_name) +
"/.unique_ids") {
// We expect such a small number of files generated during this test
// (thousands?), checking full 192-bit IDs for uniqueness is a very
// weak check. For a stronger check, we pick a specific 64-bit
Expand All @@ -25,9 +29,11 @@ UniqueIdVerifier::UniqueIdVerifier(const std::string& dir)
// very good probability for the quantities in this test.
offset_ = Random::GetTLSInstance()->Uniform(17); // 0 to 16

// Always use local (default) filesystem for this bookkeeping file,
// even when DB is on a remote/warm filesystem, to avoid issues with
// weaker durability guarantees on remote filesystems.
// Use expected_values_dir (always a local path) when available, falling
// back to db_name. Always use the default (local) filesystem so
// bookkeeping avoids durability issues with remote/custom filesystems.
const std::string& dir =
!expected_values_dir.empty() ? expected_values_dir : db_name;
const std::shared_ptr<FileSystem> fs = Env::Default()->GetFileSystem();
IOOptions opts;

Expand Down
Loading
Loading