Skip to content

Commit

Permalink
feat(services/redb): support redb service (apache#2526)
Browse files Browse the repository at this point in the history
* feat(services/redb): support redb service

Signed-off-by: owl <[email protected]>

* feat(services/redb): add redb workflow

Signed-off-by: owl <[email protected]>

* feat(services/redb): add redb workflow

Signed-off-by: owl <[email protected]>

* feat(services/redb): fix code

Signed-off-by: owl <[email protected]>

* feat(services/redb): fix code

Signed-off-by: owl <[email protected]>

* feat(services/redb): fix code

Signed-off-by: owl <[email protected]>

* feat(services/redb): fix code

Signed-off-by: owl <[email protected]>

* feat(services/redb): fix code

Signed-off-by: owl <[email protected]>

* feat(services/redb): fix code

Signed-off-by: owl <[email protected]>

---------

Signed-off-by: owl <[email protected]>
  • Loading branch information
oowl authored Jun 25, 2023
1 parent ad6401c commit 65898d3
Show file tree
Hide file tree
Showing 11 changed files with 388 additions and 2 deletions.
4 changes: 4 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,7 @@ OPENDAL_WASABI_BUCKET=<bucket>
OPENDAL_WASABI_ENDPOINT=<endpoint>
OPENDAL_WASABI_ACCESS_KEY_ID=<ak>
OPENDAL_WASABI_SECRET_ACCESS_KEY=<sk>
# redb
OPENDAL_REDB_TEST=false
OPENDAL_REDB_DATADIR=/tmp/redb
OPENDAL_REDB_TABLE=redb-table
62 changes: 62 additions & 0 deletions .github/workflows/service_test_redb.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: Service Test Redb

on:
push:
branches:
- main
pull_request:
branches:
- main
paths:
- "core/src/**"
- "core/tests/**"
- "!core/src/docs/**"
- "!core/src/services/**"
- "core/src/services/redb/**"
- ".github/workflows/redb.yml"

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
cancel-in-progress: true

jobs:
redb:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Rust toolchain
uses: ./.github/actions/setup
- name: Prepare for tests
shell: bash
working-directory: core
run: mkdir -p $OPENDAL_REDB_DATADIR
env:
OPENDAL_REDB_DATADIR: /tmp/opendal/
- name: Test
shell: bash
working-directory: core
run: cargo test redb --features services-redb -j=1
env:
RUST_BACKTRACE: full
RUST_LOG: debug
OPENDAL_REDB_TEST: on
OPENDAL_REDB_ROOT: /
OPENDAL_REDB_DATADIR: /tmp/opendal/redb
OPENDAL_REDB_TABLE: redb-table
25 changes: 23 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ services-wasabi = [
]
services-webdav = []
services-webhdfs = []
services-redb = ["dep:redb"]

[lib]
bench = false
Expand Down Expand Up @@ -221,6 +222,7 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
sha2 = { version = "0.10", optional = true }
sled = { version = "0.34.7", optional = true }
redb = { version = "1.0.0", optional = true }
suppaftp = { version = "4.5", default-features = false, features = [
"async-secure",
"async-rustls",
Expand Down
5 changes: 5 additions & 0 deletions core/src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,8 @@ pub use webhdfs::Webhdfs;
mod vercel_artifacts;
#[cfg(feature = "services-vercel-artifacts")]
pub use vercel_artifacts::VercelArtifacts;

#[cfg(feature = "services-redb")]
mod redb;
#[cfg(feature = "services-redb")]
pub use self::redb::Redb;
224 changes: 224 additions & 0 deletions core/src/services/redb/backend.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use async_trait::async_trait;
use redb::ReadableTable;

use crate::raw::adapters::kv;
use crate::Builder;
use crate::Error;
use crate::ErrorKind;
use crate::Scheme;
use crate::*;

/// Redb service support.
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct RedbBuilder {
/// That path to the redb data directory.
datadir: Option<String>,
root: Option<String>,
table: Option<String>,
}

impl RedbBuilder {
/// Set the path to the redb data directory. Will create if not exists.
pub fn datadir(&mut self, path: &str) -> &mut Self {
self.datadir = Some(path.into());
self
}

/// Set the table name for Redb.
pub fn table(&mut self, table: &str) -> &mut Self {
self.table = Some(table.into());
self
}

/// Set the root for Redb.
pub fn root(&mut self, path: &str) -> &mut Self {
self.root = Some(path.into());
self
}
}

impl Builder for RedbBuilder {
const SCHEME: Scheme = Scheme::Redb;
type Accessor = RedbBackend;

fn from_map(map: HashMap<String, String>) -> Self {
let mut builder = RedbBuilder::default();

map.get("datadir").map(|v| builder.datadir(v));
map.get("table").map(|v| builder.table(v));
map.get("root").map(|v| builder.root(v));

builder
}

fn build(&mut self) -> Result<Self::Accessor> {
let datadir_path = self.datadir.take().ok_or_else(|| {
Error::new(ErrorKind::ConfigInvalid, "datadir is required but not set")
.with_context("service", Scheme::Redb)
})?;

let table_name = self.table.take().ok_or_else(|| {
Error::new(ErrorKind::ConfigInvalid, "table is required but not set")
.with_context("service", Scheme::Redb)
})?;

let db = redb::Database::create(&datadir_path).map_err(parse_database_error)?;

let db = Arc::new(db);

Ok(RedbBackend::new(Adapter {
datadir: datadir_path,
table: table_name,
db,
})
.with_root(self.root.as_deref().unwrap_or_default()))
}
}

/// Backend for Redb services.
pub type RedbBackend = kv::Backend<Adapter>;

#[derive(Clone)]
pub struct Adapter {
datadir: String,
table: String,
db: Arc<redb::Database>,
}

impl Debug for Adapter {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut ds = f.debug_struct("Adapter");
ds.field("path", &self.datadir);
ds.finish()
}
}

#[async_trait]
impl kv::Adapter for Adapter {
fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Scheme::Redb,
&self.datadir,
Capability {
read: true,
write: true,
blocking: true,
..Default::default()
},
)
}

async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
self.blocking_get(path)
}

