diff --git a/.env.example b/.env.example index 62a3f9473043..ef73d622bb20 100644 --- a/.env.example +++ b/.env.example @@ -148,4 +148,11 @@ OPENDAL_ETCD_USERNAME= OPENDAL_ETCD_PASSWORD= OPENDAL_ETCD_CA_PATH= OPENDAL_ETCD_CERT_PATH= -OPENDAL_ETCD_KEY_PATH= \ No newline at end of file +OPENDAL_ETCD_KEY_PATH= +# google drive +OPENDAL_GDRIVE_TEST=false +OPENDAL_GDRIVE_ROOT=/tmp/opendal/ +OPENDAL_GDRIVE_ACCESS_TOKEN= +OPENDAL_GDRIVE_REFRESH_TOKEN= +OPENDAL_GDRIVE_CLIENT_ID= +OPENDAL_GDRIVE_CLIENT_SECRET= diff --git a/core/src/services/gdrive/backend.rs b/core/src/services/gdrive/backend.rs index 72d5740e880c..49ce2331491b 100644 --- a/core/src/services/gdrive/backend.rs +++ b/core/src/services/gdrive/backend.rs @@ -19,16 +19,17 @@ use std::fmt::Debug; use std::sync::Arc; use async_trait::async_trait; +use chrono::Utc; use http::StatusCode; use super::core::GdriveCore; use super::error::parse_error; use super::writer::GdriveWriter; use crate::raw::*; +use crate::services::gdrive::core::GdriveFile; +use crate::services::gdrive::core::GdriveFileList; use crate::types::Result; -use crate::Capability; -use crate::Error; -use crate::ErrorKind; +use crate::*; #[derive(Clone, Debug)] pub struct GdriveBackend { @@ -59,27 +60,96 @@ impl Accessor for GdriveBackend { fn info(&self) -> AccessorInfo { let mut ma = AccessorInfo::default(); - ma.set_scheme(crate::Scheme::Gdrive) + ma.set_scheme(Scheme::Gdrive) .set_root(&self.core.root) .set_full_capability(Capability { + stat: true, + read: true, + write: true, + + create_dir: true, + delete: true, + ..Default::default() }); ma } + async fn stat(&self, path: &str, _args: OpStat) -> Result { + // Stat root always returns a DIR. + if path == "/" { + return Ok(RpStat::new(Metadata::new(EntryMode::DIR))); + } + + let resp = self.core.gdrive_stat(path).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let meta = self.parse_metadata(resp.into_body().bytes().await?)?; + Ok(RpStat::new(meta)) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result { + let parent = self.core.ensure_parent_path(path).await?; + + let path = path.split('/').filter(|&x| !x.is_empty()).last().unwrap(); + + // As Google Drive allows files have the same name, we need to check if the folder exists. + let resp = self.core.gdrive_search_folder(path, &parent).await?; + let status = resp.status(); + + match status { + StatusCode::OK => { + let body = resp.into_body().bytes().await?; + let meta = serde_json::from_slice::(&body) + .map_err(new_json_deserialize_error)?; + + if !meta.files.is_empty() { + return Ok(RpCreateDir::default()); + } + } + _ => return Err(parse_error(resp).await?), + } + + let resp = self.core.gdrive_create_folder(path, Some(parent)).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => Ok(RpCreateDir::default()), + _ => Err(parse_error(resp).await?), + } + } + async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> { - let resp = self.core.gdrive_get(path).await?; + // We need to request for metadata and body separately here. + // Request for metadata first to check if the file exists. + let resp = self.core.gdrive_stat(path).await?; let status = resp.status(); match status { StatusCode::OK => { - let meta = parse_into_metadata(path, resp.headers())?; - Ok((RpRead::with_metadata(meta), resp.into_body())) + let body = resp.into_body().bytes().await?; + let meta = self.parse_metadata(body)?; + + let resp = self.core.gdrive_get(path).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => Ok((RpRead::with_metadata(meta), resp.into_body())), + _ => Err(parse_error(resp).await?), + } } _ => Err(parse_error(resp).await?), } @@ -93,20 +163,87 @@ impl Accessor for GdriveBackend { )); } + // As Google Drive allows files have the same name, we need to check if the file exists. + // If the file exists, we will keep its ID and update it. + let mut file_id: Option = None; + + let resp = self.core.gdrive_stat(path).await; + // We don't care about the error here. + // As long as the file doesn't exist, we will create a new one. + if let Ok(resp) = resp { + let status = resp.status(); + + if status == StatusCode::OK { + let body = resp.into_body().bytes().await?; + let meta = serde_json::from_slice::(&body) + .map_err(new_json_deserialize_error)?; + + file_id = if meta.id.is_empty() { + None + } else { + Some(meta.id) + }; + } + } + Ok(( RpWrite::default(), - GdriveWriter::new(self.core.clone(), args, String::from(path)), + GdriveWriter::new(self.core.clone(), String::from(path), file_id), )) } async fn delete(&self, path: &str, _: OpDelete) -> Result { - let resp = self.core.gdrive_delete(path).await?; + let resp = self.core.gdrive_delete(path).await; + if let Ok(resp) = resp { + let status = resp.status(); - let status = resp.status(); + match status { + StatusCode::NO_CONTENT => return Ok(RpDelete::default()), + _ => return Err(parse_error(resp).await?), + } + }; - match status { - StatusCode::NO_CONTENT => Ok(RpDelete::default()), - _ => Err(parse_error(resp).await?), + let e = resp.err().unwrap(); + if e.kind() == ErrorKind::NotFound { + Ok(RpDelete::default()) + } else { + Err(e) } } } + +impl GdriveBackend { + pub(crate) fn parse_metadata(&self, body: bytes::Bytes) -> Result { + let metadata = + serde_json::from_slice::(&body).map_err(new_json_deserialize_error)?; + + let mut meta = Metadata::new(match metadata.mime_type.as_str() { + "application/vnd.google-apps.folder" => EntryMode::DIR, + _ => EntryMode::FILE, + }); + + let size = if meta.mode() == EntryMode::DIR { + // Google Drive does not return the size for folders. + 0 + } else { + metadata + .size + .expect("file size must exist") + .parse::() + .map_err(|e| { + Error::new(ErrorKind::Unexpected, "parse content length").set_source(e) + })? + }; + meta = meta.with_content_length(size); + meta = meta.with_last_modified( + metadata + .modified_time + .expect("modified time must exist. please check your query param - fields") + .parse::>() + .map_err(|e| { + Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e) + })?, + ); + Ok(meta) + } +} diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs index b54504bca172..acf8b07470b3 100644 --- a/core/src/services/gdrive/core.rs +++ b/core/src/services/gdrive/core.rs @@ -20,22 +20,17 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; +use bytes; use http::header; -use http::request::Builder; use http::Request; use http::Response; use http::StatusCode; use serde::Deserialize; +use serde_json::json; use tokio::sync::Mutex; use super::error::parse_error; -use crate::raw::build_rooted_abs_path; -use crate::raw::new_json_deserialize_error; -use crate::raw::new_request_build_error; -use crate::raw::percent_encode_path; -use crate::raw::AsyncBody; -use crate::raw::HttpClient; -use crate::raw::IncomingAsyncBody; +use crate::raw::*; use crate::types::Result; use crate::Error; use crate::ErrorKind; @@ -43,7 +38,13 @@ use crate::ErrorKind; pub struct GdriveCore { pub root: String, pub access_token: String, + pub client: HttpClient, + + /// Cache the mapping from path to file id + /// + /// Google Drive uses file id to identify a file. + /// As the path is immutable, we can cache the mapping from path to file id. pub path_cache: Arc>>, } @@ -56,91 +57,223 @@ impl Debug for GdriveCore { } impl GdriveCore { - async fn get_abs_root_id(&self) -> Result { - let root = "root"; + /// Get the file id by path. + /// Including file and folder. + /// + /// The path is rooted at the root of the Google Drive. + /// + /// # Notes + /// + /// - A path is a sequence of file names separated by slashes. + /// - A file only knows its parent id, but not its name. + /// - To find the file id of a file, we need to traverse the path from the root to the file. + pub(crate) async fn get_file_id_by_path(&self, file_path: &str) -> Result { + let path = build_rooted_abs_path(&self.root, file_path); - if let Some(root_id) = self.path_cache.lock().await.get(root) { - return Ok(root_id.to_string()); - } + let mut parent_id = "root".to_owned(); + let file_path_items: Vec<&str> = path.split('/').filter(|&x| !x.is_empty()).collect(); + + for (i, item) in file_path_items.iter().enumerate() { + let mut query = format!( + "name = \"{}\" and \"{}\" in parents and trashed = false", + item, parent_id + ); + if i != file_path_items.len() - 1 { + query += " and mimeType = 'application/vnd.google-apps.folder'"; + } - let req = self - .sign(Request::get( - "https://www.googleapis.com/drive/v3/files/root", + let mut req = Request::get(format!( + "https://www.googleapis.com/drive/v3/files?q={}", + percent_encode_path(&query) )) - .body(AsyncBody::Empty) + .body(AsyncBody::default()) .map_err(new_request_build_error)?; - let resp = self.client.send(req).await?; - let status = resp.status(); + let _ = self.sign(&mut req); + + let resp = self.client.send(req).await?; + let status = resp.status(); - match status { - StatusCode::OK => { - let resp_body = &resp.into_body().bytes().await?; + match status { + StatusCode::OK => { + let resp_body = &resp.into_body().bytes().await?; - let gdrive_file: GdriveFile = - serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?; + let gdrive_file_list: GdriveFileList = + serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?; - let root_id = gdrive_file.id; + if gdrive_file_list.files.is_empty() { + return Err(Error::new( + ErrorKind::NotFound, + &format!("path not found: {}", item), + )); + } - let mut cache_guard = self.path_cache.lock().await; - cache_guard.insert(root.to_owned(), root_id.clone()); + if gdrive_file_list.files.len() > 1 { + return Err(Error::new(ErrorKind::Unexpected, &format!("please ensure that the file corresponding to the path exists and is unique. the response body is {}", String::from_utf8_lossy(resp_body)))); + } - Ok(root_id) + parent_id = gdrive_file_list.files[0].id.clone(); + } + _ => { + return Err(parse_error(resp).await?); + } } - _ => Err(parse_error(resp).await?), } - } - async fn get_file_id_by_path(&self, file_path: &str) -> Result { - let path = build_rooted_abs_path(&self.root, file_path); + Ok(parent_id) + } - if let Some(file_id) = self.path_cache.lock().await.get(&path) { - return Ok(file_id.to_string()); - } + /// Ensure the parent path exists. + /// If the parent path does not exist, create it. + /// + /// # Notes + /// + /// - The path is rooted at the root of the Google Drive. + /// - Will create the parent path recursively. + pub(crate) async fn ensure_parent_path(&self, path: &str) -> Result { + let path = build_rooted_abs_path(&self.root, path); - let mut parent_id = self.get_abs_root_id().await?; - let file_path_items: Vec<&str> = path.split('/').filter(|&x| !x.is_empty()).collect(); + let mut parent: String = "root".to_owned(); + let mut file_path_items: Vec<&str> = path.split('/').filter(|&x| !x.is_empty()).collect(); + file_path_items.pop(); for (i, item) in file_path_items.iter().enumerate() { - let mut query = format!( - "name = '{}' and parents = '{}' and trashed = false", - item, parent_id + let query = format!( + "name = \"{}\" and \"{}\" in parents and trashed = false and mimeType = 'application/vnd.google-apps.folder'", + item, parent ); - if i != file_path_items.len() - 1 { - query += "and mimeType = 'application/vnd.google-apps.folder'"; - } - let req = self - .sign(Request::get(format!( - "https://www.googleapis.com/drive/v3/files?q={}", - percent_encode_path(&query) - ))) - .body(AsyncBody::default()) - .map_err(new_request_build_error)?; + let mut req = Request::get(format!( + "https://www.googleapis.com/drive/v3/files?q={}", + percent_encode_path(&query) + )) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + let _ = self.sign(&mut req); let resp = self.client.send(req).await?; let status = resp.status(); - if status == StatusCode::OK { - let resp_body = &resp.into_body().bytes().await?; - - let gdrive_file_list: GdriveFileList = - serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?; - - if gdrive_file_list.files.len() != 1 { - return Err(Error::new(ErrorKind::Unexpected, &format!("Please ensure that the file corresponding to the path exists and is unique. The response body is {}", String::from_utf8_lossy(resp_body)))); + match status { + StatusCode::OK => { + let resp_body = &resp.into_body().bytes().await?; + + let gdrive_file_list: GdriveFileList = + serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?; + + if gdrive_file_list.files.len() != 1 { + let parent_name = file_path_items[i]; + let resp_body = self + .gdrive_create_folder(parent_name, Some(parent.to_owned())) + .await? + .into_body() + .bytes() + .await?; + let parent_meta: GdriveFile = serde_json::from_slice(&resp_body) + .map_err(new_json_deserialize_error)?; + + parent = parent_meta.id; + } else { + parent = gdrive_file_list.files[0].id.clone(); + } + } + StatusCode::NOT_FOUND => { + let parent_name = file_path_items[i]; + let res = self + .gdrive_create_folder(parent_name, Some(parent.to_owned())) + .await?; + + let status = res.status(); + + match status { + StatusCode::OK => { + let parent_id = res.into_body().bytes().await?; + parent = String::from_utf8_lossy(&parent_id).to_string(); + } + _ => { + return Err(parse_error(res).await?); + } + } + } + _ => { + return Err(parse_error(resp).await?); } - - parent_id = gdrive_file_list.files[0].id.clone(); - } else { - return Err(parse_error(resp).await?); } } - let mut cache_guard = self.path_cache.lock().await; - cache_guard.insert(path, parent_id.clone()); + Ok(parent.to_owned()) + } - Ok(parent_id) + pub async fn gdrive_search_folder( + &self, + target: &str, + parent: &str, + ) -> Result> { + let query = format!( + "name = '{}' and '{}' in parents and trashed = false and mimeType = 'application/vnd.google-apps.folder'", + target, parent + ); + let url = format!( + "https://www.googleapis.com/drive/v3/files?q={}", + percent_encode_path(query.as_str()) + ); + + let mut req = Request::get(&url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req)?; + + self.client.send(req).await + } + + /// Create a folder. + /// Should provide the parent folder id. + /// Or will create the folder in the root folder. + pub async fn gdrive_create_folder( + &self, + name: &str, + parent: Option, + ) -> Result> { + let url = "https://www.googleapis.com/drive/v3/files"; + + let mut req = Request::post(url) + .header(header::CONTENT_TYPE, "application/json") + .body(AsyncBody::Bytes(bytes::Bytes::from( + serde_json::to_vec(&json!({ + "name": name, + "mimeType": "application/vnd.google-apps.folder", + // If the parent is not provided, the folder will be created in the root folder. + "parents": [parent.unwrap_or("root".to_owned())], + })) + .map_err(|e| { + Error::new( + ErrorKind::Unexpected, + &format!("failed to serialize json(create folder result): {}", e), + ) + })?, + ))) + .map_err(new_request_build_error)?; + + let _ = self.sign(&mut req); + + self.client.send(req).await + } + + pub async fn gdrive_stat(&self, path: &str) -> Result> { + let path_id = self.get_file_id_by_path(path).await?; + + // The file metadata in the Google Drive API is very complex. + // For now, we only need the file id, name, mime type and modified time. + let mut req = Request::get(&format!( + "https://www.googleapis.com/drive/v3/files/{}?fields=id,name,mimeType,size,modifiedTime", + path_id.as_str() + )) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + let _ = self.sign(&mut req); + + self.client.send(req).await } pub async fn gdrive_get(&self, path: &str) -> Result> { @@ -149,10 +282,10 @@ impl GdriveCore { self.get_file_id_by_path(path).await? ); - let req = self - .sign(Request::get(&url)) + let mut req = Request::get(&url) .body(AsyncBody::Empty) .map_err(new_request_build_error)?; + let _ = self.sign(&mut req); self.client.send(req).await } @@ -165,11 +298,11 @@ impl GdriveCore { body: AsyncBody, ) -> Result> { let url = format!( - "https://www.googleapis.com/upload/drive/v3/files/{}", + "https://www.googleapis.com/upload/drive/v3/files/{}?uploadType=media", self.get_file_id_by_path(path).await? ); - let mut req = Request::patch(&url); + let mut req = Request::put(&url); if let Some(size) = size { req = req.header(header::CONTENT_LENGTH, size) @@ -179,7 +312,8 @@ impl GdriveCore { req = req.header(header::CONTENT_TYPE, mime) } - let req = self.sign(req).body(body).map_err(new_request_build_error)?; + let mut req = req.body(body).map_err(new_request_build_error)?; + let _ = self.sign(&mut req); self.client.send(req).await } @@ -190,29 +324,112 @@ impl GdriveCore { self.get_file_id_by_path(path).await? ); - let req = self - .sign(Request::delete(&url)) + let mut req = Request::delete(&url) .body(AsyncBody::Empty) .map_err(new_request_build_error)?; + let _ = self.sign(&mut req); + + self.client.send(req).await + } + + pub async fn gdrive_upload_simple_request( + &self, + path: &str, + size: u64, + body: bytes::Bytes, + ) -> Result> { + let parent = self.ensure_parent_path(path).await?; + + let url = "https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart"; + + let file_name = path.split('/').filter(|&x| !x.is_empty()).last().unwrap(); + + let metadata = &json!({ + "name": file_name, + "parents": [parent], + }); + + let req = Request::post(url).header("X-Upload-Content-Length", size); + + let multipart = Multipart::new() + .part( + FormDataPart::new("metadata") + .header( + header::CONTENT_TYPE, + "application/json; charset=UTF-8".parse().unwrap(), + ) + .content(metadata.to_string()), + ) + .part( + FormDataPart::new("file") + .header( + header::CONTENT_TYPE, + "application/octet-stream".parse().unwrap(), + ) + .content(body), + ); + + let mut req = multipart.apply(req)?; + + let _ = self.sign(&mut req); + + self.client.send(req).await + } + + pub async fn gdrive_upload_overwrite_simple_request( + &self, + file_id: &str, + size: u64, + body: bytes::Bytes, + ) -> Result> { + let url = format!( + "https://www.googleapis.com/upload/drive/v3/files/{}?uploadType=media", + file_id + ); + + let mut req = Request::patch(url) + .header(header::CONTENT_TYPE, "application/octet-stream") + .header(header::CONTENT_LENGTH, size) + .header("X-Upload-Content-Length", size) + .body(AsyncBody::Bytes(body)) + .map_err(new_request_build_error)?; + + let _ = self.sign(&mut req); + self.client.send(req).await } - fn sign(&self, mut req: Builder) -> Builder { + fn sign(&self, req: &mut Request) -> Result<()> { let auth_header_content = format!("Bearer {}", self.access_token); - req = req.header(header::AUTHORIZATION, auth_header_content); - req + req.headers_mut() + .insert(header::AUTHORIZATION, auth_header_content.parse().unwrap()); + + Ok(()) } } +// This is the file struct returned by the Google Drive API. +// This is a complex struct, but we only add the fields we need. // refer to https://developers.google.com/drive/api/reference/rest/v3/files#File -#[derive(Deserialize)] -struct GdriveFile { - id: String, +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct GdriveFile { + pub mime_type: String, + pub id: String, + pub name: String, + pub size: Option, + // The modified time is not returned unless the `fields` + // query parameter contains `modifiedTime`. + // As we only need the modified time when we do `stat` operation, + // if other operations(such as search) do not specify the `fields` query parameter, + // try to access this field, it will be `None`. + pub modified_time: Option, } // refer to https://developers.google.com/drive/api/reference/rest/v3/files/list #[derive(Deserialize)] -struct GdriveFileList { - files: Vec, +#[serde(rename_all = "camelCase")] +pub(crate) struct GdriveFileList { + pub(crate) files: Vec, } diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index ba8f20b8f635..48d88f3ec046 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -24,31 +24,61 @@ use http::StatusCode; use super::core::GdriveCore; use super::error::parse_error; use crate::raw::*; +use crate::services::gdrive::core::GdriveFile; use crate::*; pub struct GdriveWriter { core: Arc, - op: OpWrite, + path: String, + + file_id: Option, } impl GdriveWriter { - pub fn new(core: Arc, op: OpWrite, path: String) -> Self { - GdriveWriter { core, op, path } + pub fn new(core: Arc, path: String, file_id: Option) -> Self { + GdriveWriter { + core, + path, + + file_id, + } } -} -#[async_trait] -impl oio::Write for GdriveWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + /// Write a single chunk of data to the object. + /// + /// This is used for small objects. + /// And should overwrite the object if it already exists. + pub async fn write_create(&mut self, size: u64, body: Bytes) -> Result<()> { let resp = self .core - .gdrive_update( - &self.path, - Some(bs.len()), - self.op.content_type(), - AsyncBody::Bytes(bs), - ) + .gdrive_upload_simple_request(&self.path, size, body) + .await?; + + let status = resp.status(); + + match status { + StatusCode::OK | StatusCode::CREATED => { + let bs = resp.into_body().bytes().await?; + + let file = serde_json::from_slice::(&bs) + .map_err(new_json_deserialize_error)?; + + self.file_id = Some(file.id); + + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } + + pub async fn write_overwrite(&self, size: u64, body: Bytes) -> Result<()> { + let file_id = self.file_id.as_ref().ok_or_else(|| { + Error::new(ErrorKind::Unexpected, "file_id is required for overwrite") + })?; + let resp = self + .core + .gdrive_upload_overwrite_simple_request(file_id, size, body) .await?; let status = resp.status(); @@ -61,12 +91,20 @@ impl oio::Write for GdriveWriter { _ => Err(parse_error(resp).await?), } } +} + +#[async_trait] +impl oio::Write for GdriveWriter { + async fn write(&mut self, bs: Bytes) -> Result<()> { + if self.file_id.is_none() { + self.write_create(bs.len() as u64, bs).await + } else { + self.write_overwrite(bs.len() as u64, bs).await + } + } async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { - Err(Error::new( - ErrorKind::Unsupported, - "Write::sink is not supported", - )) + Err(Error::new(ErrorKind::Unsupported, "sink is not supported")) } async fn abort(&mut self) -> Result<()> {