diff --git a/db_stress_tool/batched_ops_stress.cc b/db_stress_tool/batched_ops_stress.cc index b0db2883a6ad..d0a9636caa9b 100644 --- a/db_stress_tool/batched_ops_stress.cc +++ b/db_stress_tool/batched_ops_stress.cc @@ -13,7 +13,9 @@ namespace ROCKSDB_NAMESPACE { class BatchedOpsStressTest : public StressTest { public: - BatchedOpsStressTest() = default; + BatchedOpsStressTest(const std::string& db_path, const std::string& ev_dir, + const std::string& sec_base) + : StressTest(db_path, ev_dir, sec_base) {} virtual ~BatchedOpsStressTest() = default; bool IsStateTracked() const override { return false; } @@ -722,7 +724,11 @@ class BatchedOpsStressTest : public StressTest { } }; -StressTest* CreateBatchedOpsStressTest() { return new BatchedOpsStressTest(); } +StressTest* CreateBatchedOpsStressTest(const std::string& db_path, + const std::string& ev_dir, + const std::string& sec_base) { + return new BatchedOpsStressTest(db_path, ev_dir, sec_base); +} } // namespace ROCKSDB_NAMESPACE #endif // GFLAGS diff --git a/db_stress_tool/cf_consistency_stress.cc b/db_stress_tool/cf_consistency_stress.cc index d18c47281a69..83146276e307 100644 --- a/db_stress_tool/cf_consistency_stress.cc +++ b/db_stress_tool/cf_consistency_stress.cc @@ -14,7 +14,9 @@ namespace ROCKSDB_NAMESPACE { class CfConsistencyStressTest : public StressTest { public: - CfConsistencyStressTest() : batch_id_(0) {} + CfConsistencyStressTest(const std::string& db_path, const std::string& ev_dir, + const std::string& sec_base) + : StressTest(db_path, ev_dir, sec_base), batch_id_(0) {} ~CfConsistencyStressTest() override = default; @@ -172,10 +174,10 @@ class CfConsistencyStressTest : public StressTest { key, &value0); // Temporarily disable error injection for verification - if (fault_fs_guard) { - fault_fs_guard->DisableThreadLocalErrorInjection( + if (fault_fs_) { + fault_fs_->DisableThreadLocalErrorInjection( FaultInjectionIOType::kRead); - fault_fs_guard->DisableThreadLocalErrorInjection( + 
fault_fs_->DisableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); } @@ -224,10 +226,9 @@ class CfConsistencyStressTest : public StressTest { } // Enable back error injection disabled for verification - if (fault_fs_guard) { - fault_fs_guard->EnableThreadLocalErrorInjection( - FaultInjectionIOType::kRead); - fault_fs_guard->EnableThreadLocalErrorInjection( + if (fault_fs_) { + fault_fs_->EnableThreadLocalErrorInjection(FaultInjectionIOType::kRead); + fault_fs_->EnableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); } db_->ReleaseSnapshot(snapshot); @@ -340,10 +341,10 @@ class CfConsistencyStressTest : public StressTest { &cmp_result); // Temporarily disable error injection for verification - if (fault_fs_guard) { - fault_fs_guard->DisableThreadLocalErrorInjection( + if (fault_fs_) { + fault_fs_->DisableThreadLocalErrorInjection( FaultInjectionIOType::kRead); - fault_fs_guard->DisableThreadLocalErrorInjection( + fault_fs_->DisableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); } @@ -484,10 +485,9 @@ class CfConsistencyStressTest : public StressTest { } // Enable back error injection disabled for verification - if (fault_fs_guard) { - fault_fs_guard->EnableThreadLocalErrorInjection( - FaultInjectionIOType::kRead); - fault_fs_guard->EnableThreadLocalErrorInjection( + if (fault_fs_) { + fault_fs_->EnableThreadLocalErrorInjection(FaultInjectionIOType::kRead); + fault_fs_->EnableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); } } @@ -1148,8 +1148,10 @@ class CfConsistencyStressTest : public StressTest { std::atomic batch_id_; }; -StressTest* CreateCfConsistencyStressTest() { - return new CfConsistencyStressTest(); +StressTest* CreateCfConsistencyStressTest(const std::string& db_path, + const std::string& ev_dir, + const std::string& sec_base) { + return new CfConsistencyStressTest(db_path, ev_dir, sec_base); } } // namespace ROCKSDB_NAMESPACE diff --git a/db_stress_tool/db_stress_common.cc 
b/db_stress_tool/db_stress_common.cc index c26401352234..9a5b1b039a73 100644 --- a/db_stress_tool/db_stress_common.cc +++ b/db_stress_tool/db_stress_common.cc @@ -20,9 +20,10 @@ ROCKSDB_NAMESPACE::Env* db_stress_listener_env = nullptr; ROCKSDB_NAMESPACE::Env* db_stress_env = nullptr; -std::shared_ptr fault_fs_guard; std::shared_ptr compressed_secondary_cache; std::shared_ptr block_cache; +std::shared_ptr shared_wbm; +std::shared_ptr shared_rate_limiter; enum ROCKSDB_NAMESPACE::CompressionType compression_type_e = ROCKSDB_NAMESPACE::kSnappyCompression; enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e = @@ -230,37 +231,38 @@ void CompressedCacheSetCapacityThread(void* v) { } #ifndef NDEBUG -static void SetupFaultInjectionForRemoteCompaction(SharedState* shared) { - if (!fault_fs_guard) { +static void SetupFaultInjectionForRemoteCompaction( + FaultInjectionTestFS* fault_fs, SharedState* shared) { + if (!fault_fs) { return; } - fault_fs_guard->SetThreadLocalErrorContext( + fault_fs->SetThreadLocalErrorContext( FaultInjectionIOType::kRead, shared->GetSeed(), FLAGS_read_fault_one_in, FLAGS_inject_error_severity == 1 /* retryable */, FLAGS_inject_error_severity == 2 /* has_data_loss*/); - fault_fs_guard->EnableThreadLocalErrorInjection(FaultInjectionIOType::kRead); + fault_fs->EnableThreadLocalErrorInjection(FaultInjectionIOType::kRead); - fault_fs_guard->SetThreadLocalErrorContext( + fault_fs->SetThreadLocalErrorContext( FaultInjectionIOType::kWrite, shared->GetSeed(), FLAGS_write_fault_one_in, FLAGS_inject_error_severity == 1 /* retryable */, FLAGS_inject_error_severity == 2 /* has_data_loss*/); - fault_fs_guard->EnableThreadLocalErrorInjection(FaultInjectionIOType::kWrite); + fault_fs->EnableThreadLocalErrorInjection(FaultInjectionIOType::kWrite); - fault_fs_guard->SetThreadLocalErrorContext( + fault_fs->SetThreadLocalErrorContext( FaultInjectionIOType::kMetadataRead, shared->GetSeed(), FLAGS_metadata_read_fault_one_in, FLAGS_inject_error_severity 
== 1 /* retryable */, FLAGS_inject_error_severity == 2 /* has_data_loss*/); - fault_fs_guard->EnableThreadLocalErrorInjection( + fault_fs->EnableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); - fault_fs_guard->SetThreadLocalErrorContext( + fault_fs->SetThreadLocalErrorContext( FaultInjectionIOType::kMetadataWrite, shared->GetSeed(), FLAGS_metadata_write_fault_one_in, FLAGS_inject_error_severity == 1 /* retryable */, FLAGS_inject_error_severity == 2 /* has_data_loss*/); - fault_fs_guard->EnableThreadLocalErrorInjection( + fault_fs->EnableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataWrite); } #endif // NDEBUG @@ -310,11 +312,13 @@ static CompactionServiceOptionsOverride CreateOverrideOptions( return override_options; } -static Status CleanupOutputDirectory(const std::string& output_directory) { +static Status CleanupOutputDirectory( + const std::string& output_directory, + [[maybe_unused]] FaultInjectionTestFS* fault_fs) { #ifndef NDEBUG // Temporarily disable fault injection to ensure deletion always succeeds - if (fault_fs_guard) { - fault_fs_guard->DisableAllThreadLocalErrorInjection(); + if (fault_fs) { + fault_fs->DisableAllThreadLocalErrorInjection(); } #endif // NDEBUG @@ -338,8 +342,8 @@ static Status CleanupOutputDirectory(const std::string& output_directory) { #ifndef NDEBUG // Re-enable fault injection after deletion - if (fault_fs_guard) { - fault_fs_guard->EnableAllThreadLocalErrorInjection(); + if (fault_fs) { + fault_fs->EnableAllThreadLocalErrorInjection(); } #endif // NDEBUG @@ -428,7 +432,7 @@ static void ProcessRemoteCompactionJob( } if (!open_compact_options.allow_resumption) { - CleanupOutputDirectory(output_directory); + CleanupOutputDirectory(output_directory, stress_test->GetFaultFs().get()); } std::shared_ptr> canceled = nullptr; @@ -460,7 +464,8 @@ void RemoteCompactionWorkerThread(void* v) { assert(stress_test != nullptr); #ifndef NDEBUG - SetupFaultInjectionForRemoteCompaction(shared); + 
SetupFaultInjectionForRemoteCompaction(stress_test->GetFaultFs().get(), + shared); #endif // NDEBUG // Tracks the duration (in microseconds) of the most recent successfully diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 8ded5d59e1ec..9614e0e3e52c 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -453,6 +453,9 @@ DECLARE_bool(universal_reduce_file_locking); DECLARE_bool(use_multiscan); DECLARE_bool(multiscan_use_async_io); +// Multi-DB stress test flags +DECLARE_int32(num_dbs); + // Compaction deletion trigger declarations for stress testing DECLARE_bool(enable_compaction_on_deletion_trigger); DECLARE_uint64(compaction_on_deletion_min_file_size); @@ -469,10 +472,11 @@ constexpr uint32_t kLargePrimeForCommonFactorSkew = 1872439133; // wrapped posix environment extern ROCKSDB_NAMESPACE::Env* db_stress_env; extern ROCKSDB_NAMESPACE::Env* db_stress_listener_env; -extern std::shared_ptr fault_fs_guard; extern std::shared_ptr compressed_secondary_cache; extern std::shared_ptr block_cache; +extern std::shared_ptr shared_wbm; +extern std::shared_ptr shared_rate_limiter; extern enum ROCKSDB_NAMESPACE::CompressionType compression_type_e; extern enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e; @@ -815,10 +819,18 @@ AttributeGroups GenerateAttributeGroups( const std::vector& cfhs, uint32_t value_base, const Slice& slice); -StressTest* CreateCfConsistencyStressTest(); -StressTest* CreateBatchedOpsStressTest(); -StressTest* CreateNonBatchedOpsStressTest(); -StressTest* CreateMultiOpsTxnsStressTest(); +StressTest* CreateCfConsistencyStressTest(const std::string& db_path, + const std::string& ev_dir, + const std::string& sec_base); +StressTest* CreateBatchedOpsStressTest(const std::string& db_path, + const std::string& ev_dir, + const std::string& sec_base); +StressTest* CreateNonBatchedOpsStressTest(const std::string& db_path, + const std::string& ev_dir, + const std::string& 
sec_base); +StressTest* CreateMultiOpsTxnsStressTest(const std::string& db_path, + const std::string& ev_dir, + const std::string& sec_base); void CheckAndSetOptionsForMultiOpsTxnStressTest(); void InitializeHotKeyGenerator(double alpha); int64_t GetOneHotKeyID(double rand_seed, int64_t max_key); diff --git a/db_stress_tool/db_stress_driver.cc b/db_stress_tool/db_stress_driver.cc index aa93de97ec4a..0a79d2efd9a4 100644 --- a/db_stress_tool/db_stress_driver.cc +++ b/db_stress_tool/db_stress_driver.cc @@ -68,9 +68,11 @@ bool RunStressTestImpl(SharedState* shared) { StressTest* stress = shared->GetStressTest(); if (shared->ShouldVerifyAtBeginning() && FLAGS_preserve_unverified_changes) { - Status s = InitUnverifiedSubdir(FLAGS_db); - if (s.ok() && !FLAGS_expected_values_dir.empty()) { - s = InitUnverifiedSubdir(FLAGS_expected_values_dir); + const std::string& db_path = stress->GetDbPath(); + const std::string& expected_values_dir = stress->GetExpectedValuesDir(); + Status s = InitUnverifiedSubdir(db_path); + if (s.ok() && !expected_values_dir.empty()) { + s = InitUnverifiedSubdir(expected_values_dir); } if (!s.ok()) { fprintf(stderr, "Failed to setup unverified state dir: %s\n", @@ -159,9 +161,9 @@ bool RunStressTestImpl(SharedState* shared) { fprintf(stderr, "Crash-recovery verification failed :(\n"); } else { fprintf(stdout, "Crash-recovery verification passed :)\n"); - Status s = DestroyUnverifiedSubdir(FLAGS_db); - if (s.ok() && !FLAGS_expected_values_dir.empty()) { - s = DestroyUnverifiedSubdir(FLAGS_expected_values_dir); + Status s = DestroyUnverifiedSubdir(stress->GetDbPath()); + if (s.ok() && !stress->GetExpectedValuesDir().empty()) { + s = DestroyUnverifiedSubdir(stress->GetExpectedValuesDir()); } if (!s.ok()) { fprintf(stderr, "Failed to cleanup unverified state dir: %s\n", @@ -174,15 +176,16 @@ bool RunStressTestImpl(SharedState* shared) { if (!FLAGS_verification_only) { // This is after the verification step to avoid making all those `Get()`s // and 
`MultiGet()`s contend on the DB-wide trace mutex. - if (!FLAGS_expected_values_dir.empty()) { + if (!stress->GetExpectedValuesDir().empty()) { stress->TrackExpectedState(shared); } if (FLAGS_sync_fault_injection || FLAGS_write_fault_one_in > 0) { - fault_fs_guard->SetFilesystemDirectWritable(false); - fault_fs_guard->SetInjectUnsyncedDataLoss(FLAGS_sync_fault_injection); + auto fault_fs = stress->GetFaultFs(); + fault_fs->SetFilesystemDirectWritable(false); + fault_fs->SetInjectUnsyncedDataLoss(FLAGS_sync_fault_injection); if (FLAGS_exclude_wal_from_write_fault_injection) { - fault_fs_guard->SetFileTypesExcludedFromWriteFaultInjection( + fault_fs->SetFileTypesExcludedFromWriteFaultInjection( {FileType::kWalFile}); } } diff --git a/db_stress_tool/db_stress_env_wrapper.h b/db_stress_tool/db_stress_env_wrapper.h index 4186bc41f653..081b169c8120 100644 --- a/db_stress_tool/db_stress_env_wrapper.h +++ b/db_stress_tool/db_stress_env_wrapper.h @@ -172,6 +172,8 @@ class DbStressFSWrapper : public FileSystemWrapper { static const char* kClassName() { return "DbStressFS"; } const char* Name() const override { return kClassName(); } + const std::shared_ptr& GetInnerFS() const { return target_; } + IOStatus NewRandomAccessFile(const std::string& f, const FileOptions& file_opts, std::unique_ptr* r, diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index dda35e3ee551..4231b243c6da 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -1633,4 +1633,9 @@ DEFINE_bool(use_multiscan, false, DEFINE_bool(multiscan_use_async_io, false, "If set, enable async_io for MultiScan operations."); +DEFINE_int32(num_dbs, 1, + "Number of DB instances to run in parallel. " + "When > 1, each DB uses FLAGS_db/db_ as its path. 
" + "Shared resources (Env, cache) are shared across all DBs."); + #endif // GFLAGS diff --git a/db_stress_tool/db_stress_listener.cc b/db_stress_tool/db_stress_listener.cc index 4ac7c8d28c6b..61e0cfa79a49 100644 --- a/db_stress_tool/db_stress_listener.cc +++ b/db_stress_tool/db_stress_listener.cc @@ -15,8 +15,12 @@ namespace ROCKSDB_NAMESPACE { #ifdef GFLAGS -UniqueIdVerifier::UniqueIdVerifier(const std::string& dir) - : path_(dir + "/.unique_ids") { +// TODO: consider using expected_values_dir instead, but this is more +// convenient for now. +UniqueIdVerifier::UniqueIdVerifier(const std::string& db_name, + const std::string& expected_values_dir) + : path_((!expected_values_dir.empty() ? expected_values_dir : db_name) + + "/.unique_ids") { // We expect such a small number of files generated during this test // (thousands?), checking full 192-bit IDs for uniqueness is a very // weak check. For a stronger check, we pick a specific 64-bit @@ -25,9 +29,11 @@ UniqueIdVerifier::UniqueIdVerifier(const std::string& dir) // very good probability for the quantities in this test. offset_ = Random::GetTLSInstance()->Uniform(17); // 0 to 16 - // Always use local (default) filesystem for this bookkeeping file, - // even when DB is on a remote/warm filesystem, to avoid issues with - // weaker durability guarantees on remote filesystems. + // Use expected_values_dir (always a local path) when available, falling + // back to db_name. Always use the default (local) filesystem so + // bookkeeping avoids durability issues with remote/custom filesystems. + const std::string& dir = + !expected_values_dir.empty() ? 
expected_values_dir : db_name; const std::shared_ptr fs = Env::Default()->GetFileSystem(); IOOptions opts; diff --git a/db_stress_tool/db_stress_listener.h b/db_stress_tool/db_stress_listener.h index 2dd46dfc424e..3000f3304235 100644 --- a/db_stress_tool/db_stress_listener.h +++ b/db_stress_tool/db_stress_listener.h @@ -24,14 +24,13 @@ #include "utilities/fault_injection_fs.h" DECLARE_int32(compact_files_one_in); -extern std::shared_ptr fault_fs_guard; - namespace ROCKSDB_NAMESPACE { // Verify across process executions that all seen IDs are unique class UniqueIdVerifier { public: - explicit UniqueIdVerifier(const std::string& dir); + explicit UniqueIdVerifier(const std::string& db_name, + const std::string& expected_values_dir); ~UniqueIdVerifier(); void Verify(const std::string& id); @@ -55,15 +54,15 @@ class DbStressListener : public EventListener { DbStressListener(const std::string& db_name, const std::vector& db_paths, const std::vector& column_families, - SharedState* shared) + const std::string& expected_values_dir, SharedState* shared, + std::shared_ptr fault_fs) : db_name_(db_name), db_paths_(db_paths), column_families_(column_families), num_pending_file_creations_(0), - unique_ids_(FLAGS_expected_values_dir.empty() - ? 
db_name - : FLAGS_expected_values_dir), - shared_(shared) {} + unique_ids_(db_name, expected_values_dir), + shared_(shared), + fault_fs_(std::move(fault_fs)) {} const char* Name() const override { return kClassName(); } static const char* kClassName() { return "DBStressListener"; } @@ -74,8 +73,8 @@ class DbStressListener : public EventListener { VerifyFilePath(info.file_path); // pretending doing some work here RandomSleep(); - if (fault_fs_guard) { - fault_fs_guard->DisableAllThreadLocalErrorInjection(); + if (fault_fs_) { + fault_fs_->DisableAllThreadLocalErrorInjection(); } shared_->SetPersistedSeqno(info.largest_seqno); } @@ -83,37 +82,35 @@ class DbStressListener : public EventListener { void OnFlushBegin(DB* /*db*/, const FlushJobInfo& /*flush_job_info*/) override { RandomSleep(); - if (fault_fs_guard) { - fault_fs_guard->SetThreadLocalErrorContext( + if (fault_fs_) { + fault_fs_->SetThreadLocalErrorContext( FaultInjectionIOType::kRead, static_cast(FLAGS_seed), FLAGS_read_fault_one_in, FLAGS_inject_error_severity == 1 /* retryable */, FLAGS_inject_error_severity == 2 /* has_data_loss*/); - fault_fs_guard->EnableThreadLocalErrorInjection( - FaultInjectionIOType::kRead); + fault_fs_->EnableThreadLocalErrorInjection(FaultInjectionIOType::kRead); - fault_fs_guard->SetThreadLocalErrorContext( + fault_fs_->SetThreadLocalErrorContext( FaultInjectionIOType::kWrite, static_cast(FLAGS_seed), FLAGS_write_fault_one_in, FLAGS_inject_error_severity == 1 /* retryable */, FLAGS_inject_error_severity == 2 /* has_data_loss*/); - fault_fs_guard->EnableThreadLocalErrorInjection( - FaultInjectionIOType::kWrite); + fault_fs_->EnableThreadLocalErrorInjection(FaultInjectionIOType::kWrite); - fault_fs_guard->SetThreadLocalErrorContext( + fault_fs_->SetThreadLocalErrorContext( FaultInjectionIOType::kMetadataRead, static_cast(FLAGS_seed), FLAGS_metadata_read_fault_one_in, FLAGS_inject_error_severity == 1 /* retryable */, FLAGS_inject_error_severity == 2 /* has_data_loss*/); - 
fault_fs_guard->EnableThreadLocalErrorInjection( + fault_fs_->EnableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); - fault_fs_guard->SetThreadLocalErrorContext( + fault_fs_->SetThreadLocalErrorContext( FaultInjectionIOType::kMetadataWrite, static_cast(FLAGS_seed), FLAGS_metadata_write_fault_one_in, FLAGS_inject_error_severity == 1 /* retryable */, FLAGS_inject_error_severity == 2 /* has_data_loss*/); - fault_fs_guard->EnableThreadLocalErrorInjection( + fault_fs_->EnableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataWrite); } } @@ -140,44 +137,42 @@ class DbStressListener : public EventListener { } void OnSubcompactionBegin(const SubcompactionJobInfo& /* si */) override { - if (fault_fs_guard) { - fault_fs_guard->SetThreadLocalErrorContext( + if (fault_fs_) { + fault_fs_->SetThreadLocalErrorContext( FaultInjectionIOType::kRead, static_cast(FLAGS_seed), FLAGS_read_fault_one_in, FLAGS_inject_error_severity == 1 /* retryable */, FLAGS_inject_error_severity == 2 /* has_data_loss*/); - fault_fs_guard->EnableThreadLocalErrorInjection( - FaultInjectionIOType::kRead); + fault_fs_->EnableThreadLocalErrorInjection(FaultInjectionIOType::kRead); - fault_fs_guard->SetThreadLocalErrorContext( + fault_fs_->SetThreadLocalErrorContext( FaultInjectionIOType::kWrite, static_cast(FLAGS_seed), FLAGS_write_fault_one_in, FLAGS_inject_error_severity == 1 /* retryable */, FLAGS_inject_error_severity == 2 /* has_data_loss*/); - fault_fs_guard->EnableThreadLocalErrorInjection( - FaultInjectionIOType::kWrite); + fault_fs_->EnableThreadLocalErrorInjection(FaultInjectionIOType::kWrite); - fault_fs_guard->SetThreadLocalErrorContext( + fault_fs_->SetThreadLocalErrorContext( FaultInjectionIOType::kMetadataRead, static_cast(FLAGS_seed), FLAGS_metadata_read_fault_one_in, FLAGS_inject_error_severity == 1 /* retryable */, FLAGS_inject_error_severity == 2 /* has_data_loss*/); - fault_fs_guard->EnableThreadLocalErrorInjection( + fault_fs_->EnableThreadLocalErrorInjection( 
FaultInjectionIOType::kMetadataRead); - fault_fs_guard->SetThreadLocalErrorContext( + fault_fs_->SetThreadLocalErrorContext( FaultInjectionIOType::kMetadataWrite, static_cast(FLAGS_seed), FLAGS_metadata_write_fault_one_in, FLAGS_inject_error_severity == 1 /* retryable */, FLAGS_inject_error_severity == 2 /* has_data_loss*/); - fault_fs_guard->EnableThreadLocalErrorInjection( + fault_fs_->EnableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataWrite); } } void OnSubcompactionCompleted(const SubcompactionJobInfo& /* si */) override { - if (fault_fs_guard) { - fault_fs_guard->DisableAllThreadLocalErrorInjection(); + if (fault_fs_) { + fault_fs_->DisableAllThreadLocalErrorInjection(); } } @@ -270,11 +265,11 @@ class DbStressListener : public EventListener { Status /* bg_error */, bool* /* auto_recovery */) override { RandomSleep(); - if (FLAGS_error_recovery_with_no_fault_injection && fault_fs_guard) { - fault_fs_guard->DisableAllThreadLocalErrorInjection(); + if (FLAGS_error_recovery_with_no_fault_injection && fault_fs_) { + fault_fs_->DisableAllThreadLocalErrorInjection(); // TODO(hx235): only exempt the flush thread during error recovery instead // of all the flush threads from error injection - fault_fs_guard->SetIOActivitiesExcludedFromFaultInjection( + fault_fs_->SetIOActivitiesExcludedFromFaultInjection( {Env::IOActivity::kFlush}); } } @@ -282,9 +277,9 @@ class DbStressListener : public EventListener { void OnErrorRecoveryEnd( const BackgroundErrorRecoveryInfo& /*info*/) override { RandomSleep(); - if (FLAGS_error_recovery_with_no_fault_injection && fault_fs_guard) { - fault_fs_guard->EnableAllThreadLocalErrorInjection(); - fault_fs_guard->SetIOActivitiesExcludedFromFaultInjection({}); + if (FLAGS_error_recovery_with_no_fault_injection && fault_fs_) { + fault_fs_->EnableAllThreadLocalErrorInjection(); + fault_fs_->SetIOActivitiesExcludedFromFaultInjection({}); } } @@ -375,6 +370,7 @@ class DbStressListener : public EventListener { std::atomic 
num_pending_file_creations_; UniqueIdVerifier unique_ids_; SharedState* shared_; + std::shared_ptr fault_fs_; mutable std::mutex bg_pressure_mu_; BackgroundJobPressure last_bg_pressure_; }; diff --git a/db_stress_tool/db_stress_shared_state.h b/db_stress_tool/db_stress_shared_state.h index b4546cd3bad2..897bc5f83936 100644 --- a/db_stress_tool/db_stress_shared_state.h +++ b/db_stress_tool/db_stress_shared_state.h @@ -78,7 +78,8 @@ class SharedState { // for those calls static thread_local bool ignore_read_error; - SharedState(Env* /*env*/, StressTest* stress_test) + SharedState(Env* /*env*/, StressTest* stress_test, + const std::string& expected_values_dir) : cv_(&mu_), seed_(static_cast(FLAGS_seed)), max_key_(FLAGS_max_key), @@ -98,15 +99,17 @@ class SharedState { should_stop_test_(false), no_overwrite_ids_(GenerateNoOverwriteIds()), expected_state_manager_(nullptr), + expected_values_dir_(expected_values_dir), printing_verification_results_(false), start_timestamp_(Env::Default()->NowNanos()) { Status status; + const std::string& ev_dir = expected_values_dir_; // TODO: We should introduce a way to explicitly disable verification - // during shutdown. When that is disabled and FLAGS_expected_values_dir + // during shutdown. When that is disabled and expected_values_dir // is empty (disabling verification at startup), we can skip tracking // expected state. Only then should we permit bypassing the below feature // compatibility checks. 
- if (!FLAGS_expected_values_dir.empty()) { + if (!ev_dir.empty()) { if (!std::atomic{}.is_lock_free() || !std::atomic{}.is_lock_free()) { std::ostringstream status_s; @@ -125,12 +128,12 @@ class SharedState { } } if (status.ok()) { - if (FLAGS_expected_values_dir.empty()) { + if (ev_dir.empty()) { expected_state_manager_.reset( new AnonExpectedStateManager(FLAGS_max_key, FLAGS_column_families)); } else { expected_state_manager_.reset(new FileExpectedStateManager( - FLAGS_max_key, FLAGS_column_families, FLAGS_expected_values_dir)); + FLAGS_max_key, FLAGS_column_families, ev_dir)); } status = expected_state_manager_->Open(); } @@ -430,9 +433,7 @@ class SharedState { return bg_thread_finished_ == num_bg_threads_; } - bool ShouldVerifyAtBeginning() const { - return !FLAGS_expected_values_dir.empty(); - } + bool ShouldVerifyAtBeginning() const { return !expected_values_dir_.empty(); } bool PrintingVerificationResults() { bool tmp = false; @@ -519,6 +520,7 @@ class SharedState { const std::unordered_set no_overwrite_ids_; std::unique_ptr expected_state_manager_; + const std::string expected_values_dir_; // Cannot store `port::Mutex` directly in vector since it is not copyable // and storing it in the container may require copying depending on the impl. 
std::vector> key_locks_; diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 9dca58934988..6b9d65a1c8db 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -62,10 +62,40 @@ std::shared_ptr CreateFilterPolicy() { return std::shared_ptr(new_policy); } +static bool NeedsFaultInjection() { + return FLAGS_open_metadata_read_fault_one_in || + FLAGS_open_metadata_write_fault_one_in || + FLAGS_open_read_fault_one_in || FLAGS_open_write_fault_one_in || + FLAGS_metadata_read_fault_one_in || + FLAGS_metadata_write_fault_one_in || FLAGS_read_fault_one_in || + FLAGS_write_fault_one_in || FLAGS_sync_fault_injection; +} + } // namespace -StressTest::StressTest() - : cache_(NewCache(FLAGS_cache_size, FLAGS_cache_numshardbits)), +StressTest::StressTest(const std::string& db_path, + const std::string& expected_values_dir, + const std::string& secondaries_base) + : db_path_(db_path), + expected_values_dir_(expected_values_dir), + secondaries_base_(secondaries_base), + fault_fs_(NeedsFaultInjection() + ? std::make_shared( + std::static_pointer_cast( + db_stress_env->GetFileSystem()) + ->GetInnerFS()) + : nullptr), + fault_stress_fs_( + fault_fs_ ? std::make_shared(fault_fs_) : nullptr), + fault_env_(fault_stress_fs_ ? std::make_unique( + db_stress_env, fault_stress_fs_) + : nullptr), + cache_(FLAGS_num_dbs > 1 + ? (block_cache + ? 
block_cache + : (block_cache = NewCache(FLAGS_cache_size, + FLAGS_cache_numshardbits))) + : NewCache(FLAGS_cache_size, FLAGS_cache_numshardbits)), filter_policy_(CreateFilterPolicy()), db_(nullptr), txn_db_(nullptr), @@ -76,14 +106,6 @@ StressTest::StressTest() num_times_reopened_(0), db_preload_finished_(false), is_db_stopped_(false) { - if (FLAGS_destroy_db_initially) { - const Status s = DbStressDestroyDb(FLAGS_db); - if (!s.ok()) { - fprintf(stderr, "Cannot destroy original db: %s\n", s.ToString().c_str()); - exit(1); - } - } - Status s = DbStressSqfcManager().MakeSharedFactory( FLAGS_sqfc_name, FLAGS_sqfc_version, &sqfc_factory_); if (!s.ok()) { @@ -1060,37 +1082,35 @@ void StressTest::OperateDb(ThreadState* thread) { } #ifndef NDEBUG - if (fault_fs_guard) { - fault_fs_guard->SetThreadLocalErrorContext( + if (fault_fs_) { + fault_fs_->SetThreadLocalErrorContext( FaultInjectionIOType::kRead, thread->shared->GetSeed(), FLAGS_read_fault_one_in, FLAGS_inject_error_severity == 1 /* retryable */, FLAGS_inject_error_severity == 2 /* has_data_loss*/); - fault_fs_guard->EnableThreadLocalErrorInjection( - FaultInjectionIOType::kRead); + fault_fs_->EnableThreadLocalErrorInjection(FaultInjectionIOType::kRead); - fault_fs_guard->SetThreadLocalErrorContext( + fault_fs_->SetThreadLocalErrorContext( FaultInjectionIOType::kWrite, thread->shared->GetSeed(), FLAGS_write_fault_one_in, FLAGS_inject_error_severity == 1 /* retryable */, FLAGS_inject_error_severity == 2 /* has_data_loss*/); - fault_fs_guard->EnableThreadLocalErrorInjection( - FaultInjectionIOType::kWrite); + fault_fs_->EnableThreadLocalErrorInjection(FaultInjectionIOType::kWrite); - fault_fs_guard->SetThreadLocalErrorContext( + fault_fs_->SetThreadLocalErrorContext( FaultInjectionIOType::kMetadataRead, thread->shared->GetSeed(), FLAGS_metadata_read_fault_one_in, FLAGS_inject_error_severity == 1 /* retryable */, FLAGS_inject_error_severity == 2 /* has_data_loss*/); - fault_fs_guard->EnableThreadLocalErrorInjection( + 
fault_fs_->EnableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); - fault_fs_guard->SetThreadLocalErrorContext( + fault_fs_->SetThreadLocalErrorContext( FaultInjectionIOType::kMetadataWrite, thread->shared->GetSeed(), FLAGS_metadata_write_fault_one_in, FLAGS_inject_error_severity == 1 /* retryable */, FLAGS_inject_error_severity == 2 /* has_data_loss*/); - fault_fs_guard->EnableThreadLocalErrorInjection( + fault_fs_->EnableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataWrite); } #endif // NDEBUG @@ -1113,13 +1133,13 @@ void StressTest::OperateDb(ThreadState* thread) { if (thread->tid == 0 && FLAGS_verify_db_one_in > 0 && thread->rand.OneIn(FLAGS_verify_db_one_in)) { // Temporarily disable error injection for verification - if (fault_fs_guard) { - fault_fs_guard->DisableAllThreadLocalErrorInjection(); + if (fault_fs_) { + fault_fs_->DisableAllThreadLocalErrorInjection(); } ContinuouslyVerifyDb(thread); // Enable back error injection disabled for verification - if (fault_fs_guard) { - fault_fs_guard->EnableAllThreadLocalErrorInjection(); + if (fault_fs_) { + fault_fs_->EnableAllThreadLocalErrorInjection(); } if (thread->shared->ShouldStopTest()) { break; @@ -1144,8 +1164,8 @@ void StressTest::OperateDb(ThreadState* thread) { fprintf(stderr, "LockWAL() failed: %s\n", s.ToString().c_str()); } else if (s.ok()) { // Temporarily disable error injection for verification - if (fault_fs_guard) { - fault_fs_guard->DisableAllThreadLocalErrorInjection(); + if (fault_fs_) { + fault_fs_->DisableAllThreadLocalErrorInjection(); } // Verify no writes during LockWAL @@ -1199,8 +1219,8 @@ void StressTest::OperateDb(ThreadState* thread) { } // Enable back error injection disabled for verification - if (fault_fs_guard) { - fault_fs_guard->EnableAllThreadLocalErrorInjection(); + if (fault_fs_) { + fault_fs_->EnableAllThreadLocalErrorInjection(); } } } @@ -1317,14 +1337,14 @@ void StressTest::OperateDb(ThreadState* thread) { // TestGetProperty doesn't return 
status for us to tell whether it has // failed due to injected error. So we disable fault injection to avoid // false positive - if (fault_fs_guard) { - fault_fs_guard->DisableAllThreadLocalErrorInjection(); + if (fault_fs_) { + fault_fs_->DisableAllThreadLocalErrorInjection(); } TestGetProperty(thread); - if (fault_fs_guard) { - fault_fs_guard->EnableAllThreadLocalErrorInjection(); + if (fault_fs_) { + fault_fs_->EnableAllThreadLocalErrorInjection(); } } @@ -1345,7 +1365,7 @@ void StressTest::OperateDb(ThreadState* thread) { uint64_t total_size = 0; if (FLAGS_backup_max_size > 0) { std::vector files; - db_stress_env->GetChildrenFileAttributes(FLAGS_db, &files); + db_stress_env->GetChildrenFileAttributes(db_path_, &files); for (auto& file : files) { total_size += file.size_bytes; } @@ -1354,12 +1374,12 @@ void StressTest::OperateDb(ThreadState* thread) { if (total_size <= FLAGS_backup_max_size) { // TODO(hx235): enable error injection with // backup/restore after fixing the various issues it surfaces - if (fault_fs_guard) { - fault_fs_guard->DisableAllThreadLocalErrorInjection(); + if (fault_fs_) { + fault_fs_->DisableAllThreadLocalErrorInjection(); } Status s = TestBackupRestore(thread, rand_column_families, rand_keys); - if (fault_fs_guard) { - fault_fs_guard->EnableAllThreadLocalErrorInjection(); + if (fault_fs_) { + fault_fs_->EnableAllThreadLocalErrorInjection(); } ProcessStatus(shared, "Backup/restore", s); } @@ -1402,7 +1422,7 @@ void StressTest::OperateDb(ThreadState* thread) { // user write. // TODO(hx235): support tracing user writes with fault injection. bool disable_fault_injection_during_user_write = - fault_fs_guard && MightHaveUnsyncedDataLoss(); + fault_fs_ && MightHaveUnsyncedDataLoss(); int prob_op = thread->rand.Uniform(100); // Reset this in case we pick something other than a read op. 
We don't // want to use a stale value when deciding at the beginning of the loop @@ -1462,32 +1482,32 @@ void StressTest::OperateDb(ThreadState* thread) { assert(prefix_bound <= prob_op); // OPERATION write if (disable_fault_injection_during_user_write) { - fault_fs_guard->DisableAllThreadLocalErrorInjection(); + fault_fs_->DisableAllThreadLocalErrorInjection(); } TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys, value); if (disable_fault_injection_during_user_write) { - fault_fs_guard->EnableAllThreadLocalErrorInjection(); + fault_fs_->EnableAllThreadLocalErrorInjection(); } } else if (prob_op < del_bound) { assert(write_bound <= prob_op); // OPERATION delete if (disable_fault_injection_during_user_write) { - fault_fs_guard->DisableAllThreadLocalErrorInjection(); + fault_fs_->DisableAllThreadLocalErrorInjection(); } TestDelete(thread, write_opts, rand_column_families, rand_keys); if (disable_fault_injection_during_user_write) { - fault_fs_guard->EnableAllThreadLocalErrorInjection(); + fault_fs_->EnableAllThreadLocalErrorInjection(); } } else if (prob_op < delrange_bound) { assert(del_bound <= prob_op); // OPERATION delete range if (disable_fault_injection_during_user_write) { - fault_fs_guard->DisableAllThreadLocalErrorInjection(); + fault_fs_->DisableAllThreadLocalErrorInjection(); } TestDeleteRange(thread, write_opts, rand_column_families, rand_keys); if (disable_fault_injection_during_user_write) { - fault_fs_guard->EnableAllThreadLocalErrorInjection(); + fault_fs_->EnableAllThreadLocalErrorInjection(); } } else if (prob_op < iterate_bound) { assert(delrange_bound <= prob_op); @@ -1545,8 +1565,8 @@ void StressTest::OperateDb(ThreadState* thread) { } #ifndef NDEBUG - if (fault_fs_guard) { - fault_fs_guard->DisableAllThreadLocalErrorInjection(); + if (fault_fs_) { + fault_fs_->DisableAllThreadLocalErrorInjection(); } #endif // NDEBUG } @@ -2362,9 +2382,9 @@ Status StressTest::TestBackupRestore( } const std::string backup_dir = - FLAGS_db + 
"/.backup" + std::to_string(thread->tid); + db_path_ + "/.backup" + std::to_string(thread->tid); const std::string restore_dir = - FLAGS_db + "/.restore" + std::to_string(thread->tid); + db_path_ + "/.restore" + std::to_string(thread->tid); BackupEngineOptions backup_opts(backup_dir); // For debugging, get info_log from live options backup_opts.info_log = db_->GetDBOptions().info_log.get(); @@ -2663,8 +2683,8 @@ Status StressTest::TestBackupRestore( } // Temporarily disable error injection for clean up - if (fault_fs_guard) { - fault_fs_guard->DisableAllThreadLocalErrorInjection(); + if (fault_fs_) { + fault_fs_->DisableAllThreadLocalErrorInjection(); } if (s.ok() || IsErrorInjectedAndRetryable(s)) { @@ -2685,8 +2705,8 @@ Status StressTest::TestBackupRestore( } // Enable back error injection disabled for clean up - if (fault_fs_guard) { - fault_fs_guard->EnableAllThreadLocalErrorInjection(); + if (fault_fs_) { + fault_fs_->EnableAllThreadLocalErrorInjection(); } if (!s.ok() && !IsErrorInjectedAndRetryable(s)) { @@ -2828,20 +2848,20 @@ Status StressTest::TestCheckpoint(ThreadState* thread, } std::string checkpoint_dir = - FLAGS_db + "/.checkpoint" + std::to_string(thread->tid); + db_path_ + "/.checkpoint" + std::to_string(thread->tid); Options tmp_opts(options_); tmp_opts.listeners.clear(); tmp_opts.env = db_stress_env; // Avoid delayed deletion so whole directory can be deleted tmp_opts.sst_file_manager.reset(); // Temporarily disable error injection for clean-up - if (fault_fs_guard) { - fault_fs_guard->DisableAllThreadLocalErrorInjection(); + if (fault_fs_) { + fault_fs_->DisableAllThreadLocalErrorInjection(); } DestroyDB(checkpoint_dir, tmp_opts); // Enable back error injection disabled for clean-up - if (fault_fs_guard) { - fault_fs_guard->EnableAllThreadLocalErrorInjection(); + if (fault_fs_) { + fault_fs_->EnableAllThreadLocalErrorInjection(); } Checkpoint* checkpoint = nullptr; Status s = Checkpoint::Create(db_, &checkpoint); @@ -2853,8 +2873,8 @@ Status 
StressTest::TestCheckpoint(ThreadState* thread, std::vector files; // Temporarily disable error injection to print debugging information - if (fault_fs_guard) { - fault_fs_guard->DisableThreadLocalErrorInjection( + if (fault_fs_) { + fault_fs_->DisableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); } @@ -2862,8 +2882,8 @@ Status StressTest::TestCheckpoint(ThreadState* thread, // Enable back disable error injection disabled for printing debugging // information - if (fault_fs_guard) { - fault_fs_guard->EnableThreadLocalErrorInjection( + if (fault_fs_) { + fault_fs_->EnableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); } if (!my_s.ok()) { @@ -2947,8 +2967,8 @@ Status StressTest::TestCheckpoint(ThreadState* thread, } // Temporarily disable error injection for clean-up - if (fault_fs_guard) { - fault_fs_guard->DisableAllThreadLocalErrorInjection(); + if (fault_fs_) { + fault_fs_->DisableAllThreadLocalErrorInjection(); } if (!s.ok() && !IsErrorInjectedAndRetryable(s)) { @@ -2959,8 +2979,8 @@ Status StressTest::TestCheckpoint(ThreadState* thread, } // Enable back error injection disabled for clean-up - if (fault_fs_guard) { - fault_fs_guard->EnableAllThreadLocalErrorInjection(); + if (fault_fs_) { + fault_fs_->EnableAllThreadLocalErrorInjection(); } return s; } @@ -3351,8 +3371,8 @@ void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key, uint32_t pre_hash = 0; if (thread->rand.OneIn(2)) { // Temporarily disable error injection to for validation - if (fault_fs_guard) { - fault_fs_guard->DisableAllThreadLocalErrorInjection(); + if (fault_fs_) { + fault_fs_->DisableAllThreadLocalErrorInjection(); } // Declare a snapshot and compare the data before and after the compaction @@ -3361,8 +3381,8 @@ void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key, GetRangeHash(thread, pre_snapshot, column_family, start_key, end_key); // Enable back error injection disabled for validation - if (fault_fs_guard) { - 
fault_fs_guard->EnableAllThreadLocalErrorInjection(); + if (fault_fs_) { + fault_fs_->EnableAllThreadLocalErrorInjection(); } } std::ostringstream compact_range_opt_oss; @@ -3399,8 +3419,8 @@ void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key, if (pre_snapshot != nullptr) { // Temporarily disable error injection for validation - if (fault_fs_guard) { - fault_fs_guard->DisableAllThreadLocalErrorInjection(); + if (fault_fs_) { + fault_fs_->DisableAllThreadLocalErrorInjection(); } uint32_t post_hash = GetRangeHash(thread, pre_snapshot, column_family, start_key, end_key); @@ -3417,9 +3437,9 @@ void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key, thread->shared->SetVerificationFailure(); } db_->ReleaseSnapshot(pre_snapshot); - if (fault_fs_guard) { + if (fault_fs_) { // Enable back error injection disabled for validation - fault_fs_guard->EnableAllThreadLocalErrorInjection(); + fault_fs_->EnableAllThreadLocalErrorInjection(); } } } @@ -3690,6 +3710,7 @@ void StressTest::Open(SharedState* shared, bool reopen) { InitializeOptionsFromFlags(cache_, filter_policy_, udi_factory_, options_); } InitializeOptionsGeneral(cache_, filter_policy_, sqfc_factory_, options_); + options_.env = fault_env_ ? 
fault_env_.get() : db_stress_env; DbStressCustomCompressionManager::Register(); if (!strcasecmp(FLAGS_compression_manager.c_str(), "custom")) { @@ -3790,13 +3811,13 @@ void StressTest::Open(SharedState* shared, bool reopen) { fprintf(stdout, "Integrated BlobDB: blob cache disabled\n"); } - fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str()); + fprintf(stdout, "DB path: [%s]\n", db_path_.c_str()); Status s; if (FLAGS_ttl == -1) { std::vector existing_column_families; - s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db, + s = DB::ListColumnFamilies(DBOptions(options_), db_path_, &existing_column_families); // ignore errors if (!s.ok()) { // DB doesn't exist @@ -3844,14 +3865,15 @@ void StressTest::Open(SharedState* shared, bool reopen) { } options_.listeners.clear(); - options_.listeners.emplace_back(new DbStressListener( - FLAGS_db, options_.db_paths, cf_descriptors, shared)); + options_.listeners.emplace_back( + new DbStressListener(db_path_, options_.db_paths, cf_descriptors, + expected_values_dir_, shared, fault_fs_)); RegisterAdditionalListeners(); // If this is for DB reopen, error injection may have been enabled. // Disable it here in case there is no open fault injection. 
- if (fault_fs_guard) { - fault_fs_guard->DisableAllThreadLocalErrorInjection(); + if (fault_fs_) { + fault_fs_->DisableAllThreadLocalErrorInjection(); } // TODO; test transaction DB Open with fault injection if (!FLAGS_use_txn) { @@ -3865,41 +3887,39 @@ void StressTest::Open(SharedState* shared, bool reopen) { if ((inject_sync_fault || inject_open_meta_read_error || inject_open_meta_write_error || inject_open_read_error || inject_open_write_error) && - fault_fs_guard - ->FileExists(FLAGS_db + "/CURRENT", IOOptions(), nullptr) + fault_fs_->FileExists(db_path_ + "/CURRENT", IOOptions(), nullptr) .ok()) { if (inject_sync_fault || inject_open_write_error) { - fault_fs_guard->SetFilesystemDirectWritable(false); - fault_fs_guard->SetInjectUnsyncedDataLoss(inject_sync_fault); + fault_fs_->SetFilesystemDirectWritable(false); + fault_fs_->SetInjectUnsyncedDataLoss(inject_sync_fault); } - fault_fs_guard->SetThreadLocalErrorContext( + fault_fs_->SetThreadLocalErrorContext( FaultInjectionIOType::kMetadataRead, static_cast(FLAGS_seed), FLAGS_open_metadata_read_fault_one_in, false /* retryable */, false /* has_data_loss */); - fault_fs_guard->EnableThreadLocalErrorInjection( + fault_fs_->EnableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); - fault_fs_guard->SetThreadLocalErrorContext( + fault_fs_->SetThreadLocalErrorContext( FaultInjectionIOType::kMetadataWrite, static_cast(FLAGS_seed), FLAGS_open_metadata_write_fault_one_in, false /* retryable */, false /* has_data_loss */); - fault_fs_guard->EnableThreadLocalErrorInjection( + fault_fs_->EnableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataWrite); - fault_fs_guard->SetThreadLocalErrorContext( + fault_fs_->SetThreadLocalErrorContext( FaultInjectionIOType::kRead, static_cast(FLAGS_seed), FLAGS_open_read_fault_one_in, false /* retryable */, false /* has_data_loss */); - fault_fs_guard->EnableThreadLocalErrorInjection( - FaultInjectionIOType::kRead); + 
fault_fs_->EnableThreadLocalErrorInjection(FaultInjectionIOType::kRead); - fault_fs_guard->SetThreadLocalErrorContext( + fault_fs_->SetThreadLocalErrorContext( FaultInjectionIOType::kWrite, static_cast(FLAGS_seed), FLAGS_open_write_fault_one_in, false /* retryable */, false /* has_data_loss */); - fault_fs_guard->EnableThreadLocalErrorInjection( + fault_fs_->EnableThreadLocalErrorInjection( FaultInjectionIOType::kWrite); } while (true) { @@ -3910,7 +3930,7 @@ void StressTest::Open(SharedState* shared, bool reopen) { blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc; blob_db::BlobDB* blob_db = nullptr; - s = blob_db::BlobDB::Open(options_, blob_db_options, FLAGS_db, + s = blob_db::BlobDB::Open(options_, blob_db_options, db_path_, cf_descriptors, &column_families_, &blob_db); if (s.ok()) { @@ -3919,11 +3939,11 @@ void StressTest::Open(SharedState* shared, bool reopen) { } } else { if (db_preload_finished_.load() && FLAGS_read_only) { - s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db, + s = DB::OpenForReadOnly(DBOptions(options_), db_path_, cf_descriptors, &column_families_, &db_owner_); } else { - s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors, + s = DB::Open(DBOptions(options_), db_path_, cf_descriptors, &column_families_, &db_owner_); } if (s.ok()) { @@ -3934,7 +3954,7 @@ void StressTest::Open(SharedState* shared, bool reopen) { if (inject_sync_fault || inject_open_meta_read_error || inject_open_meta_write_error || inject_open_read_error || inject_open_write_error) { - fault_fs_guard->DisableAllThreadLocalErrorInjection(); + fault_fs_->DisableAllThreadLocalErrorInjection(); if (s.ok()) { // Injected errors might happen in background compactions. 
We @@ -3964,13 +3984,13 @@ void StressTest::Open(SharedState* shared, bool reopen) { if (!reopen) { Random rand(static_cast(FLAGS_seed)); if (rand.OneIn(2)) { - fault_fs_guard->DeleteFilesCreatedAfterLastDirSync(IOOptions(), - nullptr); + fault_fs_->DeleteFilesCreatedAfterLastDirSync(IOOptions(), + nullptr); } if (rand.OneIn(3)) { - fault_fs_guard->DropUnsyncedFileData(); + fault_fs_->DropUnsyncedFileData(); } else if (rand.OneIn(2)) { - fault_fs_guard->DropRandomUnsyncedFileData(&rand); + fault_fs_->DropRandomUnsyncedFileData(&rand); } } continue; @@ -3993,7 +4013,7 @@ void StressTest::Open(SharedState* shared, bool reopen) { optimistic_txn_db_options.shared_lock_buckets = nullptr; } s = OptimisticTransactionDB::Open( - options_, optimistic_txn_db_options, FLAGS_db, cf_descriptors, + options_, optimistic_txn_db_options, db_path_, cf_descriptors, &column_families_, &optimistic_txn_db_); if (!s.ok()) { fprintf(stderr, "Error in opening the OptimisticTransactionDB [%s]\n", @@ -4027,7 +4047,7 @@ void StressTest::Open(SharedState* shared, bool reopen) { txn_db_options.use_per_key_point_lock_mgr = FLAGS_use_per_key_point_lock_mgr; PrepareTxnDbOptions(shared, txn_db_options); - s = TransactionDB::Open(options_, txn_db_options, FLAGS_db, + s = TransactionDB::Open(options_, txn_db_options, db_path_, cf_descriptors, &column_families_, &txn_db_); if (!s.ok()) { fprintf(stderr, "Error in opening the TransactionDB [%s]\n", @@ -4067,8 +4087,8 @@ void StressTest::Open(SharedState* shared, bool reopen) { // TODO(yanqin) support max_open_files != -1 for secondary instance. 
tmp_opts.max_open_files = -1; tmp_opts.env = db_stress_env; - const std::string& secondary_path = FLAGS_secondaries_base; - s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path, + const std::string& secondary_path = secondaries_base_; + s = DB::OpenAsSecondary(tmp_opts, db_path_, secondary_path, cf_descriptors, &secondary_cfhs_, &secondary_db_); assert(s.ok()); assert(secondary_cfhs_.size() == @@ -4076,7 +4096,7 @@ void StressTest::Open(SharedState* shared, bool reopen) { } } else { DBWithTTL* db_with_ttl; - s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl); + s = DBWithTTL::Open(options_, db_path_, &db_with_ttl, FLAGS_ttl); db_owner_.reset(db_with_ttl); db_ = db_with_ttl; } @@ -4417,8 +4437,12 @@ void InitializeOptionsFromFlags( options.memtable_prefix_bloom_size_ratio = FLAGS_memtable_prefix_bloom_size_ratio; if (FLAGS_use_write_buffer_manager) { - options.write_buffer_manager.reset( - new WriteBufferManager(FLAGS_db_write_buffer_size, block_cache)); + if (shared_wbm) { + options.write_buffer_manager = shared_wbm; + } else { + options.write_buffer_manager = std::make_shared( + FLAGS_db_write_buffer_size, block_cache); + } } options.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering; if (ShouldDisableAutoCompactionsBeforeVerifyDb()) { @@ -4753,7 +4777,9 @@ void InitializeOptionsGeneral( // TODO: row_cache, thread-pool IO priority, CPU priority. 
if (!options.rate_limiter) { - if (FLAGS_rate_limiter_bytes_per_sec > 0) { + if (shared_rate_limiter) { + options.rate_limiter = shared_rate_limiter; + } else if (FLAGS_rate_limiter_bytes_per_sec > 0) { options.rate_limiter.reset(NewGenericRateLimiter( FLAGS_rate_limiter_bytes_per_sec, 1000 /* refill_period_us */, 10 /* fairness */, diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index a61e18c3fa5f..d4030a7c3cd3 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -13,11 +13,15 @@ #include "db_stress_tool/db_stress_common.h" #include "db_stress_tool/db_stress_shared_state.h" +#include "env/composite_env_wrapper.h" +#include "rocksdb/db_stress_tool.h" #include "rocksdb/experimental.h" #include "rocksdb/user_defined_index.h" #include "utilities/fault_injection_fs.h" namespace ROCKSDB_NAMESPACE { + +class DbStressFSWrapper; class SystemClock; class Transaction; class TransactionDB; @@ -40,11 +44,20 @@ class StressTest { // from optimistic transactions when conflict detection retries are exhausted. 
static bool IsExpectedTxnError(const Status& s); - StressTest(); + StressTest(const std::string& db_path, const std::string& expected_values_dir, + const std::string& secondaries_base); virtual ~StressTest() {} - std::shared_ptr NewCache(size_t capacity, int32_t num_shard_bits); + const std::string& GetDbPath() const { return db_path_; } + const std::string& GetExpectedValuesDir() const { + return expected_values_dir_; + } + const std::string& GetSecondariesBase() const { return secondaries_base_; } + std::shared_ptr GetFaultFs() const { return fault_fs_; } + + static std::shared_ptr NewCache(size_t capacity, + int32_t num_shard_bits); static std::vector GetBlobCompressionTags(); @@ -408,6 +421,14 @@ class StressTest { void CleanUpColumnFamilies(); + std::string db_path_; + std::string expected_values_dir_; + std::string secondaries_base_; + + std::shared_ptr fault_fs_; + std::shared_ptr fault_stress_fs_; + std::unique_ptr fault_env_; + std::shared_ptr cache_; std::shared_ptr compressed_cache_; std::shared_ptr filter_policy_; diff --git a/db_stress_tool/db_stress_tool.cc b/db_stress_tool/db_stress_tool.cc index 28598f1eb62c..e826128e7c43 100644 --- a/db_stress_tool/db_stress_tool.cc +++ b/db_stress_tool/db_stress_tool.cc @@ -21,6 +21,8 @@ // different behavior. See comment of the flag for details. #ifdef GFLAGS +#include + #include "db_stress_tool/db_stress_common.h" #include "db_stress_tool/db_stress_driver.h" #include "db_stress_tool/db_stress_shared_state.h" @@ -35,11 +37,93 @@ static std::shared_ptr env_wrapper_guard; static std::shared_ptr legacy_env_wrapper_guard; static std::shared_ptr dbsl_env_wrapper_guard; -static std::shared_ptr fault_env_guard; +// Global state for crash callback — must be accessible from a plain function +// pointer (CrashCallback = void(*)()) since lambdas with captures cannot +// convert to function pointers. 
Uses a fixed-size C array of raw pointers +// so the callback is async-signal-safe (no vector walks, shared_ptr copies, +// or std::string access). +static constexpr int kMaxDbsForCrash = 256; +static ROCKSDB_NAMESPACE::FaultInjectionTestFS* + g_fault_fs_for_crash[kMaxDbsForCrash] = {}; +static int g_num_dbs_for_crash = 0; } // namespace KeyGenContext key_gen_ctx; +struct DbPaths { + std::string db_path; + std::string ev_dir; + std::string sec_base; +}; + +void ResolveDefaultDbPathIfEmpty() { + if (!FLAGS_db.empty()) { + return; + } + + std::string default_db_path; + db_stress_env->GetTestDirectory(&default_db_path); + default_db_path += "/dbstress"; + FLAGS_db = default_db_path; +} + +bool IsMultiDbRoot(const std::string& db_root) { + return db_stress_env->FileExists(db_root + "/db_0").ok(); +} + +DbPaths ComputeDbPaths(int i, int num_dbs) { + DbPaths p; + if (num_dbs > 1) { + std::string suffix = "/db_" + std::to_string(i); + p.db_path = FLAGS_db + suffix; + p.ev_dir = FLAGS_expected_values_dir.empty() + ? "" + : FLAGS_expected_values_dir + suffix; + p.sec_base = + FLAGS_secondaries_base.empty() ? "" : FLAGS_secondaries_base + suffix; + } else { + p.db_path = FLAGS_db; + p.ev_dir = FLAGS_expected_values_dir; + p.sec_base = FLAGS_secondaries_base; + } + return p; +} + +void EnsureDirsExist(const DbPaths& paths) { + auto check = [](Env* env, const std::string& dir) { + if (dir.empty()) { + return; + } + Status s = env->CreateDirIfMissing(dir); + if (!s.ok()) { + fprintf(stderr, "Error creating dir %s: %s\n", dir.c_str(), + s.ToString().c_str()); + exit(1); + } + }; + check(db_stress_env, paths.db_path); + // expected_values_dir is always on the local filesystem (the Python driver + // materializes it via tempfile.mkdtemp), even for remote-DB runs. 
+ check(Env::Default(), paths.ev_dir); + check(db_stress_env, paths.sec_base); +} + +StressTest* CreateStressTestByFlags(const DbPaths& paths) { + if (FLAGS_test_cf_consistency) { + return CreateCfConsistencyStressTest(paths.db_path, paths.ev_dir, + paths.sec_base); + } else if (FLAGS_test_batches_snapshots) { + return CreateBatchedOpsStressTest(paths.db_path, paths.ev_dir, + paths.sec_base); + } else if (FLAGS_test_multi_ops_txns) { + return CreateMultiOpsTxnsStressTest(paths.db_path, paths.ev_dir, + paths.sec_base); + } else { + return CreateNonBatchedOpsStressTest(paths.db_path, paths.ev_dir, + paths.sec_base); + } +} + int db_stress_tool(int argc, char** argv) { SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) + " [OPTIONS]..."); @@ -77,42 +161,22 @@ int db_stress_tool(int argc, char** argv) { dbsl_env_wrapper_guard = std::make_shared(raw_env); db_stress_listener_env = dbsl_env_wrapper_guard.get(); - if (FLAGS_open_metadata_read_fault_one_in || - FLAGS_open_metadata_write_fault_one_in || FLAGS_open_read_fault_one_in || - FLAGS_open_write_fault_one_in || FLAGS_metadata_read_fault_one_in || - FLAGS_metadata_write_fault_one_in || FLAGS_read_fault_one_in || - FLAGS_write_fault_one_in || FLAGS_sync_fault_injection) { - FaultInjectionTestFS* fs = - new FaultInjectionTestFS(raw_env->GetFileSystem()); - fault_fs_guard.reset(fs); - // Set it to direct writable here to initially bypass any fault injection - // during DB open This will correspondingly be overwritten in - // StressTest::Open() for open fault injection and in RunStressTestImpl() - // for proper fault injection setup. - fault_fs_guard->SetFilesystemDirectWritable(true); - fault_env_guard = - std::make_shared(raw_env, fault_fs_guard); - raw_env = fault_env_guard.get(); - - // Register a crash callback so that recently injected errors are - // printed to stderr when the process crashes (SIGABRT, SIGSEGV, etc.). - // This helps diagnose stress test failures caused by fault injection. 
- port::RegisterCrashCallback([]() { - if (fault_fs_guard) { - fault_fs_guard->PrintRecentInjectedErrors(); - } - }); - } - auto db_stress_fs = std::make_shared(raw_env->GetFileSystem()); env_wrapper_guard = std::make_shared(raw_env, db_stress_fs); db_stress_env = env_wrapper_guard.get(); + ResolveDefaultDbPathIfEmpty(); + // Handle --destroy_db_and_exit early, before other option validation if (FLAGS_destroy_db_and_exit) { - s = DbStressDestroyDb(FLAGS_db); + s = (FLAGS_num_dbs > 1 || IsMultiDbRoot(FLAGS_db)) + ? DestroyDir(raw_env, FLAGS_db) + : DbStressDestroyDb(FLAGS_db); + // Note: expected_values_dir and secondaries_base cleanup is handled + // by the crash test framework (db_crashtest.py) after the test passes. + // Do NOT clean them here to avoid double-removal race. if (s.ok()) { fprintf(stdout, "Successfully destroyed db at %s\n", FLAGS_db.c_str()); return 0; @@ -262,33 +326,6 @@ int db_stress_tool(int argc, char** argv) { } } - // Choose a location for the test database if none given with --db= - if (FLAGS_db.empty()) { - std::string default_db_path; - db_stress_env->GetTestDirectory(&default_db_path); - default_db_path += "/dbstress"; - FLAGS_db = default_db_path; - } - - // Now that FLAGS_db is resolved, set the fault injection log file path - // so that PrintAll() writes to a file instead of stderr (signal-safe). - // Store the log in TEST_TMPDIR (outside the DB directory) so it survives - // DB reopen (which cleans untracked files) and gets included in the - // sandcastle db.tar.gz artifact for post-failure analysis. 
- if (fault_fs_guard) { - std::string log_dir; - const char* test_tmpdir = getenv("TEST_TMPDIR"); - if (test_tmpdir && test_tmpdir[0] != '\0') { - log_dir = test_tmpdir; - } else { - log_dir = "/tmp"; - } - std::string log_path = log_dir + "/fault_injection_" + - std::to_string(getpid()) + "_" + - std::to_string(time(nullptr)) + ".log"; - fault_fs_guard->SetInjectedErrorLogPath(log_path); - } - if ((FLAGS_test_secondary || FLAGS_continuous_verification_interval > 0) && FLAGS_secondaries_base.empty()) { std::string default_secondaries_path; @@ -420,25 +457,190 @@ int db_stress_tool(int argc, char** argv) { key_gen_ctx.weights.emplace_back(key_gen_ctx.window - keys_per_level * (levels - 1)); } - std::unique_ptr shared; - std::unique_ptr stress; - if (FLAGS_test_cf_consistency) { - stress.reset(CreateCfConsistencyStressTest()); - } else if (FLAGS_test_batches_snapshots) { - stress.reset(CreateBatchedOpsStressTest()); - } else if (FLAGS_test_multi_ops_txns) { - stress.reset(CreateMultiOpsTxnsStressTest()); - } else { - stress.reset(CreateNonBatchedOpsStressTest()); - } // Initialize the Zipfian pre-calculated array InitializeHotKeyGenerator(FLAGS_hot_key_alpha); - shared.reset(new SharedState(db_stress_env, stress.get())); - bool run_stress_test = RunStressTest(shared.get()); - // Close DB in CleanUp() before destructor to prevent race between destructor - // and operations in listener callbacks (e.g. MultiOpsTxnsStressListener). - stress->CleanUp(); - return run_stress_test ? 0 : 1; + + if (FLAGS_num_dbs < 1) { + fprintf(stderr, "Error: --num_dbs must be >= 1\n"); + exit(1); + } + if (FLAGS_num_dbs > kMaxDbsForCrash) { + fprintf(stderr, "Error: --num_dbs=%d exceeds maximum %d\n", FLAGS_num_dbs, + kMaxDbsForCrash); + exit(1); + } + + const int num_dbs = FLAGS_num_dbs; + + // Multi-DB mode: run N independent StressTest instances sharing one Env. 
+ if (num_dbs > 1) { + // Column family clearing has a pre-existing race condition where a CF + // handle can be accessed by one thread (e.g. NewIterator) after another + // thread drops and recreates it. This is more likely to trigger with the + // increased thread count in multi-DB mode. Disable it to avoid spurious + // ASAN heap-use-after-free failures. + if (FLAGS_clear_column_family_one_in > 0) { + fprintf(stderr, + "Warning: --num_dbs > 1 disables --clear_column_family_one_in " + "due to a pre-existing CF handle race condition.\n"); + FLAGS_clear_column_family_one_in = 0; + } + // MultiOpsTxnsStressTest uses a single global key_spaces_path file. + // Multiple DB instances would overwrite each other's range descriptors, + // causing key-space layout corruption on reopen. + if (FLAGS_test_multi_ops_txns) { + fprintf(stderr, + "Error: --num_dbs > 1 is incompatible with " + "--test_multi_ops_txns (shared key_spaces_path).\n"); + exit(1); + } + + // CompressedCacheSetCapacityThread mutates and asserts on the shared + // compressed_secondary_cache. Multiple per-DB threads racing on + // SetCapacity(0)/SetCapacity(size) cause spurious assertion failures. + if (FLAGS_compressed_secondary_cache_size > 0 || + FLAGS_compressed_secondary_cache_ratio > 0.0) { + fprintf(stderr, + "Warning: --num_dbs > 1 disables compressed secondary cache " + "capacity stress to avoid races on shared cache state.\n"); + FLAGS_compressed_secondary_cache_size = 0; + FLAGS_compressed_secondary_cache_ratio = 0.0; + } + + // Share WriteBufferManager and RateLimiter across all DBs so total + // memory and I/O are bounded globally, not per-DB. 
+ block_cache = + StressTest::NewCache(FLAGS_cache_size, FLAGS_cache_numshardbits); + if (FLAGS_use_write_buffer_manager) { + shared_wbm = std::make_shared( + FLAGS_db_write_buffer_size, block_cache); + } + if (FLAGS_rate_limiter_bytes_per_sec > 0) { + shared_rate_limiter.reset(NewGenericRateLimiter( + FLAGS_rate_limiter_bytes_per_sec, 1000 /* refill_period_us */, + 10 /* fairness */, + FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly + : RateLimiter::Mode::kWritesOnly)); + } + } + + // Save and clear the destroy flag before any threads start to avoid a + // data race on this non-atomic global. Destruction is done on the main + // thread below. + bool destroy_initially = FLAGS_destroy_db_initially; + FLAGS_destroy_db_initially = false; + + // Create parent directories for multi-DB paths. + if (num_dbs > 1) { + EnsureDirsExist( + {FLAGS_db, FLAGS_expected_values_dir, FLAGS_secondaries_base}); + } + + std::vector> stresses(num_dbs); + std::vector> shareds(num_dbs); + // Use int instead of bool to avoid std::vector bitset packing + // which causes data races on concurrent writes to different indices. + std::vector results(num_dbs, 0); + std::vector threads(num_dbs); + + // Determine log directory for fault injection error logs. 
+ std::string log_dir; + const char* test_tmpdir = getenv("TEST_TMPDIR"); + if (test_tmpdir && test_tmpdir[0] != '\0') { + log_dir = test_tmpdir; + } else { + log_dir = "/tmp"; + } + + for (int i = 0; i < num_dbs; i++) { + DbPaths paths = ComputeDbPaths(i, num_dbs); + if (num_dbs > 1) { + EnsureDirsExist(paths); + } + if (destroy_initially) { + Status exists = db_stress_env->FileExists(paths.db_path); + if (!exists.IsNotFound()) { + Status ds = DbStressDestroyDb(paths.db_path); + if (!ds.ok()) { + fprintf(stderr, "Cannot destroy db %s: %s\n", paths.db_path.c_str(), + ds.ToString().c_str()); + exit(1); + } + } + } + + stresses[i].reset(CreateStressTestByFlags(paths)); + + // Set up per-DB fault injection error log path so that PrintAll() + // writes to a file instead of stderr (signal-safe). Include the DB + // index in multi-DB mode for clear identification. + auto fault_fs = stresses[i]->GetFaultFs(); + if (fault_fs) { + std::string log_path = + log_dir + "/fault_injection_" + std::to_string(getpid()); + if (num_dbs > 1) { + log_path += "_db" + std::to_string(i); + } + log_path += "_" + std::to_string(time(nullptr)) + ".log"; + fault_fs->SetInjectedErrorLogPath(log_path); + } + + shareds[i].reset( + new SharedState(db_stress_env, stresses[i].get(), paths.ev_dir)); + threads[i] = std::thread([i, &results, &shareds]() { + results[i] = RunStressTest(shareds[i].get()); + }); + } + + // Register a crash callback so that recently injected errors are + // printed when the process crashes (SIGABRT, SIGSEGV, etc.). + // Use a fixed-size C array of raw pointers so the callback is + // async-signal-safe (no vector walks, shared_ptr copies, or + // std::string access). 
+ assert(num_dbs <= kMaxDbsForCrash); + for (int i = 0; i < num_dbs; i++) { + g_fault_fs_for_crash[i] = stresses[i]->GetFaultFs().get(); + } + g_num_dbs_for_crash = num_dbs; + port::RegisterCrashCallback([]() { + for (int i = 0; i < g_num_dbs_for_crash; i++) { + if (g_fault_fs_for_crash[i]) { + g_fault_fs_for_crash[i]->PrintRecentInjectedErrors(); + } + } + }); + + // Join all threads and report results. + bool all_passed = true; + for (int i = 0; i < num_dbs; i++) { + threads[i].join(); + if (num_dbs > 1) { + fprintf(stdout, "[multi-db] DB %d (%s): %s\n", i, + stresses[i]->GetDbPath().c_str(), + results[i] ? "PASSED" : "FAILED"); + } + if (!results[i]) { + all_passed = false; + } + // Close DB in CleanUp() before destructor to prevent race between + // destructor and operations in listener callbacks. + stresses[i]->CleanUp(); + } + + // Clear crash callback and its references before they go out of scope. + port::RegisterCrashCallback(nullptr); + for (int i = 0; i < num_dbs; i++) { + g_fault_fs_for_crash[i] = nullptr; + } + g_num_dbs_for_crash = 0; + + // Reset per-invocation shared resources so a second call to + // db_stress_tool() in the same process uses fresh state from its own flags. + shared_wbm.reset(); + shared_rate_limiter.reset(); + block_cache.reset(); + + return all_passed ? 
0 : 1; } } // namespace ROCKSDB_NAMESPACE diff --git a/db_stress_tool/expected_state.cc b/db_stress_tool/expected_state.cc index 80ba18a94c2a..151e7484639b 100644 --- a/db_stress_tool/expected_state.cc +++ b/db_stress_tool/expected_state.cc @@ -211,6 +211,8 @@ Status AnonExpectedState::Open(bool /* create */) { new std::atomic[GetValuesLen() / sizeof(std::atomic)]); values_ = &values_allocation_[0]; + persisted_seqno_allocation_.reset(new std::atomic(0)); + persisted_seqno_ = persisted_seqno_allocation_.get(); Reset(); return Status::OK(); } diff --git a/db_stress_tool/expected_state.h b/db_stress_tool/expected_state.h index e72a80adeaa3..0b179dcd4386 100644 --- a/db_stress_tool/expected_state.h +++ b/db_stress_tool/expected_state.h @@ -193,6 +193,7 @@ class AnonExpectedState : public ExpectedState { private: std::unique_ptr[]> values_allocation_; + std::unique_ptr> persisted_seqno_allocation_; }; // An `ExpectedStateManager` manages data about the expected state of the diff --git a/db_stress_tool/multi_ops_txns_stress.cc b/db_stress_tool/multi_ops_txns_stress.cc index 3afe0f7b1d7b..9919df6acbfe 100644 --- a/db_stress_tool/multi_ops_txns_stress.cc +++ b/db_stress_tool/multi_ops_txns_stress.cc @@ -1767,8 +1767,10 @@ void MultiOpsTxnsStressTest::ScanExistingDb(SharedState* shared, int threads) { } } -StressTest* CreateMultiOpsTxnsStressTest() { - return new MultiOpsTxnsStressTest(); +StressTest* CreateMultiOpsTxnsStressTest(const std::string& db_path, + const std::string& ev_dir, + const std::string& sec_base) { + return new MultiOpsTxnsStressTest(db_path, ev_dir, sec_base); } void CheckAndSetOptionsForMultiOpsTxnStressTest() { diff --git a/db_stress_tool/multi_ops_txns_stress.h b/db_stress_tool/multi_ops_txns_stress.h index fda681564e60..a7762cd9d225 100644 --- a/db_stress_tool/multi_ops_txns_stress.h +++ b/db_stress_tool/multi_ops_txns_stress.h @@ -191,7 +191,9 @@ class MultiOpsTxnsStressTest : public StressTest { uint32_t c_{0}; }; - MultiOpsTxnsStressTest() {} + 
MultiOpsTxnsStressTest(const std::string& db_path, const std::string& ev_dir, + const std::string& sec_base) + : StressTest(db_path, ev_dir, sec_base) {} ~MultiOpsTxnsStressTest() override {} diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index 859a4b043ed1..efedf4ebae60 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -21,7 +21,9 @@ namespace ROCKSDB_NAMESPACE { class NonBatchedOpsStressTest : public StressTest { public: - NonBatchedOpsStressTest() = default; + NonBatchedOpsStressTest(const std::string& db_path, const std::string& ev_dir, + const std::string& sec_base) + : StressTest(db_path, ev_dir, sec_base) {} virtual ~NonBatchedOpsStressTest() = default; @@ -255,20 +257,20 @@ class NonBatchedOpsStressTest : public StressTest { std::string from_db; // Temporarily disable error injection to verify the secondary - if (fault_fs_guard) { - fault_fs_guard->DisableThreadLocalErrorInjection( + if (fault_fs_) { + fault_fs_->DisableThreadLocalErrorInjection( FaultInjectionIOType::kRead); - fault_fs_guard->DisableThreadLocalErrorInjection( + fault_fs_->DisableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); } s = secondary_db_->Get(options, secondary_cfhs_[cf], key, &from_db); // Re-enable error injection after verifying the secondary - if (fault_fs_guard) { - fault_fs_guard->EnableThreadLocalErrorInjection( + if (fault_fs_) { + fault_fs_->EnableThreadLocalErrorInjection( FaultInjectionIOType::kRead); - fault_fs_guard->EnableThreadLocalErrorInjection( + fault_fs_->EnableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); } @@ -678,10 +680,10 @@ class NonBatchedOpsStressTest : public StressTest { bool read_older_ts = MaybeUseOlderTimestampForPointLookup( thread, read_ts_str, read_ts_slice, read_opts_copy); - if (fault_fs_guard) { - fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + if (fault_fs_) { + 
fault_fs_->GetAndResetInjectedThreadLocalErrorCount( FaultInjectionIOType::kRead); - fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + fault_fs_->GetAndResetInjectedThreadLocalErrorCount( FaultInjectionIOType::kMetadataRead); SharedState::ignore_read_error = false; } @@ -693,11 +695,11 @@ class NonBatchedOpsStressTest : public StressTest { thread->shared->Get(rand_column_families[0], rand_keys[0]); int injected_error_count = 0; - if (fault_fs_guard) { + if (fault_fs_) { injected_error_count = GetMinInjectedErrorCount( - fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + fault_fs_->GetAndResetInjectedThreadLocalErrorCount( FaultInjectionIOType::kRead), - fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + fault_fs_->GetAndResetInjectedThreadLocalErrorCount( FaultInjectionIOType::kMetadataRead)); if (!SharedState::ignore_read_error && injected_error_count > 0 && (s.ok() || s.IsNotFound())) { @@ -706,9 +708,9 @@ class NonBatchedOpsStressTest : public StressTest { MutexLock l(thread->shared->GetMutex()); fprintf(stderr, "Didn't get expected error from Get\n"); fprintf(stderr, "Callstack that injected the fault\n"); - fault_fs_guard->PrintInjectedThreadLocalErrorBacktrace( + fault_fs_->PrintInjectedThreadLocalErrorBacktrace( FaultInjectionIOType::kRead); - fault_fs_guard->PrintInjectedThreadLocalErrorBacktrace( + fault_fs_->PrintInjectedThreadLocalErrorBacktrace( FaultInjectionIOType::kMetadataRead); std::terminate(); } @@ -811,10 +813,10 @@ class NonBatchedOpsStressTest : public StressTest { std::unique_ptr txn; if (use_txn) { // TODO(hx235): test fault injection with MultiGet() with transactions - if (fault_fs_guard) { - fault_fs_guard->DisableThreadLocalErrorInjection( + if (fault_fs_) { + fault_fs_->DisableThreadLocalErrorInjection( FaultInjectionIOType::kRead); - fault_fs_guard->DisableThreadLocalErrorInjection( + fault_fs_->DisableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); } WriteOptions wo; @@ -840,20 +842,20 @@ 
class NonBatchedOpsStressTest : public StressTest { int injected_error_count = 0; if (!use_txn) { - if (fault_fs_guard) { - fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + if (fault_fs_) { + fault_fs_->GetAndResetInjectedThreadLocalErrorCount( FaultInjectionIOType::kRead); - fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + fault_fs_->GetAndResetInjectedThreadLocalErrorCount( FaultInjectionIOType::kMetadataRead); SharedState::ignore_read_error = false; } db_->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(), statuses.data()); - if (fault_fs_guard) { + if (fault_fs_) { injected_error_count = GetMinInjectedErrorCount( - fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + fault_fs_->GetAndResetInjectedThreadLocalErrorCount( FaultInjectionIOType::kRead), - fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + fault_fs_->GetAndResetInjectedThreadLocalErrorCount( FaultInjectionIOType::kMetadataRead)); if (injected_error_count > 0) { @@ -873,9 +875,9 @@ class NonBatchedOpsStressTest : public StressTest { "num_keys %zu Expected %d errors, seen at least %d\n", num_keys, injected_error_count, stat_nok_nfound); fprintf(stderr, "Callstack that injected the fault\n"); - fault_fs_guard->PrintInjectedThreadLocalErrorBacktrace( + fault_fs_->PrintInjectedThreadLocalErrorBacktrace( FaultInjectionIOType::kRead); - fault_fs_guard->PrintInjectedThreadLocalErrorBacktrace( + fault_fs_->PrintInjectedThreadLocalErrorBacktrace( FaultInjectionIOType::kMetadataRead); std::terminate(); } @@ -942,10 +944,10 @@ class NonBatchedOpsStressTest : public StressTest { const Status& s, const std::optional& ryw_expected_value) -> bool { // Temporarily disable error injection for verification - if (fault_fs_guard) { - fault_fs_guard->DisableThreadLocalErrorInjection( + if (fault_fs_) { + fault_fs_->DisableThreadLocalErrorInjection( FaultInjectionIOType::kRead); - fault_fs_guard->DisableThreadLocalErrorInjection( + 
fault_fs_->DisableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); } @@ -1039,10 +1041,10 @@ class NonBatchedOpsStressTest : public StressTest { } // Enable back error injection disbled for checking results - if (fault_fs_guard) { - fault_fs_guard->DisableThreadLocalErrorInjection( + if (fault_fs_) { + fault_fs_->EnableThreadLocalErrorInjection( FaultInjectionIOType::kRead); - fault_fs_guard->DisableThreadLocalErrorInjection( + fault_fs_->EnableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); } return check_multiget_res; @@ -1081,10 +1083,9 @@ class NonBatchedOpsStressTest : public StressTest { if (use_txn) { txn->Rollback().PermitUncheckedError(); // Enable back error injection disbled for transactions - if (fault_fs_guard) { - fault_fs_guard->EnableThreadLocalErrorInjection( - FaultInjectionIOType::kRead); - fault_fs_guard->EnableThreadLocalErrorInjection( + if (fault_fs_) { + fault_fs_->EnableThreadLocalErrorInjection(FaultInjectionIOType::kRead); + fault_fs_->EnableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); } } @@ -1131,10 +1132,10 @@ class NonBatchedOpsStressTest : public StressTest { const ExpectedValue pre_read_expected_value = thread->shared->Get(column_family, key); - if (fault_fs_guard) { - fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + if (fault_fs_) { + fault_fs_->GetAndResetInjectedThreadLocalErrorCount( FaultInjectionIOType::kRead); - fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + fault_fs_->GetAndResetInjectedThreadLocalErrorCount( FaultInjectionIOType::kMetadataRead); SharedState::ignore_read_error = false; } @@ -1154,11 +1155,11 @@ class NonBatchedOpsStressTest : public StressTest { thread->shared->Get(column_family, key); int injected_error_count = 0; - if (fault_fs_guard) { + if (fault_fs_) { injected_error_count = GetMinInjectedErrorCount( - fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + fault_fs_->GetAndResetInjectedThreadLocalErrorCount( 
FaultInjectionIOType::kRead), - fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + fault_fs_->GetAndResetInjectedThreadLocalErrorCount( FaultInjectionIOType::kMetadataRead)); if (!SharedState::ignore_read_error && injected_error_count > 0 && (s.ok() || s.IsNotFound())) { @@ -1167,9 +1168,9 @@ class NonBatchedOpsStressTest : public StressTest { MutexLock l(thread->shared->GetMutex()); fprintf(stderr, "Didn't get expected error from GetEntity\n"); fprintf(stderr, "Callstack that injected the fault\n"); - fault_fs_guard->PrintInjectedThreadLocalErrorBacktrace( + fault_fs_->PrintInjectedThreadLocalErrorBacktrace( FaultInjectionIOType::kRead); - fault_fs_guard->PrintInjectedThreadLocalErrorBacktrace( + fault_fs_->PrintInjectedThreadLocalErrorBacktrace( FaultInjectionIOType::kMetadataRead); std::terminate(); } @@ -1271,10 +1272,10 @@ class NonBatchedOpsStressTest : public StressTest { if (FLAGS_use_txn) { // TODO(hx235): test fault injection with MultiGetEntity() with // transactions - if (fault_fs_guard) { - fault_fs_guard->DisableThreadLocalErrorInjection( + if (fault_fs_) { + fault_fs_->DisableThreadLocalErrorInjection( FaultInjectionIOType::kRead); - fault_fs_guard->DisableThreadLocalErrorInjection( + fault_fs_->DisableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); } WriteOptions write_options; @@ -1308,11 +1309,11 @@ class NonBatchedOpsStressTest : public StressTest { int injected_error_count = 0; auto verify_expected_errors = [&](auto get_status) { - assert(fault_fs_guard); + assert(fault_fs_); injected_error_count = GetMinInjectedErrorCount( - fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + fault_fs_->GetAndResetInjectedThreadLocalErrorCount( FaultInjectionIOType::kRead), - fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + fault_fs_->GetAndResetInjectedThreadLocalErrorCount( FaultInjectionIOType::kMetadataRead)); if (injected_error_count) { int stat_nok_nfound = 0; @@ -1334,9 +1335,9 @@ class NonBatchedOpsStressTest 
: public StressTest { fprintf(stderr, "num_keys %zu Expected %d errors, seen %d\n", num_keys, injected_error_count, stat_nok_nfound); fprintf(stderr, "Call stack that injected the fault\n"); - fault_fs_guard->PrintInjectedThreadLocalErrorBacktrace( + fault_fs_->PrintInjectedThreadLocalErrorBacktrace( FaultInjectionIOType::kRead); - fault_fs_guard->PrintInjectedThreadLocalErrorBacktrace( + fault_fs_->PrintInjectedThreadLocalErrorBacktrace( FaultInjectionIOType::kMetadataRead); std::terminate(); } @@ -1346,10 +1347,10 @@ class NonBatchedOpsStressTest : public StressTest { auto check_results = [&](auto get_columns, auto get_status, auto do_extra_check, auto call_get_entity) { // Temporarily disable error injection for checking results - if (fault_fs_guard) { - fault_fs_guard->DisableThreadLocalErrorInjection( + if (fault_fs_) { + fault_fs_->DisableThreadLocalErrorInjection( FaultInjectionIOType::kRead); - fault_fs_guard->DisableThreadLocalErrorInjection( + fault_fs_->DisableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); } const bool check_get_entity = @@ -1439,10 +1440,9 @@ class NonBatchedOpsStressTest : public StressTest { } } // Enable back error injection disbled for checking results - if (fault_fs_guard) { - fault_fs_guard->EnableThreadLocalErrorInjection( - FaultInjectionIOType::kRead); - fault_fs_guard->EnableThreadLocalErrorInjection( + if (fault_fs_) { + fault_fs_->EnableThreadLocalErrorInjection(FaultInjectionIOType::kRead); + fault_fs_->EnableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); } }; @@ -1525,19 +1525,18 @@ class NonBatchedOpsStressTest : public StressTest { txn->Rollback().PermitUncheckedError(); // Enable back error injection disbled for transactions - if (fault_fs_guard) { - fault_fs_guard->EnableThreadLocalErrorInjection( - FaultInjectionIOType::kRead); - fault_fs_guard->EnableThreadLocalErrorInjection( + if (fault_fs_) { + fault_fs_->EnableThreadLocalErrorInjection(FaultInjectionIOType::kRead); + 
fault_fs_->EnableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); } } else if (FLAGS_use_attribute_group) { // AttributeGroup MultiGetEntity verification - if (fault_fs_guard) { - fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + if (fault_fs_) { + fault_fs_->GetAndResetInjectedThreadLocalErrorCount( FaultInjectionIOType::kRead); - fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + fault_fs_->GetAndResetInjectedThreadLocalErrorCount( FaultInjectionIOType::kMetadataRead); SharedState::ignore_read_error = false; } @@ -1553,7 +1552,7 @@ class NonBatchedOpsStressTest : public StressTest { db_->MultiGetEntity(read_opts_copy, num_keys, key_slices.data(), results.data()); - if (fault_fs_guard) { + if (fault_fs_) { verify_expected_errors( [&](size_t i) { return results[i][0].status(); }); } @@ -1569,10 +1568,10 @@ class NonBatchedOpsStressTest : public StressTest { } else { // Non-AttributeGroup MultiGetEntity verification - if (fault_fs_guard) { - fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + if (fault_fs_) { + fault_fs_->GetAndResetInjectedThreadLocalErrorCount( FaultInjectionIOType::kRead); - fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + fault_fs_->GetAndResetInjectedThreadLocalErrorCount( FaultInjectionIOType::kMetadataRead); SharedState::ignore_read_error = false; } @@ -1583,7 +1582,7 @@ class NonBatchedOpsStressTest : public StressTest { db_->MultiGetEntity(read_opts_copy, cfh, num_keys, key_slices.data(), results.data(), statuses.data()); - if (fault_fs_guard) { + if (fault_fs_) { verify_expected_errors([&](size_t i) { return statuses[i]; }); } @@ -1634,10 +1633,10 @@ class NonBatchedOpsStressTest : public StressTest { MaybeUseOlderTimestampForRangeScan(thread, read_ts_str, read_ts_slice, ro_copy); - if (fault_fs_guard) { - fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + if (fault_fs_) { + fault_fs_->GetAndResetInjectedThreadLocalErrorCount( FaultInjectionIOType::kRead); - 
fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + fault_fs_->GetAndResetInjectedThreadLocalErrorCount( FaultInjectionIOType::kMetadataRead); SharedState::ignore_read_error = false; } @@ -1699,11 +1698,11 @@ class NonBatchedOpsStressTest : public StressTest { } int injected_error_count = 0; - if (fault_fs_guard) { + if (fault_fs_) { injected_error_count = GetMinInjectedErrorCount( - fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + fault_fs_->GetAndResetInjectedThreadLocalErrorCount( FaultInjectionIOType::kRead), - fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount( + fault_fs_->GetAndResetInjectedThreadLocalErrorCount( FaultInjectionIOType::kMetadataRead)); if (!SharedState::ignore_read_error && injected_error_count > 0 && s.ok()) { @@ -1712,9 +1711,9 @@ class NonBatchedOpsStressTest : public StressTest { MutexLock l(thread->shared->GetMutex()); fprintf(stderr, "Didn't get expected error from PrefixScan\n"); fprintf(stderr, "Callstack that injected the fault\n"); - fault_fs_guard->PrintInjectedThreadLocalErrorBacktrace( + fault_fs_->PrintInjectedThreadLocalErrorBacktrace( FaultInjectionIOType::kRead); - fault_fs_guard->PrintInjectedThreadLocalErrorBacktrace( + fault_fs_->PrintInjectedThreadLocalErrorBacktrace( FaultInjectionIOType::kMetadataRead); std::terminate(); } @@ -1781,10 +1780,10 @@ class NonBatchedOpsStressTest : public StressTest { if (FLAGS_verify_before_write) { // Temporarily disable error injection for preparation - if (fault_fs_guard) { - fault_fs_guard->DisableThreadLocalErrorInjection( + if (fault_fs_) { + fault_fs_->DisableThreadLocalErrorInjection( FaultInjectionIOType::kRead); - fault_fs_guard->DisableThreadLocalErrorInjection( + fault_fs_->DisableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); } @@ -1795,10 +1794,9 @@ class NonBatchedOpsStressTest : public StressTest { /* msg_prefix */ "Pre-Put Get verification", from_db, s); // Enable back error injection disabled for preparation - if (fault_fs_guard) 
{ - fault_fs_guard->EnableThreadLocalErrorInjection( - FaultInjectionIOType::kRead); - fault_fs_guard->EnableThreadLocalErrorInjection( + if (fault_fs_) { + fault_fs_->EnableThreadLocalErrorInjection(FaultInjectionIOType::kRead); + fault_fs_->EnableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); } if (!res) { @@ -2226,11 +2224,11 @@ class NonBatchedOpsStressTest : public StressTest { FLAGS_test_ingest_standalone_range_deletion_one_in); std::vector external_files; const std::string sst_filename = - FLAGS_db + "/." + std::to_string(thread->tid) + ".sst"; + db_path_ + "/." + std::to_string(thread->tid) + ".sst"; external_files.push_back(sst_filename); std::string standalone_rangedel_filename; if (test_standalone_range_deletion) { - standalone_rangedel_filename = FLAGS_db + "/." + + standalone_rangedel_filename = db_path_ + "/." + std::to_string(thread->tid) + "_standalone_rangedel.sst"; external_files.push_back(standalone_rangedel_filename); @@ -2239,10 +2237,10 @@ class NonBatchedOpsStressTest : public StressTest { std::ostringstream ingest_options_oss; // Temporarily disable error injection for preparation - if (fault_fs_guard) { - fault_fs_guard->DisableThreadLocalErrorInjection( + if (fault_fs_) { + fault_fs_->DisableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); - fault_fs_guard->DisableThreadLocalErrorInjection( + fault_fs_->DisableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataWrite); } @@ -2257,10 +2255,10 @@ class NonBatchedOpsStressTest : public StressTest { } } - if (fault_fs_guard) { - fault_fs_guard->EnableThreadLocalErrorInjection( + if (fault_fs_) { + fault_fs_->EnableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataRead); - fault_fs_guard->EnableThreadLocalErrorInjection( + fault_fs_->EnableThreadLocalErrorInjection( FaultInjectionIOType::kMetadataWrite); } @@ -3386,8 +3384,10 @@ class NonBatchedOpsStressTest : public StressTest { } }; -StressTest* CreateNonBatchedOpsStressTest() { - return new 
NonBatchedOpsStressTest(); +StressTest* CreateNonBatchedOpsStressTest(const std::string& db_path, + const std::string& ev_dir, + const std::string& sec_base) { + return new NonBatchedOpsStressTest(db_path, ev_dir, sec_base); } } // namespace ROCKSDB_NAMESPACE diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 0a5195c78421..5cc0f8f1e9e9 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -477,6 +477,9 @@ def apply_random_seed_per_iteration(): "statistics": random.choice([0, 1]), # TODO: re-enable after resolving "Req failed: Unknown error -14" errors "multiscan_use_async_io": 0, # random.randint(0, 1), + # TODO: re-enable multi-DB stress testing after CI validation + # "num_dbs": lambda: random.choice([1] * 4 + [3, 5]), + "num_dbs": 1, } _TEST_DIR_ENV_VAR = "TEST_TMPDIR" @@ -1331,6 +1334,14 @@ def finalize_and_sanitize(src_params): # interval so that the feature gets exercised on a quiet DB. if dest_params.get("read_triggered_compaction_threshold", 0) > 0: dest_params["max_compaction_trigger_wakeup_seconds"] = 20 + # Multi-DB mode: disable features with known race conditions + if dest_params.get("num_dbs", 1) > 1: + # clear_column_family_one_in has a pre-existing CF handle race + # condition that is more likely to trigger with multi-DB thread count + dest_params["clear_column_family_one_in"] = 0 + # MultiOpsTxnsStressTest uses a single global key_spaces_path file + # that would be corrupted by concurrent DB instances + dest_params["test_multi_ops_txns"] = 0 return dest_params @@ -1473,9 +1484,27 @@ def print_output_and_exit_on_error(stdout, stderr, print_stderr_separately=False sys.exit(2) -def cleanup_after_success(dbname): +def cleanup_after_success(dbname, num_dbs=1, expected_values_dir=None, + secondaries_base=None, test_secondary=0, + continuous_verification_interval=0): # Use db_stress --destroy_db_and_exit, which simplifies remote DB cleanup cleanup_cmd_parts = [stress_cmd, "--destroy_db_and_exit=1", "--db=" + dbname] + if num_dbs 
> 1: + cleanup_cmd_parts.append("--num_dbs=%d" % num_dbs) + if expected_values_dir: + cleanup_cmd_parts.append("--expected_values_dir=" + expected_values_dir) + if secondaries_base: + cleanup_cmd_parts.append("--secondaries_base=" + secondaries_base) + # Forward flags that trigger auto-generated secondaries_base in the C++ + # side, so --destroy_db_and_exit can clean the default path even when the + # Python driver never set --secondaries_base explicitly. + if test_secondary: + cleanup_cmd_parts.append("--test_secondary=%s" % test_secondary) + if continuous_verification_interval: + cleanup_cmd_parts.append( + "--continuous_verification_interval=%s" + % continuous_verification_interval + ) # Pass through relevant arguments for remote DB access for arg in remain_args: parts = arg.split("=", 1) @@ -1540,6 +1569,23 @@ def print_and_cleanup_fault_injection_log(pid): # in case of unsafe crashes in RocksDB. def blackbox_crash_main(args, unknown_args): cmd_params = gen_cmd_params(args) + # Materialize num_dbs once since it determines directory layout for the + # entire run and must stay constant across iterations and cleanup. + num_dbs_val = cmd_params.get("num_dbs", 1) + if callable(num_dbs_val): + num_dbs_val = num_dbs_val() + cmd_params["num_dbs"] = num_dbs_val + # Materialize expected_values_dir once since the lambda creates a new + # temp directory each time (mkdtemp) and we need the same path at cleanup. + ev_dir_val = cmd_params.get("expected_values_dir") + if callable(ev_dir_val): + ev_dir_val = ev_dir_val() + cmd_params["expected_values_dir"] = ev_dir_val + # Materialize test_secondary so cleanup can forward a concrete value. 
+ ts_val = cmd_params.get("test_secondary", 0) + if callable(ts_val): + ts_val = ts_val() + cmd_params["test_secondary"] = ts_val dbname = get_dbname("blackbox") exit_time = time.time() + cmd_params["duration"] @@ -1597,13 +1643,34 @@ def blackbox_crash_main(args, unknown_args): print_output_and_exit_on_error(outs, errs, args.print_stderr_separately) # we need to clean up after ourselves -- only do this on test success - cleanup_after_success(dbname) + cleanup_after_success(dbname, cmd_params.get("num_dbs", 1), + cmd_params.get("expected_values_dir"), + cmd_params.get("secondaries_base"), + cmd_params.get("test_secondary", 0), + cmd_params.get("continuous_verification_interval", 0)) # This python script runs db_stress multiple times. Some runs with # kill_random_test that causes rocksdb to crash at various points in code. def whitebox_crash_main(args, unknown_args): cmd_params = gen_cmd_params(args) + # Materialize num_dbs once since it determines directory layout for the + # entire run and must stay constant across iterations and cleanup. + num_dbs_val = cmd_params.get("num_dbs", 1) + if callable(num_dbs_val): + num_dbs_val = num_dbs_val() + cmd_params["num_dbs"] = num_dbs_val + # Materialize expected_values_dir once since the lambda creates a new + # temp directory each time (mkdtemp) and we need the same path at cleanup. + ev_dir_val = cmd_params.get("expected_values_dir") + if callable(ev_dir_val): + ev_dir_val = ev_dir_val() + cmd_params["expected_values_dir"] = ev_dir_val + # Materialize test_secondary so cleanup can forward a concrete value. 
+ ts_val = cmd_params.get("test_secondary", 0) + if callable(ts_val): + ts_val = ts_val() + cmd_params["test_secondary"] = ts_val dbname = get_dbname("whitebox") cur_time = time.time() @@ -1779,7 +1846,11 @@ def whitebox_crash_main(args, unknown_args): # If successfully finished or timed out (we currently treat timed out test as passing) # Clean up after ourselves if succeeded or hit_timeout: - cleanup_after_success(dbname) + cleanup_after_success(dbname, cmd_params.get("num_dbs", 1), + cmd_params.get("expected_values_dir"), + cmd_params.get("secondaries_base"), + cmd_params.get("test_secondary", 0), + cmd_params.get("continuous_verification_interval", 0)) def main():