Java client: update error handling #865

Merged
Changes from 2 commits
3 changes: 1 addition & 2 deletions java/client/build.gradle
@@ -110,7 +110,7 @@ tasks.register('buildAll') {
finalizedBy 'build'
}

-compileJava.dependsOn('protobuf', 'buildRustRelease')
+compileJava.dependsOn('protobuf')
clean.dependsOn('cleanProtobuf', 'cleanRust')
test.dependsOn('buildRust')
testFfi.dependsOn('buildRust')
@@ -127,4 +127,3 @@ tasks.withType(Test) {
}
jvmArgs "-Djava.library.path=${projectDir}/../target/debug"
}

@@ -1,19 +1,25 @@
package glide.connectors.handlers;

+import glide.api.models.exceptions.ClosingException;
+import glide.api.models.exceptions.ConnectionException;
+import glide.api.models.exceptions.ExecAbortException;
+import glide.api.models.exceptions.RequestException;
+import glide.api.models.exceptions.TimeoutException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.tuple.Pair;
+import response.ResponseOuterClass.RequestError;
import response.ResponseOuterClass.Response;

/** Holder for resources required to dispatch responses and used by {@link ReadHandler}. */
@RequiredArgsConstructor
public class CallbackDispatcher {

/** Unique request ID (callback ID). Thread-safe and overflow-safe. */
-private final AtomicInteger nextAvailableRequestId = new AtomicInteger(0);
+protected final AtomicInteger nextAvailableRequestId = new AtomicInteger(0);
Contributor: Why the change?

Collaborator Author: We create TestCallbackDispatcher (which inherits from CallbackDispatcher) for tests. Access to the protected fields is required to verify that the data is processed correctly.
The same applies to the other classes where an access modifier was changed.
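
To illustrate the reply above, here is a minimal sketch of how a test subclass can inspect protected state. `DispatcherSketch` and `TestDispatcherSketch` are hypothetical, simplified stand-ins; they are not the `CallbackDispatcher` or `TestCallbackDispatcher` classes from this PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for a dispatcher with protected state.
class DispatcherSketch {
    protected final AtomicInteger nextAvailableRequestId = new AtomicInteger(0);
    protected final ConcurrentHashMap<Integer, CompletableFuture<String>> responses =
            new ConcurrentHashMap<>();

    // Registers a pending request and returns its callback ID.
    int register() {
        int id = nextAvailableRequestId.getAndIncrement();
        responses.put(id, new CompletableFuture<>());
        return id;
    }
}

// Test-only subclass: protected fields are visible here; private fields would not be.
class TestDispatcherSketch extends DispatcherSketch {
    int pendingCount() {
        return responses.size();
    }

    int peekNextId() {
        return nextAvailableRequestId.get();
    }
}
```

The trade-off is that `protected` also widens access for any other subclass and for classes in the same package, which is presumably why the reviewer asked about it.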


/**
* Storage of Futures to handle responses. Map key is callback id, which starts from 0. The value
@@ -23,7 +29,7 @@ public class CallbackDispatcher {
* Negative Java values would be shown as positive on Rust side. There is no data loss, because
* callback ID remains unique.
*/
-private final ConcurrentHashMap<Integer, CompletableFuture<Response>> responses =
+protected final ConcurrentHashMap<Integer, CompletableFuture<Response>> responses =
new ConcurrentHashMap<>();
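
The Javadoc above notes that the request ID is overflow-safe and that negative Java values appear as positive on the Rust side. A small sketch of that arithmetic, assuming the ID travels as an unsigned 32-bit value (`CallbackIdOverflowSketch` is a hypothetical name, not part of the client):

```java
import java.util.concurrent.atomic.AtomicInteger;

class CallbackIdOverflowSketch {
    // Returns the ID that follows `current`; AtomicInteger wraps around on overflow.
    static int next(int current) {
        AtomicInteger counter = new AtomicInteger(current);
        return counter.incrementAndGet();
    }

    // Reinterprets the same 32 bits as an unsigned value, as an unsigned reader would see them.
    static long asUnsigned(int id) {
        return Integer.toUnsignedLong(id);
    }

    public static void main(String[] args) {
        int wrapped = next(Integer.MAX_VALUE);
        System.out.println(wrapped);             // -2147483648
        System.out.println(asUnsigned(wrapped)); // 2147483648
    }
}
```

The bit pattern is unchanged by the reinterpretation, so the ID stays unique on both sides.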

/**
@@ -32,7 +38,7 @@ public class CallbackDispatcher {
*/
// TODO: Optimize to avoid growing up to 2e32 (16 Gb)
// https://github.com/aws/glide-for-redis/issues/704
-private final ConcurrentLinkedQueue<Integer> freeRequestIds = new ConcurrentLinkedQueue<>();
+protected final ConcurrentLinkedQueue<Integer> freeRequestIds = new ConcurrentLinkedQueue<>();

/**
* Register a new request to be sent. Once response received, the given future completes with it.
@@ -58,21 +64,62 @@ public CompletableFuture<Response> registerConnection() {
}

/**
-* Complete the corresponding client promise and free resources.
+* Complete the corresponding client promise, handle error and free resources.
*
* @param response A response received
*/
public void completeRequest(Response response) {
+if (response.hasClosingError()) {
+// According to https://github.com/aws/glide-for-redis/issues/851
+// a response with a closing error may arrive with any/random callback ID (usually -1)
+// CommandManager and ConnectionManager would close the UDS channel on ClosingException
+responses
Contributor: Minor: consider wrapping this repeated code in a function, and closing the channel in that function as well.

Contributor: Or maybe I'm missing where the channel is closed?

Contributor: Never mind, I saw that the channel is closed in the exception handler.

Collaborator Author: I tried to keep all error handling in one place, including closing the channel, but CallbackDispatcher does not (and cannot) hold a reference to the channel.

Collaborator Author: Updated in 0dbec7b. I will use the new function to handle read errors too in the next PR.
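
The helper suggested in this thread could look roughly like the sketch below. It is only an illustration: `PendingRequestsSketch` and `failAll` are hypothetical names, `String` stands in for the protobuf `Response`, and `IllegalStateException` stands in for `ClosingException`. The function added in 0dbec7b is not shown on this page and may differ.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class PendingRequestsSketch {
    private final Map<Integer, CompletableFuture<String>> responses = new ConcurrentHashMap<>();

    // Registers a pending request under the given callback ID.
    CompletableFuture<String> register(int callbackId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        responses.put(callbackId, future);
        return future;
    }

    // Fails every pending future with the same message and forgets them.
    // Closing the channel is left to whoever observes the exception,
    // because this class holds no reference to the channel.
    void failAll(String message) {
        responses.values().forEach(f -> f.completeExceptionally(new IllegalStateException(message)));
        responses.clear();
    }

    int pending() {
        return responses.size();
    }
}
```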

+.values()
+.forEach(f -> f.completeExceptionally(new ClosingException(response.getClosingError())));
+responses.clear();
+return;
+}
// Complete and return the response at callbackId
// free up the callback ID in the freeRequestIds list
int callbackId = response.getCallbackIdx();
CompletableFuture<Response> future = responses.remove(callbackId);
-freeRequestIds.add(callbackId);
if (future != null) {
+freeRequestIds.add(callbackId);
+if (response.hasRequestError()) {
+RequestError error = response.getRequestError();
+String msg = error.getMessage();
+switch (error.getType()) {
+case Unspecified:
+// Unspecified error on Redis service-side
+future.completeExceptionally(new RequestException(msg));
+case ExecAbort:
+// Transactional error on Redis service-side
+future.completeExceptionally(new ExecAbortException(msg));
+case Timeout:
+// Timeout from Glide to Redis service
+future.completeExceptionally(new TimeoutException(msg));
+case Disconnect:
+// Connection problem between Glide and Redis
+future.completeExceptionally(new ConnectionException(msg));
+default:
+// Request or command error from Redis
+future.completeExceptionally(new RequestException(msg));
+}
+}
future.completeAsync(() -> response);
} else {
-// TODO: log an error.
+// TODO: log an error thru logger.
// probably a response was received after shutdown or `registerRequest` call was missing
+System.err.printf(
Contributor: If this happens, the client is in an erroneous state and should close.

Collaborator Author: Added in 794833e.

+"Received a response for not registered callback id %d, request error = %s%n",
Collaborator: Is this supposed to be \n instead of %n?

Suggested change:
-"Received a response for not registered callback id %d, request error = %s%n",
+"Received a response for not registered callback id %d, request error = %s\n",

Collaborator Author: Not a mistake: %n is rendered as \n. Thank you for proofreading, though.
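
For context on this exchange: in `java.util.Formatter`, `%n` produces the platform line separator (the value of `System.lineSeparator()`), which is `\n` on Unix-like systems and `\r\n` on Windows, whereas `\n` is always a single line feed. A short sketch (`LineSeparatorSketch` is a hypothetical name):

```java
class LineSeparatorSketch {
    // Ends the message with the platform-specific line separator.
    static String withPercentN(int id) {
        return String.format("callback id %d%n", id);
    }

    // Ends the message with a line feed on every platform.
    static String withBackslashN(int id) {
        return String.format("callback id %d\n", id);
    }
}
```

Both forms are valid in a `printf` format string; `%n` is the portable choice for console output.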

+callbackId, response.getRequestError());
+responses
Contributor: You cancel all of the responses, but you should also close the channel.

Contributor: Never mind, I saw that it happens in the exception handler.

+.values()
+.forEach(
+f ->
+f.completeExceptionally(
+new ClosingException("Client is in an erroneous state and should close")));
+responses.clear();
}
}
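
One detail of the `switch` in `completeRequest` is worth noting: it has no `break` statements, so the matched case falls through all the cases below it. This appears not to change the outcome, because only the first completion of a `CompletableFuture` takes effect and later `completeExceptionally` calls simply return `false`. The sketch below compares the fall-through shape with an explicit-`break` variant; the `ErrorKind` enum and the JDK exception types are hypothetical stand-ins for the protobuf `RequestError` type and the Glide exceptions.

```java
import java.util.concurrent.CompletableFuture;

class FallThroughSketch {
    enum ErrorKind { UNSPECIFIED, EXEC_ABORT, TIMEOUT, DISCONNECT }

    // Mirrors the fall-through shape: every case below the matched one also runs,
    // but only the first completion of the future has any effect.
    static Throwable completeWithFallThrough(ErrorKind kind) {
        CompletableFuture<String> future = new CompletableFuture<>();
        switch (kind) {
            case UNSPECIFIED:
                future.completeExceptionally(new IllegalArgumentException("unspecified"));
            case EXEC_ABORT:
                future.completeExceptionally(new IllegalStateException("exec abort"));
            case TIMEOUT:
                future.completeExceptionally(new UnsupportedOperationException("timeout"));
            default:
                future.completeExceptionally(new RuntimeException("disconnect or other"));
        }
        return errorOf(future);
    }

    // Same mapping with explicit breaks, so exactly one branch runs.
    static Throwable completeWithBreaks(ErrorKind kind) {
        CompletableFuture<String> future = new CompletableFuture<>();
        switch (kind) {
            case UNSPECIFIED:
                future.completeExceptionally(new IllegalArgumentException("unspecified"));
                break;
            case EXEC_ABORT:
                future.completeExceptionally(new IllegalStateException("exec abort"));
                break;
            case TIMEOUT:
                future.completeExceptionally(new UnsupportedOperationException("timeout"));
                break;
            default:
                future.completeExceptionally(new RuntimeException("disconnect or other"));
        }
        return errorOf(future);
    }

    // Extracts the exception the future was completed with (null if it succeeded).
    private static Throwable errorOf(CompletableFuture<String> future) {
        return future.handle((value, error) -> error).join();
    }
}
```

Both variants yield the same exception for a given error kind; the explicit breaks only make the intent visible and avoid allocating exceptions that are then discarded. For the same reason, the trailing `future.completeAsync(() -> response)` has no effect on a future that has already been completed exceptionally.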

@@ -24,8 +24,8 @@ public class ChannelHandler {

private static final String THREAD_POOL_NAME = "glide-channel";

-private final Channel channel;
-private final CallbackDispatcher callbackDispatcher;
+protected final Channel channel;
Contributor: Why was this changed?

Collaborator Author: For tests, ref #865 (comment)

+protected final CallbackDispatcher callbackDispatcher;

/** Open a new channel for a new client. */
public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath)
@@ -1,13 +1,7 @@
package glide.managers;

-import glide.api.models.exceptions.ClosingException;
-import glide.api.models.exceptions.ConnectionException;
-import glide.api.models.exceptions.ExecAbortException;
import glide.api.models.exceptions.RedisException;
-import glide.api.models.exceptions.RequestException;
-import glide.api.models.exceptions.TimeoutException;
import lombok.AllArgsConstructor;
-import response.ResponseOuterClass.RequestError;
import response.ResponseOuterClass.Response;

/**
@@ -20,40 +14,15 @@ public class BaseCommandResponseResolver
private RedisExceptionCheckedFunction<Long, Object> respPointerResolver;

/**
-* Extracts value from the RESP pointer. <br>
-* Throws errors when the response is unsuccessful.
+* Extracts value from the RESP pointer.
*
-* @return A generic Object with the Response | null if the response is empty
+* @return A generic Object with the Response or null if the response is empty
*/
public Object apply(Response response) throws RedisException {
-if (response.hasRequestError()) {
-RequestError error = response.getRequestError();
-String msg = error.getMessage();
-switch (error.getType()) {
-case Unspecified:
-// Unspecified error on Redis service-side
-throw new RequestException(msg);
-case ExecAbort:
-// Transactional error on Redis service-side
-throw new ExecAbortException(msg);
-case Timeout:
-// Timeout from Glide to Redis service
-throw new TimeoutException(msg);
-case Disconnect:
-// Connection problem between Glide and Redis
-throw new ConnectionException(msg);
-default:
-// Request or command error from Redis
-throw new RequestException(msg);
-}
-}
-if (response.hasClosingError()) {
-// A closing error is thrown when Rust-core is not connected to Redis
-// We want to close shop and throw a ClosingException
-// TODO: close the channel on a closing error
-// channel.close();
-throw new ClosingException(response.getClosingError());
-}
+// Note: errors are already handled before in CallbackDispatcher
+assert !response.hasClosingError() : "Unhandled response closing error";
+assert !response.hasRequestError() : "Unhandled response request error";
+
if (response.hasConstantResponse()) {
// Return "OK"
return response.getConstantResponse().toString();
19 changes: 19 additions & 0 deletions java/client/src/main/java/glide/managers/CommandManager.java
@@ -1,5 +1,6 @@
package glide.managers;

+import glide.api.models.exceptions.ClosingException;
import glide.connectors.handlers.ChannelHandler;
import glide.managers.models.Command;
import java.util.concurrent.CompletableFuture;
@@ -31,9 +32,27 @@ public <T> CompletableFuture<T> submitNewCommand(
// when complete, convert the response to our expected type T using the given responseHandler
return channel
.write(prepareRedisRequest(command.getRequestType(), command.getArguments()), true)
+.exceptionally(this::exceptionHandler)
.thenApplyAsync(response -> responseHandler.apply(response));
}
+
+/**
+* Exception handler for future pipeline.
+*
+* @param e An exception thrown in the pipeline before
+* @return Nothing, it rethrows the exception
+*/
+private Response exceptionHandler(Throwable e) {
+if (e instanceof ClosingException) {
+channel.close();
+}
+if (e instanceof RuntimeException) {
+// RedisException also goes here
+throw (RuntimeException) e;
+}
+throw new RuntimeException(e);
+}
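
The handler above uses `exceptionally` only for a side effect and then rethrows, so downstream stages still complete exceptionally. A minimal sketch of that pattern follows; `RethrowingHandlerSketch` is a hypothetical class, an `AtomicBoolean` stands in for the channel, and `IllegalStateException` stands in for `ClosingException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

class RethrowingHandlerSketch {
    // Hypothetical stand-in for the channel: records whether close() was requested.
    final AtomicBoolean closed = new AtomicBoolean(false);

    // Runs a side effect for one exception type, then rethrows so later stages still fail.
    private String exceptionHandler(Throwable e) {
        if (e instanceof IllegalStateException) { // stand-in for ClosingException
            closed.set(true);
        }
        if (e instanceof RuntimeException) {
            throw (RuntimeException) e;
        }
        throw new RuntimeException(e);
    }

    // The handler sits between the source future and the response conversion.
    CompletableFuture<Integer> pipeline(CompletableFuture<String> source) {
        return source.exceptionally(this::exceptionHandler).thenApply(String::length);
    }

    // Returns the cause seen by a caller that joins the pipeline, or null on success.
    Throwable failureOf(CompletableFuture<String> source) {
        try {
            pipeline(source).join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }
}
```

One caveat, depending on how `channel.write` builds the future it returns: `exceptionally` receives the raw exception only when it is attached to the future that was itself completed exceptionally. If the failure comes from an earlier dependent stage, it arrives wrapped in a `CompletionException`, and an `instanceof` check on the wrapper would not match.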

/**
* Build a protobuf command/transaction request object.<br>
* Used by {@link CommandManager}.
42 changes: 27 additions & 15 deletions java/client/src/main/java/glide/managers/ConnectionManager.java
@@ -12,10 +12,8 @@
import glide.api.models.exceptions.ClosingException;
import glide.connectors.handlers.ChannelHandler;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import lombok.RequiredArgsConstructor;
-import response.ResponseOuterClass.RequestError;
import response.ResponseOuterClass.Response;

/**
@@ -38,7 +36,25 @@ public class ConnectionManager {
*/
public CompletableFuture<Void> connectToRedis(BaseClientConfiguration configuration) {
ConnectionRequest request = createConnectionRequest(configuration);
-return channel.connect(request).thenApplyAsync(this::checkGlideRsResponse);
+return channel
+.connect(request)
+.exceptionally(this::exceptionHandler)
+.thenApplyAsync(this::checkGlideRsResponse);
}
+
+/**
+* Exception handler for future pipeline.
+*
+* @param e An exception thrown in the pipeline before
+* @return Nothing, it always rethrows the exception
+*/
+private Response exceptionHandler(Throwable e) {
+channel.close();
+if (e instanceof RuntimeException) {
+// RedisException also goes here
+throw (RuntimeException) e;
+}
+throw new RuntimeException(e);
+}

/**
@@ -154,15 +170,15 @@ private ConnectionRequestOuterClass.ReadFrom mapReadFromEnum(ReadFrom readFrom)

/** Check a response received from Glide. */
private Void checkGlideRsResponse(Response response) {
+// Note: errors are already handled before in CallbackDispatcher, but we double-check
if (response.hasRequestError()) {
-RequestError error = response.getRequestError();
-throwClosingError("Unexpected request error in response: " + error.getMessage());
+throwClosingError(
+"Unhandled request error in response: " + response.getRequestError().getMessage());
}
if (response.hasClosingError()) {
-// A closing error is thrown when Rust-core is not connected to Redis
-// We want to close shop and throw a ClosingException
-throwClosingError(response.getClosingError());
+throwClosingError("Unhandled closing error in response: " + response.getClosingError());
}
+
if (response.hasRespPointer()) {
throwClosingError("Unexpected data in response");
}
@@ -173,12 +189,8 @@ private Void checkGlideRsResponse(Response response) {
return null;
}

-private void throwClosingError(String msg) throws ClosingException {
-try {
-closeConnection().get();
-} catch (InterruptedException | ExecutionException exception) {
-throw new RuntimeException(exception);
-}
+private void throwClosingError(String msg) {
+closeConnection();
throw new ClosingException(msg);
}

@@ -188,6 +200,6 @@ private void throwClosingError(String msg) throws ClosingException {
* @return a CompletableFuture to indicate the channel is closed
*/
public Future<Void> closeConnection() {
-return channel.close().syncUninterruptibly();
+return channel.close();
}
}