Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ byteorder = { package = "byteorder-lite", version = "0.1.0" }
byteview = "~0.10.0"
crossbeam-skiplist = "0.1.3"
enum_dispatch = "0.3.13"
equivalent = "1.0.2"
interval-heap = "0.0.5"
log = "0.4.27"
lz4_flex = { version = "0.11.5", optional = true, default-features = false }
Expand All @@ -43,6 +44,7 @@ xxhash-rust = { version = "0.8.15", features = ["xxh3"] }
criterion = { version = "0.8.0", features = ["html_reports"] }
fs_extra = "1.3.0"
nanoid = "0.4.0"
quickcheck = "1.0.3"
rand = "0.9.2"
strum = { version = "0.27.2", features = ["derive"] }
test-log = "0.2.18"
Expand Down
14 changes: 10 additions & 4 deletions benches/memtable.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use criterion::{criterion_group, criterion_main, Criterion};
use lsm_tree::{InternalValue, Memtable};
use lsm_tree::{InternalValue, Memtable, SeqNo};
use nanoid::nanoid;

fn memtable_get_hit(c: &mut Criterion) {
Expand All @@ -25,7 +25,10 @@ fn memtable_get_hit(c: &mut Criterion) {
b.iter(|| {
assert_eq!(
[1, 2, 3],
&*memtable.get(b"abc_w5wa35aw35naw", None).unwrap().value,
&*memtable
.get(b"abc_w5wa35aw35naw", SeqNo::MAX)
.unwrap()
.value,
)
});
});
Expand Down Expand Up @@ -60,7 +63,10 @@ fn memtable_get_snapshot(c: &mut Criterion) {
b.iter(|| {
assert_eq!(
[1, 2, 3],
&*memtable.get(b"abc_w5wa35aw35naw", Some(1)).unwrap().value,
&*memtable
.get(b"abc_w5wa35aw35naw", SeqNo::MAX)
.unwrap()
.value,
);
});
});
Expand All @@ -79,7 +85,7 @@ fn memtable_get_miss(c: &mut Criterion) {
}

c.bench_function("memtable get miss", |b| {
b.iter(|| assert!(memtable.get(b"abc_564321", None).is_none()));
b.iter(|| assert!(memtable.get(b"abc_564321", SeqNo::MAX).is_none()));
});
}

Expand Down
61 changes: 61 additions & 0 deletions src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// (found in the LICENSE-* files in the repository)

use crate::{SeqNo, UserKey, ValueType};
use equivalent::{Comparable, Equivalent};
use std::cmp::Reverse;

#[derive(Clone, Eq)]
Expand Down Expand Up @@ -71,3 +72,63 @@ impl Ord for InternalKey {
(&self.user_key, Reverse(self.seqno)).cmp(&(&other.user_key, Reverse(other.seqno)))
}
}

// `Equivalent`/`Comparable` (from the `equivalent` crate) let collections keyed
// by owned `InternalKey`s be probed with a borrowed, allocation-free
// `InternalKeyRef` — and vice versa (see the mirror impls below).
impl Equivalent<InternalKeyRef<'_>> for InternalKey {
    fn equivalent(&self, other: &InternalKeyRef<'_>) -> bool {
        // `value_type` is deliberately not part of equality, mirroring
        // `InternalKey::cmp`, which orders by (user_key, Reverse(seqno)) only.
        self.user_key == other.user_key && self.seqno == other.seqno
    }
}

impl Comparable<InternalKeyRef<'_>> for InternalKey {
    fn compare(&self, other: &InternalKeyRef<'_>) -> std::cmp::Ordering {
        // Same ordering as `InternalKey::cmp`: ascending user key, then
        // *descending* seqno, so the newest entry for a key sorts first.
        (&*self.user_key, Reverse(self.seqno)).cmp(&(other.user_key, Reverse(other.seqno)))
    }
}

