diff --git a/db/db_iter.cc b/db/db_iter.cc index 3a33c6b02b2f..5d9498bc4f53 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -455,6 +455,13 @@ bool DBIter::SetValueAndColumnsFromMergeResult(const Status& merge_status, return false; } + if (result_type == kTypeDeletion) { + // Merge operator signaled that this key should be deleted. + // Mark the iterator entry as invalid so that callers skip this key. + valid_ = false; + return true; + } + if (result_type == kTypeWideColumnEntity) { if (!SetValueAndColumnsFromEntity(value_columns_state_.saved_value())) { assert(!valid_); @@ -681,7 +688,27 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key) { // By now, we are sure the current ikey is going to yield a value current_entry_is_merged_ = true; valid_ = true; - return MergeValuesNewToOld(); // Go to a different state machine + if (!MergeValuesNewToOld()) { + return false; // Error during merge + } + if (!valid_) { + // Merge operator signaled deletion. iter_ is already positioned + // past all merge entries. Continue searching for the next visible + // user key. + current_entry_is_merged_ = false; + skipping_saved_key = true; + num_skipped = 0; + reseek_done = false; + // Release pinned data from the merge that just resolved to + // deletion. Without this, a subsequent MergeValuesNewToOld + // call in the same loop iteration would hit the StartPinning() + // assertion (pinning_enabled must be false). + ReleaseTempPinnedData(); + PERF_COUNTER_ADD(internal_delete_skipped_count, 1); + continue; // Skip iter_.Next() at the bottom; iter_ is already + // positioned at the next entry by MergeValuesNewToOld + } + return true; default: valid_ = false; status_ = Status::Corruption( @@ -1582,7 +1609,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); } - valid_ = true; + // valid_ was already set by SetValueAndColumnsFromMergeResult: + // true for normal merge results, false if merge produced a deletion. 
return true; } diff --git a/db/db_merge_operator_test.cc b/db/db_merge_operator_test.cc index 143203fd7b7e..e8aaf0e5ba68 100644 --- a/db/db_merge_operator_test.cc +++ b/db/db_merge_operator_test.cc @@ -2,7 +2,9 @@ // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). +#include #include +#include #include #include "db/db_test_util.h" @@ -12,6 +14,7 @@ #include "rocksdb/merge_operator.h" #include "rocksdb/snapshot.h" #include "rocksdb/utilities/debug.h" +#include "util/coding.h" #include "util/random.h" #include "utilities/merge_operators.h" #include "utilities/merge_operators/string_append/stringappend2.h" @@ -1047,6 +1050,1195 @@ TEST_F(DBMergeOperatorTest, MaxSuccessiveMergesBaseValues) { } } +// A merge operator whose FullMergeV3 can signal deletion by returning +// std::monostate. It interprets operands as int64_t increments applied to a +// counter; when the counter reaches zero (or below), the key is deleted. +class CounterDeleteMergeOperator : public MergeOperator { + public: + bool FullMergeV3(const MergeOperationInputV3& merge_in, + MergeOperationOutputV3* merge_out) const override { + int64_t counter = 0; + // Parse base value if it exists. + if (auto* pval = std::get_if(&merge_in.existing_value)) { + if (pval->size() == sizeof(int64_t)) { + counter = DecodeFixed64(pval->data()); + } + } + // std::monostate means no base value — counter starts at 0. + + // Apply each operand (encoded as int64_t delta). + for (const auto& operand : merge_in.operand_list) { + if (operand.size() == sizeof(int64_t)) { + counter += static_cast(DecodeFixed64(operand.data())); + } + } + + if (counter <= 0) { + // Signal deletion. 
+ merge_out->new_value = std::monostate{}; + } else { + std::string result; + PutFixed64(&result, static_cast(counter)); + merge_out->new_value = std::move(result); + } + return true; + } + + const char* Name() const override { return "CounterDeleteMergeOperator"; } +}; + +// A merge operator that always signals deletion from FullMergeV3. +class AlwaysDeleteMergeOperator : public MergeOperator { + public: + bool FullMergeV3(const MergeOperationInputV3& merge_in, + MergeOperationOutputV3* merge_out) const override { + (void)merge_in; + merge_out->new_value = std::monostate{}; + return true; + } + + const char* Name() const override { return "AlwaysDeleteMergeOperator"; } +}; + +// A merge operator that conditionally deletes based on operand content. +// If the last operand is "DELETE", the key is deleted. +// Otherwise, the operands are concatenated with "," as a delimiter. +class ConditionalDeleteMergeOperator : public MergeOperator { + public: + bool FullMergeV3(const MergeOperationInputV3& merge_in, + MergeOperationOutputV3* merge_out) const override { + if (!merge_in.operand_list.empty() && + merge_in.operand_list.back() == Slice("DELETE")) { + merge_out->new_value = std::monostate{}; + return true; + } + + std::string result; + if (auto* pval = std::get_if(&merge_in.existing_value)) { + result.assign(pval->data(), pval->size()); + } + for (const auto& operand : merge_in.operand_list) { + if (!result.empty()) { + result += ","; + } + result.append(operand.data(), operand.size()); + } + merge_out->new_value = std::move(result); + return true; + } + + const char* Name() const override { return "ConditionalDeleteMergeOperator"; } +}; + +static std::string EncodeInt64(int64_t val) { + std::string result; + PutFixed64(&result, static_cast(val)); + return result; +} + +static int64_t DecodeInt64(const std::string& s) { + assert(s.size() == sizeof(int64_t)); + return static_cast(DecodeFixed64(s.data())); +} + +// 
--------------------------------------------------------------------------- +// Tests for FullMergeV3 deletion result (std::monostate) +// --------------------------------------------------------------------------- + +// Basic: counter reaches zero → Get returns NotFound. +TEST_F(DBMergeOperatorTest, MergeDeletionCounterReachesZero) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + // Put(key, 5), Merge(key, -5) → counter == 0 → deleted + ASSERT_OK(Put("k1", EncodeInt64(5))); + ASSERT_OK(Merge("k1", EncodeInt64(-5))); + + std::string value; + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound()); + + // Put(key, 10), Merge(key, -3), Merge(key, -7) → counter == 0 → deleted + ASSERT_OK(Put("k2", EncodeInt64(10))); + ASSERT_OK(Merge("k2", EncodeInt64(-3))); + ASSERT_OK(Merge("k2", EncodeInt64(-7))); + ASSERT_TRUE(db_->Get(ReadOptions(), "k2", &value).IsNotFound()); +} + +// Counter stays positive → Get returns the value. +TEST_F(DBMergeOperatorTest, MergeDeletionCounterStaysPositive) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("k1", EncodeInt64(10))); + ASSERT_OK(Merge("k1", EncodeInt64(-3))); + + std::string value; + ASSERT_OK(db_->Get(ReadOptions(), "k1", &value)); + ASSERT_EQ(DecodeInt64(value), 7); +} + +// Counter goes negative → still deleted. +TEST_F(DBMergeOperatorTest, MergeDeletionCounterGoesNegative) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("k1", EncodeInt64(3))); + ASSERT_OK(Merge("k1", EncodeInt64(-10))); + + std::string value; + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound()); +} + +// Merge with no base value (all merge operands, no Put/Delete). 
+TEST_F(DBMergeOperatorTest, MergeDeletionNoBaseValue) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + // No Put, just merges. Sum <= 0 → deleted. + ASSERT_OK(Merge("k1", EncodeInt64(5))); + ASSERT_OK(Merge("k1", EncodeInt64(-5))); + std::string value; + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound()); + + // No Put, just merges. Sum > 0 → value. + ASSERT_OK(Merge("k2", EncodeInt64(5))); + ASSERT_OK(Merge("k2", EncodeInt64(3))); + ASSERT_OK(db_->Get(ReadOptions(), "k2", &value)); + ASSERT_EQ(DecodeInt64(value), 8); +} + +// Merge on top of a Delete base. +TEST_F(DBMergeOperatorTest, MergeDeletionOnTopOfDelete) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("k1", EncodeInt64(100))); + ASSERT_OK(Delete("k1")); + // Merge on top of Delete → base is monostate, counter starts at 0. + ASSERT_OK(Merge("k1", EncodeInt64(-1))); + std::string value; + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound()); + + // Now merge a positive → should create the key + ASSERT_OK(Put("k2", EncodeInt64(100))); + ASSERT_OK(Delete("k2")); + ASSERT_OK(Merge("k2", EncodeInt64(42))); + ASSERT_OK(db_->Get(ReadOptions(), "k2", &value)); + ASSERT_EQ(DecodeInt64(value), 42); +} + +// AlwaysDelete merge operator deletes all keys it touches. +TEST_F(DBMergeOperatorTest, MergeDeletionAlwaysDelete) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("k1", "v1")); + ASSERT_OK(Merge("k1", "anything")); + std::string value; + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound()); +} + +// Conditional delete based on operand. 
+TEST_F(DBMergeOperatorTest, MergeDeletionConditional) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("k1", "base")); + ASSERT_OK(Merge("k1", "a")); + ASSERT_OK(Merge("k1", "b")); + + std::string value; + ASSERT_OK(db_->Get(ReadOptions(), "k1", &value)); + ASSERT_EQ(value, "base,a,b"); + + // Now issue a "DELETE" operand + ASSERT_OK(Merge("k1", "DELETE")); + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound()); + + // Flush to materialize the deletion in an SST before adding new operands. + // Without flush, a subsequent Merge would land in the same memtable and the + // full merge would see all operands together (including "DELETE" as a + // non-last operand), which is not a deletion. + ASSERT_OK(Flush()); + + // Re-merge after deletion → fresh start (no base value) + ASSERT_OK(Merge("k1", "fresh")); + ASSERT_OK(db_->Get(ReadOptions(), "k1", &value)); + ASSERT_EQ(value, "fresh"); +} + +// Forward iterator should skip keys deleted by merge. +TEST_F(DBMergeOperatorTest, MergeDeletionForwardIteration) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("a", "val_a")); + ASSERT_OK(Put("b", "val_b")); + ASSERT_OK(Put("c", "val_c")); + ASSERT_OK(Put("d", "val_d")); + // Delete "b" and "d" via merge + ASSERT_OK(Merge("b", "DELETE")); + ASSERT_OK(Merge("d", "DELETE")); + + auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "a"); + ASSERT_EQ(iter->value().ToString(), "val_a"); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "c"); + ASSERT_EQ(iter->value().ToString(), "val_c"); + + iter->Next(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); +} + +// Backward iterator (Prev) should skip keys deleted by merge. 
+TEST_F(DBMergeOperatorTest, MergeDeletionBackwardIteration) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("a", "val_a")); + ASSERT_OK(Put("b", "val_b")); + ASSERT_OK(Put("c", "val_c")); + ASSERT_OK(Put("d", "val_d")); + ASSERT_OK(Merge("b", "DELETE")); + ASSERT_OK(Merge("d", "DELETE")); + + auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); + iter->SeekToLast(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "c"); + ASSERT_EQ(iter->value().ToString(), "val_c"); + + iter->Prev(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "a"); + ASSERT_EQ(iter->value().ToString(), "val_a"); + + iter->Prev(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); +} + +// Merge deletion survives flush. +TEST_F(DBMergeOperatorTest, MergeDeletionSurvivesFlush) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("k1", EncodeInt64(10))); + ASSERT_OK(Merge("k1", EncodeInt64(-10))); + ASSERT_OK(Flush()); + + std::string value; + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound()); +} + +// Merge deletion is cleaned up during compaction. +TEST_F(DBMergeOperatorTest, MergeDeletionCompaction) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + options.disable_auto_compactions = true; + Reopen(options); + + // L0 file 1: Put(k1, 10) + ASSERT_OK(Put("k1", EncodeInt64(10))); + ASSERT_OK(Flush()); + + // L0 file 2: Merge(k1, -10) → counter hits 0 + ASSERT_OK(Merge("k1", EncodeInt64(-10))); + ASSERT_OK(Flush()); + + std::string value; + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound()); + + // Compact all — the key should be fully removed at the bottommost level. 
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound()); + + // Verify the key is really gone from the SST files. + auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); + iter->SeekToFirst(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); +} + +// Merge deletion interacts correctly with snapshots. +TEST_F(DBMergeOperatorTest, MergeDeletionWithSnapshot) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + // Put(k1, 10) + ASSERT_OK(Put("k1", EncodeInt64(10))); + + // Take snapshot before deletion + const Snapshot* snap = db_->GetSnapshot(); + + // Merge(k1, -10) → deleted + ASSERT_OK(Merge("k1", EncodeInt64(-10))); + + // Current state: deleted + std::string value; + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound()); + + // Snapshot should still see the value + ReadOptions snap_ro; + snap_ro.snapshot = snap; + ASSERT_OK(db_->Get(snap_ro, "k1", &value)); + ASSERT_EQ(DecodeInt64(value), 10); + + db_->ReleaseSnapshot(snap); +} + +// Multiple merge deletions interleaved with puts. +TEST_F(DBMergeOperatorTest, MergeDeletionMultipleRounds) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + // Round 1: put, accumulate, delete + ASSERT_OK(Put("k1", "v1")); + ASSERT_OK(Merge("k1", "v2")); + ASSERT_OK(Merge("k1", "DELETE")); + std::string value; + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound()); + + // Flush to materialize the deletion before adding new merge operands. 
+ ASSERT_OK(Flush()); + + // Round 2: fresh value after deletion + ASSERT_OK(Merge("k1", "v3")); + ASSERT_OK(db_->Get(ReadOptions(), "k1", &value)); + ASSERT_EQ(value, "v3"); + + // Round 3: delete again + ASSERT_OK(Merge("k1", "DELETE")); + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound()); + + // Round 4: put after merge deletion (Put is a base value, no flush needed) + ASSERT_OK(Put("k1", "v4")); + ASSERT_OK(db_->Get(ReadOptions(), "k1", &value)); + ASSERT_EQ(value, "v4"); +} + +// Merge deletion across flush + compaction boundaries. +TEST_F(DBMergeOperatorTest, MergeDeletionAcrossFlushAndCompaction) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + options.disable_auto_compactions = true; + Reopen(options); + + // SST 1: Put(k1, base) + ASSERT_OK(Put("k1", "base")); + ASSERT_OK(Put("k2", "keep_me")); + ASSERT_OK(Flush()); + + // SST 2: Merge(k1, v2) + ASSERT_OK(Merge("k1", "v2")); + ASSERT_OK(Flush()); + + // SST 3: Merge(k1, DELETE) + ASSERT_OK(Merge("k1", "DELETE")); + ASSERT_OK(Flush()); + + // Read before compaction: k1 deleted, k2 alive + std::string value; + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound()); + ASSERT_OK(db_->Get(ReadOptions(), "k2", &value)); + ASSERT_EQ(value, "keep_me"); + + // Compact — k1 should be fully removed + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound()); + ASSERT_OK(db_->Get(ReadOptions(), "k2", &value)); + ASSERT_EQ(value, "keep_me"); +} + +// Test merge deletion during MultiGet. 
+TEST_F(DBMergeOperatorTest, MergeDeletionMultiGet) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("a", "val_a")); + ASSERT_OK(Put("b", "val_b")); + ASSERT_OK(Put("c", "val_c")); + ASSERT_OK(Merge("b", "DELETE")); + + std::vector keys = {Slice("a"), Slice("b"), Slice("c")}; + std::vector values(3); + std::vector statuses = db_->MultiGet(ReadOptions(), keys, &values); + + ASSERT_OK(statuses[0]); + ASSERT_EQ(values[0], "val_a"); + ASSERT_TRUE(statuses[1].IsNotFound()); + ASSERT_OK(statuses[2]); + ASSERT_EQ(values[2], "val_c"); +} + +// Stress test: random puts, merges, and deletions via merge. +TEST_F(DBMergeOperatorTest, MergeDeletionStress) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + options.disable_auto_compactions = false; + Reopen(options); + + constexpr int kNumKeys = 50; + constexpr int kOpsPerKey = 20; + Random rnd(42); + + // Track expected state: positive counter means key exists with that value, + // non-positive means deleted. + std::map expected; + + for (int i = 0; i < kNumKeys; i++) { + std::string key = "key_" + std::to_string(i); + int64_t counter = 0; + + for (int j = 0; j < kOpsPerKey; j++) { + int op = rnd.Uniform(3); + if (op == 0) { + // Put with a fresh counter value + counter = rnd.Uniform(100) + 1; + ASSERT_OK(Put(key, EncodeInt64(counter))); + } else { + // Merge with a delta + int64_t delta = + static_cast(rnd.Uniform(40)) - 20; // [-20, 19] + counter += delta; + ASSERT_OK(Merge(key, EncodeInt64(delta))); + } + + // Occasionally flush. After flush, re-sync our local counter with the + // DB's actual state because flush can resolve merges (including + // merge-produced deletions), which resets the effective base value. 
+ if (rnd.OneIn(10)) { + ASSERT_OK(Flush()); + std::string val; + Status flush_s = db_->Get(ReadOptions(), key, &val); + if (flush_s.ok()) { + counter = DecodeInt64(val); + } else if (flush_s.IsNotFound()) { + counter = 0; + } else { + ASSERT_OK(flush_s); + } + } + } + expected[key] = counter; + } + + ASSERT_OK(Flush()); + + // Verify all keys + for (const auto& [key, counter] : expected) { + std::string value; + Status s = db_->Get(ReadOptions(), key, &value); + if (counter <= 0) { + ASSERT_TRUE(s.IsNotFound()) << "Key " << key << " should be deleted"; + } else { + ASSERT_OK(s) << "Key " << key << " should exist"; + ASSERT_EQ(DecodeInt64(value), counter); + } + } + + // Compact and verify again + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + + for (const auto& [key, counter] : expected) { + std::string value; + Status s = db_->Get(ReadOptions(), key, &value); + if (counter <= 0) { + ASSERT_TRUE(s.IsNotFound()) << "Key " << key << " post-compact"; + } else { + ASSERT_OK(s) << "Key " << key << " post-compact"; + ASSERT_EQ(DecodeInt64(value), counter); + } + } + + // Verify via iteration + auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); + int live_count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + auto it = expected.find(iter->key().ToString()); + ASSERT_NE(it, expected.end()); + ASSERT_GT(it->second, 0) << "Deleted key visible: " << it->first; + ASSERT_EQ(DecodeInt64(iter->value().ToString()), it->second); + live_count++; + } + ASSERT_OK(iter->status()); + + int expected_live = 0; + for (const auto& [key, counter] : expected) { + if (counter > 0) { + expected_live++; + } + } + ASSERT_EQ(live_count, expected_live); +} + +// Merge deletion with compaction and snapshot: deletion marker should be +// retained as long as snapshots need it. 
+TEST_F(DBMergeOperatorTest, MergeDeletionCompactionWithSnapshot) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + options.disable_auto_compactions = true; + Reopen(options); + + ASSERT_OK(Put("k1", "base")); + ASSERT_OK(Flush()); + + const Snapshot* snap = db_->GetSnapshot(); + + ASSERT_OK(Merge("k1", "DELETE")); + ASSERT_OK(Flush()); + + // Without snapshot: not found + std::string value; + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound()); + + // With snapshot: found + ReadOptions snap_ro; + snap_ro.snapshot = snap; + ASSERT_OK(db_->Get(snap_ro, "k1", &value)); + ASSERT_EQ(value, "base"); + + // Compact — snapshot should still work + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound()); + ASSERT_OK(db_->Get(snap_ro, "k1", &value)); + ASSERT_EQ(value, "base"); + + db_->ReleaseSnapshot(snap); + + // After releasing snapshot, compact again — entry should be fully removed + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound()); +} + +// Merge on an empty key (no prior Put or Merge) producing deletion. +TEST_F(DBMergeOperatorTest, MergeDeletionOnNonexistentKey) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + // Key never existed, merge on it → deletion of nothing → NotFound + ASSERT_OK(Merge("k1", "anything")); + + std::string value; + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound()); + + // Flush + compact → still not found + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound()); +} + +// Seek-based iteration with merge deletions (tests FindValueForCurrentKey +// fast path). 
+TEST_F(DBMergeOperatorTest, MergeDeletionSeekIteration) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + // Create keys a through f, delete c and e via merge + for (char c = 'a'; c <= 'f'; c++) { + ASSERT_OK(Put(std::string(1, c), std::string("val_") + c)); + } + ASSERT_OK(Merge("c", "DELETE")); + ASSERT_OK(Merge("e", "DELETE")); + + auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); + + // Seek to "b" and iterate forward + iter->Seek("b"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "b"); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "d"); // c is skipped + + // SeekForPrev to "d" and iterate backward + iter->SeekForPrev("d"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "d"); + iter->Prev(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "b"); // c is skipped + + ASSERT_OK(iter->status()); +} + +// Merge deletion after Reopen — tests WAL recovery path. +TEST_F(DBMergeOperatorTest, MergeDeletionRecovery) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("k1", "base")); + ASSERT_OK(Merge("k1", "DELETE")); + ASSERT_OK(Put("k2", "alive")); + + // Reopen (recovers from WAL) + Reopen(options); + + std::string value; + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound()); + ASSERT_OK(db_->Get(ReadOptions(), "k2", &value)); + ASSERT_EQ(value, "alive"); +} + +// Seek() landing directly on a merge-deleted key should skip to next. 
+TEST_F(DBMergeOperatorTest, MergeDeletionSeekToDeletedKey) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("a", "val_a")); + ASSERT_OK(Put("b", "val_b")); + ASSERT_OK(Put("c", "val_c")); + ASSERT_OK(Merge("b", "DELETE")); + + auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); + + // Seek directly to the deleted key "b" — should land on "c" + iter->Seek("b"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "c"); + + ASSERT_OK(iter->status()); +} + +// SeekForPrev() landing directly on a merge-deleted key should skip to prev. +TEST_F(DBMergeOperatorTest, MergeDeletionSeekForPrevToDeletedKey) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("a", "val_a")); + ASSERT_OK(Put("b", "val_b")); + ASSERT_OK(Put("c", "val_c")); + ASSERT_OK(Merge("b", "DELETE")); + + auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); + + // SeekForPrev directly to the deleted key "b" — should land on "a" + iter->SeekForPrev("b"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "a"); + + ASSERT_OK(iter->status()); +} + +// SeekToFirst() when the very first key is merge-deleted. +TEST_F(DBMergeOperatorTest, MergeDeletionFirstKeyDeleted) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("a", "val_a")); + ASSERT_OK(Put("b", "val_b")); + ASSERT_OK(Put("c", "val_c")); + ASSERT_OK(Merge("a", "DELETE")); + + auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "b"); + ASSERT_OK(iter->status()); +} + +// SeekToLast() when the very last key is merge-deleted. 
+TEST_F(DBMergeOperatorTest, MergeDeletionLastKeyDeleted) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("a", "val_a")); + ASSERT_OK(Put("b", "val_b")); + ASSERT_OK(Put("c", "val_c")); + ASSERT_OK(Merge("c", "DELETE")); + + auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); + iter->SeekToLast(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "b"); + ASSERT_OK(iter->status()); +} + +// Every key in the DB is merge-deleted — iterator is immediately invalid. +TEST_F(DBMergeOperatorTest, MergeDeletionAllKeysDeleted) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("a", "val_a")); + ASSERT_OK(Put("b", "val_b")); + ASSERT_OK(Put("c", "val_c")); + ASSERT_OK(Merge("a", "x")); + ASSERT_OK(Merge("b", "x")); + ASSERT_OK(Merge("c", "x")); + + auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); + iter->SeekToFirst(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + + iter->SeekToLast(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); +} + +// Multiple consecutive deleted keys — verifies the continue-loop in +// FindNextUserEntryInternal handles chains, not just a single skip. 
+TEST_F(DBMergeOperatorTest, MergeDeletionConsecutiveDeletedKeys) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("a", "val_a")); + ASSERT_OK(Put("b", "val_b")); + ASSERT_OK(Put("c", "val_c")); + ASSERT_OK(Put("d", "val_d")); + ASSERT_OK(Put("e", "val_e")); + // Delete b, c, d — three consecutive keys + ASSERT_OK(Merge("b", "DELETE")); + ASSERT_OK(Merge("c", "DELETE")); + ASSERT_OK(Merge("d", "DELETE")); + + auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); + + // Forward: a → e (skip b, c, d) + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "a"); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "e"); + + // Backward: e → a (skip d, c, b) + iter->SeekToLast(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "e"); + iter->Prev(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "a"); + + ASSERT_OK(iter->status()); +} + +// Direction change: forward iteration then Prev() across a deleted key. +// This exercises ReverseToBackward() + FindValueForCurrentKey(). 
+TEST_F(DBMergeOperatorTest, MergeDeletionDirectionChangeForwardToBackward) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("a", "val_a")); + ASSERT_OK(Put("b", "val_b")); + ASSERT_OK(Put("c", "val_c")); + ASSERT_OK(Put("d", "val_d")); + ASSERT_OK(Merge("c", "DELETE")); + + auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); + + // Forward to "d" + iter->Seek("d"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "d"); + + // Now Prev() — should skip deleted "c" and land on "b" + iter->Prev(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "b"); + + // Forward again — should skip deleted "c" and land on "d" + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "d"); + + ASSERT_OK(iter->status()); +} + +// Direction change: backward iteration then Next() across a deleted key. +// This exercises ReverseToForward() + FindNextUserEntryInternal(). +TEST_F(DBMergeOperatorTest, MergeDeletionDirectionChangeBackwardToForward) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("a", "val_a")); + ASSERT_OK(Put("b", "val_b")); + ASSERT_OK(Put("c", "val_c")); + ASSERT_OK(Put("d", "val_d")); + ASSERT_OK(Merge("b", "DELETE")); + + auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); + + // Backward to "a" + iter->SeekForPrev("a"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "a"); + + // Now Next() — should skip deleted "b" and land on "c" + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "c"); + + // Backward again — should skip deleted "b" and land on "a" + iter->Prev(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "a"); + + ASSERT_OK(iter->status()); +} + +// Snapshot-based iteration: iterator with snapshot should see the +// pre-deletion state of keys. 
+TEST_F(DBMergeOperatorTest, MergeDeletionSnapshotIteration) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("a", "val_a")); + ASSERT_OK(Put("b", "val_b")); + ASSERT_OK(Put("c", "val_c")); + + const Snapshot* snap = db_->GetSnapshot(); + + ASSERT_OK(Merge("b", "DELETE")); + + // Current iterator: sees a, c + { + auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "a"); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "c"); + iter->Next(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + } + + // Snapshot iterator: sees a, b, c + { + ReadOptions snap_ro; + snap_ro.snapshot = snap; + auto iter = std::unique_ptr(db_->NewIterator(snap_ro)); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "a"); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "b"); + ASSERT_EQ(iter->value().ToString(), "val_b"); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "c"); + iter->Next(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + } + + db_->ReleaseSnapshot(snap); +} + +// GetEntity (wide-column read) should return NotFound for a merge-deleted key. +TEST_F(DBMergeOperatorTest, MergeDeletionGetEntity) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("k1", "base")); + ASSERT_OK(Merge("k1", "DELETE")); + + PinnableWideColumns result; + Status s = + db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), "k1", &result); + ASSERT_TRUE(s.IsNotFound()); +} + +// Merge-on-write: a Merge can be resolved at write time instead of being stored +// as an operand. If that resolution produces a deletion, a kTypeDeletion should +// be written to the memtable. +// NOTE(review): the test that follows issues separate Put/Merge calls, not a +// single WriteBatch, and the trigger is governed by max_successive_merges — +// confirm the description against the option's documentation.
+TEST_F(DBMergeOperatorTest, MergeDeletionWriteBatchMergeOnWrite) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + // max_successive_merges is the number of successive merge operands a key may + // accumulate in the memtable before a further Merge is resolved at write + // time (0 disables merge-on-write). Use 1 so that the second Merge below + // actually takes that path; a large threshold such as 1000 is never reached + // by this test and would leave only the read path exercised. + options.max_successive_merges = 1; + Reopen(options); + + // Put a base value first; the Put and each Merge are separate writes. + ASSERT_OK(Put("k1", "base")); + + // The first Merge is stored as an operand. The second Merge finds one + // successive operand already in the memtable, so it is resolved on write + // against the current value ("base" + "v2") and produces a deletion. + ASSERT_OK(Merge("k1", "v2")); + ASSERT_OK(Merge("k1", "DELETE")); + + std::string value; + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound()); +} + +// MultiGet after flush — exercises the SST/GetContext path rather than +// the memtable path. +TEST_F(DBMergeOperatorTest, MergeDeletionMultiGetAfterFlush) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("a", "val_a")); + ASSERT_OK(Put("b", "val_b")); + ASSERT_OK(Put("c", "val_c")); + ASSERT_OK(Merge("b", "DELETE")); + ASSERT_OK(Flush()); + + std::vector keys = {Slice("a"), Slice("b"), Slice("c")}; + std::vector values(3); + std::vector statuses = db_->MultiGet(ReadOptions(), keys, &values); + + ASSERT_OK(statuses[0]); + ASSERT_EQ(values[0], "val_a"); + ASSERT_TRUE(statuses[1].IsNotFound()); + ASSERT_OK(statuses[2]); + ASSERT_EQ(values[2], "val_c"); +} + +// Merge produces deletion, then additional merges are stacked on the same key +// (all in memtable, no flush between). Nothing has materialized the deletion at +// that point, so each read re-runs the full merge over the original Put and +// every operand; the deltas are chosen so the result equals what a fresh start +// from an empty base would give. Only after flush + compaction does a later +// merge genuinely start with no base value.
+TEST_F(DBMergeOperatorTest, MergeDeletionThenMoreMerges) {
+  Options options = CurrentOptions();
+  options.merge_operator = std::make_shared();
+  Reopen(options);
+
+  // Put 5, Merge -5 → counter=0 → deleted
+  ASSERT_OK(Put("k1", EncodeInt64(5)));
+  ASSERT_OK(Merge("k1", EncodeInt64(-5)));
+  std::string value;
+  ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound());
+
+  // More merges on top of the "deleted" state.
+  // NOTE(review): Get() presumably folds 5 - 5 + 10 over the Put — confirm.
+  // +10 → counter=10 → alive
+  ASSERT_OK(Merge("k1", EncodeInt64(10)));
+  ASSERT_OK(db_->Get(ReadOptions(), "k1", &value));
+  ASSERT_EQ(DecodeInt64(value), 10);
+
+  // -10 → counter=0 → deleted again
+  ASSERT_OK(Merge("k1", EncodeInt64(-10)));
+  ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound());
+
+  // Same after flush + compaction
+  ASSERT_OK(Flush());
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound());
+
+  // And one more merge to revive
+  ASSERT_OK(Merge("k1", EncodeInt64(7)));
+  ASSERT_OK(db_->Get(ReadOptions(), "k1", &value));
+  ASSERT_EQ(DecodeInt64(value), 7);
+}
+
+// Non-bottommost compaction: the deletion tombstone from a merge must be
+// retained so it can suppress older versions on lower levels.
+// This exercises the "base value found" + kTypeDeletion path in MergeUntil
+// where at_bottom is false, so a tombstone must be emitted.
+TEST_F(DBMergeOperatorTest, MergeDeletionNonBottommostCompaction) {
+  Options options = CurrentOptions();
+  options.merge_operator = std::make_shared();
+  options.disable_auto_compactions = true;
+  options.num_levels = 4;
+  Reopen(options);
+
+  // Put an anchor key in L3 so that L1 is never the bottommost level.
+  ASSERT_OK(Put("k0", "anchor"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(3);
+
+  // Put("k1", "base") in L1. This will be the base value for the merge.
+  ASSERT_OK(Put("k1", "base"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(1);
+
+  // Merge("k1", "DELETE") in L0.
+  ASSERT_OK(Merge("k1", "DELETE"));
+  ASSERT_OK(Flush());
+
+  // Compact L0→L1. The compaction picks L0 (Merge) and overlapping L1 (Put).
+  // MergeUntil resolves: Put("base") + Merge("DELETE") → deletion.
+  // at_bottom is false (L3 has data), so a kTypeDeletion tombstone is emitted.
+  CompactRangeOptions cro;
+  cro.bottommost_level_compaction = BottommostLevelCompaction::kSkip;
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+
+  // k1 should be NotFound (tombstone suppresses any older versions).
+  std::string value;
+  ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound());
+  ASSERT_OK(db_->Get(ReadOptions(), "k0", &value));
+  ASSERT_EQ(value, "anchor");
+
+  // Full compaction should clean up the tombstone too.
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsNotFound());
+}
+
+// Backward seek-optimization path with base value: when a key has many
+// entries, FindValueForCurrentKey switches to FindValueForCurrentKeyUsingSeek.
+// Test that merge-deletion works through this path when the merge has a
+// Put base value.
+TEST_F(DBMergeOperatorTest, MergeDeletionBackwardSeekPathWithBase) {
+  Options options = CurrentOptions();
+  options.merge_operator = std::make_shared();
+  // Force the seek path after just 2 skipped entries.
+  options.max_sequential_skip_in_iterations = 2;
+  Reopen(options);
+
+  // Create enough entries for "k1" to exceed max_sequential_skip_in_iterations.
+  // The old Put overwrites create the entries that get skipped during backward
+  // iteration, triggering the switch to the seek-based path.
+  ASSERT_OK(Put("k1", "old1"));  // oldest
+  ASSERT_OK(Put("k1", "old2"));
+  ASSERT_OK(Put("k1", "base"));
+  ASSERT_OK(Merge("k1", "DELETE"));  // newest — triggers deletion
+
+  ASSERT_OK(Put("k2", "val_k2"));
+
+  auto iter = std::unique_ptr(db_->NewIterator(ReadOptions()));
+  iter->SeekToLast();
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(iter->key().ToString(), "k2");
+
+  // Prev() triggers FindValueForCurrentKey for "k1", which after 2 skips
+  // switches to FindValueForCurrentKeyUsingSeek. The seek finds the Merge
+  // and the Put("base"), resolves to deletion via MergeWithPlainBaseValue.
+  iter->Prev();
+  ASSERT_FALSE(iter->Valid());
+  ASSERT_OK(iter->status());
+}
+
+// Backward seek-optimization path with NO base value: all entries for the
+// key are Merge operands. FindValueForCurrentKeyUsingSeek collects them all,
+// then calls MergeWithNoBaseValue. This is the path where we removed the
+// unconditional `valid_ = true`.
+TEST_F(DBMergeOperatorTest, MergeDeletionBackwardSeekPathNoBase) {
+  Options options = CurrentOptions();
+  options.merge_operator = std::make_shared();
+  options.max_sequential_skip_in_iterations = 2;
+  Reopen(options);
+
+  // All entries for "k1" are Merges — no Put base.
+  ASSERT_OK(Merge("k1", "a"));  // oldest
+  ASSERT_OK(Merge("k1", "b"));
+  ASSERT_OK(Merge("k1", "c"));
+  ASSERT_OK(Merge("k1", "DELETE"));  // newest — triggers deletion
+
+  ASSERT_OK(Put("k2", "val_k2"));
+
+  auto iter = std::unique_ptr(db_->NewIterator(ReadOptions()));
+  iter->SeekToLast();
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(iter->key().ToString(), "k2");
+
+  // Prev() triggers FindValueForCurrentKey → FindValueForCurrentKeyUsingSeek.
+  // All entries are Merges, loop exits without finding a base value.
+  // MergeWithNoBaseValue resolves to deletion. valid_ = false.
+  iter->Prev();
+  ASSERT_FALSE(iter->Valid());
+  ASSERT_OK(iter->status());
+}
+
+// Same as above but the merge does NOT produce deletion — verifies that
+// the removed `valid_ = true` in FindValueForCurrentKeyUsingSeek doesn't
+// break the normal (non-deletion) case.
+TEST_F(DBMergeOperatorTest, MergeDeletionBackwardSeekPathNoBaseNonDeletion) {
+  Options options = CurrentOptions();
+  options.merge_operator = std::make_shared();
+  options.max_sequential_skip_in_iterations = 2;
+  Reopen(options);
+
+  // All entries for "k1" are Merges, none is "DELETE" → value survives.
+  ASSERT_OK(Merge("k1", "a"));
+  ASSERT_OK(Merge("k1", "b"));
+  ASSERT_OK(Merge("k1", "c"));
+
+  ASSERT_OK(Put("k2", "val_k2"));
+
+  auto iter = std::unique_ptr(db_->NewIterator(ReadOptions()));
+  iter->SeekToLast();
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(iter->key().ToString(), "k2");
+
+  // Prev() should find "k1" with merged value "a,b,c"
+  iter->Prev();
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(iter->key().ToString(), "k1");
+  ASSERT_EQ(iter->value().ToString(), "a,b,c");
+  ASSERT_OK(iter->status());
+}
+
+// Merge deletion with iterate_upper_bound: a deleted key at the boundary
+// should not confuse the iterator.
+TEST_F(DBMergeOperatorTest, MergeDeletionWithIterateBounds) { + Options options = CurrentOptions(); + options.merge_operator = std::make_shared(); + Reopen(options); + + ASSERT_OK(Put("a", "val_a")); + ASSERT_OK(Put("b", "val_b")); + ASSERT_OK(Put("c", "val_c")); + ASSERT_OK(Put("d", "val_d")); + ASSERT_OK(Merge("c", "DELETE")); + + // Upper bound = "d" — range is [a, d), "c" is deleted + Slice upper("d"); + ReadOptions ro; + ro.iterate_upper_bound = &upper; + auto iter = std::unique_ptr(db_->NewIterator(ro)); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "a"); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "b"); + iter->Next(); + // "c" is deleted, "d" is past upper bound → invalid + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + + // Lower bound = "b" — Prev from "d" should skip deleted "c" + Slice lower("b"); + ReadOptions ro2; + ro2.iterate_lower_bound = &lower; + auto iter2 = std::unique_ptr(db_->NewIterator(ro2)); + iter2->SeekForPrev("d"); + ASSERT_TRUE(iter2->Valid()); + ASSERT_EQ(iter2->key().ToString(), "d"); + iter2->Prev(); + // "c" is deleted → land on "b" + ASSERT_TRUE(iter2->Valid()); + ASSERT_EQ(iter2->key().ToString(), "b"); + ASSERT_OK(iter2->status()); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/memtable.h b/db/memtable.h index 7211366f6428..a30577d13ee2 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -443,6 +443,9 @@ class ReadOnlyMemTable { value, merge_context->GetOperands(), info_log, statistics, clock, /* update_num_ops_stats */ true, /* op_failure_scope */ nullptr, out_value, out_columns); + // Note: if the merge operator returns std::monostate (deletion), + // TimedFullMerge returns Status::NotFound(), which is the correct + // result for point lookups. 
} } else if (out_value) { out_value->assign(value.data(), value.size()); @@ -470,6 +473,9 @@ class ReadOnlyMemTable { merge_context->GetOperands(), logger, statistics, clock, /* update_num_ops_stats */ true, /* op_failure_scope */ nullptr, out_value, out_columns); + // Note: if the merge operator returns std::monostate (deletion), + // TimedFullMerge returns Status::NotFound(), which is the correct + // result for point lookups. } else { // We have found a final value (a base deletion) and have newer // merge operands that we do not intend to merge. Nothing remains @@ -512,6 +518,9 @@ class ReadOnlyMemTable { merge_context->GetOperands(), logger, statistics, clock, /* update_num_ops_stats */ true, /* op_failure_scope */ nullptr, out_value, out_columns); + // Note: if the merge operator returns std::monostate (deletion), + // TimedFullMerge returns Status::NotFound(), which is the correct + // result for point lookups. } return true; } diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 61fdbc0b095a..313605f0de63 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -160,6 +160,17 @@ Status MergeHelper::TimedFullMergeImpl( result->assign(operand.data(), operand.size()); } + return Status::OK(); + }, + [&](std::monostate) -> Status { + *result_type = kTypeDeletion; + + if (result_operand) { + *result_operand = Slice(nullptr, 0); + } + + result->clear(); + return Status::OK(); }}; @@ -247,6 +258,11 @@ Status MergeHelper::TimedFullMergeImpl( result_entity->SetPlainValue(operand); return Status::OK(); + }, + [&](std::monostate) -> Status { + // Merge operator signaled deletion. Return NotFound so + // point-lookup callers treat the key as deleted. 
+ return Status::NotFound(); }}; Status s = TimedFullMergeCommonImpl( @@ -484,21 +500,52 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, // We store the result in keys_.back() and operands_.back() // if nothing went wrong (i.e.: no operand corruption on disk) if (s.ok()) { - // The original key encountered - original_key = std::move(keys_.back()); + if (merge_result_type == kTypeDeletion) { + if (at_bottom) { + // At the bottommost level: no lower levels can hold older + // versions of this key, so we can drop it entirely. We must + // not emit a kTypeDeletion tombstone here because the + // compaction iterator's PrepareOutput() asserts that deletion + // entries have already been removed at the bottommost level. + keys_.clear(); + merge_context_.Clear(); + } else { + // Not at the bottommost level: produce a deletion tombstone + // so that older versions on lower levels are properly + // suppressed. + original_key = std::move(keys_.back()); + orig_ikey.type = kTypeDeletion; + UpdateInternalKey(&original_key, orig_ikey.sequence, + orig_ikey.type); + + keys_.clear(); + merge_context_.Clear(); + keys_.emplace_front(std::move(original_key)); + // Slice() points to a static empty string, safe to mark as + // pinned to avoid a heap allocation for the empty deletion + // value. 
+ merge_context_.PushOperand(Slice(), true /* operand_pinned */); + } - assert(merge_result_type == kTypeValue || - merge_result_type == kTypeWideColumnEntity); - orig_ikey.type = merge_result_type; - UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); + // move iter to the next entry + iter->Next(); + } else { + // The original key encountered + original_key = std::move(keys_.back()); - keys_.clear(); - merge_context_.Clear(); - keys_.emplace_front(std::move(original_key)); - merge_context_.PushOperand(merge_result); + assert(merge_result_type == kTypeValue || + merge_result_type == kTypeWideColumnEntity); + orig_ikey.type = merge_result_type; + UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); + + keys_.clear(); + merge_context_.Clear(); + keys_.emplace_front(std::move(original_key)); + merge_context_.PushOperand(merge_result); - // move iter to the next entry - iter->Next(); + // move iter to the next entry + iter->Next(); + } } else if (op_failure_scope == MergeOperator::OpFailureScope::kMustMerge) { // Change to `Status::MergeInProgress()` to denote output consists of @@ -626,20 +673,27 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, &merge_result, /* result_operand */ nullptr, &merge_result_type); if (s.ok()) { - // The original key encountered - // We are certain that keys_ is not empty here (see assertions couple of - // lines before). - original_key = std::move(keys_.back()); - - assert(merge_result_type == kTypeValue || - merge_result_type == kTypeWideColumnEntity); - orig_ikey.type = merge_result_type; - UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); - - keys_.clear(); - merge_context_.Clear(); - keys_.emplace_front(std::move(original_key)); - merge_context_.PushOperand(merge_result); + if (merge_result_type == kTypeDeletion) { + // Merge operator signaled deletion. We've seen the entire history + // of this key (at bottom), so we can safely drop it entirely. 
+ keys_.clear(); + merge_context_.Clear(); + } else { + // The original key encountered + // We are certain that keys_ is not empty here (see assertions couple + // of lines before). + original_key = std::move(keys_.back()); + + assert(merge_result_type == kTypeValue || + merge_result_type == kTypeWideColumnEntity); + orig_ikey.type = merge_result_type; + UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); + + keys_.clear(); + merge_context_.Clear(); + keys_.emplace_front(std::move(original_key)); + merge_context_.PushOperand(merge_result); + } } else if (op_failure_scope == MergeOperator::OpFailureScope::kMustMerge) { // Change to `Status::MergeInProgress()` to denote output consists of // merge operands only. diff --git a/db/merge_helper.h b/db/merge_helper.h index 098b9b5baba6..2b7621c32eb7 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -186,13 +186,19 @@ class MergeHelper { // These are valid until the next MergeUntil call // If the merging was successful: // - keys() contains a single element with the latest sequence number of - // the merges. The type will be Put or Merge. See IMPORTANT 1 note, below. + // the merges. The type will be Put, Merge, or Delete. + // See IMPORTANT 1 note, below. // - values() contains a single element with the result of merging all the // operands together + // - If the merge produced a deletion and at_bottom is true, keys() and + // values() will be empty, meaning the key should be dropped. + // - If the merge produced a deletion and at_bottom is false, keys() + // contains a single element with type kTypeDeletion to tombstone + // older versions on lower levels. // // IMPORTANT 1: the key type could change after the MergeUntil call. - // Put/Delete + Merge + ... + Merge => Put - // Merge + ... + Merge => Merge + // Put/Delete + Merge + ... + Merge => Put (Delete if deletion; empty at bottom) + // Merge + ... 
+ Merge => Merge (or empty if deletion at bottom) // // If the merge operator is not associative, and if a Put/Delete is not found // then the merging will be unsuccessful. In this case: diff --git a/db/version_set.cc b/db/version_set.cc index faf06868e4fd..618bd216a51a 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3150,13 +3150,17 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, iter->merge_context.GetOperands(), info_log_, db_statistics_, clock_, /* update_num_ops_stats */ true, /* op_failure_scope */ nullptr, iter->value ? iter->value->GetSelf() : nullptr, iter->columns); - if (LIKELY(iter->value != nullptr)) { - iter->value->PinSelf(); - range->AddValueSize(iter->value->size()); - } else { - assert(iter->columns); - range->AddValueSize(iter->columns->serialized_size()); + if (status->ok()) { + if (LIKELY(iter->value != nullptr)) { + iter->value->PinSelf(); + range->AddValueSize(iter->value->size()); + } else { + assert(iter->columns); + range->AddValueSize(iter->columns->serialized_size()); + } } + // Note: if merge produced deletion (status is NotFound), the key is + // correctly marked as done with NotFound status below. range->MarkKeyDone(iter); if (range->GetValueSize() > read_options.value_size_soft_limit) { diff --git a/db/write_batch.cc b/db/write_batch.cc index 806bd1f23d00..5357cd8625c0 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -2893,6 +2893,21 @@ class MemTableInserter : public WriteBatch::Handler { // Failed to merge! // Store the delta in memtable perform_merge = false; + } else if (new_value_type == kTypeDeletion) { + // Merge operator signaled deletion. Add a deletion entry to the + // memtable instead of a value. 
+ assert(!concurrent_memtable_writes_); + if (kv_prot_info != nullptr) { + auto merged_kv_prot_info = + kv_prot_info->StripC(column_family_id).ProtectS(sequence_); + merged_kv_prot_info.UpdateV(value, Slice()); + merged_kv_prot_info.UpdateO(kTypeMerge, kTypeDeletion); + ret_status = mem->Add(sequence_, kTypeDeletion, key, Slice(), + &merged_kv_prot_info); + } else { + ret_status = mem->Add(sequence_, kTypeDeletion, key, Slice(), + nullptr /* kv_prot_info */); + } } else { // 3) Add value to memtable assert(!concurrent_memtable_writes_); diff --git a/include/rocksdb/merge_operator.h b/include/rocksdb/merge_operator.h index 387e5345c009..2e99251eda31 100644 --- a/include/rocksdb/merge_operator.h +++ b/include/rocksdb/merge_operator.h @@ -188,11 +188,22 @@ class MergeOperator : public Customizable { struct MergeOperationOutputV3 { using NewColumns = std::vector>; - using NewValue = std::variant; + using NewValue = + std::variant; - // The result of the merge operation. Can be one of three things (see the - // NewValue variant above): a new plain value, a new wide-column value, or - // an existing merge operand. + // The result of the merge operation. Can be one of four things (see the + // NewValue variant above): + // - std::string: a new plain value + // - NewColumns: a new wide-column value + // - Slice: an existing merge operand (zero-copy reference) + // - std::monostate: the key should be deleted + // + // When std::monostate is set, the merge result is treated as a deletion: + // - During Get()/MultiGet(): returns Status::NotFound() + // - During iteration: the key is skipped (not visible) + // - During compaction: produces a deletion tombstone, or drops the key + // entirely when the full history of the key has been seen (bottommost + // level with no pending snapshot boundaries) NewValue new_value; // The scope of the failure if applicable. See above for more details. 
OpFailureScope op_failure_scope = OpFailureScope::kDefault; diff --git a/table/get_context.cc b/table/get_context.cc index 8bd0d6f0133c..9e0d3bed5c87 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -596,7 +596,11 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, void GetContext::PostprocessMerge(const Status& merge_status) { if (!merge_status.ok()) { - if (merge_status.subcode() == Status::SubCode::kMergeOperatorFailed) { + if (merge_status.IsNotFound()) { + // Merge operator signaled deletion (std::monostate result). + state_ = kDeleted; + } else if (merge_status.subcode() == + Status::SubCode::kMergeOperatorFailed) { state_ = kMergeOperatorFailed; } else { state_ = kCorrupt;