Skip to content

Commit

Permalink
refactor(services/sqlite): Polish sqlite via adding connection pool (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
Xuanwo authored Oct 9, 2023
1 parent 5d4966e commit 593a789
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 48 deletions.
6 changes: 6 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,9 @@ OPENDAL_GDRIVE_ACCESS_TOKEN=<access_token>
OPENDAL_GDRIVE_REFRESH_TOKEN=<refresh_token>
OPENDAL_GDRIVE_CLIENT_ID=<client_id>
OPENDAL_GDRIVE_CLIENT_SECRET=<client_secret>
# sqlite
OPENDAL_SQLITE_TEST=on
OPENDAL_SQLITE_CONNECTION_STRING=file:///tmp/opendal/test.db
OPENDAL_SQLITE_TABLE=data
OPENDAL_SQLITE_KEY_FIELD=key
OPENDAL_SQLITE_VALUE_FIELD=data
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ services-memcached = ["dep:bb8"]
services-memory = []
services-mini-moka = ["dep:mini-moka"]
services-moka = ["dep:moka"]
services-mysql = ["dep:mysql_async"]
services-obs = [
"dep:reqsign",
"reqsign?/services-huaweicloud",
Expand All @@ -166,6 +167,7 @@ services-s3 = [
]
services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:dirs"]
services-sled = ["dep:sled"]
services-sqlite = ["dep:rusqlite", "dep:r2d2"]
services-supabase = []
services-tikv = ["tikv-client"]
services-vercel-artifacts = []
Expand All @@ -176,8 +178,6 @@ services-wasabi = [
]
services-webdav = []
services-webhdfs = []
services-mysql = ["dep:mysql_async"]
services-sqlite = ["dep:rusqlite"]

[lib]
bench = false
Expand Down Expand Up @@ -206,6 +206,7 @@ await-tree = { version = "0.1.1", optional = true }
backon = "0.4.1"
base64 = "0.21"
bb8 = { version = "0.8", optional = true }
bb8-postgres = { version = "0.8.1", optional = true }
bytes = "1.4"
cacache = { version = "11.6", default-features = false, features = [
"tokio-runtime",
Expand Down Expand Up @@ -235,6 +236,7 @@ metrics = { version = "0.20", optional = true }
mini-moka = { version = "0.10", optional = true }
minitrace = { version = "0.5", optional = true }
moka = { version = "0.10", optional = true, features = ["future"] }
mysql_async = { version = "0.32.2", optional = true }
once_cell = "1"
openssh = { version = "0.9.9", optional = true }
openssh-sftp-client = { version = "0.13.9", optional = true, features = [
Expand All @@ -250,6 +252,7 @@ prometheus = { version = "0.13", features = ["process"], optional = true }
prometheus-client = { version = "0.21.2", optional = true }
prost = { version = "0.11", optional = true }
quick-xml = { version = "0.30", features = ["serialize", "overlapped-lists"] }
r2d2 = { version = "0.8", optional = true }
rand = { version = "0.8", optional = true }
redb = { version = "1.1.0", optional = true }
redis = { version = "0.23.1", features = [
Expand All @@ -262,6 +265,7 @@ reqwest = { version = "0.11.18", features = [
"stream",
], default-features = false }
rocksdb = { version = "0.21.0", default-features = false, optional = true }
rusqlite = { version = "0.29.0", optional = true, features = ["bundled"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sha2 = { version = "0.10", optional = true }
Expand All @@ -275,9 +279,6 @@ tokio = "1.27"
tokio-postgres = { version = "0.7.8", optional = true }
tracing = { version = "0.1", optional = true }
uuid = { version = "1", features = ["serde", "v4"] }
mysql_async = { version = "0.32.2", optional = true }
bb8-postgres = { version = "0.8.1", optional = true }
rusqlite = { version = "0.29.0", optional = true, features = ["bundled"] }

[dev-dependencies]
criterion = { version = "0.4", features = ["async", "async_tokio"] }
Expand Down
3 changes: 3 additions & 0 deletions core/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ pub use serde_util::*;
mod chrono_util;
pub use chrono_util::*;

mod tokio_util;
pub use tokio_util::*;

// Expose as a pub mod to avoid confusing.
pub mod adapters;
pub mod oio;
23 changes: 23 additions & 0 deletions core/src/raw/tokio_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::*;

/// Parse tokio error into opendal::Error.
pub fn new_task_join_error(e: tokio::task::JoinError) -> Error {
Error::new(ErrorKind::Unexpected, "tokio task join failed").set_source(e)
}
102 changes: 60 additions & 42 deletions core/src/services/sqlite/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,13 @@ impl Builder for SqliteBuilder {
.unwrap_or_else(|| "/".to_string())
.as_str(),
);
let mgr = SqliteConnectionManager { connection_string };
let pool = r2d2::Pool::new(mgr).map_err(|err| {
Error::new(ErrorKind::Unexpected, "sqlite pool init failed").set_source(err)
})?;

Ok(SqliteBackend::new(Adapter {
connection_string,
pool,
table,
key_field,
value_field,
Expand All @@ -165,11 +170,33 @@ impl Builder for SqliteBuilder {
}
}

struct SqliteConnectionManager {
connection_string: String,
}

impl r2d2::ManageConnection for SqliteConnectionManager {
type Connection = Connection;
type Error = Error;

fn connect(&self) -> Result<Connection> {
Connection::open(&self.connection_string)
.map_err(|err| Error::new(ErrorKind::Unexpected, "sqlite open error").set_source(err))
}

fn is_valid(&self, conn: &mut Connection) -> Result<()> {
conn.execute_batch("").map_err(parse_rusqlite_error)
}

fn has_broken(&self, _: &mut Connection) -> bool {
false
}
}

pub type SqliteBackend = kv::Backend<Adapter>;

#[derive(Clone)]
pub struct Adapter {
connection_string: String,
pool: r2d2::Pool<SqliteConnectionManager>,

table: String,
key_field: String,
Expand All @@ -179,7 +206,6 @@ pub struct Adapter {
impl Debug for Adapter {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut ds = f.debug_struct("SqliteAdapter");
ds.field("connection_string", &self.connection_string);
ds.field("table", &self.table);
ds.field("key_field", &self.key_field);
ds.field("value_field", &self.value_field);
Expand All @@ -198,93 +224,85 @@ impl kv::Adapter for Adapter {
write: true,
create_dir: true,
delete: true,
blocking: true,
..Default::default()
},
)
}

async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
let cloned_path = path.to_string();
let cloned_self = self.clone();
let this = self.clone();
let path = path.to_string();

task::spawn_blocking(move || cloned_self.blocking_get(cloned_path.as_str()))
task::spawn_blocking(move || this.blocking_get(&path))
.await
.map_err(Error::from)
.and_then(|inner_result| inner_result)
.map_err(new_task_join_error)?
}

fn blocking_get(&self, path: &str) -> Result<Option<Vec<u8>>> {
let conn = self.pool.get().map_err(parse_r2d2_error)?;

let query = format!(
"SELECT {} FROM {} WHERE `{}` = $1 LIMIT 1",
self.value_field, self.table, self.key_field
);
let conn = Connection::open(self.connection_string.clone()).map_err(Error::from)?;
let mut statement = conn.prepare(&query).map_err(Error::from)?;
let mut statement = conn.prepare(&query).map_err(parse_rusqlite_error)?;
let result = statement.query_row([path], |row| row.get(0));
match result {
Ok(v) => Ok(Some(v)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(err) => Err(Error::from(err)),
Err(err) => Err(parse_rusqlite_error(err)),
}
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
let cloned_path = path.to_string();
let cloned_value = value.to_vec();
let cloned_self = self.clone();
let this = self.clone();
let path = path.to_string();
// FIXME: can we avoid this copy?
let value = value.to_vec();

task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), &cloned_value))
task::spawn_blocking(move || this.blocking_set(&path, &value))
.await
.map_err(Error::from)
.and_then(|inner_result| inner_result)
.map_err(new_task_join_error)?
}

fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
let conn = self.pool.get().map_err(parse_r2d2_error)?;

let query = format!(
"INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES ($1, $2)",
self.table, self.key_field, self.value_field
);
let conn = Connection::open(self.connection_string.clone()).map_err(Error::from)?;
let mut statement = conn.prepare(&query).map_err(Error::from)?;
let mut statement = conn.prepare(&query).map_err(parse_rusqlite_error)?;
statement
.execute(params![path, value])
.map_err(Error::from)?;
.map_err(parse_rusqlite_error)?;
Ok(())
}

async fn delete(&self, path: &str) -> Result<()> {
let cloned_path = path.to_string();
let cloned_self = self.clone();
let this = self.clone();
let path = path.to_string();

task::spawn_blocking(move || cloned_self.blocking_delete(cloned_path.as_str()))
task::spawn_blocking(move || this.blocking_delete(&path))
.await
.map_err(Error::from)
.and_then(|inner_result| inner_result)
.map_err(new_task_join_error)?
}

fn blocking_delete(&self, path: &str) -> Result<()> {
let conn = Connection::open(self.connection_string.clone()).map_err(|err| {
Error::new(ErrorKind::Unexpected, "Sqlite open error").set_source(err)
})?;
let conn = self.pool.get().map_err(parse_r2d2_error)?;

let query = format!("DELETE FROM {} WHERE `{}` = $1", self.table, self.key_field);
let mut statement = conn.prepare(&query).map_err(Error::from)?;
statement.execute([path]).map_err(Error::from)?;
let mut statement = conn.prepare(&query).map_err(parse_rusqlite_error)?;
statement.execute([path]).map_err(parse_rusqlite_error)?;
Ok(())
}
}

impl From<rusqlite::Error> for Error {
fn from(value: rusqlite::Error) -> Error {
Error::new(ErrorKind::Unexpected, "unhandled error from sqlite").set_source(value)
}
fn parse_rusqlite_error(err: rusqlite::Error) -> Error {
Error::new(ErrorKind::Unexpected, "unhandled error from sqlite").set_source(err)
}

impl From<task::JoinError> for Error {
fn from(value: task::JoinError) -> Error {
Error::new(
ErrorKind::Unexpected,
"unhandled error from sqlite when spawning task",
)
.set_source(value)
}
fn parse_r2d2_error(err: r2d2::Error) -> Error {
Error::new(ErrorKind::Unexpected, "unhandled error from r2d2").set_source(err)
}
2 changes: 1 addition & 1 deletion core/tests/behavior/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ fn main() -> anyhow::Result<()> {
tests.extend(behavior_test::<services::Atomicserver>());
#[cfg(feature = "services-azblob")]
tests.extend(behavior_test::<services::Azblob>());
#[cfg(feature = "services-Azdls")]
#[cfg(feature = "services-azdls")]
tests.extend(behavior_test::<services::Azdls>());
#[cfg(feature = "services-cacache")]
tests.extend(behavior_test::<services::Cacache>());
Expand Down

0 comments on commit 593a789

Please sign in to comment.