diff --git a/.github/workflows/qcos.yml b/.github/workflows/qcos.yml index db7ecf0..be443a0 100644 --- a/.github/workflows/qcos.yml +++ b/.github/workflows/qcos.yml @@ -16,8 +16,8 @@ jobs: steps: - uses: actions/checkout@v3 - name: Lint - run: cargo clippy + run: cargo clippy --features progress-bar - name: Build - run: cargo build --verbose + run: cargo build --verbose --features progress-bar - name: Run tests - run: cargo test --verbose -- --nocapture + run: cargo test --features progress-bar --verbose -- --nocapture diff --git a/Cargo.toml b/Cargo.toml index 480eb9b..6fbe669 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "qcos" -version = "0.1.7" +version = "0.1.8" edition = "2021" authors = ["bujnlc8 <75124771@qq.com>"] description = "provide basic interface encapsulation of Tencent Cloud Object Storage (cos)" @@ -21,6 +21,12 @@ bytes ="1.7.1" quick-xml = {version = "0.36.1", features = ["serialize"]} async-trait = "0.1.81" tokio = { version = "1.39.2", features = ["full"]} +indicatif = { version = "0.17.8", optional = true } +futures-util = { version = "0.3.30", optional = true } +tokio-util = {version = "0.7.11", optional = true} [dev-dependencies] tokio = { version = "1.39.2", features = ["full"]} + +[features] +progress-bar = ["indicatif", "futures-util", "tokio-util"] diff --git a/README.md b/README.md index 9c4f3cd..0926996 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,15 @@ **异步版本** `async`/`await` -本包提供腾讯云对象存储(cos) 基本的操作,包括`bucket`创建及删除,对象的上传(支持分块传输,设置分块大小及上传线程数量)、下载、删除等。 +本包提供腾讯云对象存储(cos) 基本的操作,包括`bucket`创建及删除,对象的上传、下载、删除等。 + +上传文件支持以下特点: + +- 支持文件直传,推荐 1GB 以下的文件 + +- 支持分块传输,设置分块大小和最大上传线程数量 + +- 支持显示上传进度条(需开启`progress-bar` feature),上传方法名称加了`_progress_bar`后缀与不显示进度条的方法区分 # How to use @@ -28,6 +36,21 @@ async fn main() { } else { println!("{}", res.error_message); } + // 分块上传,带进度条 + #[cfg(feature = "progress-bar")] + let res = client + .clone() + .put_big_object_progress_bar( + "Cargo.toml", + "Cargo.toml", + Some(mime::TEXT_PLAIN_UTF_8), + Some(qcos::objects::StorageClassEnum::ARCHIVE), + None, + Some(1024 * 1024), + None, + None, + ) + .await; } ``` @@ -42,5 +65,12 @@ insert into your project's cargo.toml block next line ``` [dependencies] -qcos = "0.1.7" +qcos = "0.1.8" +``` + +如果需要开启显示进度条的方法: + +``` +[dependencies] +qcos = {version = "0.1.8", features=["progress-bar"]} ``` diff --git a/examples/objects.rs b/examples/objects.rs index beb35fc..cec56f9 100644 --- a/examples/objects.rs +++ b/examples/objects.rs @@ -43,7 +43,7 @@ async fn main() { } else { println!("{}", res.error_message); } - // 分块传输 + // 分块上传 let res = client .clone() .put_big_object( @@ -61,6 +61,27 @@ async fn main() { } else { println!("{}", res.error_message); } + // 分块上传,带进度条 + #[cfg(feature = "progress-bar")] + let res = client + .clone() + .put_big_object_progress_bar( + "Cargo.toml", + "Cargo.toml", + Some(mime::TEXT_PLAIN_UTF_8), + Some(qcos::objects::StorageClassEnum::ARCHIVE), + None, + Some(1024 * 1024), + None, + None, + ) + .await; + #[cfg(feature = "progress-bar")] + if res.error_no == ErrNo::SUCCESS { + println!("SUCCESS"); + } else { + println!("{}", res.error_message); + } // 直接上传二进制流 let res = client .put_object_binary( diff --git a/src/client.rs b/src/client.rs index 85b80de..5e3a830 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,6 +2,7 @@ use crate::acl::AclHeader; use crate::request::Response; use crate::signer::Signer; + /// 接口请求Client /// # Examples /// ``` diff --git a/src/objects.rs b/src/objects.rs index 3429c82..e8c2847 100644 --- a/src/objects.rs +++ b/src/objects.rs @@ -6,13 +6,20 @@ use crate::client; pub use crate::request::{ CompleteMultipartUpload, ErrNo, InitiateMultipartUploadResult, Part, Request, Response, }; +#[cfg(feature = "progress-bar")] +use futures_util::TryStreamExt; +#[cfg(feature = "progress-bar")] +pub use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; pub use mime; pub use quick_xml::de::from_str; pub use quick_xml::se::to_string; pub use reqwest::Body; use std::io::Cursor; use std::{collections::HashMap, path::PathBuf}; +use tokio::io::{AsyncReadExt, BufReader}; use tokio::{fs, io::copy}; +#[cfg(feature = "progress-bar")] +use tokio_util::io::ReaderStream; // 最小上传分片大小 1MB const PART_MIN_SIZE: u64 = 1024 * 1024; @@ -59,6 +66,31 @@ pub trait Objects { acl_header: Option, ) -> Response; + /// 上传本地小文件,带进度条 + #[cfg(feature = "progress-bar")] + async fn put_object_progress_bar( + &self, + file_path: &str, + key: &str, + content_type: Option, + acl_header: Option, + progress_style: Option, + ) -> Response; + + /// 上传本地大文件,带进度条 + #[cfg(feature = "progress-bar")] + async fn put_big_object_progress_bar( + self, + file_path: &str, + key: &str, + content_type: Option, + storage_class: Option, + acl_header: Option, + part_size: Option, + max_threads: Option, + progress_style: Option, + ) -> Response; + /// 上传本地大文件 async fn put_big_object( self, @@ -71,6 +103,20 @@ pub trait Objects { max_threads: Option, ) -> Response; + /// 上传二进制流,带进度条 + #[cfg(feature = "progress-bar")] + async fn put_object_binary_progress_bar< + T: Into + Send + Sync + tokio::io::AsyncRead + 'static, + >( + &self, + file: T, + key: &str, + file_size: u64, + content_type: Option, + acl_header: Option, + progress_style: Option, + ) -> Response; + /// 上传二进制流 async fn put_object_binary + Send>( &self, @@ -98,8 +144,9 @@ pub trait Objects { acl_header: Option, ) -> Response; - /// 分块上传 - async fn put_object_part( + /// 分块上传,带进度条 + #[cfg(feature = "progress-bar")] + async fn put_object_part_progress_bar( self, key: &str, upload_id: &str, @@ -107,13 +154,27 @@ pub trait Objects { body: Vec, content_type: Option, acl_header: Option, + pb: ProgressBar, + ) -> Response; + + /// 分块上传文件 + /// [官网文档](https://cloud.tencent.com/document/product/436/7750) + async fn put_object_part + Send>( + self, + key: &str, + upload_id: &str, + part_number: u64, + body: T, + file_size: u64, + content_type: Option, + acl_header: Option, ) -> Response; /// 完成分块上传 async fn put_object_complete_part( &self, key: &str, - etag_map: &HashMap, + etag_map: HashMap, upload_id: &str, ) -> Response; @@ -158,9 +219,111 @@ impl Objects for client::Client { ) } }; - // 设置为分块上传或者大于5G会启动分块上传 + self.put_object_binary(file, key, content_type, acl_header) + .await + } + + /// 上传本地小文件,带进度条 + /// 见[官网文档](https://cloud.tencent.com/document/product/436/7749) + /// # Examples + /// ``` + /// use qcos::client::Client; + /// use qcos::objects::Objects; + /// use mime; + /// use qcos::acl::{AclHeader, ObjectAcl}; + /// async { + /// let mut acl_header = AclHeader::new(); + /// acl_header.insert_object_x_cos_acl(ObjectAcl::AuthenticatedRead); + /// let client = Client::new("foo", "bar", "qcloudtest-1256650966", "ap-guangzhou"); + /// let res = client.put_object_progress_bar("Cargo.toml", "Cargo.toml", Some(mime::TEXT_PLAIN_UTF_8), Some(acl_header), None).await; + /// assert!(res.error_message.contains("403")); + /// }; + /// ``` + #[cfg(feature = "progress-bar")] + async fn put_object_progress_bar( + &self, + file_path: &str, + key: &str, + content_type: Option, + acl_header: Option, + progress_style: Option, + ) -> Response { + let file = match tokio::fs::File::open(file_path).await { + Ok(file) => file, + Err(e) => { + return Response::new( + ErrNo::IO, + format!("打开文件失败: {}, {}", file_path, e), + Default::default(), + ) + } + }; + let file_size = file.metadata().await.unwrap().len(); + self.put_object_binary_progress_bar( + file, + key, + file_size, + content_type, + acl_header, + progress_style, + ) + .await + } + + /// 上传本地大文件,带进度条 + /// 见[官网文档](https://cloud.tencent.com/document/product/436/7749) + /// # 参数 + /// file_path: 文件路径 + /// key: 上传文件的key,如test/Cargo.lock + /// content_type: 文件类型 + /// storage_class: 存储类型`StorageClassEnum` 默认STANDARD + /// acl_header: 请求控制 + /// part_size: 分片大小,单位bytes,要求1M-1G之间,默认50M + /// max_threads: 最大上传线程数,默认20 + /// progress_style: 进度条样式 + /// + /// # Examples + /// ``` + /// use qcos::client::Client; + /// use qcos::objects::{Objects, StorageClassEnum}; + /// use mime; + /// use qcos::acl::{AclHeader, ObjectAcl}; + /// async { + /// let mut acl_header = AclHeader::new(); + /// acl_header.insert_object_x_cos_acl(ObjectAcl::AuthenticatedRead); + /// let client = Client::new("foo", "bar", "qcloudtest-1256650966", "ap-guangzhou"); + /// // 分块传输 + /// let res = client.put_big_object_progress_bar("Cargo.toml","Cargo.toml", Some(mime::TEXT_PLAIN_UTF_8), Some(StorageClassEnum::STANDARD), Some(acl_header), Some(1024 * 1024 * 100), None, None).await; + /// assert!(res.error_message.contains("403")); + /// }; + /// ``` + #[cfg(feature = "progress-bar")] + async fn put_big_object_progress_bar( + self, + file_path: &str, + key: &str, + content_type: Option, + storage_class: Option, + acl_header: Option, + part_size: Option, + max_threads: Option, + progress_style: Option, + ) -> Response { + let part_size = part_size.unwrap_or(PART_MAX_SIZE / 10 / 2); + assert!((PART_MIN_SIZE..PART_MAX_SIZE).contains(&part_size)); + assert!(max_threads.unwrap_or(20) <= 1000); + let mut file = match tokio::fs::File::open(file_path).await { + Ok(file) => file, + Err(e) => { + return Response::new( + ErrNo::IO, + format!("打开文件失败: {}, {}", file_path, e), + Default::default(), + ) + } + }; let file_size = match file.metadata().await { - Ok(meta) => meta.len() as usize, + Ok(meta) => meta.len(), Err(e) => { return Response::new( ErrNo::IO, @@ -169,27 +332,105 @@ impl Objects for client::Client { ) } }; - let mut headers = self.gen_common_headers(); - headers.insert( - "Content-Type".to_string(), - content_type - .unwrap_or(mime::APPLICATION_OCTET_STREAM) - .to_string(), - ); - headers.insert("Content-Length".to_string(), file_size.to_string()); - let url_path = self.get_path_from_object_key(key); - headers = - self.get_headers_with_auth("put", url_path.as_str(), acl_header, Some(headers), None); - let resp = Request::put( - self.get_full_url_from_path(url_path.as_str()).as_str(), - None, - Some(&headers), - None, - None, - Some(file), - ) - .await; - self.make_response(resp) + let mut part_number = 1; + let mut etag_map = HashMap::new(); + let upload_id_response = self + .put_object_get_upload_id(key, content_type.clone(), storage_class, acl_header.clone()) + .await; + if upload_id_response.error_no != ErrNo::SUCCESS { + return upload_id_response; + } + let upload_id = String::from_utf8_lossy(&upload_id_response.result[..]).to_string(); + // 默认20个线程 + let max_threads = max_threads.unwrap_or(20); + let mut tasks = Vec::new(); + let mut upload_bytes = 0; + let mut part_number1 = 1; + let multi = MultiProgress::new(); + let sty = match progress_style{ + Some(sty)=>sty, + None=> ProgressStyle::default_bar().template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})").unwrap().progress_chars("#>-")}; + loop { + if upload_bytes >= file_size { + break; + } + let mut part_size1 = part_size; + let last_bytes = file_size - upload_bytes; + // 倒数第二次上传后剩余小于1M,附加到倒数第二次上传 + if last_bytes < part_size + PART_MIN_SIZE && last_bytes < PART_MAX_SIZE { + part_size1 = last_bytes; + } + let mut body: Vec = vec![0; part_size1 as usize]; + if let Err(e) = file.read_exact(&mut body).await { + // 调用清理 + self.abort_object_part(key, &upload_id).await; + return Response::new( + ErrNo::IO, + format!("读取文件失败: {}, {}", file_path, e), + Default::default(), + ); + } + upload_bytes += part_size1; + if tasks.len() < max_threads as usize { + let key = key.to_string(); + let upload_id = upload_id.clone(); + let this = self.clone(); + let acl_header = acl_header.clone(); + let content_type = content_type.clone(); + let pb = multi.add(ProgressBar::new(body.len() as u64)); + pb.set_style(sty.clone()); + let handle = tokio::spawn(async move { + let resp = this + .clone() + .put_object_part_progress_bar( + &key, + &upload_id, + part_number, + body, + content_type, + acl_header, + pb, + ) + .await; + if resp.error_no != ErrNo::SUCCESS { + // 调用清理 + this.abort_object_part(&key, upload_id.as_str()).await; + } + resp + }); + tasks.push(handle); + part_number += 1; + } else { + for task in tasks { + let response = task.await.unwrap(); + if response.error_no != ErrNo::SUCCESS { + return response; + } + etag_map.insert(part_number1, response.headers["etag"].clone()); + part_number1 += 1; + } + tasks = Vec::new(); + } + } + if !tasks.is_empty() { + for task in tasks { + let response = task.await.unwrap(); + if response.error_no != ErrNo::SUCCESS { + return response; + } + etag_map.insert(part_number1, response.headers["etag"].clone()); + part_number1 += 1; + } + } + // 调用合并 + let resp = self + .put_object_complete_part(key, etag_map, upload_id.as_str()) + .await; + if resp.error_no != ErrNo::SUCCESS { + // 调用清理 + self.abort_object_part(key, upload_id.as_str()).await; + } + return resp; } /// 上传本地大文件 @@ -200,7 +441,7 @@ impl Objects for client::Client { /// content_type: 文件类型 /// storage_class: 存储类型`StorageClassEnum` 默认STANDARD /// acl_header: 请求控制 - /// part_size: 分片大小,单位bytes,要求1M-1G之间,默认100M + /// part_size: 分片大小,单位bytes,要求1M-1G之间,默认50M /// max_threads: 最大上传线程数,默认20 /// /// # Examples @@ -228,8 +469,7 @@ impl Objects for client::Client { part_size: Option, max_threads: Option, ) -> Response { - use tokio::io::AsyncReadExt; - let part_size = part_size.unwrap_or(PART_MAX_SIZE / 10); + let part_size = part_size.unwrap_or(PART_MAX_SIZE / 10 / 2); assert!((PART_MIN_SIZE..PART_MAX_SIZE).contains(&part_size)); assert!(max_threads.unwrap_or(20) <= 1000); let mut file = match tokio::fs::File::open(file_path).await { @@ -242,7 +482,6 @@ impl Objects for client::Client { ) } }; - // 设置为分块上传或者大于5G会启动分块上传 let file_size = match file.metadata().await { Ok(meta) => meta.len(), Err(e) => { @@ -255,13 +494,14 @@ impl Objects for client::Client { }; let mut part_number = 1; let mut etag_map = HashMap::new(); - let upload_id = self + let upload_id_response = self .put_object_get_upload_id(key, content_type.clone(), storage_class, acl_header.clone()) .await; - if upload_id.error_no != ErrNo::SUCCESS { - return upload_id; + if upload_id_response.error_no != ErrNo::SUCCESS { + return upload_id_response; } - let upload_id = String::from_utf8_lossy(&upload_id.result[..]).to_string(); + let upload_id = String::from_utf8_lossy(&upload_id_response.result[..]).to_string(); + // 默认20个线程 let max_threads = max_threads.unwrap_or(20); let mut tasks = Vec::new(); let mut upload_bytes = 0; @@ -301,6 +541,7 @@ impl Objects for client::Client { &upload_id, part_number, body, + part_size1, content_type, acl_header, ) @@ -337,7 +578,7 @@ impl Objects for client::Client { } // 调用合并 let resp = self - .put_object_complete_part(key, &etag_map, upload_id.as_str()) + .put_object_complete_part(key, etag_map, upload_id.as_str()) .await; if resp.error_no != ErrNo::SUCCESS { // 调用清理 @@ -346,6 +587,54 @@ impl Objects for client::Client { return resp; } + /// 上传二进制流,带进度条 + /// 见[官网文档](https://cloud.tencent.com/document/product/436/7749) + /// # Examples + /// ``` + /// use qcos::client::Client; + /// use qcos::objects::Objects; + /// use mime; + /// use qcos::acl::{AclHeader, ObjectAcl}; + /// async { + /// let mut acl_header = AclHeader::new(); + /// acl_header.insert_object_x_cos_acl(ObjectAcl::AuthenticatedRead); + /// let client = Client::new("foo", "bar", "qcloudtest-1256650966", "ap-guangzhou"); + /// let buffer = tokio::fs::File::open("Cargo.toml").await.unwrap(); + /// let res = client.put_object_binary_progress_bar(buffer, "Cargo.toml", 100, Some(mime::TEXT_PLAIN_UTF_8), Some(acl_header), None).await; + /// assert!(res.error_message.contains("403")); + /// }; + /// ``` + #[cfg(feature = "progress-bar")] + async fn put_object_binary_progress_bar< + T: Into + Send + Sync + tokio::io::AsyncRead + 'static, + >( + &self, + file: T, + key: &str, + file_size: u64, + content_type: Option, + acl_header: Option, + progress_style: Option, + ) -> Response { + let reader = ReaderStream::new(file); + let pb = ProgressBar::new(file_size); + let sty = match progress_style { + Some(sty)=>sty, + None=> ProgressStyle::default_bar().template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})").unwrap().progress_chars("#>-") + }; + pb.set_style(sty); + let pb1 = pb.clone(); + let stream = reader.inspect_ok(move |chunk| { + pb1.inc(chunk.len() as u64); + }); + let body = Body::wrap_stream(stream); + let resp = self + .put_object_binary(body, key, content_type, acl_header) + .await; + pb.finish(); + resp + } + /// 上传二进制流 /// 见[官网文档](https://cloud.tencent.com/document/product/436/7749) /// # Examples @@ -545,9 +834,10 @@ impl Objects for client::Client { } } - /// 分块上传文件 + /// 分块上传文件,带进度条 /// [官网文档](https://cloud.tencent.com/document/product/436/7750) - async fn put_object_part( + #[cfg(feature = "progress-bar")] + async fn put_object_part_progress_bar( self, key: &str, upload_id: &str, @@ -555,6 +845,41 @@ impl Objects for client::Client { body: Vec, content_type: Option, acl_header: Option, + pb: ProgressBar, + ) -> Response { + let file_size = body.len() as u64; + let reader = ReaderStream::new(BufReader::new(Cursor::new(body))); + let pb1 = pb.clone(); + let stream = reader.inspect_ok(move |chunk| { + pb1.inc(chunk.len() as u64); + }); + let body = Body::wrap_stream(stream); + let resp = self + .put_object_part( + key, + upload_id, + part_number, + body, + file_size, + content_type, + acl_header, + ) + .await; + pb.finish(); + resp + } + + /// 分块上传文件,不带进度条 + /// [官网文档](https://cloud.tencent.com/document/product/436/7750) + async fn put_object_part + Send>( + self, + key: &str, + upload_id: &str, + part_number: u64, + body: T, + file_size: u64, + content_type: Option, + acl_header: Option, ) -> Response { let mut headers = self.gen_common_headers(); headers.insert( @@ -563,7 +888,8 @@ impl Objects for client::Client { .unwrap_or(mime::APPLICATION_OCTET_STREAM) .to_string(), ); - headers.insert("Content-Length".to_string(), body.len().to_string()); + let body: Body = body.into(); + headers.insert("Content-Length".to_string(), file_size.to_string()); let url_path = self.get_path_from_object_key(key); let mut query = HashMap::new(); query.insert("partNumber".to_string(), part_number.to_string()); @@ -592,7 +918,7 @@ impl Objects for client::Client { async fn put_object_complete_part( &self, key: &str, - etag_map: &HashMap, + etag_map: HashMap, upload_id: &str, ) -> Response { let url_path = self.get_path_from_object_key(key);