add catalog interface
jiangzhe committed Feb 14, 2025
1 parent 22ec4d9 commit 51a32fc
Showing 13 changed files with 494 additions and 192 deletions.
11 changes: 3 additions & 8 deletions doradb-catalog/src/lib.rs
@@ -1,14 +1,14 @@
pub mod error;
pub mod mem_impl;
pub mod object;

pub use object::*;

use crate::error::Result;
use bitflags::bitflags;
use doradb_datatype::PreciseType;
use semistr::SemiStr;
use std::fmt;
use std::hash::Hash;
use std::marker::PhantomData;

/// Catalog maintains metadata of all database objects.
/// It can be shared between threads.
pub trait Catalog: Send + Sync {
@@ -45,11 +45,6 @@ pub trait Catalog: Send + Sync {
fn find_keys(&self, table_id: &TableID) -> Vec<Key>;
}

pub type ObjID = u64;
pub type TableID = ObjID;
pub type SchemaID = ObjID;
pub type ColumnID = ObjID;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Schema {
pub id: SchemaID,
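
The `Send + Sync` bound on the trait is what allows one catalog instance to serve several worker threads. A minimal sketch of that usage, assuming only `Catalog`, `TableID`, and `find_keys` as shown in the hunk above (the helper function itself is illustrative, not part of the diff):

use doradb_catalog::{Catalog, TableID};
use std::sync::Arc;
use std::thread;

// Count the keys of a table on a background thread; the trait object can be
// moved across threads because `Catalog: Send + Sync`.
fn count_keys_async(catalog: Arc<dyn Catalog>, table_id: TableID) -> thread::JoinHandle<usize> {
    thread::spawn(move || catalog.find_keys(&table_id).len())
}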
61 changes: 61 additions & 0 deletions doradb-catalog/src/object.rs
@@ -0,0 +1,61 @@
use bitflags::bitflags;
use doradb_datatype::PreciseType;
use semistr::SemiStr;

pub type ObjID = u64;
pub type TableID = ObjID;
pub type SchemaID = ObjID;
pub type ColumnID = ObjID;
pub type IndexID = ObjID;

#[derive(Debug)]
pub struct SchemaObject {
pub schema_id: SchemaID,
pub schema_name: SemiStr,
}

#[derive(Debug)]
pub struct TableObject {
pub table_id: TableID,
pub schema_id: SchemaID,
pub table_name: SemiStr,
}

#[derive(Debug)]
pub struct ColumnObject {
pub column_id: ColumnID,
pub table_id: TableID,
pub column_type: PreciseType,
pub column_attributes: ColumnAttributes,
}

#[derive(Debug)]
pub struct IndexObject {
pub index_id: IndexID,
pub index_name: SemiStr,
pub table_id: TableID,
pub index_attributes: IndexAttributes,
}

#[derive(Debug)]
pub struct IndexColumnObject {
pub column_id: ColumnID,
pub index_id: IndexID,
pub descending: bool,
}

bitflags! {
pub struct ColumnAttributes: u32 {
// Whether the column value can be null.
const NULLABLE = 0x01;
// Whether the column belongs to any index.
const INDEX = 0x02;
}
}

bitflags! {
pub struct IndexAttributes: u32 {
const PK = 0x01;
const UK = 0x02;
}
}
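
A small sketch of how the bitflags-based attributes above compose; it uses only the flags defined in this file and relies on the `pub use object::*;` re-export shown in lib.rs:

use doradb_catalog::{ColumnAttributes, IndexAttributes};

fn main() {
    // A nullable column that is also covered by some index.
    let col_attrs = ColumnAttributes::NULLABLE | ColumnAttributes::INDEX;
    assert!(col_attrs.contains(ColumnAttributes::NULLABLE));

    // A primary-key index is not flagged as a plain unique key.
    let idx_attrs = IndexAttributes::PK;
    assert!(!idx_attrs.contains(IndexAttributes::UK));
}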
2 changes: 2 additions & 0 deletions doradb-storage/Cargo.toml
@@ -11,6 +11,7 @@ repository = "https://github.com/jiangzhe/doradb/doradb-storage/"

[dependencies]
doradb-datatype = { version = "0.1.0", path = "../doradb-datatype" }
doradb-catalog = { version = "0.1.0", path = "../doradb-catalog" }
smallvec = {version = "1.8", features = ["union"]}
thiserror = "1.0"
bitflags = "1.3"
@@ -23,6 +24,7 @@ bincode = { version = "2.0.0-rc", features = ["serde"] }
flume = { version = "0.11", features = ["select"] }
either = "1.13"
event-listener = "5.4"
semistr = "0.1"

[dev-dependencies]
rand = "0.8"
35 changes: 35 additions & 0 deletions doradb-storage/src/catalog/index.rs
@@ -0,0 +1,35 @@
pub struct IndexSchema {
pub keys: Vec<IndexKey>,
pub unique: bool,
}

impl IndexSchema {
#[inline]
pub fn new(keys: Vec<IndexKey>, unique: bool) -> Self {
debug_assert!(!keys.is_empty());
IndexSchema { keys, unique }
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IndexKey {
pub user_col_idx: u16,
pub order: IndexOrder,
}

impl IndexKey {
#[inline]
pub fn new(user_col_idx: u16) -> Self {
IndexKey {
user_col_idx,
order: IndexOrder::Asc,
}
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum IndexOrder {
Asc,
Desc,
}
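
A short sketch of building a composite unique index with this API, covering user columns 0 and 2 with the second key ordered descending. The import path assumes the `catalog` module is exposed at the crate root of `doradb-storage`:

use doradb_storage::catalog::{IndexKey, IndexOrder, IndexSchema};

// Unique index over user columns 0 and 2, the second key descending.
fn composite_unique_index() -> IndexSchema {
    let mut second_key = IndexKey::new(2);
    second_key.order = IndexOrder::Desc;
    IndexSchema::new(vec![IndexKey::new(0), second_key], true)
}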
120 changes: 120 additions & 0 deletions doradb-storage/src/catalog/mod.rs
@@ -0,0 +1,120 @@
mod index;
mod storage;
mod table;

pub use index::*;
pub use storage::*;
pub use table::*;

use crate::buffer::BufferPool;
use crate::index::{BlockIndex, SecondaryIndex};
use crate::lifetime::StaticLifetime;
use crate::table::{Table, TableID};
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Catalog contains metadata of user tables.
/// The initial implementation is an in-memory hash table.
pub struct Catalog<P: BufferPool> {
table_id: AtomicU64,
tables: Mutex<HashMap<TableID, TableMeta<P>>>,
}

impl<P: BufferPool> Catalog<P> {
#[inline]
pub fn empty() -> Self {
Catalog {
table_id: AtomicU64::new(1),
tables: Mutex::new(HashMap::new()),
}
}

#[inline]
pub fn empty_static() -> &'static Self {
let cat = Self::empty();
StaticLifetime::new_static(cat)
}

#[inline]
pub fn create_table(&self, buf_pool: &P, schema: TableSchema) -> TableID {
let table_id = self.table_id.fetch_add(1, Ordering::SeqCst);
let blk_idx = BlockIndex::new(buf_pool).unwrap();
let sec_idx: Vec<_> = schema
.indexes
.iter()
.enumerate()
.map(|(index_no, index_schema)| {
SecondaryIndex::new(index_no, index_schema, schema.user_types())
})
.collect();

let mut g = self.tables.lock();
let res = g.insert(
table_id,
TableMeta {
schema: Arc::new(schema),
blk_idx: Arc::new(blk_idx),
sec_idx: Arc::from(sec_idx.into_boxed_slice()),
},
);
debug_assert!(res.is_none());
table_id
}

#[inline]
pub fn get_table(&self, table_id: TableID) -> Option<Table<P>> {
let g = self.tables.lock();
g.get(&table_id).map(|meta| Table {
table_id,
schema: Arc::clone(&meta.schema),
blk_idx: Arc::clone(&meta.blk_idx),
sec_idx: Arc::clone(&meta.sec_idx),
})
}
}

unsafe impl<P: BufferPool> StaticLifetime for Catalog<P> {}

pub struct TableCache<'a, P: BufferPool> {
catalog: &'a Catalog<P>,
map: HashMap<TableID, Option<Table<P>>>,
}

impl<'a, P: BufferPool> TableCache<'a, P> {
#[inline]
pub fn new(catalog: &'a Catalog<P>) -> Self {
TableCache {
catalog,
map: HashMap::new(),
}
}

#[inline]
pub fn get_table(&mut self, table_id: TableID) -> &Option<Table<P>> {
if self.map.contains_key(&table_id) {
return &self.map[&table_id];
}
let table = self.catalog.get_table(table_id);
self.map.entry(table_id).or_insert(table)
}
}

#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::value::ValKind;

/// Table1 has single i32 column, with unique index of this column.
#[inline]
pub(crate) fn table1<P: BufferPool>(buf_pool: &P, catalog: &Catalog<P>) -> TableID {
catalog.create_table(
buf_pool,
TableSchema::new(
vec![ValKind::I32.nullable(false)],
vec![IndexSchema::new(vec![IndexKey::new(0)], true)],
),
)
}
}
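
For illustration, a hedged sketch of how `TableCache` is meant to be used during a statement: the first lookup for an id takes the catalog lock, later lookups for the same id hit the local map. It mirrors the `table1` test helper above and assumes the module paths implied by the `use` statements in this file:

use doradb_storage::buffer::BufferPool;
use doradb_storage::catalog::{Catalog, IndexKey, IndexSchema, TableCache, TableSchema};
use doradb_storage::value::ValKind;

fn cached_lookup<P: BufferPool>(buf_pool: &P, catalog: &Catalog<P>) {
    // Same single-column table as the `table1` helper: one non-nullable i32
    // with a unique index on it.
    let table_id = catalog.create_table(
        buf_pool,
        TableSchema::new(
            vec![ValKind::I32.nullable(false)],
            vec![IndexSchema::new(vec![IndexKey::new(0)], true)],
        ),
    );

    let mut cache = TableCache::new(catalog);
    // First lookup locks the shared map and caches the result locally...
    assert!(cache.get_table(table_id).is_some());
    // ...the second lookup for the same id is answered without locking.
    assert!(cache.get_table(table_id).is_some());
}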
113 changes: 113 additions & 0 deletions doradb-storage/src/catalog/storage.rs
@@ -0,0 +1,113 @@
use crate::buffer::BufferPool;
use crate::catalog::index::{IndexKey, IndexSchema};
use crate::catalog::table::TableSchema;
use crate::row::ops::{SelectKey, SelectUncommitted};
use crate::row::RowRead;
use crate::table::Table;
use crate::value::{Val, ValKind};
use doradb_catalog::{
ColumnObject, IndexColumnObject, IndexObject, SchemaID, SchemaObject, TableID, TableObject,
};
use semistr::SemiStr;

pub const TABLE_ID_SCHEMAS: TableID = 0;
pub const TABLE_ID_TABLES: TableID = 1;
pub const TABLE_ID_COLUMNS: TableID = 2;
pub const TABLE_ID_INDEXES: TableID = 3;

/// Metadata storage interface
pub trait MetadataStorage<P: BufferPool>: Sized {
type Object;
type ObjID;

/// Create a new metadata persistence instance.
fn new(buf_pool: &P) -> Self;

/// Find object by name.
fn find(&self, buf_pool: &P, name: &str) -> Option<Self::Object>;

/// Find object by id.
fn find_by_id(&self, buf_pool: &P, id: Self::ObjID) -> Option<Self::Object>;

/// Insert object.
fn insert(&self, buf_pool: &P, obj: Self::Object) -> bool;

/// Delete object by id.
fn delete_by_id(&self, buf_pool: &P, id: Self::ObjID) -> bool;
}

#[inline]
fn schema_of_schemas() -> TableSchema {
TableSchema::new(
vec![
// schema_id bigint primary key not null
ValKind::I64.nullable(false),
// schema_name string unique not null
ValKind::VarByte.nullable(false),
],
vec![
// unique key idx_schemas_schema_id (schema_id)
IndexSchema::new(vec![IndexKey::new(0)], true),
// unique key idx_schemas_schema_name (schema_name)
IndexSchema::new(vec![IndexKey::new(1)], true),
],
)
}

const COL_NO_SCHEMAS_SCHEMA_ID: usize = 0;
const COL_NO_SCHEMAS_SCHEMA_NAME: usize = 1;
const INDEX_NO_SCHEMAS_SCHEMA_ID: usize = 0;
const INDEX_NO_SCHEMAS_SCHEMA_NAME: usize = 1;

pub struct Schemas<P> {
table: Table<P>,
}

impl<P: BufferPool> MetadataStorage<P> for Schemas<P> {
type Object = SchemaObject;
type ObjID = SchemaID;

#[inline]
fn new(buf_pool: &P) -> Self {
let table = Table::new(buf_pool, TABLE_ID_SCHEMAS, schema_of_schemas());
Schemas { table }
}

#[inline]
fn find(&self, buf_pool: &P, name: &str) -> Option<SchemaObject> {
let name = Val::from(name);
let key = SelectKey::new(INDEX_NO_SCHEMAS_SCHEMA_NAME, vec![name]);
self.table.select_row_uncommitted(buf_pool, &key, |row| {
let schema_id = row.user_val::<u64>(COL_NO_SCHEMAS_SCHEMA_ID);
let schema_name = row.user_str(COL_NO_SCHEMAS_SCHEMA_NAME);
SchemaObject {
schema_id: *schema_id,
schema_name: SemiStr::new(schema_name),
}
})
}

#[inline]
fn find_by_id(&self, buf_pool: &P, id: SchemaID) -> Option<Self::Object> {
let id = Val::from(id);
let key = SelectKey::new(INDEX_NO_SCHEMAS_SCHEMA_ID, vec![id]);
self.table.select_row_uncommitted(buf_pool, &key, |row| {
let schema_id = row.user_val::<u64>(COL_NO_SCHEMAS_SCHEMA_ID);
let schema_name = row.user_str(COL_NO_SCHEMAS_SCHEMA_NAME);
SchemaObject {
schema_id: *schema_id,
schema_name: SemiStr::new(schema_name),
}
})
}

#[inline]
fn insert(&self, buf_pool: &P, obj: Self::Object) -> bool {
todo!()
}

#[inline]
fn delete_by_id(&self, buf_pool: &P, id: Self::ObjID) -> bool {
todo!()
}
}
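
Finally, a hedged sketch of how a caller might use the `MetadataStorage` implementation above to resolve a schema name to its id; `lookup_schema_id` is illustrative and not part of the diff, and the import paths assume the re-exports in `catalog/mod.rs`:

use doradb_catalog::SchemaID;
use doradb_storage::buffer::BufferPool;
use doradb_storage::catalog::{MetadataStorage, Schemas};

fn lookup_schema_id<P: BufferPool>(buf_pool: &P, name: &str) -> Option<SchemaID> {
    // Bootstrap the `schemas` system table, then probe its unique name index.
    let schemas = Schemas::new(buf_pool);
    schemas.find(buf_pool, name).map(|obj| obj.schema_id)
}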