diff --git a/.env.example b/.env.example index 7f11ebf03c2d..450c734f8304 100644 --- a/.env.example +++ b/.env.example @@ -169,3 +169,9 @@ OPENDAL_GRIDFS_CHUNK_SIZE= # alluxio OPENDAL_ALLUXIO_ENDPOINT= OPENDAL_ALLUXIO_ROOT=/path/to/dor +# b2 +OPENDAL_B2_ROOT=/path/to/dir +OPENDAL_B2_BUCKET= +OPENDAL_B2_BUCKET_ID= +OPENDAL_B2_APPLICATION_KEY_ID= +OPENDAL_B2_APPLICATION_KEY= \ No newline at end of file diff --git a/core/Cargo.toml b/core/Cargo.toml index f5e261b00396..d4833ecaac1a 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -125,6 +125,7 @@ services-azdls = [ "reqsign?/reqwest_request", ] services-azfile = [] +services-b2 = [] services-cacache = ["dep:cacache"] services-cloudflare-kv = [] services-cos = [ diff --git a/core/src/services/b2/backend.rs b/core/src/services/b2/backend.rs new file mode 100644 index 000000000000..0c09d55fc9a9 --- /dev/null +++ b/core/src/services/b2/backend.rs @@ -0,0 +1,554 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; + +use async_trait::async_trait; +use http::Request; +use http::StatusCode; +use log::debug; +use serde::Deserialize; +use tokio::sync::RwLock; + +use crate::raw::*; +use crate::services::b2::core::B2Signer; +use crate::services::b2::core::ListFileNamesResponse; +use crate::*; + +use super::core::constants; +use super::core::parse_file_info; +use super::core::B2Core; +use super::error::parse_error; +use super::lister::B2Lister; +use super::writer::B2Writer; +use super::writer::B2Writers; + +/// Config for backblaze b2 services support. +#[derive(Default, Deserialize)] +#[serde(default)] +#[non_exhaustive] +pub struct B2Config { + /// root of this backend. + /// + /// All operations will happen under this root. + pub root: Option, + /// keyID of this backend. + /// + /// - If application_key_id is set, we will take user's input first. + /// - If not, we will try to load it from environment. + pub application_key_id: Option, + /// applicationKey of this backend. + /// + /// - If application_key is set, we will take user's input first. + /// - If not, we will try to load it from environment. + pub application_key: Option, + /// bucket of this backend. + /// + /// required. + pub bucket: String, + /// bucket id of this backend. + /// + /// required. + pub bucket_id: String, +} + +impl Debug for B2Config { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut d = f.debug_struct("B2Config"); + + d.field("root", &self.root) + .field("application_key_id", &self.application_key_id) + .field("bucket_id", &self.bucket_id) + .field("bucket", &self.bucket); + + d.finish_non_exhaustive() + } +} + +/// [b2](https://www.backblaze.com/cloud-storage) services support. 
+#[doc = include_str!("docs.md")] +#[derive(Default)] +pub struct B2Builder { + config: B2Config, + + http_client: Option, +} + +impl Debug for B2Builder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut d = f.debug_struct("B2Builder"); + + d.field("config", &self.config); + d.finish_non_exhaustive() + } +} + +impl B2Builder { + /// Set root of this backend. + /// + /// All operations will happen under this root. + pub fn root(&mut self, root: &str) -> &mut Self { + self.config.root = if root.is_empty() { + None + } else { + Some(root.to_string()) + }; + + self + } + + /// application_key_id of this backend. + pub fn application_key_id(&mut self, application_key_id: &str) -> &mut Self { + self.config.application_key_id = if application_key_id.is_empty() { + None + } else { + Some(application_key_id.to_string()) + }; + + self + } + + /// application_key of this backend. + pub fn application_key(&mut self, application_key: &str) -> &mut Self { + self.config.application_key = if application_key.is_empty() { + None + } else { + Some(application_key.to_string()) + }; + + self + } + + /// Set bucket name of this backend. + /// You can find it in https://secure.backblaze.com/b2_buckets.html + pub fn bucket(&mut self, bucket: &str) -> &mut Self { + self.config.bucket = bucket.to_string(); + + self + } + + /// Set bucket id of this backend. + /// You can find it in https://secure.backblaze.com/b2_buckets.html + pub fn bucket_id(&mut self, bucket_id: &str) -> &mut Self { + self.config.bucket_id = bucket_id.to_string(); + + self + } + + /// Specify the http client that used by this service. + /// + /// # Notes + /// + /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed + /// during minor updates. 
+ pub fn http_client(&mut self, client: HttpClient) -> &mut Self { + self.http_client = Some(client); + self + } +} + +impl Builder for B2Builder { + const SCHEME: Scheme = Scheme::B2; + type Accessor = B2Backend; + + /// Converts a HashMap into an B2Builder instance. + /// + /// # Arguments + /// + /// * `map` - A HashMap containing the configuration values. + /// + /// # Returns + /// + /// Returns an instance of B2Builder. + fn from_map(map: HashMap) -> Self { + // Deserialize the configuration from the HashMap. + let config = B2Config::deserialize(ConfigDeserializer::new(map)) + .expect("config deserialize must succeed"); + + // Create an B2Builder instance with the deserialized config. + B2Builder { + config, + http_client: None, + } + } + + /// Builds the backend and returns the result of B2Backend. + fn build(&mut self) -> Result { + debug!("backend build started: {:?}", &self); + + let root = normalize_root(&self.config.root.clone().unwrap_or_default()); + debug!("backend use root {}", &root); + + // Handle bucket. + if self.config.bucket.is_empty() { + return Err(Error::new(ErrorKind::ConfigInvalid, "bucket is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::B2)); + } + + debug!("backend use bucket {}", &self.config.bucket); + + // Handle bucket_id. 
+ if self.config.bucket_id.is_empty() { + return Err(Error::new(ErrorKind::ConfigInvalid, "bucket_id is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::B2)); + } + + debug!("backend bucket_id {}", &self.config.bucket_id); + + let application_key_id = match &self.config.application_key_id { + Some(application_key_id) => Ok(application_key_id.clone()), + None => Err( + Error::new(ErrorKind::ConfigInvalid, "application_key_id is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::B2), + ), + }?; + + let application_key = match &self.config.application_key { + Some(key_id) => Ok(key_id.clone()), + None => Err( + Error::new(ErrorKind::ConfigInvalid, "application_key is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::B2), + ), + }?; + + let client = if let Some(client) = self.http_client.take() { + client + } else { + HttpClient::new().map_err(|err| { + err.with_operation("Builder::build") + .with_context("service", Scheme::B2) + })? + }; + + let signer = B2Signer { + application_key_id, + application_key, + ..Default::default() + }; + + Ok(B2Backend { + core: Arc::new(B2Core { + signer: Arc::new(RwLock::new(signer)), + root, + + bucket: self.config.bucket.clone(), + bucket_id: self.config.bucket_id.clone(), + client, + }), + }) + } +} + +/// Backend for b2 services. 
+#[derive(Debug, Clone)] +pub struct B2Backend { + core: Arc, +} + +#[async_trait] +impl Accessor for B2Backend { + type Reader = IncomingAsyncBody; + + type BlockingReader = (); + + type Writer = B2Writers; + + type BlockingWriter = (); + + type Lister = oio::PageLister; + + type BlockingLister = (); + + fn info(&self) -> AccessorInfo { + let mut am = AccessorInfo::default(); + am.set_scheme(Scheme::B2) + .set_root(&self.core.root) + .set_native_capability(Capability { + stat: true, + + read: true, + read_can_next: true, + read_with_range: true, + + write: true, + write_can_empty: true, + write_can_multi: true, + write_with_content_type: true, + // The min multipart size of b2 is 5 MiB. + // + // ref: + write_multi_min_size: Some(5 * 1024 * 1024), + // The max multipart size of b2 is 5 Gb. + // + // ref: + write_multi_max_size: Some(5 * 1024 * 1024 * 1024), + + create_dir: true, + delete: true, + copy: true, + + list: true, + list_with_limit: true, + list_with_start_after: true, + list_with_recursive: true, + list_without_recursive: true, + + presign: true, + presign_read: true, + presign_write: true, + presign_stat: true, + + ..Default::default() + }); + + am + } + + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let resp = self.core.download_file_by_name(path, &args).await?; + + let status = resp.status(); + + match status { + StatusCode::OK | StatusCode::PARTIAL_CONTENT => { + let size = parse_content_length(resp.headers())?; + Ok((RpRead::new().with_size(size), resp.into_body())) + } + StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())), + _ => Err(parse_error(resp).await?), + } + } + + async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result { + let resp: http::Response = self + .core + .upload_file(path, Some(0), &OpWrite::default(), AsyncBody::Empty) + .await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + resp.into_body().consume().await?; + 
Ok(RpCreateDir::default()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let writer = B2Writer::new(self.core.clone(), path, args); + + let w = oio::MultipartUploadWriter::new(writer); + + Ok((RpWrite::default(), w)) + } + + /// B2 have a get_file_info api required a file_id field, but field_id need call list api, list api also return file info + /// So we call list api to get file info + async fn stat(&self, path: &str, _args: OpStat) -> Result { + // Stat root always returns a DIR. + if path == "/" { + return Ok(RpStat::new(Metadata::new(EntryMode::DIR))); + } + + let delimiter = if path.ends_with('/') { Some("/") } else { None }; + let resp = self + .core + .list_file_names(Some(path), delimiter, None, None) + .await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let bs = resp.into_body().bytes().await?; + + let resp: ListFileNamesResponse = + serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; + if resp.files.is_empty() { + return Err(Error::new(ErrorKind::NotFound, "no such file or directory")); + } + let meta = parse_file_info(&resp.files[0]); + Ok(RpStat::new(meta)) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { + let resp = self + .core + .list_file_names(Some(from), None, None, None) + .await?; + + let status = resp.status(); + + let source_file_id = match status { + StatusCode::OK => { + let bs = resp.into_body().bytes().await?; + + let resp: ListFileNamesResponse = + serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; + if resp.files.is_empty() { + return Err(Error::new(ErrorKind::NotFound, "no such file or directory")); + } + + let file_id = resp.files[0].clone().file_id; + Ok(file_id) + } + _ => Err(parse_error(resp).await?), + }?; + + let Some(source_file_id) = source_file_id else { + return 
Err(Error::new(ErrorKind::IsADirectory, "is a directory")); + }; + + let resp = self.core.copy_file(source_file_id, to).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => Ok(RpCopy::default()), + _ => Err(parse_error(resp).await?), + } + } + + async fn delete(&self, path: &str, _: OpDelete) -> Result { + let resp = self.core.hide_file(path).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => Ok(RpDelete::default()), + _ => { + let err = parse_error(resp).await?; + match err.kind() { + ErrorKind::NotFound => Ok(RpDelete::default()), + // Representative deleted + ErrorKind::AlreadyExists => Ok(RpDelete::default()), + _ => Err(err), + } + } + } + } + + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { + Ok(( + RpList::default(), + oio::PageLister::new(B2Lister::new( + self.core.clone(), + path, + args.recursive(), + args.limit(), + args.start_after(), + )), + )) + } + + async fn presign(&self, path: &str, args: OpPresign) -> Result { + match args.operation() { + PresignOperation::Stat(_) => { + let resp = self + .core + .get_download_authorization(path, &OpRead::default(), args.expire()) + .await?; + let path = build_abs_path(&self.core.root, path); + + let auth_info = self.core.get_auth_info().await?; + + let url = format!( + "{}/file/{}/{}?Authorization={}", + auth_info.download_url, self.core.bucket, path, resp.authorization_token + ); + + let req = Request::get(url); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + // We don't need this request anymore, consume + let (parts, _) = req.into_parts(); + + Ok(RpPresign::new(PresignedRequest::new( + parts.method, + parts.uri, + parts.headers, + ))) + } + PresignOperation::Read(op) => { + let resp = self + .core + .get_download_authorization(path, op, args.expire()) + .await?; + let path = build_abs_path(&self.core.root, path); + + let auth_info = self.core.get_auth_info().await?; + + let url = 
format!( + "{}/file/{}/{}?Authorization={}", + auth_info.download_url, self.core.bucket, path, resp.authorization_token + ); + + let req = Request::get(url); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + // We don't need this request anymore, consume + let (parts, _) = req.into_parts(); + + Ok(RpPresign::new(PresignedRequest::new( + parts.method, + parts.uri, + parts.headers, + ))) + } + PresignOperation::Write(_) => { + let resp = self.core.get_upload_url().await?; + + let mut req = Request::post(&resp.upload_url); + + req = req.header(http::header::AUTHORIZATION, resp.authorization_token); + req = req.header("X-Bz-File-Name", build_abs_path(&self.core.root, path)); + req = req.header(http::header::CONTENT_TYPE, "b2/x-auto"); + req = req.header(constants::X_BZ_CONTENT_SHA1, "do_not_verify"); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + // We don't need this request anymore, consume it directly. + let (parts, _) = req.into_parts(); + + Ok(RpPresign::new(PresignedRequest::new( + parts.method, + parts.uri, + parts.headers, + ))) + } + } + } +} diff --git a/core/src/services/b2/core.rs b/core/src/services/b2/core.rs new file mode 100644 index 000000000000..db706629a9de --- /dev/null +++ b/core/src/services/b2/core.rs @@ -0,0 +1,706 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; +use std::time::Duration; + +use chrono::{DateTime, Utc}; +use http::{header, Request, Response, StatusCode}; +use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; + +use crate::raw::*; +use crate::services::b2::core::constants::X_BZ_PART_NUMBER; +use crate::services::b2::error::parse_error; +use crate::*; + +use self::constants::{X_BZ_CONTENT_SHA1, X_BZ_FILE_NAME}; + +pub(super) mod constants { + pub const X_BZ_FILE_NAME: &str = "X-Bz-File-Name"; + pub const X_BZ_CONTENT_SHA1: &str = "X-Bz-Content-Sha1"; + pub const X_BZ_PART_NUMBER: &str = "X-Bz-Part-Number"; +} + +/// Core of [b2](https://www.backblaze.com/cloud-storage) services support. +#[derive(Clone)] +pub struct B2Core { + pub signer: Arc>, + + /// The root of this core. + pub root: String, + /// The bucket name of this backend. + pub bucket: String, + /// The bucket id of this backend. 
+ pub bucket_id: String, + + pub client: HttpClient, +} + +impl Debug for B2Core { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Backend") + .field("root", &self.root) + .field("bucket", &self.bucket) + .field("bucket_id", &self.bucket_id) + .finish_non_exhaustive() + } +} + +impl B2Core { + #[inline] + pub async fn send(&self, req: Request) -> Result> { + self.client.send(req).await + } + + /// [b2_authorize_account](https://www.backblaze.com/apidocs/b2-authorize-account) + pub async fn get_auth_info(&self) -> Result { + { + let signer = self.signer.read().await; + + if !signer.auth_info.authorization_token.is_empty() + && signer.auth_info.expires_in > Utc::now() + { + let auth_info = signer.auth_info.clone(); + return Ok(auth_info); + } + } + + { + let mut signer = self.signer.write().await; + let req = Request::get("https://api.backblazeb2.com/b2api/v3/b2_authorize_account") + .header( + header::AUTHORIZATION, + format_authorization_by_basic( + &signer.application_key_id, + &signer.application_key, + )?, + ) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + let resp = self.client.send(req).await?; + let status = resp.status(); + + match status { + StatusCode::OK => { + let resp_body = &resp.into_body().bytes().await?; + let token = serde_json::from_slice::(resp_body) + .map_err(new_json_deserialize_error)?; + signer.auth_info = AuthInfo { + authorization_token: token.authorization_token.clone(), + api_url: token.api_info.storage_api.api_url.clone(), + download_url: token.api_info.storage_api.download_url.clone(), + // This authorization token is valid for at most 24 hours. 
+ expires_in: Utc::now() + chrono::Duration::hours(20), + }; + } + _ => { + return Err(parse_error(resp).await?); + } + } + Ok(signer.auth_info.clone()) + } + } +} + +impl B2Core { + pub async fn download_file_by_name( + &self, + path: &str, + args: &OpRead, + ) -> Result> { + let path = build_abs_path(&self.root, path); + + let auth_info = self.get_auth_info().await?; + + // Construct headers to add to the request + let url = format!( + "{}/file/{}/{}", + auth_info.download_url, + self.bucket, + percent_encode_path(&path) + ); + + let mut req = Request::get(&url); + + req = req.header(header::AUTHORIZATION, auth_info.authorization_token); + + let range = args.range(); + if !range.is_full() { + req = req.header(http::header::RANGE, range.to_header()); + } + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.send(req).await + } + + pub(super) async fn get_upload_url(&self) -> Result { + let auth_info = self.get_auth_info().await?; + + let url = format!( + "{}/b2api/v2/b2_get_upload_url?bucketId={}", + auth_info.api_url, self.bucket_id + ); + + let mut req = Request::get(&url); + + req = req.header(header::AUTHORIZATION, auth_info.authorization_token); + + // Set body + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + let resp = self.send(req).await?; + let status = resp.status(); + match status { + StatusCode::OK => { + let resp_body = &resp.into_body().bytes().await?; + let resp = serde_json::from_slice::(resp_body) + .map_err(new_json_deserialize_error)?; + Ok(resp) + } + _ => Err(parse_error(resp).await?), + } + } + + pub async fn get_download_authorization( + &self, + path: &str, + args: &OpRead, + expire: Duration, + ) -> Result { + let path = build_abs_path(&self.root, path); + + let auth_info = self.get_auth_info().await?; + + // Construct headers to add to the request + let url = format!( + "{}/b2api/v2/b2_get_download_authorization", + auth_info.api_url + ); + let mut req = 
Request::post(&url); + + req = req.header(header::AUTHORIZATION, auth_info.authorization_token); + + let range = args.range(); + if !range.is_full() { + req = req.header(http::header::RANGE, range.to_header()); + } + let body = GetDownloadAuthorizationRequest { + bucket_id: self.bucket_id.clone(), + file_name_prefix: path, + valid_duration_in_seconds: expire.as_secs(), + }; + let body = serde_json::to_vec(&body).map_err(new_json_serialize_error)?; + let body = bytes::Bytes::from(body); + + let req = req + .body(AsyncBody::Bytes(body)) + .map_err(new_request_build_error)?; + + let resp = self.send(req).await?; + + let status = resp.status(); + match status { + StatusCode::OK => { + let resp_body = &resp.into_body().bytes().await?; + let resp = serde_json::from_slice::(resp_body) + .map_err(new_json_deserialize_error)?; + Ok(resp) + } + _ => Err(parse_error(resp).await?), + } + } + + pub async fn upload_file( + &self, + path: &str, + size: Option, + args: &OpWrite, + body: AsyncBody, + ) -> Result> { + let resp = self.get_upload_url().await?; + + let p = build_abs_path(&self.root, path); + + let mut req = Request::post(resp.upload_url); + + req = req.header(X_BZ_FILE_NAME, percent_encode_path(&p)); + + req = req.header(header::AUTHORIZATION, resp.authorization_token); + + req = req.header(X_BZ_CONTENT_SHA1, "do_not_verify"); + + if let Some(size) = size { + req = req.header(header::CONTENT_LENGTH, size.to_string()) + } + + if let Some(mime) = args.content_type() { + req = req.header(header::CONTENT_TYPE, mime) + } else { + req = req.header(header::CONTENT_TYPE, "b2/x-auto") + } + + if let Some(pos) = args.content_disposition() { + req = req.header(header::CONTENT_DISPOSITION, pos) + } + + // Set body + let req = req.body(body).map_err(new_request_build_error)?; + + self.send(req).await + } + + pub async fn start_large_file( + &self, + path: &str, + args: &OpWrite, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let auth_info = 
self.get_auth_info().await?; + + let url = format!("{}/b2api/v2/b2_start_large_file", auth_info.api_url); + + let mut req = Request::post(&url); + + req = req.header(header::AUTHORIZATION, auth_info.authorization_token); + + let mut start_large_file_request = StartLargeFileRequest { + bucket_id: self.bucket_id.clone(), + file_name: percent_encode_path(&p), + content_type: "b2/x-auto".to_owned(), + }; + + if let Some(mime) = args.content_type() { + start_large_file_request.content_type = mime.to_owned(); + } + + let body = + serde_json::to_vec(&start_large_file_request).map_err(new_json_serialize_error)?; + let body = bytes::Bytes::from(body); + + let req = req + .body(AsyncBody::Bytes(body)) + .map_err(new_request_build_error)?; + + self.send(req).await + } + + pub async fn get_upload_part_url(&self, file_id: &str) -> Result { + let auth_info = self.get_auth_info().await?; + + let url = format!( + "{}/b2api/v2/b2_get_upload_part_url?fileId={}", + auth_info.api_url, file_id + ); + + let mut req = Request::get(&url); + + req = req.header(header::AUTHORIZATION, auth_info.authorization_token); + + // Set body + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + let resp = self.send(req).await?; + + let status = resp.status(); + match status { + StatusCode::OK => { + let resp_body = &resp.into_body().bytes().await?; + let resp = serde_json::from_slice::(resp_body) + .map_err(new_json_deserialize_error)?; + Ok(resp) + } + _ => Err(parse_error(resp).await?), + } + } + + pub async fn upload_part( + &self, + file_id: &str, + part_number: usize, + size: u64, + body: AsyncBody, + ) -> Result> { + let resp = self.get_upload_part_url(file_id).await?; + + let mut req = Request::post(resp.upload_url); + + req = req.header(X_BZ_PART_NUMBER, part_number.to_string()); + + req = req.header(header::CONTENT_LENGTH, size.to_string()); + + req = req.header(header::AUTHORIZATION, resp.authorization_token); + + req = req.header(X_BZ_CONTENT_SHA1, 
"do_not_verify"); + + // Set body + let req = req.body(body).map_err(new_request_build_error)?; + + self.send(req).await + } + + pub async fn finish_large_file( + &self, + file_id: &str, + part_sha1_array: Vec, + ) -> Result> { + let auth_info = self.get_auth_info().await?; + + let url = format!("{}/b2api/v2/b2_finish_large_file", auth_info.api_url); + + let mut req = Request::post(&url); + + req = req.header(header::AUTHORIZATION, auth_info.authorization_token); + + let body = serde_json::to_vec(&FinishLargeFileRequest { + file_id: file_id.to_owned(), + part_sha1_array, + }) + .map_err(new_json_serialize_error)?; + let body = bytes::Bytes::from(body); + + // Set body + let req = req + .body(AsyncBody::Bytes(body)) + .map_err(new_request_build_error)?; + + self.send(req).await + } + + pub async fn cancel_large_file(&self, file_id: &str) -> Result> { + let auth_info = self.get_auth_info().await?; + + let url = format!("{}/b2api/v2/b2_cancel_large_file", auth_info.api_url); + + let mut req = Request::post(&url); + + req = req.header(header::AUTHORIZATION, auth_info.authorization_token); + + let body = serde_json::to_vec(&CancelLargeFileRequest { + file_id: file_id.to_owned(), + }) + .map_err(new_json_serialize_error)?; + let body = bytes::Bytes::from(body); + + // Set body + let req = req + .body(AsyncBody::Bytes(body)) + .map_err(new_request_build_error)?; + + self.send(req).await + } + + pub async fn list_file_names( + &self, + prefix: Option<&str>, + delimiter: Option<&str>, + limit: Option, + start_after: Option, + ) -> Result> { + let auth_info = self.get_auth_info().await?; + + let mut url = format!( + "{}/b2api/v2/b2_list_file_names?bucketId={}", + auth_info.api_url, self.bucket_id + ); + + if let Some(prefix) = prefix { + let prefix = build_abs_path(&self.root, prefix); + url.push_str(&format!("&prefix={}", percent_encode_path(&prefix))); + } + + if let Some(limit) = limit { + url.push_str(&format!("&maxFileCount={}", limit)); + } + + if let Some(start_after) 
= start_after { + let start_after = build_abs_path(&self.root, &start_after); + url.push_str(&format!( + "&startFileName={}", + percent_encode_path(&start_after) + )); + } + + if let Some(delimiter) = delimiter { + url.push_str(&format!("&delimiter={}", delimiter)); + } + + let mut req = Request::get(&url); + + req = req.header(header::AUTHORIZATION, auth_info.authorization_token); + + // Set body + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.send(req).await + } + + pub async fn copy_file( + &self, + source_file_id: String, + to: &str, + ) -> Result> { + let to = build_abs_path(&self.root, to); + + let auth_info = self.get_auth_info().await?; + + let url = format!("{}/b2api/v2/b2_copy_file", auth_info.api_url); + + let mut req = Request::post(url); + + req = req.header(header::AUTHORIZATION, auth_info.authorization_token); + + let body = CopyFileRequest { + source_file_id, + file_name: to, + }; + + let body = serde_json::to_vec(&body).map_err(new_json_serialize_error)?; + let body = bytes::Bytes::from(body); + + // Set body + let req = req + .body(AsyncBody::Bytes(body)) + .map_err(new_request_build_error)?; + + self.send(req).await + } + + pub async fn hide_file(&self, path: &str) -> Result> { + let path = build_abs_path(&self.root, path); + + let auth_info = self.get_auth_info().await?; + + let url = format!("{}/b2api/v2/b2_hide_file", auth_info.api_url); + + let mut req = Request::post(url); + + req = req.header(header::AUTHORIZATION, auth_info.authorization_token); + + let body = HideFileRequest { + bucket_id: self.bucket_id.clone(), + file_name: path.to_string(), + }; + + let body = serde_json::to_vec(&body).map_err(new_json_serialize_error)?; + let body = bytes::Bytes::from(body); + + // Set body + let req = req + .body(AsyncBody::Bytes(body)) + .map_err(new_request_build_error)?; + + self.send(req).await + } +} + +#[derive(Clone)] +pub struct B2Signer { + /// The application_key_id of this core. 
+ pub application_key_id: String, + /// The application_key of this core. + pub application_key: String, + + pub auth_info: AuthInfo, +} + +#[derive(Clone)] +pub struct AuthInfo { + pub authorization_token: String, + /// The base URL to use for all API calls except for uploading and downloading files. + pub api_url: String, + /// The base URL to use for downloading files. + pub download_url: String, + + pub expires_in: DateTime, +} + +impl Default for B2Signer { + fn default() -> Self { + B2Signer { + application_key: String::new(), + application_key_id: String::new(), + + auth_info: AuthInfo { + authorization_token: String::new(), + api_url: String::new(), + download_url: String::new(), + expires_in: DateTime::::MIN_UTC, + }, + } + } +} + +/// Request of [b2_start_large_file](https://www.backblaze.com/apidocs/b2-start-large-file). +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct StartLargeFileRequest { + pub bucket_id: String, + pub file_name: String, + pub content_type: String, +} + +/// Response of [b2_start_large_file](https://www.backblaze.com/apidocs/b2-start-large-file). +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct StartLargeFileResponse { + pub file_id: String, +} + +/// Response of [b2_authorize_account](https://www.backblaze.com/apidocs/b2-authorize-account). +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct AuthorizeAccountResponse { + /// An authorization token to use with all calls, other than b2_authorize_account, that need an Authorization header. This authorization token is valid for at most 24 hours. + /// So we should call b2_authorize_account every 24 hours. 
+ pub authorization_token: String, + pub api_info: ApiInfo, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ApiInfo { + pub storage_api: StorageApi, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct StorageApi { + pub api_url: String, + pub download_url: String, +} + +/// Response of [b2_get_upload_url](https://www.backblaze.com/apidocs/b2-get-upload-url). +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct GetUploadUrlResponse { + /// The authorizationToken that must be used when uploading files to this bucket. + /// This token is valid for 24 hours or until the uploadUrl endpoint rejects an upload, see b2_upload_file + pub authorization_token: String, + pub upload_url: String, +} + +/// Response of [b2_get_upload_part_url](https://www.backblaze.com/apidocs/b2-get-upload-part-url). +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct GetUploadPartUrlResponse { + /// The authorizationToken that must be used when uploading files to this bucket. + /// This token is valid for 24 hours or until the uploadUrl endpoint rejects an upload, see b2_upload_file + pub authorization_token: String, + pub upload_url: String, +} + +/// Response of [b2_upload_part](https://www.backblaze.com/apidocs/b2-upload-part). +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UploadPartResponse { + pub content_sha1: String, +} + +/// Request of [b2_finish_large_file](https://www.backblaze.com/apidocs/b2-finish-large-file). +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct FinishLargeFileRequest { + pub file_id: String, + pub part_sha1_array: Vec, +} + +/// Request of [b2_cancel_large_file](https://www.backblaze.com/apidocs/b2-cancel-large-file). 
+/// Request of [b2_cancel_large_file](https://www.backblaze.com/apidocs/b2-cancel-large-file).
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CancelLargeFileRequest {
+    pub file_id: String,
+}
+
+/// Response of [list_file_names](https://www.backblaze.com/apidocs/b2-list-file-names).
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ListFileNamesResponse {
+    pub files: Vec<File>,
+    /// Set when the listing is truncated; pass it back to fetch the next page.
+    pub next_file_name: Option<String>,
+}
+
+/// A single file entry returned by b2_list_file_names.
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct File {
+    pub file_id: Option<String>,
+    pub content_length: u64,
+    pub content_md5: Option<String>,
+    pub content_type: Option<String>,
+    pub file_name: String,
+    pub action: String,
+}
+
+/// Convert a b2 [`File`] entry into OpenDAL [`Metadata`].
+///
+/// A trailing `/` in the file name marks a directory placeholder; everything
+/// else is a regular file with length and optional md5/content-type filled in.
+pub(super) fn parse_file_info(file: &File) -> Metadata {
+    if file.file_name.ends_with('/') {
+        return Metadata::new(EntryMode::DIR);
+    }
+
+    let mut metadata = Metadata::new(EntryMode::FILE);
+
+    metadata.set_content_length(file.content_length);
+
+    if let Some(content_md5) = &file.content_md5 {
+        metadata.set_content_md5(content_md5);
+    }
+
+    if let Some(content_type) = &file.content_type {
+        metadata.set_content_type(content_type);
+    }
+
+    metadata
+}
+
+/// Request of [b2_copy_file](https://www.backblaze.com/apidocs/b2-copy-file).
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CopyFileRequest {
+    pub source_file_id: String,
+    pub file_name: String,
+}
+
+/// Request of [b2_hide_file](https://www.backblaze.com/apidocs/b2-hide-file).
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct HideFileRequest {
+    pub bucket_id: String,
+    pub file_name: String,
+}
+
+/// Request of [b2_get_download_authorization](https://www.backblaze.com/apidocs/b2-get-download-authorization).
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct GetDownloadAuthorizationRequest {
+    pub bucket_id: String,
+    pub file_name_prefix: String,
+    pub valid_duration_in_seconds: u64,
+}
+
+/// Response of [b2_get_download_authorization](https://www.backblaze.com/apidocs/b2-get-download-authorization).
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct GetDownloadAuthorizationResponse {
+    pub authorization_token: String,
+}
diff --git a/core/src/services/b2/docs.md b/core/src/services/b2/docs.md
new file mode 100644
index 000000000000..577bc82ffaca
--- /dev/null
+++ b/core/src/services/b2/docs.md
@@ -0,0 +1,56 @@
+## Capabilities
+
+This service can be
used to:
+
+- [x] stat
+- [x] read
+- [x] write
+- [x] create_dir
+- [x] delete
+- [x] copy
+- [ ] rename
+- [x] list
+- [x] scan
+- [x] presign
+- [ ] blocking
+
+## Configuration
+
+- `root`: Set the work directory for backend
+- `key_id`: B2 application key keyID
+- `application_key`: B2 application key applicationKey
+- `bucket`: B2 bucket name
+- `bucket_id`: B2 bucket_id
+
+You can refer to [`B2Builder`]'s docs for more information
+
+## Example
+
+### Via Builder
+
+```rust
+use anyhow::Result;
+use opendal::services::B2;
+use opendal::Operator;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    // create backend builder
+    let mut builder = B2::default();
+
+    // set the working directory root for OpenDAL
+    builder.root("/");
+    // set the key_id for OpenDAL
+    builder.key_id("xxxxxxxxxx");
+    // set the application_key for OpenDAL
+    builder.application_key("xxxxxxxxxx");
+    // set the bucket name for OpenDAL
+    builder.bucket("opendal");
+    // set the bucket_id for OpenDAL
+    builder.bucket_id("xxxxxxxxxxxxx");
+
+    let op: Operator = Operator::new(builder)?.finish();
+
+    Ok(())
+}
+```
diff --git a/core/src/services/b2/error.rs b/core/src/services/b2/error.rs
new file mode 100644
index 000000000000..2a3ead79154a
--- /dev/null
+++ b/core/src/services/b2/error.rs
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Buf;
+use http::Response;
+use serde::Deserialize;
+
+use crate::raw::*;
+use crate::Error;
+use crate::ErrorKind;
+use crate::Result;
+
+/// The native error response returned by the b2 API.
+#[derive(Default, Debug, Deserialize)]
+#[allow(dead_code)]
+struct B2Error {
+    status: u32,
+    code: String,
+    message: String,
+}
+
+/// Parse an error response from b2 into an OpenDAL [`Error`].
+///
+/// The HTTP status picks a default `(kind, retryable)` pair, which is then
+/// refined by the machine-readable `code` field of the JSON body when one
+/// can be deserialized; otherwise the raw body text becomes the message.
+pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
+    let (parts, body) = resp.into_parts();
+    let bs = body.bytes().await?;
+
+    let (mut kind, mut retryable) = match parts.status.as_u16() {
+        403 => (ErrorKind::PermissionDenied, false),
+        404 => (ErrorKind::NotFound, false),
+        304 | 412 => (ErrorKind::ConditionNotMatch, false),
+        // b2 returns 401 when the auth token is invalid or expired; marked
+        // retryable so a retry can re-authorize via b2_authorize_account.
+        401 => (ErrorKind::PermissionDenied, true),
+        500 | 502 | 503 | 504 => (ErrorKind::Unexpected, true),
+        _ => (ErrorKind::Unexpected, false),
+    };
+
+    // Prefer the structured JSON error; fall back to the raw body text.
+    let (message, b2_err) = serde_json::from_reader::<_, B2Error>(bs.clone().reader())
+        .map(|b2_err| (format!("{b2_err:?}"), Some(b2_err)))
+        .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None));
+
+    if let Some(b2_err) = b2_err {
+        (kind, retryable) = parse_b2_error_code(b2_err.code.as_str()).unwrap_or((kind, retryable));
+    };
+
+    let mut err = Error::new(kind, &message);
+
+    err = with_error_response_context(err, parts);
+
+    if retryable {
+        err = err.set_temporary();
+    }
+
+    Ok(err)
+}
+
+/// Returns the `Error kind` of this code and whether the error is retryable.
+pub fn parse_b2_error_code(code: &str) -> Option<(ErrorKind, bool)> {
+    match code {
+        "already_hidden" => Some((ErrorKind::AlreadyExists, false)),
+        "no_such_file" => Some((ErrorKind::NotFound, false)),
+        _ => None,
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use futures::stream;
+    use http::StatusCode;
+
+    use super::*;
+
+    #[test]
+    fn test_parse_b2_error_code() {
+        let code = "already_hidden";
+        assert_eq!(
+            parse_b2_error_code(code),
+            Some((crate::ErrorKind::AlreadyExists, false))
+        );
+
+        let code = "no_such_file";
+        assert_eq!(
+            parse_b2_error_code(code),
+            Some((crate::ErrorKind::NotFound, false))
+        );
+
+        let code = "not_found";
+        assert_eq!(parse_b2_error_code(code), None);
+    }
+
+    #[tokio::test]
+    async fn test_parse_error() {
+        let err_res = vec![
+            (
+                r#"{"status": 403, "code": "access_denied", "message":"The provided customer-managed encryption key is wrong."}"#,
+                ErrorKind::PermissionDenied,
+                StatusCode::FORBIDDEN,
+            ),
+            (
+                r#"{"status": 404, "code": "not_found", "message":"File is not in B2 Cloud Storage."}"#,
+                ErrorKind::NotFound,
+                StatusCode::NOT_FOUND,
+            ),
+            (
+                r#"{"status": 401, "code": "bad_auth_token", "message":"The auth token used is not valid. Call b2_authorize_account again to either get a new one, or an error message describing the problem."}"#,
+                ErrorKind::PermissionDenied,
+                StatusCode::UNAUTHORIZED,
+            ),
+        ];
+
+        for res in err_res {
+            let bs = bytes::Bytes::from(res.0);
+            let body = IncomingAsyncBody::new(
+                Box::new(oio::into_stream(stream::iter(vec![Ok(bs.clone())]))),
+                None,
+            );
+            let resp = Response::builder().status(res.2).body(body).unwrap();
+
+            let err = parse_error(resp).await;
+
+            assert!(err.is_ok());
+            assert_eq!(err.unwrap().kind(), res.1);
+        }
+    }
+}
diff --git a/core/src/services/b2/lister.rs b/core/src/services/b2/lister.rs
new file mode 100644
index 000000000000..4f084537aa82
--- /dev/null
+++ b/core/src/services/b2/lister.rs
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Buf; + +use super::core::{parse_file_info, B2Core, ListFileNamesResponse}; + +use crate::raw::*; +use crate::services::b2::error::parse_error; +use crate::*; + +pub struct B2Lister { + core: Arc, + + path: String, + delimiter: Option<&'static str>, + limit: Option, + + /// B2 starts listing **after** this specified key + start_after: Option, +} + +impl B2Lister { + pub fn new( + core: Arc, + path: &str, + recursive: bool, + limit: Option, + start_after: Option<&str>, + ) -> Self { + let delimiter = if recursive { None } else { Some("/") }; + Self { + core, + + path: path.to_string(), + delimiter, + limit, + start_after: start_after.map(String::from), + } + } +} + +#[async_trait] +impl oio::PageList for B2Lister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { + let resp = self + .core + .list_file_names( + Some(&self.path), + self.delimiter, + self.limit, + self.start_after.clone(), + ) + .await?; + + if resp.status() != http::StatusCode::OK { + return Err(parse_error(resp).await?); + } + + let bs = resp.into_body().bytes().await?; + + let output: ListFileNamesResponse = + serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?; + + ctx.done = output.next_file_name.is_none(); + + for file in output.files { + if let Some(start_after) = self.start_after.clone() { + if build_abs_path(&self.core.root, &start_after) == file.file_name { + continue; + } + } + if file.file_name == build_abs_path(&self.core.root, &self.path) { + continue; + } + let file_name = file.file_name.clone(); + let metadata = parse_file_info(&file); + + ctx.entries.push_back(oio::Entry::new( + &build_rel_path(&self.core.root, &file_name), + metadata, + )) + } + + Ok(()) + } +} diff --git a/core/src/services/b2/mod.rs b/core/src/services/b2/mod.rs new file mode 100644 index 000000000000..80745f01e82a --- /dev/null +++ b/core/src/services/b2/mod.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache 
Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod backend; +pub use backend::B2Builder as B2; +pub use backend::B2Config; + +mod core; +mod error; +mod lister; +mod writer; diff --git a/core/src/services/b2/writer.rs b/core/src/services/b2/writer.rs new file mode 100644 index 000000000000..d4ed419ffda3 --- /dev/null +++ b/core/src/services/b2/writer.rs @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::sync::Arc; + +use async_trait::async_trait; +use http::StatusCode; + +use crate::raw::*; +use crate::*; + +use super::core::{B2Core, StartLargeFileResponse, UploadPartResponse}; +use super::error::parse_error; + +pub type B2Writers = oio::MultipartUploadWriter; + +pub struct B2Writer { + core: Arc, + + op: OpWrite, + path: String, +} + +impl B2Writer { + pub fn new(core: Arc, path: &str, op: OpWrite) -> Self { + B2Writer { + core, + path: path.to_string(), + op, + } + } +} + +#[async_trait] +impl oio::MultipartUploadWrite for B2Writer { + async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { + let resp = self + .core + .upload_file(&self.path, Some(size), &self.op, body) + .await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn initiate_part(&self) -> Result { + let resp = self.core.start_large_file(&self.path, &self.op).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let bs = resp.into_body().bytes().await?; + + let result: StartLargeFileResponse = + serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; + + Ok(result.file_id) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn write_part( + &self, + upload_id: &str, + part_number: usize, + size: u64, + body: AsyncBody, + ) -> Result { + // B2 requires part number must between [1..=10000] + let part_number = part_number + 1; + + let resp = self + .core + .upload_part(upload_id, part_number, size, body) + .await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let bs = resp.into_body().bytes().await?; + + let result: UploadPartResponse = + serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; + + Ok(oio::MultipartUploadPart { + etag: result.content_sha1, + part_number, + }) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn complete_part( + &self, 
+ upload_id: &str, + parts: &[oio::MultipartUploadPart], + ) -> Result<()> { + let part_sha1_array = parts + .iter() + .map(|p| { + let binding = p.etag.clone(); + let sha1 = binding.strip_prefix("unverified:"); + let Some(sha1) = sha1 else { + return "".to_string(); + }; + sha1.to_string() + }) + .collect(); + + let resp = self + .core + .finish_large_file(upload_id, part_sha1_array) + .await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + resp.into_body().consume().await?; + + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn abort_part(&self, upload_id: &str) -> Result<()> { + let resp = self.core.cancel_large_file(upload_id).await?; + match resp.status() { + // b2 returns code 200 if abort succeeds. + StatusCode::OK => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } +} diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index 9ba0246cabf9..947274a63b47 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -289,3 +289,10 @@ mod alluxio; pub use alluxio::Alluxio; #[cfg(feature = "services-alluxio")] pub use alluxio::AlluxioConfig; + +#[cfg(feature = "services-b2")] +mod b2; +#[cfg(feature = "services-b2")] +pub use b2::B2Config; +#[cfg(feature = "services-b2")] +pub use b2::B2; diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index f926bd57063e..278a08e11687 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -163,6 +163,8 @@ impl Operator { Scheme::Azdls => Self::from_map::(map)?.finish(), #[cfg(feature = "services-azfile")] Scheme::Azfile => Self::from_map::(map)?.finish(), + #[cfg(feature = "services-b2")] + Scheme::B2 => Self::from_map::(map)?.finish(), #[cfg(feature = "services-cacache")] Scheme::Cacache => Self::from_map::(map)?.finish(), #[cfg(feature = "services-cos")] diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 
54ea475f8e39..57093f510db4 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -38,6 +38,8 @@ pub enum Scheme { Azblob, /// [Azdls][crate::services::Azdls]: Azure Data Lake Storage Gen2. Azdls, + /// [B2][crate::services::B2]: Backblaze B2 Services. + B2, /// [cacache][crate::services::Cacache]: cacache backend support. Cacache, /// [cloudflare-kv][crate::services::CloudflareKv]: Cloudflare KV services. @@ -173,6 +175,8 @@ impl Scheme { Scheme::Azdls, #[cfg(feature = "services-azfile")] Scheme::Azfile, + #[cfg(feature = "services-b2")] + Scheme::B2, #[cfg(feature = "services-cacache")] Scheme::Cacache, #[cfg(feature = "services-cos")] @@ -283,6 +287,7 @@ impl FromStr for Scheme { // OpenDAL used to call `azdls` as `azdfs`, we keep it for backward compatibility. // And abfs is widely used in hadoop ecosystem, keep it for easy to use. "azdls" | "azdfs" | "abfs" => Ok(Scheme::Azdls), + "b2" => Ok(Scheme::B2), "cacache" => Ok(Scheme::Cacache), "cloudflare_kv" => Ok(Scheme::CloudflareKv), "cos" => Ok(Scheme::Cos), @@ -338,6 +343,7 @@ impl From for &'static str { Scheme::Atomicserver => "atomicserver", Scheme::Azblob => "azblob", Scheme::Azdls => "azdls", + Scheme::B2 => "b2", Scheme::Cacache => "cacache", Scheme::CloudflareKv => "cloudflare_kv", Scheme::Cos => "cos",