From eb4b13622d0b420e1c9a0bbfebe7df131b0f49fc Mon Sep 17 00:00:00 2001 From: taobo Date: Fri, 13 Oct 2023 03:51:29 -0500 Subject: [PATCH] feat(service/d1): Support d1 for opendal (#3248) * feat(service/d1): Support d1 for opendal * refactor: optimize code * fix: delete extra code * fix: optimize code * refactor: errors and get method * fix: let clippy http * fix: let clippy happy --- .env.example | 8 + core/Cargo.toml | 1 + core/src/services/d1/backend.rs | 331 ++++++++++++++++++++++++++++++++ core/src/services/d1/docs.md | 61 ++++++ core/src/services/d1/error.rs | 83 ++++++++ core/src/services/d1/mod.rs | 21 ++ core/src/services/d1/model.rs | 141 ++++++++++++++ core/src/services/mod.rs | 5 + core/src/types/scheme.rs | 4 + core/tests/behavior/main.rs | 2 + 10 files changed, 657 insertions(+) create mode 100644 core/src/services/d1/backend.rs create mode 100644 core/src/services/d1/docs.md create mode 100644 core/src/services/d1/error.rs create mode 100644 core/src/services/d1/mod.rs create mode 100644 core/src/services/d1/model.rs diff --git a/.env.example b/.env.example index 79211cb97a46..b42f03bddcf0 100644 --- a/.env.example +++ b/.env.example @@ -178,3 +178,11 @@ OPENDAL_SQLITE_CONNECTION_STRING=file:///tmp/opendal/test.db OPENDAL_SQLITE_TABLE=data OPENDAL_SQLITE_KEY_FIELD=key OPENDAL_SQLITE_VALUE_FIELD=data +# d1 +OPENDAL_D1_TEST=false +OPENDAL_D1_TOKEN= +OPENDAL_D1_ACCOUNT_ID= +OPENDAL_D1_DATABASE_ID= +OPENDAL_D1_TABLE= +OPENDAL_D1_KEY_FIELD= +OPENDAL_D1_VALUE_FIELD= diff --git a/core/Cargo.toml b/core/Cargo.toml index c5ca1b20cd75..5e20e949e731 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -120,6 +120,7 @@ services-cos = [ "reqsign?/services-tencent", "reqsign?/reqwest_request", ] +services-d1 = [] services-dashmap = ["dep:dashmap"] services-dropbox = [] services-etcd = ["dep:etcd-client", "dep:bb8"] diff --git a/core/src/services/d1/backend.rs b/core/src/services/d1/backend.rs new file mode 100644 index 000000000000..faf0c472efe2 --- /dev/null +++ 
b/core/src/services/d1/backend.rs @@ -0,0 +1,331 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; + +use async_trait::async_trait; +use http::header; +use http::Request; +use http::StatusCode; +use serde_json::Value; + +use crate::raw::adapters::kv; +use crate::raw::*; +use crate::ErrorKind; +use crate::*; + +use super::error::parse_error; +use super::model::D1Response; + +#[doc = include_str!("docs.md")] +#[derive(Default)] +pub struct D1Builder { + token: Option, + account_id: Option, + database_id: Option, + + http_client: Option, + root: Option, + + table: Option, + key_field: Option, + value_field: Option, +} + +impl Debug for D1Builder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("D1Builder"); + ds.field("root", &self.root); + ds.field("table", &self.table); + ds.field("key_field", &self.key_field); + ds.field("value_field", &self.value_field); + ds.finish() + } +} + +impl D1Builder { + /// Set api token for the cloudflare d1 service. 
+ /// + /// create a api token from [here](https://dash.cloudflare.com/profile/api-tokens) + pub fn token(&mut self, token: &str) -> &mut Self { + if !token.is_empty() { + self.token = Some(token.to_string()); + } + self + } + + /// Set the account identifier for the cloudflare d1 service. + /// + /// get the account identifier from Workers & Pages -> Overview -> Account ID + /// If not specified, it will return an error when building. + pub fn account_id(&mut self, account_id: &str) -> &mut Self { + if !account_id.is_empty() { + self.account_id = Some(account_id.to_string()); + } + self + } + + /// Set the database identifier for the cloudflare d1 service. + /// + /// get the database identifier from Workers & Pages -> D1 -> [Your Database] -> Database ID + /// If not specified, it will return an error when building. + pub fn database_id(&mut self, database_id: &str) -> &mut Self { + if !database_id.is_empty() { + self.database_id = Some(database_id.to_string()); + } + self + } + + /// set the working directory, all operations will be performed under it. + /// + /// default: "/" + pub fn root(&mut self, root: &str) -> &mut Self { + if !root.is_empty() { + self.root = Some(root.to_owned()); + } + self + } + + /// Set the table name of the d1 service to read/write. + /// + /// If not specified, it will return an error when building. + pub fn table(&mut self, table: &str) -> &mut Self { + if !table.is_empty() { + self.table = Some(table.to_owned()); + } + self + } + + /// Set the key field name of the d1 service to read/write. + /// + /// Default to `key` if not specified. + pub fn key_field(&mut self, key_field: &str) -> &mut Self { + if !key_field.is_empty() { + self.key_field = Some(key_field.to_string()); + } + self + } + + /// Set the value field name of the d1 service to read/write. + /// + /// Default to `value` if not specified. 
+ pub fn value_field(&mut self, value_field: &str) -> &mut Self { + if !value_field.is_empty() { + self.value_field = Some(value_field.to_string()); + } + self + } +} + +impl Builder for D1Builder { + const SCHEME: Scheme = Scheme::D1; + type Accessor = D1Backend; + + fn from_map(map: HashMap) -> Self { + let mut builder = D1Builder::default(); + map.get("token").map(|v| builder.token(v)); + map.get("account_id").map(|v| builder.account_id(v)); + map.get("database_id").map(|v| builder.database_id(v)); + + map.get("root").map(|v| builder.root(v)); + map.get("table").map(|v| builder.table(v)); + map.get("key_field").map(|v| builder.key_field(v)); + map.get("value_field").map(|v| builder.value_field(v)); + builder + } + + fn build(&mut self) -> Result { + let mut authorization = None; + if let Some(token) = &self.token { + authorization = Some(format_authorization_by_bearer(token)?) + } + + let Some(account_id) = self.account_id.clone() else { + return Err(Error::new( + ErrorKind::ConfigInvalid, + "account_id is required", + )); + }; + + let Some(database_id) = self.database_id.clone() else { + return Err(Error::new( + ErrorKind::ConfigInvalid, + "database_id is required", + )); + }; + + let client = if let Some(client) = self.http_client.take() { + client + } else { + HttpClient::new().map_err(|err| { + err.with_operation("Builder::build") + .with_context("service", Scheme::D1) + })? 
+ }; + + let Some(table) = self.table.clone() else { + return Err(Error::new(ErrorKind::ConfigInvalid, "table is required")); + }; + + let key_field = self.key_field.clone().unwrap_or_else(|| "key".to_string()); + + let value_field = self + .value_field + .clone() + .unwrap_or_else(|| "value".to_string()); + + let root = normalize_root( + self.root + .clone() + .unwrap_or_else(|| "/".to_string()) + .as_str(), + ); + Ok(D1Backend::new(Adapter { + authorization, + account_id, + database_id, + client, + table, + key_field, + value_field, + }) + .with_root(&root)) + } +} + +pub type D1Backend = kv::Backend; + +#[derive(Clone)] +pub struct Adapter { + authorization: Option, + account_id: String, + database_id: String, + + client: HttpClient, + table: String, + key_field: String, + value_field: String, +} + +impl Debug for Adapter { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("D1Adapter"); + ds.field("table", &self.table); + ds.field("key_field", &self.key_field); + ds.field("value_field", &self.value_field); + ds.finish() + } +} + +impl Adapter { + fn create_d1_query_request(&self, sql: &str, params: Vec) -> Result> { + let p = format!( + "/accounts/{}/d1/database/{}/query", + self.account_id, self.database_id + ); + let url: String = format!( + "{}{}", + "https://api.cloudflare.com/client/v4", + percent_encode_path(&p) + ); + + let mut req = Request::post(&url); + if let Some(auth) = &self.authorization { + req = req.header(header::AUTHORIZATION, auth); + } + req = req.header(header::CONTENT_TYPE, "application/json"); + + let json = serde_json::json!({ + "sql": sql, + "params": params, + }); + + let body = serde_json::to_vec(&json).map_err(new_json_serialize_error)?; + req.body(AsyncBody::Bytes(body.into())) + .map_err(new_request_build_error) + } +} + +#[async_trait] +impl kv::Adapter for Adapter { + fn metadata(&self) -> kv::Metadata { + kv::Metadata::new( + Scheme::D1, + &self.table, + Capability { + read: true, + 
write: true, + ..Default::default() + }, + ) + } + + async fn get(&self, path: &str) -> Result>> { + let query = format!( + "SELECT {} FROM {} WHERE {} = ? LIMIT 1", + self.value_field, self.table, self.key_field + ); + let req = self.create_d1_query_request(&query, vec![path.into()])?; + + let resp = self.client.send(req).await?; + let status = resp.status(); + match status { + StatusCode::OK | StatusCode::PARTIAL_CONTENT => { + let body = resp.into_body().bytes().await?; + let d1_response = D1Response::parse(&body)?; + Ok(d1_response.get_result(&self.value_field)) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn set(&self, path: &str, value: &[u8]) -> Result<()> { + let table = &self.table; + let key_field = &self.key_field; + let value_field = &self.value_field; + let query = format!( + "INSERT INTO {table} ({key_field}, {value_field}) \ + VALUES (?, ?) \ + ON CONFLICT ({key_field}) \ + DO UPDATE SET {value_field} = EXCLUDED.{value_field}", + ); + + let params = vec![path.into(), value.into()]; + let req = self.create_d1_query_request(&query, params)?; + + let resp = self.client.send(req).await?; + let status = resp.status(); + match status { + StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(()), + _ => Err(parse_error(resp).await?), + } + } + + async fn delete(&self, path: &str) -> Result<()> { + let query = format!("DELETE FROM {} WHERE {} = ?", self.table, self.key_field); + let req = self.create_d1_query_request(&query, vec![path.into()])?; + + let resp = self.client.send(req).await?; + let status = resp.status(); + match status { + StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(()), + _ => Err(parse_error(resp).await?), + } + } +} diff --git a/core/src/services/d1/docs.md b/core/src/services/d1/docs.md new file mode 100644 index 000000000000..17e3c71232cc --- /dev/null +++ b/core/src/services/d1/docs.md @@ -0,0 +1,61 @@ +## Capabilities + +This service can be used to: + +- [x] stat +- [x] read +- [x] write +- [x] create_dir +- [x] 
delete +- [ ] copy +- [ ] rename +- [ ] ~~list~~ +- [ ] scan +- [ ] ~~presign~~ +- [ ] blocking + +## Configuration + +- `root`: Set the working directory of `OpenDAL` +- `token`: Set the token of cloudflare api +- `account_id`: Set the account identifier of d1 +- `database_id`: Set the database identifier of d1 +- `endpoint`: Set the endpoint of d1 service +- `table`: Set the table name of the d1 service to read/write +- `key_field`: Set the key field of d1 +- `value_field`: Set the value field of d1 + +## Example + +### Via Builder + +```rust +use anyhow::Result; +use opendal::services::D1; +use opendal::Operator; + +#[tokio::main] +async fn main() -> Result<()> { + let mut builder = D1::default(); + builder + .token("token") + .account_id("account_id") + .database_id("database_id") + .table("table") + .key_field("key_field") + .value_field("value_field"); + + let op = Operator::new(builder)?.finish(); + let source_path = "ALFKI"; + // set value to d1 "opendal test value" as Vec<u8> + let value = "opendal test value".as_bytes(); + // write value to d1, the key is source_path + op.write(source_path, value).await?; + // read value from d1, the key is source_path + let v = op.read(source_path).await?; + assert_eq!(v, value); + // delete value from d1, the key is source_path + op.delete(source_path).await?; + Ok(()) +} +``` diff --git a/core/src/services/d1/error.rs b/core/src/services/d1/error.rs new file mode 100644 index 000000000000..2a2f75de49bc --- /dev/null +++ b/core/src/services/d1/error.rs @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use bytes::Buf; +use http::Response; +use http::StatusCode; + +use crate::raw::*; +use crate::Error; +use crate::ErrorKind; +use crate::Result; + +use serde_json::de; + +use super::model::D1Error; +use super::model::D1Response; + +/// Parse error response into Error. +pub async fn parse_error(resp: Response) -> Result { + let (parts, body) = resp.into_parts(); + let bs = body.bytes().await?; + + let (mut kind, mut retryable) = match parts.status { + StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), + // Some services (like owncloud) return 403 while file locked. + StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, true), + // Allowing retry for resource locked. 
+ StatusCode::LOCKED => (ErrorKind::Unexpected, true), + StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true), + _ => (ErrorKind::Unexpected, false), + }; + + let (message, d1_err) = de::from_reader::<_, D1Response>(bs.clone().reader()) + .map(|d1_err| (format!("{d1_err:?}"), Some(d1_err))) + .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None)); + + if let Some(d1_err) = d1_err { + (kind, retryable) = parse_d1_error_code(d1_err.errors).unwrap_or((kind, retryable)); + } + + let mut err = Error::new(kind, &message); + + err = with_error_response_context(err, parts); + + if retryable { + err = err.set_temporary(); + } + + Ok(err) +} + +pub fn parse_d1_error_code(errors: Vec) -> Option<(ErrorKind, bool)> { + if errors.is_empty() { + return None; + } + + match errors[0].code { + // The request is malformed: failed to decode id. + 7400 => Some((ErrorKind::Unexpected, false)), + // no such column: Xxxx. + 7500 => Some((ErrorKind::NotFound, false)), + // Authentication error. + 10000 => Some((ErrorKind::PermissionDenied, false)), + _ => None, + } +} diff --git a/core/src/services/d1/mod.rs b/core/src/services/d1/mod.rs new file mode 100644 index 000000000000..9163a135c6c9 --- /dev/null +++ b/core/src/services/d1/mod.rs @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod backend; +mod error; +mod model; +pub use backend::D1Builder as D1; diff --git a/core/src/services/d1/model.rs b/core/src/services/d1/model.rs new file mode 100644 index 000000000000..c086af0d52f4 --- /dev/null +++ b/core/src/services/d1/model.rs @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::Error; +use bytes::Bytes; +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; +use std::fmt::Debug; + +/// response data from d1 +#[derive(Deserialize, Debug)] +pub struct D1Response { + pub result: Vec, + pub success: bool, + pub errors: Vec, + pub messages: Vec, +} + +impl D1Response { + pub fn parse(bs: &Bytes) -> Result { + let response: D1Response = serde_json::from_slice(bs).map_err(|e| { + Error::new( + crate::ErrorKind::Unexpected, + &format!("failed to parse error response: {}", e), + ) + })?; + + if !response.success { + return Err(Error::new( + crate::ErrorKind::Unexpected, + &String::from_utf8_lossy(bs), + )); + } + Ok(response) + } + + pub fn get_result(&self, key: &str) -> Option> { + if self.result.is_empty() || self.result[0].results.is_empty() { + return None; + } + let result = &self.result[0].results[0]; + let value = result.get(key); + + match value { + Some(Value::Array(s)) => { + let mut v = Vec::new(); + for i in s { + if let Value::Number(n) = i { + v.push(n.as_u64().unwrap() as u8); + } + } + Some(v) + } + _ => None, + } + } +} + +#[derive(Deserialize, Debug)] +pub struct D1Result { + pub meta: Meta, + pub results: Vec>, + pub success: bool, +} + +#[derive(Deserialize, Debug)] +pub struct Meta { + pub served_by: String, + pub duration: f64, + pub changes: i32, + pub last_row_id: i32, + pub changed_db: bool, + pub size_after: i32, + pub rows_read: i32, + pub rows_written: i32, +} + +#[derive(Clone, Deserialize, Debug, Serialize)] +pub struct D1Error { + pub message: String, + pub code: i32, +} + +#[derive(Deserialize, Debug)] +pub struct D1Message { + pub message: String, + pub code: i32, +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_deserialize_get_object_json_response() { + let data = r#" + { + "result": [ + { + "results": [ + { + "CustomerId": "4", + "CompanyName": "Around the Horn", + "ContactName": "Thomas Hardy" + } + ], + "success": true, + "meta": { + "served_by": "v3-prod", + 
"duration": 0.2147, + "changes": 0, + "last_row_id": 0, + "changed_db": false, + "size_after": 2162688, + "rows_read": 3, + "rows_written": 2 + } + } + ], + "success": true, + "errors": [], + "messages": [] + }"#; + let response: D1Response = serde_json::from_str(data).unwrap(); + println!("{:?}", response.result[0].results[0]); + } +} diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index 5293aaa443e9..93da8b917b0a 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -228,3 +228,8 @@ pub use self::mysql::Mysql; mod sqlite; #[cfg(feature = "services-sqlite")] pub use self::sqlite::Sqlite; + +#[cfg(feature = "services-d1")] +mod d1; +#[cfg(feature = "services-d1")] +pub use self::d1::D1; diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index ec3298b7bd59..fad9d7f6e207 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -41,6 +41,8 @@ pub enum Scheme { Cacache, /// [cos][crate::services::Cos]: Tencent Cloud Object Storage services. Cos, + /// [d1][crate::services::D1]: D1 services + D1, /// [dashmap][crate::services::Dashmap]: dashmap backend support. 
Dashmap, /// [etcd][crate::services::Etcd]: Etcd Services @@ -159,6 +161,7 @@ impl FromStr for Scheme { "azdls" | "azdfs" | "abfs" => Ok(Scheme::Azdls), "cacache" => Ok(Scheme::Cacache), "cos" => Ok(Scheme::Cos), + "d1" => Ok(Scheme::D1), "dashmap" => Ok(Scheme::Dashmap), "dropbox" => Ok(Scheme::Dropbox), "etcd" => Ok(Scheme::Etcd), @@ -208,6 +211,7 @@ impl From for &'static str { Scheme::Azdls => "Azdls", Scheme::Cacache => "cacache", Scheme::Cos => "cos", + Scheme::D1 => "d1", Scheme::Dashmap => "dashmap", Scheme::Etcd => "etcd", Scheme::Fs => "fs", diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs index de38efd149da..cc0bf667f154 100644 --- a/core/tests/behavior/main.rs +++ b/core/tests/behavior/main.rs @@ -185,6 +185,8 @@ fn main() -> anyhow::Result<()> { tests.extend(behavior_test::()); #[cfg(feature = "services-sqlite")] tests.extend(behavior_test::()); + #[cfg(feature = "services-d1")] + tests.extend(behavior_test::()); // Don't init logging while building operator which may break cargo // nextest output