Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ rocks_cpp_library_wrapper(name="rocksdb_stress_lib", srcs=[
"db_stress_tool/db_stress_shared_state.cc",
"db_stress_tool/db_stress_test_base.cc",
"db_stress_tool/db_stress_tool.cc",
"db_stress_tool/db_stress_trace.cc",
"db_stress_tool/db_stress_wide_merge_operator.cc",
"db_stress_tool/expected_state.cc",
"db_stress_tool/expected_value.cc",
Expand Down Expand Up @@ -5795,3 +5796,7 @@ cpp_unittest_wrapper(name="write_unprepared_transaction_test",


export_file(name = "tools/db_crashtest.py")

export_file(name = "tools/db_stress_trace_parser.py")

export_file(name = "tools/fault_injection_log_parser.py")
2 changes: 2 additions & 0 deletions buckifier/buckify_rocksdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,8 @@ def generate_buck(repo_path, deps_map):
extra_compiler_flags=json.dumps(deps["extra_compiler_flags"]),
)
BUCK.export_file("tools/db_crashtest.py")
BUCK.export_file("tools/db_stress_trace_parser.py")
BUCK.export_file("tools/fault_injection_log_parser.py")

print(ColorString.info("Generated BUCK Summary:"))
print(ColorString.info("- %d libs" % BUCK.total_lib))
Expand Down
1 change: 1 addition & 0 deletions db_stress_tool/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ add_executable(db_stress${ARTIFACT_SUFFIX}
db_stress_gflags.cc
db_stress_listener.cc
db_stress_shared_state.cc
db_stress_trace.cc
db_stress_test_base.cc
db_stress_wide_merge_operator.cc
db_stress_tool.cc
Expand Down
2 changes: 1 addition & 1 deletion db_stress_tool/batched_ops_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ class BatchedOpsStressTest : public StressTest {
ro_copies[i].prefix_same_as_start = true;
}

iters[i].reset(db_->NewIterator(ro_copies[i], cfh));
iters[i] = NewTraceIterator(db_, ro_copies[i], cfh);
iters[i]->Seek(prefix_slices[i]);
}

Expand Down
8 changes: 4 additions & 4 deletions db_stress_tool/cf_consistency_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,7 @@ class CfConsistencyStressTest : public StressTest {
static_cast<int>(rand_column_families.size()))]];
assert(cfh);

std::unique_ptr<Iterator> iter(db_->NewIterator(ro_copy, cfh));
std::unique_ptr<Iterator> iter(NewTraceIterator(db_, ro_copy, cfh));

