Skip to content

Commit

Permalink
VertxHttpClientHTTPConduit based client hangs when run asynchronously…
Browse files Browse the repository at this point in the history
… on Vert.x event loop with a body exceeding single chunk, fix #1685
  • Loading branch information
ppalaga committed Jan 23, 2025
1 parent a3d8420 commit 94dcf78
Show file tree
Hide file tree
Showing 6 changed files with 295 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ private Object produceCxfClient(CXFClientInfo cxfClientInfo) {
throw new RuntimeException("Could not load " + RUNTIME_INITIALIZED_PROXY_MARKER_INTERFACE_NAME, e);
}
final QuarkusClientFactoryBean quarkusClientFactoryBean = new QuarkusClientFactoryBean(seiClass);
final QuarkusJaxWsProxyFactoryBean factory = new QuarkusJaxWsProxyFactoryBean(quarkusClientFactoryBean, interfaces);
final QuarkusJaxWsProxyFactoryBean factory = new QuarkusJaxWsProxyFactoryBean(quarkusClientFactoryBean, vertx,
interfaces);
final Map<String, Object> props = new LinkedHashMap<>();
factory.setProperties(props);
props.put(CXFClientInfo.class.getName(), cxfClientInfo);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,39 @@
package io.quarkiverse.cxf;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import jakarta.xml.ws.AsyncHandler;
import jakarta.xml.ws.Binding;
import jakarta.xml.ws.BindingProvider;
import jakarta.xml.ws.EndpointReference;
import jakarta.xml.ws.Response;

import org.apache.cxf.endpoint.Client;
import org.apache.cxf.frontend.ClientFactoryBean;
import org.apache.cxf.frontend.ClientProxy;
import org.apache.cxf.jaxws.JaxWsClientProxy;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.apache.cxf.service.model.BindingOperationInfo;

import io.quarkus.runtime.BlockingOperationControl;
import io.vertx.core.Vertx;

