diff --git a/doradb-storage/Cargo.toml b/doradb-storage/Cargo.toml index de03eb7..cd24566 100644 --- a/doradb-storage/Cargo.toml +++ b/doradb-storage/Cargo.toml @@ -34,6 +34,8 @@ humantime = "2.1" criterion = { version = "0.5", features = ["html_reports"] } criterion-perf-events = "0.4" perfcnt = "0.8" +byte-unit = "5" +fastrand = "2" [profile.release] debug = true diff --git a/doradb-storage/examples/bench_block_index.rs b/doradb-storage/examples/bench_block_index.rs index 9520901..050b733 100644 --- a/doradb-storage/examples/bench_block_index.rs +++ b/doradb-storage/examples/bench_block_index.rs @@ -1,6 +1,6 @@ use clap::Parser; use doradb_storage::buffer::FixedBufferPool; -use doradb_storage::table::{IndexKey, IndexSchema, TableSchema}; +use doradb_storage::catalog::{IndexKey, IndexSchema, TableSchema}; use doradb_storage::value::ValKind; use perfcnt::linux::{HardwareEventType as Hardware, PerfCounterBuilderLinux as Builder}; use perfcnt::{AbstractPerfCounter, PerfCounter}; diff --git a/doradb-storage/examples/bench_insert.rs b/doradb-storage/examples/bench_insert.rs new file mode 100644 index 0000000..b8b1495 --- /dev/null +++ b/doradb-storage/examples/bench_insert.rs @@ -0,0 +1,250 @@ +//! Multi-threaded transaction processing. +//! This example runs empty transactions via multiple threads. +//! Its goal is to testing system bottleneck on starting and committing transactions. +use byte_unit::{Byte, ParseError}; +use clap::Parser; +use crossbeam_utils::sync::WaitGroup; +use doradb_storage::buffer::FixedBufferPool; +use doradb_storage::catalog::{Catalog, IndexKey, IndexSchema, TableSchema}; +use doradb_storage::session::Session; +use doradb_storage::table::TableID; +use doradb_storage::trx::log::LogSync; +use doradb_storage::trx::sys::{TransactionSystem, TrxSysConfig}; +use doradb_storage::value::{Val, ValKind}; +use easy_parallel::Parallel; +use std::str::FromStr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; + +fn main() { + let args = Args::parse(); + + let buf_pool = FixedBufferPool::with_capacity_static(args.buffer_pool_size).unwrap(); + println!("buffer pool size is {}", buf_pool.size()); + let catalog = Catalog::empty_static(); + let trx_sys = TrxSysConfig::default() + .log_file_prefix(args.log_file_prefix.to_string()) + .log_partitions(args.log_partitions) + .io_depth_per_log(args.io_depth_per_log) + .log_file_max_size(args.log_file_max_size) + .log_sync(args.log_sync) + .log_drop(args.log_drop) + .max_io_size(args.max_io_size) + .gc(args.gc_enabled) + .purge_threads(args.purge_threads) + .build_static(buf_pool, catalog); + // create empty table + let table_id = catalog.create_table( + buf_pool, + TableSchema::new( + vec![ + ValKind::I32.nullable(false), + ValKind::I32.nullable(false), + ValKind::VarByte.nullable(false), + ValKind::VarByte.nullable(false), + ], + vec![IndexSchema::new(vec![IndexKey::new(0)], true)], + ), + ); + // start benchmark + { + let start = Instant::now(); + let wg = WaitGroup::new(); + let stop = Arc::new(AtomicBool::new(false)); + let ex = smol::Executor::new(); + let (notify, shutdown) = flume::unbounded::<()>(); + // start transaction sessions. + for sess_id in 0..args.sessions { + let wg = wg.clone(); + let stop = Arc::clone(&stop); + ex.spawn(worker( + buf_pool, + trx_sys, + catalog, + table_id, + sess_id as i32, + args.sessions as i32, + stop, + wg, + )) + .detach(); + } + // start system threads. 
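+    // Harness shape: `args.threads` OS threads each block on the smol
+    // executor until the `shutdown` channel closes, while one extra closure
+    // sleeps for `args.duration`, flips `stop` so workers leave their loops,
+    // waits on the WaitGroup, then drops `notify` to release the executor
+    // threads.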
+ let _ = Parallel::new() + .each(0..args.threads, |_| { + smol::block_on(ex.run(shutdown.recv_async())) + }) + .finish({ + let stop = Arc::clone(&stop); + move || { + std::thread::sleep(args.duration); + stop.store(true, Ordering::SeqCst); + wg.wait(); + drop(notify) + } + }); + let dur = start.elapsed(); + let stats = trx_sys.trx_sys_stats(); + let total_trx_count = stats.trx_count; + let commit_count = stats.commit_count; + let log_bytes = stats.log_bytes; + let sync_count = stats.sync_count; + let sync_nanos = stats.sync_nanos; + let sync_latency = if sync_count == 0 { + 0f64 + } else { + sync_nanos as f64 / 1000f64 / sync_count as f64 + }; + let io_submit_count = stats.io_submit_count; + let io_submit_nanos = stats.io_submit_nanos; + let io_submit_latency = if io_submit_count == 0 { + 0f64 + } else { + io_submit_nanos as f64 / 1000f64 / io_submit_count as f64 + }; + let io_wait_count = stats.io_wait_count; + let io_wait_nanos = stats.io_wait_nanos; + let io_wait_latency = if io_wait_count == 0 { + 0f64 + } else { + io_wait_nanos as f64 / 1000f64 / io_wait_count as f64 + }; + let trx_per_group = if commit_count == 0 { + 0f64 + } else { + total_trx_count as f64 / commit_count as f64 + }; + let tps = total_trx_count as f64 * 1_000_000_000f64 / dur.as_nanos() as f64; + println!( + "threads={},dur={},total_trx={},groups={},sync={},sync_dur={:.2}us,\ + io_submit={},io_submit_dur={:.2}us,io_wait={},io_wait_dur={:.2}us,\ + trx/grp={:.2},trx/s={:.0},log/s={:.2}MB,purge_trx={},purge_row={},purge_index={}", + args.threads, + dur.as_micros(), + total_trx_count, + commit_count, + sync_count, + sync_latency, + io_submit_count, + io_submit_latency, + io_wait_count, + io_wait_latency, + trx_per_group, + tps, + log_bytes as f64 / dur.as_micros() as f64, + stats.purge_trx_count, + stats.purge_row_count, + stats.purge_index_count, + ); + } + unsafe { + TransactionSystem::drop_static(trx_sys); + Catalog::drop_static(catalog); + FixedBufferPool::drop_static(buf_pool); + } +} + +#[inline] +async fn worker( + buf_pool: &FixedBufferPool, + trx_sys: &TransactionSystem, + catalog: &'static Catalog, + table_id: TableID, + id_start: i32, + id_step: i32, + stop: Arc, + wg: WaitGroup, +) { + let table = catalog.get_table(table_id).unwrap(); + let mut session = Session::new(); + let stop = &*stop; + let mut id = id_start; + let mut c = [0u8; 120]; + let mut pad = [0u8; 60]; + while !stop.load(Ordering::Relaxed) { + let k = fastrand::i32(0..1024 * 1024); + c.iter_mut().for_each(|b| { + *b = fastrand::alphabetic() as u8; + }); + pad.iter_mut().for_each(|b| { + *b = fastrand::alphabetic() as u8; + }); + let mut trx = session.begin_trx(trx_sys); + let mut stmt = trx.start_stmt(); + let res = table + .insert_row( + buf_pool, + &mut stmt, + vec![ + Val::from(id), + Val::from(k), + Val::from(&c[..]), + Val::from(&pad[..]), + ], + ) + .await; + assert!(res.is_ok()); + trx = stmt.succeed(); + match trx_sys.commit(trx, buf_pool, &catalog).await { + Ok(s) => session = s, + Err(_) => return, + } + id += id_step; + } + drop(wg); +} + +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// thread number to run transactions + #[arg(long, default_value = "1")] + threads: usize, + + #[arg(long, default_value = "1")] + sessions: usize, + + /// Number of transactions at least one thread should complete + #[arg(long, default_value = "10s", value_parser = humantime::parse_duration)] + duration: Duration, + + /// path of redo log file + #[arg(long, default_value = "redo.log")] + log_file_prefix: 
String, + + #[arg(long, default_value = "1")] + log_partitions: usize, + + #[arg(long, default_value = "fsync", value_parser = LogSync::from_str)] + log_sync: LogSync, + + #[arg(long, default_value = "false")] + log_drop: bool, + + /// size of log file + #[arg(long, default_value = "1GiB", value_parser = parse_byte_size)] + log_file_max_size: usize, + + #[arg(long, default_value = "8192", value_parser = parse_byte_size)] + max_io_size: usize, + + #[arg(long, default_value = "32")] + io_depth_per_log: usize, + + #[arg(long, default_value = "2GiB", value_parser = parse_byte_size)] + buffer_pool_size: usize, + + /// whether to enable GC + #[arg(long, action = clap::ArgAction::Set, default_value = "true", value_parser = clap::builder::BoolishValueParser::new())] + gc_enabled: bool, + + #[arg(long, default_value = "1")] + purge_threads: usize, +} + +#[inline] +fn parse_byte_size(input: &str) -> Result { + Byte::parse_str(input, true).map(|b| b.as_u64() as usize) +} diff --git a/doradb-storage/examples/multi_threaded_trx.rs b/doradb-storage/examples/multi_threaded_trx.rs index 5c97a31..8127d53 100644 --- a/doradb-storage/examples/multi_threaded_trx.rs +++ b/doradb-storage/examples/multi_threaded_trx.rs @@ -1,10 +1,14 @@ //! Multi-threaded transaction processing. //! This example runs empty transactions via multiple threads. //! Its goal is to testing system bottleneck on starting and committing transactions. +use byte_unit::{Byte, ParseError}; use clap::Parser; use crossbeam_utils::sync::WaitGroup; +use doradb_storage::buffer::FixedBufferPool; +use doradb_storage::catalog::Catalog; use doradb_storage::session::Session; -use doradb_storage::trx::sys::{LogSync, TransactionSystem, TrxSysConfig}; +use doradb_storage::trx::log::LogSync; +use doradb_storage::trx::sys::{TransactionSystem, TrxSysConfig}; use easy_parallel::Parallel; use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering}; @@ -15,6 +19,8 @@ use std::time::Instant; fn main() { let args = Args::parse(); + let buf_pool = FixedBufferPool::with_capacity_static(128 * 1024 * 1024).unwrap(); + let catalog = Catalog::::empty_static(); let trx_sys = TrxSysConfig::default() .log_file_prefix(args.log_file_prefix.to_string()) .log_partitions(args.log_partitions) @@ -24,7 +30,7 @@ fn main() { .log_drop(args.log_drop) .max_io_size(args.max_io_size) .gc(args.gc_enabled) - .build_static(); + .build_static(buf_pool, catalog); { let start = Instant::now(); let wg = WaitGroup::new(); @@ -35,7 +41,8 @@ fn main() { for _ in 0..args.sessions { let wg = wg.clone(); let stop = Arc::clone(&stop); - ex.spawn(worker(trx_sys, stop, wg)).detach(); + ex.spawn(worker(buf_pool, catalog, trx_sys, stop, wg)) + .detach(); } // start system threads. 
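+    // Same driver shape as bench_insert, but each worker commits
+    // transactions carrying only a pseudo redo entry, so throughput here
+    // isolates begin/commit and the group-commit/fsync path rather than
+    // row and index work.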
let _ = Parallel::new() @@ -81,17 +88,25 @@ fn main() { } unsafe { TransactionSystem::drop_static(trx_sys); + Catalog::drop_static(catalog); + FixedBufferPool::drop_static(buf_pool); } } #[inline] -async fn worker(trx_sys: &TransactionSystem, stop: Arc, wg: WaitGroup) { +async fn worker( + buf_pool: &FixedBufferPool, + catalog: &Catalog, + trx_sys: &TransactionSystem, + stop: Arc, + wg: WaitGroup, +) { let mut session = Session::new(); let stop = &*stop; while !stop.load(Ordering::Relaxed) { let mut trx = session.begin_trx(trx_sys); trx.add_pseudo_redo_log_entry(); - match trx_sys.commit(trx).await { + match trx_sys.commit(trx, buf_pool, &catalog).await { Ok(s) => session = s, Err(_) => return, } @@ -127,10 +142,10 @@ struct Args { log_drop: bool, /// size of log file - #[arg(long, default_value = "1073741824")] + #[arg(long, default_value = "1GiB", value_parser = parse_byte_size)] log_file_max_size: usize, - #[arg(long, default_value = "8192")] + #[arg(long, default_value = "8KiB", value_parser = parse_byte_size)] max_io_size: usize, #[arg(long, default_value = "32")] @@ -140,3 +155,8 @@ struct Args { #[arg(long)] gc_enabled: bool, } + +#[inline] +fn parse_byte_size(input: &str) -> Result { + Byte::parse_str(input, true).map(|b| b.as_u64() as usize) +} diff --git a/doradb-storage/src/buffer/mod.rs b/doradb-storage/src/buffer/mod.rs index f7a472e..cb5c1da 100644 --- a/doradb-storage/src/buffer/mod.rs +++ b/doradb-storage/src/buffer/mod.rs @@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; pub const SAFETY_PAGES: usize = 10; /// Abstraction of buffer pool. -pub trait BufferPool { +pub trait BufferPool: Sync { /// Allocate a new page. fn allocate_page(&self) -> PageExclusiveGuard<'_, T>; @@ -101,6 +101,12 @@ impl FixedBufferPool { Ok(leak) } + /// Returns the maximum page number of this pool. + #[inline] + pub fn size(&self) -> usize { + self.size + } + /// Drop static buffer pool. /// /// # Safety diff --git a/doradb-storage/src/catalog.rs b/doradb-storage/src/catalog.rs index c1d5038..c1c145e 100644 --- a/doradb-storage/src/catalog.rs +++ b/doradb-storage/src/catalog.rs @@ -1,8 +1,11 @@ use crate::buffer::BufferPool; use crate::index::{BlockIndex, SecondaryIndex}; -use crate::table::{Table, TableID, TableSchema}; +use crate::row::ops::{SelectKey, UpdateCol}; +use crate::table::{Table, TableID}; +use crate::value::{Layout, Val, ValKind, ValType}; use parking_lot::Mutex; use std::collections::HashMap; +use std::collections::HashSet; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; @@ -22,6 +25,17 @@ impl Catalog
<P>
{ } } + #[inline] + pub fn empty_static() -> &'static Self { + let cat = Self::empty(); + Box::leak(Box::new(cat)) + } + + #[inline] + pub unsafe fn drop_static(this: &'static Self) { + drop(Box::from_raw(this as *const Self as *mut Self)) + } + #[inline] pub fn create_table(&self, buf_pool: &P, schema: TableSchema) -> TableID { let table_id = self.table_id.fetch_add(1, Ordering::SeqCst); @@ -65,3 +79,224 @@ pub struct TableMeta
<P>
{ pub blk_idx: Arc>, pub sec_idx: Arc<[SecondaryIndex]>, } + +pub struct TableSchema { + types: Vec, + // fix length is the total inline length of all columns. + pub fix_len: usize, + // index of var-length columns. + pub var_cols: Vec, + // index column id. + pub indexes: Vec, + // columns that are included in any index. + pub user_index_cols: HashSet, +} + +impl TableSchema { + /// Create a new schema. + /// RowID is not included in input, but will be created + /// automatically. + #[inline] + pub fn new(user_types: Vec, indexes: Vec) -> Self { + debug_assert!(!user_types.is_empty()); + debug_assert!(indexes.iter().all(|is| { + is.keys + .iter() + .all(|k| (k.user_col_idx as usize) < user_types.len()) + })); + + let mut types = Vec::with_capacity(user_types.len() + 1); + types.push(ValType { + kind: ValKind::U64, + nullable: false, + }); + types.extend(user_types); + let mut fix_len = 0; + let mut var_cols = vec![]; + for (idx, ty) in types.iter().enumerate() { + fix_len += ty.kind.layout().inline_len(); + if !ty.kind.layout().is_fixed() { + var_cols.push(idx); + } + } + let mut user_index_cols = HashSet::new(); + for index in &indexes { + for key in &index.keys { + user_index_cols.insert(key.user_col_idx as usize); + } + } + TableSchema { + types, + fix_len, + var_cols, + indexes, + user_index_cols, + } + } + + /// Returns column count of this schema, including row id. + #[inline] + pub fn col_count(&self) -> usize { + self.types.len() + } + + /// Returns layouts of all columns, including row id. + #[inline] + pub fn types(&self) -> &[ValType] { + &self.types + } + + #[inline] + pub fn user_types(&self) -> &[ValType] { + &self.types[1..] + } + + /// Returns whether the type is matched at given column index, row id is excluded. + #[inline] + pub fn user_col_type_match(&self, user_col_idx: usize, val: &Val) -> bool { + self.col_type_match(user_col_idx + 1, val) + } + + /// Returns whether the type is matched at given column index. 
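+    /// `Val::Null` matches any layout; otherwise the value variant must
+    /// agree with the column's inline layout (see `layout_match` below).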
+ #[inline] + pub fn col_type_match(&self, col_idx: usize, val: &Val) -> bool { + layout_match(val, self.layout(col_idx)) + } + + #[inline] + pub fn index_layout_match(&self, index_no: usize, vals: &[Val]) -> bool { + let index = &self.indexes[index_no]; + if index.keys.len() != vals.len() { + return false; + } + index + .keys + .iter() + .map(|k| self.user_layout(k.user_col_idx as usize)) + .zip(vals) + .all(|(layout, val)| layout_match(val, layout)) + } + + #[inline] + pub fn user_layout(&self, user_col_idx: usize) -> Layout { + self.layout(user_col_idx + 1) + } + + #[inline] + pub fn layout(&self, col_idx: usize) -> Layout { + self.types[col_idx].kind.layout() + } + + #[inline] + pub fn keys_for_insert(&self, row: &[Val]) -> Vec { + self.indexes + .iter() + .enumerate() + .map(|(index_no, is)| { + let vals: Vec = is + .keys + .iter() + .map(|k| row[k.user_col_idx as usize].clone()) + .collect(); + SelectKey { index_no, vals } + }) + .collect() + } + + #[inline] + pub fn index_may_change(&self, update: &[UpdateCol]) -> bool { + update + .iter() + .any(|uc| self.user_index_cols.contains(&uc.idx)) + } +} + +#[inline] +fn layout_match(val: &Val, layout: Layout) -> bool { + match (val, layout) { + (Val::Null, _) => true, + (Val::Byte1(_), Layout::Byte1) + | (Val::Byte2(_), Layout::Byte2) + | (Val::Byte4(_), Layout::Byte4) + | (Val::Byte8(_), Layout::Byte8) + | (Val::VarByte(_), Layout::VarByte) => true, + _ => false, + } +} + +pub struct IndexSchema { + pub keys: Vec, + pub unique: bool, +} + +impl IndexSchema { + #[inline] + pub fn new(keys: Vec, unique: bool) -> Self { + debug_assert!(!keys.is_empty()); + IndexSchema { keys, unique } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct IndexKey { + pub user_col_idx: u16, + pub order: IndexOrder, +} + +impl IndexKey { + #[inline] + pub fn new(user_col_idx: u16) -> Self { + IndexKey { + user_col_idx, + order: IndexOrder::Asc, + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum IndexOrder { + Asc, + Desc, +} + +pub struct TableCache<'a, P> { + catalog: &'a Catalog
<P>
, + map: HashMap<TableID, Option<Table<P>>>, +} + +impl<'a, P: BufferPool> TableCache<'a, P> { + #[inline] + pub fn new(catalog: &'a Catalog
<P>
) -> Self { + TableCache { + catalog, + map: HashMap::new(), + } + } + + #[inline] + pub fn get_table(&mut self, table_id: TableID) -> &Option<Table<P>> { + if self.map.contains_key(&table_id) { + return &self.map[&table_id]; + } + let table = self.catalog.get_table(table_id); + self.map.entry(table_id).or_insert(table) + } +} + +#[cfg(test)] +pub(crate) mod tests { + use super::*; + + /// Table1 has a single i32 column, with a unique index on it. + #[inline] + pub(crate) fn table1<P: BufferPool>(buf_pool: &P, catalog: &Catalog
<P>
) -> TableID { + catalog.create_table( + buf_pool, + TableSchema::new( + vec![ValKind::I32.nullable(false)], + vec![IndexSchema::new(vec![IndexKey::new(0)], true)], + ), + ) + } +} diff --git a/doradb-storage/src/index/block_index.rs b/doradb-storage/src/index/block_index.rs index d5db938..8e6bd13 100644 --- a/doradb-storage/src/index/block_index.rs +++ b/doradb-storage/src/index/block_index.rs @@ -2,13 +2,13 @@ use crate::buffer::frame::BufferFrameAware; use crate::buffer::guard::{PageExclusiveGuard, PageGuard, PageOptimisticGuard, PageSharedGuard}; use crate::buffer::page::{PageID, LSN, PAGE_SIZE}; use crate::buffer::BufferPool; +use crate::catalog::TableSchema; use crate::error::{ Error, Result, Validation, Validation::{Invalid, Valid}, }; use crate::latch::LatchFallbackMode; use crate::row::{RowID, RowPage, INVALID_ROW_ID}; -use crate::table::TableSchema; use either::Either::{Left, Right}; use parking_lot::Mutex; use std::marker::PhantomData; @@ -770,6 +770,8 @@ impl BlockIndex
<P>
{ } } +unsafe impl<P: BufferPool> Send for BlockIndex<P>
{} + pub enum RowLocation { ColSegment(u64, u64), RowPage(PageID), @@ -906,7 +908,7 @@ struct BranchLookup<'a> { mod tests { use super::*; use crate::buffer::FixedBufferPool; - use crate::table::schema::{IndexKey, IndexSchema}; + use crate::catalog::{IndexKey, IndexSchema}; use crate::value::ValKind; #[test] diff --git a/doradb-storage/src/index/secondary_index.rs b/doradb-storage/src/index/secondary_index.rs index 4e25d20..6cc3be3 100644 --- a/doradb-storage/src/index/secondary_index.rs +++ b/doradb-storage/src/index/secondary_index.rs @@ -1,6 +1,6 @@ +use crate::catalog::IndexSchema; use crate::index::smart_key::SmartKey; use crate::row::RowID; -use crate::table::schema::IndexSchema; use crate::value::{Val, ValKind, ValType}; use doradb_datatype::konst::{ValidF32, ValidF64}; use either::Either; @@ -96,26 +96,26 @@ pub enum IndexKind { impl IndexKind { #[inline] - pub fn unique(index: T) -> Self { + pub fn unique(index: T) -> Self { IndexKind::Unique(Arc::new(index)) } } -pub trait UniqueIndex { +pub trait UniqueIndex: Send + Sync + 'static { fn lookup(&self, key: &[Val]) -> Option; fn insert(&self, key: &[Val], row_id: RowID) -> Option; fn insert_if_not_exists(&self, key: &[Val], row_id: RowID) -> Option; - fn delete(&self, key: &[Val]) -> Option; + fn compare_delete(&self, key: &[Val], old_row_id: RowID) -> bool; fn compare_exchange(&self, key: &[Val], old_row_id: RowID, new_row_id: RowID) -> bool; - // todo: scan + fn scan_values(&self, values: &mut Vec); } -pub trait NonUniqueIndex { +pub trait NonUniqueIndex: Send + Sync + 'static { fn lookup(&self, key: &[Val], res: &mut Vec); fn insert(&self, key: &[Val], row_id: RowID); @@ -287,7 +287,9 @@ impl PartitionSingleKeyIndex { } } -impl UniqueIndex for PartitionSingleKeyIndex { +impl UniqueIndex + for PartitionSingleKeyIndex +{ #[inline] fn lookup(&self, key: &[Val]) -> Option { let key = T::encode(key); @@ -319,11 +321,21 @@ impl UniqueIndex for PartitionSingleKeyIndex { } #[inline] - fn delete(&self, key: &[Val]) -> Option { + fn compare_delete(&self, key: &[Val], old_row_id: RowID) -> bool { let key = T::encode(key); let tree = self.select(&key); let mut g = tree.write(); - g.remove(&key) + match g.entry(key) { + Entry::Occupied(occ) => { + if occ.get() == &old_row_id { + occ.remove(); + true + } else { + false + } + } + Entry::Vacant(_) => false, + } } #[inline] @@ -343,6 +355,14 @@ impl UniqueIndex for PartitionSingleKeyIndex { None => false, } } + + #[inline] + fn scan_values(&self, values: &mut Vec) { + for tree in &self.0 { + let g = tree.read(); + values.extend(g.values()); + } + } } pub struct PartitionMultiKeyIndex { @@ -389,10 +409,10 @@ impl UniqueIndex for PartitionMultiKeyIndex { } #[inline] - fn delete(&self, key: &[Val]) -> Option { + fn compare_delete(&self, key: &[Val], old_row_id: RowID) -> bool { let key = self.encode(key); let key = std::slice::from_ref(&key); - self.index.delete(key) + self.index.compare_delete(key, old_row_id) } #[inline] @@ -401,4 +421,9 @@ impl UniqueIndex for PartitionMultiKeyIndex { let key = std::slice::from_ref(&key); self.index.compare_exchange(key, old_row_id, new_row_id) } + + #[inline] + fn scan_values(&self, values: &mut Vec) { + self.index.scan_values(values); + } } diff --git a/doradb-storage/src/row/mod.rs b/doradb-storage/src/row/mod.rs index 32f8a1f..ee07df1 100644 --- a/doradb-storage/src/row/mod.rs +++ b/doradb-storage/src/row/mod.rs @@ -3,8 +3,8 @@ pub mod ops; use crate::buffer::frame::{BufferFrameAware, FrameHeader}; use crate::buffer::page::PAGE_SIZE; use 
crate::buffer::BufferPool; +use crate::catalog::TableSchema; use crate::row::ops::{Delete, InsertRow, Select, SelectKey, Update, UpdateCol}; -use crate::table::TableSchema; use crate::trx::undo::UndoMap; use crate::value::*; use std::fmt; @@ -1219,7 +1219,7 @@ pub fn var_len_for_insert(schema: &TableSchema, user_cols: &[Val]) -> usize { mod tests { use core::str; - use crate::table::schema::{IndexKey, IndexSchema}; + use crate::catalog::{IndexKey, IndexSchema}; use mem::MaybeUninit; use super::*; diff --git a/doradb-storage/src/table/mod.rs b/doradb-storage/src/table/mod.rs index 875a8c0..f0c4da0 100644 --- a/doradb-storage/src/table/mod.rs +++ b/doradb-storage/src/table/mod.rs @@ -1,10 +1,11 @@ -pub mod schema; +// pub mod schema; #[cfg(test)] mod tests; use crate::buffer::guard::PageSharedGuard; use crate::buffer::page::PageID; use crate::buffer::BufferPool; +use crate::catalog::{IndexSchema, TableSchema}; use crate::index::{BlockIndex, RowLocation, SecondaryIndex, UniqueIndex}; use crate::latch::LatchFallbackMode; use crate::row::ops::{ @@ -14,19 +15,17 @@ use crate::row::ops::{ use crate::row::{estimate_max_row_count, RowID, RowPage, RowRead}; use crate::stmt::Statement; use crate::trx::redo::{RedoEntry, RedoKind}; -use crate::trx::row::{RowReadAccess, RowWriteAccess}; +use crate::trx::row::{RowLatestStatus, RowReadAccess, RowWriteAccess}; use crate::trx::undo::{ IndexUndo, IndexUndoKind, NextRowUndo, NextRowUndoStatus, NextTrxCTS, OwnedRowUndo, RowUndoBranch, RowUndoHead, RowUndoKind, RowUndoRef, }; -use crate::trx::{trx_is_committed, ActiveTrx}; +use crate::trx::{trx_is_committed, ActiveTrx, TrxID}; use crate::value::{Val, PAGE_VAR_LEN_INLINE}; use std::collections::HashSet; use std::mem; use std::sync::Arc; -pub use schema::*; - // todo: integrate with doradb_catalog::TableID. pub type TableID = u64; @@ -326,6 +325,93 @@ impl Table
<P>
{ } } + /// Delete index by purge threads. + /// This method will be only called by internal threads and don't maintain + /// transaction properties. + /// + /// It checks whether the index entry still points to valid row, and if not, + /// remove the entry. + /// + /// The validation is based on MVCC with minimum active STS. If the input + /// key is not found on the path of undo chain, it means the index entry can be + /// removed. + #[inline] + pub fn delete_index( + &self, + buf_pool: &P, + key: &SelectKey, + row_id: RowID, + min_active_sts: TrxID, + ) -> bool { + // todo: consider index drop. + let index_schema = &self.schema.indexes[key.index_no]; + if index_schema.unique { + let index = self.sec_idx[key.index_no].unique().unwrap(); + return self.delete_unique_index(buf_pool, index, key, row_id, min_active_sts); + } + todo!() + } + + #[inline] + fn delete_unique_index( + &self, + buf_pool: &P, + index: &dyn UniqueIndex, + key: &SelectKey, + row_id: RowID, + min_active_sts: TrxID, + ) -> bool { + loop { + match index.lookup(&key.vals) { + None => return false, // Another thread deleted this entry. + Some(index_row_id) => { + if index_row_id != row_id { + // Row id changed, means another transaction inserted + // new row with same key and reused this index entry. + // So we skip to delete it. + return false; + } + match self.blk_idx.find_row_id(buf_pool, row_id) { + RowLocation::NotFound => { + return index.compare_delete(&key.vals, row_id); + } + RowLocation::ColSegment(..) => todo!(), + RowLocation::RowPage(page_id) => { + let page_guard = buf_pool + .get_page(page_id, LatchFallbackMode::Shared) + .block_until_shared(); + if !validate_page_row_range(&page_guard, page_id, row_id) { + continue; + } + let access = page_guard.read_row_by_id(row_id); + // check if row is invisible + match access.latest_status() { + RowLatestStatus::NotFound => { + return index.compare_delete(&key.vals, row_id); + } + RowLatestStatus::Uncommitted => { + // traverse version chain to see if any visible version matches + // the input key. + todo!() + } + RowLatestStatus::Committed(cts, deleted) => { + if cts < min_active_sts && deleted { + if deleted { + return index.compare_delete(&key.vals, row_id); + } + } + // traverse version chain to see if any visible version matches + // the input key. + todo!() + } + } + } + } + } + } + } + } + /// Move update is similar to a delete+insert. /// It's caused by no more space on current row page. #[inline] @@ -1141,7 +1227,7 @@ impl Table
<P>
{ row_id, &mut access, ); - let index_undo = self.index_undo(row_id, IndexUndoKind::GC(key)); + let index_undo = self.index_undo(row_id, IndexUndoKind::DeferDelete(key)); stmt.index_undo.push(index_undo); } else { todo!(); diff --git a/doradb-storage/src/table/schema.rs b/doradb-storage/src/table/schema.rs deleted file mode 100644 index 702f8ee..0000000 --- a/doradb-storage/src/table/schema.rs +++ /dev/null @@ -1,183 +0,0 @@ -use crate::row::ops::{SelectKey, UpdateCol}; -use crate::value::{Layout, Val, ValKind, ValType}; -use std::collections::HashSet; - -pub struct TableSchema { - types: Vec, - // fix length is the total inline length of all columns. - pub fix_len: usize, - // index of var-length columns. - pub var_cols: Vec, - // index column id. - pub indexes: Vec, - // columns that are included in any index. - pub user_index_cols: HashSet, -} - -impl TableSchema { - /// Create a new schema. - /// RowID is not included in input, but will be created - /// automatically. - #[inline] - pub fn new(user_types: Vec, indexes: Vec) -> Self { - debug_assert!(!user_types.is_empty()); - debug_assert!(indexes.iter().all(|is| { - is.keys - .iter() - .all(|k| (k.user_col_idx as usize) < user_types.len()) - })); - - let mut types = Vec::with_capacity(user_types.len() + 1); - types.push(ValType { - kind: ValKind::U64, - nullable: false, - }); - types.extend(user_types); - let mut fix_len = 0; - let mut var_cols = vec![]; - for (idx, ty) in types.iter().enumerate() { - fix_len += ty.kind.layout().inline_len(); - if !ty.kind.layout().is_fixed() { - var_cols.push(idx); - } - } - let mut user_index_cols = HashSet::new(); - for index in &indexes { - for key in &index.keys { - user_index_cols.insert(key.user_col_idx as usize); - } - } - TableSchema { - types, - fix_len, - var_cols, - indexes, - user_index_cols, - } - } - - /// Returns column count of this schema, including row id. - #[inline] - pub fn col_count(&self) -> usize { - self.types.len() - } - - /// Returns layouts of all columns, including row id. - #[inline] - pub fn types(&self) -> &[ValType] { - &self.types - } - - #[inline] - pub fn user_types(&self) -> &[ValType] { - &self.types[1..] - } - - /// Returns whether the type is matched at given column index, row id is excluded. - #[inline] - pub fn user_col_type_match(&self, user_col_idx: usize, val: &Val) -> bool { - self.col_type_match(user_col_idx + 1, val) - } - - /// Returns whether the type is matched at given column index. 
- #[inline] - pub fn col_type_match(&self, col_idx: usize, val: &Val) -> bool { - layout_match(val, self.layout(col_idx)) - } - - #[inline] - pub fn index_layout_match(&self, index_no: usize, vals: &[Val]) -> bool { - let index = &self.indexes[index_no]; - if index.keys.len() != vals.len() { - return false; - } - index - .keys - .iter() - .map(|k| self.user_layout(k.user_col_idx as usize)) - .zip(vals) - .all(|(layout, val)| layout_match(val, layout)) - } - - #[inline] - pub fn user_layout(&self, user_col_idx: usize) -> Layout { - self.layout(user_col_idx + 1) - } - - #[inline] - pub fn layout(&self, col_idx: usize) -> Layout { - self.types[col_idx].kind.layout() - } - - #[inline] - pub fn keys_for_insert(&self, row: &[Val]) -> Vec { - self.indexes - .iter() - .enumerate() - .map(|(index_no, is)| { - let vals: Vec = is - .keys - .iter() - .map(|k| row[k.user_col_idx as usize].clone()) - .collect(); - SelectKey { index_no, vals } - }) - .collect() - } - - #[inline] - pub fn index_may_change(&self, update: &[UpdateCol]) -> bool { - update - .iter() - .any(|uc| self.user_index_cols.contains(&uc.idx)) - } -} - -#[inline] -fn layout_match(val: &Val, layout: Layout) -> bool { - match (val, layout) { - (Val::Null, _) => true, - (Val::Byte1(_), Layout::Byte1) - | (Val::Byte2(_), Layout::Byte2) - | (Val::Byte4(_), Layout::Byte4) - | (Val::Byte8(_), Layout::Byte8) - | (Val::VarByte(_), Layout::VarByte) => true, - _ => false, - } -} - -pub struct IndexSchema { - pub keys: Vec, - pub unique: bool, -} - -impl IndexSchema { - #[inline] - pub fn new(keys: Vec, unique: bool) -> Self { - debug_assert!(!keys.is_empty()); - IndexSchema { keys, unique } - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct IndexKey { - pub user_col_idx: u16, - pub order: IndexOrder, -} - -impl IndexKey { - #[inline] - pub fn new(user_col_idx: u16) -> Self { - IndexKey { - user_col_idx, - order: IndexOrder::Asc, - } - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -#[repr(u8)] -pub enum IndexOrder { - Asc, - Desc, -} diff --git a/doradb-storage/src/table/tests.rs b/doradb-storage/src/table/tests.rs index 082076a..cb8c229 100644 --- a/doradb-storage/src/table/tests.rs +++ b/doradb-storage/src/table/tests.rs @@ -1,9 +1,8 @@ -use crate::buffer::FixedBufferPool; -use crate::catalog::Catalog; +use crate::buffer::{BufferPool, FixedBufferPool}; +use crate::catalog::{Catalog, IndexKey, IndexSchema, TableSchema}; use crate::row::ops::{SelectKey, SelectMvcc, UpdateCol}; use crate::session::Session; use crate::table::TableID; -use crate::table::{IndexKey, IndexSchema, TableSchema}; use crate::trx::sys::{TransactionSystem, TrxSysConfig}; use crate::value::{Val, ValKind}; @@ -13,9 +12,9 @@ fn test_mvcc_insert_normal() { const SIZE: i32 = 10000; let buf_pool = FixedBufferPool::with_capacity_static(1024 * 1024).unwrap(); - let trx_sys = TrxSysConfig::default().build_static(); - - let (catalog, table_id) = create_table(buf_pool); + let catalog = Catalog::empty_static(); + let trx_sys = TrxSysConfig::default().build_static(buf_pool, catalog); + let table_id = create_table(buf_pool, catalog); let table = catalog.get_table(table_id).unwrap(); let mut session = Session::new(); { @@ -29,7 +28,7 @@ fn test_mvcc_insert_normal() { trx = stmt.succeed(); assert!(res.is_ok()); } - session = trx_sys.commit(trx).await.unwrap(); + session = trx_sys.commit(trx, buf_pool, &catalog).await.unwrap(); } { let mut trx = session.begin_trx(trx_sys); @@ -48,11 +47,12 @@ fn test_mvcc_insert_normal() { } trx = stmt.succeed(); } - let _ = 
trx_sys.commit(trx).await.unwrap(); + let _ = trx_sys.commit(trx, buf_pool, &catalog).await.unwrap(); } unsafe { TransactionSystem::drop_static(trx_sys); + Catalog::drop_static(catalog); FixedBufferPool::drop_static(buf_pool); } }); @@ -64,9 +64,10 @@ fn test_mvcc_update_normal() { const SIZE: i32 = 1000; let buf_pool = FixedBufferPool::with_capacity_static(1024 * 1024).unwrap(); - let trx_sys = TrxSysConfig::default().build_static(); + let catalog = Catalog::empty_static(); + let trx_sys = TrxSysConfig::default().build_static(buf_pool, catalog); + let table_id = create_table(buf_pool, catalog); { - let (catalog, table_id) = create_table(buf_pool); let table = catalog.get_table(table_id).unwrap(); let mut session = Session::new(); @@ -81,7 +82,7 @@ fn test_mvcc_update_normal() { trx = stmt.succeed(); assert!(res.is_ok()); } - session = trx_sys.commit(trx).await.unwrap(); + session = trx_sys.commit(trx, buf_pool, &catalog).await.unwrap(); // update 1 row with short value let mut trx = session.begin_trx(trx_sys); @@ -95,7 +96,7 @@ fn test_mvcc_update_normal() { let res = table.update_row(buf_pool, &mut stmt, &k1, update1).await; assert!(res.is_ok()); trx = stmt.succeed(); - session = trx_sys.commit(trx).await.unwrap(); + session = trx_sys.commit(trx, buf_pool, &catalog).await.unwrap(); // update 1 row with long value let mut trx = session.begin_trx(trx_sys); @@ -120,7 +121,7 @@ fn test_mvcc_update_normal() { assert!(row[1] == Val::from(&s2[..])); trx = stmt.succeed(); - session = trx_sys.commit(trx).await.unwrap(); + session = trx_sys.commit(trx, buf_pool, &catalog).await.unwrap(); // lookup with a new transaction let mut trx = session.begin_trx(trx_sys); @@ -133,10 +134,11 @@ fn test_mvcc_update_normal() { assert!(row[1] == Val::from(&s2[..])); trx = stmt.succeed(); - let _ = trx_sys.commit(trx).await.unwrap(); + let _ = trx_sys.commit(trx, buf_pool, &catalog).await.unwrap(); } unsafe { TransactionSystem::drop_static(trx_sys); + Catalog::drop_static(catalog); FixedBufferPool::drop_static(buf_pool); } }); @@ -148,9 +150,10 @@ fn test_mvcc_delete_normal() { const SIZE: i32 = 1000; let buf_pool = FixedBufferPool::with_capacity_static(1024 * 1024).unwrap(); - let trx_sys = TrxSysConfig::default().build_static(); + let catalog = Catalog::empty_static(); + let trx_sys = TrxSysConfig::default().build_static(buf_pool, catalog); + let table_id = create_table(buf_pool, catalog); { - let (catalog, table_id) = create_table(buf_pool); let table = catalog.get_table(table_id).unwrap(); let mut session = Session::new(); @@ -165,7 +168,7 @@ fn test_mvcc_delete_normal() { trx = stmt.succeed(); assert!(res.is_ok()); } - session = trx_sys.commit(trx).await.unwrap(); + session = trx_sys.commit(trx, buf_pool, &catalog).await.unwrap(); // delete 1 row let mut trx = session.begin_trx(trx_sys); @@ -180,7 +183,7 @@ fn test_mvcc_delete_normal() { let res = table.select_row(buf_pool, &stmt, &k1, &[0]).await; assert!(res.not_found()); trx = stmt.succeed(); - session = trx_sys.commit(trx).await.unwrap(); + session = trx_sys.commit(trx, buf_pool, &catalog).await.unwrap(); // lookup row in new transaction let mut trx = session.begin_trx(trx_sys); @@ -188,10 +191,11 @@ fn test_mvcc_delete_normal() { let res = table.select_row(buf_pool, &stmt, &k1, &[0]).await; assert!(res.not_found()); trx = stmt.succeed(); - let _ = trx_sys.commit(trx).await.unwrap(); + let _ = trx_sys.commit(trx, buf_pool, &catalog).await.unwrap(); } unsafe { TransactionSystem::drop_static(trx_sys); + Catalog::drop_static(catalog); 
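+            // Teardown reverses construction: the transaction system is
+            // dropped first because its GC/purge machinery still references
+            // the catalog and buffer pool freed below.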
FixedBufferPool::drop_static(buf_pool); } }); @@ -201,9 +205,10 @@ fn test_mvcc_delete_normal() { fn test_mvcc_rollback_insert_normal() { smol::block_on(async { let buf_pool = FixedBufferPool::with_capacity_static(1024 * 1024).unwrap(); - let trx_sys = TrxSysConfig::default().build_static(); + let catalog = Catalog::empty_static(); + let trx_sys = TrxSysConfig::default().build_static(buf_pool, catalog); + let table_id = create_table(buf_pool, catalog); { - let (catalog, table_id) = create_table(buf_pool); let table = catalog.get_table(table_id).unwrap(); let mut session = Session::new(); @@ -219,7 +224,7 @@ fn test_mvcc_rollback_insert_normal() { .await; assert!(res.is_ok()); trx = stmt.fail(buf_pool, &catalog); - session = trx_sys.commit(trx).await.unwrap(); + session = trx_sys.commit(trx, buf_pool, &catalog).await.unwrap(); // select 1 row let mut trx = session.begin_trx(trx_sys); @@ -228,10 +233,11 @@ fn test_mvcc_rollback_insert_normal() { let res = table.select_row(buf_pool, &stmt, &key, &[0, 1]).await; assert!(res.not_found()); trx = stmt.succeed(); - _ = trx_sys.commit(trx).await.unwrap(); + _ = trx_sys.commit(trx, buf_pool, &catalog).await.unwrap(); } unsafe { TransactionSystem::drop_static(trx_sys); + Catalog::drop_static(catalog); FixedBufferPool::drop_static(buf_pool); } }); @@ -241,9 +247,10 @@ fn test_mvcc_rollback_insert_normal() { fn test_mvcc_move_insert() { smol::block_on(async { let buf_pool = FixedBufferPool::with_capacity_static(1024 * 1024).unwrap(); - let trx_sys = TrxSysConfig::default().build_static(); + let catalog = Catalog::empty_static(); + let trx_sys = TrxSysConfig::default().build_static(buf_pool, catalog); + let table_id = create_table(buf_pool, catalog); { - let (catalog, table_id) = create_table(buf_pool); let table = catalog.get_table(table_id).unwrap(); let mut session = Session::new(); @@ -259,7 +266,7 @@ fn test_mvcc_move_insert() { .await; assert!(res.is_ok()); trx = stmt.succeed(); - session = trx_sys.commit(trx).await.unwrap(); + session = trx_sys.commit(trx, buf_pool, &catalog).await.unwrap(); // delete it let mut trx = session.begin_trx(trx_sys); @@ -268,7 +275,7 @@ fn test_mvcc_move_insert() { let res = table.delete_row(buf_pool, &mut stmt, &key).await; assert!(res.is_ok()); trx = stmt.succeed(); - session = trx_sys.commit(trx).await.unwrap(); + session = trx_sys.commit(trx, buf_pool, &catalog).await.unwrap(); // insert again, trigger move+insert let mut trx = session.begin_trx(trx_sys); @@ -282,7 +289,7 @@ fn test_mvcc_move_insert() { .await; assert!(res.is_ok()); trx = stmt.succeed(); - session = trx_sys.commit(trx).await.unwrap(); + session = trx_sys.commit(trx, buf_pool, &catalog).await.unwrap(); // select 1 row let mut trx = session.begin_trx(trx_sys); @@ -293,10 +300,11 @@ fn test_mvcc_move_insert() { let vals = res.unwrap(); assert!(vals[1] == Val::from("world")); trx = stmt.succeed(); - _ = trx_sys.commit(trx).await.unwrap(); + _ = trx_sys.commit(trx, buf_pool, &catalog).await.unwrap(); } unsafe { TransactionSystem::drop_static(trx_sys); + Catalog::drop_static(catalog); FixedBufferPool::drop_static(buf_pool); } }); @@ -306,9 +314,10 @@ fn test_mvcc_move_insert() { fn test_mvcc_rollback_move_insert() { smol::block_on(async { let buf_pool = FixedBufferPool::with_capacity_static(1024 * 1024).unwrap(); - let trx_sys = TrxSysConfig::default().build_static(); + let catalog = Catalog::empty_static(); + let trx_sys = TrxSysConfig::default().build_static(buf_pool, catalog); + let table_id = create_table(buf_pool, catalog); { - let (catalog, 
table_id) = create_table(buf_pool); let table = catalog.get_table(table_id).unwrap(); let mut session = Session::new(); @@ -325,7 +334,7 @@ fn test_mvcc_rollback_move_insert() { assert!(res.is_ok()); println!("row_id={}", res.unwrap()); trx = stmt.succeed(); - session = trx_sys.commit(trx).await.unwrap(); + session = trx_sys.commit(trx, buf_pool, &catalog).await.unwrap(); // delete it let mut trx = session.begin_trx(trx_sys); @@ -334,7 +343,7 @@ fn test_mvcc_rollback_move_insert() { let res = table.delete_row(buf_pool, &mut stmt, &key).await; assert!(res.is_ok()); trx = stmt.succeed(); - session = trx_sys.commit(trx).await.unwrap(); + session = trx_sys.commit(trx, buf_pool, &catalog).await.unwrap(); // insert again, trigger move+insert let mut trx = session.begin_trx(trx_sys); @@ -349,7 +358,7 @@ fn test_mvcc_rollback_move_insert() { assert!(res.is_ok()); println!("row_id={}", res.unwrap()); trx = stmt.fail(buf_pool, &catalog); - session = trx_sys.commit(trx).await.unwrap(); + session = trx_sys.commit(trx, buf_pool, &catalog).await.unwrap(); // select 1 row let mut trx = session.begin_trx(trx_sys); @@ -358,18 +367,18 @@ fn test_mvcc_rollback_move_insert() { let res = table.select_row(buf_pool, &stmt, &key, &[0, 1]).await; assert!(res.not_found()); trx = stmt.succeed(); - _ = trx_sys.commit(trx).await.unwrap(); + _ = trx_sys.commit(trx, buf_pool, &catalog).await.unwrap(); } unsafe { TransactionSystem::drop_static(trx_sys); + Catalog::drop_static(catalog); FixedBufferPool::drop_static(buf_pool); } }); } -fn create_table(buf_pool: &'static FixedBufferPool) -> (Catalog, TableID) { - let catalog = Catalog::empty(); - let table_id = catalog.create_table( +fn create_table(buf_pool: &P, catalog: &Catalog
<P>
) -> TableID { + catalog.create_table( buf_pool, TableSchema::new( vec![ @@ -378,8 +387,7 @@ fn create_table(buf_pool: &'static FixedBufferPool) -> (Catalog ], vec![IndexSchema::new(vec![IndexKey::new(0)], true)], ), - ); - (catalog, table_id) + ) } fn single_key>(value: V) -> SelectKey { diff --git a/doradb-storage/src/trx/group.rs b/doradb-storage/src/trx/group.rs new file mode 100644 index 0000000..f4de2dc --- /dev/null +++ b/doradb-storage/src/trx/group.rs @@ -0,0 +1,79 @@ +use crate::io::{pwrite, Buf, IocbRawPtr, SparseFile}; +use crate::session::{IntoSession, Session}; +use crate::trx::log::SyncGroup; +use crate::trx::{PrecommitTrx, TrxID}; +use flume::{Receiver, Sender}; +use std::collections::VecDeque; +use std::os::fd::RawFd; +use std::sync::atomic::Ordering; + +/// GroupCommit is optimization to group multiple transactions +/// and perform single IO to speed up overall commit performance. +pub(super) struct GroupCommit { + // Commit group queue, there can be multiple groups in commit phase. + // Each of them submit IO request to AIO manager and then wait for + // pwrite & fsync done. + pub(super) queue: VecDeque, + // Current log file. + pub(super) log_file: Option, + // sequence of current file in this partition, starts from 0. + pub(super) file_seq: u32, +} + +pub(super) enum Commit { + Group(CommitGroup), + Shutdown, +} + +/// CommitGroup groups multiple transactions with only +/// one log IO and at most one fsync() call. +/// It is controlled by two parameters: +/// 1. Maximum IO size, e.g. 16KB. +/// 2. Timeout to wait for next transaction to join. +pub(super) struct CommitGroup { + pub(super) trx_list: Vec, + pub(super) max_cts: TrxID, + pub(super) fd: RawFd, + pub(super) offset: usize, + pub(super) log_buf: Buf, + pub(super) sync_signal: Sender<()>, + pub(super) sync_notifier: Receiver<()>, +} + +impl CommitGroup { + #[inline] + pub(super) fn can_join(&self, trx: &PrecommitTrx) -> bool { + if let Some(redo_bin) = trx.redo_bin.as_ref() { + return redo_bin.len() <= self.log_buf.remaining_capacity(); + } + true + } + + #[inline] + pub(super) fn join(&mut self, mut trx: PrecommitTrx) -> (Session, Receiver<()>) { + debug_assert!(self.max_cts < trx.cts); + if let Some(redo_bin) = trx.redo_bin.take() { + self.log_buf.clone_from_slice(&redo_bin); + } + self.max_cts = trx.cts; + let session = trx.split_session(); + self.trx_list.push(trx); + (session, self.sync_notifier.clone()) + } + + #[inline] + pub(super) fn split(self) -> (IocbRawPtr, SyncGroup) { + let log_bytes = self.log_buf.aligned_len(); + let aio = pwrite(self.max_cts, self.fd, self.offset, self.log_buf); + let iocb_ptr = aio.iocb.load(Ordering::Relaxed); + let sync_group = SyncGroup { + trx_list: self.trx_list, + max_cts: self.max_cts, + log_bytes, + aio, + sync_signal: self.sync_signal, + finished: false, + }; + (iocb_ptr, sync_group) + } +} diff --git a/doradb-storage/src/trx/log.rs b/doradb-storage/src/trx/log.rs new file mode 100644 index 0000000..af812ea --- /dev/null +++ b/doradb-storage/src/trx/log.rs @@ -0,0 +1,481 @@ +use crate::buffer::BufferPool; +use crate::catalog::Catalog; +use crate::error::{Error, Result}; +use crate::io::{ + AIOError, AIOManager, Buf, DirectBuf, FreeListWithFactory, IocbRawPtr, PageBuf, AIO, +}; +use crate::session::{IntoSession, Session}; +use crate::trx::group::{Commit, CommitGroup, GroupCommit}; +use crate::trx::purge::{GCAnalyzer, GCBucket, Purge, GC}; +use crate::trx::sys::TrxSysConfig; +use crate::trx::{CommittedTrx, PrecommitTrx, PreparedTrx, TrxID, MAX_COMMIT_TS, 
MAX_SNAPSHOT_TS}; +use crossbeam_utils::CachePadded; +use flume::{Receiver, Sender}; +use parking_lot::{Condvar, Mutex, MutexGuard}; +use std::collections::{BTreeMap, VecDeque}; +use std::mem; +use std::os::fd::AsRawFd; +use std::str::FromStr; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::thread::JoinHandle; +use std::time::{Duration, Instant}; + +pub(super) struct LogPartition { + /// Group commit of this partition. + pub(super) group_commit: CachePadded<(Mutex, Condvar)>, + /// Maximum persisted CTS of this partition. + pub(super) persisted_cts: CachePadded, + /// Stats of transaction system. + pub(super) stats: CachePadded, + /// GC channel to send committed transactions to GC threads. + pub(super) gc_chan: Sender, + /// Each GC bucket contains active sts list, committed transaction list and + /// old transaction list. + /// Split into multiple buckets in order to avoid bottleneck of global synchronization. + pub(super) gc_buckets: Box<[GCBucket]>, + /// AIO manager to handle async IO with libaio. + pub(super) aio_mgr: AIOManager, + // Index of log partition in total partitions, starts from 0. + pub(super) log_no: usize, + // Maximum IO size of each group. + pub(super) max_io_size: usize, + // Log file prefix, including partition number. + pub(super) file_prefix: String, + // Free list of page buffer, which is used by commit group to concat + // redo logs. + pub(super) buf_free_list: FreeListWithFactory, + // Standalone thread to handle transaction commit. + // Including submit IO requests, wait IO responses + // and do fsync(). + pub(super) sync_thread: Mutex>>, + /// Standalone thread for GC info analysis. + pub(super) gc_thread: Mutex>>, +} + +impl LogPartition { + #[inline] + fn buf(&self, data: &[u8]) -> Buf { + if data.len() > self.max_io_size { + let buf = DirectBuf::uninit(data.len()); + Buf::Direct(buf) + } else { + let mut buf = self.buf_free_list.pop_elem_or_new(); + buf.set_len(data.len()); + buf.clone_from_slice(0, data); + Buf::Reuse(buf) + } + } + + #[inline] + pub(super) fn min_active_sts(&self) -> TrxID { + let mut min = MAX_SNAPSHOT_TS; + for gc_bucket in &self.gc_buckets { + let ts = gc_bucket.min_active_sts.load(Ordering::Relaxed); + if ts < min { + min = ts; + } + } + min + } + + #[inline] + fn create_new_group( + &self, + mut trx: PrecommitTrx, + mut group_commit_g: MutexGuard<'_, GroupCommit>, + ) -> (Session, Receiver<()>) { + let cts = trx.cts; + let redo_bin = trx.redo_bin.take().unwrap(); + // inside the lock, we only need to determine which range of the log file this transaction + // should write to. + let log_file = group_commit_g.log_file.as_ref().unwrap(); + let offset = match log_file.alloc(redo_bin.len()) { + Ok((offset, _)) => offset, + Err(AIOError::OutOfRange) => { + // todo: rotate if log file is full. 
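+                // Rotation would presumably bump `file_seq`, open the next
+                // file under `file_prefix`, swap it into
+                // `group_commit_g.log_file`, and retry the allocation;
+                // sketch only, `open_next_log_file` is hypothetical:
+                //   group_commit_g.file_seq += 1;
+                //   group_commit_g.log_file = Some(open_next_log_file(group_commit_g.file_seq));
+                //   // then retry log_file.alloc(redo_bin.len())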
+ todo!(); + } + Err(_) => unreachable!(), + }; + let fd = log_file.as_raw_fd(); + let log_buf = self.buf(&redo_bin); + let (sync_signal, sync_notifier) = flume::unbounded(); + let session = trx.split_session(); + let new_group = CommitGroup { + trx_list: vec![trx], + max_cts: cts, + fd, + offset, + log_buf, + sync_signal, + sync_notifier: sync_notifier.clone(), + }; + group_commit_g.queue.push_back(Commit::Group(new_group)); + drop(group_commit_g); + + (session, sync_notifier) + } + + #[inline] + pub(super) async fn commit(&self, trx: PreparedTrx, ts: &AtomicU64) -> Result { + let mut group_commit_g = self.group_commit.0.lock(); + let cts = ts.fetch_add(1, Ordering::SeqCst); + debug_assert!(cts < MAX_COMMIT_TS); + let precommit_trx = trx.fill_cts(cts); + if group_commit_g.queue.is_empty() { + let (session, sync_notifier) = self.create_new_group(precommit_trx, group_commit_g); + self.group_commit.1.notify_one(); // notify sync thread to work. + + let _ = sync_notifier.recv_async().await; // wait for fsync + assert!(self.persisted_cts.load(Ordering::Relaxed) >= cts); + return Ok(session); + } + let last_group = match group_commit_g.queue.back_mut().unwrap() { + Commit::Shutdown => return Err(Error::TransactionSystemShutdown), + Commit::Group(group) => group, + }; + if last_group.can_join(&precommit_trx) { + let (session, sync_notifier) = last_group.join(precommit_trx); + drop(group_commit_g); // unlock to let other transactions to enter commit phase. + + let _ = sync_notifier.recv_async().await; // wait for fsync + assert!(self.persisted_cts.load(Ordering::Relaxed) >= cts); + return Ok(session); + } + + let (session, sync_notifier) = self.create_new_group(precommit_trx, group_commit_g); + + let _ = sync_notifier.recv_async().await; // wait for fsync + assert!(self.persisted_cts.load(Ordering::Relaxed) >= cts); + Ok(session) + } + + #[inline] + fn update_stats( + &self, + trx_count: usize, + commit_count: usize, + log_bytes: usize, + sync_count: usize, + sync_nanos: usize, + ) { + self.stats.trx_count.fetch_add(trx_count, Ordering::Relaxed); + self.stats + .commit_count + .fetch_add(commit_count, Ordering::Relaxed); + self.stats.log_bytes.fetch_add(log_bytes, Ordering::Relaxed); + self.stats + .sync_count + .fetch_add(sync_count, Ordering::Relaxed); + self.stats + .sync_nanos + .fetch_add(sync_nanos, Ordering::Relaxed); + } + + #[inline] + fn try_fetch_io_reqs( + &self, + io_reqs: &mut Vec, + sync_groups: &mut VecDeque, + ) -> bool { + let mut group_commit_g = self.group_commit.0.lock(); + loop { + match group_commit_g.queue.pop_front() { + None => return false, + Some(Commit::Shutdown) => return true, + Some(Commit::Group(cg)) => { + let (iocb_ptr, sg) = cg.split(); + io_reqs.push(iocb_ptr); + sync_groups.push_back(sg); + } + } + } + } + + #[inline] + fn fetch_io_reqs( + &self, + io_reqs: &mut Vec, + sync_groups: &mut VecDeque, + ) -> bool { + let mut group_commit_g = self.group_commit.0.lock(); + loop { + loop { + match group_commit_g.queue.pop_front() { + None => break, + Some(Commit::Shutdown) => { + return true; + } + Some(Commit::Group(cg)) => { + let (iocb_ptr, sg) = cg.split(); + io_reqs.push(iocb_ptr); + sync_groups.push_back(sg); + } + } + } + if !io_reqs.is_empty() { + return false; + } + self.group_commit + .1 + .wait_for(&mut group_commit_g, Duration::from_secs(1)); + } + } + + #[inline] + pub(super) fn io_loop_noop(&self, config: &TrxSysConfig) { + let io_depth = config.io_depth_per_log; + let mut io_reqs = Vec::with_capacity(io_depth * 2); + let mut sync_groups = 
VecDeque::with_capacity(io_depth * 2); + let mut shutdown = false; + loop { + if !shutdown { + shutdown |= self.fetch_io_reqs(&mut io_reqs, &mut sync_groups); + } + if !io_reqs.is_empty() { + let mut trx_count = 0; + let mut commit_count = 0; + let mut log_bytes = 0; + for sg in &sync_groups { + trx_count += sg.trx_list.len(); + commit_count += 1; + log_bytes += sg.log_bytes; + } + + let max_cts = sync_groups.back().as_ref().unwrap().max_cts; + + self.persisted_cts.store(max_cts, Ordering::SeqCst); + + io_reqs.clear(); + + for mut sync_group in sync_groups.drain(..) { + let committed_trx_list: Vec<_> = mem::take(&mut sync_group.trx_list) + .into_iter() + .map(|trx| trx.commit()) + .collect(); + let _ = self.gc_chan.send(GC::Commit(committed_trx_list)); + } + + self.update_stats(trx_count, commit_count, log_bytes, 1, 0); + } + if shutdown { + return; + } + } + } + + #[inline] + pub(super) fn io_loop(&self, config: &TrxSysConfig) { + let syncer = { + self.group_commit + .0 + .lock() + .log_file + .as_ref() + .unwrap() + .syncer() + }; + let io_depth = config.io_depth_per_log; + let mut inflight = BTreeMap::new(); + let mut in_progress = 0; + let mut io_reqs = Vec::with_capacity(io_depth * 2); + let mut sync_groups = VecDeque::with_capacity(io_depth * 2); + let mut events = self.aio_mgr.events(); + let mut written = vec![]; + let mut shutdown = false; + loop { + debug_assert!( + io_reqs.len() == sync_groups.len(), + "pending IO number equals to pending group number" + ); + if !shutdown { + if in_progress == 0 { + // there is no processing AIO, so we can block on waiting for next request. + shutdown |= self.fetch_io_reqs(&mut io_reqs, &mut sync_groups); + } else { + // only try non-blocking way to fetch incoming requests, because we also + // need to finish previous IO. + shutdown |= self.try_fetch_io_reqs(&mut io_reqs, &mut sync_groups); + } + } + let (io_submit_count, io_submit_nanos) = if !io_reqs.is_empty() { + let start = Instant::now(); + // try to submit as many IO requests as possible + let limit = io_depth - in_progress; + let submit_count = self.aio_mgr.submit_limit(&mut io_reqs, limit); + // add sync groups to inflight tree. + for sync_group in sync_groups.drain(..submit_count) { + let res = inflight.insert(sync_group.max_cts, sync_group); + debug_assert!(res.is_none()); + } + in_progress += submit_count; + debug_assert!(in_progress <= io_depth); + (1, start.elapsed().as_nanos() as usize) + } else { + (0, 0) + }; + + // wait for any request to be done. + let (io_wait_count, io_wait_nanos) = if in_progress != 0 { + let start = Instant::now(); + let finish_count = + self.aio_mgr + .wait_at_least(&mut events, 1, |cts, res| match res { + Ok(len) => { + let sg = inflight.get_mut(&cts).expect("finish inflight IO"); + debug_assert!(sg.aio.buf.as_ref().unwrap().aligned_len() == len); + sg.finished = true; + } + Err(err) => { + let sg = inflight.remove(&cts).unwrap(); + unimplemented!( + "AIO error: task.cts={}, task.log_bytes={}, {}", + sg.max_cts, + sg.log_bytes, + err + ) + } + }); + in_progress -= finish_count; + (1, start.elapsed().as_nanos() as usize) + } else { + (0, 0) + }; + + // after logs are written to disk, we need to do fsync to make sure its durablity. + // Also, we need to check if any previous transaction is also done and notify them. 
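+            // AIO completions can land out of order, so `shrink_inflight`
+            // drains only the contiguous finished prefix of the BTreeMap;
+            // a single fsync then covers every group in `written`, letting
+            // `persisted_cts` advance without skipping an unsynced group.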
+ written.clear(); + let (trx_count, commit_count, log_bytes) = shrink_inflight(&mut inflight, &mut written); + if !written.is_empty() { + let max_cts = written.last().unwrap().max_cts; + + let start = Instant::now(); + match config.log_sync { + LogSync::Fsync => syncer.fsync(), + LogSync::Fdatasync => syncer.fdatasync(), + LogSync::None => (), + } + let sync_dur = start.elapsed(); + + self.persisted_cts.store(max_cts, Ordering::SeqCst); + + // put IO buffer back into free list. + for mut sync_group in written.drain(..) { + if let Some(Buf::Reuse(elem)) = sync_group.aio.take_buf() { + self.buf_free_list.push_elem(elem); // return buf to free list for future reuse + } + // commit transactions to let waiting read operations to continue + let committed_trx_list: Vec<_> = mem::take(&mut sync_group.trx_list) + .into_iter() + .map(|trx| trx.commit()) + .collect(); + // send committed transaction list to GC thread + let _ = self.gc_chan.send(GC::Commit(committed_trx_list)); + drop(sync_group); // notify transaction thread to continue. + } + + self.update_stats( + trx_count, + commit_count, + log_bytes, + 1, + sync_dur.as_nanos() as usize, + ); + } + if io_submit_count != 0 { + self.stats + .io_submit_count + .fetch_add(io_submit_count, Ordering::Relaxed); + self.stats + .io_submit_nanos + .fetch_add(io_submit_nanos, Ordering::Relaxed); + } + if io_wait_count != 0 { + self.stats + .io_wait_count + .fetch_add(io_wait_count, Ordering::Relaxed); + self.stats + .io_wait_nanos + .fetch_add(io_wait_nanos, Ordering::Relaxed); + } + + if shutdown && inflight.is_empty() { + return; + } + } + } +} + +pub(super) struct SyncGroup { + pub(super) trx_list: Vec, + pub(super) max_cts: TrxID, + pub(super) log_bytes: usize, + pub(super) aio: AIO, + // Signal to notify transaction threads. + // This field won't be used, until the group is dropped. + #[allow(dead_code)] + pub(super) sync_signal: Sender<()>, + pub(super) finished: bool, +} + +#[derive(Default)] +pub struct LogPartitionStats { + pub commit_count: AtomicUsize, + pub trx_count: AtomicUsize, + pub log_bytes: AtomicUsize, + pub sync_count: AtomicUsize, + pub sync_nanos: AtomicUsize, + pub io_submit_count: AtomicUsize, + pub io_submit_nanos: AtomicUsize, + pub io_wait_count: AtomicUsize, + pub io_wait_nanos: AtomicUsize, + pub purge_trx_count: AtomicUsize, + pub purge_row_count: AtomicUsize, + pub purge_index_count: AtomicUsize, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum LogSync { + None, + Fsync, + Fdatasync, +} + +impl FromStr for LogSync { + type Err = Error; + + #[inline] + fn from_str(s: &str) -> std::result::Result { + if s.eq_ignore_ascii_case("fsync") { + Ok(LogSync::Fsync) + } else if s.eq_ignore_ascii_case("fdatasync") { + Ok(LogSync::Fdatasync) + } else if s.eq_ignore_ascii_case("none") { + Ok(LogSync::None) + } else { + Err(Error::InvalidArgument) + } + } +} + +#[inline] +fn shrink_inflight( + tree: &mut BTreeMap, + buffer: &mut Vec, +) -> (usize, usize, usize) { + let mut trx_count = 0; + let mut commit_count = 0; + let mut log_bytes = 0; + while let Some(entry) = tree.first_entry() { + let task = entry.get(); + if task.finished { + trx_count += task.trx_list.len(); + commit_count += 1; + log_bytes += task.log_bytes; + buffer.push(entry.remove()); + } else { + break; // stop at the transaction which is not persisted. 
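+                // Later entries may already be written, but releasing them
+                // now would let `persisted_cts` leap over this unfinished
+                // group, so they stay in the tree for a later pass.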
+ } + } + (trx_count, commit_count, log_bytes) +} diff --git a/doradb-storage/src/trx/mod.rs b/doradb-storage/src/trx/mod.rs index 827f6e1..8932a0f 100644 --- a/doradb-storage/src/trx/mod.rs +++ b/doradb-storage/src/trx/mod.rs @@ -15,6 +15,9 @@ //! c) If exists, check the timestamp in entry head. If it's larger than current STS, means //! it's invisible, undo change and go to next version in the chain... //! d) If less than current STS, return current version. +pub mod group; +pub mod log; +pub mod purge; pub mod redo; pub mod row; pub mod sys; @@ -23,7 +26,7 @@ pub mod undo; use crate::session::{InternalSession, IntoSession, Session}; use crate::stmt::Statement; use crate::trx::redo::{RedoBin, RedoEntry, RedoKind, RedoLog}; -use crate::trx::undo::{IndexUndoLogs, RowUndoLogs}; +use crate::trx::undo::{IndexPurge, IndexUndoLogs, RowUndoLogs}; use crate::value::Val; use flume::{Receiver, Sender}; use parking_lot::Mutex; @@ -127,7 +130,9 @@ pub struct ActiveTrx { // Snapshot timestamp. pub sts: TrxID, // which log partition it belongs to. - pub log_partition_idx: usize, + pub log_no: usize, + // which GC bucket it belongs to. + pub gc_no: usize, // transaction-level undo logs of row data. pub(crate) row_undo: RowUndoLogs, // transaction-level index undo operations. @@ -141,11 +146,18 @@ pub struct ActiveTrx { impl ActiveTrx { /// Create a new transaction. #[inline] - pub fn new(session: Option>, trx_id: TrxID, sts: TrxID) -> Self { + pub fn new( + session: Option>, + trx_id: TrxID, + sts: TrxID, + log_no: usize, + gc_no: usize, + ) -> Self { ActiveTrx { status: Arc::new(SharedTrxStatus::new(trx_id)), sts, - log_partition_idx: 0, + log_no, + gc_no, row_undo: RowUndoLogs::empty(), index_undo: IndexUndoLogs::empty(), redo: vec![], @@ -168,11 +180,6 @@ impl ActiveTrx { self.status.ts() } - #[inline] - pub fn set_log_partition(&mut self, log_partition_idx: usize) { - self.log_partition_idx = log_partition_idx; - } - /// Starts a statement. #[inline] pub fn start_stmt(self) -> Statement { @@ -196,6 +203,8 @@ impl ActiveTrx { return PreparedTrx { status: Arc::clone(&self.status), sts: self.sts, + log_no: self.log_no, + gc_no: self.gc_no, redo_bin: None, row_undo: RowUndoLogs::empty(), index_undo: IndexUndoLogs::empty(), @@ -223,25 +232,19 @@ impl ActiveTrx { Some(redo_bin) }; let row_undo = mem::take(&mut self.row_undo); + let index_undo = mem::take(&mut self.index_undo); PreparedTrx { status: self.status.clone(), sts: self.sts, + log_no: self.log_no, + gc_no: self.gc_no, redo_bin, row_undo, - // We do not have rollback logic when transaction enters commit phase. - // Instead, we prepare them for GC because we assume the transaction should - // be successfully committed. - index_undo: self.index_undo.prepare_for_gc(), + index_undo, session: self.session.take(), } } - /// Rollback current transaction. - #[inline] - pub fn rollback(self) { - todo!() - } - /// Add one redo log entry. /// This function is only use for test purpose. 
#[inline] @@ -272,6 +275,18 @@ impl Drop for ActiveTrx { } } +impl IntoSession for ActiveTrx { + #[inline] + fn into_session(mut self) -> Session { + Session::with_internal_session(self.session.take().unwrap()) + } + + #[inline] + fn split_session(&mut self) -> Session { + Session::with_internal_session(self.session.take().unwrap()) + } +} + static PSEUDO_SYSBENCH_VAR1: [u8; 60] = [3; 60]; static PSEUDO_SYSBENCH_VAR2: [u8; 120] = [4; 120]; @@ -279,6 +294,8 @@ static PSEUDO_SYSBENCH_VAR2: [u8; 120] = [4; 120]; pub struct PreparedTrx { status: Arc, sts: TrxID, + log_no: usize, + gc_no: usize, redo_bin: Option, row_undo: RowUndoLogs, index_undo: IndexUndoLogs, @@ -295,14 +312,19 @@ impl PreparedTrx { None }; let row_undo = mem::take(&mut self.row_undo); - let index_undo = mem::take(&mut self.index_undo); + // Once we get a concrete CTS, we won't rollback this transaction. + // So we convert IndexUndo to IndexGC, and let purge threads to + // remove unused index entries. + // let index_undo = mem::take(&mut self.index_undo); + let index_gc = self.index_undo.commit_for_gc(); PrecommitTrx { status: Arc::clone(&self.status), sts: self.sts, cts, + gc_no: self.gc_no, redo_bin, row_undo, - index_undo, + index_gc, session: self.session.take(), } } @@ -340,9 +362,10 @@ pub struct PrecommitTrx { status: Arc, pub sts: TrxID, pub cts: TrxID, + pub gc_no: usize, pub redo_bin: Option, pub row_undo: RowUndoLogs, - pub index_undo: IndexUndoLogs, + pub index_gc: Vec, session: Option>, } @@ -365,12 +388,13 @@ impl PrecommitTrx { drop(g.take()); } let row_undo = mem::take(&mut self.row_undo); - let index_undo = mem::take(&mut self.index_undo); + let index_gc = mem::take(&mut self.index_gc); CommittedTrx { sts: self.sts, cts: self.cts, + gc_no: self.gc_no, row_undo, - index_undo, + index_gc, session: self.session.take(), } } @@ -381,7 +405,7 @@ impl Drop for PrecommitTrx { fn drop(&mut self) { assert!(self.redo_bin.is_none(), "redo should be cleared"); assert!(self.row_undo.is_empty(), "row undo should be cleared"); - assert!(self.index_undo.is_empty(), "index undo should be cleared"); + assert!(self.index_gc.is_empty(), "index gc should be cleared"); } } @@ -400,8 +424,9 @@ impl IntoSession for PrecommitTrx { pub struct CommittedTrx { pub sts: TrxID, pub cts: TrxID, + pub gc_no: usize, pub row_undo: RowUndoLogs, - pub index_undo: IndexUndoLogs, + pub index_gc: Vec, session: Option>, } diff --git a/doradb-storage/src/trx/purge.rs b/doradb-storage/src/trx/purge.rs new file mode 100644 index 0000000..13c7855 --- /dev/null +++ b/doradb-storage/src/trx/purge.rs @@ -0,0 +1,599 @@ +use crate::buffer::BufferPool; +use crate::catalog::{Catalog, TableCache}; +use crate::trx::log::LogPartition; +use crate::trx::sys::TransactionSystem; +use crate::trx::{CommittedTrx, TrxID, MAX_SNAPSHOT_TS}; +use crossbeam_utils::CachePadded; +use flume::{Receiver, Sender}; +use parking_lot::Mutex; +use std::collections::{BTreeSet, HashMap, VecDeque}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::thread::{self, JoinHandle}; + +impl TransactionSystem { + #[inline] + pub(super) fn start_purge_threads( + &'static self, + buf_pool: &'static P, + catalog: &'static Catalog
<P>
, + purge_chan: Receiver, + ) { + if self.config.purge_threads == 1 { + // single-threaded purger + let handle = thread::Builder::new() + .name(String::from("Purge-Thread")) + .spawn(move || { + let mut purger = PurgeSingleThreaded::default(); + purger.purge_loop(buf_pool, catalog, self, purge_chan); + }) + .unwrap(); + let mut g = self.purge_threads.lock(); + g.push(handle); + } else { + // multi-threaded purger + let (mut dispatcher, executors) = self.dispatch_purge(buf_pool, catalog); + let handle = thread::Builder::new() + .name(String::from("Purge-Dispatcher")) + .spawn(move || { + dispatcher.purge_loop(buf_pool, catalog, self, purge_chan); + }) + .unwrap(); + let mut g = self.purge_threads.lock(); + g.push(handle); + g.extend(executors); + } + } + + #[inline] + pub(super) fn refresh_min_active_sts(&self) -> Option { + // Refresh minimum active STS. + // first, we load current STS as upperbound. + // There might be case a transaction begins and commits + // when we refresh min_active_sts, if we do not hold this + // upperbound, we may clear the new transaction incorrectly. + let max_active_sts = self.ts.load(Ordering::SeqCst); + // then, load actual minimum active STS from all GC buckets. + let mut min_ts = MAX_SNAPSHOT_TS; + for partition in &*self.log_partitions { + let ts = partition.min_active_sts(); + min_ts = min_ts.min(ts); + } + min_ts = min_ts.min(max_active_sts); + + // update global min_active_sts + let old_ts = self.min_active_sts.load(Ordering::Relaxed); + if min_ts > old_ts { + // Only single thread will update this watermark, so it's safe to overwrite + // current value. + self.min_active_sts.store(min_ts, Ordering::SeqCst); + return Some(min_ts); + } + None + } + + #[inline] + pub(super) fn dispatch_purge( + &'static self, + buf_pool: &'static P, + catalog: &'static Catalog
<P>
, + ) -> (PurgeDispatcher, Vec>) { + let mut handles = vec![]; + let mut chans = vec![]; + for i in 0..self.config.purge_threads { + let (tx, rx) = flume::unbounded(); + chans.push(tx); + let handle = thread::Builder::new() + .name(format!("Purge-Executor-{}", i)) + .spawn(move || { + let mut purger = PurgeExecutor::default(); + purger.purge_task_loop(buf_pool, catalog, self, rx); + }) + .unwrap(); + handles.push(handle); + } + (PurgeDispatcher(chans), handles) + } + + #[inline] + pub(super) fn purge_trx_list( + &self, + buf_pool: &P, + catalog: &Catalog
<P>
, + log_no: usize, + trx_list: Vec, + min_active_sts: TrxID, + ) { + let partition = &self.log_partitions[log_no]; + let mut table_cache = TableCache::new(catalog); + let purge_trx_count = trx_list.len(); + let mut purge_row_count = 0; + let mut purge_index_count = 0; + for trx in trx_list { + purge_row_count += trx.row_undo.len(); + for ip in &trx.index_gc { + if let Some(table) = table_cache.get_table(ip.table_id) { + if table.delete_index(buf_pool, &ip.key, ip.row_id, min_active_sts) { + purge_index_count += 1; + } + } + } + } + partition + .stats + .purge_trx_count + .fetch_add(purge_trx_count, Ordering::Relaxed); + partition + .stats + .purge_row_count + .fetch_add(purge_row_count, Ordering::Relaxed); + partition + .stats + .purge_index_count + .fetch_add(purge_index_count, Ordering::Relaxed); + } +} + +/// GCBucket is used for GC analyzer to store and analyze GC related information, +/// including committed transaction list, old transaction list, active snapshot timestamp +/// list, etc. +pub(super) struct GCBucket { + /// Committed transaction list. + /// When a transaction is committed, it will be put into this queue in sequence. + /// Head is always oldest and tail is newest. + pub(super) committed_trx_list: CachePadded>>, + /// Active snapshot timestamp list. + /// The smallest value equals to min_active_sts. + pub(super) active_sts_list: CachePadded>>, + /// Minimum active snapshot sts of this bucket. + pub(super) min_active_sts: CachePadded, +} + +impl GCBucket { + /// Create a new GC bucket. + #[inline] + pub(super) fn new() -> Self { + GCBucket { + committed_trx_list: CachePadded::new(Mutex::new(VecDeque::new())), + active_sts_list: CachePadded::new(Mutex::new(BTreeSet::new())), + min_active_sts: CachePadded::new(AtomicU64::new(MAX_SNAPSHOT_TS)), + } + } + + /// Get committed transaction list to purge. + #[inline] + pub(super) fn get_purge_list(&self, min_active_sts: TrxID, trx_list: &mut Vec) { + // If a transaction's committed timestamp is less than the smallest + // snapshot timestamp of all active transactions, it means this transction's + // data vesion is latest and all its undo log can be purged. + // So we move such transactions from commited list to old list. + let mut g = self.committed_trx_list.lock(); + while let Some(trx) = g.front() { + if trx.cts < min_active_sts { + trx_list.push(g.pop_front().unwrap()); + } else { + break; + } + } + } + + /// Remove active STS. + /// This method is used for transaction rollback and read-only transaction commit. + #[inline] + pub(super) fn remove_active_sts(&self, sts: TrxID) { + let mut g = self.active_sts_list.lock(); + let res = g.remove(&sts); + debug_assert!(res); + } +} + +pub(super) enum GC { + Stop, + Commit(Vec), +} + +#[derive(Default)] +pub(super) struct GCAnalyzer { + map: HashMap>, +} + +impl GCAnalyzer { + #[inline] + pub fn gc_loop( + &mut self, + partition: &'static LogPartition, + gc_rx: Receiver, + purge_chan: Sender, + enabled: bool, + ) { + while let Ok(msg) = gc_rx.recv() { + match msg { + GC::Stop => return, + GC::Commit(trx_list) => { + if enabled { + let min_active_sts_may_change = self.analyze_commit(partition, trx_list); + if min_active_sts_may_change { + let _ = purge_chan.send(Purge::Next); + } + } + } + } + } + } + + /// Analyze committed transaction and modify active sts list and committed transaction list. + /// Returns whether min_active_sts may change. 
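+ /// Transactions are grouped by gc_no first, so each GC bucket's locks are
+ /// acquired at most once per committed batch instead of once per transaction.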
+ #[inline] + fn analyze_commit(&mut self, partition: &LogPartition, trx_list: Vec) -> bool { + self.map.clear(); + for trx in trx_list { + let vec = self.map.entry(trx.gc_no).or_default(); + vec.push(trx); + } + let mut changed = false; + for (gc_no, trx_list) in self.map.drain() { + let gc_bucket = &partition.gc_buckets[gc_no]; + // update both active sts list and committed transaction list + let mut active_sts_list = gc_bucket.active_sts_list.lock(); + { + let mut committed_trx_list = gc_bucket.committed_trx_list.lock(); + for trx in trx_list { + let res = active_sts_list.remove(&trx.sts); + debug_assert!(res); + committed_trx_list.push_back(trx); + } + } + // update minimum active STS + // separate load and store is safe because this update will only happen when lock of + // active_sts_list is acquired. + let min_active_sts = gc_bucket.min_active_sts.load(Ordering::Relaxed); + if let Some(sts) = active_sts_list.first().cloned() { + if sts > min_active_sts { + gc_bucket.min_active_sts.store(sts, Ordering::Relaxed); + changed = true; + } + } else { + // as we just commit at least one transaction in this bucket, the min_active_sts must not be MAX + debug_assert!(min_active_sts != MAX_SNAPSHOT_TS); + // mark as MAX_STS to indicate there is no active transaction. + gc_bucket + .min_active_sts + .store(MAX_SNAPSHOT_TS, Ordering::Relaxed); + changed = true; + } + } + changed + } +} + +pub enum Purge { + Stop, + Next, +} + +struct PurgeTask { + log_no: usize, + gc_no: usize, + min_active_sts: TrxID, + signal: Sender<()>, +} + +pub trait PurgeLoop { + fn purge_loop( + &mut self, + buf_pool: &P, + catalog: &Catalog
<P>
, + trx_sys: &TransactionSystem, + purge_chan: Receiver, + ); +} + +#[derive(Default)] +pub struct PurgeSingleThreaded; + +impl PurgeLoop for PurgeSingleThreaded { + #[inline] + fn purge_loop( + &mut self, + buf_pool: &P, + catalog: &Catalog
<P>
, + trx_sys: &TransactionSystem, + purge_chan: Receiver, + ) { + while let Ok(purge) = purge_chan.recv() { + match purge { + Purge::Stop => return, + Purge::Next => { + // Cascade multiple Next message to avoid unnecessary work. + while let Ok(p) = purge_chan.try_recv() { + match p { + Purge::Stop => return, + Purge::Next => (), + } + } + if let Some(min_active_sts) = trx_sys.refresh_min_active_sts() { + for partition in &*trx_sys.log_partitions { + let mut trx_list = vec![]; + for gc_bucket in &partition.gc_buckets { + gc_bucket.get_purge_list(min_active_sts, &mut trx_list); + } + let log_no = partition.log_no; + trx_sys.purge_trx_list( + buf_pool, + catalog, + log_no, + trx_list, + min_active_sts, + ); + } + } + } + } + } + } +} + +pub struct PurgeDispatcher(Vec>); + +impl PurgeLoop for PurgeDispatcher { + #[inline] + fn purge_loop( + &mut self, + _buf_pool: &P, + _catalog: &Catalog
<P>
, + trx_sys: &TransactionSystem, + purge_chan: Receiver, + ) { + // let chans = self.init(trx_sys); + let mut dispatch_no: usize = 0; + 'DISPATCH_LOOP: while let Ok(purge) = purge_chan.recv() { + match purge { + Purge::Stop => break 'DISPATCH_LOOP, + Purge::Next => { + // Cascade multiple Next message to avoid unnecessary work. + while let Ok(p) = purge_chan.try_recv() { + match p { + Purge::Stop => break 'DISPATCH_LOOP, + Purge::Next => (), + } + } + if let Some(min_active_sts) = trx_sys.refresh_min_active_sts() { + // dispatch tasks to executors + let (signal, notify) = flume::unbounded(); + for partition in &*trx_sys.log_partitions { + let log_no = partition.log_no; + for gc_no in 0..partition.gc_buckets.len() { + let task = PurgeTask { + log_no, + gc_no, + min_active_sts, + signal: signal.clone(), + }; + let _ = self.0[dispatch_no % self.0.len()].send(task); + dispatch_no += 1; + } + } + drop(signal); + // wait for all executors finish their tasks. + let _ = notify.recv(); + } + } + } + } + + // notify executors to quit + self.0.clear(); + } +} + +#[derive(Default)] +pub struct PurgeExecutor; + +impl PurgeExecutor { + #[inline] + fn purge_task_loop( + &mut self, + buf_pool: &P, + catalog: &Catalog
<P>
, + trx_sys: &TransactionSystem, + purge_chan: Receiver, + ) { + while let Ok(PurgeTask { + log_no, + gc_no, + min_active_sts, + signal, + }) = purge_chan.recv() + { + let mut trx_list = vec![]; + let partition = &trx_sys.log_partitions[log_no]; + partition.gc_buckets[gc_no].get_purge_list(min_active_sts, &mut trx_list); + // actual purge here + trx_sys.purge_trx_list(buf_pool, catalog, log_no, trx_list, min_active_sts); + drop(signal); // notify dispatcher + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::buffer::guard::PageSharedGuard; + use crate::buffer::FixedBufferPool; + use crate::catalog::tests::table1; + use crate::catalog::Catalog; + use crate::index::RowLocation; + use crate::latch::LatchFallbackMode; + use crate::row::ops::SelectKey; + use crate::row::RowPage; + use crate::session::Session; + use crate::trx::sys::TrxSysConfig; + use crate::value::Val; + use std::time::{Duration, Instant}; + + #[test] + fn test_trx_purge_single_thread() { + const PURGE_SIZE: usize = 1000; + let buf_pool = FixedBufferPool::with_capacity_static(16 * 1024 * 1024).unwrap(); + let catalog = Catalog::empty_static(); + let trx_sys = TrxSysConfig::default() + .gc(true) + .purge_threads(1) + .build_static(buf_pool, catalog); + + smol::block_on(async { + let table_id = table1(buf_pool, catalog); + let table = catalog.get_table(table_id).unwrap(); + let mut session = Session::new(); + // insert + for i in 0..PURGE_SIZE { + let mut trx = session.begin_trx(trx_sys); + let mut stmt = trx.start_stmt(); + let res = table + .insert_row(buf_pool, &mut stmt, vec![Val::from(i as i32)]) + .await; + assert!(res.is_ok()); + trx = stmt.succeed(); + let res = trx_sys.commit(trx, buf_pool, catalog).await; + assert!(res.is_ok()); + session = res.unwrap(); + } + // delete + for i in 0..PURGE_SIZE { + let mut trx = session.begin_trx(trx_sys); + let mut stmt = trx.start_stmt(); + let key = SelectKey::new(0, vec![Val::from(i as i32)]); + let res = table.delete_row(buf_pool, &mut stmt, &key).await; + assert!(res.is_ok()); + trx = stmt.succeed(); + let res = trx_sys.commit(trx, buf_pool, catalog).await; + assert!(res.is_ok()); + session = res.unwrap(); + } + }); + // wait for GC. 
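+ // Poll purge statistics until every insert/delete pair is fully purged:
+ // 2 * PURGE_SIZE transactions and row undo logs, plus PURGE_SIZE deferred
+ // index deletions.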
+ let start = Instant::now(); + loop { + let stats = trx_sys.trx_sys_stats(); + assert!(stats.purge_trx_count <= PURGE_SIZE * 2); + assert!(stats.purge_row_count <= PURGE_SIZE * 2); + assert!(stats.purge_index_count <= PURGE_SIZE); + println!( + "purge_trx={},purge_row={},purge_index={}", + stats.purge_trx_count, stats.purge_row_count, stats.purge_index_count + ); + if stats.purge_trx_count == PURGE_SIZE * 2 + && stats.purge_row_count == PURGE_SIZE * 2 + && stats.purge_index_count == PURGE_SIZE + { + break; + } + if start.elapsed() >= Duration::from_secs(3) { + panic!("gc timeout"); + } else { + std::thread::sleep(Duration::from_millis(100)); + } + } + unsafe { + TransactionSystem::drop_static(trx_sys); + Catalog::drop_static(catalog); + FixedBufferPool::drop_static(buf_pool); + } + } + + #[test] + fn test_trx_purge_multi_threads() { + smol::block_on(async { + const PURGE_SIZE: usize = 1000; + let buf_pool = FixedBufferPool::with_capacity_static(16 * 1024 * 1024).unwrap(); + let catalog = Catalog::empty_static(); + let trx_sys = TrxSysConfig::default() + .gc(true) + .purge_threads(2) + .build_static(buf_pool, catalog); + + let table_id = table1(buf_pool, catalog); + let table = catalog.get_table(table_id).unwrap(); + let mut session = Session::new(); + // insert + for i in 0..PURGE_SIZE { + let mut trx = session.begin_trx(trx_sys); + let mut stmt = trx.start_stmt(); + let res = table + .insert_row(buf_pool, &mut stmt, vec![Val::from(i as i32)]) + .await; + assert!(res.is_ok()); + trx = stmt.succeed(); + let res = trx_sys.commit(trx, buf_pool, catalog).await; + assert!(res.is_ok()); + session = res.unwrap(); + } + // delete + for i in 0..PURGE_SIZE { + let mut trx = session.begin_trx(trx_sys); + let mut stmt = trx.start_stmt(); + let key = SelectKey::new(0, vec![Val::from(i as i32)]); + let res = table.delete_row(buf_pool, &mut stmt, &key).await; + assert!(res.is_ok()); + trx = stmt.succeed(); + let res = trx_sys.commit(trx, buf_pool, catalog).await; + assert!(res.is_ok()); + session = res.unwrap(); + } + + // wait for GC. + let start = Instant::now(); + let mut gc_timeout = false; + loop { + let stats = trx_sys.trx_sys_stats(); + assert!(stats.purge_trx_count <= PURGE_SIZE * 2); + assert!(stats.purge_row_count <= PURGE_SIZE * 2); + assert!(stats.purge_index_count <= PURGE_SIZE); + println!( + "purge_trx={},purge_row={},purge_index={}", + stats.purge_trx_count, stats.purge_row_count, stats.purge_index_count + ); + if stats.purge_trx_count == PURGE_SIZE * 2 + && stats.purge_row_count == PURGE_SIZE * 2 + && stats.purge_index_count == PURGE_SIZE + { + break; + } + if start.elapsed() >= Duration::from_secs(3) { + // panic!("gc timeout"); + gc_timeout = true; + break; + } else { + std::thread::sleep(Duration::from_millis(100)); + } + } + if gc_timeout { + // see which one is not purged, and its cts. 
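+ // Dump the row ids left in the unique index and the latest status of the
+ // first one, so the failure shows whether purge missed the index entry or
+ // the row version chain.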
+ let index = table.sec_idx[0].unique().unwrap(); + let mut remained_row_ids = vec![]; + index.scan_values(&mut remained_row_ids); + println!("gc timeout, remained_row_ids={:?}", remained_row_ids); + let row_id = remained_row_ids[0]; + let location = table.blk_idx.find_row_id(buf_pool, row_id); + let page_id = match location { + RowLocation::RowPage(page_id) => page_id, + _ => unreachable!(), + }; + let page_guard: PageSharedGuard<'_, RowPage> = buf_pool + .get_page(page_id, LatchFallbackMode::Shared) + .block_until_shared(); + let access = page_guard.read_row_by_id(row_id); + let status = access.latest_status(); + println!("row status={:?}", status); + } + println!( + "final min_active_sts={}", + trx_sys.min_active_sts.load(Ordering::Relaxed) + ); + unsafe { + TransactionSystem::drop_static(trx_sys); + Catalog::drop_static(catalog); + FixedBufferPool::drop_static(buf_pool); + } + }); + } +} diff --git a/doradb-storage/src/trx/row.rs b/doradb-storage/src/trx/row.rs index 193237f..8502135 100644 --- a/doradb-storage/src/trx/row.rs +++ b/doradb-storage/src/trx/row.rs @@ -1,12 +1,12 @@ use crate::buffer::guard::PageSharedGuard; +use crate::catalog::TableSchema; use crate::row::ops::{ReadRow, SelectKey, UndoCol, UpdateCol, UpdateRow}; use crate::row::{Row, RowID, RowMut, RowPage, RowRead}; -use crate::table::TableSchema; use crate::trx::undo::{ NextRowUndo, NextRowUndoStatus, NextTrxCTS, OwnedRowUndo, RowUndoBranch, RowUndoHead, RowUndoKind, RowUndoRef, }; -use crate::trx::{trx_is_committed, ActiveTrx, SharedTrxStatus}; +use crate::trx::{trx_is_committed, ActiveTrx, SharedTrxStatus, TrxID, GLOBAL_VISIBLE_COMMIT_TS}; use crate::value::Val; use parking_lot::{RwLockReadGuard, RwLockWriteGuard}; use std::collections::{BTreeMap, BTreeSet, HashMap}; @@ -31,17 +31,17 @@ impl RowReadAccess<'_> { #[inline] pub fn latest_status(&self) -> RowLatestStatus { - if let Some(head) = &*self.undo { + let cts = if let Some(head) = &*self.undo { let ts = head.status.ts(); if !trx_is_committed(ts) { return RowLatestStatus::Uncommitted; } - } + ts + } else { + GLOBAL_VISIBLE_COMMIT_TS + }; // the row is committed, check if it's deleted. - if self.row().is_deleted() { - return RowLatestStatus::Deleted; - } - RowLatestStatus::Committed + RowLatestStatus::Committed(cts, self.row().is_deleted()) } #[inline] @@ -196,6 +196,104 @@ impl RowReadAccess<'_> { } } } + + /// Check whether a key same as input can be found in version chain. + /// This method is similar to read_row_mvcc() but only consider index key + /// match logic. + /// It is used by purge threads to correctly remove unused index entry. + #[inline] + pub fn is_any_version_matches_key( + &self, + schema: &TableSchema, + key: &SelectKey, + sts: TrxID, + ) -> bool { + match &*self.undo { + None => { + let row = self.row(); + return !row.is_deleted() && !row.is_key_different(schema, key); + } + Some(undo_head) => { + let ts = undo_head.status.ts(); + if trx_is_committed(ts) { + if sts > ts { + let row = self.row(); + return !row.is_deleted() && !row.is_key_different(schema, key); + } + } + // page data is invisible, we have to backtrace version chain. 
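+ // Reconstruct the key at each older version: start from the newest (page)
+ // image, then apply undo records one by one, testing for a key match at
+ // every step until the chain ends or a visible version is reached.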
+ let row = self.row(); + let vals = row.clone_index_vals(schema, key.index_no); + let mvcc_key = SelectKey::new(key.index_no, vals); + let deleted = row.is_deleted(); + if !deleted && &mvcc_key == key { + return true; + } + match undo_head.entry.as_ref() { + None => unreachable!("next version is missing"), + Some(entry) => { + let mut entry = entry.clone(); + let mapping: HashMap = schema.indexes[key.index_no] + .keys + .iter() + .enumerate() + .map(|(key_no, key)| (key.user_col_idx as usize, key_no)) + .collect(); + let mut ver = KeyVersion { + deleted, + mvcc_key, + mapping, + }; + // traverse version chain + loop { + match &entry.as_ref().kind { + RowUndoKind::Insert => { + debug_assert!(!ver.deleted); + ver.deleted = true; + } + RowUndoKind::Update(undo_cols) => { + debug_assert!(!ver.deleted); + ver.undo_update(undo_cols); + } + RowUndoKind::Delete => { + debug_assert!(ver.deleted); + ver.deleted = false; + } + RowUndoKind::Move(del) => { + ver.deleted = *del; + } + } + // here we check if current version matches input key + if !ver.deleted && &ver.mvcc_key == key { + return true; + } + // check whether we should go to next version + match entry.as_ref().find_next_version(key) { + None => { + return false; + } + Some(next) => { + match next.status { + NextRowUndoStatus::SameAsPrev => { + let next_entry = next.entry.clone(); + entry = next_entry; // still invisible. + } + NextRowUndoStatus::CTS(cts) => { + if sts > cts { + // current version is visible + return false; + } + entry = next.entry.clone(); // still invisible + } + } + } + } + } + } + } + } + } + } } // Version of current row. @@ -268,6 +366,24 @@ impl RowVersion { } } +struct KeyVersion { + deleted: bool, + mvcc_key: SelectKey, + // mapping of column number to key number + mapping: HashMap, +} + +impl KeyVersion { + #[inline] + fn undo_update(&mut self, undo: &[UndoCol]) { + for u in undo { + if let Some(key_no) = self.mapping.get(&u.idx) { + self.mvcc_key.vals[*key_no] = u.val.clone(); + } + } + } +} + pub struct RowWriteAccess<'a> { page: &'a RowPage, row_idx: usize, @@ -462,9 +578,9 @@ impl<'a> PageSharedGuard<'a, RowPage> { } } +#[derive(Debug, Clone)] pub enum RowLatestStatus { NotFound, - Deleted, - Committed, + Committed(TrxID, bool), Uncommitted, } diff --git a/doradb-storage/src/trx/sys.rs b/doradb-storage/src/trx/sys.rs index 7e4525b..7dc38c0 100644 --- a/doradb-storage/src/trx/sys.rs +++ b/doradb-storage/src/trx/sys.rs @@ -1,23 +1,24 @@ -use crate::error::{Error, Result}; -use crate::io::{ - align_to_sector_size, pwrite, AIOError, AIOManager, AIOManagerConfig, Buf, DirectBuf, - FreeListWithFactory, IocbRawPtr, PageBuf, SparseFile, AIO, -}; +use crate::buffer::BufferPool; +use crate::catalog::Catalog; +use crate::error::Result; +use crate::io::{align_to_sector_size, AIOManagerConfig, FreeListWithFactory, PageBuf}; use crate::session::{InternalSession, IntoSession, Session}; +use crate::trx::group::{Commit, GroupCommit}; +use crate::trx::log::{LogPartition, LogPartitionStats, LogSync}; +use crate::trx::purge::{GCAnalyzer, GCBucket, Purge, GC}; use crate::trx::{ - ActiveTrx, CommittedTrx, PrecommitTrx, PreparedTrx, TrxID, MAX_COMMIT_TS, MAX_SNAPSHOT_TS, - MIN_ACTIVE_TRX_ID, MIN_SNAPSHOT_TS, + ActiveTrx, PreparedTrx, TrxID, MAX_SNAPSHOT_TS, MIN_ACTIVE_TRX_ID, MIN_SNAPSHOT_TS, }; use crossbeam_utils::CachePadded; use flume::{Receiver, Sender}; -use parking_lot::{Condvar, Mutex, MutexGuard}; -use std::collections::{BTreeMap, BTreeSet, VecDeque}; +use parking_lot::{Condvar, Mutex}; +use std::collections::VecDeque; +use 
std::hash::{DefaultHasher, Hash, Hasher}; use std::mem; -use std::os::fd::{AsRawFd, RawFd}; -use std::str::FromStr; use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::thread::{self, JoinHandle}; -use std::time::{Duration, Instant}; + +pub const GC_BUCKETS: usize = 64; /// TransactionSystem controls lifecycle of all transactions. /// @@ -61,27 +62,23 @@ pub struct TransactionSystem { /// trx_id range: (1<<63)+1 to uint::MAX-1 /// sts range: 1 to 1<<63 /// cts range: 1 to 1<<63 - ts: CachePadded, + pub(super) ts: CachePadded, /// Minimum active snapshot timestamp. /// It's updated by group committer thread, and is used by query/GC thread to clean /// out-of-date version chains. /// /// Note: this field may not reflect the latest value, but is enough for GC purpose. - min_active_sts: CachePadded, + pub(super) min_active_sts: CachePadded, /// Round-robin partition id generator. partition_id: CachePadded, /// Multiple log partitions. - log_partitions: CachePadded]>>, - /// Persisted commit timestamp is the maximum commit timestamp of all persisted redo - /// logs. Precommit transactions are already notified by group committer. This global - /// atomic variable is used by query or GC thread to perform GC. - persisted_cts: CachePadded, - /// Committed transaction list. - /// When a transaction is committed, it will be put into this queue in sequence. - /// Head is always oldest and tail is newest. - gc_info: CachePadded>, + pub(super) log_partitions: CachePadded]>>, /// Transaction system configuration. - config: CachePadded, + pub(super) config: CachePadded, + /// Channel to send message to purge threads. + purge_chan: Sender, + /// Purge threads purge unused undo logs, row pages and index entries. + pub(super) purge_threads: Mutex>>, } impl TransactionSystem { @@ -107,14 +104,25 @@ impl TransactionSystem { debug_assert!(trx_id >= MIN_ACTIVE_TRX_ID); // assign log partition index so current transaction will stick // to certain log partititon for commit. - let log_partition_idx = if self.config.log_partitions == 1 { + let log_no = if self.config.log_partitions == 1 { 0 } else { self.partition_id.fetch_add(1, Ordering::Relaxed) % self.config.log_partitions }; - let mut trx = ActiveTrx::new(session, trx_id, sts); - trx.set_log_partition(log_partition_idx); - trx + // add to active sts list. + let gc_no = gc_no(sts); + { + let gc_bucket = &self.log_partitions[log_no].gc_buckets[gc_no]; + let mut g = gc_bucket.active_sts_list.lock(); + let res = g.insert(sts); + debug_assert!(res); + // here we should update min_active_sts of this bucket + let min_active_sts = g.first().cloned().unwrap(); + gc_bucket + .min_active_sts + .store(min_active_sts, Ordering::Relaxed); + } + ActiveTrx::new(session, trx_id, sts, log_no, gc_no) } /// Commit an active transaction. @@ -124,15 +132,22 @@ impl TransactionSystem { /// leader to persist log and backfill CTS. /// This strategy can largely reduce logging IO, therefore improve throughput. #[inline] - pub async fn commit(&self, trx: ActiveTrx) -> Result { + pub async fn commit( + &self, + trx: ActiveTrx, + buf_pool: &P, + catalog: &Catalog
<P>
, + ) -> Result { // Prepare redo log first, this may take some time, // so keep it out of lock scope, and we can fill cts after the lock is held. - let partition = &*self.log_partitions[trx.log_partition_idx]; + let partition = &*self.log_partitions[trx.log_no]; let prepared_trx = trx.prepare(); if prepared_trx.redo_bin.is_none() { if prepared_trx.row_undo.is_empty() { // This is a read-only transaction, drop it is safe. debug_assert!(prepared_trx.readonly()); + self.log_partitions[prepared_trx.log_no].gc_buckets[prepared_trx.gc_no] + .remove_active_sts(prepared_trx.sts); return Ok(prepared_trx.into_session()); } // There might be scenario that the transaction does not change anything @@ -145,9 +160,9 @@ impl TransactionSystem { // The preparation should shrink redo logs on row level and finally there // is no redo entry. But we have undo logs and already put them into // page-level undo maps. - // In such case, we pass this transaction to group commit, and it will - // directly succeeds if it's group leader. - // Otherwise, it always joins last group successfully. + // In such case, we can just rollback this transaction because it actually + // do nothing. + return Ok(self.rollback_prepared(prepared_trx, buf_pool, catalog)); } // start group commit partition.commit(prepared_trx, &self.ts).await @@ -155,11 +170,34 @@ impl TransactionSystem { /// Rollback active transaction. #[inline] - pub fn rollback(&self, trx: ActiveTrx) { - // let sts = trx.sts; - // let log_partition_idx = trx.log_partition_idx; - trx.rollback(); - // self.log_partitions[log_partition_idx].end_active_sts(sts); + pub fn rollback( + &self, + mut trx: ActiveTrx, + buf_pool: &P, + catalog: &Catalog
<P>
, + ) -> Session { + trx.row_undo.rollback(buf_pool); + trx.index_undo.rollback(catalog); + self.log_partitions[trx.log_no].gc_buckets[trx.gc_no].remove_active_sts(trx.sts); + trx.into_session() + } + + /// Rollback prepared transaction. + /// This is special case of transaction commit without redo log. + /// In such case, we do not need to go through entire commit process but just + /// rollback the transaction, because it actually do nothing. + #[inline] + fn rollback_prepared( + &self, + mut trx: PreparedTrx, + buf_pool: &P, + catalog: &Catalog
<P>
, + ) -> Session { + debug_assert!(trx.redo_bin.is_none()); + trx.row_undo.rollback(buf_pool); + trx.index_undo.rollback(catalog); + self.log_partitions[trx.log_no].gc_buckets[trx.gc_no].remove_active_sts(trx.sts); + trx.into_session() } /// Returns statistics of group commit. @@ -176,13 +214,32 @@ impl TransactionSystem { stats.io_submit_nanos += partition.stats.io_submit_nanos.load(Ordering::Relaxed); stats.io_wait_count += partition.stats.io_wait_count.load(Ordering::Relaxed); stats.io_wait_nanos += partition.stats.io_wait_nanos.load(Ordering::Relaxed); + stats.purge_trx_count += partition.stats.purge_trx_count.load(Ordering::Relaxed); + stats.purge_row_count += partition.stats.purge_row_count.load(Ordering::Relaxed); + stats.purge_index_count += partition.stats.purge_index_count.load(Ordering::Relaxed); } stats } - /// Start background sync thread. - /// This method should be called once transaction system is initialized, - /// and should after start_gc_thread(). + /// Start background GC threads. + #[inline] + fn start_gc_threads(&'static self, gc_chans: Vec>) { + for ((idx, partition), gc_rx) in self.log_partitions.iter().enumerate().zip(gc_chans) { + let thread_name = format!("GC-Thread-{}", idx); + let partition = &**partition; + let purge_chan = self.purge_chan.clone(); + let handle = thread::Builder::new() + .name(thread_name) + .spawn(move || { + let mut analyzer = GCAnalyzer::default(); + analyzer.gc_loop(partition, gc_rx, purge_chan, self.config.gc) + }) + .unwrap(); + *partition.gc_thread.lock() = Some(handle); + } + } + + /// Start background sync threads. #[inline] fn start_sync_threads(&'static self) { // Start threads for all log partitions @@ -208,17 +265,32 @@ impl Drop for TransactionSystem { #[inline] fn drop(&mut self) { let log_partitions = &*self.log_partitions; - // notify sync thread to quit for each log partition. for partition in log_partitions { - let mut group_commit_g = partition.group_commit.0.lock(); - group_commit_g.queue.push_back(CommitMessage::Shutdown); - if group_commit_g.queue.len() == 1 { - partition.group_commit.1.notify_one(); // notify sync thread to quit. + // notify sync thread to quit. + { + let mut group_commit_g = partition.group_commit.0.lock(); + group_commit_g.queue.push_back(Commit::Shutdown); + if group_commit_g.queue.len() == 1 { + partition.group_commit.1.notify_one(); // notify sync thread to quit. + } } + // notify gc thread to quit. + let _ = partition.gc_chan.send(GC::Stop); } + // wait for sync thread and GC thread to quit. for partition in log_partitions { let sync_thread = { partition.sync_thread.lock().take().unwrap() }; sync_thread.join().unwrap(); + let gc_thread = { partition.gc_thread.lock().take().unwrap() }; + gc_thread.join().unwrap(); + } + // notify purge threads and wait for them to quit. 
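+ // GC threads are already joined above, so no further Purge::Next can be
+ // produced once Purge::Stop is sent here.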
+ { + let _ = self.purge_chan.send(Purge::Stop); + let purge_threads = { mem::take(&mut *self.purge_threads.lock()) }; + for handle in purge_threads { + handle.join().unwrap(); + } } // finally close log files for partition in log_partitions { @@ -229,6 +301,14 @@ impl Drop for TransactionSystem { } } +#[inline] +fn gc_no(sts: TrxID) -> usize { + let mut hasher = DefaultHasher::default(); + sts.hash(&mut hasher); + let value = hasher.finish(); + value as usize % GC_BUCKETS +} + pub const DEFAULT_LOG_IO_DEPTH: usize = 32; pub const DEFAULT_LOG_IO_MAX_SIZE: usize = 8192; pub const DEFAULT_LOG_FILE_PREFIX: &'static str = "redo.log"; @@ -238,59 +318,39 @@ pub const DEFAULT_LOG_FILE_MAX_SIZE: usize = 1024 * 1024 * 1024; // 1GB, sparse pub const DEFAULT_LOG_SYNC: LogSync = LogSync::Fsync; pub const DEFAULT_LOG_DROP: bool = false; pub const DEFAULT_GC: bool = false; - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum LogSync { - None, - Fsync, - Fdatasync, -} - -impl FromStr for LogSync { - type Err = Error; - - #[inline] - fn from_str(s: &str) -> std::result::Result { - if s.eq_ignore_ascii_case("fsync") { - Ok(LogSync::Fsync) - } else if s.eq_ignore_ascii_case("fdatasync") { - Ok(LogSync::Fdatasync) - } else if s.eq_ignore_ascii_case("none") { - Ok(LogSync::None) - } else { - Err(Error::InvalidArgument) - } - } -} +pub const DEFAULT_PURGE_THREADS: usize = 2; pub struct TrxSysConfig { // Controls infight IO request number of each log file. - io_depth_per_log: usize, + pub io_depth_per_log: usize, // Controls maximum IO size of each IO request. // This only limit the combination of multiple transactions. // If single transaction has very large redo log. It is kept // what it is and send to AIO manager as one IO request. - max_io_size: usize, + pub max_io_size: usize, // Prefix of log file. // the complete file name pattern is: // .. // e.g. redo.log.0.00000001 - log_file_prefix: String, + pub log_file_prefix: String, // Log partition number. - log_partitions: usize, + pub log_partitions: usize, // Controls the maximum size of each log file. // Log file will be rotated once the size limit is reached. // a u32 suffix is appended at end of the file name in // hexdigit format. - log_file_max_size: usize, + pub log_file_max_size: usize, // Controls which method to sync data on disk. // By default, use fsync(), // Can be switched to fdatasync() or not sync. - log_sync: LogSync, + pub log_sync: LogSync, // Drop log directly. If this parameter is set to true, // log_sync parameter will be discarded. - log_drop: bool, - gc: bool, + pub log_drop: bool, + // Whether enable GC or not. + pub gc: bool, + // Threads for purging undo logs. + pub purge_threads: usize, } impl TrxSysConfig { @@ -309,12 +369,21 @@ impl TrxSysConfig { } /// Whether to enable GC. + /// Note: #[inline] pub fn gc(mut self, gc: bool) -> Self { self.gc = gc; self } + /// How many threads to execute for purge undo logs and remove + /// unused index and row pages. + #[inline] + pub fn purge_threads(mut self, purge_threads: usize) -> Self { + self.purge_threads = purge_threads; + self + } + /// Log file name. 
#[inline] pub fn log_file_prefix(mut self, log_file_prefix: String) -> Self { @@ -350,7 +419,7 @@ impl TrxSysConfig { self } - fn setup_log_partition(&self, idx: usize) -> LogPartition { + fn setup_log_partition(&self, idx: usize) -> (LogPartition, Receiver) { let aio_mgr = AIOManagerConfig::default() .max_events(self.io_depth_per_log) .build() @@ -368,41 +437,58 @@ impl TrxSysConfig { file_seq, }; let max_io_size = self.max_io_size; - LogPartition { - group_commit: CachePadded::new((Mutex::new(group_commit), Condvar::new())), - persisted_cts: CachePadded::new(AtomicU64::new(MIN_SNAPSHOT_TS)), - aio_mgr, - idx, - max_io_size, - file_prefix: log_file_prefix(&self.log_file_prefix, idx), - buf_free_list: FreeListWithFactory::prefill(self.io_depth_per_log, move || { - PageBuf::uninit(max_io_size) - }), - sync_thread: Mutex::new(None), - stats: CachePadded::new(LogPartitionStats::default()), - } + let gc_info: Vec<_> = (0..GC_BUCKETS).map(|_| GCBucket::new()).collect(); + let (gc_chan, gc_rx) = flume::unbounded(); + ( + LogPartition { + group_commit: CachePadded::new((Mutex::new(group_commit), Condvar::new())), + persisted_cts: CachePadded::new(AtomicU64::new(MIN_SNAPSHOT_TS)), + stats: CachePadded::new(LogPartitionStats::default()), + gc_chan, + gc_buckets: gc_info.into_boxed_slice(), + aio_mgr, + log_no: idx, + max_io_size, + file_prefix: log_file_prefix(&self.log_file_prefix, idx), + buf_free_list: FreeListWithFactory::prefill(self.io_depth_per_log, move || { + PageBuf::uninit(max_io_size) + }), + sync_thread: Mutex::new(None), + gc_thread: Mutex::new(None), + }, + gc_rx, + ) } /// Build transaction system with logging and GC, leak it to heap /// for the convenience to share the singleton among multiple threads. #[inline] - pub fn build_static(self) -> &'static TransactionSystem { + pub fn build_static( + self, + buf_pool: &'static P, + catalog: &'static Catalog
<P>
, + ) -> &'static TransactionSystem { let mut log_partitions = Vec::with_capacity(self.log_partitions); + let mut gc_chans = Vec::with_capacity(self.log_partitions); for idx in 0..self.log_partitions { - let partition = self.setup_log_partition(idx); + let (partition, gc_rx) = self.setup_log_partition(idx); log_partitions.push(CachePadded::new(partition)); + gc_chans.push(gc_rx); } + let (purge_chan, purge_rx) = flume::unbounded(); let trx_sys = TransactionSystem { ts: CachePadded::new(AtomicU64::new(MIN_SNAPSHOT_TS)), min_active_sts: CachePadded::new(AtomicU64::new(MIN_SNAPSHOT_TS)), partition_id: CachePadded::new(AtomicUsize::new(0)), log_partitions: CachePadded::new(log_partitions.into_boxed_slice()), - persisted_cts: CachePadded::new(AtomicU64::new(MIN_SNAPSHOT_TS)), - gc_info: CachePadded::new(Mutex::new(GCInfo::new())), config: CachePadded::new(self), + purge_chan, + purge_threads: Mutex::new(vec![]), }; let trx_sys = Box::leak(Box::new(trx_sys)); trx_sys.start_sync_threads(); + trx_sys.start_gc_threads(gc_chans); + trx_sys.start_purge_threads(buf_pool, catalog, purge_rx); trx_sys } } @@ -419,503 +505,11 @@ impl Default for TrxSysConfig { log_sync: DEFAULT_LOG_SYNC, log_drop: DEFAULT_LOG_DROP, gc: DEFAULT_GC, + purge_threads: DEFAULT_PURGE_THREADS, } } } -struct LogPartition { - // Group commit of this partition. - group_commit: CachePadded<(Mutex, Condvar)>, - // Maximum persisted CTS of this partition. - persisted_cts: CachePadded, - /// AIO manager to handle async IO with libaio. - aio_mgr: AIOManager, - // Index of log partition in total partitions, starts from 0. - idx: usize, - // Maximum IO size of each group. - max_io_size: usize, - // Log file prefix, including partition number. - file_prefix: String, - // Free list of page buffer, which is used by commit group to concat - // redo logs. - buf_free_list: FreeListWithFactory, - // Standalone thread to handle transaction commit. - // Including submit IO requests, wait IO responses - // and do fsync(). - sync_thread: Mutex>>, - /// Stats of transaction system. - stats: CachePadded, -} - -impl LogPartition { - #[inline] - fn buf(&self, data: &[u8]) -> Buf { - if data.len() > self.max_io_size { - let buf = DirectBuf::uninit(data.len()); - Buf::Direct(buf) - } else { - let mut buf = self.buf_free_list.pop_elem_or_new(); - buf.set_len(data.len()); - buf.clone_from_slice(0, data); - Buf::Reuse(buf) - } - } - - #[inline] - fn create_new_group( - &self, - mut trx: PrecommitTrx, - mut group_commit_g: MutexGuard<'_, GroupCommit>, - ) -> (Session, Receiver<()>) { - let cts = trx.cts; - let redo_bin = trx.redo_bin.take().unwrap(); - debug_assert!(!redo_bin.is_empty()); - // inside the lock, we only need to determine which range of the log file this transaction - // should write to. - let log_file = group_commit_g.log_file.as_ref().unwrap(); - let offset = match log_file.alloc(redo_bin.len()) { - Ok((offset, _)) => offset, - Err(AIOError::OutOfRange) => { - // todo: rotate if log file is full. 
- todo!(); - } - Err(_) => unreachable!(), - }; - let fd = log_file.as_raw_fd(); - let log_buf = self.buf(&redo_bin); - let (sync_signal, sync_notifier) = flume::unbounded(); - let session = trx.split_session(); - let new_group = CommitGroup { - trx_list: vec![trx], - max_cts: cts, - fd, - offset, - log_buf, - sync_signal, - sync_notifier: sync_notifier.clone(), - }; - group_commit_g - .queue - .push_back(CommitMessage::Group(new_group)); - drop(group_commit_g); - - (session, sync_notifier) - } - - /// Transaction has no redo log, so we can just acquire CTS and finish it immediately. - #[inline] - fn commit_without_redo(&self, trx: PreparedTrx, ts: &AtomicU64) -> Session { - let cts = ts.fetch_add(1, Ordering::SeqCst); - let committed_trx = trx.fill_cts(cts).commit(); - // todo: GC - committed_trx.into_session() - } - - #[inline] - async fn commit(&self, trx: PreparedTrx, ts: &AtomicU64) -> Result { - if trx.redo_bin.is_none() { - let session = self.commit_without_redo(trx, ts); - return Ok(session); - } - let mut group_commit_g = self.group_commit.0.lock(); - let cts = ts.fetch_add(1, Ordering::SeqCst); - debug_assert!(cts < MAX_COMMIT_TS); - let precommit_trx = trx.fill_cts(cts); - if group_commit_g.queue.is_empty() { - let (session, sync_notifier) = self.create_new_group(precommit_trx, group_commit_g); - self.group_commit.1.notify_one(); // notify sync thread to work. - - let _ = sync_notifier.recv_async().await; // wait for fsync - assert!(self.persisted_cts.load(Ordering::Relaxed) >= cts); - return Ok(session); - } - let last_group = match group_commit_g.queue.back_mut().unwrap() { - CommitMessage::Shutdown => return Err(Error::TransactionSystemShutdown), - CommitMessage::Group(group) => group, - }; - if last_group.can_join(&precommit_trx) { - let (session, sync_notifier) = last_group.join(precommit_trx); - drop(group_commit_g); // unlock to let other transactions to enter commit phase. 
- - let _ = sync_notifier.recv_async().await; // wait for fsync - assert!(self.persisted_cts.load(Ordering::Relaxed) >= cts); - return Ok(session); - } - - let (session, sync_notifier) = self.create_new_group(precommit_trx, group_commit_g); - - let _ = sync_notifier.recv_async().await; // wait for fsync - assert!(self.persisted_cts.load(Ordering::Relaxed) >= cts); - Ok(session) - } - - #[inline] - fn update_stats( - &self, - trx_count: usize, - commit_count: usize, - log_bytes: usize, - sync_count: usize, - sync_nanos: usize, - ) { - self.stats.trx_count.fetch_add(trx_count, Ordering::Relaxed); - self.stats - .commit_count - .fetch_add(commit_count, Ordering::Relaxed); - self.stats.log_bytes.fetch_add(log_bytes, Ordering::Relaxed); - self.stats - .sync_count - .fetch_add(sync_count, Ordering::Relaxed); - self.stats - .sync_nanos - .fetch_add(sync_nanos, Ordering::Relaxed); - } - - #[inline] - fn try_fetch_io_reqs( - &self, - io_reqs: &mut Vec, - sync_groups: &mut VecDeque, - ) -> bool { - let mut group_commit_g = self.group_commit.0.lock(); - loop { - match group_commit_g.queue.pop_front() { - None => return false, - Some(CommitMessage::Shutdown) => return true, - Some(CommitMessage::Group(cg)) => { - let (iocb_ptr, sg) = cg.split(); - io_reqs.push(iocb_ptr); - sync_groups.push_back(sg); - } - } - } - } - - #[inline] - fn fetch_io_reqs( - &self, - io_reqs: &mut Vec, - sync_groups: &mut VecDeque, - ) -> bool { - let mut group_commit_g = self.group_commit.0.lock(); - loop { - loop { - match group_commit_g.queue.pop_front() { - None => break, - Some(CommitMessage::Shutdown) => { - return true; - } - Some(CommitMessage::Group(cg)) => { - let (iocb_ptr, sg) = cg.split(); - io_reqs.push(iocb_ptr); - sync_groups.push_back(sg); - } - } - } - if !io_reqs.is_empty() { - return false; - } - self.group_commit - .1 - .wait_for(&mut group_commit_g, Duration::from_secs(1)); - } - } - - #[inline] - fn io_loop_noop(&self, config: &TrxSysConfig) { - let io_depth = config.io_depth_per_log; - let mut io_reqs = Vec::with_capacity(io_depth * 2); - let mut sync_groups = VecDeque::with_capacity(io_depth * 2); - let mut shutdown = false; - loop { - if !shutdown { - shutdown |= self.fetch_io_reqs(&mut io_reqs, &mut sync_groups); - } - if !io_reqs.is_empty() { - let mut trx_count = 0; - let mut commit_count = 0; - let mut log_bytes = 0; - for sg in &sync_groups { - trx_count += sg.trx_list.len(); - commit_count += 1; - log_bytes += sg.log_bytes; - } - - let max_cts = sync_groups.back().as_ref().unwrap().max_cts; - - self.persisted_cts.store(max_cts, Ordering::SeqCst); - - io_reqs.clear(); - - for mut sync_group in sync_groups.drain(..) 
{ - mem::take(&mut sync_group.trx_list) - .into_iter() - .for_each(|trx| { - trx.commit(); - }); - } - - self.update_stats(trx_count, commit_count, log_bytes, 1, 0); - } - if shutdown { - return; - } - } - } - - #[inline] - fn io_loop(&self, config: &TrxSysConfig) { - let syncer = { - self.group_commit - .0 - .lock() - .log_file - .as_ref() - .unwrap() - .syncer() - }; - let io_depth = config.io_depth_per_log; - let mut inflight = BTreeMap::new(); - let mut in_progress = 0; - let mut io_reqs = Vec::with_capacity(io_depth * 2); - let mut sync_groups = VecDeque::with_capacity(io_depth * 2); - let mut events = self.aio_mgr.events(); - let mut written = vec![]; - let mut shutdown = false; - loop { - debug_assert!( - io_reqs.len() == sync_groups.len(), - "pending IO number equals to pending group number" - ); - if !shutdown { - if in_progress == 0 { - // there is no processing AIO, so we can block on waiting for next request. - shutdown |= self.fetch_io_reqs(&mut io_reqs, &mut sync_groups); - } else { - // only try non-blocking way to fetch incoming requests, because we also - // need to finish previous IO. - shutdown |= self.try_fetch_io_reqs(&mut io_reqs, &mut sync_groups); - } - } - let (io_submit_count, io_submit_nanos) = if !io_reqs.is_empty() { - let start = Instant::now(); - // try to submit as many IO requests as possible - let limit = io_depth - in_progress; - let submit_count = self.aio_mgr.submit_limit(&mut io_reqs, limit); - // add sync groups to inflight tree. - for sync_group in sync_groups.drain(..submit_count) { - let res = inflight.insert(sync_group.max_cts, sync_group); - debug_assert!(res.is_none()); - } - in_progress += submit_count; - debug_assert!(in_progress <= io_depth); - (1, start.elapsed().as_nanos() as usize) - } else { - (0, 0) - }; - - // wait for any request to be done. - let (io_wait_count, io_wait_nanos) = if in_progress != 0 { - let start = Instant::now(); - let finish_count = - self.aio_mgr - .wait_at_least(&mut events, 1, |cts, res| match res { - Ok(len) => { - let sg = inflight.get_mut(&cts).expect("finish inflight IO"); - debug_assert!(sg.aio.buf.as_ref().unwrap().aligned_len() == len); - sg.finished = true; - } - Err(err) => { - let sg = inflight.remove(&cts).unwrap(); - unimplemented!( - "AIO error: task.cts={}, task.log_bytes={}, {}", - sg.max_cts, - sg.log_bytes, - err - ) - } - }); - in_progress -= finish_count; - (1, start.elapsed().as_nanos() as usize) - } else { - (0, 0) - }; - - // after logs are written to disk, we need to do fsync to make sure its durablity. - // Also, we need to check if any previous transaction is also done and notify them. - written.clear(); - let (trx_count, commit_count, log_bytes) = shrink_inflight(&mut inflight, &mut written); - if !written.is_empty() { - let max_cts = written.last().unwrap().max_cts; - - let start = Instant::now(); - match config.log_sync { - LogSync::Fsync => syncer.fsync(), - LogSync::Fdatasync => syncer.fdatasync(), - LogSync::None => (), - } - let sync_dur = start.elapsed(); - - self.persisted_cts.store(max_cts, Ordering::SeqCst); - - // put IO buffer back into free list. - for mut sync_group in written.drain(..) { - if let Some(Buf::Reuse(elem)) = sync_group.aio.take_buf() { - self.buf_free_list.push_elem(elem); // return buf to free list for future reuse - } - // commit transactions to let waiting read operations to continue - // todo: send to GC thread. 
- mem::take(&mut sync_group.trx_list) - .into_iter() - .for_each(|trx| { - trx.commit(); - }); - drop(sync_group); // notify transaction thread to continue. - } - - self.update_stats( - trx_count, - commit_count, - log_bytes, - 1, - sync_dur.as_nanos() as usize, - ); - } - if io_submit_count != 0 { - self.stats - .io_submit_count - .fetch_add(io_submit_count, Ordering::Relaxed); - self.stats - .io_submit_nanos - .fetch_add(io_submit_nanos, Ordering::Relaxed); - } - if io_wait_count != 0 { - self.stats - .io_wait_count - .fetch_add(io_wait_count, Ordering::Relaxed); - self.stats - .io_wait_nanos - .fetch_add(io_wait_nanos, Ordering::Relaxed); - } - - if shutdown && inflight.is_empty() { - return; - } - } - } -} - -struct GroupCommit { - // Commit group queue, there can be multiple groups in commit phase. - // Each of them submit IO request to AIO manager and then wait for - // pwrite & fsync done. - queue: VecDeque, - // Current log file. - log_file: Option, - // sequence of current file in this partition, starts from 0. - file_seq: u32, -} - -/// GCInfo is only used for GroupCommitter to store and analyze GC related information, -/// including committed transaction list, old transaction list, active snapshot timestamp -/// list, etc. -pub struct GCInfo { - /// Committed transaction list. - /// When a transaction is committed, it will be put into this queue in sequence. - /// Head is always oldest and tail is newest. - committed_trx_list: VecDeque, - /// Old transaction list. - /// If a transaction's committed timestamp is less than the smallest - /// snapshot timestamp of all active transactions, it means this transction's - /// data vesion is latest and all its undo log can be purged. - /// So we move such transactions from commited list to old list. - old_trx_list: Vec, - /// Active snapshot timestamp list. - /// The smallest value equals to min_active_sts. - active_sts_list: BTreeSet, -} - -impl GCInfo { - #[inline] - pub fn new() -> Self { - GCInfo { - committed_trx_list: VecDeque::new(), - old_trx_list: Vec::new(), - active_sts_list: BTreeSet::new(), - } - } -} - -enum CommitMessage { - Group(CommitGroup), - Shutdown, -} - -/// CommitGroup groups multiple transactions with only -/// one log IO and at most one fsync() call. -/// It is controlled by two parameters: -/// 1. Maximum IO size, e.g. 16KB. -/// 2. Timeout to wait for next transaction to join. 
-struct CommitGroup { - trx_list: Vec, - max_cts: TrxID, - fd: RawFd, - offset: usize, - log_buf: Buf, - sync_signal: Sender<()>, - sync_notifier: Receiver<()>, -} - -impl CommitGroup { - #[inline] - fn can_join(&self, trx: &PrecommitTrx) -> bool { - if let Some(redo_bin) = trx.redo_bin.as_ref() { - return redo_bin.len() <= self.log_buf.remaining_capacity(); - } - true - } - - #[inline] - fn join(&mut self, mut trx: PrecommitTrx) -> (Session, Receiver<()>) { - debug_assert!(self.max_cts < trx.cts); - if let Some(redo_bin) = trx.redo_bin.take() { - self.log_buf.clone_from_slice(&redo_bin); - } - self.max_cts = trx.cts; - let session = trx.split_session(); - self.trx_list.push(trx); - (session, self.sync_notifier.clone()) - } - - #[inline] - fn split(self) -> (IocbRawPtr, SyncGroup) { - let log_bytes = self.log_buf.aligned_len(); - let aio = pwrite(self.max_cts, self.fd, self.offset, self.log_buf); - let iocb_ptr = aio.iocb.load(Ordering::Relaxed); - let sync_group = SyncGroup { - trx_list: self.trx_list, - max_cts: self.max_cts, - log_bytes, - aio, - sync_signal: self.sync_signal, - finished: false, - }; - (iocb_ptr, sync_group) - } -} - -struct SyncGroup { - trx_list: Vec, - max_cts: TrxID, - log_bytes: usize, - aio: AIO, - // Signal to notify transaction threads. - // This field won't be used, until the group is dropped. - #[allow(dead_code)] - sync_signal: Sender<()>, - finished: bool, -} - #[derive(Default)] pub struct TrxSysStats { pub commit_count: usize, @@ -927,19 +521,9 @@ pub struct TrxSysStats { pub io_submit_nanos: usize, pub io_wait_count: usize, pub io_wait_nanos: usize, -} - -#[derive(Default)] -pub struct LogPartitionStats { - pub commit_count: AtomicUsize, - pub trx_count: AtomicUsize, - pub log_bytes: AtomicUsize, - pub sync_count: AtomicUsize, - pub sync_nanos: AtomicUsize, - pub io_submit_count: AtomicUsize, - pub io_submit_nanos: AtomicUsize, - pub io_wait_count: AtomicUsize, - pub io_wait_nanos: AtomicUsize, + pub purge_trx_count: usize, + pub purge_row_count: usize, + pub purge_index_count: usize, } #[inline] @@ -952,31 +536,11 @@ fn log_file_prefix(file_prefix: &str, idx: usize) -> String { format!("{}.{}", file_prefix, idx) } -#[inline] -fn shrink_inflight( - tree: &mut BTreeMap, - buffer: &mut Vec, -) -> (usize, usize, usize) { - let mut trx_count = 0; - let mut commit_count = 0; - let mut log_bytes = 0; - while let Some(entry) = tree.first_entry() { - let task = entry.get(); - if task.finished { - trx_count += task.trx_list.len(); - commit_count += 1; - log_bytes += task.log_bytes; - buffer.push(entry.remove()); - } else { - break; // stop at the transaction which is not persisted. 
@@ -952,31 +536,11 @@ fn log_file_prefix(file_prefix: &str, idx: usize) -> String {
     format!("{}.{}", file_prefix, idx)
 }
 
-#[inline]
-fn shrink_inflight(
-    tree: &mut BTreeMap<TrxID, SyncGroup>,
-    buffer: &mut Vec<SyncGroup>,
-) -> (usize, usize, usize) {
-    let mut trx_count = 0;
-    let mut commit_count = 0;
-    let mut log_bytes = 0;
-    while let Some(entry) = tree.first_entry() {
-        let task = entry.get();
-        if task.finished {
-            trx_count += task.trx_list.len();
-            commit_count += 1;
-            log_bytes += task.log_bytes;
-            buffer.push(entry.remove());
-        } else {
-            break; // stop at the first transaction that is not yet persisted.
-        }
-    }
-    (trx_count, commit_count, log_bytes)
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::buffer::FixedBufferPool;
+    use crate::catalog::Catalog;
     use crate::session::Session;
     use crossbeam_utils::CachePadded;
     use parking_lot::Mutex;
@@ -986,21 +550,25 @@ mod tests {
 
     #[test]
     fn test_transaction_system() {
-        let trx_sys = TrxSysConfig::default().build_static();
+        let buf_pool = FixedBufferPool::with_capacity_static(128 * 1024 * 1024).unwrap();
+        let catalog = Catalog::empty_static();
+        let trx_sys = TrxSysConfig::default().build_static(buf_pool, catalog);
         let session = Session::new();
         {
             let trx = session.begin_trx(trx_sys);
-            let _ = smol::block_on(trx_sys.commit(trx));
+            let _ = smol::block_on(trx_sys.commit(trx, buf_pool, catalog));
         }
         std::thread::spawn(|| {
             let session = Session::new();
             let trx = session.begin_trx(trx_sys);
-            let _ = smol::block_on(trx_sys.commit(trx));
+            let _ = smol::block_on(trx_sys.commit(trx, buf_pool, catalog));
         })
         .join()
         .unwrap();
         unsafe {
             TransactionSystem::drop_static(trx_sys);
+            Catalog::drop_static(catalog);
+            FixedBufferPool::drop_static(buf_pool);
         }
     }
 
@@ -1135,18 +703,15 @@ mod tests {
 
     #[test]
     fn test_single_thread_trx_begin_and_commit() {
         const COUNT: usize = 1000000;
-        let trx_sys = TrxSysConfig::default().build_static();
-        let mut session = Session::new();
-        {
-            // hook persisted_ts to u64::MAX to allow all transactions to finish immediately.
-            trx_sys.persisted_cts.store(u64::MAX, Ordering::SeqCst);
-        }
-
-        {
+        smol::block_on(async {
+            let buf_pool = FixedBufferPool::with_capacity_static(128 * 1024 * 1024).unwrap();
+            let catalog = Catalog::empty_static();
+            let trx_sys = TrxSysConfig::default().build_static(buf_pool, catalog);
+            let mut session = Session::new();
             let start = Instant::now();
             for _ in 0..COUNT {
                 let trx = session.begin_trx(trx_sys);
-                let res = smol::block_on(trx_sys.commit(trx));
+                let res = trx_sys.commit(trx, buf_pool, catalog).await;
                 assert!(res.is_ok());
                 session = res.unwrap();
             }
@@ -1157,9 +722,11 @@ mod tests {
                 dur.as_micros(),
                 COUNT as f64 * 1_000_000_000f64 / dur.as_nanos() as f64
             );
-        }
-        unsafe {
-            TransactionSystem::drop_static(trx_sys);
-        }
+            unsafe {
+                TransactionSystem::drop_static(trx_sys);
+                Catalog::drop_static(catalog);
+                FixedBufferPool::drop_static(buf_pool);
+            }
+        });
     }
 }
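
// Illustrative sketch, not part of the patch: the setup/teardown pattern the
// rewritten tests above follow. The helper name is hypothetical. Static
// resources are created in dependency order and released in reverse order,
// because the transaction system borrows both the buffer pool and the catalog;
// the tests move trx_sys into a spawned thread, so build_static hands out
// 'static references.
fn with_trx_sys(f: impl FnOnce(&'static TransactionSystem)) {
    let buf_pool = FixedBufferPool::with_capacity_static(128 * 1024 * 1024).unwrap();
    let catalog = Catalog::empty_static();
    let trx_sys = TrxSysConfig::default().build_static(buf_pool, catalog);
    f(trx_sys);
    unsafe {
        // Reverse of creation order: transaction system first, buffer pool last.
        TransactionSystem::drop_static(trx_sys);
        Catalog::drop_static(catalog);
        FixedBufferPool::drop_static(buf_pool);
    }
}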

diff --git a/doradb-storage/src/trx/undo/index.rs b/doradb-storage/src/trx/undo/index.rs
index 8adb9a5..c77ddb9 100644
--- a/doradb-storage/src/trx/undo/index.rs
+++ b/doradb-storage/src/trx/undo/index.rs
@@ -20,6 +20,12 @@ impl IndexUndoLogs {
         self.0.is_empty()
     }
 
+    /// Returns count of index undo logs.
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.0.len()
+    }
+
     /// Add a new index undo log at end of the buffer.
     #[inline]
     pub fn push(&mut self, value: IndexUndo) {
@@ -27,6 +33,10 @@ impl IndexUndoLogs {
     }
 
     /// Rollback all index changes.
+    ///
+    /// This method uses strong assertions to make sure it will not fail,
+    /// because no other transaction can update the same index entry
+    /// concurrently.
     #[inline]
     pub fn rollback(&mut self, catalog: &Catalog) {
         while let Some(entry) = self.0.pop() {
@@ -36,8 +46,8 @@ impl IndexUndoLogs {
                     let res = table.sec_idx[key.index_no]
                         .unique()
                         .unwrap()
-                        .delete(&key.vals);
-                    assert!(res.unwrap() == entry.row_id);
+                        .compare_delete(&key.vals, entry.row_id);
+                    assert!(res);
                 }
                 IndexUndoKind::UpdateUnique(key, old_row_id) => {
                     let new_row_id = entry.row_id;
@@ -47,7 +57,7 @@ impl IndexUndoLogs {
                         .compare_exchange(&key.vals, new_row_id, old_row_id);
                     assert!(res);
                 }
-                IndexUndoKind::GC(_) => (), // do nothing.
+                IndexUndoKind::DeferDelete(_) => (), // do nothing.
             }
         }
     }
@@ -65,24 +75,18 @@ impl IndexUndoLogs {
     /// And to support MVCC, index deletion is delayed to GC phase.
     /// So here we should only keep potential index deletions.
     #[inline]
-    pub fn prepare_for_gc(&mut self) -> Self {
-        let logs_for_gc: Vec<_> = self
-            .0
+    pub fn commit_for_gc(&mut self) -> Vec<IndexPurge> {
+        self.0
            .drain(..)
            .filter_map(|entry| match entry.kind {
-                IndexUndoKind::InsertUnique(_) => None,
-                IndexUndoKind::UpdateUnique(key, old_row_id) => {
-                    let kind = IndexUndoKind::GC(key);
-                    Some(IndexUndo {
-                        table_id: entry.table_id,
-                        row_id: old_row_id,
-                        kind,
-                    })
-                }
-                IndexUndoKind::GC(_) => Some(entry),
+                IndexUndoKind::InsertUnique(_) | IndexUndoKind::UpdateUnique(..) => None,
+                IndexUndoKind::DeferDelete(key) => Some(IndexPurge {
+                    table_id: entry.table_id,
+                    row_id: entry.row_id,
+                    key,
+                }),
            })
-            .collect();
-        IndexUndoLogs(logs_for_gc)
+            .collect()
     }
 }
@@ -104,5 +108,11 @@ pub enum IndexUndoKind {
     /// in order to support MVCC.
     /// The actual deletion is performed solely by GC thread.
     /// This is what GC entry means.
-    GC(SelectKey),
+    DeferDelete(SelectKey),
+}
+
+pub struct IndexPurge {
+    pub table_id: TableID,
+    pub row_id: RowID,
+    pub key: SelectKey,
+}
diff --git a/doradb-storage/src/trx/undo/row.rs b/doradb-storage/src/trx/undo/row.rs
index 0deef73..4f804b2 100644
--- a/doradb-storage/src/trx/undo/row.rs
+++ b/doradb-storage/src/trx/undo/row.rs
@@ -184,6 +184,12 @@ impl RowUndoLogs {
         self.0.is_empty()
     }
 
+    /// Returns count of row undo logs.
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.0.len()
+    }
+
     #[inline]
     pub fn push(&mut self, value: OwnedRowUndo) {
         self.0.push(value)
@@ -272,6 +277,7 @@ impl OwnedRowUndo {
 pub struct RowUndoRef(NonNull<RowUndo>);
 
 unsafe impl Send for RowUndoRef {}
+unsafe impl Sync for RowUndoRef {}
 
 impl RowUndoRef {
     #[inline]
@@ -373,6 +379,8 @@ pub struct RowUndoHead {
     pub entry: Option<RowUndoRef>,
 }
 
+unsafe impl Send for RowUndoHead {}
+
 #[derive(Default, Clone, Copy)]
 pub enum NextTrxCTS {
     #[default]