/// Temporary internal key without heap allocation
///
/// Used as a lookup/query key against collections of owned `InternalKey`s
/// (via the `Equivalent`/`Comparable` impls in this module), so it can be
/// built from a borrowed user key without copying it.
#[derive(Debug, Eq)]
pub struct InternalKeyRef<'a> {
    /// Borrowed user-supplied key bytes.
    pub user_key: &'a [u8],
    /// Sequence number of this entry.
    pub seqno: SeqNo,
    /// Kind of entry; note it does not participate in equality or ordering
    /// (see the `PartialEq`/`Ord` impls below).
    pub value_type: ValueType,
}

impl<'a> InternalKeyRef<'a> {
pub fn new(user_key: &'a [u8], seqno: u64, value_type: ValueType) -> Self {
InternalKeyRef {
user_key,
seqno,
value_type,
}
}
}

impl<'a> PartialEq for InternalKeyRef<'a> {
    /// Two key refs are equal when both user key and seqno match; the
    /// `value_type` field is intentionally ignored, matching `Ord` below.
    fn eq(&self, other: &Self) -> bool {
        let same_key = self.user_key == other.user_key;
        let same_seqno = self.seqno == other.seqno;
        same_key && same_seqno
    }
}

impl<'a> PartialOrd for InternalKeyRef<'a> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // A total order exists, so simply defer to `Ord`.
        Some(self.cmp(other))
    }
}

impl<'a> Ord for InternalKeyRef<'a> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
(&self.user_key, Reverse(self.seqno)).cmp(&(&other.user_key, Reverse(other.seqno)))
}
}

impl Equivalent<InternalKey> for InternalKeyRef<'_> {
    fn equivalent(&self, other: &InternalKey) -> bool {
        // Mirror image of `Equivalent<InternalKeyRef<'_>> for InternalKey`;
        // `value_type` again does not participate in equality.
        self.user_key == other.user_key && self.seqno == other.seqno
    }
}

impl Comparable<InternalKey> for InternalKeyRef<'_> {
    fn compare(&self, other: &InternalKey) -> std::cmp::Ordering {
        // Must stay consistent with `InternalKey::cmp` so a lookup with a
        // borrowed ref lands at the same position the owned key would.
        (self.user_key, Reverse(self.seqno)).cmp(&(&other.user_key, Reverse(other.seqno)))
    }
}
23 changes: 19 additions & 4 deletions src/memtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use crate::key::InternalKey;
#[expect(unsafe_code)]
mod skiplist;

use crate::key::{InternalKey, InternalKeyRef};
use crate::{
value::{InternalValue, SeqNo, UserValue},
ValueType,
};
use crossbeam_skiplist::SkipMap;
use skiplist::SkipMap;
use std::ops::RangeBounds;
use std::sync::atomic::{AtomicBool, AtomicU64};

Expand Down Expand Up @@ -38,6 +41,14 @@ pub struct Memtable {
}