public class QuarkusJaxWsProxyFactoryBean extends JaxWsProxyFactoryBean {

private final Class<?>[] additionalImplementingClasses;
private final Vertx vertx;

public QuarkusJaxWsProxyFactoryBean(ClientFactoryBean fact, Class<?>... additionalImplementingClasses) {
public QuarkusJaxWsProxyFactoryBean(ClientFactoryBean fact, Vertx vertx, Class<?>... additionalImplementingClasses) {
super(fact);
this.vertx = vertx;
this.additionalImplementingClasses = additionalImplementingClasses;
}

Expand All @@ -21,4 +46,196 @@ protected Class<?>[] getImplementingClasses() {
return result;
}

@Override
protected ClientProxy clientClientProxy(Client c) {
return new QuarkusJaxWsClientProxy(vertx, (JaxWsClientProxy) super.clientClientProxy(c));
}

public static class QuarkusJaxWsClientProxy extends ClientProxy implements BindingProvider {

private final JaxWsClientProxy delegate;
private final Vertx vertx;

public QuarkusJaxWsClientProxy(Vertx vertx, JaxWsClientProxy delegate) {
super(delegate.getClient());
this.vertx = vertx;
this.delegate = delegate;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
final boolean isAsync = isAsync(method);
if (isAsync && !BlockingOperationControl.isBlockingAllowed()) {
/* We are returning a Future and we are on Vert.x event loop thread */

final CompletableFuture<Response<Object>> result = new CompletableFuture<>();

/*
* We complete the result Future using AsyncHandler because that one gets a completed Response
* whose get() method does not block - see org.apache.cxf.jaxws.JaxwsClientCallback for how
* the AsyncHandler is called
*/
final Object[] newArgs;
final int len = args.length;
if (len > 0 && args[len - 1] instanceof AsyncHandler) {
final AsyncHandler<Object> jaxWsHandler = (AsyncHandler<Object>) args[len - 1];
newArgs = new Object[len];
System.arraycopy(args, 0, newArgs, 0, len);
newArgs[len - 1] = new AsyncHandler<Object>() {
@Override
public void handleResponse(Response<Object> res) {
try {
jaxWsHandler.handleResponse(res);
} finally {
result.complete(res);
}
}
};
} else {
newArgs = new Object[len + 1];
System.arraycopy(args, 0, newArgs, 0, len);
newArgs[len] = new AsyncHandler<Object>() {
@Override
public void handleResponse(Response<Object> res) {
result.complete(res);
}
};
}

/*
* Because even the async mode of VertxHttpClientConduit may block,
* we better dispatch the invocation to a worker thread
*/
vertx.executeBlocking(new Callable<>() {
@Override
public Void call() throws Exception {
try {
delegate.invoke(proxy, method, newArgs);
return null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw e;
} catch (Exception e) {
throw e;
} catch (Throwable e) {
throw new Exception(e);
}
}
}).onFailure(result::completeExceptionally);
return new QuarkusJaxWsResponse<Object>(result);
}
return delegate.invoke(proxy, method, args);

}

boolean isAsync(Method m) {
return m.getName().endsWith("Async")
&& (Future.class.equals(m.getReturnType())
|| Response.class.equals(m.getReturnType()));
}

@Override
public void close() throws IOException {
delegate.close();
}

@Override
public int hashCode() {
return delegate.hashCode();
}

@Override
public Object invokeSync(Method method, BindingOperationInfo oi, Object[] params) throws Exception {
return delegate.invokeSync(method, oi, params);
}

@Override
public Map<String, Object> getRequestContext() {
return delegate.getRequestContext();
}

@Override
public Map<String, Object> getResponseContext() {
return delegate.getResponseContext();
}

@Override
public Client getClient() {
return delegate.getClient();
}

@Override
public boolean equals(Object obj) {
return delegate.equals(obj);
}

@Override
public String toString() {
return delegate.toString();
}

@Override
public Binding getBinding() {
return delegate.getBinding();
}

@Override
public EndpointReference getEndpointReference() {
return delegate.getEndpointReference();
}

@Override
public <T extends EndpointReference> T getEndpointReference(Class<T> clazz) {
return delegate.getEndpointReference(clazz);
}

static class QuarkusJaxWsResponse<T> implements Response<T> {

final CompletableFuture<Response<T>> delegate;

public QuarkusJaxWsResponse(CompletableFuture<Response<T>> delegate) {
this.delegate = delegate;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return delegate.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return delegate.isCancelled();
}

@Override
public boolean isDone() {
return delegate.isDone();
}

@Override
public T get() throws InterruptedException, ExecutionException {
return delegate.get().get();
}

@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit).get();
}

@Override
public Map<String, Object> getContext() {
try {
return delegate.get().getContext();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@

import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.config.inject.ConfigProperty;
Expand Down Expand Up @@ -52,9 +51,9 @@ void init(@Observes StartupEvent start) {
}

@Path("/helloWithWsdlWithEagerInit")
@GET
@POST
@Produces(MediaType.TEXT_PLAIN)
public Uni<String> helloWithWsdlWithEagerInit(@QueryParam("person") String person) {
public Uni<String> helloWithWsdlWithEagerInit(String person) {
while (helloWithWsdlWithEagerInit == null) {
/* Spin until the client is ready */
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package io.quarkiverse.cxf.it.vertx.async;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;

import io.quarkiverse.cxf.annotation.CXFClient;
Expand All @@ -17,9 +16,9 @@ public class RestAsyncWithoutWsdl {
HelloService helloWithoutWsdl;

@Path("/helloWithoutWsdl")
@GET
@POST
@Produces(MediaType.TEXT_PLAIN)
public Uni<String> helloWithoutWsdl(@QueryParam("person") String person) {
public Uni<String> helloWithoutWsdl(String person) {
/* Without WSDL and without @Blocking should work */
return Uni.createFrom()
.future(helloWithoutWsdl.helloAsync(person))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package io.quarkiverse.cxf.it.vertx.async;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;

import io.quarkiverse.cxf.annotation.CXFClient;
Expand All @@ -18,10 +17,10 @@ public class RestAsyncWithoutWsdlWithBlocking {
HelloService helloWithoutWsdlWithBlocking;

@Path("/helloWithoutWsdlWithBlocking")
@GET
@POST
@Produces(MediaType.TEXT_PLAIN)
@Blocking
public Uni<String> helloWithoutWsdlWithBlocking(@QueryParam("person") String person) {
public Uni<String> helloWithoutWsdlWithBlocking(String person) {
/* Without WSDL and with @Blocking should work */
return Uni.createFrom()
.future(helloWithoutWsdlWithBlocking.helloAsync(person))
Expand Down
Loading

0 comments on commit 94dcf78

Please sign in to comment.