Skip to content

Commit

Permalink
feat(services/webhdfs): Implement multi write via CONCAT (apache#3939)
Browse files Browse the repository at this point in the history
  • Loading branch information
hoslo authored Jan 10, 2024
1 parent 56accd2 commit 8714ad7
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 54 deletions.
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ OPENDAL_WEBDAV_ENDPOINT=http://127.0.0.1:8080
OPENDAL_WEBHDFS_ROOT=/tmp/opendal/
OPENDAL_WEBHDFS_ENDPOINT=http://127.0.0.1:9870
OPENDAL_WEBHDFS_DELEGATION=<delegation>
OPENDAL_WEBHDFS_ATOMIC_WRITE_DIR=.opendal_tmp/
OPENDAL_WEBHDFS_DISABLE_LIST_BATCH=false
# supbase
OPENDAL_SUPABASE_BUCKET=<bucket>
Expand Down
1 change: 1 addition & 0 deletions .github/services/webhdfs/webhdfs/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ runs:
cat << EOF >> $GITHUB_ENV
OPENDAL_WEBHDFS_ROOT=/
OPENDAL_WEBHDFS_ENDPOINT=http://127.0.0.1:9870
OPENDAL_WEBHDFS_ATOMIC_WRITE_DIR=.opendal_tmp/
EOF
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ runs:
cat << EOF >> $GITHUB_ENV
OPENDAL_WEBHDFS_ROOT=/
OPENDAL_WEBHDFS_ENDPOINT=http://127.0.0.1:9870
OPENDAL_WEBHDFS_ATOMIC_WRITE_DIR=.opendal_tmp/
OPENDAL_WEBHDFS_DISABLE_LIST_BATCH=true
EOF
11 changes: 6 additions & 5 deletions core/src/raw/oio/write/block_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ where
})));
let size = self.fill_cache(bs);
return Poll::Ready(Ok(size));
} else {
ready!(self.futures.poll_next_unpin(cx));
} else if let Some(res) = ready!(self.futures.poll_next_unpin(cx)) {
res?;
}
}
State::Close(_) => {
Expand Down Expand Up @@ -209,8 +209,7 @@ where
}));
}
}
}
if self.futures.is_empty() && self.cache.is_none() {
} else if self.futures.is_empty() && self.cache.is_none() {
self.state =
State::Close(Box::pin(
async move { w.complete_block(block_ids).await },
Expand All @@ -232,7 +231,9 @@ where
})));
}
}
while ready!(self.futures.poll_next_unpin(cx)).is_some() {}
while let Some(res) = ready!(self.futures.poll_next_unpin(cx)) {
res?;
}
}
}
State::Close(fut) => {
Expand Down
161 changes: 131 additions & 30 deletions core/src/services/webhdfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,16 @@ pub struct WebhdfsBuilder {
endpoint: Option<String>,
delegation: Option<String>,
disable_list_batch: bool,
/// atomic_write_dir of this backend
pub atomic_write_dir: Option<String>,
}

impl Debug for WebhdfsBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Builder")
.field("root", &self.root)
.field("endpoint", &self.endpoint)
.field("atomic_write_dir", &self.atomic_write_dir)
.finish_non_exhaustive()
}
}
Expand Down Expand Up @@ -117,6 +120,20 @@ impl WebhdfsBuilder {
self.disable_list_batch = true;
self
}

/// Set temp dir for atomic write.
///
/// # Notes
///
/// If not set, write multi not support, eg: `.opendal_tmp/`.
pub fn atomic_write_dir(&mut self, dir: &str) -> &mut Self {
self.atomic_write_dir = if dir.is_empty() {
None
} else {
Some(String::from(dir))
};
self
}
}

impl Builder for WebhdfsBuilder {
Expand All @@ -132,6 +149,8 @@ impl Builder for WebhdfsBuilder {
map.get("disable_list_batch")
.filter(|v| v == &"true")
.map(|_| builder.disable_list_batch());
map.get("atomic_write_dir")
.map(|v| builder.atomic_write_dir(v));

builder
}
Expand Down Expand Up @@ -162,6 +181,8 @@ impl Builder for WebhdfsBuilder {
};
debug!("backend use endpoint {}", endpoint);

let atomic_write_dir = self.atomic_write_dir.take();

let auth = self
.delegation
.take()
Expand All @@ -175,6 +196,7 @@ impl Builder for WebhdfsBuilder {
auth,
client,
root_checker: OnceCell::new(),
atomic_write_dir,
disable_list_batch: self.disable_list_batch,
};

Expand All @@ -190,52 +212,78 @@ pub struct WebhdfsBackend {
auth: Option<String>,
root_checker: OnceCell<()>,

pub atomic_write_dir: Option<String>,
pub disable_list_batch: bool,
pub client: HttpClient,
}

impl WebhdfsBackend {
/// create object or make a directory
///
/// TODO: we should split it into mkdir and create
pub fn webhdfs_create_object_request(
pub fn webhdfs_create_dir_request(&self, path: &str) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);

let mut url = format!(
"{}/webhdfs/v1/{}?op=MKDIRS&overwrite=true&noredirect=true",
self.endpoint,
percent_encode_path(&p),
);
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}

let req = Request::put(&url);

req.body(AsyncBody::Empty).map_err(new_request_build_error)
}
/// create object
pub async fn webhdfs_create_object_request(
&self,
path: &str,
size: Option<usize>,
size: Option<u64>,
args: &OpWrite,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
let op = if path.ends_with('/') {
"MKDIRS"
} else {
"CREATE"
};

let mut url = format!(
"{}/webhdfs/v1/{}?op={}&overwrite=true",
"{}/webhdfs/v1/{}?op=CREATE&overwrite=true&noredirect=true",
self.endpoint,
percent_encode_path(&p),
op,
);
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}

