diff --git a/.env.example b/.env.example index f95a10298dbd..f8fa06e8834f 100644 --- a/.env.example +++ b/.env.example @@ -185,4 +185,9 @@ OPENDAL_SEAFILE_ROOT=/path/to/dir OPENDAL_SEAFILE_ENDPOINT= OPENDAL_SEAFILE_USERNAME= OPENDAL_SEAFILE_PASSWORD= -OPENDAL_SEAFILE_REPO_NAME= \ No newline at end of file +OPENDAL_SEAFILE_REPO_NAME= +# upyun +OPENDAL_UPYUN_ROOT=/path/to/dir +OPENDAL_UPYUN_BUCKET= +OPENDAL_UPYUN_OPERATOR= +OPENDAL_UPYUN_PASSWORD= \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 864b6d3467eb..0060321d219c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4640,6 +4640,7 @@ dependencies = [ "getrandom 0.2.11", "governor", "hdrs", + "hmac", "hrana-client-proto", "http", "libtest-mimic", @@ -4675,6 +4676,7 @@ dependencies = [ "rusqlite", "serde", "serde_json", + "sha1", "sha2", "size", "sled", diff --git a/bindings/java/Cargo.toml b/bindings/java/Cargo.toml index de42caf066c0..0f1a3121eec2 100644 --- a/bindings/java/Cargo.toml +++ b/bindings/java/Cargo.toml @@ -91,6 +91,7 @@ services-all = [ "services-alluxio", "services-b2", "services-seafile", + "services-upyun", ] # Default services provided by opendal. @@ -113,6 +114,7 @@ services-webhdfs = ["opendal/services-webhdfs"] services-alluxio = ["opendal/services-alluxio"] services-azfile = ["opendal/services-azfile"] services-b2 = ["opendal/services-b2"] +services-upyun = ["opendal/services-upyun"] services-cacache = ["opendal/services-cacache"] services-dashmap = ["opendal/services-dashmap"] services-dropbox = ["opendal/services-dropbox"] diff --git a/bindings/nodejs/Cargo.toml b/bindings/nodejs/Cargo.toml index a812d69ec2f9..c0f9cd752a77 100644 --- a/bindings/nodejs/Cargo.toml +++ b/bindings/nodejs/Cargo.toml @@ -86,6 +86,7 @@ services-all = [ "services-alluxio", "services-b2", "services-seafile", + "services-upyun", ] # Default services provided by opendal. @@ -107,6 +108,7 @@ services-webhdfs = ["opendal/services-webhdfs"] # Optional services provided by opendal. services-alluxio = ["opendal/services-alluxio"] services-azfile = ["opendal/services-azfile"] +services-upyun = ["opendal/services-upyun"] services-b2 = ["opendal/services-b2"] services-cacache = ["opendal/services-cacache"] services-dashmap = ["opendal/services-dashmap"] diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index 5de87a0814c0..38fe2597c640 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -85,6 +85,7 @@ services-all = [ "services-alluxio", "services-b2", "services-seafile", + "services-upyun", ] # Default services provided by opendal. @@ -107,6 +108,7 @@ services-webhdfs = ["opendal/services-webhdfs"] services-alluxio = ["opendal/services-alluxio"] services-azfile = ["opendal/services-azfile"] services-b2 = ["opendal/services-b2"] +services-upyun = ["opendal/services-upyun"] services-cacache = ["opendal/services-cacache"] services-dashmap = ["opendal/services-dashmap"] services-dropbox = ["opendal/services-dropbox"] diff --git a/core/Cargo.toml b/core/Cargo.toml index 76084a8c34ce..3dd0438ea6dc 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -127,6 +127,7 @@ services-azdls = [ services-azfile = [] services-b2 = [] services-seafile = [] +services-upyun = ["dep:hmac", "dep:sha1"] services-cacache = ["dep:cacache"] services-cloudflare-kv = [] services-cos = [ @@ -293,6 +294,8 @@ rusqlite = { version = "0.29.0", optional = true, features = ["bundled"] } serde = { version = "1", features = ["derive"] } serde_json = "1" sha2 = { version = "0.10", optional = true } +hmac = { version = "0.12.1", optional = true } +sha1 = { version = "0.10.6", optional = true } sled = { version = "0.34.7", optional = true } suppaftp = { version = "5.2", default-features = false, features = [ "async-secure", @@ -326,4 +329,4 @@ tracing-subscriber = { version = "0.3", features = [ "env-filter", "tracing-log", ] } -wiremock = "0.5" +wiremock = "0.5" \ No newline at end of file diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index 086db17cd30e..a25f8d7bf3dc 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -312,3 +312,10 @@ mod seafile; pub use seafile::Seafile; #[cfg(feature = "services-seafile")] pub use seafile::SeafileConfig; + +#[cfg(feature = "services-upyun")] +mod upyun; +#[cfg(feature = "services-upyun")] +pub use upyun::Upyun; +#[cfg(feature = "services-upyun")] +pub use upyun::UpyunConfig; diff --git a/core/src/services/upyun/backend.rs b/core/src/services/upyun/backend.rs new file mode 100644 index 000000000000..1a69119d4153 --- /dev/null +++ b/core/src/services/upyun/backend.rs @@ -0,0 +1,378 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use http::StatusCode; +use log::debug; +use serde::Deserialize; +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; + +use super::core::parse_info; +use super::core::UpyunCore; +use super::error::parse_error; +use super::lister::UpyunLister; +use super::writer::UpyunWriter; +use super::writer::UpyunWriters; +use crate::raw::*; +use crate::services::upyun::core::UpyunSigner; +use crate::*; + +/// Config for backblaze upyun services support. +#[derive(Default, Deserialize)] +#[serde(default)] +#[non_exhaustive] +pub struct UpyunConfig { + /// root of this backend. + /// + /// All operations will happen under this root. + pub root: Option, + /// bucket address of this backend. + pub bucket: String, + /// username of this backend. + pub operator: Option, + /// password of this backend. + pub password: Option, +} + +impl Debug for UpyunConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("Config"); + + ds.field("root", &self.root); + ds.field("bucket", &self.bucket); + ds.field("operator", &self.operator); + + ds.finish() + } +} + +/// [upyun](https://www.upyun.com/products/file-storage) services support. +#[doc = include_str!("docs.md")] +#[derive(Default)] +pub struct UpyunBuilder { + config: UpyunConfig, + + http_client: Option, +} + +impl Debug for UpyunBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut d = f.debug_struct("UpyunBuilder"); + + d.field("config", &self.config); + d.finish_non_exhaustive() + } +} + +impl UpyunBuilder { + /// Set root of this backend. + /// + /// All operations will happen under this root. + pub fn root(&mut self, root: &str) -> &mut Self { + self.config.root = if root.is_empty() { + None + } else { + Some(root.to_string()) + }; + + self + } + + /// bucket of this backend. + /// + /// It is required. e.g. `test` + pub fn bucket(&mut self, bucket: &str) -> &mut Self { + self.config.bucket = bucket.to_string(); + + self + } + + /// operator of this backend. + /// + /// It is required. e.g. `test` + pub fn operator(&mut self, operator: &str) -> &mut Self { + self.config.operator = if operator.is_empty() { + None + } else { + Some(operator.to_string()) + }; + + self + } + + /// password of this backend. + /// + /// It is required. e.g. `asecret` + pub fn password(&mut self, password: &str) -> &mut Self { + self.config.password = if password.is_empty() { + None + } else { + Some(password.to_string()) + }; + + self + } + + /// Specify the http client that used by this service. + /// + /// # Notes + /// + /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed + /// during minor updates. + pub fn http_client(&mut self, client: HttpClient) -> &mut Self { + self.http_client = Some(client); + self + } +} + +impl Builder for UpyunBuilder { + const SCHEME: Scheme = Scheme::Upyun; + type Accessor = UpyunBackend; + + /// Converts a HashMap into an UpyunBuilder instance. + /// + /// # Arguments + /// + /// * `map` - A HashMap containing the configuration values. + /// + /// # Returns + /// + /// Returns an instance of UpyunBuilder. + fn from_map(map: HashMap) -> Self { + // Deserialize the configuration from the HashMap. + let config = UpyunConfig::deserialize(ConfigDeserializer::new(map)) + .expect("config deserialize must succeed"); + + // Create an UpyunBuilder instance with the deserialized config. + UpyunBuilder { + config, + http_client: None, + } + } + + /// Builds the backend and returns the result of UpyunBackend. + fn build(&mut self) -> Result { + debug!("backend build started: {:?}", &self); + + let root = normalize_root(&self.config.root.clone().unwrap_or_default()); + debug!("backend use root {}", &root); + + // Handle bucket. + if self.config.bucket.is_empty() { + return Err(Error::new(ErrorKind::ConfigInvalid, "bucket is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::Upyun)); + } + + debug!("backend use bucket {}", &self.config.bucket); + + let operator = match &self.config.operator { + Some(operator) => Ok(operator.clone()), + None => Err(Error::new(ErrorKind::ConfigInvalid, "operator is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::Upyun)), + }?; + + let password = match &self.config.password { + Some(password) => Ok(password.clone()), + None => Err(Error::new(ErrorKind::ConfigInvalid, "password is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::Upyun)), + }?; + + let client = if let Some(client) = self.http_client.take() { + client + } else { + HttpClient::new().map_err(|err| { + err.with_operation("Builder::build") + .with_context("service", Scheme::Upyun) + })? + }; + + let signer = UpyunSigner { + operator: operator.clone(), + password: password.clone(), + }; + + Ok(UpyunBackend { + core: Arc::new(UpyunCore { + root, + operator, + password, + bucket: self.config.bucket.clone(), + signer, + client, + }), + }) + } +} + +/// Backend for upyun services. +#[derive(Debug, Clone)] +pub struct UpyunBackend { + core: Arc, +} + +#[async_trait] +impl Accessor for UpyunBackend { + type Reader = IncomingAsyncBody; + + type BlockingReader = (); + + type Writer = UpyunWriters; + + type BlockingWriter = (); + + type Lister = oio::PageLister; + + type BlockingLister = (); + + fn info(&self) -> AccessorInfo { + let mut am = AccessorInfo::default(); + am.set_scheme(Scheme::Upyun) + .set_root(&self.core.root) + .set_native_capability(Capability { + stat: true, + + create_dir: true, + + read: true, + read_can_next: true, + + write: true, + write_can_empty: true, + write_can_multi: true, + write_with_cache_control: true, + write_with_content_type: true, + + // https://help.upyun.com/knowledge-base/rest_api/#e5b9b6e8a18ce5bc8fe696ade782b9e7bbade4bca0 + write_multi_min_size: Some(1024 * 1024), + write_multi_max_size: Some(50 * 1024 * 1024), + + delete: true, + rename: true, + copy: true, + + list: true, + list_with_limit: true, + list_with_recursive: true, + + ..Default::default() + }); + + am + } + + async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result { + let resp = self.core.create_dir(path).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => Ok(RpCreateDir::default()), + _ => Err(parse_error(resp).await?), + } + } + + async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result { + let resp = self.core.move_object(from, to).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + resp.into_body().consume().await?; + + Ok(RpRename::default()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { + let resp = self.core.copy(from, to).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + resp.into_body().consume().await?; + + Ok(RpCopy::default()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> { + let resp = self.core.download_file(path).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let size = parse_content_length(resp.headers())?; + let range = parse_content_range(resp.headers())?; + Ok(( + RpRead::new().with_size(size).with_range(range), + resp.into_body(), + )) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn stat(&self, path: &str, _args: OpStat) -> Result { + let resp = self.core.info(path).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => parse_info(resp.headers()).map(RpStat::new), + _ => Err(parse_error(resp).await?), + } + } + + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let writer = UpyunWriter::new(self.core.clone(), args, path.to_string()); + + let w = oio::MultipartUploadWriter::new(writer); + + Ok((RpWrite::default(), w)) + } + + async fn delete(&self, path: &str, _: OpDelete) -> Result { + let resp = self.core.delete(path).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => Ok(RpDelete::default()), + // Allow 404 when deleting a non-existing object + StatusCode::NOT_FOUND => Ok(RpDelete::default()), + _ => Err(parse_error(resp).await?), + } + } + + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { + let l = UpyunLister::new(self.core.clone(), path, args.limit()); + Ok((RpList::default(), oio::PageLister::new(l))) + } +} diff --git a/core/src/services/upyun/core.rs b/core/src/services/upyun/core.rs new file mode 100644 index 000000000000..45d890a2c2f9 --- /dev/null +++ b/core/src/services/upyun/core.rs @@ -0,0 +1,586 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::{Debug, Formatter}; + +use base64::Engine; +use hmac::{Hmac, Mac}; +use http::{header, HeaderMap, Request, Response}; +use md5::Digest; +use serde::Deserialize; +use sha1::Sha1; + +use crate::raw::*; +use crate::*; + +use self::constants::*; + +mod constants { + pub const X_UPYUN_FILE_TYPE: &str = "x-upyun-file-type"; + pub const X_UPYUN_FILE_SIZE: &str = "x-upyun-file-size"; + pub const X_UPYUN_CACHE_CONTROL: &str = "x-upyun-meta-cache-control"; + pub const X_UPYUN_CONTENT_DISPOSITION: &str = "x-upyun-meta-content-disposition"; + pub const X_UPYUN_MULTI_STAGE: &str = "X-Upyun-Multi-Stage"; + pub const X_UPYUN_MULTI_TYPE: &str = "X-Upyun-Multi-Type"; + pub const X_UPYUN_MULTI_DISORDER: &str = "X-Upyun-Multi-Disorder"; + pub const X_UPYUN_MULTI_UUID: &str = "X-Upyun-Multi-Uuid"; + pub const X_UPYUN_PART_ID: &str = "X-Upyun-Part-Id"; + pub const X_UPYUN_FOLDER: &str = "x-upyun-folder"; + pub const X_UPYUN_MOVE_SOURCE: &str = "X-Upyun-Move-Source"; + pub const X_UPYUN_COPY_SOURCE: &str = "X-Upyun-Copy-Source"; + pub const X_UPYUN_METADATA_DIRECTIVE: &str = "X-Upyun-Metadata-Directive"; + pub const X_UPYUN_LIST_ITER: &str = "x-list-iter"; + pub const X_UPYUN_LIST_LIMIT: &str = "X-List-Limit"; + pub const X_UPYUN_LIST_MAX_LIMIT: usize = 4096; + pub const X_UPYUN_LIST_DEFAULT_LIMIT: usize = 256; +} + +#[derive(Clone)] +pub struct UpyunCore { + /// The root of this core. + pub root: String, + /// The endpoint of this backend. + pub operator: String, + /// The password id of this backend. + pub password: String, + /// The bucket of this backend. + pub bucket: String, + + /// signer of this backend. + pub signer: UpyunSigner, + + pub client: HttpClient, +} + +impl Debug for UpyunCore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Backend") + .field("root", &self.root) + .field("bucket", &self.bucket) + .field("operator", &self.operator) + .finish_non_exhaustive() + } +} + +impl UpyunCore { + #[inline] + pub async fn send(&self, req: Request) -> Result> { + self.client.send(req).await + } + + pub async fn sign(&self, req: &mut Request) -> Result<()> { + // get rfc1123 date + let date = chrono::Utc::now() + .format("%a, %d %b %Y %H:%M:%S GMT") + .to_string(); + let authorization = + self.signer + .authorization(&date, req.method().as_str(), req.uri().path()); + + req.headers_mut() + .insert("Authorization", authorization.parse().unwrap()); + req.headers_mut().insert("Date", date.parse().unwrap()); + + Ok(()) + } +} + +impl UpyunCore { + pub async fn download_file(&self, path: &str) -> Result> { + let path = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&path) + ); + + let req = Request::get(url); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn info(&self, path: &str) -> Result> { + let path = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&path) + ); + + let req = Request::head(url); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn upload( + &self, + path: &str, + size: Option, + args: &OpWrite, + body: AsyncBody, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&p) + ); + + let mut req = Request::put(&url); + + if let Some(size) = size { + req = req.header(header::CONTENT_LENGTH, size.to_string()) + } + + if let Some(mime) = args.content_type() { + req = req.header(header::CONTENT_TYPE, mime) + } + + if let Some(pos) = args.content_disposition() { + req = req.header(X_UPYUN_CONTENT_DISPOSITION, pos) + } + + if let Some(cache_control) = args.cache_control() { + req = req.header(X_UPYUN_CACHE_CONTROL, cache_control) + } + + // Set body + let mut req = req.body(body).map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + Ok(req) + } + + pub async fn delete(&self, path: &str) -> Result> { + let path = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&path) + ); + + let req = Request::delete(url); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn copy(&self, from: &str, to: &str) -> Result> { + let from = format!("/{}/{}", self.bucket, build_abs_path(&self.root, from)); + let to = build_abs_path(&self.root, to); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&to) + ); + + let mut req = Request::put(url); + + req = req.header(header::CONTENT_LENGTH, "0"); + + req = req.header(X_UPYUN_COPY_SOURCE, from); + + req = req.header(X_UPYUN_METADATA_DIRECTIVE, "copy"); + + // Set body + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn move_object(&self, from: &str, to: &str) -> Result> { + let from = format!("/{}/{}", self.bucket, build_abs_path(&self.root, from)); + let to = build_abs_path(&self.root, to); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&to) + ); + + let mut req = Request::put(url); + + req = req.header(header::CONTENT_LENGTH, "0"); + + req = req.header(X_UPYUN_MOVE_SOURCE, from); + + req = req.header(X_UPYUN_METADATA_DIRECTIVE, "copy"); + + // Set body + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn create_dir(&self, path: &str) -> Result> { + let path = build_abs_path(&self.root, path); + let path = path[..path.len() - 1].to_string(); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&path) + ); + + let mut req = Request::post(url); + + req = req.header("folder", "true"); + + req = req.header(X_UPYUN_FOLDER, "true"); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn initiate_multipart_upload( + &self, + path: &str, + args: &OpWrite, + ) -> Result> { + let path = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&path) + ); + + let mut req = Request::put(url); + + req = req.header(X_UPYUN_MULTI_STAGE, "initiate"); + + req = req.header(X_UPYUN_MULTI_DISORDER, "true"); + + if let Some(content_type) = args.content_type() { + req = req.header(X_UPYUN_MULTI_TYPE, content_type); + } + + if let Some(content_disposition) = args.content_disposition() { + req = req.header(X_UPYUN_CONTENT_DISPOSITION, content_disposition) + } + + if let Some(cache_control) = args.cache_control() { + req = req.header(X_UPYUN_CACHE_CONTROL, cache_control) + } + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn upload_part( + &self, + path: &str, + upload_id: &str, + part_number: usize, + size: u64, + body: AsyncBody, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&p), + ); + + let mut req = Request::put(&url); + + req = req.header(header::CONTENT_LENGTH, size); + + req = req.header(X_UPYUN_MULTI_STAGE, "upload"); + + req = req.header(X_UPYUN_MULTI_UUID, upload_id); + + req = req.header(X_UPYUN_PART_ID, part_number); + + // Set body + let mut req = req.body(body).map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + Ok(req) + } + + pub async fn complete_multipart_upload( + &self, + path: &str, + upload_id: &str, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&p), + ); + + let mut req = Request::put(url); + + req = req.header(X_UPYUN_MULTI_STAGE, "complete"); + + req = req.header(X_UPYUN_MULTI_UUID, upload_id); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn list_objects( + &self, + path: &str, + iter: &str, + limit: Option, + ) -> Result> { + let path = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&path), + ); + + let mut req = Request::get(url.clone()); + + req = req.header(header::ACCEPT, "application/json"); + + if !iter.is_empty() { + req = req.header(X_UPYUN_LIST_ITER, iter); + } + + if let Some(mut limit) = limit { + if limit > X_UPYUN_LIST_MAX_LIMIT { + limit = X_UPYUN_LIST_DEFAULT_LIMIT; + } + req = req.header(X_UPYUN_LIST_LIMIT, limit); + } + + // Set body + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } +} + +#[derive(Clone, Default)] +pub struct UpyunSigner { + pub operator: String, + pub password: String, +} + +type HmacSha1 = Hmac; + +impl UpyunSigner { + pub fn authorization(&self, date: &str, method: &str, uri: &str) -> String { + let sign = vec![method, uri, date]; + + let sign = sign + .into_iter() + .filter(|s| !s.is_empty()) + .collect::>() + .join("&"); + + let mut mac = HmacSha1::new_from_slice(format_md5(self.password.as_bytes()).as_bytes()) + .expect("HMAC can take key of any size"); + mac.update(sign.as_bytes()); + let sign_str = mac.finalize().into_bytes(); + + let sign = base64::engine::general_purpose::STANDARD.encode(sign_str.as_slice()); + format!("UPYUN {}:{}", self.operator, sign) + } +} + +pub(super) fn parse_initiate_part(headers: &HeaderMap) -> Result<&str> { + match headers.get(X_UPYUN_MULTI_UUID) { + None => Err(Error::new(ErrorKind::Unexpected, "missing uuid")), + Some(v) => Ok(v.to_str().map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "header value has to be valid utf-8 string", + ) + .with_operation("parse_initiate_part") + .set_source(e) + })?), + } +} + +pub(super) fn parse_info(headers: &HeaderMap) -> Result { + let mode = if parse_file_type(headers)? == "file" { + EntryMode::FILE + } else { + EntryMode::DIR + }; + + let mut m = Metadata::new(mode); + + if let Some(v) = parse_file_size(headers)? { + m.set_content_length(v); + } + + if let Some(v) = parse_content_type(headers)? { + m.set_content_type(v); + } + + if let Some(v) = parse_content_md5(headers)? { + m.set_content_md5(v); + } + + if let Some(v) = parse_cache_control(headers)? { + m.set_cache_control(v); + } + + if let Some(v) = parse_content_disposition(headers)? { + m.set_content_disposition(v); + } + + Ok(m) +} + +fn parse_file_type(headers: &HeaderMap) -> Result<&str> { + match headers.get(X_UPYUN_FILE_TYPE) { + None => Err(Error::new(ErrorKind::Unexpected, "missing file type")), + Some(v) => Ok(v.to_str().map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "header value has to be valid utf-8 string", + ) + .with_operation("parse_file_type") + .set_source(e) + })?), + } +} + +fn parse_file_size(headers: &HeaderMap) -> Result> { + match headers.get(X_UPYUN_FILE_SIZE) { + None => Ok(None), + Some(v) => Ok(Some( + v.to_str() + .map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "header value is not valid utf-8 string", + ) + .with_operation("http_util::parse_content_length") + .set_source(e) + })? + .parse::() + .map_err(|e| { + Error::new(ErrorKind::Unexpected, "header value is not valid integer") + .with_operation("http_util::parse_content_length") + .set_source(e) + })?, + )), + } +} + +fn parse_cache_control(headers: &HeaderMap) -> Result> { + match headers.get(X_UPYUN_CACHE_CONTROL) { + None => Ok(None), + Some(v) => Ok(Some(v.to_str().map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "header value has to be valid utf-8 string", + ) + .with_operation("parse_cache_control") + .set_source(e) + })?)), + } +} + +fn parse_content_disposition(headers: &HeaderMap) -> Result> { + match headers.get(X_UPYUN_CONTENT_DISPOSITION) { + None => Ok(None), + Some(v) => Ok(Some(v.to_str().map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "header value has to be valid utf-8 string", + ) + .with_operation("parse_content_disposition") + .set_source(e) + })?)), + } +} + +pub fn format_md5(bs: &[u8]) -> String { + let mut hasher = md5::Md5::new(); + hasher.update(bs); + + format!("{:x}", hasher.finalize()) +} + +#[derive(Debug, Deserialize)] +pub(super) struct File { + #[serde(rename = "type")] + pub type_field: String, + pub name: String, + pub length: u64, + pub last_modified: i64, +} + +#[derive(Debug, Deserialize)] +pub(super) struct ListObjectsResponse { + pub iter: String, + pub files: Vec, +} diff --git a/core/src/services/upyun/docs.md b/core/src/services/upyun/docs.md new file mode 100644 index 000000000000..72dd8478ea98 --- /dev/null +++ b/core/src/services/upyun/docs.md @@ -0,0 +1,53 @@ +## Capabilities + +This service can be used to: + +- [x] stat +- [x] read +- [x] write +- [x] create_dir +- [x] delete +- [x] copy +- [x] rename +- [x] list +- [x] scan +- [ ] presign +- [ ] blocking + +## Configuration + +- `root`: Set the work directory for backend +- `bucket`: Upyun bucket name +- `operator` Upyun operator +- `password` Upyun password + +You can refer to [`UpyunBuilder`]'s docs for more information + +## Example + +### Via Builder + +```rust +use anyhow::Result; +use opendal::services::Upyun; +use opendal::Operator; + +#[tokio::main] +async fn main() -> Result<()> { + // create backend builder + let mut builder = Upyun::default(); + + // set the storage bucket for OpenDAL + builder.root("/"); + // set the bucket for OpenDAL + builder.bucket("test"); + // set the operator for OpenDAL + builder.operator("xxxxxxxxxx"); + // set the password name for OpenDAL + builder.password("opendal"); + + let op: Operator = Operator::new(builder)?.finish(); + + Ok(()) +} +``` diff --git a/core/src/services/upyun/error.rs b/core/src/services/upyun/error.rs new file mode 100644 index 000000000000..51f4934b11b2 --- /dev/null +++ b/core/src/services/upyun/error.rs @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use bytes::Buf; +use http::Response; +use quick_xml::de; +use serde::Deserialize; + +use crate::raw::*; +use crate::Error; +use crate::ErrorKind; +use crate::Result; + +/// UpyunError is the error returned by upyun service. +#[derive(Default, Debug, Deserialize)] +#[serde(default, rename_all = "PascalCase")] +struct UpyunError { + code: i64, + msg: String, + id: String, +} + +/// Parse error response into Error. +pub async fn parse_error(resp: Response) -> Result { + let (parts, body) = resp.into_parts(); + let bs = body.bytes().await?; + + let (kind, retryable) = match parts.status.as_u16() { + 403 => (ErrorKind::PermissionDenied, false), + 404 => (ErrorKind::NotFound, false), + 304 | 412 => (ErrorKind::ConditionNotMatch, false), + // Service like Upyun could return 499 error with a message like: + // Client Disconnect, we should retry it. + 499 => (ErrorKind::Unexpected, true), + 500 | 502 | 503 | 504 => (ErrorKind::Unexpected, true), + _ => (ErrorKind::Unexpected, false), + }; + + let (message, _upyun_err) = de::from_reader::<_, UpyunError>(bs.clone().reader()) + .map(|upyun_err| (format!("{upyun_err:?}"), Some(upyun_err))) + .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None)); + + let mut err = Error::new(kind, &message); + + err = with_error_response_context(err, parts); + + if retryable { + err = err.set_temporary(); + } + + Ok(err) +} + +#[cfg(test)] +mod test { + use futures::stream; + use http::StatusCode; + + use super::*; + + #[tokio::test] + async fn test_parse_error() { + let err_res = vec![ + ( + r#"{"code": 40100016, "msg": "invalid date value in header", "id": "f5b30c720ddcecc70abd2f5c1c64bde8"}"#, + ErrorKind::Unexpected, + StatusCode::UNAUTHORIZED, + ), + ( + r#"{"code": 40300010, "msg": "file type error", "id": "f5b30c720ddcecc70abd2f5c1c64bde7"}"#, + ErrorKind::PermissionDenied, + StatusCode::FORBIDDEN, + ), + ]; + + for res in err_res { + let bs = bytes::Bytes::from(res.0); + let body = IncomingAsyncBody::new( + Box::new(oio::into_stream(stream::iter(vec![Ok(bs.clone())]))), + None, + ); + let resp = Response::builder().status(res.2).body(body).unwrap(); + + let err = parse_error(resp).await; + + assert!(err.is_ok()); + assert_eq!(err.unwrap().kind(), res.1); + } + } +} diff --git a/core/src/services/upyun/lister.rs b/core/src/services/upyun/lister.rs new file mode 100644 index 000000000000..e08933bbd3a7 --- /dev/null +++ b/core/src/services/upyun/lister.rs @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; + +use super::core::{ListObjectsResponse, UpyunCore}; +use super::error::parse_error; +use crate::raw::oio::Entry; +use crate::raw::*; +use crate::EntryMode; +use crate::Metadata; +use crate::Result; + +pub struct UpyunLister { + core: Arc, + + path: String, + limit: Option, +} + +impl UpyunLister { + pub(super) fn new(core: Arc, path: &str, limit: Option) -> Self { + UpyunLister { + core, + path: path.to_string(), + limit, + } + } +} + +#[async_trait] +impl oio::PageList for UpyunLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { + let resp = self + .core + .list_objects(&self.path, &ctx.token, self.limit) + .await?; + + if resp.status() == http::StatusCode::NOT_FOUND { + ctx.done = true; + return Ok(()); + } + + match resp.status() { + http::StatusCode::OK => {} + http::StatusCode::NOT_FOUND => { + ctx.done = true; + return Ok(()); + } + _ => { + return Err(parse_error(resp).await?); + } + } + + let bs = resp.into_body().bytes().await?; + + let response = serde_json::from_slice::(&bs) + .map_err(new_json_deserialize_error)?; + + // ref https://help.upyun.com/knowledge-base/rest_api/#e88eb7e58f96e79baee5bd95e69687e4bbb6e58897e8a1a8 + // when iter is "g2gCZAAEbmV4dGQAA2VvZg", it means the list is done. + ctx.done = response.iter == "g2gCZAAEbmV4dGQAA2VvZg"; + + ctx.token = response.iter; + + for file in response.files { + let path = build_abs_path(&format!("/{}", &self.path), &file.name); + + let entry = if file.type_field == "folder" { + let path = format!("{}/", path); + Entry::new(&path, Metadata::new(EntryMode::DIR)) + } else { + let m = Metadata::new(EntryMode::FILE) + .with_content_length(file.length) + .with_content_type(file.type_field) + .with_last_modified(parse_datetime_from_from_timestamp(file.last_modified)?); + Entry::new(&path, m) + }; + + ctx.entries.push_back(entry); + } + + Ok(()) + } +} diff --git a/core/src/services/upyun/mod.rs b/core/src/services/upyun/mod.rs new file mode 100644 index 000000000000..039f2aa22944 --- /dev/null +++ b/core/src/services/upyun/mod.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod backend; +pub use backend::UpyunBuilder as Upyun; +pub use backend::UpyunConfig; + +mod core; +mod error; +mod lister; +mod writer; diff --git a/core/src/services/upyun/writer.rs b/core/src/services/upyun/writer.rs new file mode 100644 index 000000000000..c0b42e44f978 --- /dev/null +++ b/core/src/services/upyun/writer.rs @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use http::StatusCode; + +use crate::raw::*; +use crate::*; + +use super::core::{parse_initiate_part, UpyunCore}; +use super::error::parse_error; + +pub type UpyunWriters = oio::MultipartUploadWriter; + +pub struct UpyunWriter { + core: Arc, + op: OpWrite, + path: String, +} + +impl UpyunWriter { + pub fn new(core: Arc, op: OpWrite, path: String) -> Self { + UpyunWriter { core, op, path } + } +} + +#[async_trait] +impl oio::MultipartUploadWrite for UpyunWriter { + async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { + let req = self + .core + .upload(&self.path, Some(size), &self.op, body) + .await?; + + let resp = self.core.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn initiate_part(&self) -> Result { + let resp = self + .core + .initiate_multipart_upload(&self.path, &self.op) + .await?; + + let status = resp.status(); + + match status { + StatusCode::NO_CONTENT => { + let id = parse_initiate_part(resp.headers())?; + + Ok(id.to_string()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn write_part( + &self, + upload_id: &str, + part_number: usize, + size: u64, + body: AsyncBody, + ) -> Result { + let req = self + .core + .upload_part(&self.path, upload_id, part_number, size, body) + .await?; + + let resp = self.core.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::NO_CONTENT => { + let etag = parse_etag(resp.headers())? + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "ETag not present in returning response", + ) + })? + .to_string(); + + resp.into_body().consume().await?; + + Ok(oio::MultipartUploadPart { part_number, etag }) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn complete_part( + &self, + upload_id: &str, + _parts: &[oio::MultipartUploadPart], + ) -> Result<()> { + let resp = self + .core + .complete_multipart_upload(&self.path, upload_id) + .await?; + + let status = resp.status(); + + match status { + StatusCode::NO_CONTENT => { + resp.into_body().consume().await?; + + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn abort_part(&self, _upload_id: &str) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Upyun does not support abort multipart upload", + )) + } +} diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index c5f1f27b2c19..2e4df296876b 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -157,6 +157,8 @@ impl Operator { Scheme::Atomicserver => Self::from_map::(map)?.finish(), #[cfg(feature = "services-alluxio")] Scheme::Alluxio => Self::from_map::(map)?.finish(), + #[cfg(feature = "services-upyun")] + Scheme::Upyun => Self::from_map::(map)?.finish(), #[cfg(feature = "services-azblob")] Scheme::Azblob => Self::from_map::(map)?.finish(), #[cfg(feature = "services-azdls")] diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 295de6a96300..6c5151321fe9 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -42,6 +42,8 @@ pub enum Scheme { B2, /// [Seafile][crate::services::Seafile]: Seafile Services. Seafile, + /// [Upyun][crate::services::Upyun]: Upyun Services. + Upyun, /// [cacache][crate::services::Cacache]: cacache backend support. Cacache, /// [cloudflare-kv][crate::services::CloudflareKv]: Cloudflare KV services. @@ -243,6 +245,8 @@ impl Scheme { Scheme::S3, #[cfg(feature = "services-seafile")] Scheme::Seafile, + #[cfg(feature = "services-upyun")] + Scheme::Upyun, #[cfg(feature = "services-sftp")] Scheme::Sftp, #[cfg(feature = "services-sled")] @@ -331,6 +335,7 @@ impl FromStr for Scheme { "rocksdb" => Ok(Scheme::Rocksdb), "s3" => Ok(Scheme::S3), "seafile" => Ok(Scheme::Seafile), + "upyun" => Ok(Scheme::Upyun), "sftp" => Ok(Scheme::Sftp), "sled" => Ok(Scheme::Sled), "supabase" => Ok(Scheme::Supabase), @@ -402,6 +407,7 @@ impl From for &'static str { Scheme::Sqlite => "sqlite", Scheme::Mongodb => "mongodb", Scheme::Alluxio => "alluxio", + Scheme::Upyun => "upyun", Scheme::Custom(v) => v, } }