Skip to content

Commit

Permalink
[#139] Added callback threadpool
Browse files Browse the repository at this point in the history
  • Loading branch information
More-Wrong committed Nov 22, 2023
1 parent af9f5d2 commit e31f2b3
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import net.zscript.javaclient.addressing.AddressedCommand;
import net.zscript.javaclient.addressing.AddressedResponse;
import net.zscript.javaclient.threading.ZscriptCallbackThreadpool;
import net.zscript.javaclient.threading.ZscriptWorkerThread;

public interface Connection {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
import net.zscript.javaclient.commandPaths.ResponseExecutionPath;
import net.zscript.javaclient.sequence.CommandSequence;
import net.zscript.javaclient.sequence.ResponseSequence;
import net.zscript.javaclient.threading.ZscriptCallbackThreadpool;

class ZscriptBasicNode implements ZscriptNode {

private static final Logger LOG = LoggerFactory.getLogger(ZscriptBasicNode.class);

private final ZscriptCallbackThreadpool callbackPool;

private final AddressingSystem addressingSystem;

private final ConnectionBuffer connectionBuffer;
Expand All @@ -43,28 +46,12 @@ class ZscriptBasicNode implements ZscriptNode {

private final EchoAssigner echoSystem;

ZscriptBasicNode(Connection parentConnection, int bufferSize) {
this.addressingSystem = new AddressingSystem(this);
this.parentConnection = parentConnection;
this.echoSystem = new EchoAssigner(TimeUnit.MILLISECONDS.toNanos(100));
this.connectionBuffer = new ConnectionBuffer(parentConnection, echoSystem, bufferSize);
this.strategy.setBuffer(connectionBuffer);
parentConnection.onReceive(r -> {
try {
if (r.hasAddress()) {
if (!addressingSystem.response(r)) {
unknownResponseHandler.accept(r);
}
} else {
response(r);
}
} catch (Exception e) {
callbackExceptionHandler.accept(e); // catches all callback exceptions
}
});
ZscriptBasicNode(ZscriptCallbackThreadpool callbackPool, Connection parentConnection, int bufferSize) {
this(callbackPool, parentConnection, bufferSize, 100, TimeUnit.MILLISECONDS);
}

ZscriptBasicNode(Connection parentConnection, int bufferSize, long minSegmentChangeTime, TimeUnit unit) {
ZscriptBasicNode(ZscriptCallbackThreadpool callbackPool, Connection parentConnection, int bufferSize, long minSegmentChangeTime, TimeUnit unit) {
this.callbackPool = callbackPool;
this.addressingSystem = new AddressingSystem(this);
this.parentConnection = parentConnection;
this.echoSystem = new EchoAssigner(unit.toNanos(minSegmentChangeTime));
Expand All @@ -74,13 +61,13 @@ class ZscriptBasicNode implements ZscriptNode {
try {
if (r.hasAddress()) {
if (!addressingSystem.response(r)) {
unknownResponseHandler.accept(r);
callbackPool.sendCallback(unknownResponseHandler, r);
}
} else {
response(r);
}
} catch (Exception e) {
callbackExceptionHandler.accept(e); // catches all callback exceptions
callbackPool.sendCallback(callbackExceptionHandler, e); // catches all callback exceptions
}
});
}
Expand Down Expand Up @@ -126,9 +113,9 @@ public void checkTimeouts() {
long nanoTime = System.nanoTime();
for (CommandSequence seq : timedOut) {
if (fullSequenceCallbacks.get(seq) != null) {
fullSequenceCallbacks.get(seq).accept(ResponseSequence.blank());
callbackPool.sendCallback(fullSequenceCallbacks.get(seq), ResponseSequence.blank());
} else if (pathCallbacks.get(seq.getExecutionPath()) != null) {
pathCallbacks.get(seq.getExecutionPath()).accept(ResponseExecutionPath.blank());
callbackPool.sendCallback(pathCallbacks.get(seq.getExecutionPath()), ResponseExecutionPath.blank());
}
}
}
Expand All @@ -138,9 +125,9 @@ private void response(AddressedResponse resp) {
if (resp.getContent().getResponseValue() != 0) {
Consumer<ResponseSequence> handler = notificationHandlers.get(resp.getContent().getResponseValue());
if (handler != null) {
handler.accept(resp.getContent());
callbackPool.sendCallback(handler, resp.getContent());
} else {
unknownResponseHandler.accept(resp);
callbackPool.sendCallback(unknownResponseHandler, resp);
}
return;
}
Expand All @@ -150,20 +137,20 @@ private void response(AddressedResponse resp) {
if (resp.getContent().hasEchoValue() && echoSystem.unmatchedReceive(resp.getContent().getEchoValue())) {
return;
}
unknownResponseHandler.accept(resp);
callbackPool.sendCallback(unknownResponseHandler, resp);
return;
}
strategy.mayHaveSpace();
parentConnection.responseReceived(found);
Consumer<ResponseSequence> seqCallback = fullSequenceCallbacks.remove(found.getContent());
if (seqCallback != null) {
seqCallback.accept(resp.getContent());
callbackPool.sendCallback(seqCallback, resp.getContent());
} else {
Consumer<ResponseExecutionPath> pathCallback = pathCallbacks.remove(found.getContent().getExecutionPath());
if (pathCallback != null) {
pathCallback.accept(resp.getContent().getExecutionPath());
callbackPool.sendCallback(pathCallback, resp.getContent().getExecutionPath());
} else {
unknownResponseHandler.accept(resp);
callbackPool.sendCallback(unknownResponseHandler, resp);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ static ZscriptNode newNode(Connection parentConnection) {
}

static ZscriptNode newNode(Connection parentConnection, int bufferSize, long minSegmentChangeTime, TimeUnit unit) {
ZscriptBasicNode node = new ZscriptBasicNode(parentConnection, bufferSize, minSegmentChangeTime, unit);
ZscriptWorkerThread thread = parentConnection.getAssociatedThread();
ZscriptBasicNode node = new ZscriptBasicNode(thread.getCallbackPool(), parentConnection, bufferSize, minSegmentChangeTime, unit);
thread.addTimeoutCheck(node::checkTimeouts);
return (ZscriptNode) Proxy.newProxyInstance(ZscriptNode.class.getClassLoader(), new Class[] { ZscriptNode.class },
(obj, method, params) -> thread.moveOntoThread(() -> method.invoke(node, params)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package net.zscript.javaclient.threading;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class ZscriptCallbackThreadpool {
private final ExecutorService callbackPool;

public ZscriptCallbackThreadpool() {
this.callbackPool = Executors.newCachedThreadPool();
}

public ZscriptCallbackThreadpool(ExecutorService callbackPool) {
this.callbackPool = callbackPool;
}

public <T> void sendCallback(Consumer<T> callback, T content) {
callbackPool.submit(() -> callback.accept(content));
}

public void sendCallback(Runnable callback) {
callbackPool.submit(callback);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package net.zscript.javaclient.threading;

import java.lang.ref.PhantomReference;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Iterator;
Expand All @@ -15,11 +13,18 @@
import java.util.concurrent.TimeUnit;

public class ZscriptWorkerThread {
private final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();

private final ZscriptCallbackThreadpool threadpool;
private final Thread execThread;
private final List<WeakReference<Runnable>> timeoutChecks = new ArrayList<>();

public ZscriptWorkerThread() {
this(new ZscriptCallbackThreadpool());
}

public ZscriptWorkerThread(ZscriptCallbackThreadpool threadpool) {
this.threadpool = threadpool;
try {
execThread = exec.submit(Thread::currentThread).get();
} catch (ExecutionException | InterruptedException e) {
Expand Down Expand Up @@ -118,4 +123,8 @@ public <T> Future<T> startOnThread(Callable<T> task) {
return exec.submit(task);
}
}

public ZscriptCallbackThreadpool getCallbackPool() {
return threadpool;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,15 @@
import static java.nio.charset.StandardCharsets.UTF_8;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.Provider;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

import net.zscript.javaclient.addressing.AddressedCommand;
import net.zscript.javaclient.addressing.AddressedResponse;
import net.zscript.javaclient.addressing.CompleteAddressedResponse;
import net.zscript.javaclient.connectors.serial.SerialConnection;
import net.zscript.javaclient.nodes.Connection;
import net.zscript.javaclient.threading.ZscriptWorkerThread;
import net.zscript.javareceiver.tokenizer.TokenExtendingBuffer;
Expand Down

0 comments on commit e31f2b3

Please sign in to comment.