From f9d9ebe645857f62cec289c9cbb058de7a07d833 Mon Sep 17 00:00:00 2001 From: Xingbo Wang Date: Tue, 14 Apr 2026 16:44:20 -0700 Subject: [PATCH 1/5] add tracing for db stress ops to improve debuggability Benchmark (DEBUG_LEVEL=1, iterator-heavy db_stress, median of 5 runs): - upstream/main: 10.04s - --trace_public_iterator_api=0: 10.13s (+0.9%) - --trace_public_iterator_api=1: 12.32s (+22.7%) --- BUCK | 1 + db_stress_tool/CMakeLists.txt | 1 + db_stress_tool/batched_ops_stress.cc | 2 +- db_stress_tool/cf_consistency_stress.cc | 8 +- db_stress_tool/db_stress_common.h | 1 + db_stress_tool/db_stress_gflags.cc | 6 + db_stress_tool/db_stress_test_base.cc | 35 +- db_stress_tool/db_stress_test_base.h | 7 + db_stress_tool/db_stress_tool.cc | 53 +- db_stress_tool/db_stress_trace.cc | 691 ++++++++++++++++++++++++ db_stress_tool/db_stress_trace.h | 33 ++ db_stress_tool/multi_ops_txns_stress.cc | 10 +- db_stress_tool/no_batched_ops_stress.cc | 18 +- src.mk | 1 + tools/db_crashtest.py | 163 ++++ tools/db_crashtest_test.py | 146 +++++ tools/db_stress_trace_parser.py | 300 ++++ 17 files changed, 1440 insertions(+), 36 deletions(-) create mode 100644 db_stress_tool/db_stress_trace.cc create mode 100644 db_stress_tool/db_stress_trace.h create mode 100644 tools/db_stress_trace_parser.py diff --git a/BUCK b/BUCK index a28484df232e..fb49bff3e17b 100644 --- a/BUCK +++ b/BUCK @@ -446,6 +446,7 @@ rocks_cpp_library_wrapper(name="rocksdb_stress_lib", srcs=[ "db_stress_tool/db_stress_shared_state.cc", "db_stress_tool/db_stress_test_base.cc", "db_stress_tool/db_stress_tool.cc", + "db_stress_tool/db_stress_trace.cc", "db_stress_tool/db_stress_wide_merge_operator.cc", "db_stress_tool/expected_state.cc", "db_stress_tool/expected_value.cc", diff --git a/db_stress_tool/CMakeLists.txt b/db_stress_tool/CMakeLists.txt index 90200f342bf4..7ebb72186f52 100644 --- a/db_stress_tool/CMakeLists.txt +++ b/db_stress_tool/CMakeLists.txt @@ -10,6 +10,7 @@ add_executable(db_stress${ARTIFACT_SUFFIX} 
db_stress_gflags.cc db_stress_listener.cc db_stress_shared_state.cc + db_stress_trace.cc db_stress_test_base.cc db_stress_wide_merge_operator.cc db_stress_tool.cc diff --git a/db_stress_tool/batched_ops_stress.cc b/db_stress_tool/batched_ops_stress.cc index b0db2883a6ad..db5eecdfe07d 100644 --- a/db_stress_tool/batched_ops_stress.cc +++ b/db_stress_tool/batched_ops_stress.cc @@ -604,7 +604,7 @@ class BatchedOpsStressTest : public StressTest { ro_copies[i].prefix_same_as_start = true; } - iters[i].reset(db_->NewIterator(ro_copies[i], cfh)); + iters[i] = NewTraceIterator(db_, ro_copies[i], cfh); iters[i]->Seek(prefix_slices[i]); } diff --git a/db_stress_tool/cf_consistency_stress.cc b/db_stress_tool/cf_consistency_stress.cc index 6eb4abd3814a..6ac407b2e4d9 100644 --- a/db_stress_tool/cf_consistency_stress.cc +++ b/db_stress_tool/cf_consistency_stress.cc @@ -897,7 +897,7 @@ class CfConsistencyStressTest : public StressTest { static_cast(rand_column_families.size()))]]; assert(cfh); - std::unique_ptr iter(db_->NewIterator(ro_copy, cfh)); + std::unique_ptr iter(NewTraceIterator(db_, ro_copy, cfh)); uint64_t count = 0; Status s; @@ -971,7 +971,7 @@ class CfConsistencyStressTest : public StressTest { iters.reserve(num); for (size_t i = 0; i < num; ++i) { - iters.emplace_back(db_->NewIterator(options, column_families_[i])); + iters.emplace_back(NewTraceIterator(db_, options, column_families_[i])); iters.back()->SeekToFirst(); } @@ -1196,7 +1196,7 @@ class CfConsistencyStressTest : public StressTest { uint32_t crc = 0; { // Compute crc for all key-values of default column family. 
- std::unique_ptr it(db_ptr->NewIterator(ropts)); + std::unique_ptr it(NewTraceIterator(db_ptr, ropts)); status = checksum_column_family(it.get(), &crc); if (!status.ok()) { fprintf(stderr, "Computing checksum of default cf: %s\n", @@ -1215,7 +1215,7 @@ class CfConsistencyStressTest : public StressTest { if (cfh == db_ptr->DefaultColumnFamily()) { continue; } - std::unique_ptr it(db_ptr->NewIterator(ropts, cfh)); + std::unique_ptr it(NewTraceIterator(db_ptr, ropts, cfh)); status = checksum_column_family(it.get(), &tmp_crc); if (!status.ok() || tmp_crc != crc) { break; diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index d88785bf5da0..a44c7b0ffd38 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -193,6 +193,7 @@ DECLARE_string(expected_values_dir); DECLARE_bool(expected_state_trace_debug); DECLARE_int64(expected_state_trace_debug_key); DECLARE_int32(expected_state_trace_debug_max_logs); +DECLARE_bool(trace_public_iterator_api); DECLARE_bool(verify_checksum); DECLARE_bool(mmap_read); DECLARE_bool(mmap_write); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index 445398fbdd57..9b7bca9eb756 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -716,6 +716,12 @@ DEFINE_int32(expected_state_trace_debug_max_logs, 200, "Maximum number of expected-state trace debug log lines to emit " "per restore attempt."); +DEFINE_bool( + trace_public_iterator_api, false, + "If true, enable a fixed-size 32 MiB per-process ring buffer that records " + "public iterator API calls in db_stress. 
The trace is dumped on crash and " + "on verification failure."); + DEFINE_bool(verify_checksum, false, "Verify checksum for every block read from storage"); diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 00a4e43e464e..22c87b6a8556 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -24,6 +24,7 @@ #include "db_stress_tool/db_stress_driver.h" #include "db_stress_tool/db_stress_filters.h" #include "db_stress_tool/db_stress_table_properties_collector.h" +#include "db_stress_tool/db_stress_trace.h" #include "db_stress_tool/db_stress_wide_merge_operator.h" #include "file/file_util.h" #include "options/options_parser.h" @@ -94,6 +95,19 @@ StressTest::StressTest() } } +std::unique_ptr StressTest::NewTraceIterator( + DB* db, const ReadOptions& read_opts, + ColumnFamilyHandle* column_family) const { + return NewDbStressTraceIterator(db, read_opts, column_family); +} + +std::unique_ptr StressTest::WrapTraceIterator( + std::unique_ptr iter, const ReadOptions& read_opts, + ColumnFamilyHandle* column_family) const { + return MaybeWrapDbStressTraceIterator(std::move(iter), read_opts, + column_family); +} + void StressTest::CleanUp() { CleanUpColumnFamilies(); if (db_) { @@ -561,7 +575,7 @@ Status StressTest::AssertSame(DB* db, ColumnFamilyHandle* cf, // When `prefix_extractor` is set, seeking to beginning and scanning // across prefixes are only supported with `total_order_seek` set. 
ropt.total_order_seek = true; - std::unique_ptr iterator(db->NewIterator(ropt)); + std::unique_ptr iterator(NewTraceIterator(db, ropt)); std::unique_ptr> tmp_bitvec( new std::vector(FLAGS_max_key)); for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { @@ -1632,11 +1646,11 @@ Status StressTest::TestIterate(ThreadState* thread, cfhs.emplace_back(column_families_[cf_index]); } assert(!cfhs.empty()); - return db_->NewCoalescingIterator(ro, cfhs); + return WrapTraceIterator(db_->NewCoalescingIterator(ro, cfhs), ro); } else { ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]]; assert(cfh); - return std::unique_ptr(db_->NewIterator(ro, cfh)); + return NewTraceIterator(db_, ro, cfh); } }; @@ -1743,7 +1757,7 @@ Status StressTest::TestMultiScan(ThreadState* thread, assert(options_.prefix_extractor.get() == nullptr); std::unique_ptr iter; - iter.reset(db_->NewIterator(ro, column_families_[rand_column_families[0]])); + iter = NewTraceIterator(db_, ro, column_families_[rand_column_families[0]]); iter->Prepare(scan_opts); constexpr size_t kOpLogsLimit = 50000; @@ -1786,7 +1800,7 @@ Status StressTest::TestMultiScan(ThreadState* thread, GetControlCfh(thread, rand_column_families[0]); assert(cmp_cfh); - std::unique_ptr cmp_iter(db_->NewIterator(cmp_ro, cmp_cfh)); + std::unique_ptr cmp_iter(NewTraceIterator(db_, cmp_ro, cmp_cfh)); bool diverged = false; @@ -2002,7 +2016,7 @@ Status StressTest::TestIterateImpl(ThreadState* thread, GetControlCfh(thread, rand_column_families[0]); assert(cmp_cfh); - std::unique_ptr cmp_iter(db_->NewIterator(cmp_ro, cmp_cfh)); + std::unique_ptr cmp_iter(NewTraceIterator(db_, cmp_ro, cmp_cfh)); bool diverged = false; @@ -2176,9 +2190,10 @@ void StressTest::DumpIteratorDivergenceDiagnostics( for (int cf_index : rand_column_families) { cfhs.emplace_back(column_families_[cf_index]); } - return db_->NewCoalescingIterator(debug_ro, cfhs); + return WrapTraceIterator(db_->NewCoalescingIterator(debug_ro, cfhs), + debug_ro); 
} - return std::unique_ptr(db_->NewIterator(debug_ro, cmp_cfh)); + return NewTraceIterator(db_, debug_ro, cmp_cfh); }; auto dump_debug_iter = [&](const char* label, const ReadOptions& debug_ro, @@ -3358,7 +3373,7 @@ void StressTest::TestAcquireSnapshot(ThreadState* thread, // When `prefix_extractor` is set, seeking to beginning and scanning // across prefixes are only supported with `total_order_seek` set. ropt.total_order_seek = true; - std::unique_ptr iterator(db_->NewIterator(ropt)); + std::unique_ptr iterator(NewTraceIterator(db_, ropt)); for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { uint64_t key_val; if (GetIntVal(iterator->key().ToString(), &key_val)) { @@ -3549,7 +3564,7 @@ uint32_t StressTest::GetRangeHash(ThreadState* thread, const Snapshot* snapshot, ro.timestamp = &ts; } - std::unique_ptr it(db_->NewIterator(ro, column_family)); + std::unique_ptr it(NewTraceIterator(db_, ro, column_family)); constexpr char kCrcCalculatorSepearator = ';'; diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index 7474c5b0a81e..c057c736f2ca 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -249,6 +249,13 @@ class StressTest { ColumnFamilyHandle* column_family, const Slice& start_key, const Slice& end_key); + std::unique_ptr NewTraceIterator( + DB* db, const ReadOptions& read_opts, + ColumnFamilyHandle* column_family = nullptr) const; + std::unique_ptr WrapTraceIterator( + std::unique_ptr iter, const ReadOptions& read_opts, + ColumnFamilyHandle* column_family = nullptr) const; + // Return a column family handle that mirrors what is pointed by // `column_family_id`, which will be used to validate data to be correct. // By default, the column family itself will be returned. 
diff --git a/db_stress_tool/db_stress_tool.cc b/db_stress_tool/db_stress_tool.cc index 3956582cca9b..8fffaefbeb03 100644 --- a/db_stress_tool/db_stress_tool.cc +++ b/db_stress_tool/db_stress_tool.cc @@ -21,11 +21,13 @@ // different behavior. See comment of the flag for details. #ifdef GFLAGS +#include #include #include "db_stress_tool/db_stress_common.h" #include "db_stress_tool/db_stress_driver.h" #include "db_stress_tool/db_stress_shared_state.h" +#include "db_stress_tool/db_stress_trace.h" #include "port/stack_trace.h" #include "rocksdb/convenience.h" #include "utilities/fault_injection_fs.h" @@ -43,6 +45,24 @@ int ReturnFlagValidationError(const char* message) { std::cerr << "Error: " << message << '\n'; return 1; } + +void DumpDbStressPublicIteratorTraceAtExit() { + DumpDbStressPublicIteratorTrace(); +} + +void DumpAndReportDbStressPublicIteratorTrace(const char* reason) { + if (!IsDbStressPublicIteratorTraceEnabled()) { + return; + } + const std::string trace_path = GetDbStressPublicIteratorTracePath(); + DumpDbStressPublicIteratorTrace(); + fprintf(stdout, "db_stress public iterator raw trace written to %s", + trace_path.empty() ? "" : trace_path.c_str()); + if (reason != nullptr && reason[0] != '\0') { + fprintf(stdout, " %s", reason); + } + fputc('\n', stdout); +} } // namespace KeyGenContext key_gen_ctx; @@ -102,14 +122,6 @@ int db_stress_tool(int argc, char** argv) { std::make_shared(raw_env, fault_fs_guard); raw_env = fault_env_guard.get(); - // Register a crash callback so that recently injected errors are - // printed to stderr when the process crashes (SIGABRT, SIGSEGV, etc.). - // This helps diagnose stress test failures caused by fault injection. 
- port::RegisterCrashCallback([]() { - if (fault_fs_guard) { - fault_fs_guard->PrintRecentInjectedErrors(); - } - }); } auto db_stress_fs = @@ -376,6 +388,30 @@ int db_stress_tool(int argc, char** argv) { fault_fs_guard->SetInjectedErrorLogPath(log_path); } + if (FLAGS_trace_public_iterator_api) { + std::string log_dir; + const char* test_tmpdir = getenv("TEST_TMPDIR"); + if (test_tmpdir && test_tmpdir[0] != '\0') { + log_dir = test_tmpdir; + } else { + log_dir = "/tmp"; + } + std::string log_path = log_dir + "/db_stress_public_iterator_trace_" + + std::to_string(getpid()) + "_" + + std::to_string(time(nullptr)) + ".bin"; + InitDbStressPublicIteratorTrace(log_path); + std::atexit(DumpDbStressPublicIteratorTraceAtExit); + } + + if (fault_fs_guard || IsDbStressPublicIteratorTraceEnabled()) { + port::RegisterCrashCallback([]() { + if (fault_fs_guard) { + fault_fs_guard->PrintRecentInjectedErrors(); + } + DumpDbStressPublicIteratorTrace(); + }); + } + if ((FLAGS_test_secondary || FLAGS_continuous_verification_interval > 0) && FLAGS_secondaries_base.empty()) { std::string default_secondaries_path; @@ -525,6 +561,7 @@ int db_stress_tool(int argc, char** argv) { // Close DB in CleanUp() before destructor to prevent race between destructor // and operations in listener callbacks (e.g. MultiOpsTxnsStressListener). stress->CleanUp(); + DumpAndReportDbStressPublicIteratorTrace("on exit"); return run_stress_test ? 0 : 1; } diff --git a/db_stress_tool/db_stress_trace.cc b/db_stress_tool/db_stress_trace.cc new file mode 100644 index 000000000000..ce4d43714b77 --- /dev/null +++ b/db_stress_tool/db_stress_trace.cc @@ -0,0 +1,691 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#ifdef GFLAGS +#include "db_stress_tool/db_stress_trace.h" + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "db_stress_tool/db_stress_common.h" +#include "port/lang.h" + +namespace ROCKSDB_NAMESPACE { +namespace { + +constexpr size_t kTraceBudgetBytes = 32ULL << 20; +constexpr size_t kMaxTraceThreads = 32; +constexpr size_t kTraceEntrySize = 256; +constexpr size_t kEntriesPerThread = + kTraceBudgetBytes / kMaxTraceThreads / kTraceEntrySize; +constexpr uint32_t kTraceFileVersion = 1; +constexpr uint8_t kNoThreadSlot = 0xFF; +constexpr uint8_t kResultBoolUnset = 0xFF; +constexpr std::array kTraceFileMagic = {'D', 'B', 'S', 'P', + 'I', 'T', 'R', '1'}; + +enum class IteratorTraceEventType : uint8_t { + kCreate = 1, + kSeek = 2, + kSeekForPrev = 3, + kSeekToFirst = 4, + kSeekToLast = 5, + kNext = 6, + kPrev = 7, + kPrepareValue = 8, + kRefresh = 9, +}; + +enum IteratorTraceFlags : uint32_t { + kHasSnapshot = 1u << 0, + kHasLowerBound = 1u << 1, + kHasUpperBound = 1u << 2, + kAllowUnpreparedValue = 1u << 3, + kTotalOrderSeek = 1u << 4, + kPrefixSameAsStart = 1u << 5, + kTailing = 1u << 6, + kPinData = 1u << 7, + kAutoRefreshIterator = 1u << 8, +}; + +struct KeySample { + uint16_t full_len; + uint8_t head_len; + uint8_t tail_len; + char head[32]; + char tail[16]; +}; + +static_assert(sizeof(KeySample) == 52, "KeySample must remain compact"); + +struct TraceEntry { + uint64_t timestamp_us; + uint64_t sequence; + uint64_t object_id; + uint64_t aux0; + uint64_t aux1; + uint32_t os_thread_id_hash; + uint32_t cf_id; + uint32_t flags; + uint8_t slot; + uint8_t event_type; + uint8_t status_code; + uint8_t status_subcode; + uint8_t valid_before; + uint8_t valid_after; + uint8_t result_bool; + uint8_t reserved0; + KeySample key0; + KeySample key1; + char reserved[92]; +}; + +static_assert(sizeof(TraceEntry) == kTraceEntrySize, + "TraceEntry must remain 256B"); + +struct ThreadTraceLog { + std::atomic head; + 
std::atomic thread_id_hash; + TraceEntry entries[kEntriesPerThread]; +}; + +struct TraceFileHeader { + char magic[8]; + uint64_t trace_budget_bytes; + uint64_t dropped_threads; + uint64_t next_sequence; + uint64_t next_iterator_id; + uint64_t dump_timestamp_us; + uint32_t version; + uint32_t header_size; + uint32_t slot_header_size; + uint32_t entry_size; + uint32_t max_threads; + uint32_t entries_per_thread; + uint32_t used_slots; + uint32_t reserved0; +}; + +struct TraceFileSlotHeader { + uint64_t thread_id_hash; + uint64_t total_entries; + uint32_t slot; + uint32_t entry_count; + uint32_t reserved0; + uint32_t reserved1; +}; + +static_assert(sizeof(TraceFileHeader) == 80, + "TraceFileHeader size must stay stable"); +static_assert(sizeof(TraceFileSlotHeader) == 32, + "TraceFileSlotHeader size must stay stable"); +static_assert(kEntriesPerThread == 4096, + "32 MiB / 32 threads / 256 B entries must be 4096 entries"); + +uint64_t NowMicros() { + return std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); +} + +void CopyVolatileBytes(const volatile char* src, char* dst, size_t len) { + for (size_t i = 0; i < len; ++i) { + dst[i] = src[i]; + } +} + +bool WriteAll(int fd, const char* data, size_t len) { + while (len > 0) { + ssize_t written = write(fd, data, len); + if (written <= 0) { + return false; + } + data += static_cast(written); + len -= static_cast(written); + } + return true; +} + +KeySample CaptureKeySample(const Slice& key) { + KeySample sample{}; + sample.full_len = static_cast( + std::min(key.size(), std::numeric_limits::max())); + sample.head_len = + static_cast(std::min(key.size(), sizeof(sample.head))); + if (sample.head_len > 0) { + memcpy(sample.head, key.data(), sample.head_len); + } + if (key.size() > sample.head_len) { + sample.tail_len = static_cast( + std::min(key.size() - sample.head_len, sizeof(sample.tail))); + if (sample.tail_len > 0) { + memcpy(sample.tail, key.data() + key.size() - sample.tail_len, 
+ sample.tail_len); + } + } + return sample; +} + +uint32_t BuildCreateFlags(const ReadOptions& read_opts) { + uint32_t flags = 0; + if (read_opts.snapshot != nullptr) { + flags |= kHasSnapshot; + } + if (read_opts.iterate_lower_bound != nullptr) { + flags |= kHasLowerBound; + } + if (read_opts.iterate_upper_bound != nullptr) { + flags |= kHasUpperBound; + } + if (read_opts.allow_unprepared_value) { + flags |= kAllowUnpreparedValue; + } + if (read_opts.total_order_seek) { + flags |= kTotalOrderSeek; + } + if (read_opts.prefix_same_as_start) { + flags |= kPrefixSameAsStart; + } + if (read_opts.tailing) { + flags |= kTailing; + } + if (read_opts.pin_data) { + flags |= kPinData; + } + if (read_opts.auto_refresh_iterator_with_snapshot) { + flags |= kAutoRefreshIterator; + } + return flags; +} + +uint64_t HashCurrentThreadId() { + return std::hash{}(std::this_thread::get_id()); +} + +class DbStressPublicIteratorTraceLog { + public: + DbStressPublicIteratorTraceLog() + : next_sequence_(0), + next_iterator_id_(0), + next_slot_(0), + dropped_threads_(0), + dump_started_(0), + log_fd_(-1), + logs_{} { + log_path_[0] = '\0'; + } + + ~DbStressPublicIteratorTraceLog() { +#ifndef OS_WIN + if (log_fd_ >= 0) { + close(log_fd_); + } + if (dump_started_.load(std::memory_order_relaxed) == 0 && + log_path_[0] != '\0') { + unlink(log_path_); + } +#endif + } + + void SetLogFilePath(const std::string& path) { +#ifndef OS_WIN + if (log_fd_ >= 0) { + close(log_fd_); + log_fd_ = -1; + } + if (dump_started_.load(std::memory_order_relaxed) == 0 && + log_path_[0] != '\0') { + unlink(log_path_); + } +#endif + + size_t len = std::min(path.size(), sizeof(log_path_) - 1); + memcpy(log_path_, path.data(), len); + log_path_[len] = '\0'; + +#ifndef OS_WIN + if (log_path_[0] == '\0') { + return; + } + int flags = O_WRONLY | O_CREAT | O_TRUNC; +#ifdef O_CLOEXEC + flags |= O_CLOEXEC; +#endif + log_fd_ = open(log_path_, flags, 0644); +#endif + } + + std::string GetLogFilePath() const { + if 
(log_path_[0] == '\0') { + return std::string(); + } + return std::string(log_path_); + } + + uint64_t NextIteratorId() { + return next_iterator_id_.fetch_add(1, std::memory_order_relaxed) + 1; + } + + TSAN_SUPPRESSION void RecordIteratorCreate(uint64_t iterator_id, + uint32_t cf_id, + const ReadOptions& read_opts) { + KeySample lower{}; + KeySample upper{}; + if (read_opts.iterate_lower_bound != nullptr) { + lower = CaptureKeySample(*read_opts.iterate_lower_bound); + } + if (read_opts.iterate_upper_bound != nullptr) { + upper = CaptureKeySample(*read_opts.iterate_upper_bound); + } + Record(IteratorTraceEventType::kCreate, iterator_id, cf_id, lower, upper, + false, false, Status::OK(), BuildCreateFlags(read_opts), + static_cast( + reinterpret_cast(read_opts.snapshot)), + 0, kResultBoolUnset); + } + + TSAN_SUPPRESSION void RecordIteratorOp(IteratorTraceEventType event_type, + uint64_t iterator_id, uint32_t cf_id, + const KeySample& key0, + const KeySample& key1, + bool valid_before, bool valid_after, + const Status& status, + uint32_t flags = 0, uint64_t aux0 = 0, + uint64_t aux1 = 0, + uint8_t result_bool = + kResultBoolUnset) { + Record(event_type, iterator_id, cf_id, key0, key1, valid_before, + valid_after, status, flags, aux0, aux1, result_bool); + } + + TSAN_SUPPRESSION void DumpRaw() { +#ifndef OS_WIN + if (log_fd_ < 0) { + return; + } + + uint32_t expected = 0; + if (!dump_started_.compare_exchange_strong(expected, 1, + std::memory_order_relaxed)) { + return; + } + + uint32_t used_slots = 0; + for (size_t slot = 0; slot < kMaxTraceThreads; ++slot) { + if (logs_[slot].head.load(std::memory_order_relaxed) != 0) { + ++used_slots; + } + } + + TraceFileHeader header; + for (size_t i = 0; i < kTraceFileMagic.size(); ++i) { + header.magic[i] = kTraceFileMagic[i]; + } + header.trace_budget_bytes = kTraceBudgetBytes; + header.dropped_threads = dropped_threads_.load(std::memory_order_relaxed); + header.next_sequence = next_sequence_.load(std::memory_order_relaxed); + 
header.next_iterator_id = next_iterator_id_.load(std::memory_order_relaxed); + header.dump_timestamp_us = 0; + header.version = kTraceFileVersion; + header.header_size = static_cast(sizeof(header)); + header.slot_header_size = static_cast(sizeof(TraceFileSlotHeader)); + header.entry_size = static_cast(sizeof(TraceEntry)); + header.max_threads = static_cast(kMaxTraceThreads); + header.entries_per_thread = static_cast(kEntriesPerThread); + header.used_slots = used_slots; + header.reserved0 = 0; + + if (!WriteAll(log_fd_, reinterpret_cast(&header), + sizeof(header))) { + return; + } + + std::array chunk; + for (size_t slot = 0; slot < kMaxTraceThreads; ++slot) { + const uint64_t total = logs_[slot].head.load(std::memory_order_relaxed); + if (total == 0) { + continue; + } + + const uint64_t count = + std::min(total, static_cast(kEntriesPerThread)); + const uint64_t start = + (total >= kEntriesPerThread) ? (total % kEntriesPerThread) : 0; + + TraceFileSlotHeader slot_header; + slot_header.thread_id_hash = + logs_[slot].thread_id_hash.load(std::memory_order_relaxed); + slot_header.total_entries = total; + slot_header.slot = static_cast(slot); + slot_header.entry_count = static_cast(count); + slot_header.reserved0 = 0; + slot_header.reserved1 = 0; + + if (!WriteAll(log_fd_, reinterpret_cast(&slot_header), + sizeof(slot_header))) { + return; + } + + size_t chunk_count = 0; + for (uint64_t i = 0; i < count; ++i) { + const uint64_t idx = (start + i) % kEntriesPerThread; + TraceEntry& dst_entry = chunk[chunk_count++]; + CopyVolatileBytes( + reinterpret_cast(&logs_[slot].entries[idx]), + reinterpret_cast(&dst_entry), sizeof(dst_entry)); + if (chunk_count == chunk.size() || i + 1 == count) { + if (!WriteAll(log_fd_, reinterpret_cast(chunk.data()), + chunk_count * sizeof(TraceEntry))) { + return; + } + chunk_count = 0; + } + } + } +#endif + } + + private: + void Record(IteratorTraceEventType event_type, uint64_t iterator_id, + uint32_t cf_id, const KeySample& key0, const 
KeySample& key1, + bool valid_before, bool valid_after, const Status& status, + uint32_t flags, uint64_t aux0, uint64_t aux1, + uint8_t result_bool) { + const uint8_t slot = GetThreadSlot(); + if (slot == kNoThreadSlot) { + return; + } + ThreadTraceLog& log = logs_[slot]; + const uint64_t seq = + next_sequence_.fetch_add(1, std::memory_order_relaxed) + 1; + const uint64_t pos = log.head.fetch_add(1, std::memory_order_relaxed); + TraceEntry& entry = log.entries[pos % kEntriesPerThread]; + entry = TraceEntry{}; + entry.object_id = iterator_id; + entry.aux0 = aux0; + entry.aux1 = aux1; + entry.os_thread_id_hash = static_cast( + log.thread_id_hash.load(std::memory_order_relaxed)); + entry.cf_id = cf_id; + entry.flags = flags; + entry.slot = slot; + entry.event_type = static_cast(event_type); + entry.status_code = static_cast(status.code()); + entry.status_subcode = static_cast(status.subcode()); + entry.valid_before = valid_before ? 1 : 0; + entry.valid_after = valid_after ? 1 : 0; + entry.result_bool = result_bool; + entry.key0 = key0; + entry.key1 = key1; + entry.timestamp_us = NowMicros(); + std::atomic_signal_fence(std::memory_order_release); + entry.sequence = seq; + } + + uint8_t GetThreadSlot() { + static thread_local uint8_t thread_slot = kNoThreadSlot; + static thread_local bool slot_initialized = false; + if (slot_initialized) { + return thread_slot; + } + slot_initialized = true; + const uint32_t slot = next_slot_.fetch_add(1, std::memory_order_relaxed); + if (slot >= kMaxTraceThreads) { + dropped_threads_.fetch_add(1, std::memory_order_relaxed); + return thread_slot; + } + thread_slot = static_cast(slot); + logs_[slot].thread_id_hash.store(HashCurrentThreadId(), + std::memory_order_relaxed); + return thread_slot; + } + + std::atomic next_sequence_; + std::atomic next_iterator_id_; + std::atomic next_slot_; + std::atomic dropped_threads_; + std::atomic dump_started_; + int log_fd_; + ThreadTraceLog logs_[kMaxTraceThreads]; + char log_path_[PATH_MAX]; +}; + 
+class DbStressTraceIterator : public Iterator { + public: + DbStressTraceIterator(std::unique_ptr&& iter, + DbStressPublicIteratorTraceLog* trace_log, + uint32_t cf_id, const ReadOptions& read_opts) + : iter_(std::move(iter)), + trace_log_(trace_log), + cf_id_(cf_id), + iterator_id_(trace_log_->NextIteratorId()) { + trace_log_->RecordIteratorCreate(iterator_id_, cf_id_, read_opts); + } + + bool Valid() const override { return iter_->Valid(); } + + void SeekToFirst() override { + KeySample before{}; + if (iter_->Valid()) { + before = CaptureKeySample(iter_->key()); + } + iter_->SeekToFirst(); + KeySample after{}; + if (iter_->Valid()) { + after = CaptureKeySample(iter_->key()); + } + trace_log_->RecordIteratorOp(IteratorTraceEventType::kSeekToFirst, + iterator_id_, cf_id_, before, after, + before.full_len != 0, iter_->Valid(), + iter_->status()); + } + + void SeekToLast() override { + KeySample before{}; + if (iter_->Valid()) { + before = CaptureKeySample(iter_->key()); + } + iter_->SeekToLast(); + KeySample after{}; + if (iter_->Valid()) { + after = CaptureKeySample(iter_->key()); + } + trace_log_->RecordIteratorOp(IteratorTraceEventType::kSeekToLast, + iterator_id_, cf_id_, before, after, + before.full_len != 0, iter_->Valid(), + iter_->status()); + } + + void Seek(const Slice& target) override { + KeySample target_key = CaptureKeySample(target); + const bool valid_before = iter_->Valid(); + iter_->Seek(target); + KeySample result{}; + if (iter_->Valid()) { + result = CaptureKeySample(iter_->key()); + } + trace_log_->RecordIteratorOp(IteratorTraceEventType::kSeek, iterator_id_, + cf_id_, target_key, result, valid_before, + iter_->Valid(), iter_->status()); + } + + void SeekForPrev(const Slice& target) override { + KeySample target_key = CaptureKeySample(target); + const bool valid_before = iter_->Valid(); + iter_->SeekForPrev(target); + KeySample result{}; + if (iter_->Valid()) { + result = CaptureKeySample(iter_->key()); + } + 
trace_log_->RecordIteratorOp(IteratorTraceEventType::kSeekForPrev, + iterator_id_, cf_id_, target_key, result, + valid_before, iter_->Valid(), iter_->status()); + } + + void Next() override { + KeySample before{}; + if (iter_->Valid()) { + before = CaptureKeySample(iter_->key()); + } + iter_->Next(); + KeySample after{}; + if (iter_->Valid()) { + after = CaptureKeySample(iter_->key()); + } + trace_log_->RecordIteratorOp(IteratorTraceEventType::kNext, iterator_id_, + cf_id_, before, after, before.full_len != 0, + iter_->Valid(), iter_->status()); + } + + void Prev() override { + KeySample before{}; + if (iter_->Valid()) { + before = CaptureKeySample(iter_->key()); + } + iter_->Prev(); + KeySample after{}; + if (iter_->Valid()) { + after = CaptureKeySample(iter_->key()); + } + trace_log_->RecordIteratorOp(IteratorTraceEventType::kPrev, iterator_id_, + cf_id_, before, after, before.full_len != 0, + iter_->Valid(), iter_->status()); + } + + Status Refresh() override { return Refresh(nullptr); } + + Status Refresh(const Snapshot* snapshot) override { + KeySample before{}; + if (iter_->Valid()) { + before = CaptureKeySample(iter_->key()); + } + Status status = iter_->Refresh(snapshot); + KeySample after{}; + if (iter_->Valid()) { + after = CaptureKeySample(iter_->key()); + } + trace_log_->RecordIteratorOp( + IteratorTraceEventType::kRefresh, iterator_id_, cf_id_, before, after, + before.full_len != 0, iter_->Valid(), status, + snapshot != nullptr ? static_cast(kHasSnapshot) : 0u, + static_cast(reinterpret_cast(snapshot))); + return status; + } + + bool PrepareValue() override { + KeySample before{}; + if (iter_->Valid()) { + before = CaptureKeySample(iter_->key()); + } + bool ok = iter_->PrepareValue(); + KeySample after{}; + if (iter_->Valid()) { + after = CaptureKeySample(iter_->key()); + } + trace_log_->RecordIteratorOp(IteratorTraceEventType::kPrepareValue, + iterator_id_, cf_id_, before, after, + before.full_len != 0, iter_->Valid(), + iter_->status(), 0, 0, 0, ok ? 
1 : 0); + return ok; + } + + Slice key() const override { return iter_->key(); } + + Status status() const override { return iter_->status(); } + + Slice value() const override { return iter_->value(); } + + const WideColumns& columns() const override { return iter_->columns(); } + + Status GetProperty(std::string prop_name, std::string* prop) override { + return iter_->GetProperty(std::move(prop_name), prop); + } + + Slice timestamp() const override { return iter_->timestamp(); } + + void Prepare(const MultiScanArgs& scan_opts) override { + iter_->Prepare(scan_opts); + } + + private: + std::unique_ptr iter_; + DbStressPublicIteratorTraceLog* trace_log_; + uint32_t cf_id_; + uint64_t iterator_id_; +}; + +std::unique_ptr + g_db_stress_public_iterator_trace; + +} // namespace + +bool IsDbStressPublicIteratorTraceEnabled() { + return FLAGS_trace_public_iterator_api && g_db_stress_public_iterator_trace; +} + +void InitDbStressPublicIteratorTrace(const std::string& path) { + if (!FLAGS_trace_public_iterator_api) { + return; + } + if (!g_db_stress_public_iterator_trace) { + g_db_stress_public_iterator_trace.reset( + new DbStressPublicIteratorTraceLog()); + } + g_db_stress_public_iterator_trace->SetLogFilePath(path); +} + +void DumpDbStressPublicIteratorTrace() { + if (!g_db_stress_public_iterator_trace) { + return; + } + g_db_stress_public_iterator_trace->DumpRaw(); +} + +std::string GetDbStressPublicIteratorTracePath() { + if (!g_db_stress_public_iterator_trace) { + return std::string(); + } + return g_db_stress_public_iterator_trace->GetLogFilePath(); +} + +std::unique_ptr MaybeWrapDbStressTraceIterator( + std::unique_ptr iter, const ReadOptions& read_opts, + ColumnFamilyHandle* column_family) { + if (!g_db_stress_public_iterator_trace || !iter) { + return iter; + } + const uint32_t cf_id = column_family != nullptr ? 
column_family->GetID() : 0; + return std::unique_ptr(new DbStressTraceIterator( + std::move(iter), g_db_stress_public_iterator_trace.get(), cf_id, + read_opts)); +} + +std::unique_ptr NewDbStressTraceIterator( + DB* db, const ReadOptions& read_opts, ColumnFamilyHandle* column_family) { + if (db == nullptr) { + return nullptr; + } + if (column_family != nullptr) { + return MaybeWrapDbStressTraceIterator( + std::unique_ptr(db->NewIterator(read_opts, column_family)), + read_opts, column_family); + } + return MaybeWrapDbStressTraceIterator( + std::unique_ptr(db->NewIterator(read_opts)), read_opts, + nullptr); +} + +} // namespace ROCKSDB_NAMESPACE + +#endif // GFLAGS diff --git a/db_stress_tool/db_stress_trace.h b/db_stress_tool/db_stress_trace.h new file mode 100644 index 000000000000..89fdb28a6587 --- /dev/null +++ b/db_stress_tool/db_stress_trace.h @@ -0,0 +1,33 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#ifdef GFLAGS +#pragma once + +#include +#include + +#include "rocksdb/db.h" +#include "rocksdb/iterator.h" +#include "rocksdb/options.h" + +namespace ROCKSDB_NAMESPACE { + +bool IsDbStressPublicIteratorTraceEnabled(); +void InitDbStressPublicIteratorTrace(const std::string& path); +void DumpDbStressPublicIteratorTrace(); +std::string GetDbStressPublicIteratorTracePath(); + +std::unique_ptr MaybeWrapDbStressTraceIterator( + std::unique_ptr iter, const ReadOptions& read_opts, + ColumnFamilyHandle* column_family = nullptr); + +std::unique_ptr NewDbStressTraceIterator( + DB* db, const ReadOptions& read_opts, + ColumnFamilyHandle* column_family = nullptr); + +} // namespace ROCKSDB_NAMESPACE + +#endif // GFLAGS diff --git a/db_stress_tool/multi_ops_txns_stress.cc b/db_stress_tool/multi_ops_txns_stress.cc index 3afe0f7b1d7b..b3c1d0f0b1b8 100644 --- a/db_stress_tool/multi_ops_txns_stress.cc +++ b/db_stress_tool/multi_ops_txns_stress.cc @@ -355,7 +355,7 @@ void MultiOpsTxnsStressTest::ReopenAndPreloadDbIfNeeded(SharedState* shared) { ropt.snapshot = snapshot->snapshot(); ropt.auto_refresh_iterator_with_snapshot = true; } - std::unique_ptr iter(db_->NewIterator(ropt)); + std::unique_ptr iter(NewTraceIterator(db_, ropt)); iter->SeekToFirst(); if (!iter->Valid()) { db_empty = true; @@ -1154,7 +1154,7 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { sqfc_factory_->GetTableFilterForRangeQuery(start_key, iter_ub); } - std::unique_ptr it(db_->NewIterator(ropts)); + std::unique_ptr it(NewTraceIterator(db_, ropts)); for (it->Seek(start_key); it->Valid(); it->Next()) { Record record; Status s = record.DecodePrimaryIndexEntry(it->key(), it->value()); @@ -1206,7 +1206,7 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { FLAGS_auto_refresh_iterator_with_snapshot; ropts.total_order_seek = true; - std::unique_ptr it(db_->NewIterator(ropts)); + std::unique_ptr it(NewTraceIterator(db_, ropts)); for (it->Seek(start_key); it->Valid(); it->Next()) { 
++secondary_index_entries_count; Record record; @@ -1307,7 +1307,7 @@ void MultiOpsTxnsStressTest::VerifyPkSkFast(const ReadOptions& read_options, ropts.total_order_seek = true; ropts.io_activity = read_options.io_activity; - std::unique_ptr it(db_->NewIterator(ropts)); + std::unique_ptr it(NewTraceIterator(db_, ropts)); for (it->Seek(start_key); it->Valid(); it->Next()) { Record record; Status s = record.DecodeSecondaryIndexEntry(it->key(), it->value()); @@ -1682,7 +1682,7 @@ void MultiOpsTxnsStressTest::ScanExistingDb(SharedState* shared, int threads) { ropts.table_filter = sqfc_factory_->GetTableFilterForRangeQuery(pk_lb, pk_ub); } - std::unique_ptr it(db_->NewIterator(ropts)); + std::unique_ptr it(NewTraceIterator(db_, ropts)); for (it->SeekToFirst(); it->Valid(); it->Next()) { Record record; diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index db07077131d6..7c303e081fb5 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -105,7 +105,7 @@ class NonBatchedOpsStressTest : public StressTest { } std::unique_ptr iter( - db_->NewIterator(options, column_families_[cf])); + NewTraceIterator(db_, options, column_families_[cf])); std::string seek_key = Key(start); iter->Seek(seek_key); @@ -525,7 +525,8 @@ class NonBatchedOpsStressTest : public StressTest { { uint32_t crc = 0; - std::unique_ptr it(secondary_db_->NewIterator(read_opts)); + std::unique_ptr it( + NewTraceIterator(secondary_db_.get(), read_opts)); s = checksum_column_family(it.get(), &crc); if (!s.ok()) { fprintf(stderr, "Computing checksum of default cf: %s\n", @@ -552,7 +553,7 @@ class NonBatchedOpsStressTest : public StressTest { read_opts.snapshot = snapshot->snapshot(); } std::unique_ptr iter( - secondary_db_->NewIterator(read_opts, handle)); + NewTraceIterator(secondary_db_.get(), read_opts, handle)); // Skip SeekToFirst, SeekToLast, SeekForPrev, and Prev when backward // scan is disabled. 
uint32_t rnd = @@ -1747,7 +1748,7 @@ class NonBatchedOpsStressTest : public StressTest { snapshot = std::make_unique(db_); ro_copy.snapshot = snapshot->snapshot(); } - std::unique_ptr iter(db_->NewIterator(ro_copy, cfh)); + std::unique_ptr iter(NewTraceIterator(db_, ro_copy, cfh)); uint64_t count = 0; Status s; @@ -2588,9 +2589,10 @@ class NonBatchedOpsStressTest : public StressTest { for (auto cf_index : rand_column_families) { cfhs.emplace_back(column_families_[cf_index]); } - return db_->NewCoalescingIterator(debug_ro, cfhs); + return WrapTraceIterator(db_->NewCoalescingIterator(debug_ro, cfhs), + debug_ro); } - return std::unique_ptr(db_->NewIterator(debug_ro, cfh)); + return NewTraceIterator(db_, debug_ro, cfh); }; auto dump_debug_iter = [&](const char* label, const ReadOptions& debug_ro, @@ -2750,9 +2752,9 @@ class NonBatchedOpsStressTest : public StressTest { cfhs.emplace_back(column_families_[cf_index]); } assert(!cfhs.empty()); - iter = db_->NewCoalescingIterator(ro, cfhs); + iter = WrapTraceIterator(db_->NewCoalescingIterator(ro, cfhs), ro); } else { - iter = std::unique_ptr(db_->NewIterator(ro, cfh)); + iter = NewTraceIterator(db_, ro, cfh); } for (int64_t i = 0; i < static_cast(expected_values_size); ++i) { diff --git a/src.mk b/src.mk index 6b5f539b2218..d5cbb9da0000 100644 --- a/src.mk +++ b/src.mk @@ -413,6 +413,7 @@ STRESS_LIB_SOURCES = \ db_stress_tool/db_stress_gflags.cc \ db_stress_tool/db_stress_listener.cc \ db_stress_tool/db_stress_shared_state.cc \ + db_stress_tool/db_stress_trace.cc \ db_stress_tool/db_stress_test_base.cc \ db_stress_tool/db_stress_tool.cc \ db_stress_tool/db_stress_wide_merge_operator.cc \ diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 8f0b0bb1c881..769fd4cf6061 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -2,6 +2,7 @@ # Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. 
import argparse +import db_stress_trace_parser import glob import math import os @@ -507,6 +508,9 @@ def apply_random_seed_per_iteration(): # except on remote filesystem _TEST_EXPECTED_DIR_ENV_VAR = "TEST_TMPDIR_EXPECTED" _DEBUG_LEVEL_ENV_VAR = "DEBUG_LEVEL" +_TRACE_ARTIFACT_DIRNAME = "db_stress_trace_artifacts" +_PUBLIC_ITERATOR_TRACE_PREFIX = "db_stress_public_iterator_trace" +_PUBLIC_ITERATOR_TRACE_KEEP_LAST_RUNS = 5 stress_cmd = "./db_stress" @@ -2004,6 +2008,102 @@ def cleanup_after_success(dbname): sys.exit(2) +def _get_trace_artifact_dir(): + artifact_dir = os.path.join( + os.environ.get(_TEST_DIR_ENV_VAR) or "/tmp", _TRACE_ARTIFACT_DIRNAME + ) + os.makedirs(artifact_dir, exist_ok=True) + return artifact_dir + + +def _sanitize_trace_label(label): + sanitized = re.sub(r"[^A-Za-z0-9._-]+", "-", str(label)) + sanitized = sanitized.strip(".-") + return sanitized or "run" + + +def _format_trace_timestamp(unix_seconds): + try: + return time.strftime("%Y%m%d-%H%M%S", time.localtime(unix_seconds)) + except (OverflowError, OSError, ValueError): + return "unknown-time" + + +def _parse_trace_pid_and_timestamp(raw_log, raw_prefix): + match = re.fullmatch( + rf"{re.escape(raw_prefix)}_(\d+)_(\d+)\.bin$", os.path.basename(raw_log) + ) + if match is None: + return None, None + return int(match.group(1)), int(match.group(2)) + + +def _allocate_trace_archive_base(raw_log, raw_prefix, run_label): + pid, unix_seconds = _parse_trace_pid_and_timestamp(raw_log, raw_prefix) + parts = [raw_prefix, _sanitize_trace_label(run_label)] + if unix_seconds is not None: + parts.append(_format_trace_timestamp(unix_seconds)) + parts.append(f"ts{unix_seconds}") + if pid is not None: + parts.append(f"pid{pid}") + if pid is None and unix_seconds is None: + parts.append(os.path.splitext(os.path.basename(raw_log))[0]) + + base = os.path.join(_get_trace_artifact_dir(), ".".join(parts)) + candidate = base + suffix = 1 + while os.path.exists(candidate + ".bin") or os.path.exists(candidate + ".log"): 
+ suffix += 1 + candidate = f"{base}.dup{suffix}" + return candidate + + +def _prune_archived_public_iterator_traces(keep_last_runs): + if keep_last_runs <= 0: + return + + archived = [] + pattern = os.path.join(_get_trace_artifact_dir(), _PUBLIC_ITERATOR_TRACE_PREFIX + ".*.bin") + for raw_path in glob.glob(pattern): + base, _ = os.path.splitext(raw_path) + newest_mtime = os.path.getmtime(raw_path) + decoded_log = base + ".log" + if os.path.exists(decoded_log): + newest_mtime = max(newest_mtime, os.path.getmtime(decoded_log)) + archived.append((newest_mtime, base)) + + archived.sort(reverse=True) + for _, base in archived[keep_last_runs:]: + for ext in (".bin", ".log"): + path = base + ext + if os.path.exists(path): + os.remove(path) + + +def _print_public_iterator_trace_log(decoded_log, max_tail_lines): + print("=== db_stress public iterator trace: %s ===" % decoded_log) + with open(decoded_log) as f: + lines = f.readlines() + if len(lines) <= max_tail_lines: + print("".join(lines), end="") + else: + print("".join(lines[:2]), end="") + skipped = len(lines) - max_tail_lines + print( + "... (%d lines omitted, showing last %d)\n" % (skipped, max_tail_lines), + end="", + ) + print("".join(lines[-max_tail_lines:]), end="") + + +def _format_process_exit_label(returncode, hit_timeout): + if hit_timeout: + return "sigterm-timeout" + if returncode is None: + return "unknown-exit" + if returncode < 0: + return f"signal-{abs(returncode)}" + return f"exit-{returncode}" def print_and_cleanup_fault_injection_log(pid): # Fault injection logs are stored in TEST_TMPDIR (or /tmp) to survive # DB reopen cleanup, and to be included in sandcastle's db.tar.gz artifact. 
@@ -2052,12 +2152,50 @@ def print_and_cleanup_fault_injection_log(pid): pass +def print_and_cleanup_public_iterator_trace( + pid, run_label=None, keep_last_runs=_PUBLIC_ITERATOR_TRACE_KEEP_LAST_RUNS +): + max_tail_lines = 64 + run_label = run_label or f"pid-{pid}" + log_dir = os.environ.get(_TEST_DIR_ENV_VAR) or "/tmp" + pattern = os.path.join(log_dir, _PUBLIC_ITERATOR_TRACE_PREFIX + "_%d_*.bin" % pid) + archived_paths = [] + for raw_log in sorted(glob.glob(pattern)): + archive_base = _allocate_trace_archive_base( + raw_log, _PUBLIC_ITERATOR_TRACE_PREFIX, run_label + ) + archived_raw = archive_base + ".bin" + decoded_log = archive_base + ".log" + try: + shutil.move(raw_log, archived_raw) + db_stress_trace_parser.decode_public_iterator_trace(archived_raw, decoded_log) + print("Saved db_stress public iterator trace raw: %s" % archived_raw) + print("Saved db_stress public iterator trace log: %s" % decoded_log) + _print_public_iterator_trace_log(decoded_log, max_tail_lines) + archived_paths.append((archived_raw, decoded_log)) + except (OSError, ValueError) as exc: + if os.path.exists(raw_log) and not os.path.exists(archived_raw): + try: + shutil.move(raw_log, archived_raw) + except OSError: + archived_raw = raw_log + print("Saved db_stress public iterator trace raw: %s" % archived_raw) + print( + "WARNING: failed to decode db_stress public iterator trace %s: %s\n" + % (archived_raw, exc) + ) + archived_paths.append((archived_raw, None)) + _prune_archived_public_iterator_traces(keep_last_runs) + return archived_paths + + # This script runs and kills db_stress multiple times. It checks consistency # in case of unsafe crashes in RocksDB. 
def blackbox_crash_main(args, unknown_args): cmd_params = gen_cmd_params(args) dbname = get_dbname("blackbox") exit_time = time.time() + cmd_params["duration"] + run_index = 0 print( "Running blackbox-crash-test with \n" @@ -2070,6 +2208,7 @@ def blackbox_crash_main(args, unknown_args): ) while time.time() < exit_time: + run_index += 1 apply_random_seed_per_iteration() cmd, finalized_params = gen_cmd( dict(list(cmd_params.items()) + list({"db": dbname}.items())), unknown_args @@ -2078,6 +2217,11 @@ def blackbox_crash_main(args, unknown_args): hit_timeout, retcode, outs, errs, pid = execute_cmd(cmd, cmd_params["interval"]) print_and_cleanup_fault_injection_log(pid) + print_and_cleanup_public_iterator_trace( + pid, + "blackbox_run_%04d_%s" + % (run_index, _format_process_exit_label(retcode, hit_timeout)), + ) outs, errs = strip_expected_sigterm_stderr(outs, errs, hit_timeout) # Reset destroy_db_initially after each run (it may have been set by @@ -2109,6 +2253,11 @@ def blackbox_crash_main(args, unknown_args): ) print_and_cleanup_fault_injection_log(pid) + print_and_cleanup_public_iterator_trace( + pid, + "blackbox_verify_run_%04d_%s" + % (run_index + 1, _format_process_exit_label(retcode, hit_timeout)), + ) # For the final run print_run_output_and_exit_on_error(args, finalized_params, outs, errs) @@ -2141,7 +2290,9 @@ def whitebox_crash_main(args, unknown_args): prev_compaction_style = -1 succeeded = True hit_timeout = False + run_index = 0 while time.time() < exit_time: + run_index += 1 apply_random_seed_per_iteration() if check_mode == 0: additional_opts = { @@ -2254,6 +2405,18 @@ def whitebox_crash_main(args, unknown_args): cmd, exit_time - time.time() + 900 ) + print_and_cleanup_fault_injection_log(pid) + print_and_cleanup_public_iterator_trace( + pid, + "whitebox_run_%04d_check_%d_%s_%s" + % ( + run_index, + check_mode, + "kill" if additional_opts["kill_random_test"] is not None else "normal", + _format_process_exit_label(retncode, hit_timeout), + ), + ) + # 
Reset destroy_db_initially after each run (it may have been set by # command line for first run, or set for various reasons for a step) cmd_params["destroy_db_initially"] = 0 diff --git a/tools/db_crashtest_test.py b/tools/db_crashtest_test.py index dfb49db7578c..21cfd808de98 100644 --- a/tools/db_crashtest_test.py +++ b/tools/db_crashtest_test.py @@ -6,14 +6,20 @@ # Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. import importlib.util +import io import os import shutil +import struct import sys import tempfile import unittest +from contextlib import redirect_stdout _DB_CRASHTEST_PATH = os.path.join(os.path.dirname(__file__), "db_crashtest.py") +_DB_STRESS_TRACE_PARSER_PATH = os.path.join( + os.path.dirname(__file__), "db_stress_trace_parser.py" +) _TEST_DIR_ENV_VAR = "TEST_TMPDIR" _TEST_EXPECTED_DIR_ENV_VAR = "TEST_TMPDIR_EXPECTED" @@ -32,6 +38,15 @@ def load_db_crashtest_module(): return module +def load_db_stress_trace_parser_module(): + spec = importlib.util.spec_from_file_location( + "db_stress_trace_parser_under_test", _DB_STRESS_TRACE_PARSER_PATH + ) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + class DBCrashTestTest(unittest.TestCase): def setUp(self): self.test_tmpdir = tempfile.mkdtemp(prefix="db_crashtest_test_") @@ -59,6 +74,9 @@ def tearDown(self): def load_db_crashtest(self): return load_db_crashtest_module() + def load_db_stress_trace_parser(self): + return load_db_stress_trace_parser_module() + def build_params(self, base_params, overrides=None): params = dict(base_params) params["db"] = self.test_tmpdir @@ -66,6 +84,66 @@ def build_params(self, base_params, overrides=None): params.update(overrides) return params + def write_sample_public_iterator_trace(self, raw_trace): + trace_parser = self.load_db_stress_trace_parser() + + def key_sample(key): + head = key[:32] + tail = key[len(head) :][-16:] if len(key) > len(head) else b"" + return struct.pack( + " 0 and full_len > 
head_len: + out += f"..{tail[:tail_len].hex()}" + return out + + +def _format_flags(flags): + names = [name for bit, name in _FLAG_NAMES if (flags & bit) != 0] + return "|".join(names) if names else "-" + + +def _event_type_name(event_type): + return _EVENT_TYPE_NAMES.get(event_type, f"Unknown({event_type})") + + +def _status_code_name(status_code): + return _STATUS_CODE_NAMES.get(status_code, f"Unknown({status_code})") + + +def _status_subcode_name(status_subcode): + return _STATUS_SUBCODE_NAMES.get( + status_subcode, f"Unknown({status_subcode})" + ) + + +def _format_entry(entry_blob): + ( + timestamp_us, + sequence, + object_id, + aux0, + _aux1, + os_thread_id_hash, + cf_id, + flags, + slot, + event_type, + status_code, + status_subcode, + valid_before, + valid_after, + result_bool, + _reserved0, + key0_blob, + key1_blob, + _reserved, + ) = _ENTRY_STRUCT.unpack(entry_blob) + + secs = timestamp_us // 1000000 + usecs = timestamp_us % 1000000 + event_name = _event_type_name(event_type) + code_name = _status_code_name(status_code) + subcode_name = _status_subcode_name(status_subcode) + key0 = _format_key_sample(key0_blob) + key1 = _format_key_sample(key1_blob) + + if event_type == 1: + flags_str = _format_flags(flags) + return ( + f"[{secs}.{usecs:06d}] seq={sequence} slot={slot} " + f"thread={os_thread_id_hash} iter={object_id} cf={cf_id} " + f"op={event_name} status={code_name}/{subcode_name} " + f"flags={flags_str} snapshot=0x{aux0:x} lower={key0} upper={key1}\n" + ) + if event_type in (2, 3): + return ( + f"[{secs}.{usecs:06d}] seq={sequence} slot={slot} " + f"thread={os_thread_id_hash} iter={object_id} cf={cf_id} " + f"op={event_name} valid={valid_before}->{valid_after} " + f"status={code_name}/{subcode_name} target={key0} result={key1}\n" + ) + if event_type == 8: + ok = 0 if result_bool == 0xFF else result_bool + return ( + f"[{secs}.{usecs:06d}] seq={sequence} slot={slot} " + f"thread={os_thread_id_hash} iter={object_id} cf={cf_id} " + f"op={event_name} 
valid={valid_before}->{valid_after} " + f"status={code_name}/{subcode_name} ok={ok} key={key0} " + f"result={key1}\n" + ) + if event_type == 9: + return ( + f"[{secs}.{usecs:06d}] seq={sequence} slot={slot} " + f"thread={os_thread_id_hash} iter={object_id} cf={cf_id} " + f"op={event_name} valid={valid_before}->{valid_after} " + f"status={code_name}/{subcode_name} snapshot=0x{aux0:x} " + f"before={key0} result={key1}\n" + ) + return ( + f"[{secs}.{usecs:06d}] seq={sequence} slot={slot} " + f"thread={os_thread_id_hash} iter={object_id} cf={cf_id} " + f"op={event_name} valid={valid_before}->{valid_after} " + f"status={code_name}/{subcode_name} before={key0} result={key1}\n" + ) + + +def decode_public_iterator_trace(raw_path, output_path=None): + if output_path is None: + output_path = raw_path + ".log" + + with open(raw_path, "rb") as infile, open(output_path, "w") as outfile: + ( + magic, + trace_budget_bytes, + dropped_threads, + next_sequence, + next_iterator_id, + dump_timestamp_us, + version, + header_size, + slot_header_size, + entry_size, + max_threads, + entries_per_thread, + used_slots, + _reserved0, + ) = _FILE_HEADER_STRUCT.unpack(_read_exact(infile, _FILE_HEADER_STRUCT.size)) + + if magic != TRACE_FILE_MAGIC: + raise ValueError(f"unexpected trace magic: {magic!r}") + if version != TRACE_FILE_VERSION: + raise ValueError(f"unsupported trace version: {version}") + if header_size != _FILE_HEADER_STRUCT.size: + raise ValueError( + f"unexpected trace header size: {header_size} != {_FILE_HEADER_STRUCT.size}" + ) + if slot_header_size != _SLOT_HEADER_STRUCT.size: + raise ValueError( + f"unexpected slot header size: {slot_header_size} != {_SLOT_HEADER_STRUCT.size}" + ) + if entry_size != _ENTRY_STRUCT.size: + raise ValueError( + f"unexpected entry size: {entry_size} != {_ENTRY_STRUCT.size}" + ) + + outfile.write( + "=== db_stress public iterator trace (decoded from raw dump) ===\n" + ) + outfile.write( + "budget=%dB threads=%d entries_per_thread=%d entry_size=%dB " 
+ "dropped_threads=%d used_slots=%d dump_timestamp_us=%d " + "next_sequence=%d next_iterator_id=%d\n" + % ( + trace_budget_bytes, + max_threads, + entries_per_thread, + entry_size, + dropped_threads, + used_slots, + dump_timestamp_us, + next_sequence, + next_iterator_id, + ) + ) + + for _ in range(used_slots): + ( + thread_id_hash, + total_entries, + slot, + entry_count, + _reserved1, + _reserved2, + ) = _SLOT_HEADER_STRUCT.unpack( + _read_exact(infile, _SLOT_HEADER_STRUCT.size) + ) + outfile.write( + "-- slot=%d thread=%d entries=%d total=%d --\n" + % (slot, thread_id_hash, entry_count, total_entries) + ) + for _ in range(entry_count): + entry_blob = _read_exact(infile, _ENTRY_STRUCT.size) + sequence = struct.unpack_from( + " Date: Tue, 14 Apr 2026 17:18:46 -0700 Subject: [PATCH 2/5] convert fault injection fs to raw write to reduce cpu usage as well Benchmark (DEBUG_LEVEL=1, db_stress, 1 thread, readpercent=100, read_fault_one_in=1, ops_per_thread=1000000, median of 5 runs): - previous text log path: 3.45s - raw log path: 2.99s (-13.3%) --- BUCK | 4 + buckifier/buckify_rocksdb.py | 2 + db_stress_tool/db_stress_tool.cc | 6 +- tools/db_crashtest.py | 94 +++-- tools/db_crashtest_test.py | 90 +++++ tools/fault_injection_log_parser.py | 204 +++++++++++ utilities/fault_injection_fs.cc | 48 +-- utilities/fault_injection_fs.h | 512 +++++++++++++++++---------- utilities/fault_injection_fs_test.cc | 131 +++++-- 9 files changed, 802 insertions(+), 289 deletions(-) create mode 100644 tools/fault_injection_log_parser.py diff --git a/BUCK b/BUCK index fb49bff3e17b..55cc969ecca8 100644 --- a/BUCK +++ b/BUCK @@ -5796,3 +5796,7 @@ cpp_unittest_wrapper(name="write_unprepared_transaction_test", export_file(name = "tools/db_crashtest.py") + +export_file(name = "tools/db_stress_trace_parser.py") + +export_file(name = "tools/fault_injection_log_parser.py") diff --git a/buckifier/buckify_rocksdb.py b/buckifier/buckify_rocksdb.py index 262e54734c2d..96039fbe85c7 100755 --- 
a/buckifier/buckify_rocksdb.py +++ b/buckifier/buckify_rocksdb.py @@ -360,6 +360,8 @@ def generate_buck(repo_path, deps_map): extra_compiler_flags=json.dumps(deps["extra_compiler_flags"]), ) BUCK.export_file("tools/db_crashtest.py") + BUCK.export_file("tools/db_stress_trace_parser.py") + BUCK.export_file("tools/fault_injection_log_parser.py") print(ColorString.info("Generated BUCK Summary:")) print(ColorString.info("- %d libs" % BUCK.total_lib)) diff --git a/db_stress_tool/db_stress_tool.cc b/db_stress_tool/db_stress_tool.cc index 8fffaefbeb03..92294b88c3c9 100644 --- a/db_stress_tool/db_stress_tool.cc +++ b/db_stress_tool/db_stress_tool.cc @@ -370,7 +370,7 @@ int db_stress_tool(int argc, char** argv) { } // Now that FLAGS_db is resolved, set the fault injection log file path - // so that PrintAll() writes to a file instead of stderr (signal-safe). + // so the raw dump file can be pre-opened before any crash callback runs. // Store the log in TEST_TMPDIR (outside the DB directory) so it survives // DB reopen (which cleans untracked files) and gets included in the // sandcastle db.tar.gz artifact for post-failure analysis. 
@@ -384,7 +384,7 @@ int db_stress_tool(int argc, char** argv) { } std::string log_path = log_dir + "/fault_injection_" + std::to_string(getpid()) + "_" + - std::to_string(time(nullptr)) + ".log"; + std::to_string(time(nullptr)) + ".bin"; fault_fs_guard->SetInjectedErrorLogPath(log_path); } @@ -406,7 +406,7 @@ int db_stress_tool(int argc, char** argv) { if (fault_fs_guard || IsDbStressPublicIteratorTraceEnabled()) { port::RegisterCrashCallback([]() { if (fault_fs_guard) { - fault_fs_guard->PrintRecentInjectedErrors(); + fault_fs_guard->DumpRecentInjectedErrors(); } DumpDbStressPublicIteratorTrace(); }); diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 769fd4cf6061..69a8fffda5e0 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -3,6 +3,7 @@ import argparse import db_stress_trace_parser +import fault_injection_log_parser import glob import math import os @@ -2104,50 +2105,69 @@ def _format_process_exit_label(returncode, hit_timeout): if returncode < 0: return f"signal-{abs(returncode)}" return f"exit-{returncode}" + + +def _print_fault_injection_text_log(log, max_tail_entries): + print("=== Fault injection log: %s ===" % log) + with open(log) as f: + lines = f.readlines() + header = [] + footer = [] + entries = [] + for line in lines: + stripped = line.strip() + if stripped.startswith("=== End of"): + footer.append(line) + elif stripped.startswith("===") or stripped == "(none)": + header.append(line) + else: + entries.append(line) + total_entries = len(entries) + print("".join(header), end="") + if total_entries <= max_tail_entries: + print("".join(entries), end="") + print("".join(footer), end="") + else: + skipped = total_entries - max_tail_entries + print( + "... (%d entries omitted, showing last %d. 
" + "Full log: %s)\n" % (skipped, max_tail_entries, log), + end="", + ) + print("".join(entries[-max_tail_entries:]), end="") + print( + "=== Showed %d of %d injected error entries ===\n" + % (max_tail_entries, total_entries), + end="", + ) + + def print_and_cleanup_fault_injection_log(pid): # Fault injection logs are stored in TEST_TMPDIR (or /tmp) to survive # DB reopen cleanup, and to be included in sandcastle's db.tar.gz artifact. # Filter by pid to only print the log from the current run. max_tail_entries = 32 log_dir = os.environ.get(_TEST_DIR_ENV_VAR) or "/tmp" - pattern = os.path.join(log_dir, "fault_injection_%d_*.log" % pid) - for log in glob.glob(pattern): - print("=== Fault injection log: %s ===" % log) + + raw_pattern = os.path.join(log_dir, "fault_injection_%d_*.bin" % pid) + for raw_log in glob.glob(raw_pattern): + decoded_log = raw_log + ".txt" try: - with open(log) as f: - lines = f.readlines() - # Log format: header line(s), entry lines, footer line. - # The footer starts with "=== End of". - # Print header and footer always, truncate entries in the middle. - header = [] - footer = [] - entries = [] - for line in lines: - stripped = line.strip() - if stripped.startswith("=== End of"): - footer.append(line) - elif stripped.startswith("===") or stripped == "(none)": - header.append(line) - else: - entries.append(line) - total_entries = len(entries) - print("".join(header), end="") - if total_entries <= max_tail_entries: - print("".join(entries), end="") - print("".join(footer), end="") - else: - skipped = total_entries - max_tail_entries - print( - "... (%d entries omitted, showing last %d. 
" - "Full log: %s)\n" % (skipped, max_tail_entries, log), - end="", - ) - print("".join(entries[-max_tail_entries:]), end="") - print( - "=== Showed %d of %d injected error entries ===\n" - % (max_tail_entries, total_entries), - end="", - ) + fault_injection_log_parser.decode_fault_injection_log( + raw_log, decoded_log + ) + print("Raw fault injection log: %s" % raw_log) + _print_fault_injection_text_log(decoded_log, max_tail_entries) + except (OSError, ValueError) as exc: + print( + "WARNING: failed to decode fault injection log %s: %s\n" + % (raw_log, exc) + ) + + text_pattern = os.path.join(log_dir, "fault_injection_%d_*.log" % pid) + for log in glob.glob(text_pattern): + try: + _print_fault_injection_text_log(log, max_tail_entries) except OSError: pass diff --git a/tools/db_crashtest_test.py b/tools/db_crashtest_test.py index 21cfd808de98..e421c6cfc178 100644 --- a/tools/db_crashtest_test.py +++ b/tools/db_crashtest_test.py @@ -20,6 +20,9 @@ _DB_STRESS_TRACE_PARSER_PATH = os.path.join( os.path.dirname(__file__), "db_stress_trace_parser.py" ) +_FAULT_INJECTION_LOG_PARSER_PATH = os.path.join( + os.path.dirname(__file__), "fault_injection_log_parser.py" +) _TEST_DIR_ENV_VAR = "TEST_TMPDIR" _TEST_EXPECTED_DIR_ENV_VAR = "TEST_TMPDIR_EXPECTED" @@ -47,6 +50,15 @@ def load_db_stress_trace_parser_module(): return module +def load_fault_injection_log_parser_module(): + spec = importlib.util.spec_from_file_location( + "fault_injection_log_parser_under_test", _FAULT_INJECTION_LOG_PARSER_PATH + ) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + class DBCrashTestTest(unittest.TestCase): def setUp(self): self.test_tmpdir = tempfile.mkdtemp(prefix="db_crashtest_test_") @@ -77,6 +89,9 @@ def load_db_crashtest(self): def load_db_stress_trace_parser(self): return load_db_stress_trace_parser_module() + def load_fault_injection_log_parser(self): + return load_fault_injection_log_parser_module() + def build_params(self, 
base_params, overrides=None): params = dict(base_params) params["db"] = self.test_tmpdir @@ -401,5 +416,80 @@ def test_print_and_cleanup_public_iterator_trace_keeps_last_five_runs(self): any("blackbox_run_0005_exit-0" in path for path in raw_logs + decoded_logs) ) + def test_print_and_cleanup_fault_injection_log_decodes_raw_trace(self): + db_crashtest = self.load_db_crashtest() + fault_parser = self.load_fault_injection_log_parser() + pid = 5151 + raw_log = os.path.join(self.test_tmpdir, f"fault_injection_{pid}_1.bin") + decoded_log = raw_log + ".txt" + + header = struct.pack( + "<8sQIIIIII", + fault_parser.TRACE_FILE_MAGIC, + 2, + fault_parser.TRACE_FILE_VERSION, + 40, + fault_parser.ENTRY_V2_STRUCT.size, + 1000, + 2, + 0, + ) + entry0 = fault_parser.ENTRY_V2_STRUCT.pack( + 123456789, + 17, + 7, + 4, + 0, + 0, + fault_parser.DETAIL_KIND_OFFSET_SIZE_AND_HEAD, + 4, + 1, + 0, + b"Append\0".ljust(32, b"\0"), + b"/tmp/000001.log\0".ljust(72, b"\0"), + b"injected write error\0".ljust(56, b"\0"), + b"abcd".ljust(48, b"\0"), + ) + entry1 = fault_parser.ENTRY_V2_STRUCT.pack( + 123456790, + 23, + 0, + 6, + 0, + 0, + fault_parser.DETAIL_KIND_TWO_FILES, + 6, + 0, + 1, + b"Rename\0".ljust(32, b"\0"), + b"/tmp/a\0".ljust(72, b"\0"), + b"injected metadata read error\0".ljust(56, b"\0"), + b"/tmp/b".ljust(48, b"\0"), + ) + with open(raw_log, "wb") as f: + f.write(header) + f.write(entry0) + f.write(entry1) + + stdout = io.StringIO() + with redirect_stdout(stdout): + db_crashtest.print_and_cleanup_fault_injection_log(pid) + + self.assertTrue(os.path.exists(decoded_log)) + with open(decoded_log) as f: + decoded_text = f.read() + + self.assertIn( + 'Append("/tmp/000001.log", offset=7, size=4, head=[61 62 63 64])', + decoded_text, + ) + self.assertIn("IO error: injected write error [retryable]", decoded_text) + self.assertIn( + 'Rename("/tmp/a", "/tmp/b") -> IO error: injected metadata read error [data_loss]', + decoded_text, + ) + self.assertIn(decoded_log, stdout.getvalue()) + + if 
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# This source code is licensed under both the GPLv2 (found in the COPYING file
# in the root directory) and the Apache 2.0 License (found in the
# LICENSE.Apache file in the root directory).

"""Decode raw binary fault injection logs emitted by db_stress.

A raw log starts with a fixed 40-byte header (``HEADER_STRUCT``) followed by
``dumped_entries`` fixed-size records.  Two record layouts exist:

* version 1 (legacy): timestamp, thread id and a 256-byte NUL-padded message.
* version 2: structured fields mirroring the on-disk layout of the C++
  ``InjectedErrorLog::Entry`` struct (op name, file name, status message,
  detail payload and flags), padded to 256 bytes.
"""

import argparse
import struct


TRACE_FILE_MAGIC = b"FINJLOG1"
LEGACY_TRACE_FILE_VERSION = 1
TRACE_FILE_VERSION = 2

DETAIL_KIND_NONE = 0
DETAIL_KIND_TWO_FILES = 1
DETAIL_KIND_SIZE_AND_HEAD = 2
DETAIL_KIND_OFFSET_SIZE_AND_HEAD = 3
DETAIL_KIND_OFFSET_AND_SIZE = 4
DETAIL_KIND_SIZE = 5
DETAIL_KIND_COUNT = 6
DETAIL_KIND_REQ_OFFSET_AND_SIZE = 7

# magic, total_entries, version, header_size, entry_size, max_entries,
# dumped_entries, reserved -- 40 bytes, little endian, no padding.
HEADER_STRUCT = struct.Struct("<8sQIIIIII")
# Legacy (v1) entry: timestamp_us, thread_id, 256-byte NUL-padded message.
LEGACY_ENTRY_STRUCT = struct.Struct("<QQ256s")
# V2 entry mirrors the C++ InjectedErrorLog::Entry struct: four uint64s,
# two uint32s, four uint8s, then four fixed char arrays (32+72+56+48), with
# 4 trailing pad bytes so the total size is exactly 256 bytes.
ENTRY_V2_STRUCT = struct.Struct("<QQQQIIBBBB32s72s56s48s4x")


def _read_exact(infile, size):
    """Read exactly `size` bytes from `infile` or raise ValueError."""
    data = infile.read(size)
    if len(data) != size:
        raise ValueError(
            f"truncated trace file: wanted {size} bytes, got {len(data)}"
        )
    return data


def _decode_c_string(raw):
    """Decode a NUL-padded fixed-width bytes field into a str."""
    return raw.split(b"\x00", 1)[0].decode("utf-8", "replace")


def _format_hex_head(payload, size, max_bytes=8):
    """Format the first few payload bytes as hex.

    `size` is the original (pre-truncation) data size recorded in the entry;
    when it exceeds the captured payload, an ellipsis is appended.
    """
    head = " ".join(f"{b:02x}" for b in payload[:max_bytes])
    if size > len(payload):
        return f"{head} ..." if head else "..."
    return head


def _format_v2_detail(detail_kind, offset, size, count, req_idx, payload):
    """Render the structured detail fields of a v2 entry as display text."""
    if detail_kind == DETAIL_KIND_NONE:
        return ""
    if detail_kind == DETAIL_KIND_TWO_FILES:
        # Payload holds the second file name; size is its full length.
        suffix = "..." if size > len(payload) else ""
        return f"\"{payload.decode('utf-8', 'replace')}{suffix}\""
    if detail_kind == DETAIL_KIND_SIZE_AND_HEAD:
        return f"size={size}, head=[{_format_hex_head(payload, size)}]"
    if detail_kind == DETAIL_KIND_OFFSET_SIZE_AND_HEAD:
        return (
            f"offset={offset}, size={size}, "
            f"head=[{_format_hex_head(payload, size)}]"
        )
    if detail_kind == DETAIL_KIND_OFFSET_AND_SIZE:
        return f"offset={offset}, size={size}"
    if detail_kind == DETAIL_KIND_SIZE:
        return f"size={size}"
    if detail_kind == DETAIL_KIND_COUNT:
        return f"num_reqs={count}"
    if detail_kind == DETAIL_KIND_REQ_OFFSET_AND_SIZE:
        return f"req[{req_idx}], offset={offset}, size={size}"
    # Unknown kind: show the raw discriminator rather than failing.
    return f"detail_kind={detail_kind}"


def _decode_legacy_entries(infile, outfile, dumped_entries):
    """Decode version-1 entries; returns the number of lines printed."""
    printed = 0
    for _ in range(dumped_entries):
        timestamp_us, thread_id, context = LEGACY_ENTRY_STRUCT.unpack(
            _read_exact(infile, LEGACY_ENTRY_STRUCT.size)
        )
        if timestamp_us == 0:
            # Zero timestamp marks a slot that was never fully written.
            continue
        secs = timestamp_us // 1000000
        usecs = timestamp_us % 1000000
        context_str = _decode_c_string(context)
        outfile.write(f"[{secs}.{usecs:06d}] thread={thread_id}: {context_str}\n")
        printed += 1
    return printed


def _decode_v2_entries(infile, outfile, dumped_entries):
    """Decode version-2 entries; returns the number of lines printed."""
    printed = 0
    for _ in range(dumped_entries):
        (
            timestamp_us,
            thread_id,
            offset,
            size,
            count,
            req_idx,
            detail_kind,
            payload_size,
            retryable,
            data_loss,
            op_name,
            file_name,
            status_message,
            detail_payload,
        ) = ENTRY_V2_STRUCT.unpack(_read_exact(infile, ENTRY_V2_STRUCT.size))
        if timestamp_us == 0:
            # Zero timestamp marks a slot that was never fully written.
            continue
        if payload_size > len(detail_payload):
            raise ValueError(f"invalid payload size in trace entry: {payload_size}")
        secs = timestamp_us // 1000000
        usecs = timestamp_us % 1000000
        op_name = _decode_c_string(op_name)
        file_name = _decode_c_string(file_name)
        status_message = _decode_c_string(status_message)
        payload = detail_payload[:payload_size]
        detail = _format_v2_detail(detail_kind, offset, size, count, req_idx, payload)
        line = f"[{secs}.{usecs:06d}] thread={thread_id}: {op_name}(\"{file_name}\""
        if detail:
            line += f", {detail}"
        line += f") -> IO error: {status_message}"
        flags = []
        if retryable:
            flags.append("retryable")
        if data_loss:
            flags.append("data_loss")
        if flags:
            line += " [" + ",".join(flags) + "]"
        outfile.write(line + "\n")
        printed += 1
    return printed


def decode_fault_injection_log(raw_path, output_path=None):
    """Decode a raw binary fault injection log into a text file.

    Returns the path of the decoded text output (defaults to
    ``raw_path + ".txt"``).  Raises ValueError on a malformed input file.
    """
    if output_path is None:
        output_path = raw_path + ".txt"

    with open(raw_path, "rb") as infile, open(output_path, "w") as outfile:
        (
            magic,
            total_entries,
            version,
            header_size,
            entry_size,
            max_entries,
            dumped_entries,
            reserved,
        ) = HEADER_STRUCT.unpack(_read_exact(infile, HEADER_STRUCT.size))

        if magic != TRACE_FILE_MAGIC:
            raise ValueError(f"unexpected trace magic: {magic!r}")
        if version not in (LEGACY_TRACE_FILE_VERSION, TRACE_FILE_VERSION):
            raise ValueError(f"unsupported trace version: {version}")
        if header_size != HEADER_STRUCT.size:
            raise ValueError(
                f"unexpected trace header size: {header_size} != {HEADER_STRUCT.size}"
            )

        outfile.write(
            "=== Recently Injected Fault Injection Errors (most recent last) ===\n"
        )

        if version == LEGACY_TRACE_FILE_VERSION:
            if entry_size != LEGACY_ENTRY_STRUCT.size:
                raise ValueError(
                    "unexpected legacy trace entry size: "
                    f"{entry_size} != {LEGACY_ENTRY_STRUCT.size}"
                )
            # In v1 files the reserved field carried the max message length.
            if reserved != 256:
                raise ValueError(f"unexpected legacy max message len: {reserved}")
            printed = _decode_legacy_entries(infile, outfile, dumped_entries)
        else:
            if entry_size != ENTRY_V2_STRUCT.size:
                raise ValueError(
                    f"unexpected trace entry size: {entry_size} != {ENTRY_V2_STRUCT.size}"
                )
            printed = _decode_v2_entries(infile, outfile, dumped_entries)

        if printed == 0:
            outfile.write("(none)\n")

        outfile.write(
            "=== End of injected error log (%d entries, total=%d, max=%d) ===\n"
            % (printed, total_entries, max_entries)
        )

    return output_path


def _main():
    """Command-line entry point: decode one raw log and print the output path."""
    parser = argparse.ArgumentParser(
        description="Decode raw fault injection logs emitted by db_stress."
    )
    parser.add_argument("raw_log", help="Path to the raw .bin fault injection log")
    parser.add_argument(
        "--output",
        help="Path for decoded text output. Defaults to <raw_log>.txt",
    )
    args = parser.parse_args()
    output_path = decode_fault_injection_log(args.raw_log, args.output)
    print(output_path)


if __name__ == "__main__":
    _main()
FaultInjectionTestFS::MaybeInjectThreadLocalReadError( // For a small chance, set the failure to status but turn the // result to be empty, which is supposed to be caught for a check. *result = Slice(); - msg << "empty result"; - ctx->message = msg.str(); + ctx->message = kInjectedEmptyResult; ret_fault_injected = true; ret = IOStatus::IOError(ctx->message); } else if (!direct_io && Random::GetTLSInstance()->OneIn(7) && @@ -1471,13 +1467,11 @@ IOStatus FaultInjectionTestFS::MaybeInjectThreadLocalReadError( // It would work for CRC. Not 100% sure for xxhash and will adjust // if it is not the case. const_cast(result->data())[result->size() - 1]++; - msg << "corrupt last byte"; - ctx->message = msg.str(); + ctx->message = kInjectedCorruptLastByte; ret_fault_injected = true; ret = IOStatus::IOError(ctx->message); } else { - msg << "error result multiget single"; - ctx->message = msg.str(); + ctx->message = kInjectedErrorResultMultiGetSingle; ret_fault_injected = true; ret = IOStatus::IOError(ctx->message); } @@ -1486,15 +1480,8 @@ IOStatus FaultInjectionTestFS::MaybeInjectThreadLocalReadError( ret.SetRetryable(ctx->retryable); ret.SetDataLoss(ctx->has_data_loss); if (!ret.ok()) { - std::string detail = detail_fn ? 
detail_fn() : ""; - if (detail.empty()) { - injected_error_log_.Record("%s(\"%.128s\") -> %s", op_name, - file_name.c_str(), ret.ToString().c_str()); - } else { - injected_error_log_.Record("%s(\"%.128s\", %s) -> %s", op_name, - file_name.c_str(), detail.c_str(), - ret.ToString().c_str()); - } + injected_error_log_.Record(op_name, file_name, detail, ctx->message, + ctx->retryable, ctx->has_data_loss); } return ret; } @@ -1508,13 +1495,13 @@ bool FaultInjectionTestFS::TryParseFileName(const std::string& file_name, IOStatus FaultInjectionTestFS::MaybeInjectThreadLocalError( FaultInjectionIOType type, const IOOptions& io_options, const char* op_name, - const std::string& file_name, std::function detail_fn, + const std::string& file_name, InjectedErrorLog::DetailRef detail, ErrorOperation op, Slice* result, bool direct_io, char* scratch, bool need_count_increase, bool* fault_injected) { if (type == FaultInjectionIOType::kRead) { return MaybeInjectThreadLocalReadError( - io_options, op_name, file_name, std::move(detail_fn), op, result, - direct_io, scratch, need_count_increase, fault_injected); + io_options, op_name, file_name, detail, op, result, direct_io, scratch, + need_count_increase, fault_injected); } ErrorContext* ctx = GetErrorContextFromFaultInjectionIOType(type); @@ -1536,17 +1523,8 @@ IOStatus FaultInjectionTestFS::MaybeInjectThreadLocalError( ret = IOStatus::IOError(ctx->message); ret.SetRetryable(ctx->retryable); ret.SetDataLoss(ctx->has_data_loss); - { - std::string detail = detail_fn ? 
detail_fn() : ""; - if (detail.empty()) { - injected_error_log_.Record("%s(\"%.128s\") -> %s", op_name, - file_name.c_str(), ret.ToString().c_str()); - } else { - injected_error_log_.Record("%s(\"%.128s\", %s) -> %s", op_name, - file_name.c_str(), detail.c_str(), - ret.ToString().c_str()); - } - } + injected_error_log_.Record(op_name, file_name, detail, ctx->message, + ctx->retryable, ctx->has_data_loss); if (type == FaultInjectionIOType::kWrite) { TEST_SYNC_POINT( "FaultInjectionTestFS::InjectMetadataWriteError:Injected"); diff --git a/utilities/fault_injection_fs.h b/utilities/fault_injection_fs.h index 31102c1ce1e4..26fe93e71105 100644 --- a/utilities/fault_injection_fs.h +++ b/utilities/fault_injection_fs.h @@ -17,10 +17,12 @@ #pragma once #include +#include #include #include -#include +#include #include +#include #include #include #include @@ -48,61 +50,148 @@ namespace ROCKSDB_NAMESPACE { // A fixed-size circular buffer that records recently injected errors. // Thread-safe for concurrent writes. Designed to be safe to read from a -// signal handler (PrintAll uses only fprintf to stderr). +// signal handler by dumping raw binary data to a pre-opened file. class InjectedErrorLog { public: static constexpr size_t kMaxEntries = 1000; - static constexpr size_t kMaxMessageLen = 256; + static constexpr size_t kMaxOpNameLen = 32; + static constexpr size_t kMaxFileNameLen = 72; + static constexpr size_t kMaxStatusMessageLen = 56; + static constexpr size_t kMaxDetailPayloadLen = 48; + static constexpr uint32_t kFileVersion = 2; + static constexpr std::array kFileMagic = {'F', 'I', 'N', 'J', + 'L', 'O', 'G', '1'}; + + enum class DetailKind : uint8_t { + kNone = 0, + kTwoFiles = 1, + kSizeAndHead = 2, + kOffsetSizeAndHead = 3, + kOffsetAndSize = 4, + kSize = 5, + kCount = 6, + kReqOffsetAndSize = 7, + }; + + // Borrowed raw detail payload. The referenced Slice is consumed + // synchronously when a fault is actually injected. 
+ struct DetailRef { + DetailKind kind = DetailKind::kNone; + uint64_t offset = 0; + uint64_t size = 0; + uint32_t count = 0; + uint32_t req_idx = 0; + Slice payload; + }; struct Entry { uint64_t timestamp_us; uint64_t thread_id; - char context[kMaxMessageLen]; + uint64_t offset; + uint64_t size; + uint32_t count; + uint32_t req_idx; + uint8_t detail_kind; + uint8_t detail_payload_size; + uint8_t retryable; + uint8_t data_loss; + char op_name[kMaxOpNameLen]; + char file_name[kMaxFileNameLen]; + char status_message[kMaxStatusMessageLen]; + char detail_payload[kMaxDetailPayloadLen]; + }; + + static_assert(sizeof(Entry) == 256, + "Injected error log entry size must stay stable"); + + struct RawFileHeader { + char magic[8]; + uint64_t total_entries; + uint32_t version; + uint32_t header_size; + uint32_t entry_size; + uint32_t max_entries; + uint32_t dumped_entries; + uint32_t reserved; }; - InjectedErrorLog() : head_(0), entries_{} { log_path_[0] = '\0'; } + static_assert(sizeof(RawFileHeader) == 40, + "Injected error log file header size must stay stable"); + + InjectedErrorLog() : head_(0), dump_started_(0), log_fd_(-1), entries_{} { + log_path_[0] = '\0'; + } + + ~InjectedErrorLog() { +#ifndef OS_WIN + if (log_fd_ >= 0) { + close(log_fd_); + } + if (dump_started_.load(std::memory_order_relaxed) == 0 && + log_path_[0] != '\0') { + unlink(log_path_); + } +#endif + } - // Set the file path for PrintAll() output. Must be called before any - // signal handler invocation (not async-signal-safe itself due to string - // copy, but called once at setup time). If not set, PrintAll() falls - // back to writing to stderr. + // Set the file path for raw dump output. Must be called before any + // signal handler invocation. This also pre-opens the file so the + // crash callback only needs write(). 
void SetLogFilePath(const std::string& path) { +#ifndef OS_WIN + if (log_fd_ >= 0) { + close(log_fd_); + log_fd_ = -1; + } + if (dump_started_.load(std::memory_order_relaxed) == 0 && + log_path_[0] != '\0') { + unlink(log_path_); + } +#endif + size_t len = std::min(path.size(), sizeof(log_path_) - 1); memcpy(log_path_, path.data(), len); log_path_[len] = '\0'; - } - TSAN_SUPPRESSION void Record(const char* fmt, ...) -#if defined(__GNUC__) || defined(__clang__) - __attribute__((format(printf, 2, 3))) +#ifndef OS_WIN + if (log_path_[0] == '\0') { + return; + } + int flags = O_WRONLY | O_CREAT | O_TRUNC; +#ifdef O_CLOEXEC + flags |= O_CLOEXEC; +#endif + log_fd_ = open(log_path_, flags, 0644); #endif - { + } + + TSAN_SUPPRESSION void Record(const Slice& op_name, const Slice& file_name, + const DetailRef& detail, + const Slice& status_message, bool retryable, + bool data_loss) { + Entry local{}; + local.thread_id = std::hash{}(std::this_thread::get_id()); + local.offset = detail.offset; + local.size = detail.size; + local.count = detail.count; + local.req_idx = detail.req_idx; + local.detail_kind = static_cast(detail.kind); + local.detail_payload_size = static_cast(CopySliceBytes( + detail.payload, local.detail_payload, sizeof(local.detail_payload))); + local.retryable = retryable ? 1 : 0; + local.data_loss = data_loss ? 1 : 0; + CopyStringSample(op_name, local.op_name, sizeof(local.op_name)); + CopyStringSample(file_name, local.file_name, sizeof(local.file_name)); + CopyStringSample(status_message, local.status_message, + sizeof(local.status_message)); + size_t idx = head_.fetch_add(1, std::memory_order_relaxed) % kMaxEntries; - Entry& e = entries_[idx]; - e.thread_id = std::hash{}(std::this_thread::get_id()); - auto now = std::chrono::system_clock::now(); - e.timestamp_us = std::chrono::duration_cast( - now.time_since_epoch()) - .count(); - // Format into a local buffer first, then copy into the shared entry. 
- // This avoids calling the TSAN-intercepted vsnprintf directly on shared - // memory. We use a byte-by-byte loop instead of memcpy because - // TSAN_SUPPRESSION (no_sanitize("thread")) only suppresses - // compiler-inserted instrumentation -- it does NOT suppress TSAN's - // runtime interceptors for libc functions like memcpy, vsnprintf, and - // snprintf. Plain store instructions are always suppressed regardless - // of optimization level. The volatile source pointer prevents the - // compiler from recognizing this as a memcpy idiom and replacing it - // with a memcpy call. - char local_buf[kMaxMessageLen]; - va_list args; - va_start(args, fmt); - vsnprintf(local_buf, kMaxMessageLen, fmt, args); - va_end(args); - const volatile char* src = local_buf; - for (size_t i = 0; i < kMaxMessageLen; i++) { - e.context[i] = src[i]; - } + CopyBytesToShared(reinterpret_cast(&local), + reinterpret_cast(&entries_[idx]), + sizeof(local)); + // Publish timestamp last so DumpRaw() can treat zero as an incomplete slot. + std::atomic_signal_fence(std::memory_order_release); + entries_[idx].timestamp_us = NowMicros(); } // Format the first few bytes of a buffer as hex for logging. @@ -111,92 +200,141 @@ class InjectedErrorLog { size_t max_bytes = 8) { std::string result; size_t n = std::min(size, max_bytes); - char buf[4]; + static const char kHexDigits[] = "0123456789abcdef"; + result.reserve(n * 3 + ((size > max_bytes) ? 4 : 0)); for (size_t i = 0; i < n; i++) { - snprintf(buf, sizeof(buf), "%02x ", (unsigned char)data[i]); - result += buf; + if (i > 0) { + result.push_back(' '); + } + uint8_t byte = static_cast(data[i]); + result.push_back(kHexDigits[byte >> 4]); + result.push_back(kHexDigits[byte & 0x0f]); + } + if (size > max_bytes) { + result.append(" ..."); } - if (size > max_bytes) result += "..."; - if (!result.empty() && result.back() == ' ') result.pop_back(); return result; } - // Print all recorded entries to a log file (or stderr as fallback). 
- // Async-signal-safe: uses only open/write/close/snprintf (no fprintf, - // no malloc). Safe to call from a signal handler. + // Dump all recorded entries to a raw binary file. + // The file must have been opened by SetLogFilePath() before any signal + // handler invocation, so the crash path only needs write(). // // Note: entries may be read while being written by another thread. // This is a benign race -- at worst, one entry may appear garbled. - // We accept this trade-off to keep PrintAll() free of locks and safe + // We accept this trade-off to keep DumpRaw() free of locks and safe // for use in signal handlers. - TSAN_SUPPRESSION void PrintAll() const { + TSAN_SUPPRESSION void DumpRaw() const { #ifndef OS_WIN - int fd = -1; - if (log_path_[0] != '\0') { - fd = open(log_path_, O_WRONLY | O_CREAT | O_TRUNC, 0644); - } - // Fall back to stdout if open failed or no path was set. - // We avoid stderr because db_crashtest.py treats any stderr output - // as a test failure. - if (fd < 0) { - fd = STDOUT_FILENO; + if (log_fd_ < 0) { + return; } - auto write_str = [fd](const char* buf, int len) { - if (len > 0) { - // Ignore return value in signal handler -- nothing we can do - auto unused __attribute__((unused)) = write(fd, buf, len); - } - }; - - char buf[512]; - int len = snprintf(buf, sizeof(buf), - "\n=== Recently Injected Fault Injection Errors " - "(most recent last) ===\n"); - write_str(buf, len); - - size_t total = head_.load(std::memory_order_relaxed); - if (total == 0) { - len = snprintf(buf, sizeof(buf), "(none)\n"); - write_str(buf, len); - if (fd != STDOUT_FILENO) close(fd); + uint32_t expected = 0; + if (!dump_started_.compare_exchange_strong(expected, 1, + std::memory_order_relaxed)) { return; } + + size_t total = head_.load(std::memory_order_relaxed); size_t count = std::min(total, kMaxEntries); size_t start = (total >= kMaxEntries) ? 
(total % kMaxEntries) : 0; + + RawFileHeader header; + for (size_t i = 0; i < kFileMagic.size(); ++i) { + header.magic[i] = kFileMagic[i]; + } + header.total_entries = total; + header.version = kFileVersion; + header.header_size = static_cast(sizeof(header)); + header.entry_size = static_cast(sizeof(Entry)); + header.max_entries = static_cast(kMaxEntries); + header.dumped_entries = static_cast(count); + header.reserved = 0; + + if (!WriteAll(log_fd_, reinterpret_cast(&header), + sizeof(header))) { + return; + } + + std::array chunk; + size_t chunk_count = 0; for (size_t i = 0; i < count; i++) { size_t idx = (start + i) % kMaxEntries; - // Copy entry fields to locals to avoid passing shared memory through - // TSAN-intercepted snprintf. See comment in Record() for why we use a - // volatile pointer to prevent loop-to-memcpy optimization. - const Entry& e = entries_[idx]; - uint64_t local_ts = e.timestamp_us; - uint64_t local_tid = e.thread_id; - char local_ctx[kMaxMessageLen]; - const volatile char* ctx_src = e.context; - for (size_t j = 0; j < kMaxMessageLen; j++) { - local_ctx[j] = ctx_src[j]; + Entry& dst = chunk[chunk_count++]; + CopyBytesFromShared( + reinterpret_cast(&entries_[idx]), + reinterpret_cast(&dst), sizeof(dst)); + if (chunk_count == chunk.size() || i + 1 == count) { + if (!WriteAll(log_fd_, reinterpret_cast(chunk.data()), + chunk_count * sizeof(Entry))) { + return; + } + chunk_count = 0; } - if (local_ts == 0) continue; - uint64_t secs = local_ts / 1000000; - uint64_t usecs = local_ts % 1000000; - len = snprintf(buf, sizeof(buf), "[%llu.%06llu] thread=%llu: %s\n", - (unsigned long long)secs, (unsigned long long)usecs, - (unsigned long long)local_tid, local_ctx); - write_str(buf, len); } - len = snprintf(buf, sizeof(buf), - "=== End of injected error log (%zu entries) ===\n", count); - write_str(buf, len); - if (fd != STDOUT_FILENO) close(fd); #else // On Windows, crash callbacks via signal handlers are not used, - // so PrintAll() is a no-op. 
+ // so DumpRaw() is a no-op. #endif } private: + static uint64_t NowMicros() { + auto now = std::chrono::system_clock::now(); + return std::chrono::duration_cast( + now.time_since_epoch()) + .count(); + } + + static void CopyBytesToShared(const char* src, volatile char* dst, + size_t len) { + for (size_t i = 0; i < len; ++i) { + dst[i] = src[i]; + } + } + + static void CopyBytesFromShared(const volatile char* src, char* dst, + size_t len) { + for (size_t i = 0; i < len; ++i) { + dst[i] = src[i]; + } + } + + static void CopyStringSample(const Slice& src, char* dst, size_t dst_len) { + if (dst_len == 0) { + return; + } + size_t copied = std::min(src.size(), dst_len - 1); + for (size_t i = 0; i < copied; ++i) { + dst[i] = src[i]; + } + dst[copied] = '\0'; + } + + static size_t CopySliceBytes(const Slice& src, char* dst, size_t dst_len) { + size_t copied = std::min(src.size(), dst_len); + for (size_t i = 0; i < copied; ++i) { + dst[i] = src[i]; + } + return copied; + } + + static bool WriteAll(int fd, const char* data, size_t len) { + while (len > 0) { + ssize_t written = write(fd, data, len); + if (written <= 0) { + return false; + } + data += static_cast(written); + len -= static_cast(written); + } + return true; + } + std::atomic head_; + mutable std::atomic dump_started_; + int log_fd_; Entry entries_[kMaxEntries]; char log_path_[PATH_MAX]; }; @@ -204,78 +342,71 @@ class InjectedErrorLog { class TestFSWritableFile; class FaultInjectionTestFS; -// Deferred detail builders for injected error logging. -// These return lambdas that are only evaluated when a fault is actually -// injected, avoiding string formatting overhead on the common (no-fault) path. -// Captured references are safe because the lambda is called synchronously -// within MaybeInjectThreadLocalError before the caller returns. +// Borrowed raw detail builders for injected error logging. +// These avoid hot-path string formatting and are consumed synchronously +// when a fault is actually injected. 
namespace fault_injection_detail { -inline std::function NoDetail() { return {}; } +using DetailRef = InjectedErrorLog::DetailRef; -inline std::function TwoFiles(const std::string& /*f1*/, - const std::string& f2) { - return [&f2]() -> std::string { - char buf[160]; - snprintf(buf, sizeof(buf), "\"%.128s\"", f2.c_str()); - return std::string(buf); - }; +inline DetailRef NoDetail() { return DetailRef(); } + +inline DetailRef TwoFiles(const std::string& /*f1*/, const std::string& f2) { + DetailRef detail; + detail.kind = InjectedErrorLog::DetailKind::kTwoFiles; + detail.size = static_cast(f2.size()); + detail.payload = Slice(f2); + return detail; } -inline std::function SizeAndHead(const Slice& data) { - return [data]() -> std::string { - char buf[128]; - snprintf(buf, sizeof(buf), "size=%zu, head=[%s]", data.size(), - InjectedErrorLog::HexHead(data.data(), data.size()).c_str()); - return std::string(buf); - }; +inline DetailRef SizeAndHead(const Slice& data) { + DetailRef detail; + detail.kind = InjectedErrorLog::DetailKind::kSizeAndHead; + detail.size = static_cast(data.size()); + detail.payload = data; + return detail; } -inline std::function OffsetSizeAndHead(uint64_t offset, - const Slice& data) { - return [offset, data]() -> std::string { - char buf[160]; - snprintf(buf, sizeof(buf), "offset=%llu, size=%zu, head=[%s]", - (unsigned long long)offset, data.size(), - InjectedErrorLog::HexHead(data.data(), data.size()).c_str()); - return std::string(buf); - }; +inline DetailRef OffsetSizeAndHead(uint64_t offset, const Slice& data) { + DetailRef detail; + detail.kind = InjectedErrorLog::DetailKind::kOffsetSizeAndHead; + detail.offset = offset; + detail.size = static_cast(data.size()); + detail.payload = data; + return detail; } -inline std::function OffsetAndSize(uint64_t offset, size_t n) { - return [offset, n]() -> std::string { - char buf[64]; - snprintf(buf, sizeof(buf), "offset=%llu, size=%zu", - (unsigned long long)offset, n); - return std::string(buf); - }; 
+inline DetailRef OffsetAndSize(uint64_t offset, size_t n) { + DetailRef detail; + detail.kind = InjectedErrorLog::DetailKind::kOffsetAndSize; + detail.offset = offset; + detail.size = static_cast(n); + return detail; } -inline std::function Size(uint64_t size) { - return [size]() -> std::string { - char buf[32]; - snprintf(buf, sizeof(buf), "size=%llu", (unsigned long long)size); - return std::string(buf); - }; +inline DetailRef Size(uint64_t size) { + DetailRef detail; + detail.kind = InjectedErrorLog::DetailKind::kSize; + detail.size = size; + return detail; } -inline std::function Count(size_t count) { - return [count]() -> std::string { - char buf[32]; - snprintf(buf, sizeof(buf), "num_reqs=%zu", count); - return std::string(buf); - }; +inline DetailRef Count(size_t count) { + assert(count <= std::numeric_limits::max()); + DetailRef detail; + detail.kind = InjectedErrorLog::DetailKind::kCount; + detail.count = static_cast(count); + return detail; } -inline std::function ReqOffsetAndSize(size_t req_idx, - uint64_t offset, - size_t n) { - return [req_idx, offset, n]() -> std::string { - char buf[96]; - snprintf(buf, sizeof(buf), "req[%zu], offset=%llu, size=%zu", req_idx, - (unsigned long long)offset, n); - return std::string(buf); - }; +inline DetailRef ReqOffsetAndSize(size_t req_idx, uint64_t offset, size_t n) { + assert(req_idx <= std::numeric_limits::max()); + DetailRef detail; + detail.kind = InjectedErrorLog::DetailKind::kReqOffsetAndSize; + detail.req_idx = static_cast(req_idx); + detail.offset = offset; + detail.size = static_cast(n); + return detail; } } // namespace fault_injection_detail @@ -795,8 +926,9 @@ class FaultInjectionTestFS : public FileSystemWrapper { IOStatus MaybeInjectThreadLocalError( FaultInjectionIOType type, const IOOptions& io_options, const char* op_name, const std::string& file_name, - std::function detail_fn = {}, ErrorOperation op = kUnknown, - Slice* slice = nullptr, bool direct_io = false, char* scratch = nullptr, + 
InjectedErrorLog::DetailRef detail = InjectedErrorLog::DetailRef(), + ErrorOperation op = kUnknown, Slice* slice = nullptr, + bool direct_io = false, char* scratch = nullptr, bool need_count_increase = false, bool* fault_injected = nullptr); int GetAndResetInjectedThreadLocalErrorCount(FaultInjectionIOType type) { @@ -860,17 +992,16 @@ class FaultInjectionTestFS : public FileSystemWrapper { void ReadUnsynced(const std::string& fname, uint64_t offset, size_t n, Slice* result, char* scratch, int64_t* pos_at_last_sync); - // Access the injected error log for printing on crash or test failure. + // Access the injected error log for dumping on crash or test failure. InjectedErrorLog& GetInjectedErrorLog() { return injected_error_log_; } const InjectedErrorLog& GetInjectedErrorLog() const { return injected_error_log_; } - // Print recently injected errors to stderr. Call this on test failure - // to see what errors were injected leading up to the failure. - void PrintRecentInjectedErrors() const { injected_error_log_.PrintAll(); } + // Dump recently injected errors to the preconfigured raw trace file. + void DumpRecentInjectedErrors() const { injected_error_log_.DumpRaw(); } - // Set the file path where PrintAll() will write its output. + // Set the file path where DumpRaw() will write its output. // Must be called before any signal handler invocation. 
void SetInjectedErrorLogPath(const std::string& path) { injected_error_log_.SetLogFilePath(path); @@ -881,6 +1012,20 @@ class FaultInjectionTestFS : public FileSystemWrapper { private: inline static const std::string kFailedToWriteToWAL = "failed to write to WAL"; + inline static const std::string kInjectedReadError = "injected read error"; + inline static const std::string kInjectedEmptyResult = + "injected empty result"; + inline static const std::string kInjectedCorruptLastByte = + "injected corrupt last byte"; + inline static const std::string kInjectedErrorResultMultiGetSingle = + "injected error result multiget single"; + inline static const std::string kInjectedWriteError = "injected write error"; + inline static const std::string kInjectedWriteErrorFailedToWriteToWAL = + "injected write error failed to write to WAL"; + inline static const std::string kInjectedMetadataReadError = + "injected metadata read error"; + inline static const std::string kInjectedMetadataWriteError = + "injected metadata write error"; port::Mutex mutex_; std::map db_file_state_; std::set open_managed_files_; @@ -952,7 +1097,7 @@ class FaultInjectionTestFS : public FileSystemWrapper { // because some fault is inected with IOStatus to be OK. 
IOStatus MaybeInjectThreadLocalReadError( const IOOptions& io_options, const char* op_name, - const std::string& file_name, std::function detail_fn, + const std::string& file_name, InjectedErrorLog::DetailRef detail, ErrorOperation op, Slice* slice, bool direct_io, char* scratch, bool need_count_increase, bool* fault_injected); @@ -1029,39 +1174,32 @@ class FaultInjectionTestFS : public FileSystemWrapper { } } - std::string GetErrorMessage(FaultInjectionIOType type, - const std::string& file_name, ErrorOperation op) { - std::ostringstream msg; - msg << kInjected << " "; + const std::string& GetErrorMessage(FaultInjectionIOType type, + const std::string& file_name, + ErrorOperation op) { switch (type) { case FaultInjectionIOType::kRead: - msg << "read error"; - break; + return kInjectedReadError; case FaultInjectionIOType::kWrite: - msg << "write error"; - break; + if (op == ErrorOperation::kOpen || op == ErrorOperation::kAppend || + op == ErrorOperation::kPositionedAppend) { + FileType file_type = kTempFile; + uint64_t ignore = 0; + if (TryParseFileName(file_name, &ignore, &file_type) && + file_type == FileType::kWalFile) { + return kInjectedWriteErrorFailedToWriteToWAL; + } + } + return kInjectedWriteError; case FaultInjectionIOType::kMetadataRead: - msg << "metadata read error"; - break; + return kInjectedMetadataReadError; case FaultInjectionIOType::kMetadataWrite: - msg << "metadata write error"; - break; + return kInjectedMetadataWriteError; default: assert(false); break; } - - if (type == FaultInjectionIOType::kWrite && - (op == ErrorOperation::kOpen || op == ErrorOperation::kAppend || - op == ErrorOperation::kPositionedAppend)) { - FileType file_type = kTempFile; - uint64_t ignore = 0; - if (TryParseFileName(file_name, &ignore, &file_type) && - file_type == FileType::kWalFile) { - msg << " " << kFailedToWriteToWAL; - } - } - return msg.str(); + return kInjectedReadError; } }; diff --git a/utilities/fault_injection_fs_test.cc 
b/utilities/fault_injection_fs_test.cc index 4138272db800..112010b409bd 100644 --- a/utilities/fault_injection_fs_test.cc +++ b/utilities/fault_injection_fs_test.cc @@ -6,6 +6,7 @@ #include "utilities/fault_injection_fs.h" #include +#include #include #include @@ -13,42 +14,111 @@ namespace ROCKSDB_NAMESPACE { -class InjectedErrorLogTest : public testing::Test {}; +class InjectedErrorLogTest : public testing::Test { + protected: + static std::string DecodeCString(const char* data, size_t len) { + size_t actual_len = 0; + while (actual_len < len && data[actual_len] != '\0') { + ++actual_len; + } + return std::string(data, actual_len); + } -// Test basic Record and PrintAll functionality. -TEST_F(InjectedErrorLogTest, BasicRecordAndPrint) { - InjectedErrorLog log; - log.SetLogFilePath("/dev/null"); + std::string ReadRawLog(const std::string& path) { + std::string raw; + Status s = ReadFileToString(Env::Default(), path, &raw); + EXPECT_OK(s); + return raw; + } - // Record some entries. - log.Record("op=Get key=0x%08x status=%s", 0x12345678, "OK"); - log.Record("op=Put key=0x%08x value_size=%d", 0xABCDEF00, 100); - log.Record("op=Delete key=0x%08x", 0x00000001); + InjectedErrorLog::RawFileHeader DecodeHeader(const std::string& raw) { + InjectedErrorLog::RawFileHeader header{}; + EXPECT_GE(raw.size(), sizeof(header)); + if (raw.size() >= sizeof(header)) { + std::memcpy(&header, raw.data(), sizeof(header)); + } + return header; + } - // PrintAll should not crash. 
- log.PrintAll(); + InjectedErrorLog::Entry DecodeEntry(const std::string& raw, size_t index) { + InjectedErrorLog::Entry entry{}; + size_t offset = sizeof(InjectedErrorLog::RawFileHeader) + + index * sizeof(InjectedErrorLog::Entry); + EXPECT_GE(raw.size(), offset + sizeof(entry)); + if (raw.size() >= offset + sizeof(entry)) { + std::memcpy(&entry, raw.data() + offset, sizeof(entry)); + } + return entry; + } +}; + +TEST_F(InjectedErrorLogTest, BasicRecordAndDumpRaw) { + std::string path = test::PerThreadDBPath("injected_error_log_basic.bin"); + InjectedErrorLog log; + log.SetLogFilePath(path); + log.Record("Append", "/tmp/000001.log", + fault_injection_detail::OffsetSizeAndHead(7, Slice("abcd", 4)), + "injected write error", false, true); + log.DumpRaw(); + + std::string raw = ReadRawLog(path); + auto header = DecodeHeader(raw); + ASSERT_EQ(std::string(header.magic, sizeof(header.magic)), + std::string(InjectedErrorLog::kFileMagic.data(), + InjectedErrorLog::kFileMagic.size())); + ASSERT_EQ(header.version, InjectedErrorLog::kFileVersion); + ASSERT_EQ(header.entry_size, sizeof(InjectedErrorLog::Entry)); + ASSERT_EQ(header.dumped_entries, 1U); + ASSERT_EQ(header.total_entries, 1U); + + auto entry = DecodeEntry(raw, 0); + EXPECT_NE(entry.timestamp_us, 0U); + EXPECT_EQ(entry.offset, 7U); + EXPECT_EQ(entry.size, 4U); + EXPECT_EQ( + entry.detail_kind, + static_cast(InjectedErrorLog::DetailKind::kOffsetSizeAndHead)); + EXPECT_EQ(entry.detail_payload_size, 4U); + EXPECT_EQ(entry.retryable, 0U); + EXPECT_EQ(entry.data_loss, 1U); + EXPECT_EQ(DecodeCString(entry.op_name, sizeof(entry.op_name)), "Append"); + EXPECT_EQ(DecodeCString(entry.file_name, sizeof(entry.file_name)), + "/tmp/000001.log"); + EXPECT_EQ(DecodeCString(entry.status_message, sizeof(entry.status_message)), + "injected write error"); + EXPECT_EQ(std::string(entry.detail_payload, entry.detail_payload + 4), + "abcd"); } -// Test that the circular buffer wraps correctly. 
TEST_F(InjectedErrorLogTest, CircularBufferWrap) { + std::string path = test::PerThreadDBPath("injected_error_log_wrap.bin"); InjectedErrorLog log; - log.SetLogFilePath("/dev/null"); + log.SetLogFilePath(path); - // Fill beyond kMaxEntries to trigger wraparound. for (size_t i = 0; i < InjectedErrorLog::kMaxEntries + 100; i++) { - log.Record("entry=%zu", i); + std::string file_name = "file" + std::to_string(i); + log.Record("Append", file_name, fault_injection_detail::NoDetail(), + "injected write error", false, false); } - - // PrintAll should handle the wrapped buffer without crashing. - log.PrintAll(); + log.DumpRaw(); + + std::string raw = ReadRawLog(path); + auto header = DecodeHeader(raw); + ASSERT_EQ(header.total_entries, + static_cast(InjectedErrorLog::kMaxEntries + 100)); + ASSERT_EQ(header.dumped_entries, + static_cast(InjectedErrorLog::kMaxEntries)); + + auto first = DecodeEntry(raw, 0); + auto last = DecodeEntry(raw, InjectedErrorLog::kMaxEntries - 1); + EXPECT_EQ(DecodeCString(first.file_name, sizeof(first.file_name)), "file100"); + EXPECT_EQ(DecodeCString(last.file_name, sizeof(last.file_name)), "file1099"); } -// Test concurrent Record() from multiple threads. -// Keep total records (kNumThreads * kRecordsPerThread) under kMaxEntries -// to avoid write-write races from buffer wraparound, which are benign but -// would trigger TSAN warnings. 
TEST_F(InjectedErrorLogTest, ConcurrentRecord) { + std::string path = test::PerThreadDBPath("injected_error_log_concurrent.bin"); InjectedErrorLog log; + log.SetLogFilePath(path); constexpr int kNumThreads = 4; constexpr int kRecordsPerThread = 200; static_assert(kNumThreads * kRecordsPerThread < @@ -61,7 +131,10 @@ TEST_F(InjectedErrorLogTest, ConcurrentRecord) { for (int t = 0; t < kNumThreads; t++) { threads.emplace_back([&log, t]() { for (int i = 0; i < kRecordsPerThread; i++) { - log.Record("thread=%d iter=%d op=Get key=0x%08x", t, i, i * 17); + std::string file_name = + "thread" + std::to_string(t) + "_" + std::to_string(i); + log.Record("Read", file_name, fault_injection_detail::NoDetail(), + "injected read error", false, false); } }); } @@ -70,12 +143,16 @@ TEST_F(InjectedErrorLogTest, ConcurrentRecord) { t.join(); } - // PrintAll after all threads are done -- no race. - log.SetLogFilePath("/dev/null"); - log.PrintAll(); + log.DumpRaw(); + + std::string raw = ReadRawLog(path); + auto header = DecodeHeader(raw); + ASSERT_EQ(header.total_entries, + static_cast(kNumThreads * kRecordsPerThread)); + ASSERT_EQ(header.dumped_entries, + static_cast(kNumThreads * kRecordsPerThread)); } -// Test HexHead utility. 
TEST_F(InjectedErrorLogTest, HexHead) { const char data[] = "\x01\x02\xAB\xCD"; std::string result = InjectedErrorLog::HexHead(data, 4); From cc314a3a98090166861cf731047ee7daca0acab3 Mon Sep 17 00:00:00 2001 From: xingbowang Date: Wed, 15 Apr 2026 10:06:15 -0700 Subject: [PATCH 3/5] Fix CI portability issues in trace logging --- db_stress_tool/db_stress_trace.cc | 73 ++++++++++++++++++------------- db_stress_tool/db_stress_trace.h | 5 +++ utilities/fault_injection_fs.h | 7 +++ 3 files changed, 55 insertions(+), 30 deletions(-) diff --git a/db_stress_tool/db_stress_trace.cc b/db_stress_tool/db_stress_trace.cc index ce4d43714b77..5fcfc4f59002 100644 --- a/db_stress_tool/db_stress_trace.cc +++ b/db_stress_tool/db_stress_trace.cc @@ -1,3 +1,8 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 
// This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License @@ -6,10 +11,6 @@ #ifdef GFLAGS #include "db_stress_tool/db_stress_trace.h" -#include -#include -#include - #include #include #include @@ -18,6 +19,16 @@ #include #include +#ifndef OS_WIN +#include +#include +#include +#endif + +#ifndef PATH_MAX +#define PATH_MAX 4096 +#endif + #include "db_stress_tool/db_stress_common.h" #include "port/lang.h" @@ -146,6 +157,7 @@ void CopyVolatileBytes(const volatile char* src, char* dst, size_t len) { } bool WriteAll(int fd, const char* data, size_t len) { +#ifndef OS_WIN while (len > 0) { ssize_t written = write(fd, data, len); if (written <= 0) { @@ -155,6 +167,12 @@ bool WriteAll(int fd, const char* data, size_t len) { len -= static_cast(written); } return true; +#else + (void)fd; + (void)data; + (void)len; + return false; +#endif } KeySample CaptureKeySample(const Slice& key) { @@ -288,23 +306,19 @@ class DbStressPublicIteratorTraceLog { if (read_opts.iterate_upper_bound != nullptr) { upper = CaptureKeySample(*read_opts.iterate_upper_bound); } - Record(IteratorTraceEventType::kCreate, iterator_id, cf_id, lower, upper, - false, false, Status::OK(), BuildCreateFlags(read_opts), - static_cast( - reinterpret_cast(read_opts.snapshot)), - 0, kResultBoolUnset); - } - - TSAN_SUPPRESSION void RecordIteratorOp(IteratorTraceEventType event_type, - uint64_t iterator_id, uint32_t cf_id, - const KeySample& key0, - const KeySample& key1, - bool valid_before, bool valid_after, - const Status& status, - uint32_t flags = 0, uint64_t aux0 = 0, - uint64_t aux1 = 0, - uint8_t result_bool = - kResultBoolUnset) { + Record( + IteratorTraceEventType::kCreate, iterator_id, cf_id, lower, upper, + false, false, Status::OK(), BuildCreateFlags(read_opts), + static_cast(reinterpret_cast(read_opts.snapshot)), + 0, kResultBoolUnset); + } + + TSAN_SUPPRESSION void RecordIteratorOp( + IteratorTraceEventType event_type, uint64_t 
iterator_id, uint32_t cf_id, + const KeySample& key0, const KeySample& key1, bool valid_before, + bool valid_after, const Status& status, uint32_t flags = 0, + uint64_t aux0 = 0, uint64_t aux1 = 0, + uint8_t result_bool = kResultBoolUnset) { Record(event_type, iterator_id, cf_id, key0, key1, valid_before, valid_after, status, flags, aux0, aux1, result_bool); } @@ -339,7 +353,8 @@ class DbStressPublicIteratorTraceLog { header.dump_timestamp_us = 0; header.version = kTraceFileVersion; header.header_size = static_cast(sizeof(header)); - header.slot_header_size = static_cast(sizeof(TraceFileSlotHeader)); + header.slot_header_size = + static_cast(sizeof(TraceFileSlotHeader)); header.entry_size = static_cast(sizeof(TraceEntry)); header.max_threads = static_cast(kMaxTraceThreads); header.entries_per_thread = static_cast(kEntriesPerThread); @@ -485,10 +500,9 @@ class DbStressTraceIterator : public Iterator { if (iter_->Valid()) { after = CaptureKeySample(iter_->key()); } - trace_log_->RecordIteratorOp(IteratorTraceEventType::kSeekToFirst, - iterator_id_, cf_id_, before, after, - before.full_len != 0, iter_->Valid(), - iter_->status()); + trace_log_->RecordIteratorOp( + IteratorTraceEventType::kSeekToFirst, iterator_id_, cf_id_, before, + after, before.full_len != 0, iter_->Valid(), iter_->status()); } void SeekToLast() override { @@ -501,10 +515,9 @@ class DbStressTraceIterator : public Iterator { if (iter_->Valid()) { after = CaptureKeySample(iter_->key()); } - trace_log_->RecordIteratorOp(IteratorTraceEventType::kSeekToLast, - iterator_id_, cf_id_, before, after, - before.full_len != 0, iter_->Valid(), - iter_->status()); + trace_log_->RecordIteratorOp( + IteratorTraceEventType::kSeekToLast, iterator_id_, cf_id_, before, + after, before.full_len != 0, iter_->Valid(), iter_->status()); } void Seek(const Slice& target) override { diff --git a/db_stress_tool/db_stress_trace.h b/db_stress_tool/db_stress_trace.h index 89fdb28a6587..fb4b577c34cf 100644 --- 
a/db_stress_tool/db_stress_trace.h +++ b/db_stress_tool/db_stress_trace.h @@ -1,3 +1,8 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License diff --git a/utilities/fault_injection_fs.h b/utilities/fault_injection_fs.h index 26fe93e71105..877421f2f70c 100644 --- a/utilities/fault_injection_fs.h +++ b/utilities/fault_injection_fs.h @@ -321,6 +321,7 @@ class InjectedErrorLog { } static bool WriteAll(int fd, const char* data, size_t len) { +#ifndef OS_WIN while (len > 0) { ssize_t written = write(fd, data, len); if (written <= 0) { @@ -330,6 +331,12 @@ class InjectedErrorLog { len -= static_cast(written); } return true; +#else + (void)fd; + (void)data; + (void)len; + return false; +#endif } std::atomic head_; From 8d26db1827db02e95cf98ae5325f5cacdbc59fcf Mon Sep 17 00:00:00 2001 From: xingbowang Date: Wed, 15 Apr 2026 11:03:46 -0700 Subject: [PATCH 4/5] Address clang-tidy findings in trace logging --- db_stress_tool/db_stress_tool.cc | 25 ++++++--------- db_stress_tool/db_stress_trace.cc | 53 +++++++++++++++++++------------ 2 files changed, 43 insertions(+), 35 deletions(-) diff --git a/db_stress_tool/db_stress_tool.cc b/db_stress_tool/db_stress_tool.cc index 92294b88c3c9..2bc1630bf663 100644 --- a/db_stress_tool/db_stress_tool.cc +++ b/db_stress_tool/db_stress_tool.cc @@ -63,6 +63,14 @@ void DumpAndReportDbStressPublicIteratorTrace(const char* reason) { } fputc('\n', stdout); } + +std::string GetTraceArtifactDir(Env* env) { + std::string dir; + if (env != nullptr && env->GetTestDirectory(&dir).ok() && !dir.empty()) { + return dir; + } + return "/tmp"; +} } // namespace 
KeyGenContext key_gen_ctx; @@ -121,7 +129,6 @@ int db_stress_tool(int argc, char** argv) { fault_env_guard = std::make_shared(raw_env, fault_fs_guard); raw_env = fault_env_guard.get(); - } auto db_stress_fs = @@ -375,13 +382,7 @@ int db_stress_tool(int argc, char** argv) { // DB reopen (which cleans untracked files) and gets included in the // sandcastle db.tar.gz artifact for post-failure analysis. if (fault_fs_guard) { - std::string log_dir; - const char* test_tmpdir = getenv("TEST_TMPDIR"); - if (test_tmpdir && test_tmpdir[0] != '\0') { - log_dir = test_tmpdir; - } else { - log_dir = "/tmp"; - } + const std::string log_dir = GetTraceArtifactDir(db_stress_env); std::string log_path = log_dir + "/fault_injection_" + std::to_string(getpid()) + "_" + std::to_string(time(nullptr)) + ".bin"; @@ -389,13 +390,7 @@ int db_stress_tool(int argc, char** argv) { } if (FLAGS_trace_public_iterator_api) { - std::string log_dir; - const char* test_tmpdir = getenv("TEST_TMPDIR"); - if (test_tmpdir && test_tmpdir[0] != '\0') { - log_dir = test_tmpdir; - } else { - log_dir = "/tmp"; - } + const std::string log_dir = GetTraceArtifactDir(db_stress_env); std::string log_path = log_dir + "/db_stress_public_iterator_trace_" + std::to_string(getpid()) + "_" + std::to_string(time(nullptr)) + ".bin"; diff --git a/db_stress_tool/db_stress_trace.cc b/db_stress_tool/db_stress_trace.cc index 5fcfc4f59002..9817219a7397 100644 --- a/db_stress_tool/db_stress_trace.cc +++ b/db_stress_tool/db_stress_trace.cc @@ -240,9 +240,16 @@ class DbStressPublicIteratorTraceLog { dropped_threads_(0), dump_started_(0), log_fd_(-1), - logs_{} { - log_path_[0] = '\0'; - } + log_path_{}, + logs_{} {} + + DbStressPublicIteratorTraceLog(const DbStressPublicIteratorTraceLog&) = + delete; + DbStressPublicIteratorTraceLog& operator=( + const DbStressPublicIteratorTraceLog&) = delete; + DbStressPublicIteratorTraceLog(DbStressPublicIteratorTraceLog&&) = delete; + DbStressPublicIteratorTraceLog& 
operator=(DbStressPublicIteratorTraceLog&&) = + delete; ~DbStressPublicIteratorTraceLog() { #ifndef OS_WIN @@ -342,7 +349,7 @@ class DbStressPublicIteratorTraceLog { } } - TraceFileHeader header; + TraceFileHeader header{}; for (size_t i = 0; i < kTraceFileMagic.size(); ++i) { header.magic[i] = kTraceFileMagic[i]; } @@ -366,7 +373,7 @@ class DbStressPublicIteratorTraceLog { return; } - std::array chunk; + std::array chunk{}; for (size_t slot = 0; slot < kMaxTraceThreads; ++slot) { const uint64_t total = logs_[slot].head.load(std::memory_order_relaxed); if (total == 0) { @@ -378,7 +385,7 @@ class DbStressPublicIteratorTraceLog { const uint64_t start = (total >= kEntriesPerThread) ? (total % kEntriesPerThread) : 0; - TraceFileSlotHeader slot_header; + TraceFileSlotHeader slot_header{}; slot_header.thread_id_hash = logs_[slot].thread_id_hash.load(std::memory_order_relaxed); slot_header.total_entries = total; @@ -638,50 +645,56 @@ class DbStressTraceIterator : public Iterator { uint64_t iterator_id_; }; -std::unique_ptr - g_db_stress_public_iterator_trace; +std::unique_ptr& +DbStressPublicIteratorTraceLogSingleton() { + static std::unique_ptr trace_log; + return trace_log; +} } // namespace bool IsDbStressPublicIteratorTraceEnabled() { - return FLAGS_trace_public_iterator_api && g_db_stress_public_iterator_trace; + return FLAGS_trace_public_iterator_api && + DbStressPublicIteratorTraceLogSingleton(); } void InitDbStressPublicIteratorTrace(const std::string& path) { if (!FLAGS_trace_public_iterator_api) { return; } - if (!g_db_stress_public_iterator_trace) { - g_db_stress_public_iterator_trace.reset( - new DbStressPublicIteratorTraceLog()); + auto& trace_log = DbStressPublicIteratorTraceLogSingleton(); + if (!trace_log) { + trace_log.reset(new DbStressPublicIteratorTraceLog()); } - g_db_stress_public_iterator_trace->SetLogFilePath(path); + trace_log->SetLogFilePath(path); } void DumpDbStressPublicIteratorTrace() { - if (!g_db_stress_public_iterator_trace) { + auto& 
trace_log = DbStressPublicIteratorTraceLogSingleton(); + if (!trace_log) { return; } - g_db_stress_public_iterator_trace->DumpRaw(); + trace_log->DumpRaw(); } std::string GetDbStressPublicIteratorTracePath() { - if (!g_db_stress_public_iterator_trace) { + auto& trace_log = DbStressPublicIteratorTraceLogSingleton(); + if (!trace_log) { return std::string(); } - return g_db_stress_public_iterator_trace->GetLogFilePath(); + return trace_log->GetLogFilePath(); } std::unique_ptr MaybeWrapDbStressTraceIterator( std::unique_ptr iter, const ReadOptions& read_opts, ColumnFamilyHandle* column_family) { - if (!g_db_stress_public_iterator_trace || !iter) { + auto& trace_log = DbStressPublicIteratorTraceLogSingleton(); + if (!trace_log || !iter) { return iter; } const uint32_t cf_id = column_family != nullptr ? column_family->GetID() : 0; return std::unique_ptr(new DbStressTraceIterator( - std::move(iter), g_db_stress_public_iterator_trace.get(), cf_id, - read_opts)); + std::move(iter), trace_log.get(), cf_id, read_opts)); } std::unique_ptr NewDbStressTraceIterator( From 916e782ea515615f29c92093dd4578c3f37f78f8 Mon Sep 17 00:00:00 2001 From: xingbowang Date: Wed, 15 Apr 2026 14:55:42 -0700 Subject: [PATCH 5/5] Fix db_stress trace ctor init order --- db_stress_tool/db_stress_trace.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db_stress_tool/db_stress_trace.cc b/db_stress_tool/db_stress_trace.cc index 9817219a7397..dbaef56b7993 100644 --- a/db_stress_tool/db_stress_trace.cc +++ b/db_stress_tool/db_stress_trace.cc @@ -240,8 +240,8 @@ class DbStressPublicIteratorTraceLog { dropped_threads_(0), dump_started_(0), log_fd_(-1), - log_path_{}, - logs_{} {} + logs_{}, + log_path_{} {} DbStressPublicIteratorTraceLog(const DbStressPublicIteratorTraceLog&) = delete;