Skip to content

Commit

Permalink
feat(service/dropbox): impl batch delete (apache#2606)
Browse files Browse the repository at this point in the history
* refresh token flow

Signed-off-by: suyanhanx <[email protected]>

* impl batch_delete

Signed-off-by: suyanhanx <[email protected]>

* enable CI test

Signed-off-by: suyanhanx <[email protected]>

* add too_many_write_operations as retriable

Signed-off-by: suyanhanx <[email protected]>

* use backon to retry

Signed-off-by: suyanhanx <[email protected]>

* make fmt happy

Signed-off-by: suyanhanx <[email protected]>

---------

Signed-off-by: suyanhanx <[email protected]>
  • Loading branch information
suyanhanx authored Jul 8, 2023
1 parent 1f4032b commit 45f0425
Show file tree
Hide file tree
Showing 6 changed files with 338 additions and 16 deletions.
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,6 @@ OPENDAL_CACACHE_DATADIR=/tmp/opendal/cacache/
OPENDAL_DROPBOX_TEST=false
OPENDAL_DROPBOX_ROOT=/tmp/opendal/
OPENDAL_DROPBOX_ACCESS_TOKEN=<access_token>
OPENDAL_DROPBOX_REFRESH_TOKEN=<refresh_token>
OPENDAL_DROPBOX_CLIENT_ID=<client_id>
OPENDAL_DROPBOX_CLIENT_SECRET=<client_secret>
66 changes: 66 additions & 0 deletions .github/workflows/service_test_dropbox.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: Service Test Dropbox

on:
push:
branches:
- main
pull_request:
branches:
- main
paths:
- "core/src/**"
- "core/tests/**"
- "!core/src/docs/**"
- "!core/src/services/**"
- "core/src/services/dropbox/**"
- ".github/workflows/service_test_dropbox.yml"

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
cancel-in-progress: true

jobs:
dropbox:
runs-on: ubuntu-latest
if: github.event_name == 'push' || !github.event.pull_request.head.repo.fork
steps:
- uses: actions/checkout@v3
- name: Setup Rust toolchain
uses: ./.github/actions/setup

- name: Load secret
id: op-load-secret
uses: 1password/load-secrets-action@v1
with:
export-env: true
env:
OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
OPENDAL_DROPBOX_TEST: op://services/dropbox/test
OPENDAL_DROPBOX_ROOT: op://services/dropbox/root
OPENDAL_DROPBOX_REFRESH_TOKEN: op://services/dropbox/refresh_token
OPENDAL_DROPBOX_CLIENT_ID: op://services/dropbox/client_id
OPENDAL_DROPBOX_CLIENT_SECRET: op://services/dropbox/client_secret

- name: Test
shell: bash
working-directory: core
# It's easily for dropbox to trigger too_many_write_operations error.
# So we run tests one by one.
run: cargo test dropbox --features=services-dropbox -- --test-threads=1
86 changes: 86 additions & 0 deletions core/src/services/dropbox/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,29 @@

use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use backon::ExponentialBuilder;
use backon::Retryable;
use http::StatusCode;
use once_cell::sync::Lazy;
use serde::Deserialize;

use super::core::DropboxCore;
use super::error::parse_error;
use super::writer::DropboxWriter;
use crate::raw::*;
use crate::services::dropbox::error::DropboxErrorResponse;
use crate::*;

static BACKOFF: Lazy<ExponentialBuilder> = Lazy::new(|| {
ExponentialBuilder::default()
.with_max_delay(Duration::from_secs(10))
.with_max_times(10)
.with_jitter()
});

#[derive(Clone, Debug)]
pub struct DropboxBackend {
pub core: Arc<DropboxCore>,
Expand Down Expand Up @@ -58,6 +70,9 @@ impl Accessor for DropboxBackend {

delete: true,

batch: true,
batch_delete: true,

..Default::default()
});
ma
Expand Down Expand Up @@ -162,6 +177,59 @@ impl Accessor for DropboxBackend {
_ => Err(parse_error(resp).await?),
}
}

async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
let ops = args.into_operation();
if ops.len() > 1000 {
return Err(Error::new(
ErrorKind::Unsupported,
"dropbox services only allow delete up to 1000 keys at once",
)
.with_context("length", ops.len().to_string()));
}

let paths = ops.into_iter().map(|(p, _)| p).collect::<Vec<_>>();

let resp = self.core.dropbox_delete_batch(paths).await?;

let status = resp.status();

match status {
StatusCode::OK => {
let (_parts, body) = resp.into_parts();
let bs = body.bytes().await?;
let decoded_response = serde_json::from_slice::<DropboxDeleteBatchResponse>(&bs)
.map_err(new_json_deserialize_error)?;

match decoded_response.tag.as_str() {
"complete" => {
let entries = decoded_response.entries.unwrap_or_default();
let results = self.core.handle_batch_delete_complete_result(entries);
Ok(RpBatch::new(results))
}
"async_job_id" => {
let job_id = decoded_response
.async_job_id
.expect("async_job_id should be present");
let res = { || self.core.dropbox_delete_batch_check(job_id.clone()) }
.retry(&*BACKOFF)
.when(|e| e.is_temporary())
.await?;

Ok(res)
}
_ => Err(Error::new(
ErrorKind::Unexpected,
&format!(
"delete batch failed with unexpected tag {}",
decoded_response.tag
),
)),
}
}
_ => Err(parse_error(resp).await?),
}
}
}

