Skip to content

Commit

Permalink
refactor(services/gdrive): prepare for CI (apache#2892)
Browse files Browse the repository at this point in the history
  • Loading branch information
suyanhanx authored Aug 21, 2023
1 parent 8649189 commit 4e1c14c
Show file tree
Hide file tree
Showing 4 changed files with 510 additions and 111 deletions.
9 changes: 8 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,11 @@ OPENDAL_ETCD_USERNAME=<username>
OPENDAL_ETCD_PASSWORD=<password>
OPENDAL_ETCD_CA_PATH=<ca_path>
OPENDAL_ETCD_CERT_PATH=<cert_path>
OPENDAL_ETCD_KEY_PATH=<key_path>
OPENDAL_ETCD_KEY_PATH=<key_path>
# google drive
OPENDAL_GDRIVE_TEST=false
OPENDAL_GDRIVE_ROOT=/tmp/opendal/
OPENDAL_GDRIVE_ACCESS_TOKEN=<access_token>
OPENDAL_GDRIVE_REFRESH_TOKEN=<refresh_token>
OPENDAL_GDRIVE_CLIENT_ID=<client_id>
OPENDAL_GDRIVE_CLIENT_SECRET=<client_secret>
163 changes: 150 additions & 13 deletions core/src/services/gdrive/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ use std::fmt::Debug;
use std::sync::Arc;

use async_trait::async_trait;
use chrono::Utc;
use http::StatusCode;

use super::core::GdriveCore;
use super::error::parse_error;
use super::writer::GdriveWriter;
use crate::raw::*;
use crate::services::gdrive::core::GdriveFile;
use crate::services::gdrive::core::GdriveFileList;
use crate::types::Result;
use crate::Capability;
use crate::Error;
use crate::ErrorKind;
use crate::*;

#[derive(Clone, Debug)]
pub struct GdriveBackend {
Expand Down Expand Up @@ -59,27 +60,96 @@ impl Accessor for GdriveBackend {

fn info(&self) -> AccessorInfo {
let mut ma = AccessorInfo::default();
ma.set_scheme(crate::Scheme::Gdrive)
ma.set_scheme(Scheme::Gdrive)
.set_root(&self.core.root)
.set_full_capability(Capability {
stat: true,

read: true,

write: true,

create_dir: true,

delete: true,

..Default::default()
});

ma
}

async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
// Stat root always returns a DIR.
if path == "/" {
return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
}

let resp = self.core.gdrive_stat(path).await?;

let status = resp.status();

match status {
StatusCode::OK => {
let meta = self.parse_metadata(resp.into_body().bytes().await?)?;
Ok(RpStat::new(meta))
}
_ => Err(parse_error(resp).await?),
}
}

async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
let parent = self.core.ensure_parent_path(path).await?;

let path = path.split('/').filter(|&x| !x.is_empty()).last().unwrap();

// As Google Drive allows files have the same name, we need to check if the folder exists.
let resp = self.core.gdrive_search_folder(path, &parent).await?;
let status = resp.status();

match status {
StatusCode::OK => {
let body = resp.into_body().bytes().await?;
let meta = serde_json::from_slice::<GdriveFileList>(&body)
.map_err(new_json_deserialize_error)?;

if !meta.files.is_empty() {
return Ok(RpCreateDir::default());
}
}
_ => return Err(parse_error(resp).await?),
}

let resp = self.core.gdrive_create_folder(path, Some(parent)).await?;

let status = resp.status();

match status {
StatusCode::OK => Ok(RpCreateDir::default()),
_ => Err(parse_error(resp).await?),
}
}

async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> {
let resp = self.core.gdrive_get(path).await?;
// We need to request for metadata and body separately here.
// Request for metadata first to check if the file exists.
let resp = self.core.gdrive_stat(path).await?;

let status = resp.status();

match status {
StatusCode::OK => {
let meta = parse_into_metadata(path, resp.headers())?;
Ok((RpRead::with_metadata(meta), resp.into_body()))
let body = resp.into_body().bytes().await?;
let meta = self.parse_metadata(body)?;

let resp = self.core.gdrive_get(path).await?;

let status = resp.status();

match status {
StatusCode::OK => Ok((RpRead::with_metadata(meta), resp.into_body())),
_ => Err(parse_error(resp).await?),
}
}
_ => Err(parse_error(resp).await?),
}
Expand All @@ -93,20 +163,87 @@ impl Accessor for GdriveBackend {
));
}

// As Google Drive allows files have the same name, we need to check if the file exists.
// If the file exists, we will keep its ID and update it.
let mut file_id: Option<String> = None;

let resp = self.core.gdrive_stat(path).await;
// We don't care about the error here.
// As long as the file doesn't exist, we will create a new one.
if let Ok(resp) = resp {
let status = resp.status();

if status == StatusCode::OK {
let body = resp.into_body().bytes().await?;
let meta = serde_json::from_slice::<GdriveFile>(&body)
.map_err(new_json_deserialize_error)?;

file_id = if meta.id.is_empty() {
None
} else {
Some(meta.id)
};
}
}

Ok((
RpWrite::default(),
GdriveWriter::new(self.core.clone(), args, String::from(path)),
GdriveWriter::new(self.core.clone(), String::from(path), file_id),
))
}

async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
let resp = self.core.gdrive_delete(path).await?;
let resp = self.core.gdrive_delete(path).await;
if let Ok(resp) = resp {
let status = resp.status();

let status = resp.status();
match status {
StatusCode::NO_CONTENT => return Ok(RpDelete::default()),
_ => return Err(parse_error(resp).await?),
}
};

match status {
StatusCode::NO_CONTENT => Ok(RpDelete::default()),
_ => Err(parse_error(resp).await?),
let e = resp.err().unwrap();
if e.kind() == ErrorKind::NotFound {
Ok(RpDelete::default())
} else {
Err(e)
}
}
}

impl GdriveBackend {
pub(crate) fn parse_metadata(&self, body: bytes::Bytes) -> Result<Metadata> {
let metadata =
serde_json::from_slice::<GdriveFile>(&body).map_err(new_json_deserialize_error)?;

let mut meta = Metadata::new(match metadata.mime_type.as_str() {
"application/vnd.google-apps.folder" => EntryMode::DIR,
_ => EntryMode::FILE,
});

let size = if meta.mode() == EntryMode::DIR {
// Google Drive does not return the size for folders.
0
} else {
metadata
.size
.expect("file size must exist")
.parse::<u64>()
.map_err(|e| {
Error::new(ErrorKind::Unexpected, "parse content length").set_source(e)
})?
};
meta = meta.with_content_length(size);
meta = meta.with_last_modified(
metadata
.modified_time
.expect("modified time must exist. please check your query param - fields")
.parse::<chrono::DateTime<Utc>>()
.map_err(|e| {
Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e)
})?,
);
Ok(meta)
}
}
Loading

0 comments on commit 4e1c14c

Please sign in to comment.