From c96a170f1a878e688f23cdb5b21b21989575a5a7 Mon Sep 17 00:00:00 2001 From: taobo Date: Tue, 7 Nov 2023 15:51:41 +0800 Subject: [PATCH] feat: add mongodb gridfs service support (#3491) * feat: add mongodb gridfs service support * fix: behavior tests error * fix: optimize code * refactor: expose service name from GridFs to Gridfs --- .env.example | 5 + core/Cargo.toml | 1 + core/src/services/gridfs/backend.rs | 291 ++++++++++++++++++++++++++++ core/src/services/gridfs/docs.md | 47 +++++ core/src/services/gridfs/mod.rs | 19 ++ core/src/services/mod.rs | 5 + core/src/types/operator/builder.rs | 2 + core/src/types/scheme.rs | 4 + website/docs/services/gridfs.mdx | 68 +++++++ 9 files changed, 442 insertions(+) create mode 100644 core/src/services/gridfs/backend.rs create mode 100644 core/src/services/gridfs/docs.md create mode 100644 core/src/services/gridfs/mod.rs create mode 100644 website/docs/services/gridfs.mdx diff --git a/.env.example b/.env.example index e57e121c743b..32eee467bb45 100644 --- a/.env.example +++ b/.env.example @@ -161,3 +161,8 @@ OPENDAL_SWIFT_ACCOUNT= OPENDAL_SWIFT_CONTAINER= OPENDAL_SWIFT_ROOT=/path/to/dir OPENDAL_SWIFT_TOKEN= +# gridfs +OPENDAL_GRIDFS_CONNECTION_STRING=mongodb://localhost:27017 +OPENDAL_GRIDFS_DATABASE= +OPENDAL_GRIDFS_BUCKET= +OPENDAL_GRIDFS_CHUNK_SIZE= diff --git a/core/Cargo.toml b/core/Cargo.toml index b7f44a86d01d..6917eee7e0e0 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -146,6 +146,7 @@ services-gcs = [ ] services-gdrive = [] services-ghac = [] +services-gridfs = ["dep:mongodb"] services-hdfs = ["dep:hdrs"] services-http = [] services-ipfs = ["dep:prost"] diff --git a/core/src/services/gridfs/backend.rs b/core/src/services/gridfs/backend.rs new file mode 100644 index 000000000000..606b464d3793 --- /dev/null +++ b/core/src/services/gridfs/backend.rs @@ -0,0 +1,291 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use futures::{AsyncWriteExt, StreamExt}; +use mongodb::bson::doc; +use mongodb::options::{ClientOptions, GridFsBucketOptions, GridFsFindOptions}; +use mongodb::GridFsBucket; +use std::fmt::{Debug, Formatter}; +use tokio::sync::OnceCell; + +use crate::raw::adapters::kv; +use crate::raw::new_std_io_error; +use crate::*; + +#[doc = include_str!("docs.md")] +#[derive(Default)] +pub struct GridFsBuilder { + connection_string: Option, + database: Option, + bucket: Option, + chunk_size: Option, + root: Option, +} + +impl Debug for GridFsBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("GridFsBuilder") + .field("database", &self.database) + .field("bucket", &self.bucket) + .field("chunk_size", &self.chunk_size) + .field("root", &self.root) + .finish() + } +} + +impl GridFsBuilder { + /// Set the connection_string of the MongoDB service. + /// + /// This connection string is used to connect to the MongoDB service. It typically follows the format: + /// + /// ## Format + /// + /// `mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]` + /// + /// Examples: + /// + /// - Connecting to a local MongoDB instance: `mongodb://localhost:27017` + /// - Using authentication: `mongodb://myUser:myPassword@localhost:27017/myAuthDB` + /// - Specifying authentication mechanism: `mongodb://myUser:myPassword@localhost:27017/myAuthDB?authMechanism=SCRAM-SHA-256` + /// + /// ## Options + /// + /// - `authMechanism`: Specifies the authentication method to use. Examples include `SCRAM-SHA-1`, `SCRAM-SHA-256`, and `MONGODB-AWS`. + /// - ... (any other options you wish to highlight) + /// + /// For more information, please refer to [MongoDB Connection String URI Format](https://docs.mongodb.com/manual/reference/connection-string/). + pub fn connection_string(&mut self, v: &str) -> &mut Self { + if !v.is_empty() { + self.connection_string = Some(v.to_string()); + } + self + } + + /// Set the working directory, all operations will be performed under it. + /// + /// default: "/" + pub fn root(&mut self, root: &str) -> &mut Self { + if !root.is_empty() { + self.root = Some(root.to_owned()); + } + self + } + + /// Set the database name of the MongoDB GridFs service to read/write. + pub fn database(&mut self, database: &str) -> &mut Self { + if !database.is_empty() { + self.database = Some(database.to_string()); + } + self + } + + /// Set the buctet name of the MongoDB GridFs service to read/write. + /// + /// Default to `fs` if not specified. + pub fn bucket(&mut self, bucket: &str) -> &mut Self { + if !bucket.is_empty() { + self.bucket = Some(bucket.to_string()); + } + self + } + + /// Set the chunk size of the MongoDB GridFs service used to break the user file into chunks. + /// + /// Default to `255 KiB` if not specified. + pub fn chunk_size(&mut self, chunk_size: u32) -> &mut Self { + if chunk_size > 0 { + self.chunk_size = Some(chunk_size); + } + self + } +} + +impl Builder for GridFsBuilder { + const SCHEME: Scheme = Scheme::Mongodb; + + type Accessor = GridFsBackend; + + fn from_map(map: std::collections::HashMap) -> Self { + let mut builder = Self::default(); + + map.get("connection_string") + .map(|v| builder.connection_string(v)); + map.get("database").map(|v| builder.database(v)); + map.get("bucket").map(|v| builder.bucket(v)); + map.get("chunk_size") + .map(|v| builder.chunk_size(v.parse::().unwrap_or_default())); + map.get("root").map(|v| builder.root(v)); + + builder + } + + fn build(&mut self) -> Result { + let conn = match &self.connection_string.clone() { + Some(v) => v.clone(), + None => { + return Err( + Error::new(ErrorKind::InvalidInput, "connection_string is required") + .with_context("service", Scheme::Gridfs), + ) + } + }; + let database = match &self.database.clone() { + Some(v) => v.clone(), + None => { + return Err(Error::new(ErrorKind::InvalidInput, "database is required") + .with_context("service", Scheme::Gridfs)) + } + }; + let bucket = match &self.bucket.clone() { + Some(v) => v.clone(), + None => "fs".to_string(), + }; + let chunk_size = self.chunk_size.unwrap_or(255); + + Ok(GridFsBackend::new(Adapter { + connection_string: conn, + database, + bucket, + chunk_size, + bucket_instance: OnceCell::new(), + })) + } +} + +pub type GridFsBackend = kv::Backend; + +#[derive(Clone)] +pub struct Adapter { + connection_string: String, + database: String, + bucket: String, + chunk_size: u32, + bucket_instance: OnceCell, +} + +impl Debug for Adapter { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Adapter") + .field("database", &self.database) + .field("bucket", &self.bucket) + .field("chunk_size", &self.chunk_size) + .finish() + } +} + +impl Adapter { + async fn get_bucket(&self) -> Result<&GridFsBucket> { + self.bucket_instance + .get_or_try_init(|| async { + let client_options = ClientOptions::parse(&self.connection_string) + .await + .map_err(parse_mongodb_error)?; + let client = + mongodb::Client::with_options(client_options).map_err(parse_mongodb_error)?; + let bucket_options = GridFsBucketOptions::builder() + .bucket_name(Some(self.bucket.clone())) + .chunk_size_bytes(Some(self.chunk_size)) + .build(); + let bucket = client + .database(&self.database) + .gridfs_bucket(bucket_options); + Ok(bucket) + }) + .await + } +} + +#[async_trait] +impl kv::Adapter for Adapter { + fn metadata(&self) -> kv::Metadata { + kv::Metadata::new( + Scheme::Gridfs, + &format!("{}/{}", self.database, self.bucket), + Capability { + read: true, + write: true, + ..Default::default() + }, + ) + } + + async fn get(&self, path: &str) -> Result>> { + let bucket = self.get_bucket().await?; + let filter = doc! { "filename": path }; + let options = GridFsFindOptions::builder().limit(Some(1)).build(); + let mut cursor = bucket + .find(filter, options) + .await + .map_err(parse_mongodb_error)?; + + match cursor.next().await { + Some(doc) => { + let mut destination = Vec::new(); + let file_id = doc.map_err(parse_mongodb_error)?.id; + bucket + .download_to_futures_0_3_writer(file_id, &mut destination) + .await + .map_err(parse_mongodb_error)?; + Ok(Some(destination)) + } + None => Ok(None), + } + } + + async fn set(&self, path: &str, value: &[u8]) -> Result<()> { + let bucket = self.get_bucket().await?; + // delete old file if exists + let filter = doc! { "filename": path }; + let options = GridFsFindOptions::builder().limit(Some(1)).build(); + let mut cursor = bucket + .find(filter, options) + .await + .map_err(parse_mongodb_error)?; + if let Some(doc) = cursor.next().await { + let file_id = doc.map_err(parse_mongodb_error)?.id; + bucket.delete(file_id).await.map_err(parse_mongodb_error)?; + } + // set new file + let mut upload_stream = bucket.open_upload_stream(path, None); + upload_stream + .write_all(value) + .await + .map_err(new_std_io_error)?; + upload_stream.close().await.map_err(new_std_io_error)?; + + Ok(()) + } + + async fn delete(&self, path: &str) -> Result<()> { + let bucket = self.get_bucket().await?; + let filter = doc! { "filename": path }; + let mut cursor = bucket + .find(filter, None) + .await + .map_err(parse_mongodb_error)?; + while let Some(doc) = cursor.next().await { + let file_id = doc.map_err(parse_mongodb_error)?.id; + bucket.delete(file_id).await.map_err(parse_mongodb_error)?; + } + Ok(()) + } +} + +fn parse_mongodb_error(err: mongodb::error::Error) -> Error { + Error::new(ErrorKind::Unexpected, "mongodb error").set_source(err) +} diff --git a/core/src/services/gridfs/docs.md b/core/src/services/gridfs/docs.md new file mode 100644 index 000000000000..b69a3392b124 --- /dev/null +++ b/core/src/services/gridfs/docs.md @@ -0,0 +1,47 @@ +## Capabilities + +This service can be used to: + +- [x] stat +- [x] read +- [x] write +- [x] create_dir +- [x] delete +- [ ] copy +- [ ] rename +- [ ] ~~list~~ +- [ ] scan +- [ ] ~~presign~~ +- [ ] blocking + +## Configuration + +- `root`: Set the working directory of `OpenDAL` +- `connection_string`: Set the connection string of mongodb server +- `database`: Set the database of mongodb +- `bucket`: Set the bucket of mongodb gridfs +- `chunk_size`: Set the chunk size of mongodb gridfs + +## Example + +### Via Builder + +```rust +use anyhow::Result; +use opendal::services::Gridfs; +use opendal::Operator; + +#[tokio::main] +async fn main() -> Result<()> { + let mut builder = Gridfs::default(); + builder.root("/"); + builder.connection_string("mongodb://myUser:myPassword@localhost:27017/myAuthDB"); + builder.database("your_database"); + builder.bucket("your_bucket"); + // The chunk size in bytes used to break the user file into chunks. + builder.chunk_size(255); + + let op = Operator::new(builder)?.finish(); + Ok(()) +} +``` diff --git a/core/src/services/gridfs/mod.rs b/core/src/services/gridfs/mod.rs new file mode 100644 index 000000000000..4623f8cb09bc --- /dev/null +++ b/core/src/services/gridfs/mod.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod backend; +pub use backend::GridFsBuilder as Gridfs; diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index cf8000632abf..dbc509ed36b3 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -69,6 +69,11 @@ mod ghac; #[cfg(feature = "services-ghac")] pub use ghac::Ghac; +#[cfg(feature = "services-gridfs")] +mod gridfs; +#[cfg(feature = "services-gridfs")] +pub use gridfs::Gridfs; + #[cfg(feature = "services-hdfs")] mod hdfs; #[cfg(feature = "services-hdfs")] diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index 8f34f79aeebf..3efbcddf79bc 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -183,6 +183,8 @@ impl Operator { Scheme::Gcs => Self::from_map::(map)?.finish(), #[cfg(feature = "services-ghac")] Scheme::Ghac => Self::from_map::(map)?.finish(), + #[cfg(feature = "services-gridfs")] + Scheme::Gridfs => Self::from_map::(map)?.finish(), #[cfg(feature = "services-hdfs")] Scheme::Hdfs => Self::from_map::(map)?.finish(), #[cfg(feature = "services-http")] diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 4699835d9e26..ec582a637c81 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -127,6 +127,8 @@ pub enum Scheme { Azfile, /// [mongodb](crate::services::mongodb): MongoDB Services Mongodb, + /// [gridfs](crate::services::gridfs): MongoDB Gridfs Services + Gridfs, /// Custom that allow users to implement services outside of OpenDAL. /// /// # NOTE @@ -288,6 +290,7 @@ impl FromStr for Scheme { "gcs" => Ok(Scheme::Gcs), "gdrive" => Ok(Scheme::Gdrive), "ghac" => Ok(Scheme::Ghac), + "gridfs" => Ok(Scheme::Gridfs), "hdfs" => Ok(Scheme::Hdfs), "http" | "https" => Ok(Scheme::Http), "ftp" | "ftps" => Ok(Scheme::Ftp), @@ -340,6 +343,7 @@ impl From for &'static str { Scheme::Fs => "fs", Scheme::Gcs => "gcs", Scheme::Ghac => "ghac", + Scheme::Gridfs => "gridfs", Scheme::Hdfs => "hdfs", Scheme::Http => "http", Scheme::Foundationdb => "foundationdb", diff --git a/website/docs/services/gridfs.mdx b/website/docs/services/gridfs.mdx new file mode 100644 index 000000000000..d33137f751ca --- /dev/null +++ b/website/docs/services/gridfs.mdx @@ -0,0 +1,68 @@ +--- +title: Gridfs +--- + +[Gridfs](https://www.mongodb.com/docs/manual/core/gridfs/) services support. + +import Docs from '../../../core/src/services/gridfs/docs.md' + + + +### Via Config + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + + + + +```rust +use anyhow::Result; +use opendal::Operator; +use opendal::Scheme; +use std::collections::HashMap; + +#[tokio::main] +async fn main() -> Result<()> { + let mut map = HashMap::new(); + map.insert("connection_string".to_string(), "connection_string".to_string()); + map.insert("database".to_string(), "database".to_string()); + map.insert("bucket".to_string(), "bucket".to_string()); + let op: Operator = Operator::via_map(Scheme::Gridfs, map)?; + Ok(()) +} +``` + + + + +```javascript +import { Operator } from "opendal"; + +async function main() { + const config = { + connection_string: "connection_string", + database: "database", + bucket: "bucket", + }; + const op = new Operator("gridfs", config); +} +``` + + + + +```python +import opendal + +config = { + "connection_string": "connection_string", + "database": "database", + "bucket": "bucket", +} + +op = opendal.Operator("gridfs", **config) +``` + + + \ No newline at end of file