uint64_t count = 0;
Status s;
Expand Down Expand Up @@ -971,7 +971,7 @@ class CfConsistencyStressTest : public StressTest {
iters.reserve(num);

for (size_t i = 0; i < num; ++i) {
iters.emplace_back(db_->NewIterator(options, column_families_[i]));
iters.emplace_back(NewTraceIterator(db_, options, column_families_[i]));
iters.back()->SeekToFirst();
}

Expand Down Expand Up @@ -1196,7 +1196,7 @@ class CfConsistencyStressTest : public StressTest {
uint32_t crc = 0;
{
// Compute crc for all key-values of default column family.
std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts));
std::unique_ptr<Iterator> it(NewTraceIterator(db_ptr, ropts));
status = checksum_column_family(it.get(), &crc);
if (!status.ok()) {
fprintf(stderr, "Computing checksum of default cf: %s\n",
Expand All @@ -1215,7 +1215,7 @@ class CfConsistencyStressTest : public StressTest {
if (cfh == db_ptr->DefaultColumnFamily()) {
continue;
}
std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts, cfh));
std::unique_ptr<Iterator> it(NewTraceIterator(db_ptr, ropts, cfh));
status = checksum_column_family(it.get(), &tmp_crc);
if (!status.ok() || tmp_crc != crc) {
break;
Expand Down
1 change: 1 addition & 0 deletions db_stress_tool/db_stress_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ DECLARE_string(expected_values_dir);
DECLARE_bool(expected_state_trace_debug);
DECLARE_int64(expected_state_trace_debug_key);
DECLARE_int32(expected_state_trace_debug_max_logs);
DECLARE_bool(trace_public_iterator_api);
DECLARE_bool(verify_checksum);
DECLARE_bool(mmap_read);
DECLARE_bool(mmap_write);
Expand Down
6 changes: 6 additions & 0 deletions db_stress_tool/db_stress_gflags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,12 @@ DEFINE_int32(expected_state_trace_debug_max_logs, 200,
"Maximum number of expected-state trace debug log lines to emit "
"per restore attempt.");

// Opt-in switch (default: false) for the db_stress public iterator API trace.
// The help text below is the user-facing description of the feature.
DEFINE_bool(
trace_public_iterator_api, false,
"If true, enable a fixed-size 32 MiB per-process ring buffer that records "
"public iterator API calls in db_stress. The trace is dumped on crash and "
"on verification failure.");

DEFINE_bool(verify_checksum, false,
"Verify checksum for every block read from storage");

Expand Down
35 changes: 25 additions & 10 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "db_stress_tool/db_stress_driver.h"
#include "db_stress_tool/db_stress_filters.h"
#include "db_stress_tool/db_stress_table_properties_collector.h"
#include "db_stress_tool/db_stress_trace.h"
#include "db_stress_tool/db_stress_wide_merge_operator.h"
#include "file/file_util.h"
#include "options/options_parser.h"
Expand Down Expand Up @@ -94,6 +95,19 @@ StressTest::StressTest()
}
}

// Creates an iterator for `db` by forwarding all arguments, unchanged, to
// NewDbStressTraceIterator() and handing the resulting owning pointer back to
// the caller.
//
// NOTE(review): presumably the returned iterator records public iterator API
// calls when tracing is enabled; that behavior lives in
// NewDbStressTraceIterator() and is not visible here -- confirm there.
std::unique_ptr<Iterator> StressTest::NewTraceIterator(
    DB* db, const ReadOptions& read_opts,
    ColumnFamilyHandle* column_family) const {
  std::unique_ptr<Iterator> traced_iter =
      NewDbStressTraceIterator(db, read_opts, column_family);
  return traced_iter;
}

// Takes ownership of an already-constructed iterator and passes it, together
// with `read_opts` and `column_family`, to MaybeWrapDbStressTraceIterator().
// Whatever that helper returns is returned to the caller.
//
// NOTE(review): the "Maybe" in the helper's name suggests the input iterator
// may be returned unwrapped in some configurations -- confirm in the helper.
std::unique_ptr<Iterator> StressTest::WrapTraceIterator(
    std::unique_ptr<Iterator> iter, const ReadOptions& read_opts,
    ColumnFamilyHandle* column_family) const {
  std::unique_ptr<Iterator> maybe_wrapped = MaybeWrapDbStressTraceIterator(
      std::move(iter), read_opts, column_family);
  return maybe_wrapped;
}

void StressTest::CleanUp() {
CleanUpColumnFamilies();
if (db_) {
Expand Down Expand Up @@ -561,7 +575,7 @@ Status StressTest::AssertSame(DB* db, ColumnFamilyHandle* cf,
// When `prefix_extractor` is set, seeking to beginning and scanning
// across prefixes are only supported with `total_order_seek` set.
ropt.total_order_seek = true;
std::unique_ptr<Iterator> iterator(db->NewIterator(ropt));
std::unique_ptr<Iterator> iterator(NewTraceIterator(db, ropt));
std::unique_ptr<std::vector<bool>> tmp_bitvec(
new std::vector<bool>(FLAGS_max_key));
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
Expand Down Expand Up @@ -1632,11 +1646,11 @@ Status StressTest::TestIterate(ThreadState* thread,
cfhs.emplace_back(column_families_[cf_index]);
}
assert(!cfhs.empty());
return db_->NewCoalescingIterator(ro, cfhs);
return WrapTraceIterator(db_->NewCoalescingIterator(ro, cfhs), ro);
} else {
ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
assert(cfh);
return std::unique_ptr<Iterator>(db_->NewIterator(ro, cfh));
return NewTraceIterator(db_, ro, cfh);
}
};

Expand Down Expand Up @@ -1743,7 +1757,7 @@ Status StressTest::TestMultiScan(ThreadState* thread,
assert(options_.prefix_extractor.get() == nullptr);

std::unique_ptr<Iterator> iter;
iter.reset(db_->NewIterator(ro, column_families_[rand_column_families[0]]));
iter = NewTraceIterator(db_, ro, column_families_[rand_column_families[0]]);
iter->Prepare(scan_opts);

constexpr size_t kOpLogsLimit = 50000;
Expand Down Expand Up @@ -1786,7 +1800,7 @@ Status StressTest::TestMultiScan(ThreadState* thread,
GetControlCfh(thread, rand_column_families[0]);
assert(cmp_cfh);

std::unique_ptr<Iterator> cmp_iter(db_->NewIterator(cmp_ro, cmp_cfh));
std::unique_ptr<Iterator> cmp_iter(NewTraceIterator(db_, cmp_ro, cmp_cfh));

bool diverged = false;

Expand Down Expand Up @@ -2002,7 +2016,7 @@ Status StressTest::TestIterateImpl(ThreadState* thread,
GetControlCfh(thread, rand_column_families[0]);
assert(cmp_cfh);

std::unique_ptr<Iterator> cmp_iter(db_->NewIterator(cmp_ro, cmp_cfh));
std::unique_ptr<Iterator> cmp_iter(NewTraceIterator(db_, cmp_ro, cmp_cfh));

bool diverged = false;

Expand Down Expand Up @@ -2176,9 +2190,10 @@ void StressTest::DumpIteratorDivergenceDiagnostics(
for (int cf_index : rand_column_families) {
cfhs.emplace_back(column_families_[cf_index]);
}
return db_->NewCoalescingIterator(debug_ro, cfhs);
return WrapTraceIterator(db_->NewCoalescingIterator(debug_ro, cfhs),
debug_ro);
}
return std::unique_ptr<Iterator>(db_->NewIterator(debug_ro, cmp_cfh));
return NewTraceIterator(db_, debug_ro, cmp_cfh);
};

auto dump_debug_iter = [&](const char* label, const ReadOptions& debug_ro,
Expand Down Expand Up @@ -3358,7 +3373,7 @@ void StressTest::TestAcquireSnapshot(ThreadState* thread,
// When `prefix_extractor` is set, seeking to beginning and scanning
// across prefixes are only supported with `total_order_seek` set.
ropt.total_order_seek = true;
std::unique_ptr<Iterator> iterator(db_->NewIterator(ropt));
std::unique_ptr<Iterator> iterator(NewTraceIterator(db_, ropt));
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
uint64_t key_val;
if (GetIntVal(iterator->key().ToString(), &key_val)) {
Expand Down Expand Up @@ -3549,7 +3564,7 @@ uint32_t StressTest::GetRangeHash(ThreadState* thread, const Snapshot* snapshot,
ro.timestamp = &ts;
}

std::unique_ptr<Iterator> it(db_->NewIterator(ro, column_family));
std::unique_ptr<Iterator> it(NewTraceIterator(db_, ro, column_family));

constexpr char kCrcCalculatorSepearator = ';';

Expand Down
7 changes: 7 additions & 0 deletions db_stress_tool/db_stress_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,13 @@ class StressTest {
ColumnFamilyHandle* column_family,
const Slice& start_key, const Slice& end_key);

std::unique_ptr<Iterator> NewTraceIterator(
DB* db, const ReadOptions& read_opts,
ColumnFamilyHandle* column_family = nullptr) const;
std::unique_ptr<Iterator> WrapTraceIterator(
std::unique_ptr<Iterator> iter, const ReadOptions& read_opts,
ColumnFamilyHandle* column_family = nullptr) const;

// Return a column family handle that mirrors what is pointed by
// `column_family_id`, which will be used to validate data to be correct.
// By default, the column family itself will be returned.
Expand Down
68 changes: 50 additions & 18 deletions db_stress_tool/db_stress_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
// different behavior. See comment of the flag for details.

#ifdef GFLAGS
#include <cstdlib>
#include <iostream>

#include "db_stress_tool/db_stress_common.h"
#include "db_stress_tool/db_stress_driver.h"
#include "db_stress_tool/db_stress_shared_state.h"
#include "db_stress_tool/db_stress_trace.h"
#include "port/stack_trace.h"
#include "rocksdb/convenience.h"
#include "utilities/fault_injection_fs.h"
Expand All @@ -43,6 +45,32 @@ int ReturnFlagValidationError(const char* message) {
std::cerr << "Error: " << message << '\n';
return 1;
}

// Parameterless trampoline around DumpDbStressPublicIteratorTrace() with the
// `void()` signature that std::atexit() requires; db_stress_tool() registers
// it via std::atexit() when the public iterator trace flag is set.
void DumpDbStressPublicIteratorTraceAtExit() {
DumpDbStressPublicIteratorTrace();
}

// Dumps the public iterator trace and prints a one-line notice to stdout
// naming the trace file. Does nothing when the trace is not enabled.
//
// `reason` is an optional suffix appended to the notice after a single space;
// nullptr and the empty string both mean "no suffix".
void DumpAndReportDbStressPublicIteratorTrace(const char* reason) {
  if (IsDbStressPublicIteratorTraceEnabled()) {
    // The path is fetched first, then the dump is triggered, then the notice
    // is printed -- the same order of calls as before.
    const std::string dump_path = GetDbStressPublicIteratorTracePath();
    DumpDbStressPublicIteratorTrace();

    // An empty path is reported with a placeholder instead of a blank.
    const char* shown_path =
        dump_path.empty() ? "<unavailable>" : dump_path.c_str();
    fprintf(stdout, "db_stress public iterator raw trace written to %s",
            shown_path);

    const bool has_reason = (reason != nullptr) && (reason[0] != '\0');
    if (has_reason) {
      fprintf(stdout, " %s", reason);
    }
    fputc('\n', stdout);
  }
}

// Chooses the directory in which trace/log artifacts are placed.
//
// Returns the directory reported by env->GetTestDirectory() when `env` is
// non-null, the call reports success, and the reported directory is non-empty.
// In every other case the fallback "/tmp" is returned.
std::string GetTraceArtifactDir(Env* env) {
  if (env == nullptr) {
    return "/tmp";
  }

  std::string test_dir;
  const bool lookup_ok = env->GetTestDirectory(&test_dir).ok();
  if (!lookup_ok || test_dir.empty()) {
    return "/tmp";
  }
  return test_dir;
}
} // namespace

KeyGenContext key_gen_ctx;
Expand Down Expand Up @@ -101,15 +129,6 @@ int db_stress_tool(int argc, char** argv) {
fault_env_guard =
std::make_shared<CompositeEnvWrapper>(raw_env, fault_fs_guard);
raw_env = fault_env_guard.get();

// Register a crash callback so that recently injected errors are
// printed to stderr when the process crashes (SIGABRT, SIGSEGV, etc.).
// This helps diagnose stress test failures caused by fault injection.
port::RegisterCrashCallback([]() {
if (fault_fs_guard) {
fault_fs_guard->PrintRecentInjectedErrors();
}
});
}

auto db_stress_fs =
Expand Down Expand Up @@ -358,24 +377,36 @@ int db_stress_tool(int argc, char** argv) {
}

// Now that FLAGS_db is resolved, set the fault injection log file path
// so that PrintAll() writes to a file instead of stderr (signal-safe).
// so the raw dump file can be pre-opened before any crash callback runs.
// Store the log in TEST_TMPDIR (outside the DB directory) so it survives
// DB reopen (which cleans untracked files) and gets included in the
// sandcastle db.tar.gz artifact for post-failure analysis.
if (fault_fs_guard) {
std::string log_dir;
const char* test_tmpdir = getenv("TEST_TMPDIR");
if (test_tmpdir && test_tmpdir[0] != '\0') {
log_dir = test_tmpdir;
} else {
log_dir = "/tmp";
}
const std::string log_dir = GetTraceArtifactDir(db_stress_env);
std::string log_path = log_dir + "/fault_injection_" +
std::to_string(getpid()) + "_" +
std::to_string(time(nullptr)) + ".log";
std::to_string(time(nullptr)) + ".bin";
fault_fs_guard->SetInjectedErrorLogPath(log_path);
}

if (FLAGS_trace_public_iterator_api) {
const std::string log_dir = GetTraceArtifactDir(db_stress_env);
std::string log_path = log_dir + "/db_stress_public_iterator_trace_" +
std::to_string(getpid()) + "_" +
std::to_string(time(nullptr)) + ".bin";
InitDbStressPublicIteratorTrace(log_path);
std::atexit(DumpDbStressPublicIteratorTraceAtExit);
}

if (fault_fs_guard || IsDbStressPublicIteratorTraceEnabled()) {
port::RegisterCrashCallback([]() {
if (fault_fs_guard) {
fault_fs_guard->DumpRecentInjectedErrors();
}
DumpDbStressPublicIteratorTrace();
});
}

if ((FLAGS_test_secondary || FLAGS_continuous_verification_interval > 0) &&
FLAGS_secondaries_base.empty()) {
std::string default_secondaries_path;
Expand Down Expand Up @@ -525,6 +556,7 @@ int db_stress_tool(int argc, char** argv) {
// Close DB in CleanUp() before destructor to prevent race between destructor
// and operations in listener callbacks (e.g. MultiOpsTxnsStressListener).
stress->CleanUp();
DumpAndReportDbStressPublicIteratorTrace("on exit");
return run_stress_test ? 0 : 1;
}

Expand Down
Loading
Loading