impl Memtable {
/// Clears the memtable.
///
/// Drops all stored items and resets both the highest-seen seqno and the
/// approximate size counter to zero. Takes `&mut self`, so no concurrent
/// reader or writer can observe a partially cleared state.
pub fn clear(&mut self) {
self.items = SkipMap::default();
// Plain reassignment is fine here: `&mut self` guarantees exclusivity.
self.highest_seqno = AtomicU64::new(0);
self.approximate_size
.store(0, std::sync::atomic::Ordering::Release);
}

/// Returns the memtable ID.
pub fn id(&self) -> MemtableId {
self.id
Expand Down Expand Up @@ -111,7 +122,7 @@ impl Memtable {
// abcdef -> 6
// abcdef -> 5
//
let lower_bound = InternalKey::new(key, seqno - 1, ValueType::Value);
let lower_bound = InternalKeyRef::new(key, seqno - 1, ValueType::Value);

let mut iter = self
.items
Expand Down Expand Up @@ -158,7 +169,11 @@ impl Memtable {
.fetch_add(item_size, std::sync::atomic::Ordering::AcqRel);

let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
self.items.insert(key, item.value);
// TODO(ajwerner): Decide what we want to do here. The panic is sort of
// extreme, but also seems right given the invariants.
if let Err((key, _value)) = self.items.insert(key, item.value) {
panic!("duplicate insert of {key:?} into memtable")
}

self.highest_seqno
.fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);
Expand Down
120 changes: 120 additions & 0 deletions src/memtable/skiplist/arena.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use std::{
alloc::Layout,
mem::offset_of,
sync::{
atomic::{AtomicPtr, AtomicUsize, Ordering},
Mutex,
},
};

// DEFAULT_BUFFER_SIZE needs to be at least big enough for one fully-aligned node
// for the crate to work correctly. Anything larger than that will work.
//
// Subtracting `size_of::<AtomicUsize>()` accounts for `Buffer`'s `offset`
// header, so a whole `Buffer` (header + data) occupies exactly 32 KiB.
//
// TODO: Justify this size.
const DEFAULT_BUFFER_SIZE: usize = (32 << 10) - size_of::<AtomicUsize>();

impl<const BUFFER_SIZE: usize> Default for Arenas<BUFFER_SIZE> {
    fn default() -> Self {
        // Delegates to `new`: an empty buffer list and a null `open_arena`.
        Self::new()
    }
}

// SAFETY: `Arenas` is only `!Send`/`!Sync` by default because it stores raw
// pointers. Those pointers are uniquely owned (created via `Box::into_raw`
// and freed exactly once in `Drop`), the pointer list is only mutated under
// the `arenas` mutex, and `open_arena` is accessed atomically.
// NOTE(review): soundness also relies on callers synchronizing access to the
// memory returned by `alloc` themselves — confirm against the skiplist code.
unsafe impl<const N: usize> Send for Arenas<N> {}
unsafe impl<const N: usize> Sync for Arenas<N> {}

/// A growable set of fixed-size bump-allocation buffers.
///
/// Individual allocations are never freed; all memory is reclaimed at once
/// when the `Arenas` value is dropped.
pub(crate) struct Arenas<const BUFFER_SIZE: usize = DEFAULT_BUFFER_SIZE> {
    // The current set of buffers, in allocation order.
    arenas: Mutex<Vec<*mut Buffer<BUFFER_SIZE>>>,
    // Cache of the currently open buffer. It'll be the last item in the
    // `arenas` vec. This atomic is only ever written while holding the
    // `arenas` mutex.
    open_arena: AtomicPtr<Buffer<BUFFER_SIZE>>,
}

impl<const BUFFER_SIZE: usize> Arenas<BUFFER_SIZE> {
pub(crate) fn new() -> Self {
Self {
arenas: Mutex::default(),
open_arena: AtomicPtr::default(),
}
}
}

impl<const BUFFER_SIZE: usize> Arenas<BUFFER_SIZE> {
    /// Allocates `layout.size()` bytes aligned to `layout.align()` from the
    /// currently open buffer, opening a fresh buffer when the current one is
    /// full or none exists yet.
    ///
    /// The returned memory is zero-initialized (buffers start zeroed and
    /// regions are never reused) and stays valid until `self` is dropped.
    /// The fast path is lock-free; the mutex is only taken to append a
    /// buffer.
    ///
    /// # Panics
    ///
    /// Panics if `layout` can never fit in an empty buffer. Without this
    /// check the loop below would append fresh buffers forever.
    pub(crate) fn alloc(&self, layout: Layout) -> *mut u8 {
        // Worst case, aligning the start of an empty buffer wastes
        // `align - 1` bytes before the `size` bytes are placed.
        assert!(
            layout.size() + layout.align() - 1 <= BUFFER_SIZE,
            "layout {layout:?} cannot fit in a {BUFFER_SIZE}-byte arena buffer",
        );

        loop {
            // Fast path: bump-allocate out of the cached open buffer.
            let buffer_tail = self.open_arena.load(Ordering::Acquire);
            if !buffer_tail.is_null() {
                if let Some(ptr) = try_alloc(buffer_tail, layout) {
                    return ptr;
                }
            }

            // Slow path: the open buffer is missing or full. Only the thread
            // that observed the *current* tail may append a new buffer;
            // everyone else retries against the updated cache.
            let mut buffers = self.arenas.lock().expect("lock is poisoned");
            let tail = buffers.last().copied().unwrap_or(std::ptr::null_mut());
            if tail != buffer_tail {
                // Lost the race with somebody else.
                continue;
            }

            let new_buffer = Box::into_raw(Box::<Buffer<BUFFER_SIZE>>::default());
            self.open_arena.store(new_buffer, Ordering::Release);
            buffers.push(new_buffer);
        }
    }
}

/// A single fixed-size bump-allocation region.
struct Buffer<const N: usize> {
    // Number of `data` bytes already handed out; monotonically increasing,
    // advanced via CAS in `try_alloc`.
    offset: AtomicUsize,
    // The raw storage that allocations are carved out of.
    data: [u8; N],
}

impl<const N: usize> Default for Buffer<N> {
fn default() -> Self {
Self {
offset: AtomicUsize::default(),
data: [0; N],
}
}
}

impl<const N: usize> Drop for Arenas<N> {
    fn drop(&mut self) {
        let mut arenas = self.arenas.lock().expect("lock is poisoned");

        // Reclaim every buffer in allocation order.
        for ptr in arenas.drain(..) {
            // SAFETY: each pointer came from `Box::into_raw` in `alloc` and
            // `drain` removes it from the list, so it is freed exactly once.
            let _reclaimed = unsafe { Box::from_raw(ptr) };
        }
    }
}

/// Attempts to carve `layout` out of `buf` by advancing its bump offset with
/// a CAS. Returns the aligned start address on success, or `None` if the
/// buffer does not have enough room left.
fn try_alloc<const N: usize>(buf: *mut Buffer<N>, layout: Layout) -> Option<*mut u8> {
    // SAFETY: callers pass a live `Buffer` obtained from `Box::into_raw`;
    // only the atomic `offset` field is read here.
    let mut cur_offset = unsafe { &(*buf).offset }.load(Ordering::Relaxed);

    loop {
        // Start of the `data` payload inside the buffer.
        let buf_start = unsafe { buf.byte_add(offset_of!(Buffer<N>, data)) as *mut u8 };
        // First unused byte, then rounded up to the requested alignment.
        let free_start = unsafe { buf_start.byte_add(cur_offset) };
        let start_addr = unsafe { free_start.byte_add(free_start.align_offset(layout.align())) };
        // Offset the buffer would have after this allocation succeeds.
        let new_offset = ((start_addr as usize) + layout.size()) - (buf_start as usize);
        if new_offset > N {
            // Out of room; the caller will open a new buffer.
            return None;
        }

        // Note that we can get away with using relaxed ordering here because we're not
        // asserting anything about the contents of the buffer. We're just trying to
        // allocate a new node.
        match unsafe { &(*buf).offset }.compare_exchange(
            cur_offset,
            new_offset,
            Ordering::Relaxed,
            Ordering::Relaxed,
        ) {
            Ok(_offset) => return Some(start_addr),
            // Another thread won the race; retry from its published offset.
            Err(offset) => cur_offset = offset,
        }
    }
}
30 changes: 30 additions & 0 deletions src/memtable/skiplist/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

// This implementation was heavily inspired by:
// * https://github.com/andy-kimball/arenaskl/tree/f7010085
// * https://github.com/crossbeam-rs/crossbeam/tree/983d56b6/crossbeam-skiplist

//! This mod is a purpose-built concurrent skiplist intended for use
//! by the memtable.
//!
//! Due to the requirements of the memtable, there are a number of notable
//! features it lacks:
//! - Updates
//! - Deletes
//! - Overwrites
//!
//! The main reasons for its existence are that it
//! - provides concurrent reads and inserts, and
//! - batches memory allocations
//!
//! Prior to this implementation, `crossbeam_skiplist` was used.

// Bump-allocation arenas backing the skiplist nodes.
mod arena;
// The skiplist itself.
mod skipmap;

pub use skipmap::SkipMap;

#[cfg(test)]
mod test;
Loading
Loading