let mut req = Request::put(&url);
let req = Request::put(&url);

let req = req
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;

// mkdir does not redirect
if path.ends_with('/') {
return req.body(AsyncBody::Empty).map_err(new_request_build_error);
let resp = self.client.send(req).await?;

let status = resp.status();

if status != StatusCode::CREATED && status != StatusCode::OK {
return Err(parse_error(resp).await?);
}

let bs = resp.into_body().bytes().await?;

let resp =
serde_json::from_slice::<LocationResponse>(&bs).map_err(new_json_deserialize_error)?;

let mut req = Request::put(&resp.location);

if let Some(size) = size {
req = req.header(CONTENT_LENGTH, size.to_string());
}
req = req.header(CONTENT_LENGTH, size);
};

if let Some(content_type) = args.content_type() {
req = req.header(CONTENT_TYPE, content_type);
}
};
let req = req.body(body).map_err(new_request_build_error)?;

req.body(body).map_err(new_request_build_error)
Ok(req)
}

pub async fn webhdfs_init_append_request(&self, path: &str) -> Result<String> {
Expand All @@ -260,7 +308,7 @@ impl WebhdfsBackend {
match status {
StatusCode::OK => {
let bs = resp.into_body().bytes().await?;
let resp: InitAppendResponse =
let resp: LocationResponse =
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;

Ok(resp.location)
Expand All @@ -269,6 +317,32 @@ impl WebhdfsBackend {
}
}

pub async fn webhdfs_rename_object(
&self,
from: &str,
to: &str,
) -> Result<Response<IncomingAsyncBody>> {
let from = build_abs_path(&self.root, from);
let to = build_rooted_abs_path(&self.root, to);

let mut url = format!(
"{}/webhdfs/v1/{}?op=RENAME&destination={}",
self.endpoint,
percent_encode_path(&from),
percent_encode_path(&to)
);

if let Some(auth) = &self.auth {
url += &format!("&{auth}");
}

let req = Request::put(&url)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;

self.client.send(req).await
}

pub async fn webhdfs_append_request(
&self,
location: &str,
Expand All @@ -288,6 +362,36 @@ impl WebhdfsBackend {
req.body(body).map_err(new_request_build_error)
}

/// CONCAT will concat sources to the path
pub fn webhdfs_concat_request(
&self,
path: &str,
sources: Vec<String>,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);

let sources = sources
.iter()
.map(|p| build_rooted_abs_path(&self.root, p))
.collect::<Vec<String>>()
.join(",");

let mut url = format!(
"{}/webhdfs/v1/{}?op=CONCAT&sources={}",
self.endpoint,
percent_encode_path(&p),
percent_encode_path(&sources),
);

if let Some(auth) = &self.auth {
url += &format!("&{auth}");
}

let req = Request::post(url);

req.body(AsyncBody::Empty).map_err(new_request_build_error)
}

async fn webhdfs_open_request(
&self,
path: &str,
Expand Down Expand Up @@ -403,7 +507,7 @@ impl WebhdfsBackend {
self.client.send(req).await
}

async fn webhdfs_delete(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
pub async fn webhdfs_delete(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);
let mut url = format!(
"{}/webhdfs/v1/{}?op=DELETE&recursive=false",
Expand Down Expand Up @@ -469,6 +573,8 @@ impl Accessor for WebhdfsBackend {

write: true,
write_can_append: true,
write_can_multi: self.atomic_write_dir.is_some(),

create_dir: true,
delete: true,

Expand All @@ -481,12 +587,7 @@ impl Accessor for WebhdfsBackend {

/// Create a file or directory
async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
let req = self.webhdfs_create_object_request(
path,
Some(0),
&OpWrite::default(),
AsyncBody::Empty,
)?;
let req = self.webhdfs_create_dir_request(path)?;

let resp = self.client.send(req).await?;

Expand Down Expand Up @@ -585,7 +686,7 @@ impl Accessor for WebhdfsBackend {
let w = if args.append() {
WebhdfsWriters::Two(oio::AppendObjectWriter::new(w))
} else {
WebhdfsWriters::One(oio::OneShotWriter::new(w))
WebhdfsWriters::One(oio::BlockWriter::new(w, args.concurrent()))
};

Ok((RpWrite::default(), w))
Expand Down Expand Up @@ -619,6 +720,6 @@ impl Accessor for WebhdfsBackend {

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub(super) struct InitAppendResponse {
pub(super) struct LocationResponse {
pub location: String,
}
3 changes: 3 additions & 0 deletions core/src/services/webhdfs/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ This service can be used to:
- `root`: The root path of the WebHDFS service.
- `endpoint`: The endpoint of the WebHDFS service.
- `delegation`: The delegation token for WebHDFS.
- `atomic_write_dir`: The tmp write dir of multi write for WebHDFS.

Refer to [`Builder`]'s public API docs for more information.

Expand Down Expand Up @@ -58,6 +59,8 @@ async fn main() -> Result<()> {
builder.endpoint("http://127.0.0.1:9870");
// set the delegation_token for builder
builder.delegation("delegation_token");
// set atomic_write_dir for builder
builder.atomic_write_dir(".opendal_tmp/");

let op: Operator = Operator::new(builder)?.finish();

Expand Down
Loading

0 comments on commit 8714ad7

Please sign in to comment.