Add simple catalog and refine statement-level rollback
jiangzhe committed Jan 21, 2025
1 parent e2a3135 commit 1220556
Showing 17 changed files with 2,012 additions and 1,228 deletions.
59 changes: 59 additions & 0 deletions doradb-storage/src/catalog.rs
@@ -0,0 +1,59 @@
use crate::buffer::BufferPool;
use crate::index::{BlockIndex, PartitionIntIndex, SingleKeyIndex};
use crate::table::{Schema, Table, TableID};
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Catalog contains metadata of user tables.
/// The initial implementation is an in-memory hash table.
pub struct Catalog<P> {
table_id: AtomicU64,
tables: Mutex<HashMap<TableID, TableMeta<P>>>,
}

impl<P: BufferPool> Catalog<P> {
#[inline]
pub fn empty() -> Self {
Catalog {
table_id: AtomicU64::new(1),
tables: Mutex::new(HashMap::new()),
}
}

#[inline]
pub fn create_table(&self, buf_pool: &P, schema: Schema) -> TableID {
let table_id = self.table_id.fetch_add(1, Ordering::SeqCst);
let blk_idx = BlockIndex::new(buf_pool).unwrap();
let sec_idx = PartitionIntIndex::empty();
let mut g = self.tables.lock();
let res = g.insert(
table_id,
TableMeta {
schema: Arc::new(schema),
blk_idx: Arc::new(blk_idx),
sec_idx: Arc::new(sec_idx),
},
);
debug_assert!(res.is_none());
table_id
}

#[inline]
pub fn get_table(&self, table_id: TableID) -> Option<Table<P>> {
let g = self.tables.lock();
g.get(&table_id).map(|meta| Table {
table_id,
schema: Arc::clone(&meta.schema),
blk_idx: Arc::clone(&meta.blk_idx),
sec_idx: Arc::clone(&meta.sec_idx),
})
}
}

pub struct TableMeta<P> {
pub schema: Arc<Schema>,
pub blk_idx: Arc<BlockIndex<P>>,
pub sec_idx: Arc<dyn SingleKeyIndex>,
}
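
For orientation, a minimal usage sketch of the new catalog follows (illustrative only, not part of this commit; module paths are taken from this diff where visible and assumed otherwise, and the static pool constructor is borrowed from the block-index tests below):

use doradb_storage::buffer::FixedBufferPool;
use doradb_storage::catalog::Catalog;
use doradb_storage::table::Schema;
// Assumed path for Layout; adjust to wherever the crate defines it.
use doradb_storage::row::Layout;

fn catalog_usage() {
    // Same static-pool constructor used by the tests in this commit.
    let buf_pool = FixedBufferPool::with_capacity_static(64 * 1024 * 1024).unwrap();
    let catalog = Catalog::empty();
    // Table IDs start at 1 and grow monotonically via the SeqCst fetch_add above.
    let table_id = catalog.create_table(buf_pool, Schema::new(vec![Layout::Byte8], 0));
    // get_table hands back a Table that clones the shared Arc handles.
    let _table = catalog.get_table(table_id).expect("table registered above");
}

The struct stays deliberately small: one mutex-guarded HashMap is enough while all metadata lives in memory, and the Arc fields in TableMeta let concurrent readers share the block index and secondary index without copying them.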
8 changes: 4 additions & 4 deletions doradb-storage/src/index/block_index.rs
@@ -912,7 +912,7 @@ mod tests {
fn test_block_index_free_list() {
let buf_pool = FixedBufferPool::with_capacity_static(64 * 1024 * 1024).unwrap();
{
let schema = Schema::new(vec![Layout::Byte8], 0);
let schema = Schema::new(vec![Layout::Byte4], 0);
let blk_idx = BlockIndex::new(buf_pool).unwrap();
let p1 = blk_idx.get_insert_page(buf_pool, 100, &schema);
let pid1 = p1.page_id();
@@ -932,7 +932,7 @@
fn test_block_index_insert_row_page() {
let buf_pool = FixedBufferPool::with_capacity_static(64 * 1024 * 1024).unwrap();
{
let schema = Schema::new(vec![Layout::Byte8], 0);
let schema = Schema::new(vec![Layout::Byte4], 0);
let blk_idx = BlockIndex::new(buf_pool).unwrap();
let p1 = blk_idx.get_insert_page(buf_pool, 100, &schema);
let pid1 = p1.page_id();
@@ -954,7 +954,7 @@
// allocate 1GB buffer pool is enough: 10240 pages ~= 640MB
let buf_pool = FixedBufferPool::with_capacity_static(1024 * 1024 * 1024).unwrap();
{
let schema = Schema::new(vec![Layout::Byte8], 0);
let schema = Schema::new(vec![Layout::Byte4], 0);
let blk_idx = BlockIndex::new(buf_pool).unwrap();
for _ in 0..row_pages {
let _ = blk_idx.get_insert_page(buf_pool, 100, &schema);
@@ -999,7 +999,7 @@ mod tests {
let rows_per_page = 100usize;
let buf_pool = FixedBufferPool::with_capacity_static(1024 * 1024 * 1024).unwrap();
{
let schema = Schema::new(vec![Layout::Byte8], 0);
let schema = Schema::new(vec![Layout::Byte4], 0);
let blk_idx = BlockIndex::new(buf_pool).unwrap();
for _ in 0..row_pages {
let _ = blk_idx.get_insert_page(buf_pool, rows_per_page, &schema);
22 changes: 21 additions & 1 deletion doradb-storage/src/index/secondary_index.rs
@@ -20,6 +20,8 @@ pub trait SingleKeyIndex {

fn delete(&self, key: &Val) -> Option<RowID>;

fn compare_exchange(&self, key: &Val, old_row_id: RowID, new_row_id: RowID) -> bool;

// todo: scan
}

@@ -92,9 +94,27 @@ impl SingleKeyIndex for PartitionIntIndex {

#[inline]
fn delete(&self, key: &Val) -> Option<RowID> {
let key = self.key_to_int(&key);
let key = self.key_to_int(key);
let tree = self.select(key);
let mut g = tree.write();
g.remove(&key)
}

#[inline]
fn compare_exchange(&self, key: &Val, old_row_id: RowID, new_row_id: RowID) -> bool {
let key = self.key_to_int(key);
let tree = self.select(key);
let mut g = tree.write();
match g.get_mut(&key) {
Some(row_id) => {
if *row_id == old_row_id {
*row_id = new_row_id;
true
} else {
false
}
}
None => false,
}
}
}
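
The new compare_exchange lets a caller restore an index entry only when it still carries the RowID that caller installed, which is the guard statement-level rollback needs so that a concurrent writer who has since moved the key is never clobbered. A hedged sketch of that calling pattern (the helper below is illustrative, not code from this commit):

// Illustrative only: undo an index redirect made by a failed statement.
// Returns true when the entry still pointed at `installed` and was swapped back
// to `original`; false means another statement won the race and the entry is left alone.
fn undo_index_redirect(idx: &dyn SingleKeyIndex, key: &Val, installed: RowID, original: RowID) -> bool {
    idx.compare_exchange(key, installed, original)
}

In PartitionIntIndex the whole check-and-swap runs under the selected partition's write lock, so it is atomic with respect to every other writer of that partition.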
1 change: 1 addition & 0 deletions doradb-storage/src/lib.rs
@@ -3,6 +3,7 @@ pub mod col;
pub mod io;
#[macro_use]
pub mod error;
pub mod catalog;
pub mod index;
pub mod latch;
pub mod row;
90 changes: 76 additions & 14 deletions doradb-storage/src/row/mod.rs
@@ -239,11 +239,11 @@ impl RowPage {
#[inline]
pub fn delete(&self, row_id: RowID) -> Delete {
if !self.row_id_in_valid_range(row_id) {
return Delete::RowNotFound;
return Delete::NotFound;
}
let row_idx = self.row_idx(row_id);
if self.is_deleted(row_idx) {
return Delete::RowAlreadyDeleted;
return Delete::AlreadyDeleted;
}
self.set_deleted(row_idx, true);
Delete::Ok
@@ -273,11 +273,11 @@ impl RowPage {
"update columns should be in order"
);
if !self.row_id_in_valid_range(row_id) {
return Update::RowNotFound;
return Update::NotFound;
}
let row_idx = self.row_idx(row_id);
if self.row(row_idx).is_deleted() {
return Update::RowDeleted;
return Update::Deleted;
}
let var_len = self.var_len_for_update(row_idx, user_cols);
let var_offset = if let Some(var_offset) = self.request_free_space(var_len) {
@@ -299,7 +299,7 @@
#[inline]
pub fn select(&self, row_id: RowID) -> Select {
if !self.row_id_in_valid_range(row_id) {
return Select::RowNotFound;
return Select::NotFound;
}
let row_idx = self.row_idx(row_id);
let row = self.row(row_idx);
@@ -423,7 +423,7 @@ impl RowPage {
}

#[inline]
fn update_val<V: ToValue>(&self, row_idx: usize, col_idx: usize, val: &V) {
pub(crate) fn update_val<V: ToValue>(&self, row_idx: usize, col_idx: usize, val: &V) {
unsafe {
let val = val.to_val();
let offset = self.val_offset(row_idx, col_idx, mem::size_of::<V>());
@@ -433,13 +433,13 @@
}

#[inline]
fn update_var(&self, row_idx: usize, col_idx: usize, var: PageVar) {
pub(crate) fn update_var(&self, row_idx: usize, col_idx: usize, var: PageVar) {
debug_assert!(mem::size_of::<PageVar>() == mem::size_of::<u64>());
self.update_val::<u64>(row_idx, col_idx, unsafe { mem::transmute(&var) });
}

#[inline]
fn add_var(&self, input: &[u8], var_offset: usize) -> (PageVar, usize) {
pub(crate) fn add_var(&self, input: &[u8], var_offset: usize) -> (PageVar, usize) {
let len = input.len();
if len <= PAGE_VAR_LEN_INLINE {
return (PageVar::inline(input), var_offset);
@@ -498,7 +498,7 @@ impl RowPage {

/// Mark given row as deleted.
#[inline]
pub fn set_deleted(&self, row_idx: usize, deleted: bool) {
pub(crate) fn set_deleted(&self, row_idx: usize, deleted: bool) {
unsafe {
let offset = self.header.del_bit_offset(row_idx);
let ptr = self.data_ptr().add(offset);
@@ -544,7 +544,7 @@ impl RowPage {
}

#[inline]
fn set_null(&self, row_idx: usize, col_idx: usize, null: bool) {
pub(crate) fn set_null(&self, row_idx: usize, col_idx: usize, null: bool) {
unsafe {
let offset = self.header.null_bit_offset(row_idx, col_idx);
let ptr = self.data_ptr().add(offset);
@@ -828,6 +828,16 @@ pub trait RowRead {
self.clone_val(schema, user_col_idx + 1)
}

/// Clone single value and its var-len offset with given column index.
#[inline]
fn clone_user_val_with_var_offset(
&self,
schema: &Schema,
user_col_idx: usize,
) -> (Val, Option<u16>) {
self.clone_val_with_var_offset(schema, user_col_idx + 1)
}

/// Clone single value with given column index.
/// NOTE: input column index includes RowID.
#[inline]
@@ -859,6 +869,38 @@ pub trait RowRead {
}
}

#[inline]
fn clone_val_with_var_offset(&self, schema: &Schema, col_idx: usize) -> (Val, Option<u16>) {
if self.is_null(col_idx) {
return (Val::Null, None);
}
match schema.layout(col_idx) {
Layout::Byte1 => {
let v = self.val::<Byte1Val>(col_idx);
(Val::from(*v), None)
}
Layout::Byte2 => {
let v = self.val::<Byte2Val>(col_idx);
(Val::from(*v), None)
}
Layout::Byte4 => {
let v = self.val::<Byte4Val>(col_idx);
(Val::from(*v), None)
}
Layout::Byte8 => {
let v = self.val::<Byte8Val>(col_idx);
(Val::from(*v), None)
}
Layout::VarByte => {
// let v = self.var(col_idx);
let pv = unsafe { self.page().var_unchecked(self.row_idx(), col_idx) };
let v = pv.as_bytes(self.page().data_ptr());
let offset = pv.offset().map(|os| os as u16);
(Val::VarByte(MemVar::new(v)), offset)
}
}
}

/// Clone all values.
#[inline]
fn clone_vals(&self, schema: &Schema, include_row_id: bool) -> Vec<Val> {
@@ -870,6 +912,21 @@
vals
}

/// Clone all values with var-len offset.
#[inline]
fn clone_vals_with_var_offsets(
&self,
schema: &Schema,
include_row_id: bool,
) -> Vec<(Val, Option<u16>)> {
let skip = if include_row_id { 0 } else { 1 };
let mut vals = Vec::with_capacity(schema.col_count() - skip);
for (col_idx, _) in schema.cols().iter().enumerate().skip(skip) {
vals.push(self.clone_val_with_var_offset(schema, col_idx));
}
vals
}
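
These offset-carrying clones record where a var-len value currently lives in the page, which is the kind of information an in-place undo of an update needs. A rough sketch of how a caller might snapshot a row before touching it (UndoRow and capture_undo are hypothetical names, not part of this commit):

// Illustrative only: snapshot pre-update values together with their var-len page offsets.
struct UndoRow {
    vals: Vec<(Val, Option<u16>)>,
}

fn capture_undo<R: RowRead>(row: &R, schema: &Schema) -> UndoRow {
    // RowID is excluded; the Option<u16> holds the page var-len offset when the value
    // is stored out of line, and None for fixed-size or inline values.
    UndoRow {
        vals: row.clone_vals_with_var_offsets(schema, false),
    }
}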

/// Clone values for given read set. (row id is excluded)
#[inline]
fn clone_vals_for_read_set(&self, schema: &Schema, user_read_set: &[usize]) -> Vec<Val> {
@@ -931,11 +988,16 @@ pub trait RowRead {

/// Returns the old value if different from given index and new value.
#[inline]
fn user_different(&self, schema: &Schema, user_col_idx: usize, value: &Val) -> Option<Val> {
fn user_different(
&self,
schema: &Schema,
user_col_idx: usize,
value: &Val,
) -> Option<(Val, Option<u16>)> {
if !self.is_user_different(schema, user_col_idx, value) {
return None;
}
Some(self.clone_user_val(schema, user_col_idx))
Some(self.clone_user_val_with_var_offset(schema, user_col_idx))
}
}

Expand Down Expand Up @@ -1149,7 +1211,7 @@ mod tests {

#[test]
fn test_row_page_init() {
let schema = Schema::new(vec![Layout::Byte8], 0);
let schema = Schema::new(vec![Layout::Byte4], 0);
let mut page = create_row_page();
page.init(100, 105, &schema);
println!("page header={:?}", page.header);
Expand All @@ -1166,7 +1228,7 @@ mod tests {

#[test]
fn test_row_page_new_row() {
let schema = Schema::new(vec![Layout::Byte8], 0);
let schema = Schema::new(vec![Layout::Byte4], 0);
let mut page = create_row_page();
page.init(100, 200, &schema);
assert!(page.header.row_count() == 0);
Expand Down
Loading