#[derive(Default, Debug, Deserialize)]
Expand Down Expand Up @@ -217,3 +285,21 @@ pub struct DropboxMetadataSharingInfo {
pub traverse_only: Option<bool>,
pub no_access: Option<bool>,
}

#[derive(Default, Debug, Deserialize)]
#[serde(default)]
pub struct DropboxDeleteBatchResponse {
#[serde(rename(deserialize = ".tag"))]
pub tag: String,
pub async_job_id: Option<String>,
pub entries: Option<Vec<DropboxDeleteBatchResponseEntry>>,
}

#[derive(Default, Debug, Deserialize)]
#[serde(default)]
pub struct DropboxDeleteBatchResponseEntry {
#[serde(rename(deserialize = ".tag"))]
pub tag: String,
pub metadata: Option<DropboxMetadataResponse>,
pub error: Option<DropboxErrorResponse>,
}
49 changes: 37 additions & 12 deletions core/src/services/dropbox/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,47 @@ use crate::*;
///
/// # Configuration
///
/// - `access_token`: set the access_token for google drive api
/// - `root`: Set the work directory for backend
/// - `root`: Set the work directory for this backend.
///
/// You can refer to [`DropboxBuilder`]'s docs for more information
/// ## Credentials related
///
/// ### Just provide Access Token (Temporary)
///
/// - `access_token`: set the access_token for this backend.
/// Please notice its expiration.
///
/// ### Or provide Client ID and Client Secret and refresh token (Long Term)
///
/// If you want to let OpenDAL to refresh the access token automatically,
/// please provide the following fields:
///
/// - `refresh_token`: set the refresh_token for dropbox api
/// - `client_id`: set the client_id for dropbox api
/// - `client_secret`: set the client_secret for dropbox api
///
/// OpenDAL is a library, it cannot do the first step of OAuth2 for you.
/// You need to get authorization code from user by calling Dropbox's authorize url
/// and exchange it for refresh token.
///
/// Please refer to [Dropbox OAuth2 Guide](https://www.dropbox.com/developers/reference/oauth-guide)
/// for more information.
///
/// You can refer to [`DropboxBuilder`]'s docs for more information.
///
/// # Example
///
/// ## Via Builder
///
/// ```
/// ```rust
/// use anyhow::Result;
/// use opendal::raw::OpWrite;
/// use opendal::services::Dropbox;
/// use opendal::Operator;
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
/// // create backend builder
/// let mut builder = Dropbox::default();
/// builder.root("/test");
/// builder.access_token("<token>");
///
/// let op: Operator = Operator::new(builder)?.finish();
Expand All @@ -78,7 +100,9 @@ use crate::*;
#[derive(Default)]
pub struct DropboxBuilder {
root: Option<String>,

access_token: Option<String>,

refresh_token: Option<String>,
client_id: Option<String>,
client_secret: Option<String>,
Expand All @@ -105,33 +129,34 @@ impl DropboxBuilder {
///
/// You can get the access token from [Dropbox App Console](https://www.dropbox.com/developers/apps)
///
/// NOTE: this token will be expired in 4 hours. If you are trying to use dropbox services in a long time, please set a refresh_token instead.
/// NOTE: this token will be expired in 4 hours.
/// If you are trying to use the Dropbox service in a long time, please set a refresh_token instead.
pub fn access_token(&mut self, access_token: &str) -> &mut Self {
self.access_token = Some(access_token.to_string());
self
}

/// Refersh token is used for long term access to the Dropbox API.
/// Refresh token is used for long term access to the Dropbox API.
///
/// You can get the refresh token via OAuth2.0 Flow of dropbox.
/// You can get the refresh token via OAuth 2.0 Flow of Dropbox.
///
/// OpenDAL will use this refresh token to get a new access token when the old one is expired.
pub fn refresh_token(&mut self, refresh_token: &str) -> &mut Self {
self.refresh_token = Some(refresh_token.to_string());
self
}

/// Set the client id for dropbox.
/// Set the client id for Dropbox.
///
/// This is required for OAuth2.0 Flow with refresh token.
/// This is required for OAuth 2.0 Flow to refresh the access token.
pub fn client_id(&mut self, client_id: &str) -> &mut Self {
self.client_id = Some(client_id.to_string());
self
}

/// Set the client secret for dropbox.
/// Set the client secret for Dropbox.
///
/// This is required for OAuth2.0 Flow with refresh token.
/// This is required for OAuth 2.0 Flow with refresh the access token.
pub fn client_secret(&mut self, client_secret: &str) -> &mut Self {
self.client_secret = Some(client_secret.to_string());
self
Expand Down
Loading

0 comments on commit 45f0425

Please sign in to comment.