Skip to content

Commit

Permalink
feat(bin/oli): support command mv (apache#5370)
Browse files Browse the repository at this point in the history
  • Loading branch information
meteorgan authored Dec 4, 2024
1 parent eaf6c18 commit 21839be
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 3 deletions.
2 changes: 1 addition & 1 deletion bin/oli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ cargo install oli --all-features
- `~/Library/Application Support/oli/config.toml` on macOS
- `C:\Users\<UserName>\AppData\Roaming\oli\config.toml` on Windows

The content of `config.toml` should be follow these pattern:
The content of `config.toml` should follow these pattern:

```toml
[profiles.<profile_name>]
Expand Down
4 changes: 4 additions & 0 deletions bin/oli/src/bin/oli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ async fn main() -> Result<()> {
let cmd: oli::commands::stat::StatCmd = clap::Parser::parse();
cmd.run().await?;
}
Some("omv") => {
let cmd: oli::commands::mv::MoveCmd = clap::Parser::parse();
cmd.run().await?;
}
Some(v) => {
println!("{v} is not supported")
}
Expand Down
3 changes: 3 additions & 0 deletions bin/oli/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
pub mod cat;
pub mod cp;
pub mod ls;
pub mod mv;
pub mod rm;
pub mod stat;

Expand All @@ -30,6 +31,7 @@ pub enum OliSubcommand {
Ls(ls::LsCmd),
Rm(rm::RmCmd),
Stat(stat::StatCmd),
Mv(mv::MoveCmd),
}

impl OliSubcommand {
Expand All @@ -40,6 +42,7 @@ impl OliSubcommand {
Self::Ls(cmd) => cmd.run().await,
Self::Rm(cmd) => cmd.run().await,
Self::Stat(cmd) => cmd.run().await,
Self::Mv(cmd) => cmd.run().await,
}
}
}
135 changes: 135 additions & 0 deletions bin/oli/src/commands/mv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::config::Config;
use crate::params::config::ConfigParams;
use anyhow::{Error, Result};
use futures::{AsyncWriteExt, TryStreamExt};
use opendal::Operator;
use std::path::Path;

#[derive(Debug, clap::Parser)]
#[command(name = "mv", about = "Move object", disable_version_flag = true)]
pub struct MoveCmd {
#[command(flatten)]
pub config_params: ConfigParams,
#[arg()]
pub source: String,
#[arg()]
pub destination: String,
/// Move objects recursively.
#[arg(short = 'r', long)]
pub recursive: bool,
}

impl MoveCmd {
pub async fn run(&self) -> Result<()> {
let cfg = Config::load(&self.config_params.config)?;

let (src_op, src_path) = cfg.parse_location(&self.source)?;
let (dst_op, dst_path) = cfg.parse_location(&self.destination)?;

let src_meta = src_op.stat(&src_path).await?;
if !self.recursive || src_meta.is_file() {
if src_meta.is_dir() {
return Err(Error::msg("can not move a directory in non-recursive mode"));
}

let mut actual_dst_path = dst_path.clone();
if let Ok(meta) = dst_op.stat(&dst_path).await {
if meta.is_dir() && !dst_path.ends_with("/") {
actual_dst_path.push('/');
}
}
if actual_dst_path.is_empty() || actual_dst_path.ends_with("/") {
let file_name = src_path.rsplit_once("/").unwrap_or(("", &src_path)).1;
actual_dst_path.push_str(file_name);
}

println!("Moving: {}", src_path);
self.cp_file(
&src_op,
&src_path,
&dst_op,
&actual_dst_path,
src_meta.content_length(),
)
.await?;
src_op.delete(&src_path).await?;

return Ok(());
}

let dst_root = Path::new(&dst_path);
let prefix = src_path.strip_prefix('/').unwrap_or(src_path.as_str());
let mut lst = src_op.lister_with(&src_path).recursive(true).await?;
while let Some(entry) = lst.try_next().await? {
let path = entry.path();
if path == src_path {
continue;
}

let suffix = path.strip_prefix(prefix).expect("invalid path");
let depath = dst_root.join(suffix);

println!("Moving: {}", path);
let meta = entry.metadata();
if meta.is_dir() {
dst_op.create_dir(&depath.to_string_lossy()).await?;
src_op.delete(path).await?;
continue;
}

let path_metadata = src_op.stat(path).await?;
self.cp_file(
&src_op,
path,
&dst_op,
&depath.to_string_lossy(),
path_metadata.content_length(),
)
.await?;

src_op.delete(path).await?;
}

Ok(())
}

async fn cp_file(
&self,
src_op: &Operator,
src_path: &str,
dst_op: &Operator,
dst_path: &str,
length: u64,
) -> Result<()> {
let src_reader = src_op
.reader_with(src_path)
.chunk(8 * 1024 * 1024)
.await?
.into_futures_async_read(0..length)
.await?;

let mut dst_writer = dst_op.writer(dst_path).await?.into_futures_async_write();

futures::io::copy_buf(src_reader, &mut dst_writer).await?;
dst_writer.close().await?;

Ok(())
}
}
2 changes: 1 addition & 1 deletion bin/oli/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub struct Config {
profiles: HashMap<String, HashMap<String, String>>,
}

/// resolve_relative_path turns a relative path to a absolute path.
/// resolve_relative_path turns a relative path to an absolute path.
///
/// The reason why we don't use `fs::canonicalize` here is `fs::canonicalize`
/// will return an error if the path does not exist, which is unwanted.
Expand Down
110 changes: 110 additions & 0 deletions bin/oli/tests/mv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use anyhow::Result;
use assert_cmd::Command;
use std::fs;

#[tokio::test]
async fn test_basic_mv() -> Result<()> {
let dir = tempfile::tempdir()?;
let src_path = dir.path().join("src.txt");
let dst_path = dir.path().join("dst.txt");
let expect = "hello";
fs::write(&src_path, expect)?;

let mut cmd = Command::cargo_bin("oli")?;
cmd.arg("mv")
.arg(src_path.as_os_str())
.arg(dst_path.as_os_str());
cmd.assert().success();

let actual = fs::read_to_string(&dst_path)?;
assert_eq!(actual, expect);

assert!(!fs::exists(&src_path)?);

Ok(())
}

#[tokio::test]
async fn test_move_a_file_to_a_dir() -> Result<()> {
let src_dir = tempfile::tempdir()?;
let src_path = src_dir.path().join("src.txt");
let expect = "hello";
fs::write(&src_path, expect)?;

let dst_dir = tempfile::tempdir()?;
let dst_path = dst_dir.path().join("dir/");

let mut cmd = Command::cargo_bin("oli")?;
cmd.arg("mv")
.arg(src_path.as_os_str())
.arg(dst_path.as_os_str());
cmd.assert().success();

let dst_path = dst_path.join("src.txt");
let actual = fs::read_to_string(&dst_path)?;
assert_eq!(actual, expect);

assert!(!fs::exists(&src_path)?);

Ok(())
}

#[tokio::test]
async fn test_mv_with_recursive() -> Result<()> {
let src_root = tempfile::tempdir()?;
let src_path = src_root.path().join("src/");
fs::create_dir(&src_path)?;

let src_file1 = src_path.as_path().join("file1.txt");
let file1_content = "file1";
fs::write(&src_file1, file1_content).expect("write file1 error");

let src_dir = src_path.join("dir/");
fs::create_dir(&src_dir)?;
let src_file2 = src_dir.as_path().join("file2.txt");
let file2_content = "file2";
fs::write(&src_file2, file2_content).expect("write file2 error");

let src_empty_dir = src_path.join("empty_dir/");
fs::create_dir(&src_empty_dir)?;

let dst_path = tempfile::tempdir()?;

let mut cmd = Command::cargo_bin("oli")?;
cmd.arg("mv")
.arg(src_path.as_os_str())
.arg(dst_path.path().as_os_str())
.arg("-r");
cmd.assert().success();

let dst_file1_content =
fs::read_to_string(dst_path.path().join("file1.txt")).expect("read file1 error");
assert_eq!(dst_file1_content, file1_content);
let dst_file2_content =
fs::read_to_string(dst_path.path().join("dir/file2.txt")).expect("read dir/file2 error");
assert_eq!(dst_file2_content, file2_content);
assert!(fs::exists(dst_path.path().join("empty_dir/"))?);

// src_path is empty now
let mut src_data = fs::read_dir(&src_path)?;
assert!(src_data.next().is_none());

Ok(())
}
2 changes: 1 addition & 1 deletion core/src/types/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl Entry {
&self.metadata
}

/// Consume this entry to get it's path and metadata.
/// Consume this entry to get its path and metadata.
pub fn into_parts(self) -> (String, Metadata) {
(self.path, self.metadata)
}
Expand Down

0 comments on commit 21839be

Please sign in to comment.