Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to Vert.x 4.x #13

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@
</developers>

<properties>
<vertx.version>3.8.3</vertx.version>
<retrofit.version>2.6.0</retrofit.version>
<vertx.version>4.2.5</vertx.version>
<retrofit.version>2.9.0</retrofit.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -91,7 +91,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
265 changes: 140 additions & 125 deletions src/main/java/com/julienviet/retrofit/vertx/VertxCallFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,17 @@

import io.vertx.core.Future;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Headers;
import okhttp3.MediaType;
import okhttp3.Protocol;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.*;
import okio.Buffer;
import okio.Timeout;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,146 +22,166 @@
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
public class VertxCallFactory implements Call.Factory {
private static final long DEFAULT_TIMEOUT = 10;
private final HttpClient client;
private final long timeout;

private final HttpClient client;

public VertxCallFactory(HttpClient client) {
this.client = client;
}

private class VertxCall implements okhttp3.Call {

private final Request retroRequest;
private final AtomicBoolean executed = new AtomicBoolean();
public VertxCallFactory(HttpClient client) {
this(client, DEFAULT_TIMEOUT);
}

VertxCall(Request retroRequest) {
this.retroRequest = retroRequest;
public VertxCallFactory(HttpClient client, long timeout) {
this.client = client;
this.timeout = timeout;
}

@Override
public Request request() {
return retroRequest;
private static IOException getIOException(Throwable cause) {
IOException ioe;
if (cause instanceof IOException) {
ioe = (IOException) cause;
} else {
ioe = new IOException(cause);
}
return ioe;
}

@Override
public Response execute() throws IOException {
CompletableFuture<Response> future = new CompletableFuture<>();
enqueue(new Callback() {
private class VertxCall implements okhttp3.Call {
private final Request retroRequest;
private final AtomicBoolean executed = new AtomicBoolean();

@Override
public void onResponse(okhttp3.Call call, Response response) throws IOException {
future.complete(response);
VertxCall(Request retroRequest) {
this.retroRequest = retroRequest;
}

@Override
public void onFailure(okhttp3.Call call, IOException e) {
future.completeExceptionally(e);
public Request request() {
return retroRequest;
}
});
try {
return future.get(10, TimeUnit.SECONDS);
} catch (Exception e) {
throw new IOException(e);
}
}

@Override
public void enqueue(Callback callback) {
if (executed.compareAndSet(false, true)) {
Future<Response> fut = Future.future();
fut.setHandler(ar -> {
if (ar.succeeded()) {
@Override
public Response execute() throws IOException {
CompletableFuture<Response> future = new CompletableFuture<>();
enqueue(new Callback() {
@Override
public void onResponse(okhttp3.Call call, Response response) {
future.complete(response);
}

@Override
public void onFailure(okhttp3.Call call, IOException e) {
future.completeExceptionally(e);
}
});
try {
callback.onResponse(this, ar.result());
} catch (IOException e) {
// WTF ?
e.printStackTrace();
return future.get(timeout, TimeUnit.SECONDS);
} catch (Exception e) {
throw new IOException(e);
}
} else {
IOException ioe;
Throwable cause = ar.cause();
if (cause instanceof IOException) {
ioe = (IOException) cause;
}

@Override
public void enqueue(Callback callback) {
if (executed.compareAndSet(false, true)) {
Promise<Response> promise = Promise.promise();
Future<Response> fut = promise.future();
fut.onSuccess(ar -> {
try {
callback.onResponse(this, ar);
} catch (IOException e) {
e.printStackTrace();
}
});
fut.onFailure(cause -> callback.onFailure(this, getIOException(cause)));

HttpMethod method = HttpMethod.valueOf(retroRequest.method());
HttpUrl url = retroRequest.url();
client.request(method, url.port(), url.host(), url.encodedPath(), handler -> {
if (handler.failed()) {
promise.tryFail(handler.cause());
return;
}
HttpClientRequest request = handler.result();
MultiMap headers = request.headers();
Map<String, List<String>> origHeaders = retroRequest.headers().toMultimap();
for (Map.Entry<String, List<String>> hdr : origHeaders.entrySet()) {
headers.add(hdr.getKey(), hdr.getValue());
}
try {
RequestBody body = retroRequest.body();
if (body != null && body.contentLength() > 0) {
MediaType mediaType = body.contentType();
String type;
if (mediaType != null) {
type = mediaType.toString();
} else {
type = "application/octet-stream";
}
request.putHeader("content-type", type);
request.putHeader("content-length", Long.toString(body.contentLength()));
Buffer buffer = new Buffer();
body.writeTo(buffer);
request.write(io.vertx.core.buffer.Buffer.buffer(buffer.readByteArray()));
}
} catch (IOException e) {
e.printStackTrace();
}
request.exceptionHandler(promise::tryFail);
request.response(responseHandler -> {
if (responseHandler.failed()) {
promise.tryFail(responseHandler.cause());
return;
}
HttpClientResponse resp = responseHandler.result();
resp.bodyHandler(body -> {
Response.Builder builder = new Response.Builder();
builder.protocol(Protocol.HTTP_1_1);
builder.request(this.retroRequest);
builder.code(resp.statusCode());
builder.message(resp.statusMessage());
for (Map.Entry<String, String> header : resp.headers()) {
builder.addHeader(header.getKey(), header.getValue());
}
String mediaTypeHeader = resp.getHeader("Content-Type");
MediaType mediaType = mediaTypeHeader != null ? MediaType.parse(mediaTypeHeader) : null;
builder.body(ResponseBody.create(mediaType, body.getBytes()));
promise.tryComplete(builder.build());
});
});
request.end();
});
} else {
ioe = new IOException(cause);
}
callback.onFailure(this, ioe);
}
});
HttpMethod method = HttpMethod.valueOf(retroRequest.method());
HttpClientRequest request = client.requestAbs(method, this.retroRequest.url().toString(), resp -> {
resp.exceptionHandler(fut::tryFail);
resp.bodyHandler(body -> {
Response.Builder builder = new Response.Builder();
builder.protocol(Protocol.HTTP_1_1);
builder.request(this.retroRequest);
builder.code(resp.statusCode());
builder.message(resp.statusMessage());
for (Map.Entry<String, String> header : resp.headers()) {
builder.addHeader(header.getKey(), header.getValue());
callback.onFailure(this, new IOException("Already executed"));
}
String mediaTypeHeader = resp.getHeader("Content-Type");
MediaType mediaType = mediaTypeHeader != null ? MediaType.parse(mediaTypeHeader) : null;
builder.body(ResponseBody.create(mediaType, body.getBytes()));
fut.tryComplete(builder.build());
});
});
request.exceptionHandler(fut::tryFail);
int size = retroRequest.headers().size();
Headers retroHeaders = retroRequest.headers();
MultiMap headers = request.headers();
for (int i = 0;i < size;i++) {
String header = retroHeaders.name(i);
String value = retroHeaders.value(i);
headers.add(header, value);
}
try {
RequestBody body = this.retroRequest.body();
if (body != null && body.contentLength() > 0) {
MediaType mediaType = body.contentType();
request.putHeader("content-type", mediaType.toString());

request.putHeader("content-length", "" + body.contentLength());
Buffer buffer = new Buffer();
body.writeTo(buffer);
request.write(io.vertx.core.buffer.Buffer.buffer(buffer.readByteArray()));
}
} catch (IOException e) {
e.printStackTrace(); // ?

@Override
public void cancel() {
}
request.end();
} else {
callback.onFailure(this, new IOException("Already executed"));
}
}

@Override
public void cancel() {
}
@Override
public boolean isExecuted() {
return executed.get();
}

@Override
public boolean isExecuted() {
return executed.get();
}
@Override
public boolean isCanceled() {
return false;
}

@Override
public boolean isCanceled() {
return false;
}
@Override
public Timeout timeout() {
return Timeout.NONE;
}

@Override
public Timeout timeout() {
return Timeout.NONE;
@SuppressWarnings("MethodDoesntCallSuperMethod")
@Override
public Call clone() {
return new VertxCall(retroRequest);
}
}

@Override
public Call clone() {
return new VertxCall(retroRequest);
public okhttp3.Call newCall(Request request) {
return new VertxCall(request);
}
}

@Override
public okhttp3.Call newCall(Request request) {
return new VertxCall(request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
@RunWith(VertxUnitRunner.class)
public class ClientTest {

public static final String API_URL = "http://localhost:8080";
private static final String API_URL = "http://localhost:8080";

public static class Contributor {
public final String login;
Expand Down Expand Up @@ -145,7 +145,7 @@ public void onFailure(Call<List<Contributor>> call, Throwable throwable) {
// @Test
public void testResponseError(TestContext ctx) throws Exception {
startHttpServer(req -> {
NetSocket so = req.netSocket();
NetSocket so = req.toNetSocket().result();
so.write("HTTP/1.1 200 OK\r\n");
so.write("Transfer-Encoding: chunked\r\n");
so.write("\r\n");
Expand Down