Skip to content

Commit

Permalink
feat(bindings/java): implement async ops to pass AsyncStepsTest (#2291)
Browse files Browse the repository at this point in the history
* refactor(bindings/java): Drop usage of GlobalRef

Signed-off-by: tison <[email protected]>

* feat(bindings/java): support stat

Signed-off-by: tison <[email protected]>

* feat(bindings/java): support read

Signed-off-by: tison <[email protected]>

---------

Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun authored May 23, 2023
1 parent be571e7 commit 003ef81
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,8 @@ public Metadata stat(String fileName) {
protected native void disposeInternal(long handle);

private static native long constructor(String schema, Map<String, String> map);

private static native void write(long nativeHandle, String fileName, String content);

private static native String read(long nativeHandle, String fileName);

private static native void delete(long nativeHandle, String fileName);

private static native long stat(long nativeHandle, String file);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.apache.opendal;

public class Metadata extends NativeObject {
public Metadata(long nativeHandle) {
protected Metadata(long nativeHandle) {
super(nativeHandle);
}

Expand Down
55 changes: 51 additions & 4 deletions bindings/java/src/main/java/org/apache/opendal/Operator.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,71 @@
package org.apache.opendal;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class Operator extends NativeObject {
private static AsyncRegistry registry() {
return AsyncRegistry.INSTANCE;
}

private enum AsyncRegistry {
INSTANCE;

private final Map<Long, CompletableFuture<?>> registry = new ConcurrentHashMap<>();

@SuppressWarnings("unused") // called by jni-rs
private long requestId() {
final CompletableFuture<?> f = new CompletableFuture<>();
while (true) {
final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
final CompletableFuture<?> prev = registry.putIfAbsent(requestId, f);
if (prev == null) {
return requestId;
}
}
}

private CompletableFuture<?> get(long requestId) {
return registry.get(requestId);
}

@SuppressWarnings("unchecked")
private <T> CompletableFuture<T> take(long requestId) {
final CompletableFuture<?> f = get(requestId);
if (f != null) {
f.whenComplete((r, e) -> registry.remove(requestId));
}
return (CompletableFuture<T>) f;
}
}

public Operator(String schema, Map<String, String> map) {
super(constructor(schema, map));
}


public CompletableFuture<Void> write(String fileName, String content) {
return write(nativeHandle, fileName, content);
final long requestId = write(nativeHandle, fileName, content);
return registry().take(requestId);
}

public CompletableFuture<Metadata> stat(String fileName) {
final long requestId = stat(nativeHandle, fileName);
final CompletableFuture<Long> f = registry().take(requestId);
return f.thenApply(Metadata::new);
}

public CompletableFuture<String> read(String fileName) {
final long requestId = read(nativeHandle, fileName);
return registry().take(requestId);
}

@Override
protected native void disposeInternal(long handle);

private static native long constructor(String schema, Map<String, String> map);

private static native CompletableFuture<Void> write(long nativeHandle, String fileName, String content);
private static native long read(long nativeHandle, String fileName);
private static native long write(long nativeHandle, String fileName, String content);
private static native long stat(long nativeHandle, String file);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
public class ODException extends RuntimeException {
private final Code code;

@SuppressWarnings("unused") // called by jni-rs
public ODException(String code, String message) {
this(Code.valueOf(code), message);
}
Expand Down
170 changes: 147 additions & 23 deletions bindings/java/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@

use std::str::FromStr;

use jni::objects::{JClass, JObject, JString, JValue};
use jni::sys::{jlong, jobject};
use jni::objects::{JClass, JObject, JString, JValue, JValueOwned};
use jni::sys::jlong;
use jni::JNIEnv;

use opendal::{Operator, Scheme};

use crate::error::Error;
use crate::{get_current_env, Result};
use crate::{jmap_to_hashmap, RUNTIME};

Expand Down Expand Up @@ -69,10 +68,10 @@ pub unsafe extern "system" fn Java_org_apache_opendal_Operator_write(
op: *mut Operator,
file: JString,
content: JString,
) -> jobject {
) -> jlong {
intern_write(&mut env, op, file, content).unwrap_or_else(|e| {
e.throw(&mut env);
JObject::null().into_raw()
0
})
}

Expand All @@ -81,41 +80,147 @@ fn intern_write(
op: *mut Operator,
file: JString,
content: JString,
) -> Result<jobject> {
) -> Result<jlong> {
let op = unsafe { &mut *op };
let id = request_id(env)?;

let file = env.get_string(&file)?.to_str()?.to_string();
let content = env.get_string(&content)?.to_str()?.to_string();

let runtime = unsafe { RUNTIME.get_unchecked() };
runtime.spawn(async move {
let result = do_write(op, file, content).await;
complete_future(id, result.map(|_| JValueOwned::Void))
});

Ok(id)
}

async fn do_write(op: &mut Operator, file: String, content: String) -> Result<()> {
Ok(op.write(&file, content).await?)
}

/// # Safety
///
/// This function should not be called before the Operator are ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_Operator_stat(
mut env: JNIEnv,
_: JClass,
op: *mut Operator,
file: JString,
) -> jlong {
intern_stat(&mut env, op, file).unwrap_or_else(|e| {
e.throw(&mut env);
0
})
}

fn intern_stat(env: &mut JNIEnv, op: *mut Operator, file: JString) -> Result<jlong> {
let op = unsafe { &mut *op };
let file: String = env.get_string(&file)?.into();
let content: String = env.get_string(&content)?.into();
let id = request_id(env)?;

let file = env.get_string(&file)?.to_str()?.to_string();

let runtime = unsafe { RUNTIME.get_unchecked() };
runtime.spawn(async move {
let result = do_stat(op, file).await;
complete_future(id, result.map(JValueOwned::Long))
});

let class = "java/util/concurrent/CompletableFuture";
let f = env.new_object(class, "()V", &[])?;
Ok(id)
}

async fn do_stat(op: &mut Operator, file: String) -> Result<jlong> {
let metadata = op.stat(&file).await?;
Ok(Box::into_raw(Box::new(metadata)) as jlong)
}

/// # Safety
///
/// This function should not be called before the Operator are ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_Operator_read(
mut env: JNIEnv,
_: JClass,
op: *mut Operator,
file: JString,
) -> jlong {
intern_read(&mut env, op, file).unwrap_or_else(|e| {
e.throw(&mut env);
0
})
}

fn intern_read(env: &mut JNIEnv, op: *mut Operator, file: JString) -> Result<jlong> {
let op = unsafe { &mut *op };
let id = request_id(env)?;

// keep the future alive, so that we can complete it later
// but this approach will be limited by global ref table size (65535)
let future = env.new_global_ref(&f)?;
let file = env.get_string(&file)?.to_str()?.to_string();

let runtime = unsafe { RUNTIME.get_unchecked() };
runtime.spawn(async move {
let result = match op.write(&file, content).await {
Ok(()) => Ok(JObject::null()),
Err(err) => Err(Error::from(err)),
};
complete_future(future.as_ref(), result)
let result = do_read(op, file).await;
complete_future(id, result.map(JValueOwned::Object))
});

Ok(f.into_raw())
Ok(id)
}

async fn do_read<'local>(op: &mut Operator, file: String) -> Result<JObject<'local>> {
let content = op.read(&file).await?;
let content = String::from_utf8(content)?;

let env = unsafe { get_current_env() };
let result = env.new_string(content)?;
Ok(result.into())
}

fn request_id(env: &mut JNIEnv) -> Result<jlong> {
let registry = env
.call_static_method(
"org/apache/opendal/Operator",
"registry",
"()Lorg/apache/opendal/Operator$AsyncRegistry;",
&[],
)?
.l()?;
Ok(env.call_method(registry, "requestId", "()J", &[])?.j()?)
}

fn complete_future(future: &JObject, result: Result<JObject>) {
fn make_object<'local>(
env: &mut JNIEnv<'local>,
value: JValueOwned<'local>,
) -> Result<JObject<'local>> {
let o = match value {
JValueOwned::Object(o) => o,
JValueOwned::Byte(_) => env.new_object("java/lang/Long", "(B)V", &[value.borrow()])?,
JValueOwned::Char(_) => env.new_object("java/lang/Char", "(C)V", &[value.borrow()])?,
JValueOwned::Short(_) => env.new_object("java/lang/Short", "(S)V", &[value.borrow()])?,
JValueOwned::Int(_) => env.new_object("java/lang/Integer", "(I)V", &[value.borrow()])?,
JValueOwned::Long(_) => env.new_object("java/lang/Long", "(J)V", &[value.borrow()])?,
JValueOwned::Bool(_) => env.new_object("java/lang/Boolean", "(Z)V", &[value.borrow()])?,
JValueOwned::Float(_) => env.new_object("java/lang/Float", "(F)V", &[value.borrow()])?,
JValueOwned::Double(_) => env.new_object("java/lang/Double", "(D)V", &[value.borrow()])?,
JValueOwned::Void => JObject::null(),
};
Ok(o)
}

fn complete_future(id: jlong, result: Result<JValueOwned>) {
let mut env = unsafe { get_current_env() };
let future = get_future(&mut env, id).unwrap();
match result {
Ok(result) => env
.call_method(
Ok(result) => {
let result = make_object(&mut env, result).unwrap();
env.call_method(
future,
"complete",
"(Ljava/lang/Object;)Z",
&[JValue::Object(&result)],
)
.unwrap(),
.unwrap()
}
Err(err) => {
let exception = err.to_exception(&mut env).unwrap();
env.call_method(
Expand All @@ -128,3 +233,22 @@ fn complete_future(future: &JObject, result: Result<JObject>) {
}
};
}

fn get_future<'local>(env: &mut JNIEnv<'local>, id: jlong) -> Result<JObject<'local>> {
let registry = env
.call_static_method(
"org/apache/opendal/Operator",
"registry",
"()Lorg/apache/opendal/Operator$AsyncRegistry;",
&[],
)?
.l()?;
Ok(env
.call_method(
registry,
"get",
"(J)Ljava/util/concurrent/CompletableFuture;",
&[JValue::Long(id)],
)?
.l()?)
}
21 changes: 13 additions & 8 deletions bindings/java/src/test/java/org/apache/opendal/AsyncStepsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,42 +26,47 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class AsyncStepsTest {
Operator operator;
Operator op;

@Given("A new OpenDAL Async Operator")
public void a_new_open_dal_async_operator() {
Map<String, String> params = new HashMap<>();
params.put("root", "/tmp");
operator = new Operator("Memory", params);
op = new Operator("Memory", params);
}

@When("Async write path {string} with content {string}")
public void async_write_path_test_with_content_hello_world(String fileName, String content) {
CompletableFuture<Void> f = operator.write(fileName, content);

f.join();

assertTrue(f.isDone());
assertFalse(f.isCompletedExceptionally());
op.write(fileName, content).join();
}

@Then("The async file {string} should exist")
public void the_async_file_test_should_exist(String fileName) {
Metadata metadata = op.stat(fileName).join();
assertNotNull(metadata);
}

@Then("The async file {string} entry mode must be file")
public void the_async_file_test_entry_mode_must_be_file(String fileName) {
Metadata metadata = op.stat(fileName).join();
assertTrue(metadata.isFile());
}

@Then("The async file {string} content length must be {int}")
public void the_async_file_test_content_length_must_be_13(String fileName, int length) {
Metadata metadata = op.stat(fileName).join();
assertEquals(metadata.getContentLength(), length);
}

@Then("The async file {string} must have content {string}")
public void the_async_file_test_must_have_content_hello_world(String fileName, String content) {
String readContent = op.read(fileName).join();
assertEquals(content, readContent);
}
}
Loading

0 comments on commit 003ef81

Please sign in to comment.