Basic garbage collect
jiangzhe committed Feb 5, 2025
1 parent 7fbdb94 commit 9948cfe
Showing 20 changed files with 2,285 additions and 949 deletions.
2 changes: 2 additions & 0 deletions doradb-storage/Cargo.toml
@@ -34,6 +34,8 @@ humantime = "2.1"
criterion = { version = "0.5", features = ["html_reports"] }
criterion-perf-events = "0.4"
perfcnt = "0.8"
byte-unit = "5"
fastrand = "2"

[profile.release]
debug = true
2 changes: 1 addition & 1 deletion doradb-storage/examples/bench_block_index.rs
@@ -1,6 +1,6 @@
use clap::Parser;
use doradb_storage::buffer::FixedBufferPool;
-use doradb_storage::table::{IndexKey, IndexSchema, TableSchema};
+use doradb_storage::catalog::{IndexKey, IndexSchema, TableSchema};
use doradb_storage::value::ValKind;
use perfcnt::linux::{HardwareEventType as Hardware, PerfCounterBuilderLinux as Builder};
use perfcnt::{AbstractPerfCounter, PerfCounter};
250 changes: 250 additions & 0 deletions doradb-storage/examples/bench_insert.rs
@@ -0,0 +1,250 @@
//! Multi-threaded insert benchmark.
//! This example runs single-row insert transactions via multiple threads.
//! Its goal is to test system bottlenecks on inserting rows and committing transactions.
use byte_unit::{Byte, ParseError};
use clap::Parser;
use crossbeam_utils::sync::WaitGroup;
use doradb_storage::buffer::FixedBufferPool;
use doradb_storage::catalog::{Catalog, IndexKey, IndexSchema, TableSchema};
use doradb_storage::session::Session;
use doradb_storage::table::TableID;
use doradb_storage::trx::log::LogSync;
use doradb_storage::trx::sys::{TransactionSystem, TrxSysConfig};
use doradb_storage::value::{Val, ValKind};
use easy_parallel::Parallel;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

fn main() {
let args = Args::parse();

let buf_pool = FixedBufferPool::with_capacity_static(args.buffer_pool_size).unwrap();
println!("buffer pool size is {}", buf_pool.size());
let catalog = Catalog::empty_static();
let trx_sys = TrxSysConfig::default()
.log_file_prefix(args.log_file_prefix.to_string())
.log_partitions(args.log_partitions)
.io_depth_per_log(args.io_depth_per_log)
.log_file_max_size(args.log_file_max_size)
.log_sync(args.log_sync)
.log_drop(args.log_drop)
.max_io_size(args.max_io_size)
.gc(args.gc_enabled)
.purge_threads(args.purge_threads)
.build_static(buf_pool, catalog);
// create empty table
let table_id = catalog.create_table(
buf_pool,
TableSchema::new(
vec![
ValKind::I32.nullable(false),
ValKind::I32.nullable(false),
ValKind::VarByte.nullable(false),
ValKind::VarByte.nullable(false),
],
vec![IndexSchema::new(vec![IndexKey::new(0)], true)],
),
);
// start benchmark
{
let start = Instant::now();
let wg = WaitGroup::new();
let stop = Arc::new(AtomicBool::new(false));
let ex = smol::Executor::new();
let (notify, shutdown) = flume::unbounded::<()>();
// start transaction sessions.
for sess_id in 0..args.sessions {
let wg = wg.clone();
let stop = Arc::clone(&stop);
ex.spawn(worker(
buf_pool,
trx_sys,
catalog,
table_id,
sess_id as i32,
args.sessions as i32,
stop,
wg,
))
.detach();
}
// start system threads.
let _ = Parallel::new()
.each(0..args.threads, |_| {
smol::block_on(ex.run(shutdown.recv_async()))
})
.finish({
let stop = Arc::clone(&stop);
move || {
std::thread::sleep(args.duration);
stop.store(true, Ordering::SeqCst);
wg.wait();
drop(notify)
}
});
let dur = start.elapsed();
let stats = trx_sys.trx_sys_stats();
let total_trx_count = stats.trx_count;
let commit_count = stats.commit_count;
let log_bytes = stats.log_bytes;
let sync_count = stats.sync_count;
let sync_nanos = stats.sync_nanos;
let sync_latency = if sync_count == 0 {
0f64
} else {
sync_nanos as f64 / 1000f64 / sync_count as f64
};
let io_submit_count = stats.io_submit_count;
let io_submit_nanos = stats.io_submit_nanos;
let io_submit_latency = if io_submit_count == 0 {
0f64
} else {
io_submit_nanos as f64 / 1000f64 / io_submit_count as f64
};
let io_wait_count = stats.io_wait_count;
let io_wait_nanos = stats.io_wait_nanos;
let io_wait_latency = if io_wait_count == 0 {
0f64
} else {
io_wait_nanos as f64 / 1000f64 / io_wait_count as f64
};
let trx_per_group = if commit_count == 0 {
0f64
} else {
total_trx_count as f64 / commit_count as f64
};
let tps = total_trx_count as f64 * 1_000_000_000f64 / dur.as_nanos() as f64;
println!(
"threads={},dur={},total_trx={},groups={},sync={},sync_dur={:.2}us,\
io_submit={},io_submit_dur={:.2}us,io_wait={},io_wait_dur={:.2}us,\
trx/grp={:.2},trx/s={:.0},log/s={:.2}MB,purge_trx={},purge_row={},purge_index={}",
args.threads,
dur.as_micros(),
total_trx_count,
commit_count,
sync_count,
sync_latency,
io_submit_count,
io_submit_latency,
io_wait_count,
io_wait_latency,
trx_per_group,
tps,
log_bytes as f64 / dur.as_micros() as f64,
stats.purge_trx_count,
stats.purge_row_count,
stats.purge_index_count,
);
}
unsafe {
TransactionSystem::drop_static(trx_sys);
Catalog::drop_static(catalog);
FixedBufferPool::drop_static(buf_pool);
}
}

#[inline]
async fn worker(
buf_pool: &FixedBufferPool,
trx_sys: &TransactionSystem,
catalog: &'static Catalog<FixedBufferPool>,
table_id: TableID,
id_start: i32,
id_step: i32,
stop: Arc<AtomicBool>,
wg: WaitGroup,
) {
let table = catalog.get_table(table_id).unwrap();
let mut session = Session::new();
let stop = &*stop;
let mut id = id_start;
let mut c = [0u8; 120];
let mut pad = [0u8; 60];
while !stop.load(Ordering::Relaxed) {
let k = fastrand::i32(0..1024 * 1024);
c.iter_mut().for_each(|b| {
*b = fastrand::alphabetic() as u8;
});
pad.iter_mut().for_each(|b| {
*b = fastrand::alphabetic() as u8;
});
let mut trx = session.begin_trx(trx_sys);
let mut stmt = trx.start_stmt();
let res = table
.insert_row(
buf_pool,
&mut stmt,
vec![
Val::from(id),
Val::from(k),
Val::from(&c[..]),
Val::from(&pad[..]),
],
)
.await;
assert!(res.is_ok());
trx = stmt.succeed();
match trx_sys.commit(trx, buf_pool, &catalog).await {
Ok(s) => session = s,
Err(_) => return,
}
id += id_step;
}
drop(wg);
}

#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
/// number of threads running transactions
#[arg(long, default_value = "1")]
threads: usize,

#[arg(long, default_value = "1")]
sessions: usize,

/// duration of the benchmark run
#[arg(long, default_value = "10s", value_parser = humantime::parse_duration)]
duration: Duration,

/// path prefix of redo log files
#[arg(long, default_value = "redo.log")]
log_file_prefix: String,

#[arg(long, default_value = "1")]
log_partitions: usize,

#[arg(long, default_value = "fsync", value_parser = LogSync::from_str)]
log_sync: LogSync,

#[arg(long, default_value = "false")]
log_drop: bool,

/// maximum size of each log file
#[arg(long, default_value = "1GiB", value_parser = parse_byte_size)]
log_file_max_size: usize,

#[arg(long, default_value = "8192", value_parser = parse_byte_size)]
max_io_size: usize,

#[arg(long, default_value = "32")]
io_depth_per_log: usize,

#[arg(long, default_value = "2GiB", value_parser = parse_byte_size)]
buffer_pool_size: usize,

/// whether to enable GC
#[arg(long, action = clap::ArgAction::Set, default_value = "true", value_parser = clap::builder::BoolishValueParser::new())]
gc_enabled: bool,

#[arg(long, default_value = "1")]
purge_threads: usize,
}

#[inline]
fn parse_byte_size(input: &str) -> Result<usize, ParseError> {
Byte::parse_str(input, true).map(|b| b.as_u64() as usize)
}
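
The parse_byte_size helper above leans on the byte-unit crate added to Cargo.toml. Below is a minimal sketch of how it resolves human-readable sizes, assuming byte-unit 5's Byte::parse_str(input, ignore_case) API as used here:

    use byte_unit::{Byte, ParseError};

    fn parse_byte_size(input: &str) -> Result<usize, ParseError> {
        // `true` makes unit parsing case-insensitive ("1gib" == "1GiB").
        Byte::parse_str(input, true).map(|b| b.as_u64() as usize)
    }

    fn main() -> Result<(), ParseError> {
        assert_eq!(parse_byte_size("1GiB")?, 1024 * 1024 * 1024); // binary unit
        assert_eq!(parse_byte_size("8192")?, 8192); // bare numbers parse as bytes
        Ok(())
    }
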
34 changes: 27 additions & 7 deletions doradb-storage/examples/multi_threaded_trx.rs
@@ -1,10 +1,14 @@
//! Multi-threaded transaction processing.
//! This example runs empty transactions via multiple threads.
//! Its goal is to test the system bottleneck on starting and committing transactions.
+use byte_unit::{Byte, ParseError};
use clap::Parser;
use crossbeam_utils::sync::WaitGroup;
use doradb_storage::buffer::FixedBufferPool;
+use doradb_storage::catalog::Catalog;
use doradb_storage::session::Session;
-use doradb_storage::trx::sys::{LogSync, TransactionSystem, TrxSysConfig};
+use doradb_storage::trx::log::LogSync;
+use doradb_storage::trx::sys::{TransactionSystem, TrxSysConfig};
use easy_parallel::Parallel;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
@@ -15,6 +19,8 @@ use std::time::Instant;
fn main() {
let args = Args::parse();

+let buf_pool = FixedBufferPool::with_capacity_static(128 * 1024 * 1024).unwrap();
+let catalog = Catalog::<FixedBufferPool>::empty_static();
let trx_sys = TrxSysConfig::default()
.log_file_prefix(args.log_file_prefix.to_string())
.log_partitions(args.log_partitions)
@@ -24,7 +30,7 @@ fn main() {
.log_drop(args.log_drop)
.max_io_size(args.max_io_size)
.gc(args.gc_enabled)
-.build_static();
+.build_static(buf_pool, catalog);
{
let start = Instant::now();
let wg = WaitGroup::new();
@@ -35,7 +41,8 @@ fn main() {
for _ in 0..args.sessions {
let wg = wg.clone();
let stop = Arc::clone(&stop);
-ex.spawn(worker(trx_sys, stop, wg)).detach();
+ex.spawn(worker(buf_pool, catalog, trx_sys, stop, wg))
+.detach();
}
// start system threads.
let _ = Parallel::new()
@@ -81,17 +88,25 @@ fn main() {
}
unsafe {
TransactionSystem::drop_static(trx_sys);
+Catalog::drop_static(catalog);
+FixedBufferPool::drop_static(buf_pool);
}
}

#[inline]
-async fn worker(trx_sys: &TransactionSystem, stop: Arc<AtomicBool>, wg: WaitGroup) {
+async fn worker(
+buf_pool: &FixedBufferPool,
+catalog: &Catalog<FixedBufferPool>,
+trx_sys: &TransactionSystem,
+stop: Arc<AtomicBool>,
+wg: WaitGroup,
+) {
let mut session = Session::new();
let stop = &*stop;
while !stop.load(Ordering::Relaxed) {
let mut trx = session.begin_trx(trx_sys);
trx.add_pseudo_redo_log_entry();
-match trx_sys.commit(trx).await {
+match trx_sys.commit(trx, buf_pool, &catalog).await {
Ok(s) => session = s,
Err(_) => return,
}
@@ -127,10 +142,10 @@ struct Args {
log_drop: bool,

/// maximum size of each log file
-#[arg(long, default_value = "1073741824")]
+#[arg(long, default_value = "1GiB", value_parser = parse_byte_size)]
log_file_max_size: usize,

#[arg(long, default_value = "8192")]
#[arg(long, default_value = "8KiB", value_parser = parse_byte_size)]
max_io_size: usize,

#[arg(long, default_value = "32")]
@@ -140,3 +155,8 @@ struct Args {
#[arg(long)]
gc_enabled: bool,
}

+#[inline]
+fn parse_byte_size(input: &str) -> Result<usize, ParseError> {
+Byte::parse_str(input, true).map(|b| b.as_u64() as usize)
+}
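
Both benchmarks now share the static lifecycle this commit introduces: leak the buffer pool and catalog as &'static values, hand them to TrxSysConfig::build_static so the transaction system's GC can reach table data, and tear everything down in reverse order. A condensed sketch of that pattern, with the config knobs elided:

    use doradb_storage::buffer::FixedBufferPool;
    use doradb_storage::catalog::Catalog;
    use doradb_storage::trx::sys::{TransactionSystem, TrxSysConfig};

    fn lifecycle() {
        // Leak a 128 MiB pool and an empty catalog as &'static references.
        let buf_pool = FixedBufferPool::with_capacity_static(128 * 1024 * 1024).unwrap();
        let catalog = Catalog::<FixedBufferPool>::empty_static();
        // The transaction system holds both, so its purge threads can
        // garbage-collect old row versions and index entries.
        let trx_sys = TrxSysConfig::default().build_static(buf_pool, catalog);

        // ... run sessions: begin_trx(trx_sys), insert_row(...),
        //     trx_sys.commit(trx, buf_pool, &catalog).await ...

        // Teardown mirrors construction in reverse order.
        unsafe {
            TransactionSystem::drop_static(trx_sys);
            Catalog::drop_static(catalog);
            FixedBufferPool::drop_static(buf_pool);
        }
    }
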
8 changes: 7 additions & 1 deletion doradb-storage/src/buffer/mod.rs
@@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
pub const SAFETY_PAGES: usize = 10;

/// Abstraction of buffer pool.
-pub trait BufferPool {
+pub trait BufferPool: Sync {
/// Allocate a new page.
fn allocate_page<T: BufferFrameAware>(&self) -> PageExclusiveGuard<'_, T>;

@@ -101,6 +101,12 @@ impl FixedBufferPool {
Ok(leak)
}

+/// Returns the maximum page number of this pool.
+#[inline]
+pub fn size(&self) -> usize {
+self.size
+}

/// Drop static buffer pool.
///
/// # Safety
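
The new Sync bound on BufferPool matters because the examples above share one &'static pool between session tasks and background purge threads: &P is Send only when P: Sync, so without the bound a generic spawn would not compile. A small illustration; the spawn_background helper is hypothetical, not part of this commit:

    use doradb_storage::buffer::BufferPool;

    // Hypothetical helper: move a &'static pool into a background thread,
    // as the purge threads introduced by this commit must do.
    fn spawn_background<P: BufferPool + 'static>(pool: &'static P) {
        std::thread::spawn(move || {
            // Capturing `pool` requires `&P: Send`, i.e. `P: Sync`,
            // which the new `BufferPool: Sync` supertrait guarantees.
            let _ = pool;
        });
    }
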
(diff for the remaining 15 of 20 changed files not shown)
