feat: add mongodb gridfs service support (apache#3491)
* feat: add mongodb gridfs service support

* fix: behavior tests error

* fix: optimize code

* refactor: expose service name from GridFs to Gridfs
poltao authored Nov 7, 2023
1 parent 3e2114d commit c96a170
Showing 9 changed files with 442 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .env.example
@@ -161,3 +161,8 @@ OPENDAL_SWIFT_ACCOUNT=<account>
OPENDAL_SWIFT_CONTAINER=<container>
OPENDAL_SWIFT_ROOT=/path/to/dir
OPENDAL_SWIFT_TOKEN=<token>
# gridfs
OPENDAL_GRIDFS_CONNECTION_STRING=mongodb://localhost:27017
OPENDAL_GRIDFS_DATABASE=<database>
OPENDAL_GRIDFS_BUCKET=<fs>
OPENDAL_GRIDFS_CHUNK_SIZE=<chunk_size>
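These `OPENDAL_GRIDFS_*` entries mirror the options parsed by `GridFsBuilder::from_map` later in this diff. As a rough sketch of the same configuration driven from code rather than the environment (the values are placeholders; `Operator::via_map` is the generic map-based constructor already used for other services):

```rust
use std::collections::HashMap;

use opendal::Operator;
use opendal::Scheme;

fn main() -> opendal::Result<()> {
    // Same keys as the env vars above, expressed as a string map.
    let mut map = HashMap::new();
    map.insert("connection_string".to_string(), "mongodb://localhost:27017".to_string());
    map.insert("database".to_string(), "your_database".to_string());
    map.insert("bucket".to_string(), "fs".to_string());
    map.insert("chunk_size".to_string(), "261120".to_string());
    map.insert("root".to_string(), "/".to_string());

    // Dispatches to GridFsBuilder::from_map via the Scheme::Gridfs arm added in builder.rs below.
    let op = Operator::via_map(Scheme::Gridfs, map)?;
    let _ = op;
    Ok(())
}
```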
1 change: 1 addition & 0 deletions core/Cargo.toml
@@ -146,6 +146,7 @@ services-gcs = [
]
services-gdrive = []
services-ghac = []
services-gridfs = ["dep:mongodb"]
services-hdfs = ["dep:hdrs"]
services-http = []
services-ipfs = ["dep:prost"]
291 changes: 291 additions & 0 deletions core/src/services/gridfs/backend.rs
@@ -0,0 +1,291 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use async_trait::async_trait;
use futures::{AsyncWriteExt, StreamExt};
use mongodb::bson::doc;
use mongodb::options::{ClientOptions, GridFsBucketOptions, GridFsFindOptions};
use mongodb::GridFsBucket;
use std::fmt::{Debug, Formatter};
use tokio::sync::OnceCell;

use crate::raw::adapters::kv;
use crate::raw::new_std_io_error;
use crate::*;

#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct GridFsBuilder {
connection_string: Option<String>,
database: Option<String>,
bucket: Option<String>,
chunk_size: Option<u32>,
root: Option<String>,
}

impl Debug for GridFsBuilder {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GridFsBuilder")
.field("database", &self.database)
.field("bucket", &self.bucket)
.field("chunk_size", &self.chunk_size)
.field("root", &self.root)
.finish()
}
}

impl GridFsBuilder {
/// Set the connection_string of the MongoDB service.
///
/// This connection string is used to connect to the MongoDB service. It typically follows the format:
///
/// ## Format
///
/// `mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]`
///
/// Examples:
///
/// - Connecting to a local MongoDB instance: `mongodb://localhost:27017`
/// - Using authentication: `mongodb://myUser:myPassword@localhost:27017/myAuthDB`
/// - Specifying authentication mechanism: `mongodb://myUser:myPassword@localhost:27017/myAuthDB?authMechanism=SCRAM-SHA-256`
///
/// ## Options
///
/// - `authMechanism`: Specifies the authentication method to use. Examples include `SCRAM-SHA-1`, `SCRAM-SHA-256`, and `MONGODB-AWS`.
///
/// For more information, please refer to [MongoDB Connection String URI Format](https://docs.mongodb.com/manual/reference/connection-string/).
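///
/// ## Example
///
/// A minimal sketch of passing a connection string to this builder; the
/// credentials, host, and database below are placeholders.
///
/// ```
/// use opendal::services::Gridfs;
///
/// let mut builder = Gridfs::default();
/// builder.connection_string("mongodb://myUser:myPassword@localhost:27017/myAuthDB");
/// ```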
pub fn connection_string(&mut self, v: &str) -> &mut Self {
if !v.is_empty() {
self.connection_string = Some(v.to_string());
}
self
}

/// Set the working directory; all operations will be performed under it.
///
/// default: "/"
pub fn root(&mut self, root: &str) -> &mut Self {
if !root.is_empty() {
self.root = Some(root.to_owned());
}
self
}

/// Set the database name of the MongoDB GridFs service to read/write.
pub fn database(&mut self, database: &str) -> &mut Self {
if !database.is_empty() {
self.database = Some(database.to_string());
}
self
}

/// Set the bucket name of the MongoDB GridFs service to read/write.
///
/// Defaults to `fs` if not specified.
pub fn bucket(&mut self, bucket: &str) -> &mut Self {
if !bucket.is_empty() {
self.bucket = Some(bucket.to_string());
}
self
}

/// Set the chunk size (in bytes) the MongoDB GridFs service uses to break the user file into chunks.
///
/// Defaults to `255 KiB` if not specified.
pub fn chunk_size(&mut self, chunk_size: u32) -> &mut Self {
if chunk_size > 0 {
self.chunk_size = Some(chunk_size);
}
self
}
}

impl Builder for GridFsBuilder {
const SCHEME: Scheme = Scheme::Gridfs;

type Accessor = GridFsBackend;

fn from_map(map: std::collections::HashMap<String, String>) -> Self {
let mut builder = Self::default();

map.get("connection_string")
.map(|v| builder.connection_string(v));
map.get("database").map(|v| builder.database(v));
map.get("bucket").map(|v| builder.bucket(v));
map.get("chunk_size")
.map(|v| builder.chunk_size(v.parse::<u32>().unwrap_or_default()));
map.get("root").map(|v| builder.root(v));

builder
}

fn build(&mut self) -> Result<Self::Accessor> {
let conn = match self.connection_string.clone() {
Some(v) => v,
None => {
return Err(
Error::new(ErrorKind::InvalidInput, "connection_string is required")
.with_context("service", Scheme::Gridfs),
)
}
};
let database = match self.database.clone() {
Some(v) => v,
None => {
return Err(Error::new(ErrorKind::InvalidInput, "database is required")
.with_context("service", Scheme::Gridfs))
}
};
let bucket = self.bucket.clone().unwrap_or_else(|| "fs".to_string());
// GridFS defaults to a 255 KiB chunk size when none is specified.
let chunk_size = self.chunk_size.unwrap_or(255 * 1024);

Ok(GridFsBackend::new(Adapter {
connection_string: conn,
database,
bucket,
chunk_size,
bucket_instance: OnceCell::new(),
}))
}
}

pub type GridFsBackend = kv::Backend<Adapter>;

#[derive(Clone)]
pub struct Adapter {
connection_string: String,
database: String,
bucket: String,
chunk_size: u32,
bucket_instance: OnceCell<GridFsBucket>,
}

impl Debug for Adapter {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Adapter")
.field("database", &self.database)
.field("bucket", &self.bucket)
.field("chunk_size", &self.chunk_size)
.finish()
}
}

impl Adapter {
async fn get_bucket(&self) -> Result<&GridFsBucket> {
self.bucket_instance
.get_or_try_init(|| async {
let client_options = ClientOptions::parse(&self.connection_string)
.await
.map_err(parse_mongodb_error)?;
let client =
mongodb::Client::with_options(client_options).map_err(parse_mongodb_error)?;
let bucket_options = GridFsBucketOptions::builder()
.bucket_name(Some(self.bucket.clone()))
.chunk_size_bytes(Some(self.chunk_size))
.build();
let bucket = client
.database(&self.database)
.gridfs_bucket(bucket_options);
Ok(bucket)
})
.await
}
}

#[async_trait]
impl kv::Adapter for Adapter {
fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Scheme::Gridfs,
&format!("{}/{}", self.database, self.bucket),
Capability {
read: true,
write: true,
..Default::default()
},
)
}

async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
let bucket = self.get_bucket().await?;
let filter = doc! { "filename": path };
let options = GridFsFindOptions::builder().limit(Some(1)).build();
let mut cursor = bucket
.find(filter, options)
.await
.map_err(parse_mongodb_error)?;

match cursor.next().await {
Some(doc) => {
let mut destination = Vec::new();
let file_id = doc.map_err(parse_mongodb_error)?.id;
bucket
.download_to_futures_0_3_writer(file_id, &mut destination)
.await
.map_err(parse_mongodb_error)?;
Ok(Some(destination))
}
None => Ok(None),
}
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
let bucket = self.get_bucket().await?;
// delete old file if exists
let filter = doc! { "filename": path };
let options = GridFsFindOptions::builder().limit(Some(1)).build();
let mut cursor = bucket
.find(filter, options)
.await
.map_err(parse_mongodb_error)?;
if let Some(doc) = cursor.next().await {
let file_id = doc.map_err(parse_mongodb_error)?.id;
bucket.delete(file_id).await.map_err(parse_mongodb_error)?;
}
// set new file
let mut upload_stream = bucket.open_upload_stream(path, None);
upload_stream
.write_all(value)
.await
.map_err(new_std_io_error)?;
upload_stream.close().await.map_err(new_std_io_error)?;

Ok(())
}

async fn delete(&self, path: &str) -> Result<()> {
let bucket = self.get_bucket().await?;
let filter = doc! { "filename": path };
let mut cursor = bucket
.find(filter, None)
.await
.map_err(parse_mongodb_error)?;
while let Some(doc) = cursor.next().await {
let file_id = doc.map_err(parse_mongodb_error)?.id;
bucket.delete(file_id).await.map_err(parse_mongodb_error)?;
}
Ok(())
}
}

fn parse_mongodb_error(err: mongodb::error::Error) -> Error {
Error::new(ErrorKind::Unexpected, "mongodb error").set_source(err)
}
47 changes: 47 additions & 0 deletions core/src/services/gridfs/docs.md
@@ -0,0 +1,47 @@
## Capabilities

This service can be used to:

- [x] stat
- [x] read
- [x] write
- [x] create_dir
- [x] delete
- [ ] copy
- [ ] rename
- [ ] ~~list~~
- [ ] scan
- [ ] ~~presign~~
- [ ] blocking

## Configuration

- `root`: Set the working directory of `OpenDAL`
- `connection_string`: Set the connection string of the MongoDB server
- `database`: Set the MongoDB database to use
- `bucket`: Set the bucket name of MongoDB GridFS
- `chunk_size`: Set the chunk size (in bytes) of MongoDB GridFS

## Example

### Via Builder

```rust
use anyhow::Result;
use opendal::services::Gridfs;
use opendal::Operator;

#[tokio::main]
async fn main() -> Result<()> {
let mut builder = Gridfs::default();
builder.root("/");
builder.connection_string("mongodb://myUser:myPassword@localhost:27017/myAuthDB");
builder.database("your_database");
builder.bucket("your_bucket");
// The chunk size in bytes used to break the user file into chunks (255 KiB here).
builder.chunk_size(255 * 1024);

let op = Operator::new(builder)?.finish();
Ok(())
}
```
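Once the operator is built, data flows through the regular `Operator` API; with this backend each path is persisted as a GridFS file whose `filename` is derived from the path (see `get`/`set` in `backend.rs` above, where a second write to the same path removes the previous file before uploading). A short usage sketch, assuming a reachable local MongoDB instance and placeholder names:

```rust
use anyhow::Result;
use opendal::services::Gridfs;
use opendal::Operator;

#[tokio::main]
async fn main() -> Result<()> {
    let mut builder = Gridfs::default();
    builder.connection_string("mongodb://localhost:27017");
    builder.database("your_database");

    let op = Operator::new(builder)?.finish();

    // Stored as a GridFS file whose filename is derived from the path.
    op.write("hello.txt", "Hello, GridFS!").await?;

    // Read it back through the kv adapter's `get`.
    let bs = op.read("hello.txt").await?;
    println!("{}", String::from_utf8_lossy(&bs));

    // Removes every GridFS file whose filename matches the path.
    op.delete("hello.txt").await?;
    Ok(())
}
```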
19 changes: 19 additions & 0 deletions core/src/services/gridfs/mod.rs
@@ -0,0 +1,19 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

mod backend;
pub use backend::GridFsBuilder as Gridfs;
5 changes: 5 additions & 0 deletions core/src/services/mod.rs
@@ -69,6 +69,11 @@ mod ghac;
#[cfg(feature = "services-ghac")]
pub use ghac::Ghac;

#[cfg(feature = "services-gridfs")]
mod gridfs;
#[cfg(feature = "services-gridfs")]
pub use gridfs::Gridfs;

#[cfg(feature = "services-hdfs")]
mod hdfs;
#[cfg(feature = "services-hdfs")]
2 changes: 2 additions & 0 deletions core/src/types/operator/builder.rs
@@ -183,6 +183,8 @@ impl Operator {
Scheme::Gcs => Self::from_map::<services::Gcs>(map)?.finish(),
#[cfg(feature = "services-ghac")]
Scheme::Ghac => Self::from_map::<services::Ghac>(map)?.finish(),
#[cfg(feature = "services-gridfs")]
Scheme::Gridfs => Self::from_map::<services::Gridfs>(map)?.finish(),
#[cfg(feature = "services-hdfs")]
Scheme::Hdfs => Self::from_map::<services::Hdfs>(map)?.finish(),
#[cfg(feature = "services-http")]