fn blocking_get(&self, path: &str) -> Result<Option<Vec<u8>>> {
let read_txn = self.db.begin_read().map_err(parse_transaction_error)?;

let table_define: redb::TableDefinition<&str, &[u8]> =
redb::TableDefinition::new(&self.table);

let table = read_txn
.open_table(table_define)
.map_err(parse_table_error)?;

let result = match table.get(path) {
Ok(Some(v)) => Ok(Some(v.value().to_vec())),
Ok(None) => Ok(None),
Err(e) => Err(parse_storage_error(e)),
};
result
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
self.blocking_set(path, value)
}

fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
let write_txn = self.db.begin_write().map_err(parse_transaction_error)?;

let table_define: redb::TableDefinition<&str, &[u8]> =
redb::TableDefinition::new(&self.table);

{
let mut table = write_txn
.open_table(table_define)
.map_err(parse_table_error)?;

table.insert(path, value).map_err(parse_storage_error)?;
}

write_txn.commit().map_err(parse_commit_error)?;
Ok(())
}

async fn delete(&self, path: &str) -> Result<()> {
self.blocking_delete(path)
}

fn blocking_delete(&self, path: &str) -> Result<()> {
let write_txn = self.db.begin_write().map_err(parse_transaction_error)?;

let table_define: redb::TableDefinition<&str, &[u8]> =
redb::TableDefinition::new(&self.table);

{
let mut table = write_txn
.open_table(table_define)
.map_err(parse_table_error)?;

table.remove(path).map_err(parse_storage_error)?;
}

write_txn.commit().map_err(parse_commit_error)?;
Ok(())
}
}

fn parse_transaction_error(e: redb::TransactionError) -> Error {
Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
}

fn parse_table_error(e: redb::TableError) -> Error {
match e {
redb::TableError::TableDoesNotExist(_) => {
Error::new(ErrorKind::NotFound, "error from redb").set_source(e)
}
_ => Error::new(ErrorKind::Unexpected, "error from redb").set_source(e),
}
}

fn parse_storage_error(e: redb::StorageError) -> Error {
Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
}

fn parse_database_error(e: redb::DatabaseError) -> Error {
Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
}

fn parse_commit_error(e: redb::CommitError) -> Error {
Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
}
Loading

0 comments on commit 65898d3

Please sign in to comment.