Skip to content

Commit

Permalink
[#139] Fixed Exception catching across callback threadpools
Browse files Browse the repository at this point in the history
  • Loading branch information
More-Wrong committed Nov 27, 2023
1 parent e31f2b3 commit ded286d
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class ZscriptBasicNode implements ZscriptNode {
try {
if (r.hasAddress()) {
if (!addressingSystem.response(r)) {
callbackPool.sendCallback(unknownResponseHandler, r);
callbackPool.sendCallback(unknownResponseHandler, r, callbackExceptionHandler);
}
} else {
response(r);
Expand Down Expand Up @@ -110,12 +110,11 @@ public void send(AddressedCommand addr) {
public void checkTimeouts() {
Collection<CommandSequence> timedOut = connectionBuffer.checkTimeouts();
if (!timedOut.isEmpty()) {
long nanoTime = System.nanoTime();
for (CommandSequence seq : timedOut) {
if (fullSequenceCallbacks.get(seq) != null) {
callbackPool.sendCallback(fullSequenceCallbacks.get(seq), ResponseSequence.blank());
callbackPool.sendCallback(fullSequenceCallbacks.get(seq), ResponseSequence.blank(), callbackExceptionHandler);
} else if (pathCallbacks.get(seq.getExecutionPath()) != null) {
callbackPool.sendCallback(pathCallbacks.get(seq.getExecutionPath()), ResponseExecutionPath.blank());
callbackPool.sendCallback(pathCallbacks.get(seq.getExecutionPath()), ResponseExecutionPath.blank(), callbackExceptionHandler);
}
}
}
Expand All @@ -125,9 +124,9 @@ private void response(AddressedResponse resp) {
if (resp.getContent().getResponseValue() != 0) {
Consumer<ResponseSequence> handler = notificationHandlers.get(resp.getContent().getResponseValue());
if (handler != null) {
callbackPool.sendCallback(handler, resp.getContent());
callbackPool.sendCallback(handler, resp.getContent(), callbackExceptionHandler);
} else {
callbackPool.sendCallback(unknownResponseHandler, resp);
callbackPool.sendCallback(unknownResponseHandler, resp, callbackExceptionHandler);
}
return;
}
Expand All @@ -137,20 +136,20 @@ private void response(AddressedResponse resp) {
if (resp.getContent().hasEchoValue() && echoSystem.unmatchedReceive(resp.getContent().getEchoValue())) {
return;
}
callbackPool.sendCallback(unknownResponseHandler, resp);
callbackPool.sendCallback(unknownResponseHandler, resp, callbackExceptionHandler);
return;
}
strategy.mayHaveSpace();
parentConnection.responseReceived(found);
Consumer<ResponseSequence> seqCallback = fullSequenceCallbacks.remove(found.getContent());
if (seqCallback != null) {
callbackPool.sendCallback(seqCallback, resp.getContent());
callbackPool.sendCallback(seqCallback, resp.getContent(), callbackExceptionHandler);
} else {
Consumer<ResponseExecutionPath> pathCallback = pathCallbacks.remove(found.getContent().getExecutionPath());
if (pathCallback != null) {
callbackPool.sendCallback(pathCallback, resp.getContent().getExecutionPath());
callbackPool.sendCallback(pathCallback, resp.getContent().getExecutionPath(), callbackExceptionHandler);
} else {
callbackPool.sendCallback(unknownResponseHandler, resp);
callbackPool.sendCallback(unknownResponseHandler, resp, callbackExceptionHandler);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,27 @@ public ZscriptCallbackThreadpool(ExecutorService callbackPool) {
this.callbackPool = callbackPool;
}

public <T> void sendCallback(Consumer<T> callback, T content) {
public <T> void sendCallback(Consumer<T> callback, T content, Consumer<Exception> handler) {
callbackPool.submit(() -> {
try {
callback.accept(content);
} catch (Exception e) {
handler.accept(e);
}
});
}

public <T extends Exception> void sendCallback(Consumer<T> callback, T content) {
callbackPool.submit(() -> callback.accept(content));
}

public void sendCallback(Runnable callback) {
callbackPool.submit(callback);
public void sendCallback(Runnable callback, Consumer<Exception> handler) {
callbackPool.submit(() -> {
try {
callback.run();
} catch (Exception e) {
handler.accept(e);
}
});
}
}

0 comments on commit ded286d

Please sign in to comment.