From 16e963137a998b1c7eb5140c3a899ffcc4afba35 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 17 Mar 2025 22:13:59 +0100 Subject: [PATCH] Moving toward a paging-based recordlog Instead of a chained list of 128 MB files, we pre-allocate a file and split it into pages. A header in the file stores the order in which the pages should be read. On the write side, when we truncate queues, we often end up freeing pages. When all pages are in use, we perform a GC, which simply reorders the pages and updates the file header accordingly. --- Cargo.toml | 7 +- TODO | 12 + src/block_read_write.rs | 40 ++- src/error.rs | 10 + src/frame/reader.rs | 33 ++- src/frame/tests.rs | 50 ++-- src/frame/writer.rs | 34 ++- src/lib.rs | 54 +++- src/mem/queue.rs | 68 ++--- src/mem/queues.rs | 65 +++-- src/mem/summary.rs | 6 +- src/mem/tests.rs | 161 +++++------- src/mockfile.rs | 74 ++++++ src/multi_record_log.rs | 155 +++++------ src/page_directory/header.rs | 117 +++++++++ src/page_directory/mod.rs | 177 +++++++++++++ src/page_directory/page_list.rs | 374 +++++++++++++++++++++++++++ src/page_directory/page_refcounts.rs | 133 ++++++++++ src/page_directory/reader.rs | 93 +++++++ src/page_directory/readme.txt | 23 ++ src/page_directory/writer.rs | 146 +++++++++++ src/persist_policy.rs | 2 +- src/proptests.rs | 30 ++- src/recordlog/reader.rs | 26 +- src/recordlog/tests.rs | 83 ++++-- src/recordlog/writer.rs | 39 ++- src/rolling/directory.rs | 22 +- src/rolling/mod.rs | 40 +++ src/rolling/tests.rs | 68 +++-- src/tests.rs | 97 ++++--- 30 files changed, 1787 insertions(+), 452 deletions(-) create mode 100644 TODO create mode 100644 src/mockfile.rs create mode 100644 src/page_directory/header.rs create mode 100644 src/page_directory/mod.rs create mode 100644 src/page_directory/page_list.rs create mode 100644 src/page_directory/page_refcounts.rs create mode 100644 src/page_directory/reader.rs create mode 100644 src/page_directory/readme.txt create mode 100644 src/page_directory/writer.rs diff --git 
a/Cargo.toml b/Cargo.toml index 62741ca..fe76f54 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,9 +12,10 @@ rust-version = "1.68" # 1.67 contains an UB we would tri [dependencies] bytes = "1" crc32fast = "1.2" -serde = { version = "1", features = ["derive"] } thiserror = "2" tracing = "0.1.37" +arc-swap = "1.7" +rclite = "0.2" [dev-dependencies] criterion = "0.5" @@ -26,3 +27,7 @@ tempfile = "3" [[bench]] name = "bench" harness = false + +# Uncomment for profiling +# [profile.bench] +# debug=true diff --git a/TODO b/TODO new file mode 100644 index 0000000..39cd85b --- /dev/null +++ b/TODO @@ -0,0 +1,12 @@ +we do not need to record position anymore. Keeping the last record is sufficient. +Of course it would be nicer to store that in a separate way to avoid fragmentation. + +Record refcount + +No GC however means on restart we will often have to read way more than what is actually required. + +We need a way to stop reading the past. + +Truncating a non existing queue. + +just truncate to record position. diff --git a/src/block_read_write.rs b/src/block_read_write.rs index c259283..c4c3f5b 100644 --- a/src/block_read_write.rs +++ b/src/block_read_write.rs @@ -4,14 +4,22 @@ use crate::PersistAction; pub const BLOCK_NUM_BYTES: usize = 32_768; +/// A block read is supposed to be positioned on a block at its initialization. +/// +/// In other words, it is not necessary to call `next_block` a first time +/// before calling `block()`. pub trait BlockRead { + type Session; + + fn start_session(&self) -> Self::Session; + /// Loads the next block. /// If `Ok(true)` is returned, the new block is available through /// `.block()`. /// /// If `Ok(false)` is returned, the end of the `BlockReader` /// has been reached and the content of `block()` could be anything. - fn next_block(&mut self) -> io::Result; + fn next_block(&mut self, read_session: &mut Self::Session) -> io::Result; /// A `BlockReader` is always position on a specific block. 
/// @@ -25,8 +33,17 @@ pub trait BlockRead { } pub trait BlockWrite { + type Session; + + fn start_write_session(&mut self) -> io::Result; + + fn make_room(&mut self, num_bytes: u64) -> io::Result<()>; + /// Must panic if buf is larger than `num_bytes_remaining_in_block`. - fn write(&mut self, buf: &[u8]) -> io::Result<()>; + /// Not that this trait does not have next_block() method. + /// + /// We automatically go to the next block after the current block has been entirely written. + fn write(&mut self, buf: &[u8], write_session: &mut Self::Session) -> io::Result<()>; /// Persist the data following the `persist_action`. fn persist(&mut self, persist_action: PersistAction) -> io::Result<()>; /// Number of bytes that can be added in the block. @@ -49,7 +66,11 @@ impl<'a> From<&'a [u8]> for ArrayReader<'a> { } impl BlockRead for ArrayReader<'_> { - fn next_block(&mut self) -> io::Result { + type Session = (); + + fn start_session(&self) -> Self::Session {} + + fn next_block(&mut self, _session: &mut Self::Session) -> io::Result { if self.data.len() < BLOCK_NUM_BYTES { return Ok(false); } @@ -81,7 +102,14 @@ impl From for Vec { } impl BlockWrite for VecBlockWriter { - fn write(&mut self, buf: &[u8]) -> io::Result<()> { + type Session = (); + fn make_room(&mut self, num_bytes: u64) -> io::Result<()> { + // TODO consider just doubling for performance. 
+ let new_len = ceil_to_block(self.cursor + num_bytes as usize); + self.buffer.resize(new_len, 0u8); + Ok(()) + } + fn write(&mut self, buf: &[u8], _session: &mut Self::Session) -> io::Result<()> { assert!(buf.len() <= self.num_bytes_remaining_in_block()); if self.cursor + buf.len() > self.buffer.len() { let new_len = ceil_to_block((self.cursor + buf.len()) * 2 + 1); @@ -99,4 +127,8 @@ impl BlockWrite for VecBlockWriter { fn num_bytes_remaining_in_block(&self) -> usize { BLOCK_NUM_BYTES - (self.cursor % BLOCK_NUM_BYTES) } + + fn start_write_session(&mut self) -> io::Result { + Ok(()) + } } diff --git a/src/error.rs b/src/error.rs index 4e53926..5c18bb7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -91,3 +91,13 @@ pub enum ReadRecordError { #[error("Corruption")] Corruption, } + +#[derive(Error, Debug)] +pub enum HeaderError { + #[error("invalid magic number: found {magic_number}")] + InvalidMagicNumber { magic_number: u32 }, + #[error("invalid checksum")] + InvalidChecksum, + #[error("unsupported version: {version}")] + UnsupportedVersion { version: u32 }, +} diff --git a/src/frame/reader.rs b/src/frame/reader.rs index 7fd7c12..0ffea6b 100644 --- a/src/frame/reader.rs +++ b/src/frame/reader.rs @@ -2,15 +2,14 @@ use std::io; use thiserror::Error; -use crate::frame::{FrameType, FrameWriter, Header, HEADER_LEN}; -use crate::rolling::{RollingReader, RollingWriter}; +use crate::frame::{FrameType, Header, HEADER_LEN}; use crate::{BlockRead, BLOCK_NUM_BYTES}; pub struct FrameReader { - reader: R, + pub(crate) reader: R, /// In block cursor - cursor: usize, + pub(crate) cursor: usize, // The current block is corrupted. 
block_corrupted: bool, @@ -35,8 +34,8 @@ impl FrameReader { } } - pub fn read(&self) -> &R { - &self.reader + pub fn start_session(&self) -> R::Session { + self.reader.start_session() } // Returns the number of bytes remaining into @@ -47,13 +46,16 @@ impl FrameReader { crate::BLOCK_NUM_BYTES - self.cursor } - fn go_to_next_block_if_necessary(&mut self) -> Result<(), ReadFrameError> { + fn go_to_next_block_if_necessary( + &mut self, + session: &mut R::Session, + ) -> Result<(), ReadFrameError> { let num_bytes_to_end_of_block = self.num_bytes_to_end_of_block(); let need_to_skip_block = self.block_corrupted || num_bytes_to_end_of_block < HEADER_LEN; if !need_to_skip_block { return Ok(()); } - if !self.reader.next_block()? { + if !self.reader.next_block(session)? { return Err(ReadFrameError::NotAvailable); } @@ -79,8 +81,11 @@ impl FrameReader { } // Reads the next frame. - pub fn read_frame(&mut self) -> Result<(FrameType, &[u8]), ReadFrameError> { - self.go_to_next_block_if_necessary()?; + pub fn read_frame( + &mut self, + session: &mut R::Session, + ) -> Result<(FrameType, &[u8]), ReadFrameError> { + self.go_to_next_block_if_necessary(session)?; let header = self.get_frame_header()?; self.cursor += HEADER_LEN; if self.cursor + header.len() > BLOCK_NUM_BYTES { @@ -103,11 +108,3 @@ impl FrameReader { Ok((header.frame_type(), frame_payload)) } } - -impl FrameReader { - pub fn into_writer(self) -> io::Result> { - let mut rolling_writer: RollingWriter = self.reader.into_writer()?; - rolling_writer.forward(self.cursor)?; - Ok(FrameWriter::create(rolling_writer)) - } -} diff --git a/src/frame/tests.rs b/src/frame/tests.rs index 1ea8a7b..c66ff27 100644 --- a/src/frame/tests.rs +++ b/src/frame/tests.rs @@ -3,39 +3,42 @@ use std::io; use crate::block_read_write::{ArrayReader, VecBlockWriter}; use crate::frame::header::{FrameType, HEADER_LEN}; use crate::frame::{FrameReader, FrameWriter, ReadFrameError}; -use crate::{PersistAction, BLOCK_NUM_BYTES}; +use crate::{BlockWrite as 
_, PersistAction, BLOCK_NUM_BYTES}; #[test] fn test_frame_simple() { let block_writer = { - let wrt: VecBlockWriter = VecBlockWriter::default(); + let mut wrt: VecBlockWriter = VecBlockWriter::default(); + let mut session = wrt.start_write_session().unwrap(); let mut frame_writer = FrameWriter::create(wrt); + frame_writer - .write_frame(FrameType::First, &b"abc"[..]) + .write_frame(FrameType::First, &b"abc"[..], &mut session) .unwrap(); frame_writer - .write_frame(FrameType::Middle, &b"de"[..]) + .write_frame(FrameType::Middle, &b"de"[..], &mut session) .unwrap(); frame_writer - .write_frame(FrameType::Last, &b"fgh"[..]) + .write_frame(FrameType::Last, &b"fgh"[..], &mut session) .unwrap(); frame_writer.persist(PersistAction::Flush).unwrap(); frame_writer.into_writer() }; let buffer: Vec = block_writer.into(); let mut frame_reader = FrameReader::open(ArrayReader::from(&buffer[..])); - let read_frame_res = frame_reader.read_frame(); + let mut session = frame_reader.start_session(); + let read_frame_res = frame_reader.read_frame(&mut session); assert_eq!(read_frame_res.unwrap(), (FrameType::First, &b"abc"[..])); assert_eq!( - frame_reader.read_frame().unwrap(), + frame_reader.read_frame(&mut session).unwrap(), (FrameType::Middle, &b"de"[..]) ); assert_eq!( - frame_reader.read_frame().unwrap(), + frame_reader.read_frame(&mut session).unwrap(), (FrameType::Last, &b"fgh"[..]) ); assert!(matches!( - frame_reader.read_frame().unwrap_err(), + frame_reader.read_frame(&mut session).unwrap_err(), ReadFrameError::NotAvailable )); } @@ -44,20 +47,22 @@ fn test_frame_simple() { fn test_frame_corruption_in_payload() -> io::Result<()> { let mut buf: Vec = { let mut frame_writer = FrameWriter::create(VecBlockWriter::default()); - frame_writer.write_frame(FrameType::First, &b"abc"[..])?; + let mut session = frame_writer.start_session()?; + frame_writer.write_frame(FrameType::First, &b"abc"[..], &mut session)?; frame_writer.persist(PersistAction::Flush)?; - 
frame_writer.write_frame(FrameType::Middle, &b"de"[..])?; + frame_writer.write_frame(FrameType::Middle, &b"de"[..], &mut session)?; frame_writer.persist(PersistAction::Flush)?; frame_writer.into_writer().into() }; buf[8] = 0u8; let mut frame_reader = FrameReader::open(ArrayReader::from(&buf[..])); + let mut session = frame_reader.start_session(); assert!(matches!( - frame_reader.read_frame(), + frame_reader.read_frame(&mut session), Err(ReadFrameError::Corruption) )); assert!(matches!( - frame_reader.read_frame(), + frame_reader.read_frame(&mut session), Ok((FrameType::Middle, b"de")) )); Ok(()) @@ -65,8 +70,11 @@ fn test_frame_corruption_in_payload() -> io::Result<()> { fn repeat_empty_frame_util(repeat: usize) -> Vec { let mut frame_writer = FrameWriter::create(VecBlockWriter::default()); + let mut session = frame_writer.start_session().unwrap(); for _ in 0..repeat { - frame_writer.write_frame(FrameType::Full, &b""[..]).unwrap(); + frame_writer + .write_frame(FrameType::Full, &b""[..], &mut session) + .unwrap(); } frame_writer.persist(PersistAction::Flush).unwrap(); frame_writer.into_writer().into() @@ -77,12 +85,13 @@ fn test_simple_multiple_blocks() -> io::Result<()> { let num_frames = 1 + BLOCK_NUM_BYTES / HEADER_LEN; let buffer = repeat_empty_frame_util(num_frames); let mut frame_reader = FrameReader::open(ArrayReader::from(&buffer[..])); + let mut session = frame_reader.start_session(); for _ in 0..num_frames { - let read_frame_res = frame_reader.read_frame(); + let read_frame_res = frame_reader.read_frame(&mut session); assert!(matches!(read_frame_res, Ok((FrameType::Full, &[])))); } assert!(matches!( - frame_reader.read_frame(), + frame_reader.read_frame(&mut &mut &mut &mut &mut &mut &mut &mut session), Err(ReadFrameError::NotAvailable) )); Ok(()) @@ -96,20 +105,21 @@ fn test_multiple_blocks_corruption_on_length() -> io::Result<()> { let mut buffer = repeat_empty_frame_util(num_frames); buffer[2000 * HEADER_LEN + 5] = 255u8; let mut frame_reader = 
FrameReader::open(ArrayReader::from(&buffer[..])); + let mut session = frame_reader.start_session(); for _ in 0..2000 { - let read_frame_res = frame_reader.read_frame(); + let read_frame_res = frame_reader.read_frame(&mut session); assert!(matches!(read_frame_res, Ok((FrameType::Full, &[])))); } assert!(matches!( - frame_reader.read_frame(), + frame_reader.read_frame(&mut session), Err(ReadFrameError::Corruption) )); assert!(matches!( - frame_reader.read_frame(), + frame_reader.read_frame(&mut session), Ok((FrameType::Full, &[])) )); assert!(matches!( - frame_reader.read_frame(), + frame_reader.read_frame(&mut session), Err(ReadFrameError::NotAvailable) )); Ok(()) diff --git a/src/frame/writer.rs b/src/frame/writer.rs index 69d23c0..a070a78 100644 --- a/src/frame/writer.rs +++ b/src/frame/writer.rs @@ -1,11 +1,10 @@ use std::io; use crate::frame::{FrameType, Header, HEADER_LEN}; -use crate::rolling::{Directory, RollingWriter}; use crate::{BlockWrite, PersistAction, BLOCK_NUM_BYTES}; pub struct FrameWriter { - wrt: W, + pub(crate) wrt: W, // temporary buffer, not storing anything in particular after any function returns buffer: Box<[u8; BLOCK_NUM_BYTES]>, } @@ -18,21 +17,40 @@ impl FrameWriter { } } + pub fn start_session(&mut self) -> io::Result { + self.wrt.start_write_session() + } + + pub fn make_room(&mut self, num_bytes: u64) -> io::Result<()> { + // Framing adds some overhead. We can however compute an upperbound of the amount of room + // that will be needed. The worst case scenario is if we start at the very end of a + // block and the first frame is empty and we end up just writing a header. + const MAX_EFFECTIVE_BLOCK_BYTES: u64 = (BLOCK_NUM_BYTES - HEADER_LEN) as u64; + let num_blocks_upperbound = 1 + num_bytes.div_ceil(MAX_EFFECTIVE_BLOCK_BYTES); + let room_needed_upperbound: u64 = HEADER_LEN as u64 * num_blocks_upperbound + num_bytes; + self.wrt.make_room(room_needed_upperbound) + } + /// Writes a frame. 
The payload has to be lower than the /// remaining space in the frame as defined /// by `max_writable_frame_length`. - pub fn write_frame(&mut self, frame_type: FrameType, payload: &[u8]) -> io::Result<()> { + pub fn write_frame( + &mut self, + frame_type: FrameType, + payload: &[u8], + session: &mut W::Session, + ) -> io::Result<()> { let num_bytes_remaining_in_block = self.wrt.num_bytes_remaining_in_block(); if num_bytes_remaining_in_block < HEADER_LEN { let zero_bytes = [0u8; HEADER_LEN]; self.wrt - .write(&zero_bytes[..num_bytes_remaining_in_block])?; + .write(&zero_bytes[..num_bytes_remaining_in_block], session)?; } let record_len = HEADER_LEN + payload.len(); let (buffer_header, buffer_record) = self.buffer[..record_len].split_at_mut(HEADER_LEN); buffer_record.copy_from_slice(payload); Header::for_payload(frame_type, payload).serialize(buffer_header); - self.wrt.write(&self.buffer[..record_len])?; + self.wrt.write(&self.buffer[..record_len], session)?; Ok(()) } @@ -65,9 +83,3 @@ impl FrameWriter { self.wrt } } - -impl FrameWriter { - pub fn directory(&mut self) -> &mut Directory { - &mut self.wrt.directory - } -} diff --git a/src/lib.rs b/src/lib.rs index 41ce3f4..deb578a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,6 @@ use std::borrow::Cow; +use std::fs::File; +use std::io::{self, BufWriter, Cursor}; mod block_read_write; @@ -6,11 +8,12 @@ pub use block_read_write::{BlockRead, BlockWrite, BLOCK_NUM_BYTES}; pub mod error; mod frame; mod mem; + mod multi_record_log; +mod page_directory; mod persist_policy; mod record; mod recordlog; -mod rolling; pub use mem::{QueueSummary, QueuesSummary}; pub use multi_record_log::MultiRecordLog; @@ -39,13 +42,16 @@ pub struct ResourceUsage { pub memory_used_bytes: usize, /// Capacity allocated, a part of which may be unused right now pub memory_allocated_bytes: usize, - /// Disk size used - pub disk_used_bytes: usize, + pub num_pages: u32, + pub num_used_pages: u32, } #[cfg(test)] mod tests; +#[cfg(test)] +mod 
mockfile; + #[cfg(test)] mod proptests; @@ -65,3 +71,45 @@ impl<'a> Serializable<'a> for &'a str { std::str::from_utf8(buffer).ok() } } + +pub trait FileLikeWrite: io::Write + io::Seek { + fn fsyncdata(&mut self) -> io::Result<()>; + fn set_len(&mut self, num_bytes: u64) -> io::Result<()>; +} + +pub trait FileLike: io::Read + FileLikeWrite {} + +impl FileLikeWrite for BufWriter { + fn fsyncdata(&mut self) -> io::Result<()> { + self.get_mut().fsyncdata() + } + + fn set_len(&mut self, num_bytes: u64) -> io::Result<()> { + self.get_mut().set_len(num_bytes) + } +} + +impl FileLikeWrite for File { + fn fsyncdata(&mut self) -> io::Result<()> { + self.sync_data() + } + + fn set_len(&mut self, num_bytes: u64) -> io::Result<()> { + File::set_len(self, num_bytes) + } +} + +impl FileLike for File {} + +impl FileLikeWrite for Cursor> { + fn fsyncdata(&mut self) -> io::Result<()> { + Ok(()) + } + + fn set_len(&mut self, num_bytes: u64) -> io::Result<()> { + self.get_mut().resize(num_bytes as usize, 0u8); + Ok(()) + } +} + +impl FileLike for Cursor> {} diff --git a/src/mem/queue.rs b/src/mem/queue.rs index c9f1e66..705dcdf 100644 --- a/src/mem/queue.rs +++ b/src/mem/queue.rs @@ -2,41 +2,35 @@ use std::ops::{Bound, RangeBounds, RangeToInclusive}; use super::rolling_buffer::RollingBuffer; use crate::error::AppendError; -use crate::mem::QueueSummary; -use crate::rolling::FileNumber; +use crate::page_directory::PageRangeRef; use crate::Record; #[derive(Clone)] -struct RecordMeta { +struct RecordMeta { start_offset: usize, // in a vec of RecordMeta, this field should be set only on the last record // which relate to that File. 
- file_number: Option, + ref_count_handle: Option, position: u64, } -#[derive(Default)] -pub(crate) struct MemQueue { +pub(crate) struct MemQueue { // Concatenated records concatenated_records: RollingBuffer, start_position: u64, - record_metas: Vec, + record_metas: Vec>, + // We make sure to keep the last truncate record because it helps us keeping track of the queue + // position even when the queue is empty. + record_position_ref_count: H, } -impl MemQueue { - pub fn with_next_position(next_position: u64) -> Self { +impl MemQueue { + pub fn with_next_position(next_position: u64, record_position_ref_count: H) -> Self { MemQueue { concatenated_records: RollingBuffer::new(), start_position: next_position, record_metas: Vec::new(), - } - } - - pub fn summary(&self) -> QueueSummary { - QueueSummary { - start: self.start_position(), - end: self.last_position(), - file_number: self.first_file_number(), + record_position_ref_count: record_position_ref_count, } } @@ -44,19 +38,6 @@ impl MemQueue { self.record_metas.is_empty() } - pub(crate) fn first_file_number(&self) -> Option { - let file_number: &FileNumber = self - .record_metas - .iter() - .filter_map(|record_meta| record_meta.file_number.as_ref()) - .next()?; - Some(file_number.file_number()) - } - - pub(crate) fn start_position(&self) -> u64 { - self.start_position - } - /// Returns the position of the last record appended to the queue. pub fn last_position(&self) -> Option { self.next_position().checked_sub(1) @@ -84,7 +65,7 @@ impl MemQueue { /// AppendError if the record is strangely in the past or is too much in the future. 
pub fn append_record( &mut self, - file_number: &FileNumber, + ref_count_handle: &H, target_position: u64, payload: &[u8], ) -> Result<(), AppendError> { @@ -97,19 +78,19 @@ impl MemQueue { self.start_position = target_position; } - let file_number = if let Some(record_meta) = self.record_metas.last_mut() { - if record_meta.file_number.as_ref() == Some(file_number) { - record_meta.file_number.take().unwrap() + let ref_count_handle = if let Some(record_meta) = self.record_metas.last_mut() { + if record_meta.ref_count_handle.as_ref() == Some(ref_count_handle) { + record_meta.ref_count_handle.take().unwrap() } else { - file_number.clone() + ref_count_handle.clone() } } else { - file_number.clone() + ref_count_handle.clone() }; let record_meta = RecordMeta { start_offset: self.concatenated_records.len(), - file_number: Some(file_number), + ref_count_handle: Some(ref_count_handle), position: target_position, }; self.record_metas.push(record_meta); @@ -164,11 +145,18 @@ impl MemQueue { /// /// If truncating to a future position, make the queue go forward to that position. /// Return the number of record removed. 
- pub fn truncate_head(&mut self, truncate_range: RangeToInclusive) -> usize { + pub fn truncate_head( + &mut self, + truncate_range: RangeToInclusive, + ref_count_handle: H, + ) -> usize { let truncate_up_to_pos = truncate_range.end; if self.start_position > truncate_up_to_pos { return 0; } + + self.record_position_ref_count = ref_count_handle; + if truncate_up_to_pos + 1 >= self.next_position() { self.start_position = truncate_up_to_pos + 1; self.concatenated_records.clear(); @@ -193,11 +181,11 @@ impl MemQueue { pub fn size(&self) -> usize { self.concatenated_records.len() - + self.record_metas.len() * std::mem::size_of::() + + self.record_metas.len() * std::mem::size_of::>() } pub fn capacity(&self) -> usize { self.concatenated_records.capacity() - + self.record_metas.capacity() * std::mem::size_of::() + + self.record_metas.capacity() * std::mem::size_of::>() } } diff --git a/src/mem/queues.rs b/src/mem/queues.rs index 259fb22..3037c85 100644 --- a/src/mem/queues.rs +++ b/src/mem/queues.rs @@ -4,33 +4,38 @@ use std::ops::{RangeBounds, RangeToInclusive}; use tracing::{info, warn}; use crate::error::{AlreadyExists, AppendError, MissingQueue}; -use crate::mem::{MemQueue, QueuesSummary}; -use crate::rolling::FileNumber; +use crate::mem::MemQueue; +use crate::page_directory::PageRangeRef; use crate::Record; -#[derive(Default)] -pub(crate) struct MemQueues { - queues: HashMap, +pub(crate) struct MemQueues { + queues: HashMap>, } -impl MemQueues { + +impl Default for MemQueues { + fn default() -> MemQueues { + MemQueues { + queues: Default::default(), + } + } +} + +impl MemQueues { /// The file number argument is here unused. Its point is just to make sure we /// flushed the file before updating the in memory queue. 
- pub fn create_queue(&mut self, queue: &str) -> Result<(), AlreadyExists> { - if self.queues.contains_key(queue) { + pub fn create_queue( + &mut self, + queue_id: &str, + ref_count_handle: H, + ) -> Result<(), AlreadyExists> { + if self.queues.contains_key(queue_id) { return Err(AlreadyExists); } - self.queues.insert(queue.to_string(), MemQueue::default()); + let mem_queue = MemQueue::with_next_position(0u64, ref_count_handle); + self.queues.insert(queue_id.to_string(), mem_queue); Ok(()) } - pub fn summary(&self) -> QueuesSummary { - let mut summary = QueuesSummary::default(); - for (queue_name, queue) in &self.queues { - summary.queues.insert(queue_name.clone(), queue.summary()); - } - summary - } - pub fn delete_queue(&mut self, queue: &str) -> Result<(), MissingQueue> { info!(queue = queue, "deleting queue"); if self.queues.remove(queue).is_none() { @@ -41,7 +46,8 @@ impl MemQueues { } /// Returns all sub-queues which are currently empty. - pub fn empty_queues(&mut self) -> impl Iterator + '_ { + #[cfg(test)] + pub fn empty_queues(&mut self) -> impl Iterator)> + '_ { self.queues.iter_mut().filter_map(|(queue, mem_queue)| { if mem_queue.is_empty() { Some((queue.as_str(), mem_queue)) @@ -66,7 +72,7 @@ impl MemQueues { } } - pub(crate) fn get_queue(&self, queue: &str) -> Result<&MemQueue, MissingQueue> { + pub(crate) fn get_queue(&self, queue: &str) -> Result<&MemQueue, MissingQueue> { // We do not rely on `entry` in order to avoid // the allocation. self.queues @@ -74,7 +80,7 @@ impl MemQueues { .ok_or_else(|| MissingQueue(queue.to_string())) } - pub(crate) fn get_queue_mut(&mut self, queue: &str) -> Result<&mut MemQueue, MissingQueue> { + pub(crate) fn get_queue_mut(&mut self, queue: &str) -> Result<&mut MemQueue, MissingQueue> { // We do not rely on `entry` in order to avoid // the allocation. 
self.queues @@ -85,12 +91,12 @@ impl MemQueues { pub fn append_record( &mut self, queue: &str, - file_number: &FileNumber, + ref_count_handle: &H, target_position: u64, payload: &[u8], ) -> Result<(), AppendError> { self.get_queue_mut(queue)? - .append_record(file_number, target_position, payload) + .append_record(ref_count_handle, target_position, payload) } pub fn contains_queue(&self, queue: &str) -> bool { @@ -107,7 +113,7 @@ impl MemQueues { /// match, truncate it and make it go forward to the requested position. /// /// This operation is meant only to rebuild the in memory queue from its on-disk state. - pub fn ack_position(&mut self, queue_name: &str, next_position: u64) { + pub fn ack_position(&mut self, queue_name: &str, next_position: u64, ref_count_handle: &H) { if let Some(queue) = self.queues.get(queue_name) { // It is possible for `ack_position` to be called when a queue already exists. // @@ -123,14 +129,14 @@ impl MemQueues { self.queues.remove(queue_name); self.queues.insert( queue_name.to_string(), - MemQueue::with_next_position(next_position), + MemQueue::with_next_position(next_position, ref_count_handle.clone()), ); } } else { // The queue does not exist! Let's create it and set the right `next_position`. self.queues.insert( queue_name.to_string(), - MemQueue::with_next_position(next_position), + MemQueue::with_next_position(next_position, ref_count_handle.clone()), ); } } @@ -154,9 +160,14 @@ impl MemQueues { /// /// If there are no records `<= position`, the method will /// not do anything. 
- pub fn truncate(&mut self, queue: &str, position: RangeToInclusive) -> Option { + pub fn truncate( + &mut self, + queue: &str, + position: RangeToInclusive, + ref_count_handle: &H, + ) -> Option { if let Ok(queue) = self.get_queue_mut(queue) { - Some(queue.truncate_head(position)) + Some(queue.truncate_head(position, ref_count_handle.clone())) } else { None } diff --git a/src/mem/summary.rs b/src/mem/summary.rs index 71f62b2..2c808bd 100644 --- a/src/mem/summary.rs +++ b/src/mem/summary.rs @@ -1,15 +1,13 @@ use std::collections::BTreeMap; -use serde::Serialize; - -#[derive(Default, Serialize, Debug)] +#[derive(Default, Debug)] pub struct QueueSummary { pub start: u64, pub end: Option, pub file_number: Option, } -#[derive(Default, Serialize)] +#[derive(Default)] pub struct QueuesSummary { pub queues: BTreeMap, } diff --git a/src/mem/tests.rs b/src/mem/tests.rs index 7948657..c0edc59 100644 --- a/src/mem/tests.rs +++ b/src/mem/tests.rs @@ -1,14 +1,15 @@ +use std::sync::Arc; + use super::*; use crate::error::{AlreadyExists, AppendError}; -use crate::rolling::FileNumber; use crate::Record; #[test] fn test_mem_queues_already_exists() { let mut mem_queues = MemQueues::default(); - mem_queues.create_queue("droopy").unwrap(); + mem_queues.create_queue("droopy", ()).unwrap(); assert!(matches!( - mem_queues.create_queue("droopy"), + mem_queues.create_queue("droopy", ()), Err(AlreadyExists) )); } @@ -16,33 +17,23 @@ fn test_mem_queues_already_exists() { #[test] fn test_mem_queues() { let mut mem_queues = MemQueues::default(); - mem_queues.create_queue("droopy").unwrap(); - mem_queues.create_queue("fable").unwrap(); + mem_queues.create_queue("droopy", ()).unwrap(); + mem_queues.create_queue("fable", ()).unwrap(); { - assert!(mem_queues - .append_record("droopy", &FileNumber::for_test(1), 0, b"hello") - .is_ok()); - assert!(mem_queues - .append_record("droopy", &FileNumber::for_test(1), 1, b"happy") - .is_ok()); + assert!(mem_queues.append_record("droopy", &(), 0, 
b"hello").is_ok()); + assert!(mem_queues.append_record("droopy", &(), 1, b"happy").is_ok()); } { + assert!(mem_queues.append_record("fable", &(), 0, b"maitre").is_ok()); assert!(mem_queues - .append_record("fable", &FileNumber::for_test(1), 0, b"maitre") - .is_ok()); - assert!(mem_queues - .append_record("fable", &FileNumber::for_test(1), 1, b"corbeau") + .append_record("fable", &(), 1, b"corbeau") .is_ok()); } { - assert!(mem_queues - .append_record("droopy", &FileNumber::for_test(1), 2, b"tax") - .is_ok()); - assert!(mem_queues - .append_record("droopy", &FileNumber::for_test(1), 3, b"payer") - .is_ok()); + assert!(mem_queues.append_record("droopy", &(), 2, b"tax").is_ok()); + assert!(mem_queues.append_record("droopy", &(), 3, b"payer").is_ok()); assert_eq!( mem_queues.range("droopy", 0..).unwrap().next(), Some(Record::new(0, b"hello")) @@ -64,28 +55,18 @@ fn test_mem_queues() { #[test] fn test_mem_queues_truncate() { let mut mem_queues = MemQueues::default(); - mem_queues.create_queue("droopy").unwrap(); + mem_queues.create_queue("droopy", ()).unwrap(); { - assert!(mem_queues - .append_record("droopy", &1.into(), 0, b"hello") - .is_ok()); - assert!(mem_queues - .append_record("droopy", &1.into(), 1, b"happy") - .is_ok()); - assert!(mem_queues - .append_record("droopy", &1.into(), 2, b"tax") - .is_ok()); - assert!(mem_queues - .append_record("droopy", &1.into(), 3, b"payer") - .is_ok()); - assert!(mem_queues - .append_record("droopy", &1.into(), 4, b"!") - .is_ok()); + assert!(mem_queues.append_record("droopy", &(), 0, b"hello").is_ok()); + assert!(mem_queues.append_record("droopy", &(), 1, b"happy").is_ok()); + assert!(mem_queues.append_record("droopy", &(), 2, b"tax").is_ok()); + assert!(mem_queues.append_record("droopy", &(), 3, b"payer").is_ok()); + assert!(mem_queues.append_record("droopy", &(), 4, b"!").is_ok()); mem_queues - .append_record("droopy", &1.into(), 5, b"payer") + .append_record("droopy", &(), 5, b"payer") .unwrap(); } - 
mem_queues.truncate("droopy", ..=3); + mem_queues.truncate("droopy", ..=3, &()); let droopy: Vec = mem_queues.range("droopy", 0..).unwrap().collect(); assert_eq!( &droopy[..], @@ -96,18 +77,12 @@ fn test_mem_queues_truncate() { #[test] fn test_mem_queues_skip_advance() { let mut mem_queues = MemQueues::default(); - mem_queues.create_queue("droopy").unwrap(); - assert!(mem_queues - .append_record("droopy", &1.into(), 0, b"hello") - .is_ok()); - assert!(mem_queues - .append_record("droopy", &1.into(), 2, b"happy") - .is_ok()); - assert!(mem_queues - .append_record("droopy", &1.into(), 3, b"happy") - .is_ok()); + mem_queues.create_queue("droopy", ()).unwrap(); + assert!(mem_queues.append_record("droopy", &(), 0, b"hello").is_ok()); + assert!(mem_queues.append_record("droopy", &(), 2, b"happy").is_ok()); + assert!(mem_queues.append_record("droopy", &(), 3, b"happy").is_ok()); assert!(mem_queues - .append_record("droopy", &1.into(), 1, b"happy") + .append_record("droopy", &(), 1, b"happy") .is_err()); let droopy: Vec = mem_queues.range("droopy", 0..).unwrap().collect(); assert_eq!( @@ -134,16 +109,12 @@ fn test_mem_queues_skip_advance() { #[test] fn test_mem_queues_append_in_the_past_yield_error() { - let mut mem_queues = MemQueues::default(); - mem_queues.create_queue("droopy").unwrap(); - assert!(mem_queues - .append_record("droopy", &1.into(), 0, b"hello") - .is_ok()); - assert!(mem_queues - .append_record("droopy", &1.into(), 1, b"happy") - .is_ok()); + let mut mem_queues: MemQueues<()> = MemQueues::default(); + mem_queues.create_queue("droopy", ()).unwrap(); + assert!(mem_queues.append_record("droopy", &(), 0, b"hello").is_ok()); + assert!(mem_queues.append_record("droopy", &(), 1, b"happy").is_ok()); assert!(matches!( - mem_queues.append_record("droopy", &1.into(), 0, b"happy"), + mem_queues.append_record("droopy", &(), 0, b"happy"), Err(AppendError::Past) )); } @@ -151,13 +122,11 @@ fn test_mem_queues_append_in_the_past_yield_error() { #[test] fn 
test_mem_queues_append_idempotence() { let mut mem_queues = MemQueues::default(); - mem_queues.create_queue("droopy").unwrap(); - assert!(mem_queues - .append_record("droopy", &1.into(), 0, b"hello") - .is_ok()); + mem_queues.create_queue("droopy", ()).unwrap(); + assert!(mem_queues.append_record("droopy", &(), 0, b"hello").is_ok()); assert!(matches!( mem_queues - .append_record("droopy", &1.into(), 0, b"different") + .append_record("droopy", &(), 0, b"different") .unwrap_err(), AppendError::Past )); @@ -168,74 +137,76 @@ fn test_mem_queues_append_idempotence() { #[test] fn test_mem_queues_non_zero_first_el() { let mut mem_queues = MemQueues::default(); - mem_queues.create_queue("droopy").unwrap(); - assert!(mem_queues - .append_record("droopy", &1.into(), 5, b"hello") - .is_ok()); + mem_queues.create_queue("droopy", ()).unwrap(); + assert!(mem_queues.append_record("droopy", &(), 5, b"hello").is_ok()); let droopy: Vec = mem_queues.range("droopy", 0..).unwrap().collect(); assert_eq!(droopy, &[Record::new(5, b"hello")]); } #[test] -fn test_mem_queues_keep_filenum() { +fn test_mem_queues_keep_ref_count() { + let has_been_dropped = |ref_count: &Arc| Arc::strong_count(ref_count) == 1; + let mut mem_queues = MemQueues::default(); - let files = (0..4).map(FileNumber::for_test).collect::>(); + let ref_counts = (0..4).map(|i| Arc::new(i)).collect::>(); - assert!(files.iter().all(FileNumber::can_be_deleted)); + assert!(ref_counts.iter().all(has_been_dropped)); - mem_queues.create_queue("droopy").unwrap(); mem_queues - .append_record("droopy", &files[0], 0, b"hello") + .create_queue("droopy", ref_counts[0].clone()) + .unwrap(); + mem_queues + .append_record("droopy", &ref_counts[0], 0, b"hello") .unwrap(); - assert!(!files[0].can_be_deleted()); + assert!(!has_been_dropped(&ref_counts[0])); mem_queues - .append_record("droopy", &files[0], 1, b"hello") + .append_record("droopy", &ref_counts[0], 1, b"hello") .unwrap(); - assert!(!files[0].can_be_deleted()); + 
assert!(!has_been_dropped(&ref_counts[0])); mem_queues - .append_record("droopy", &files[0], 2, b"hello") + .append_record("droopy", &ref_counts[0], 2, b"hello") .unwrap(); - assert!(!files[0].can_be_deleted()); + assert!(!has_been_dropped(&ref_counts[0])); mem_queues - .append_record("droopy", &files[1], 3, b"hello") + .append_record("droopy", &ref_counts[1], 3, b"hello") .unwrap(); - assert!(!files[0].can_be_deleted()); - assert!(!files[1].can_be_deleted()); + assert!(!has_been_dropped(&ref_counts[0])); + assert!(!has_been_dropped(&ref_counts[1])); - mem_queues.truncate("droopy", ..=1); + mem_queues.truncate("droopy", ..=1, &ref_counts[1]); - assert!(!files[0].can_be_deleted()); - assert!(!files[1].can_be_deleted()); + assert!(!has_been_dropped(&ref_counts[0])); + assert!(!has_been_dropped(&ref_counts[1])); mem_queues - .append_record("droopy", &files[2], 4, b"hello") + .append_record("droopy", &ref_counts[2], 4, b"hello") .unwrap(); - assert!(!files[0].can_be_deleted()); - assert!(!files[1].can_be_deleted()); - assert!(!files[2].can_be_deleted()); + assert!(!has_been_dropped(&ref_counts[0])); + assert!(!has_been_dropped(&ref_counts[1])); + assert!(!has_been_dropped(&ref_counts[2])); - mem_queues.truncate("droopy", ..=3); + mem_queues.truncate("droopy", ..=3, &ref_counts[2]); - assert!(files[0].can_be_deleted()); - assert!(files[1].can_be_deleted()); - assert!(!files[2].can_be_deleted()); + assert!(has_been_dropped(&ref_counts[0])); + assert!(has_been_dropped(&ref_counts[1])); + assert!(!has_been_dropped(&ref_counts[2])); - mem_queues.truncate("droopy", ..=4); + mem_queues.truncate("droopy", ..=4, &ref_counts[3]); let empty_queues = mem_queues.empty_queues().collect::>(); assert_eq!(empty_queues.len(), 1); assert_eq!(empty_queues[0].0, "droopy"); - mem_queues.ack_position("droopy", 5); + mem_queues.ack_position("droopy", 5, &ref_counts[3]); - assert!(files[2].can_be_deleted()); + assert!(has_been_dropped(&ref_counts[2])); } diff --git a/src/mockfile.rs 
b/src/mockfile.rs new file mode 100644 index 0000000..62f1ca2 --- /dev/null +++ b/src/mockfile.rs @@ -0,0 +1,74 @@ +use std::io; + +use crate::{FileLike, FileLikeWrite}; + +pub(crate) struct MockFile { + buf: Vec, + cursor: usize, + fsynced: bool, +} + +impl MockFile { + pub fn new() -> MockFile { + MockFile { + buf: Vec::new(), + cursor: 0, + fsynced: false, + } + } + + pub fn len(&self) -> usize { + self.buf.len() + } +} + +impl FileLikeWrite for MockFile { + fn fsyncdata(&mut self) -> io::Result<()> { + self.fsynced = true; + Ok(()) + } + + fn set_len(&mut self, num_bytes: u64) -> io::Result<()> { + self.buf.resize(num_bytes as usize, 0u8); + Ok(()) + } +} + +impl FileLike for MockFile {} + +impl io::Read for MockFile { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let end = (self.cursor + buf.len()).min(self.buf.len()); + let len = end - self.cursor; + buf[..len].copy_from_slice(&self.buf[self.cursor..end]); + self.cursor = end; + Ok(len) + } +} + +impl io::Write for MockFile { + fn write(&mut self, buf: &[u8]) -> io::Result { + if self.cursor + buf.len() > self.buf.len() { + self.buf.resize(self.cursor + buf.len(), 0u8); + } + self.fsynced = false; + self.buf[self.cursor..][..buf.len()].copy_from_slice(buf); + self.cursor += buf.len(); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl io::Seek for MockFile { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + match pos { + io::SeekFrom::Start(offset) => self.cursor = offset as usize, + io::SeekFrom::End(offset) => self.cursor = self.buf.len() as usize + offset as usize, + io::SeekFrom::Current(offset) => self.cursor += offset as usize, + } + Ok(self.cursor as u64) + } +} diff --git a/src/multi_record_log.rs b/src/multi_record_log.rs index c8f5bae..29b5ff6 100644 --- a/src/multi_record_log.rs +++ b/src/multi_record_log.rs @@ -3,19 +3,33 @@ use std::ops::{RangeBounds, RangeToInclusive}; use std::path::Path; use bytes::Buf; -use tracing::{debug, event_enabled, 
info, warn, Level}; +use tracing::{debug, info, warn}; use crate::error::{ AppendError, CreateQueueError, DeleteQueueError, MissingQueue, ReadRecordError, TruncateError, }; -use crate::mem::{MemQueue, QueuesSummary}; +use crate::page_directory::{PageListReader, PageListWriter, PageRangeRef}; use crate::record::{MultiPlexedRecord, MultiRecord}; use crate::recordlog::RecordWriter; -use crate::rolling::RollingWriter; use crate::{mem, PersistAction, PersistPolicy, PersistState, Record, ResourceUsage}; +#[derive(Copy, Clone, Debug)] +pub struct Preferences { + pub persist_policy: PersistPolicy, + pub num_bytes: u64, +} + +impl Default for Preferences { + fn default() -> Preferences { + Preferences { + persist_policy: PersistPolicy::Always(PersistAction::Flush), + num_bytes: 10_000_000, + } + } +} + pub struct MultiRecordLog { - record_log_writer: crate::recordlog::RecordWriter, + record_log_writer: crate::recordlog::RecordWriter, in_mem_queues: mem::MemQueues, next_persist: PersistState, // A simple buffer we reuse to avoid allocation. @@ -25,26 +39,30 @@ pub struct MultiRecordLog { impl MultiRecordLog { /// Open the multi record log, flushing after each operation, but not fsyncing. pub fn open(directory_path: &Path) -> Result { - Self::open_with_prefs(directory_path, PersistPolicy::Always(PersistAction::Flush)) - } - - pub fn summary(&self) -> QueuesSummary { - self.in_mem_queues.summary() + Self::open_with_prefs(directory_path, Preferences::default()) } /// Open the multi record log, syncing following the provided policy. 
pub fn open_with_prefs( directory_path: &Path, - persist_policy: PersistPolicy, + preferences: Preferences, ) -> Result { + let Preferences { + persist_policy, + num_bytes, + } = preferences; // io errors are non-recoverable - let rolling_reader = crate::rolling::RollingReader::open(directory_path)?; - let mut record_reader = crate::recordlog::RecordReader::open(rolling_reader); + // TODO set num pages + let queue_file = directory_path.join(&Path::new("mrecordlog.wal")); + // TODO stop hard coding + let directory = crate::page_directory::Directory::create_or_open(&queue_file, num_bytes)?; + let page_reader = PageListReader::new(directory)?; + let mut record_reader = crate::recordlog::RecordReader::open(page_reader); let mut in_mem_queues = crate::mem::MemQueues::default(); debug!("loading wal"); loop { - let file_number = record_reader.read().current_file().clone(); - let Ok(record) = record_reader.read_record::() else { + let mut session = record_reader.start_session(); + let Ok(record) = record_reader.read_record::(&mut session) else { warn!("Detected corrupted record: some data may have been lost"); continue; }; @@ -56,7 +74,7 @@ impl MultiRecordLog { position, } => { if !in_mem_queues.contains_queue(queue) { - in_mem_queues.ack_position(queue, position); + in_mem_queues.ack_position(queue, position, &session); } for record in records { // if this fails, it means some corruption wasn't detected at a lower @@ -68,7 +86,7 @@ impl MultiRecordLog { // corruption. In that case, maybe we should ack_position() and try // to insert again? 
in_mem_queues - .append_record(queue, &file_number, position, payload) + .append_record(queue, &session, position, payload) .map_err(|_| ReadRecordError::Corruption)?; } } @@ -76,10 +94,10 @@ impl MultiRecordLog { truncate_range, queue, } => { - in_mem_queues.truncate(queue, truncate_range); + in_mem_queues.truncate(queue, truncate_range, &session); } MultiPlexedRecord::RecordPosition { queue, position } => { - in_mem_queues.ack_position(queue, position); + in_mem_queues.ack_position(queue, position, &session); } MultiPlexedRecord::DeleteQueue { queue, position: _ } => { // can fail if we don't know about the queue getting deleted. It's fine to @@ -92,23 +110,16 @@ impl MultiRecordLog { } } // io errors are non-recoverable - let record_log_writer: RecordWriter = record_reader.into_writer()?; - let mut multi_record_log = MultiRecordLog { + let record_log_writer: RecordWriter = record_reader.into_writer()?; + let multi_record_log = MultiRecordLog { record_log_writer, in_mem_queues, next_persist: persist_policy.into(), multi_record_spare_buffer: Vec::new(), }; - multi_record_log.run_gc_if_necessary()?; Ok(multi_record_log) } - #[cfg(test)] - pub fn list_file_numbers(&self) -> Vec { - let rolling_writer = self.record_log_writer.get_underlying_wrt(); - rolling_writer.list_file_numbers() - } - /// Creates a new queue. /// /// Returns an error if the queue already exists. 
@@ -117,10 +128,11 @@ impl MultiRecordLog { if self.queue_exists(queue) { return Err(CreateQueueError::AlreadyExists); } + let mut session = self.record_log_writer.start_session()?; let record = MultiPlexedRecord::RecordPosition { queue, position: 0 }; - self.record_log_writer.write_record(record)?; + self.record_log_writer.write_record(record, &mut session)?; self.persist(PersistAction::FlushAndFsync)?; - self.in_mem_queues.create_queue(queue)?; + self.in_mem_queues.create_queue(queue, session)?; Ok(()) } @@ -128,9 +140,9 @@ impl MultiRecordLog { info!(queue = queue, "delete queue"); let position = self.in_mem_queues.next_position(queue)?; let record = MultiPlexedRecord::DeleteQueue { queue, position }; - self.record_log_writer.write_record(record)?; + let mut session = self.record_log_writer.start_session()?; + self.record_log_writer.write_record(record, &mut session)?; self.in_mem_queues.delete_queue(queue)?; - self.run_gc_if_necessary()?; self.persist(PersistAction::FlushAndFsync)?; Ok(()) } @@ -179,7 +191,6 @@ impl MultiRecordLog { } } let position = position_opt.unwrap_or(next_position); - let file_number = self.record_log_writer.current_file().clone(); let mut multi_record_spare_buffer = std::mem::take(&mut self.multi_record_spare_buffer); MultiRecord::serialize(payloads, position, &mut multi_record_spare_buffer); @@ -190,12 +201,15 @@ impl MultiRecordLog { } let records = MultiRecord::new_unchecked(&multi_record_spare_buffer); - let record = MultiPlexedRecord::AppendRecords { + let multi_record = MultiPlexedRecord::AppendRecords { position, queue, records, }; - self.record_log_writer.write_record(record)?; + + let mut session: PageRangeRef = self.record_log_writer.start_session()?; + self.record_log_writer + .write_record(multi_record, &mut session)?; self.persist_on_policy()?; let mem_queue = self.in_mem_queues.get_queue_mut(queue)?; @@ -203,7 +217,7 @@ impl MultiRecordLog { for record in records { // we just serialized it, we know it's valid let 
(position, payload) = record.unwrap(); - mem_queue.append_record(&file_number, position, payload)?; + mem_queue.append_record(&session, position, payload)?; max_position = position; } @@ -211,25 +225,6 @@ impl MultiRecordLog { Ok(Some(max_position)) } - fn record_empty_queues_position(&mut self) -> io::Result<()> { - let mut has_empty_queues = false; - for (queue_id, queue) in self.in_mem_queues.empty_queues() { - let next_position = queue.next_position(); - let record = MultiPlexedRecord::RecordPosition { - queue: queue_id, - position: next_position, - }; - self.record_log_writer.write_record(record)?; - has_empty_queues = true - } - if has_empty_queues { - // We need to fsync here! We are remove files from the FS - // so we need to make sure our empty queue positions are properly persisted. - self.persist(PersistAction::FlushAndFsync)?; - } - Ok(()) - } - /// Truncates the queue up to a given `position`, included. This method immediately /// truncates the underlying in-memory queue whereas the backing log files are deleted /// asynchronously when they become exclusively composed of deleted records. 
@@ -245,49 +240,22 @@ impl MultiRecordLog { if !self.queue_exists(queue) { return Err(TruncateError::MissingQueue(queue.to_string())); } + let mut session = self.record_log_writer.start_session()?; + let truncate_record = MultiPlexedRecord::Truncate { + truncate_range, + queue, + }; self.record_log_writer - .write_record(MultiPlexedRecord::Truncate { - truncate_range, - queue, - })?; + .write_record(truncate_record, &mut session)?; let removed_count = self .in_mem_queues - .truncate(queue, truncate_range) + .truncate(queue, truncate_range, &session) .unwrap_or(0); - self.run_gc_if_necessary()?; + // self.run_gc_if_necessary()?; self.persist_on_policy()?; Ok(removed_count) } - fn run_gc_if_necessary(&mut self) -> io::Result<()> { - debug!("run_gc_if_necessary"); - if self - .record_log_writer - .directory() - .has_files_that_can_be_deleted() - { - // We are about to delete files. - // Let's make sure we record the offsets of the empty queues - // so that we don't lose that information after dropping the files. - // - // But first we clone the current file number to make sure that the file that will - // contain the truncate positions it self won't be GC'ed. - let _file_number = self.record_log_writer.current_file().clone(); - self.record_empty_queues_position()?; - self.record_log_writer.directory().gc()?; - } - // only execute the following if we are above the debug level in tokio tracing - if event_enabled!(Level::DEBUG) { - for queue in self.list_queues() { - let queue: &MemQueue = self.in_mem_queues.get_queue(queue).unwrap(); - let first_pos = queue.range(..).next().map(|record| record.position); - let last_pos = queue.last_position(); - debug!(first_pos=?first_pos, last_pos=?last_pos, "queue positions after gc"); - } - } - Ok(()) - } - pub fn range( &self, queue: &str, @@ -323,14 +291,17 @@ impl MultiRecordLog { self.in_mem_queues.last_record(queue) } - /// Return the amount of memory and disk space used by mrecordlog. 
+ /// Returns the amount of memory used by mrecordlog, along with its total and in-use page counts. pub fn resource_usage(&self) -> ResourceUsage { - let disk_used_bytes = self.record_log_writer.size(); + let page_list_writer = self.record_log_writer.get_underlying_wrt(); + let num_pages = page_list_writer.num_pages(); + let num_used_pages = page_list_writer.num_used_pages(); let (memory_used_bytes, memory_allocated_bytes) = self.in_mem_queues.size(); ResourceUsage { memory_used_bytes, memory_allocated_bytes, - disk_used_bytes, + num_pages, + num_used_pages, } } } diff --git a/src/page_directory/header.rs b/src/page_directory/header.rs new file mode 100644 index 0000000..3ce721a --- /dev/null +++ b/src/page_directory/header.rs @@ -0,0 +1,117 @@ +use std::io; +use std::ops::Range; + +use crate::error::HeaderError; +use crate::page_directory::compute_slot_len; + +const MAGIC_NUMBER: u32 = 1_778_463_742_u32; +const HEADER_SIZE: usize = 4 + // magic number + 4 + // version + 4 + // num_pages + 4; // CRC +const VERSION: u32 = 1; + +fn crc32(data: &[u8]) -> u32 { + let mut hasher = crc32fast::Hasher::new(); + hasher.update(data); + hasher.finalize() +} + +fn to_header_bytes(num_pages: u32) -> [u8; HEADER_SIZE] { + let mut buf = [0u8; HEADER_SIZE]; + buf[0..4].copy_from_slice(&MAGIC_NUMBER.to_le_bytes()); + buf[4..8].copy_from_slice(&VERSION.to_le_bytes()); + buf[8..12].copy_from_slice(&num_pages.to_le_bytes()); + let checksum = crc32(&buf[..12]); + buf[12..16].copy_from_slice(&checksum.to_le_bytes()); + buf +} + +fn from_header_bytes(header_bytes: [u8; HEADER_SIZE]) -> Result { + let magic_number = u32::from_le_bytes(header_bytes[0..4].try_into().unwrap()); + let version = u32::from_le_bytes(header_bytes[4..8].try_into().unwrap()); + let num_pages = u32::from_le_bytes(header_bytes[8..12].try_into().unwrap()); + let checksum = u32::from_le_bytes(header_bytes[12..16].try_into().unwrap()); + let computed_checksum = crc32(&header_bytes[..12]); + if checksum != computed_checksum { + return
Err(HeaderError::InvalidChecksum); + } + if magic_number != MAGIC_NUMBER { + return Err(HeaderError::InvalidMagicNumber { magic_number }); + } + if version != VERSION { + return Err(HeaderError::UnsupportedVersion { version }); + } + Ok(num_pages) +} + +pub fn serialize_header(num_pages: u32, wrt: &mut dyn io::Write) -> io::Result { + let header_bytes = to_header_bytes(num_pages); + wrt.write_all(&header_bytes[..])?; + Ok(HeaderInfo { + header_len: HEADER_SIZE, + num_pages, + }) +} + +pub fn deserialize_header(read: &mut dyn io::Read) -> io::Result { + let mut header_bytes = [0u8; HEADER_SIZE]; + read.read_exact(&mut header_bytes)?; + let num_pages = from_header_bytes(header_bytes) + .map_err(|header_err| io::Error::new(io::ErrorKind::InvalidData, header_err))?; + Ok(HeaderInfo { + header_len: HEADER_SIZE, + num_pages, + }) +} + +#[derive(PartialEq, Eq, Debug, Clone, Copy)] +pub struct HeaderInfo { + pub header_len: usize, + pub num_pages: u32, +} + +impl HeaderInfo { + pub fn compute_slot_range(&self, epoch_parity: bool) -> Range { + let slot_len = compute_slot_len(self.num_pages); + let start_offset = if epoch_parity { + self.header_len + slot_len + } else { + self.header_len + }; + start_offset..start_offset + slot_len + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn test_simple_serialize_deserialize_header() { + let mut buf = Vec::new(); + let num_pages = 10; + let expected_header_info = HeaderInfo { + header_len: HEADER_SIZE, + num_pages, + }; + assert_eq!( + serialize_header(num_pages, &mut buf).unwrap(), + expected_header_info + ); + assert_eq!( + deserialize_header(&mut buf.as_slice()).unwrap(), + expected_header_info + ); + } + + #[test] + fn test_serialize_header() { + let mut buf = Vec::new(); + let num_pages = 10; + let header_info = serialize_header(num_pages, &mut buf).unwrap(); + assert_eq!(header_info.header_len, buf.len()); + assert_eq!(header_info.num_pages, num_pages); + } +} diff --git a/src/page_directory/mod.rs 
b/src/page_directory/mod.rs new file mode 100644 index 0000000..e95f682 --- /dev/null +++ b/src/page_directory/mod.rs @@ -0,0 +1,177 @@ +use std::fs::File; +use std::io; +use std::path::Path; + +mod header; +mod page_list; +mod page_refcounts; +mod reader; +mod writer; + +use page_list::PageList; +pub(crate) use page_refcounts::PageRangeRef; +pub use reader::PageListReader; +pub use writer::PageListWriter; + +use crate::frame::{FrameReader, FrameWriter}; +use crate::recordlog::{RecordReader, RecordWriter}; +use crate::{FileLike, BLOCK_NUM_BYTES}; + +pub const PAGE_SIZE: usize = 1 << 16; // 65,536 bytes +pub const MIN_NUM_PAGES: usize = 2; + +pub type PageId = u32; + +pub struct Directory { + file: F, + page_list: PageList, +} + +enum CreateOrOpen { + Created, + Opened, +} + +pub(crate) fn compute_slot_len(num_pages: u32) -> usize { + 8 + // epoch + 3 * num_pages as usize + // page ids encoded over 3 bytes each + 4 // checksum +} + +fn create_or_open_page_file(path: &Path, num_pages: usize) -> io::Result<(CreateOrOpen, File)> { + let len = num_pages * PAGE_SIZE; + match std::fs::File::create_new(path) { + Ok(file) => { + file.set_len(len as u64)?; + Ok((CreateOrOpen::Created, file)) + } + Err(io_err) if io_err.kind() == io::ErrorKind::AlreadyExists => { + let file = std::fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(path)?; + Ok((CreateOrOpen::Opened, file)) + } + Err(err) => Err(err), + } +} + +impl Directory { + pub fn create_or_open(path: &Path, len: u64) -> io::Result { + let num_pages = len as usize / PAGE_SIZE; + if num_pages < MIN_NUM_PAGES { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "file length is too small. 
requested {}, should be at least {}", + len, + PAGE_SIZE * MIN_NUM_PAGES + ), + )); + } + let (create_or_open, file) = create_or_open_page_file(path, num_pages)?; + match create_or_open { + CreateOrOpen::Created => Directory::create(num_pages, file), + CreateOrOpen::Opened => Directory::open(num_pages, file), + } + } +} + +impl Directory { + fn create(num_pages: usize, mut file: F) -> io::Result> { + let page_list = PageList::initialize_page_file(num_pages, &mut file)?; + let directory = Directory { file, page_list }; + Ok(directory) + } + + fn open(num_pages: usize, mut file: F) -> io::Result> { + file.seek(io::SeekFrom::Start(0u64))?; + let header = header::deserialize_header(&mut file)?; + if header.num_pages as usize != num_pages { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "number of pages does not match existing file", + )); + } + let page_list = PageList::load(header, &mut file)?; + Ok(Directory { file, page_list }) + } +} + +impl RecordReader { + pub fn into_writer(self) -> io::Result> { + let frame_writer: FrameWriter = self.frame_reader.into_writer()?; + Ok(RecordWriter::from(frame_writer)) + } +} + +impl FrameReader { + pub fn into_writer(self) -> io::Result> { + let offset: u64 = self.reader.block_id as u64 * BLOCK_NUM_BYTES as u64 + self.cursor as u64; + let page_list_writer: PageListWriter = self.reader.into_writer(offset as u64)?; + Ok(FrameWriter::create(page_list_writer)) + } +} + +#[cfg(test)] +mod tests { + use std::io::Cursor; + + use super::reader::PageListReader; + use super::*; + use crate::{BlockRead as _, BlockWrite as _, BLOCK_NUM_BYTES}; + + #[test] + fn test_simple_no_write() { + let fake_file = { + let directory = Directory::create(100, Cursor::new(Vec::new())).unwrap(); + let mut page_list_reader = PageListReader::new(directory).unwrap(); + let mut session = page_list_reader.start_session(); + assert_eq!(session.num_pages, 1); + loop { + if !page_list_reader.next_block(&mut session).unwrap() { + break; + } + } + 
assert_eq!(session.start_page_id, 0u32); + assert_eq!(session.num_pages, 100u16); + page_list_reader.into_writer(0u64).unwrap().into_file() + }; + + let directory = Directory::open(100, fake_file).unwrap(); + let page_list_reader = PageListReader::new(directory).unwrap(); + let _writer = page_list_reader.into_writer(0); + } + + #[test] + fn test_simple_write() { + let fake_file0 = Cursor::new(Vec::new()); + let directory = Directory::create(100, fake_file0).unwrap(); + let mut page_list_reader = PageListReader::new(directory).unwrap(); + let mut session = page_list_reader.start_session(); + loop { + if !page_list_reader.next_block(&mut session).unwrap() { + break; + } + } + let fake_file1 = page_list_reader.into_writer(0u64).unwrap().into_file(); + + let directory = Directory::open(100, fake_file1).unwrap(); + let page_list_reader = PageListReader::new(directory).unwrap(); + let mut writer = page_list_reader.into_writer(0).unwrap(); + let mut session = writer.start_write_session().unwrap(); + assert_eq!(writer.num_bytes_remaining_in_block(), BLOCK_NUM_BYTES); + writer.write(b"hello", &mut session).unwrap(); + assert_eq!(session.num_pages, 1u16); + assert_eq!( + writer.num_bytes_remaining_in_block(), + BLOCK_NUM_BYTES - b"hello".len() + ); + let fake_file2 = writer.into_file(); + + let directory = Directory::open(100, fake_file2).unwrap(); + let page_list_reader = PageListReader::new(directory).unwrap(); + assert_eq!(&page_list_reader.block()[..5], b"hello"); + } +} diff --git a/src/page_directory/page_list.rs b/src/page_directory/page_list.rs new file mode 100644 index 0000000..c68b200 --- /dev/null +++ b/src/page_directory/page_list.rs @@ -0,0 +1,374 @@ +use std::io::{self, Read, Write}; +use std::ops::Range; +use std::sync::atomic::Ordering; + +use super::header::{self, HeaderInfo}; +use super::{PageId, PAGE_SIZE}; +use crate::page_directory::page_refcounts::PageRegistry; +use crate::page_directory::PageRangeRef; +use crate::{FileLike, FileLikeWrite}; + +pub 
struct PageList { + header_info: HeaderInfo, + page_registry: rclite::Arc, + epoch: u64, + first_page_offset: u64, +} + +const CHUNK_NUM_PAGES: usize = 4_096; +const CHUNK_BYTES: usize = CHUNK_NUM_PAGES * 3; + +fn write_slot(epoch: u64, page_ids: &[PageId], wrt: &mut impl Write) -> io::Result<()> { + let mut buf: Vec = Vec::with_capacity(CHUNK_BYTES); + let mut hasher = crc32fast::Hasher::new(); + + let epoch_bytes = epoch.to_le_bytes(); + hasher.update(&epoch_bytes); + wrt.write_all(&epoch_bytes)?; + for page_chunk in page_ids.chunks(CHUNK_NUM_PAGES) { + buf.clear(); + for &page_id in page_chunk { + let page_id_bytes = page_id.to_le_bytes(); + buf.extend_from_slice(&page_id_bytes[..3]); + } + if buf.is_empty() { + break; + } + hasher.update(&buf); + wrt.write_all(&buf)?; + if buf.len() < CHUNK_BYTES { + break; + } + } + let digest = hasher.finalize(); + wrt.write_all(&digest.to_le_bytes())?; + Ok(()) +} + +#[derive(Debug)] +struct PageListSerialized { + epoch: u64, + page_ids: Vec, +} + +fn read_slot(num_pages: u32, read: &mut dyn Read) -> io::Result> { + let mut page_ids = Vec::with_capacity(num_pages as usize); + let mut hasher = crc32fast::Hasher::new(); + + let mut epoch_bytes = [0u8; 8]; + read.read_exact(&mut epoch_bytes)?; + hasher.update(&epoch_bytes); + + let epoch = u64::from_le_bytes(epoch_bytes.try_into().unwrap()); + + let mut buff = vec![0u8; CHUNK_BYTES]; + while page_ids.len() < num_pages as usize { + let num_pages_to_read = (num_pages as usize - page_ids.len()).min(CHUNK_NUM_PAGES); + let num_bytes_to_read = num_pages_to_read * 3; + let chunk_buf = &mut buff[..num_bytes_to_read]; + read.read_exact(chunk_buf)?; + hasher.update(chunk_buf); + page_ids.extend(chunk_buf.chunks_exact(3).map(|page_id| { + let mut page_id_bytes = [0u8; 4]; + page_id_bytes[0..3].copy_from_slice(page_id); + u32::from_le_bytes(page_id_bytes) + })); + } + assert_eq!(page_ids.len(), num_pages as usize); + let expected_digest = hasher.finalize(); + + let mut digest_bytes = 
[0u8; 4]; + read.read_exact(&mut digest_bytes[..])?; + let digest = u32::from_le_bytes(digest_bytes); + + if expected_digest != digest { + return Ok(None); + } + + Ok(Some(PageListSerialized { epoch, page_ids })) +} + +fn next_multiple_of(offset: u64, mult: u64) -> u64 { + let k = offset.div_ceil(mult); + k * mult +} + +impl PageList { + fn new(header_info: HeaderInfo) -> Self { + let page_registry = PageRegistry::new(header_info.num_pages as usize); + let end_of_page_list = header_info.compute_slot_range(true).end as u64; + let first_page_offset = next_multiple_of(end_of_page_list, PAGE_SIZE as u64); + PageList { + header_info, + page_registry: rclite::Arc::new(page_registry), + epoch: 0u64, + first_page_offset, + } + } + + pub(crate) fn initialize_page_file( + num_pages: usize, + file: &mut W, + ) -> io::Result { + file.seek(io::SeekFrom::Start(0))?; + let header_info = header::serialize_header(num_pages as u32, file)?; + let mut page_list = PageList::new(header_info); + let file_num_bytes = page_list.first_page_offset + (num_pages as u64 * PAGE_SIZE as u64); + file.set_len(file_num_bytes)?; + // We call gc once in order to initialize the page_list. 
+ page_list.gc(0u64, file)?; + Ok(page_list) + } + + fn compute_slot_range(&self, epoch_parity: bool) -> Range { + self.header_info.compute_slot_range(epoch_parity) + } + + pub fn load(header_info: HeaderInfo, file: &mut impl FileLike) -> io::Result { + let mut page_list = PageList::new(header_info); + + let first_slot_range = page_list.compute_slot_range(false); + file.seek(io::SeekFrom::Start(first_slot_range.start as u64))?; + let first_page_list = read_slot(page_list.num_pages(), file)?; + + let second_slot_range = page_list.compute_slot_range(true); + file.seek(io::SeekFrom::Start(second_slot_range.start as u64))?; + let second_page_list = read_slot(page_list.num_pages(), file)?; + + let PageListSerialized { epoch, page_ids } = match (first_page_list, second_page_list) { + (None, None) => { + let error_msg = "page list is corrupted"; + return Err(io::Error::new(io::ErrorKind::InvalidData, error_msg)); + } + (None, Some(page_list)) | (Some(page_list), None) => page_list, + (Some(first_page_list), Some(second_page_list)) => { + if first_page_list.epoch > second_page_list.epoch { + first_page_list + } else { + second_page_list + } + } + }; + + page_list.page_registry = rclite::Arc::new(PageRegistry::with_page_ids(page_ids)); + + page_list.epoch = epoch; + + Ok(page_list) + } + + pub fn new_page_range_ref(&self, start_page_id: u32) -> PageRangeRef { + self.new_page_range_ref_with_num_pages(start_page_id, 1u32) + } + + #[inline(always)] + pub fn new_page_range_ref_with_num_pages( + &self, + start_page_id: u32, + num_pages: u32, + ) -> PageRangeRef { + let mut page_range_ref = PageRangeRef { + start_page_id: start_page_id, + num_pages: 0, + page_registry: self.page_registry.clone(), + }; + for _ in 0..num_pages { + page_range_ref.add_page(); + } + page_range_ref + } + + #[inline(always)] + pub fn num_pages(&self) -> u32 { + self.header_info.num_pages + } + + #[inline(always)] + pub fn page_id(&self, page_ord: usize) -> Option { + self.page_registry + .page_index + 
.load() + .ord_to_id + .get(page_ord) + .copied() + } + + #[inline(always)] + pub fn page_start_offset(&self, page_ord: usize) -> Option { + let page_id = self.page_id(page_ord)?; + Some(self.first_page_offset + page_id as u64 * PAGE_SIZE as u64) + } + + // Runs a gc operation. + // `cursor` is the offset of the next byte to be written. + // + // This method returns the number of pages that have been freed. + pub fn gc(&mut self, cursor: u64, file: &mut impl FileLikeWrite) -> io::Result { + self.epoch += 1; + let written_page_id = cursor.div_ceil(PAGE_SIZE as u64) as usize; + let mut page_ids = self.page_registry.page_index.load().ord_to_id.clone(); + let mut free_pages: Vec = page_ids.drain(written_page_id..).collect(); + let num_free_pages_when_gc_started = free_pages.len(); + drain_filter( + &mut page_ids, + |page_id| { + self.page_registry.ref_counts[page_id as usize].load(Ordering::Acquire) == 0u32 + }, + &mut free_pages, + ); + let num_freed_pages = free_pages.len() - num_free_pages_when_gc_started; + free_pages.sort_unstable(); + page_ids.extend_from_slice(&free_pages); + let slot_range = self.compute_slot_range(self.epoch % 2 != 0); + file.seek(io::SeekFrom::Start(slot_range.start as u64))?; + write_slot(self.epoch, &page_ids, file)?; + file.flush()?; + file.fsyncdata()?; + self.page_registry.update_page_ids_after_gc(page_ids); + Ok(num_freed_pages) + } +} + +fn drain_filter(els: &mut Vec, filter: impl Fn(PageId) -> bool, output: &mut Vec) { + let mut wrt_cursor = 0; + for read_cursor in 0..els.len() { + let page = els[read_cursor]; + if filter(page) { + output.push(page); + } else { + els[wrt_cursor] = page; + wrt_cursor += 1; + } + } + els.truncate(wrt_cursor); +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::mockfile::MockFile; + + #[test] + fn test_drain_filter() { + let mut output = vec![6u32, 4]; + let mut els: Vec = vec![1u32, 3, 2, 8, 5, 7]; + drain_filter(&mut els, |page_id| page_id % 2 == 0, &mut output); + assert_eq!(&els[..], &[1, 3, 5, 7]);
+ assert_eq!(&output[..], &[6, 4, 2, 8]); + } + + #[track_caller] + fn test_serialize_page_list_slot_aux(epoch: u64, page_ids: &[u32]) { + let mut buf = Vec::new(); + write_slot(epoch, page_ids, &mut buf).unwrap(); + assert_eq!( + super::super::compute_slot_len(page_ids.len() as u32), + buf.len(), + "serialized len does not match expectation" + ); + let mut arr = &buf[..]; + let page_list_serialized_opt = read_slot(page_ids.len() as u32, &mut arr).unwrap(); + assert!(arr.is_empty(), "data was not entirely read"); + let page_list_serialized = page_list_serialized_opt.unwrap(); + assert_eq!(page_list_serialized.epoch, epoch, "epoch does not match"); + assert_eq!( + &page_list_serialized.page_ids, page_ids, + "page ids do not match" + ); + } + + #[test] + fn test_serialize_page_list_slot() { + test_serialize_page_list_slot_aux(3u64, &[]); + test_serialize_page_list_slot_aux(3u64, &[3u32, 9u32]); + } + + proptest::proptest! { + #[test] + fn test_proptest_serialize_page_list_slot( + epoch in proptest::num::u64::ANY, + page_ids in proptest::collection::vec(0..1u32 << 24, 0..CHUNK_NUM_PAGES * 3) + ) { + test_serialize_page_list_slot_aux(epoch, &page_ids); + } + } + + #[test] + fn test_page_list_gc_simple() { + let mut file = MockFile::new(); + let header_info = HeaderInfo { + header_len: 16, + num_pages: 10u32, + }; + { + let page_list = PageList::initialize_page_file(10, &mut file).unwrap(); + assert_eq!(page_list.epoch, 1); + assert_eq!(page_list.header_info.header_len, 16); + // The header AND the slots should fit in a single page. 
+ assert_eq!(PAGE_SIZE * 11, file.len()); + assert_eq!(page_list.first_page_offset % PAGE_SIZE as u64, 0u64); + } + { + let mut page_list = PageList::load(header_info, &mut file).unwrap(); + + assert_eq!( + &page_list.page_registry.page_index.load().ord_to_id, + &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + ); + assert_eq!(page_list.epoch, 1); + assert_eq!(page_list.header_info.num_pages, header_info.num_pages); + for i in 0..header_info.num_pages as usize { + assert_eq!( + page_list.page_registry.page_index.load().ord_to_id[i], + i as PageId + ); + } + assert_eq!(page_list.epoch, 1); + page_list.gc(0, &mut file).unwrap(); + assert_eq!(page_list.epoch, 2); + } + { + let mut page_list = PageList::load(header_info, &mut file).unwrap(); + let page_registry = page_list.page_registry.clone(); + assert_eq!(page_list.epoch, 2); + assert_eq!(page_list.header_info.num_pages, header_info.num_pages); + for i in 0..header_info.num_pages as usize { + assert_eq!(page_registry.page_index.load().ord_to_id[i], i as PageId); + } + + let ref_page0 = page_list.new_page_range_ref(0); + let _ref_page1 = page_list.new_page_range_ref(1); + let _ref_page2 = page_list.new_page_range_ref(2); + let _ref_page3 = page_list.new_page_range_ref(3); + drop(ref_page0); + let ref_page4 = page_list.new_page_range_ref(4); + let _ref_page5 = page_list.new_page_range_ref(5); + drop(ref_page4); + page_list.gc(6u64 * PAGE_SIZE as u64, &mut file).unwrap(); + assert_eq!( + &page_registry.page_index.load().ord_to_id, + &[1, 2, 3, 5, 0, 4, 6, 7, 8, 9] + ); + } + } + + #[test] + fn test_page_list_gc_clone() { + let mut file = MockFile::new(); + let mut page_list = PageList::initialize_page_file(5, &mut file).unwrap(); + let page_registry = page_list.page_registry.clone(); + + let ref_page0 = page_list.new_page_range_ref(0); + let ref_page1 = page_list.new_page_range_ref(1); + let _ref_page2 = page_list.new_page_range_ref(2); + let _ref_page1_clone = ref_page1.clone(); + drop(ref_page0); + drop(ref_page1); + + 
page_list.gc(3u64 * PAGE_SIZE as u64, &mut file).unwrap(); + assert_eq!( + &page_registry.page_index.load().ord_to_id, + &[1, 2, 0, 3, 4,] + ); + } +} diff --git a/src/page_directory/page_refcounts.rs b/src/page_directory/page_refcounts.rs new file mode 100644 index 0000000..4024b4f --- /dev/null +++ b/src/page_directory/page_refcounts.rs @@ -0,0 +1,133 @@ +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; + +use arc_swap::ArcSwap; + +use crate::page_directory::PageId; + +pub struct PageIndex { + // This is a page_ord -> page_id mapping. + pub ord_to_id: Vec, + // This is a page_id -> page_ord mapping. + pub id_to_ord: Vec, +} + +impl PageIndex { + pub fn new(num_pages: usize) -> PageIndex { + let ord_to_id: Vec = (0..num_pages as u32).collect(); + let id_to_ord = ord_to_id.clone(); + PageIndex { + ord_to_id, + id_to_ord, + } + } + + pub fn from_page_ids(ord_to_id: Vec) -> PageIndex { + let mut page_index = PageIndex { + ord_to_id, + id_to_ord: Vec::new(), + }; + page_index.rebuild_page_index(); + page_index + } + + fn rebuild_page_index(&mut self) { + self.id_to_ord.resize(self.ord_to_id.len(), 0u32); + for (ord, &page_id) in self.ord_to_id.iter().enumerate() { + self.id_to_ord[page_id as usize] = ord as u32; + } + } +} + +pub struct PageRegistry { + pub ref_counts: Box<[AtomicU32]>, + pub page_index: ArcSwap, +} + +impl PageRegistry { + pub fn with_page_ids(ord_to_id: Vec) -> PageRegistry { + let page_index = PageIndex::from_page_ids(ord_to_id); + PageRegistry::with_page_index(page_index) + } + + pub fn update_page_ids_after_gc(&self, ord_to_id: Vec) { + let page_index = PageIndex::from_page_ids(ord_to_id); + self.page_index.store(Arc::new(page_index)); + } + + pub fn with_page_index(page_index: PageIndex) -> PageRegistry { + let num_pages = page_index.ord_to_id.len(); + let ref_counts: Vec = std::iter::repeat_with(AtomicU32::default) + .take(num_pages) + .collect(); + PageRegistry { + ref_counts: ref_counts.into_boxed_slice(), + page_index: 
ArcSwap::new(Arc::new(page_index)), + } + } + + pub fn new(num_pages: usize) -> PageRegistry { + let page_index = PageIndex::new(num_pages); + Self::with_page_index(page_index) + } +} + +pub struct PageRangeRef { + pub start_page_id: u32, + pub num_pages: u16, + pub page_registry: rclite::Arc, +} + +impl Eq for PageRangeRef {} + +impl PartialEq for PageRangeRef { + #[inline(always)] + fn eq(&self, other: &Self) -> bool { + self.start_page_id == other.start_page_id && self.num_pages == other.num_pages + } +} + +impl PageRangeRef { + // Adds a page to the reference. + pub fn add_page(&mut self) { + let page_id = if self.num_pages == 0u16 { + self.start_page_id + } else { + let page_index = self.page_registry.page_index.load(); + let start_page_ord = page_index.id_to_ord[self.start_page_id as usize]; + let page_ord = start_page_ord + self.num_pages as u32; + page_index.ord_to_id[page_ord as usize] + }; + self.num_pages += 1; + self.page_registry.ref_counts[page_id as usize].fetch_add(1, Ordering::Release); + } +} + +impl Clone for PageRangeRef { + fn clone(&self) -> PageRangeRef { + let mut page_range_ref = PageRangeRef { + start_page_id: self.start_page_id, + num_pages: 0, + page_registry: self.page_registry.clone(), + }; + for _ in 0..self.num_pages { + page_range_ref.add_page(); + } + page_range_ref + } +} + +impl Drop for PageRangeRef { + fn drop(&mut self) { + let page_index = self.page_registry.page_index.load(); + let page_ids = if self.num_pages == 0u16 { + &[self.start_page_id] + } else { + let start_page_ord = page_index.id_to_ord[self.start_page_id as usize]; + &page_index.ord_to_id[start_page_ord as usize..][..self.num_pages as usize] + }; + for &page_id in page_ids { + self.page_registry.ref_counts[page_id as usize].fetch_sub(1, Ordering::AcqRel); + } + } +} diff --git a/src/page_directory/reader.rs b/src/page_directory/reader.rs new file mode 100644 index 0000000..728398d --- /dev/null +++ b/src/page_directory/reader.rs @@ -0,0 +1,93 @@ +use 
std::fs::File; +use std::io::{self, BufWriter, SeekFrom}; + +use super::page_list::PageList; +use super::writer::PageListWriter; +use super::{Directory, PAGE_SIZE}; +use crate::page_directory::PageRangeRef; +use crate::{BlockRead, FileLike, BLOCK_NUM_BYTES}; + +const NUM_BLOCK_PER_PAGE: usize = PAGE_SIZE / BLOCK_NUM_BYTES; + +pub struct PageListReader { + file: F, + page_list: PageList, + page_buffer: Box<[u8]>, + pub(crate) block_id: usize, + page_id: u32, +} + +impl PageListReader { + pub fn new(page_directory: Directory) -> io::Result { + let Directory { file, page_list } = page_directory; + let mut page_list_reader = PageListReader { + file, + page_list, + page_buffer: vec![0u8; PAGE_SIZE].into_boxed_slice(), + page_id: 0u32, + block_id: 0, + }; + page_list_reader.load_page()?; + Ok(page_list_reader) + } + + fn block_id_within_page(&self) -> usize { + self.block_id % NUM_BLOCK_PER_PAGE + } + + // Loads the current page. + // + // Returns Ok(false) if we have reached the end of the file. 
+ fn load_page(&mut self) -> std::io::Result { + let page_ord = self.block_id / NUM_BLOCK_PER_PAGE; + let Some(page_id) = self.page_list.page_id(page_ord) else { + return Ok(false); + }; + self.page_id = page_id; + let Some(page_start_offset) = self.page_list.page_start_offset(page_ord) else { + return Ok(false); + }; + self.file.seek(SeekFrom::Start(page_start_offset as u64))?; + self.file.read_exact(&mut self.page_buffer[..])?; + Ok(true) + } + + pub fn into_writer(self, num_bytes: u64) -> io::Result> { + let mut page_list_writer = PageListWriter { + page_list: self.page_list, + wrt: BufWriter::with_capacity(BLOCK_NUM_BYTES, self.file), + cursor: num_bytes, + }; + page_list_writer.reposition_write_head()?; + Ok(page_list_writer) + } +} + +impl BlockRead for PageListReader { + type Session = PageRangeRef; + + fn start_session(&self) -> Self::Session { + self.page_list.new_page_range_ref(self.page_id) + } + + fn next_block(&mut self, session: &mut Self::Session) -> std::io::Result { + self.block_id += 1; + let block_id_within_page = self.block_id_within_page(); + if block_id_within_page == 0 { + if self.load_page()? { + session.num_pages += 1u16; + Ok(true) + } else { + Ok(false) + } + } else { + Ok(true) + } + } + + fn block(&self) -> &[u8; crate::BLOCK_NUM_BYTES] { + let start_offset = self.block_id_within_page() * crate::BLOCK_NUM_BYTES; + let block_slice = &self.page_buffer[start_offset..start_offset + crate::BLOCK_NUM_BYTES]; + block_slice.try_into().unwrap() + } +} diff --git a/src/page_directory/readme.txt b/src/page_directory/readme.txt new file mode 100644 index 0000000..e7eb989 --- /dev/null +++ b/src/page_directory/readme.txt @@ -0,0 +1,23 @@ +The page directory is a single file that stores a list of pages. + +These page themselves are used to store the mrecordlog: a WAL common to all indexes. +Because the mrecordlog contains a several queues, it is possible for a single queue to +prevent a truncation at the scale of the mrecordlog. 
+ +With a page system, a single lagging queue will only prevent the reclamation of the +subset of pages holding data for that queue. + +The page directory works as follows. All pages have a physical id. +At all points in time, they are all organized in a specific order in which they +are supposed to be read and written. That order is saved into a header of the file. + +We track the list of pages in use through reference counting. Upon GC, we recompute the right ordering +and write it back into the header. + +In order to make writing that page list atomic, the header actually consists of two slots. +We track the number of GC operations that have been executed so far: the GC epoch. + +We alternately write the first or the second one based on the parity of the epoch. +When reading, we read both slots and select the non-corrupted slot that holds the highest epoch. +That way, if a GC operation is interrupted, it will only corrupt the new slot, and the old one +will still be readable. diff --git a/src/page_directory/writer.rs b/src/page_directory/writer.rs new file mode 100644 index 0000000..e9b4f80 --- /dev/null +++ b/src/page_directory/writer.rs @@ -0,0 +1,146 @@ +use std::fs::File; +use std::io::{self, BufWriter, Seek as _, SeekFrom, Write}; + +use super::page_list::PageList; +use super::PageId; +use crate::page_directory::{PageRangeRef, PAGE_SIZE}; +use crate::{BlockWrite, FileLike, PersistAction, BLOCK_NUM_BYTES}; + +pub struct PageListWriter { + pub(crate) page_list: PageList, + pub(crate) wrt: BufWriter, + // cursor is the offset at which we are trying to write. + // This is not a physical offset (an offset on disk), but a logical one. + // The page list is here to convert this into an actual physical offset.
+ pub(crate) cursor: u64, +} + +impl BlockWrite for PageListWriter { + type Session = PageRangeRef; + + #[inline(always)] + fn start_write_session(&mut self) -> io::Result { + let Some(page_id) = self.get_write_page() else { + return Err(std::io::Error::new( + io::ErrorKind::StorageFull, + "All pages are used", + )); + }; + let initial_num_pages = if self.cursor % PAGE_SIZE as u64 == 0 { + // We haven't loaded the page yet. + 0 + } else { + 1 + }; + let page_ref = self + .page_list + .new_page_range_ref_with_num_pages(page_id, initial_num_pages); + Ok(page_ref) + } + + fn write(&mut self, buf: &[u8], page_range_ref: &mut PageRangeRef) -> std::io::Result<()> { + if buf.len() == 0 { + return Ok(()); + } + assert!(buf.len() <= self.num_bytes_remaining_in_block()); + if self.cursor % PAGE_SIZE as u64 == 0 { + // If we are about to run the first write on a page. + // We need to seek into its physical address. + page_range_ref.add_page(); + self.reposition_write_head()?; + } + self.wrt.write_all(buf)?; + self.cursor += buf.len() as u64; + Ok(()) + } + + fn persist(&mut self, persist_action: crate::PersistAction) -> std::io::Result<()> { + match persist_action { + PersistAction::FlushAndFsync => { + self.wrt.flush()?; + self.wrt.get_mut().fsyncdata()?; + Ok(()) + } + PersistAction::Flush => { + // This will flush the buffer of the BufWriter to the underlying OS. + self.wrt.flush() + } + } + } + + #[inline] + fn num_bytes_remaining_in_block(&self) -> usize { + BLOCK_NUM_BYTES - (self.cursor as usize % BLOCK_NUM_BYTES) + } + + fn make_room(&mut self, num_bytes: u64) -> io::Result<()> { + if self.remaining_capacity() >= num_bytes { + return Ok(()); + } + self.gc()?; + if self.remaining_capacity() >= num_bytes { + return Ok(()); + } + let error_msg = format!( + "mrecordlog capacity reached. 
cursor={}, num_pages={}, requested={num_bytes}", + self.cursor, + self.page_list.num_pages() + ); + Err(io::Error::new(io::ErrorKind::OutOfMemory, error_msg)) + } +} + +impl PageListWriter { + fn gc(&mut self) -> io::Result<()> { + let num_pages_delete = self.page_list.gc(self.cursor, &mut self.wrt)?; + // We need to update the cursor. + self.cursor -= num_pages_delete as u64 * PAGE_SIZE as u64; + self.reposition_write_head()?; + Ok(()) + } + + #[inline(always)] + pub fn num_pages(&self) -> u32 { + self.page_list.num_pages() + } + + pub fn num_used_pages(&self) -> u32 { + self.cursor.div_ceil(PAGE_SIZE as u64) as u32 + } + + // Returns the physical file offset range that corresponds to the current page. + #[inline(always)] + fn get_write_page(&mut self) -> Option { + // if self.cursor == self.page_list.num_pages() as u64 * PAGE_SIZE as u64 { + // self.gc()?; + // } + let page_ord = (self.cursor / PAGE_SIZE as u64) as usize; + self.page_list.page_id(page_ord) + } + + #[inline(always)] + fn remaining_capacity(&self) -> u64 { + let total_capacity = self.page_list.num_pages() as u64 * PAGE_SIZE as u64; + total_capacity - self.cursor + } + + #[cfg(test)] + pub fn into_file(self) -> F { + self.wrt.into_inner().map_err(|_| ()).unwrap() + } + + // Seek into the file to our current write position. 
+ pub fn reposition_write_head(&mut self) -> io::Result<()> { + let page_ord = (self.cursor / PAGE_SIZE as u64) as usize; + let Some(page_start_offset) = self.page_list.page_start_offset(page_ord) else { + return Err(std::io::Error::new( + io::ErrorKind::StorageFull, + "All pages are used", + )); + }; + let offset_within_page = self.cursor - page_ord as u64 * PAGE_SIZE as u64; + self.wrt + .seek(SeekFrom::Start(page_start_offset + offset_within_page))?; + Ok(()) + } +} diff --git a/src/persist_policy.rs b/src/persist_policy.rs index 7a758af..b7b7fdb 100644 --- a/src/persist_policy.rs +++ b/src/persist_policy.rs @@ -32,7 +32,7 @@ impl PersistAction { /// /// The `PersistPolicy` defines the trade-off applied for the second kind of /// operations. -#[derive(Clone, Debug)] +#[derive(Clone, Copy, Debug)] pub enum PersistPolicy { /// Only ensure data is persisted when critical records are written. /// diff --git a/src/proptests.rs b/src/proptests.rs index a33c4f1..8ba2a4e 100644 --- a/src/proptests.rs +++ b/src/proptests.rs @@ -7,20 +7,26 @@ use proptest::prop_oneof; use proptest::strategy::{Just, Strategy}; use tempfile::TempDir; +use crate::multi_record_log::Preferences; use crate::record::{MultiPlexedRecord, MultiRecord}; -use crate::{MultiRecordLog, Record, Serializable}; +use crate::{MultiRecordLog, PersistAction, PersistPolicy, Record, Serializable}; struct PropTestEnv { tempdir: TempDir, record_log: MultiRecordLog, state: HashMap<&'static str, (Range, u64)>, - block_to_write: Vec, + record_to_write: Vec, + preferences: Preferences, } impl PropTestEnv { - pub fn new(block_size: usize) -> Self { + pub fn new(record_len: usize) -> Self { + Self::new_with_prefs(record_len, Preferences::default()) + } + + pub fn new_with_prefs(record_len: usize, preferences: Preferences) -> Self { let tempdir = tempfile::tempdir().unwrap(); - let mut record_log = MultiRecordLog::open(tempdir.path()).unwrap(); + let mut record_log = MultiRecordLog::open_with_prefs(tempdir.path(), 
preferences).unwrap(); record_log.create_queue("q1").unwrap(); record_log.create_queue("q2").unwrap(); let mut state = HashMap::default(); @@ -30,7 +36,8 @@ impl PropTestEnv { tempdir, record_log, state, - block_to_write: vec![b'A'; block_size], + record_to_write: vec![b'A'; record_len], + preferences, } } @@ -59,7 +66,8 @@ impl PropTestEnv { } pub fn reload(&mut self) { - self.record_log = MultiRecordLog::open(self.tempdir.path()).unwrap(); + self.record_log = + MultiRecordLog::open_with_prefs(self.tempdir.path(), self.preferences).unwrap(); for (queue, (_range, count)) in &self.state { assert_eq!( self.record_log.range(queue, ..).unwrap().count() as u64, @@ -98,7 +106,7 @@ impl PropTestEnv { .append_records( queue, Some(new_pos), - std::iter::repeat(&self.block_to_write[..]).take(count as usize), + std::iter::repeat(&self.record_to_write[..]).take(count as usize), ) .unwrap(); @@ -238,7 +246,13 @@ fn test_scenario_big_records() { }, Reopen, ]; - let mut env = PropTestEnv::new(1 << 26); + let mut env = PropTestEnv::new_with_prefs( + 1 << 26, + Preferences { + persist_policy: PersistPolicy::Always(PersistAction::Flush), + num_bytes: 500_000_000, + }, + ); // 64mb for op in ops { env.apply(op); } diff --git a/src/recordlog/reader.rs b/src/recordlog/reader.rs index c97080c..4c6ed89 100644 --- a/src/recordlog/reader.rs +++ b/src/recordlog/reader.rs @@ -1,13 +1,9 @@ -use std::io; - use crate::error::ReadRecordError; -use crate::frame::{FrameReader, FrameWriter, ReadFrameError}; -use crate::recordlog::RecordWriter; -use crate::rolling::{RollingReader, RollingWriter}; +use crate::frame::{FrameReader, ReadFrameError}; use crate::{BlockRead, Serializable}; pub struct RecordReader { - frame_reader: FrameReader, + pub(crate) frame_reader: FrameReader, record_buffer: Vec, // true if we are in the middle of reading a multifragment record. 
// This is useful, as it makes it possible to drop a record @@ -25,8 +21,8 @@ impl RecordReader { } } - pub fn read(&self) -> &R { - self.frame_reader.read() + pub fn start_session(&mut self) -> R::Session { + self.frame_reader.start_session() } /// Deserialize a record without actually consuming data. @@ -37,8 +33,9 @@ impl RecordReader { /// Advance cursor and deserialize the next record. pub fn read_record<'a, S: Serializable<'a>>( &'a mut self, + session: &mut R::Session, ) -> Result, ReadRecordError> { - let has_record = self.go_next()?; + let has_record = self.go_next(session)?; if has_record { let record = self.record().ok_or(ReadRecordError::Corruption)?; Ok(Some(record)) @@ -49,9 +46,9 @@ impl RecordReader { // Attempts to position the reader to the next record and return // true or false whether such a record is available or not. - pub fn go_next(&mut self) -> Result { + pub fn go_next(&mut self, session: &mut R::Session) -> Result { loop { - let frame = self.frame_reader.read_frame(); + let frame = self.frame_reader.read_frame(session); match frame { Ok((frame_type, frame_payload)) => { if frame_type.is_first_frame_of_record() { @@ -81,10 +78,3 @@ impl RecordReader { } } } - -impl RecordReader { - pub fn into_writer(self) -> io::Result> { - let frame_writer: FrameWriter = self.frame_reader.into_writer()?; - Ok(RecordWriter::from(frame_writer)) - } -} diff --git a/src/recordlog/tests.rs b/src/recordlog/tests.rs index 59f21df..6cb4adc 100644 --- a/src/recordlog/tests.rs +++ b/src/recordlog/tests.rs @@ -8,30 +8,38 @@ use crate::{PersistAction, BLOCK_NUM_BYTES}; fn test_no_data() { let data = vec![0u8; BLOCK_NUM_BYTES * 4]; let mut reader = RecordReader::open(ArrayReader::from(&data[..])); - assert_eq!(reader.read_record::<&str>().unwrap(), None); + let mut session = reader.start_session(); + assert_eq!(reader.read_record::<&str>(&mut session).unwrap(), None); } #[test] fn test_empty_record() { let mut writer = RecordWriter::in_memory(); - 
writer.write_record("").unwrap(); + let mut session = writer.start_session().unwrap(); + writer.write_record("", &mut session).unwrap(); writer.persist(PersistAction::Flush).unwrap(); let buf: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buf[..])); - assert_eq!(reader.read_record::<&str>().unwrap(), Some("")); - assert_eq!(reader.read_record::<&str>().unwrap(), None); + let mut session = reader.start_session(); + assert_eq!(reader.read_record::<&str>(&mut session).unwrap(), Some("")); + assert_eq!(reader.read_record::<&str>(&mut session).unwrap(), None); } #[test] fn test_simple_record() { let mut writer = RecordWriter::in_memory(); + let mut session = writer.start_session().unwrap(); let record = "hello"; - writer.write_record(record).unwrap(); + writer.write_record(record, &mut session).unwrap(); writer.persist(PersistAction::Flush).unwrap(); let buf: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buf[..])); - assert!(matches!(reader.read_record::<&str>(), Ok(Some("hello")))); - assert!(matches!(reader.read_record::<&str>(), Ok(None))); + let mut session = reader.start_session(); + assert!(matches!( + reader.read_record::<&str>(&mut session), + Ok(Some("hello")) + )); + assert!(matches!(reader.read_record::<&str>(&mut session), Ok(None))); } fn make_long_entry(len: usize) -> String { @@ -42,13 +50,17 @@ fn make_long_entry(len: usize) -> String { fn test_spans_over_more_than_one_block() { let long_entry: String = make_long_entry(80_000); let mut writer = RecordWriter::in_memory(); - writer.write_record(long_entry.as_str()).unwrap(); + let mut session = writer.start_session().unwrap(); + writer + .write_record(long_entry.as_str(), &mut session) + .unwrap(); writer.persist(PersistAction::Flush).unwrap(); let buf: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buf[..])); - let record_payload: &str = reader.read_record().unwrap().unwrap(); 
+ let mut session = reader.start_session(); + let record_payload: &str = reader.read_record(&mut session).unwrap().unwrap(); assert_eq!(record_payload, &long_entry); - assert_eq!(reader.read_record::<&str>().unwrap(), None); + assert_eq!(reader.read_record::<&str>(&mut session).unwrap(), None); } #[test] @@ -58,17 +70,24 @@ fn test_block_requires_padding() { let long_record = make_long_entry(BLOCK_NUM_BYTES - HEADER_LEN - HEADER_LEN - 1 - 8); let short_record = "hello"; let mut writer = RecordWriter::in_memory(); - writer.write_record(long_record.as_str()).unwrap(); - writer.write_record(short_record).unwrap(); + let mut session = writer.start_session().unwrap(); + writer + .write_record(long_record.as_str(), &mut session) + .unwrap(); + writer.write_record(short_record, &mut session).unwrap(); writer.persist(PersistAction::Flush).unwrap(); let buffer: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buffer[..])); + let mut session = reader.start_session(); assert_eq!( - reader.read_record::<&str>().unwrap(), + reader.read_record::<&str>(&mut session).unwrap(), Some(long_record.as_str()) ); - assert_eq!(reader.read_record::<&str>().unwrap(), Some(short_record)); - assert_eq!(reader.read_record::<&str>().unwrap(), None); + assert_eq!( + reader.read_record::<&str>(&mut session).unwrap(), + Some(short_record) + ); + assert_eq!(reader.read_record::<&str>(&mut session).unwrap(), None); } #[test] @@ -78,45 +97,59 @@ fn test_first_chunk_empty() { let long_record = make_long_entry(BLOCK_NUM_BYTES - HEADER_LEN - HEADER_LEN); let short_record = "hello"; let mut writer = RecordWriter::in_memory(); - writer.write_record(&long_record[..]).unwrap(); - writer.write_record(short_record).unwrap(); + let mut session = writer.start_session().unwrap(); + writer.write_record(&long_record[..], &mut session).unwrap(); + writer.write_record(short_record, &mut session).unwrap(); writer.persist(PersistAction::Flush).unwrap(); let buf: Vec = 
writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buf[..])); + let mut session = reader.start_session(); assert_eq!( - reader.read_record::<&str>().unwrap(), + reader.read_record::<&str>(&mut session).unwrap(), Some(long_record.as_str()) ); - assert_eq!(reader.read_record::<&str>().unwrap(), Some(short_record)); - assert_eq!(reader.read_record::<&str>().unwrap(), None); + assert_eq!( + reader.read_record::<&str>(&mut session).unwrap(), + Some(short_record) + ); + assert_eq!(reader.read_record::<&str>(&mut session).unwrap(), None); } #[test] fn test_behavior_upon_corruption() { let records: Vec = (0..1_000).map(|i| format!("hello{i}")).collect(); let mut writer = RecordWriter::in_memory(); + let mut session = writer.start_session().unwrap(); for record in &records { - writer.write_record(record.as_str()).unwrap(); + writer.write_record(record.as_str(), &mut session).unwrap(); } writer.persist(PersistAction::Flush).unwrap(); let mut buffer: Vec = writer.into_writer().into(); { let mut reader = RecordReader::open(ArrayReader::from(&buffer[..])); + let mut session = reader.start_session(); for record in &records { - assert_eq!(reader.read_record::<&str>().unwrap(), Some(record.as_str())); + assert_eq!( + reader.read_record::<&str>(&mut session).unwrap(), + Some(record.as_str()) + ); } - assert_eq!(reader.read_record::<&str>().unwrap(), None); + assert_eq!(reader.read_record::<&str>(&mut session).unwrap(), None); } // Introducing a corruption. 
buffer[1_000] = 3; { let mut reader = RecordReader::open(ArrayReader::from(&buffer[..])); + let mut session = reader.start_session(); for record in &records[0..72] { // bug at i=72 - assert_eq!(reader.read_record::<&str>().unwrap(), Some(record.as_str())); + assert_eq!( + reader.read_record::<&str>(&mut session).unwrap(), + Some(record.as_str()) + ); } assert!(matches!( - reader.read_record::<&str>(), + reader.read_record::<&str>(&mut session), Err(ReadRecordError::Corruption) )); } diff --git a/src/recordlog/writer.rs b/src/recordlog/writer.rs index 0a44d6e..5ac7bc0 100644 --- a/src/recordlog/writer.rs +++ b/src/recordlog/writer.rs @@ -1,12 +1,11 @@ use std::io; use crate::block_read_write::VecBlockWriter; -use crate::frame::{FrameType, FrameWriter}; -use crate::rolling::{Directory, FileNumber, RollingWriter}; -use crate::{BlockWrite, PersistAction, Serializable}; +use crate::frame::{FrameType, FrameWriter, HEADER_LEN}; +use crate::{BlockWrite, PersistAction, Serializable, BLOCK_NUM_BYTES}; pub struct RecordWriter { - frame_writer: FrameWriter, + pub(crate) frame_writer: FrameWriter, buffer: Vec, } @@ -36,6 +35,10 @@ impl RecordWriter { } impl RecordWriter { + pub fn start_session(&mut self) -> io::Result { + self.frame_writer.start_session() + } + /// Writes a record. /// /// Even if this call returns `Ok(())`, at this point the data @@ -44,11 +47,20 @@ impl RecordWriter { /// For instance, the data could be stale in a library level buffer, /// by a writer level buffer, or an application buffer, /// or could not be flushed to disk yet by the OS. 
- pub fn write_record<'a>(&mut self, record: impl Serializable<'a>) -> io::Result<()> { + pub fn write_record<'a>( + &mut self, + record: impl Serializable<'a>, + session: &mut W::Session, + ) -> io::Result<()> { let mut is_first_frame = true; self.buffer.clear(); record.serialize(&mut self.buffer); let mut payload = &self.buffer[..]; + let room_needed_upperbound: u64 = HEADER_LEN as u64 + + (BLOCK_NUM_BYTES as u64) + * payload.len().div_ceil(BLOCK_NUM_BYTES - HEADER_LEN) as u64; + self.frame_writer.make_room(room_needed_upperbound)?; + loop { let frame_payload_len = self .frame_writer @@ -58,7 +70,8 @@ impl RecordWriter { payload = &payload[frame_payload_len..]; let is_last_frame = payload.is_empty(); let frame_type = frame_type(is_first_frame, is_last_frame); - self.frame_writer.write_frame(frame_type, frame_payload)?; + self.frame_writer + .write_frame(frame_type, frame_payload, session)?; is_first_frame = false; if is_last_frame { break; @@ -77,20 +90,6 @@ impl RecordWriter { } } -impl RecordWriter { - pub fn directory(&mut self) -> &mut Directory { - self.frame_writer.directory() - } - - pub fn current_file(&mut self) -> &FileNumber { - self.get_underlying_wrt().current_file() - } - - pub fn size(&self) -> usize { - self.get_underlying_wrt().size() - } -} - impl RecordWriter { #[cfg(test)] pub fn in_memory() -> Self { diff --git a/src/rolling/directory.rs b/src/rolling/directory.rs index ac64bac..488bfed 100644 --- a/src/rolling/directory.rs +++ b/src/rolling/directory.rs @@ -75,12 +75,12 @@ impl Directory { } /// Get the first still used FileNumber. - pub fn first_file_number(&self) -> &FileNumber { + pub(crate) fn first_file_number(&self) -> &FileNumber { self.files.first() } /// Returns true if some file could be GCed. 
- pub fn has_files_that_can_be_deleted(&self) -> bool { + pub(crate) fn is_gc_necessary(&self) -> bool { self.files.count() >= 2 && self.files.first().can_be_deleted() } @@ -168,7 +168,13 @@ fn read_block(file: &mut File, block: &mut [u8; BLOCK_NUM_BYTES]) -> io::Result< } impl BlockRead for RollingReader { - fn next_block(&mut self) -> io::Result { + type Session = FileNumber; + + fn start_session(&self) -> Self::Session { + self.current_file().clone() + } + + fn next_block(&mut self, _session: &mut Self::Session) -> io::Result { let success = read_block(&mut self.file, &mut self.block)?; if success { self.block_id += 1; @@ -225,6 +231,8 @@ impl RollingWriter { &self.file_number } + /// Returns number of bytes occupied on the disk by the + /// different files of the directory pub fn size(&self) -> usize { self.directory.files.count() * FILE_NUM_BYTES } @@ -238,7 +246,13 @@ impl RollingWriter { } impl BlockWrite for RollingWriter { - fn write(&mut self, buf: &[u8]) -> io::Result<()> { + type Session = FileNumber; + + fn start_write_session(&mut self) -> io::Result { + Ok(self.current_file().clone()) + } + + fn write(&mut self, buf: &[u8], _session: &mut Self::Session) -> io::Result<()> { if buf.is_empty() { return Ok(()); } diff --git a/src/rolling/mod.rs b/src/rolling/mod.rs index 5cca8d0..9ef27b2 100644 --- a/src/rolling/mod.rs +++ b/src/rolling/mod.rs @@ -1,8 +1,12 @@ mod directory; mod file_number; +use std::io; + pub use self::directory::{Directory, RollingReader, RollingWriter}; pub use self::file_number::{FileNumber, FileTracker}; +use crate::frame::{FrameReader, FrameWriter}; +use crate::recordlog::{RecordReader, RecordWriter}; const FRAME_NUM_BYTES: usize = 1 << 15; @@ -15,3 +19,39 @@ const NUM_BLOCKS_PER_FILE: usize = 4; const FILE_NUM_BYTES: usize = FRAME_NUM_BYTES * NUM_BLOCKS_PER_FILE; #[cfg(test)] mod tests; + +impl FrameReader { + pub fn into_writer(self) -> io::Result> { + let mut rolling_writer: RollingWriter = self.reader.into_writer()?; + 
rolling_writer.forward(self.cursor)?; + Ok(FrameWriter::create(rolling_writer)) + } +} + +impl FrameWriter { + pub fn directory(&mut self) -> &mut Directory { + &mut self.wrt.directory + } +} + +impl RecordReader { + pub fn into_writer(self) -> io::Result> { + let frame_writer: FrameWriter = self.frame_reader.into_writer()?; + Ok(RecordWriter::from(frame_writer)) + } +} + +// TODO remove me +impl RecordWriter { + pub fn directory(&mut self) -> &mut Directory { + self.frame_writer.directory() + } + + pub fn current_file(&mut self) -> &FileNumber { + self.get_underlying_wrt().current_file() + } + + pub fn size(&self) -> usize { + self.get_underlying_wrt().size() + } +} diff --git a/src/rolling/tests.rs b/src/rolling/tests.rs index 1c017d8..b3b393d 100644 --- a/src/rolling/tests.rs +++ b/src/rolling/tests.rs @@ -9,19 +9,21 @@ fn test_read_write() { let rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); assert!(&rolling_reader.block().iter().all(|&b| b == 0)); let mut writer: RollingWriter = rolling_reader.into_writer().unwrap(); + let mut session = writer.start_write_session().unwrap(); buffer.fill(0u8); - writer.write(&buffer[..]).unwrap(); + writer.write(&buffer[..], &mut session).unwrap(); buffer.fill(1u8); - writer.write(&buffer[..]).unwrap(); + writer.write(&buffer[..], &mut session).unwrap(); buffer.fill(2u8); - writer.write(&buffer[..]).unwrap(); + writer.write(&buffer[..], &mut session).unwrap(); writer.persist(PersistAction::Flush).unwrap(); } let mut rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); + let mut session = rolling_reader.start_session(); assert!(rolling_reader.block().iter().all(|&b| b == 0)); - assert!(rolling_reader.next_block().unwrap()); + assert!(rolling_reader.next_block(&mut session).unwrap()); assert!(rolling_reader.block().iter().all(|&b| b == 1)); - assert!(rolling_reader.next_block().unwrap()); + assert!(rolling_reader.next_block(&mut session).unwrap()); 
assert!(rolling_reader.block().iter().all(|&b| b == 2)); } @@ -32,33 +34,36 @@ fn test_read_write_2nd_block() { { let rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); let mut writer: RollingWriter = rolling_reader.into_writer().unwrap(); + let mut session = writer.start_write_session().unwrap(); for i in 1..=10 { buffer.fill(i); - writer.write(&buffer[..]).unwrap(); + writer.write(&buffer[..], &mut session).unwrap(); } writer.persist(PersistAction::Flush).unwrap(); } { let mut rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); + let mut session = rolling_reader.start_session(); assert!(rolling_reader.block().iter().all(|&b| b == 1)); - assert!(rolling_reader.next_block().unwrap()); + assert!(rolling_reader.next_block(&mut session).unwrap()); assert!(rolling_reader.block().iter().all(|&b| b == 2)); - assert!(rolling_reader.next_block().unwrap()); + assert!(rolling_reader.next_block(&mut session).unwrap()); assert!(rolling_reader.block().iter().all(|&b| b == 3)); let mut writer: RollingWriter = rolling_reader.into_writer().unwrap(); for i in 13..=23 { buffer.fill(i); - writer.write(&buffer[..]).unwrap(); + writer.write(&buffer[..], &mut session).unwrap(); } writer.persist(PersistAction::Flush).unwrap(); } { let mut rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); + let mut session = rolling_reader.start_session(); assert!(rolling_reader.block().iter().all(|&b| b == 1)); - assert!(rolling_reader.next_block().unwrap()); + assert!(rolling_reader.next_block(&mut session).unwrap()); assert!(rolling_reader.block().iter().all(|&b| b == 2)); for i in 13..=23 { - assert!(rolling_reader.next_block().unwrap()); + assert!(rolling_reader.next_block(&mut session).unwrap()); assert!(rolling_reader.block().iter().all(|&b| b == i)); } } @@ -72,9 +77,10 @@ fn test_read_truncated() { { let rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); let mut writer: RollingWriter = 
rolling_reader.into_writer().unwrap(); + let mut session = writer.start_write_session().unwrap(); for i in 0..to_write { buffer.fill(i as u8); - writer.write(&buffer[..]).unwrap(); + writer.write(&buffer[..], &mut session).unwrap(); } writer.persist(PersistAction::Flush).unwrap(); let file_ids = writer.list_file_numbers(); @@ -91,6 +97,7 @@ fn test_read_truncated() { } { let mut rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); + let mut session = rolling_reader.start_session(); for i in 0..to_write { // ignore file 1 as it was corrupted @@ -100,7 +107,10 @@ fn test_read_truncated() { assert!(rolling_reader.block().iter().all(|&b| b == i as u8)); // check we manage to get the next block, except for the last block: there is nothing // after - assert_eq!(rolling_reader.next_block().unwrap(), i != to_write - 1); + assert_eq!( + rolling_reader.next_block(&mut session).unwrap(), + i != to_write - 1 + ); } } } @@ -114,10 +124,11 @@ fn test_directory_single_file() { assert_eq!(first_file.unroll(&directory.files), &[0]); } let mut rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); + let mut session = rolling_reader.start_session(); for _ in 0..NUM_BLOCKS_PER_FILE - 1 { - assert!(rolling_reader.next_block().unwrap()); + assert!(rolling_reader.next_block(&mut session).unwrap()); } - assert!(!rolling_reader.next_block().unwrap()); + assert!(!rolling_reader.next_block(&mut session).unwrap()); } #[test] @@ -128,9 +139,10 @@ fn test_directory_simple() { .unwrap() .into_writer() .unwrap(); + let mut session = writer.start_write_session().unwrap(); let buf = vec![1u8; FRAME_NUM_BYTES]; for _ in 0..(NUM_BLOCKS_PER_FILE + 1) { - writer.write(&buf).unwrap(); + writer.write(&buf, &mut session).unwrap(); } } { @@ -143,33 +155,33 @@ fn test_directory_simple() { #[test] fn test_directory_truncate() { let tmp_dir = tempfile::tempdir().unwrap(); - let file_0: FileNumber; - let file_1: FileNumber; - let file_2: FileNumber; - let file_3: 
FileNumber; + let mut file_0: FileNumber; + let mut file_1: FileNumber; + let mut file_2: FileNumber; + let mut file_3: FileNumber; { let reader = RollingReader::open(tmp_dir.path()).unwrap(); - file_0 = reader.current_file().clone(); - assert!(!file_0.can_be_deleted()); let mut writer: RollingWriter = reader.into_writer().unwrap(); + file_0 = writer.start_write_session().unwrap(); + assert!(!file_0.can_be_deleted()); let buf = vec![1u8; FRAME_NUM_BYTES]; assert_eq!(&writer.current_file().unroll(&writer.directory.files), &[0]); for _ in 0..NUM_BLOCKS_PER_FILE + 1 { - writer.write(&buf).unwrap(); + writer.write(&buf, &mut file_0).unwrap(); } assert_eq!(&writer.list_file_numbers(), &[0, 1]); - file_1 = writer.current_file().clone(); + file_1 = writer.start_write_session().unwrap(); assert_eq!(file_1.file_number(), 1); for _ in 0..NUM_BLOCKS_PER_FILE { - writer.write(&buf).unwrap(); + writer.write(&buf, &mut file_1).unwrap(); } assert_eq!(&writer.list_file_numbers(), &[0, 1, 2]); - file_2 = writer.current_file().clone(); + file_2 = writer.start_write_session().unwrap(); assert_eq!(file_2.file_number(), 2); for _ in 0..NUM_BLOCKS_PER_FILE { - writer.write(&buf).unwrap(); + writer.write(&buf, &mut file_2).unwrap(); } - file_3 = writer.current_file().clone(); + file_3 = writer.start_write_session().unwrap(); assert_eq!(&writer.list_file_numbers(), &[0, 1, 2, 3]); assert!(!file_0.can_be_deleted()); drop(file_1); diff --git a/src/tests.rs b/src/tests.rs index f1f03a2..3929e2a 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -2,6 +2,8 @@ use std::borrow::Cow; use bytes::Buf; +use crate::multi_record_log::Preferences; +use crate::page_directory::PAGE_SIZE; use crate::{MultiRecordLog, Record}; fn read_all_records<'a>(multi_record_log: &'a MultiRecordLog, queue: &str) -> Vec> { @@ -60,7 +62,7 @@ fn test_multi_record_log_simple() { &read_all_records(&multi_record_log, "queue"), &[b"hello".as_slice(), b"happy".as_slice()] ); - assert_eq!(&multi_record_log.list_file_numbers(), 
&[0]); + // assert_eq!(&multi_record_log.list_file_numbers(), &[0]); } } @@ -88,7 +90,7 @@ fn test_multi_record_log_chained() { &read_all_records(&multi_record_log, "queue"), &[b"world order".as_slice(), b"nice day".as_slice()] ); - assert_eq!(&multi_record_log.list_file_numbers(), &[0]); + // assert_eq!(&multi_record_log.list_file_numbers(), &[0]); } } @@ -111,7 +113,7 @@ fn test_multi_record_log_reopen() { &read_all_records(&multi_record_log, "queue"), &[b"hello".as_slice(), b"happy".as_slice()] ); - assert_eq!(&multi_record_log.list_file_numbers(), &[0]); + // assert_eq!(&multi_record_log.list_file_numbers(), &[0]); } } @@ -145,7 +147,7 @@ fn test_multi_record_log() { &read_all_records(&multi_record_log, "queue2"), &[b"maitre".as_slice(), b"corbeau".as_slice()] ); - assert_eq!(&multi_record_log.list_file_numbers(), &[0]); + // assert_eq!(&multi_record_log.list_file_numbers(), &[0]); } { let mut multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); @@ -161,7 +163,7 @@ fn test_multi_record_log() { b"bubu".as_slice() ] ); - assert_eq!(&multi_record_log.list_file_numbers(), &[0]); + // assert_eq!(&multi_record_log.list_file_numbers(), &[0]); } } @@ -186,12 +188,12 @@ fn test_multi_record_position_known_after_truncate() { .unwrap(), Some(1) ); - assert_eq!(&multi_record_log.list_file_numbers(), &[0]); + // assert_eq!(&multi_record_log.list_file_numbers(), &[0]); } { let mut multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); multi_record_log.truncate("queue", ..=1).unwrap(); - assert_eq!(&multi_record_log.list_file_numbers(), &[0]); + // assert_eq!(&multi_record_log.list_file_numbers(), &[0]); } { let mut multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); @@ -318,32 +320,33 @@ fn test_truncate_range_correct_pos() { } } -#[test] -fn test_multi_record_size() { - let tempdir = tempfile::tempdir().unwrap(); - { - let mut multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); - 
assert_eq!(multi_record_log.resource_usage().memory_used_bytes, 0); - assert_eq!(multi_record_log.resource_usage().memory_allocated_bytes, 0); - - multi_record_log.create_queue("queue").unwrap(); - let size_mem_create = multi_record_log.resource_usage(); - assert!(size_mem_create.memory_used_bytes > 0); - assert!(size_mem_create.memory_allocated_bytes >= size_mem_create.memory_used_bytes); - - multi_record_log - .append_record("queue", None, &b"hello"[..]) - .unwrap(); - let size_mem_append = multi_record_log.resource_usage(); - assert!(size_mem_append.memory_used_bytes > size_mem_create.memory_used_bytes); - assert!(size_mem_append.memory_allocated_bytes >= size_mem_append.memory_used_bytes); - assert!(size_mem_append.memory_allocated_bytes >= size_mem_create.memory_allocated_bytes); - - multi_record_log.truncate("queue", ..=0).unwrap(); - let size_mem_truncate = multi_record_log.resource_usage(); - assert!(size_mem_truncate.memory_used_bytes < size_mem_append.memory_used_bytes); - } -} +// #[test] +// fn test_multi_record_size() { +// let tempdir = tempfile::tempdir().unwrap(); +// { +// let mut multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); +// // assert_eq!(multi_record_log.resource_usage().memory_used_bytes, 0); +// // assert_eq!(multi_record_log.resource_usage().memory_allocated_bytes, 0); + +// multi_record_log.create_queue("queue").unwrap(); +// // let size_mem_create = multi_record_log.resource_usage(); +// assert!(size_mem_create.memory_used_bytes > 0); +// assert!(size_mem_create.memory_allocated_bytes >= size_mem_create.memory_used_bytes); + +// multi_record_log +// .append_record("queue", None, &b"hello"[..]) +// .unwrap(); +// // let size_mem_append = multi_record_log.resource_usage(); +// assert!(size_mem_append.memory_used_bytes > size_mem_create.memory_used_bytes); +// assert!(size_mem_append.memory_allocated_bytes >= size_mem_append.memory_used_bytes); +// assert!(size_mem_append.memory_allocated_bytes >= +// 
size_mem_create.memory_allocated_bytes); + +// multi_record_log.truncate("queue", ..=0).unwrap(); +// // let size_mem_truncate = multi_record_log.resource_usage(); +// assert!(size_mem_truncate.memory_used_bytes < size_mem_append.memory_used_bytes); +// } +// } #[test] fn test_open_corrupted() { @@ -455,3 +458,31 @@ fn test_last_record() { let last_record = multi_record_log.last_record("queue1").unwrap(); assert!(last_record.is_none()); } + +#[test] +fn test_gc() { + let tempdir = tempfile::tempdir().unwrap(); + + let preferences = Preferences { + num_bytes: PAGE_SIZE as u64 * 10, + ..Default::default() + }; + let mut multi_record_log = + MultiRecordLog::open_with_prefs(tempdir.path(), preferences).unwrap(); + multi_record_log.create_queue("queue1").unwrap(); + multi_record_log.create_queue("queue2").unwrap(); + + let payload = vec![b'a'; 5000]; + const N: usize = 100; + for _ in 0..N { + multi_record_log + .append_record("queue1", None, &payload[..]) + .unwrap(); + } + multi_record_log.truncate("queue1", ..=100).unwrap(); + for _ in 0..N { + multi_record_log + .append_record("queue2", None, &payload[..]) + .unwrap(); + } +}