-
Notifications
You must be signed in to change notification settings - Fork 70
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Java client: update error handling #865
Changes from all commits
27f7bd2
794833e
4a69077
f4cb71a
0dbec7b
34a84a1
769ab53
49a7eed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,20 +1,28 @@ | ||||||
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ | ||||||
package glide.connectors.handlers; | ||||||
|
||||||
import glide.api.models.exceptions.ClosingException; | ||||||
import glide.api.models.exceptions.ConnectionException; | ||||||
import glide.api.models.exceptions.ExecAbortException; | ||||||
import glide.api.models.exceptions.RequestException; | ||||||
import glide.api.models.exceptions.TimeoutException; | ||||||
import glide.managers.CommandManager; | ||||||
import glide.managers.ConnectionManager; | ||||||
import java.util.concurrent.CompletableFuture; | ||||||
import java.util.concurrent.ConcurrentHashMap; | ||||||
import java.util.concurrent.ConcurrentLinkedQueue; | ||||||
import java.util.concurrent.atomic.AtomicInteger; | ||||||
import lombok.RequiredArgsConstructor; | ||||||
import org.apache.commons.lang3.tuple.Pair; | ||||||
import response.ResponseOuterClass.RequestError; | ||||||
import response.ResponseOuterClass.Response; | ||||||
|
||||||
/** Holder for resources required to dispatch responses and used by {@link ReadHandler}. */ | ||||||
@RequiredArgsConstructor | ||||||
public class CallbackDispatcher { | ||||||
|
||||||
/** Unique request ID (callback ID). Thread-safe and overflow-safe. */ | ||||||
private final AtomicInteger nextAvailableRequestId = new AtomicInteger(0); | ||||||
protected final AtomicInteger nextAvailableRequestId = new AtomicInteger(0); | ||||||
|
||||||
/** | ||||||
* Storage of Futures to handle responses. Map key is callback id, which starts from 0. The value | ||||||
|
@@ -24,7 +32,7 @@ public class CallbackDispatcher { | |||||
* Negative Java values would be shown as positive on Rust side. There is no data loss, because | ||||||
* callback ID remains unique. | ||||||
*/ | ||||||
private final ConcurrentHashMap<Integer, CompletableFuture<Response>> responses = | ||||||
protected final ConcurrentHashMap<Integer, CompletableFuture<Response>> responses = | ||||||
new ConcurrentHashMap<>(); | ||||||
|
||||||
/** | ||||||
|
@@ -33,7 +41,7 @@ public class CallbackDispatcher { | |||||
*/ | ||||||
// TODO: Optimize to avoid growing up to 2e32 (16 Gb) | ||||||
// https://github.com/aws/glide-for-redis/issues/704 | ||||||
private final ConcurrentLinkedQueue<Integer> freeRequestIds = new ConcurrentLinkedQueue<>(); | ||||||
protected final ConcurrentLinkedQueue<Integer> freeRequestIds = new ConcurrentLinkedQueue<>(); | ||||||
|
||||||
/** | ||||||
* Register a new request to be sent. Once response received, the given future completes with it. | ||||||
|
@@ -54,29 +62,72 @@ public Pair<Integer, CompletableFuture<Response>> registerRequest() { | |||||
} | ||||||
|
||||||
public CompletableFuture<Response> registerConnection() { | ||||||
var res = registerRequest(); | ||||||
return res.getValue(); | ||||||
return registerRequest().getValue(); | ||||||
} | ||||||
|
||||||
/** | ||||||
* Complete the corresponding client promise and free resources. | ||||||
* Complete the corresponding client promise, handle error and free resources. | ||||||
* | ||||||
* @param response A response received | ||||||
*/ | ||||||
public void completeRequest(Response response) { | ||||||
if (response.hasClosingError()) { | ||||||
// According to https://github.com/aws/glide-for-redis/issues/851 | ||||||
// a response with a closing error may arrive with any/random callback ID (usually -1) | ||||||
// CommandManager and ConnectionManager would close the UDS channel on ClosingException | ||||||
distributeClosingException(response.getClosingError()); | ||||||
return; | ||||||
} | ||||||
// Complete and return the response at callbackId | ||||||
// free up the callback ID in the freeRequestIds list | ||||||
int callbackId = response.getCallbackIdx(); | ||||||
CompletableFuture<Response> future = responses.remove(callbackId); | ||||||
freeRequestIds.add(callbackId); | ||||||
if (future != null) { | ||||||
freeRequestIds.add(callbackId); | ||||||
if (response.hasRequestError()) { | ||||||
RequestError error = response.getRequestError(); | ||||||
String msg = error.getMessage(); | ||||||
switch (error.getType()) { | ||||||
case Unspecified: | ||||||
// Unspecified error on Redis service-side | ||||||
future.completeExceptionally(new RequestException(msg)); | ||||||
case ExecAbort: | ||||||
// Transactional error on Redis service-side | ||||||
future.completeExceptionally(new ExecAbortException(msg)); | ||||||
case Timeout: | ||||||
// Timeout from Glide to Redis service | ||||||
future.completeExceptionally(new TimeoutException(msg)); | ||||||
case Disconnect: | ||||||
// Connection problem between Glide and Redis | ||||||
future.completeExceptionally(new ConnectionException(msg)); | ||||||
default: | ||||||
// Request or command error from Redis | ||||||
future.completeExceptionally(new RequestException(msg)); | ||||||
} | ||||||
} | ||||||
future.completeAsync(() -> response); | ||||||
} else { | ||||||
// TODO: log an error. | ||||||
// TODO: log an error thru logger. | ||||||
// probably a response was received after shutdown or `registerRequest` call was missing | ||||||
System.err.printf( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if this happens, the client is in an erroneous state and should close. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added in 794833e |
||||||
"Received a response for not registered callback id %d, request error = %s%n", | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this supposed to be \n instead of %n?
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not a mistake - |
||||||
callbackId, response.getRequestError()); | ||||||
distributeClosingException("Client is in an erroneous state and should close"); | ||||||
} | ||||||
} | ||||||
|
||||||
/** | ||||||
* Distribute {@link ClosingException} to all pending requests. {@link CommandManager} and {@link | ||||||
* ConnectionManager} should catch it, handle and close the UDS connection.<br> | ||||||
* Should be used to termination the client/connection only. | ||||||
* | ||||||
* @param message Exception message | ||||||
*/ | ||||||
public void distributeClosingException(String message) { | ||||||
responses.values().forEach(f -> f.completeExceptionally(new ClosingException(message))); | ||||||
responses.clear(); | ||||||
} | ||||||
|
||||||
public void shutdownGracefully() { | ||||||
responses.values().forEach(future -> future.cancel(false)); | ||||||
responses.clear(); | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,8 +25,8 @@ public class ChannelHandler { | |
|
||
private static final String THREAD_POOL_NAME = "glide-channel"; | ||
|
||
private final Channel channel; | ||
private final CallbackDispatcher callbackDispatcher; | ||
protected final Channel channel; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why was this changed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For tests, ref #865 (comment) |
||
protected final CallbackDispatcher callbackDispatcher; | ||
|
||
/** Open a new channel for a new client. */ | ||
public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why the change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We create
TestCallbackDispatcher
(which inheritsCallbackDispatcher
) for tests. Access to protected fields is required to verify correctness of the data processing.This is true for other classes too where access modifier was changed.