From e31f2b32723a19e0feae876a6586f98268f6f175 Mon Sep 17 00:00:00 2001 From: Alicia Date: Wed, 22 Nov 2023 17:53:27 +0000 Subject: [PATCH] [#139] Added callback threadpool --- .../zscript/javaclient/nodes/Connection.java | 1 + .../javaclient/nodes/ZscriptBasicNode.java | 47 +++++++------------ .../zscript/javaclient/nodes/ZscriptNode.java | 2 +- .../threading/ZscriptCallbackThreadpool.java | 25 ++++++++++ .../threading/ZscriptWorkerThread.java | 15 ++++-- .../javaclient/connectors/RawConnection.java | 5 -- 6 files changed, 56 insertions(+), 39 deletions(-) create mode 100644 clients/java-client-lib/client-core/src/main/java/net/zscript/javaclient/threading/ZscriptCallbackThreadpool.java diff --git a/clients/java-client-lib/client-core/src/main/java/net/zscript/javaclient/nodes/Connection.java b/clients/java-client-lib/client-core/src/main/java/net/zscript/javaclient/nodes/Connection.java index 0d5e7bc2e..14b868d95 100644 --- a/clients/java-client-lib/client-core/src/main/java/net/zscript/javaclient/nodes/Connection.java +++ b/clients/java-client-lib/client-core/src/main/java/net/zscript/javaclient/nodes/Connection.java @@ -4,6 +4,7 @@ import net.zscript.javaclient.addressing.AddressedCommand; import net.zscript.javaclient.addressing.AddressedResponse; +import net.zscript.javaclient.threading.ZscriptCallbackThreadpool; import net.zscript.javaclient.threading.ZscriptWorkerThread; public interface Connection { diff --git a/clients/java-client-lib/client-core/src/main/java/net/zscript/javaclient/nodes/ZscriptBasicNode.java b/clients/java-client-lib/client-core/src/main/java/net/zscript/javaclient/nodes/ZscriptBasicNode.java index 40e43747d..ec3c76688 100644 --- a/clients/java-client-lib/client-core/src/main/java/net/zscript/javaclient/nodes/ZscriptBasicNode.java +++ b/clients/java-client-lib/client-core/src/main/java/net/zscript/javaclient/nodes/ZscriptBasicNode.java @@ -16,11 +16,14 @@ import net.zscript.javaclient.commandPaths.ResponseExecutionPath; import net.zscript.javaclient.sequence.CommandSequence; import net.zscript.javaclient.sequence.ResponseSequence; +import net.zscript.javaclient.threading.ZscriptCallbackThreadpool; class ZscriptBasicNode implements ZscriptNode { private static final Logger LOG = LoggerFactory.getLogger(ZscriptBasicNode.class); + private final ZscriptCallbackThreadpool callbackPool; + private final AddressingSystem addressingSystem; private final ConnectionBuffer connectionBuffer; @@ -43,28 +46,12 @@ class ZscriptBasicNode implements ZscriptNode { private final EchoAssigner echoSystem; - ZscriptBasicNode(Connection parentConnection, int bufferSize) { - this.addressingSystem = new AddressingSystem(this); - this.parentConnection = parentConnection; - this.echoSystem = new EchoAssigner(TimeUnit.MILLISECONDS.toNanos(100)); - this.connectionBuffer = new ConnectionBuffer(parentConnection, echoSystem, bufferSize); - this.strategy.setBuffer(connectionBuffer); - parentConnection.onReceive(r -> { - try { - if (r.hasAddress()) { - if (!addressingSystem.response(r)) { - unknownResponseHandler.accept(r); - } - } else { - response(r); - } - } catch (Exception e) { - callbackExceptionHandler.accept(e); // catches all callback exceptions - } - }); + ZscriptBasicNode(ZscriptCallbackThreadpool callbackPool, Connection parentConnection, int bufferSize) { + this(callbackPool, parentConnection, bufferSize, 100, TimeUnit.MILLISECONDS); } - ZscriptBasicNode(Connection parentConnection, int bufferSize, long minSegmentChangeTime, TimeUnit unit) { + ZscriptBasicNode(ZscriptCallbackThreadpool callbackPool, Connection parentConnection, int bufferSize, long minSegmentChangeTime, TimeUnit unit) { + this.callbackPool = callbackPool; this.addressingSystem = new AddressingSystem(this); this.parentConnection = parentConnection; this.echoSystem = new EchoAssigner(unit.toNanos(minSegmentChangeTime)); @@ -74,13 +61,13 @@ class ZscriptBasicNode implements ZscriptNode { try { if (r.hasAddress()) { if (!addressingSystem.response(r)) { - unknownResponseHandler.accept(r); + callbackPool.sendCallback(unknownResponseHandler, r); } } else { response(r); } } catch (Exception e) { - callbackExceptionHandler.accept(e); // catches all callback exceptions + callbackPool.sendCallback(callbackExceptionHandler, e); // catches all callback exceptions } }); } @@ -126,9 +113,9 @@ public void checkTimeouts() { long nanoTime = System.nanoTime(); for (CommandSequence seq : timedOut) { if (fullSequenceCallbacks.get(seq) != null) { - fullSequenceCallbacks.get(seq).accept(ResponseSequence.blank()); + callbackPool.sendCallback(fullSequenceCallbacks.get(seq), ResponseSequence.blank()); } else if (pathCallbacks.get(seq.getExecutionPath()) != null) { - pathCallbacks.get(seq.getExecutionPath()).accept(ResponseExecutionPath.blank()); + callbackPool.sendCallback(pathCallbacks.get(seq.getExecutionPath()), ResponseExecutionPath.blank()); } } } @@ -138,9 +125,9 @@ private void response(AddressedResponse resp) { if (resp.getContent().getResponseValue() != 0) { Consumer handler = notificationHandlers.get(resp.getContent().getResponseValue()); if (handler != null) { - handler.accept(resp.getContent()); + callbackPool.sendCallback(handler, resp.getContent()); } else { - unknownResponseHandler.accept(resp); + callbackPool.sendCallback(unknownResponseHandler, resp); } return; } @@ -150,20 +137,20 @@ private void response(AddressedResponse resp) { if (resp.getContent().hasEchoValue() && echoSystem.unmatchedReceive(resp.getContent().getEchoValue())) { return; } - unknownResponseHandler.accept(resp); + callbackPool.sendCallback(unknownResponseHandler, resp); return; } strategy.mayHaveSpace(); parentConnection.responseReceived(found); Consumer seqCallback = fullSequenceCallbacks.remove(found.getContent()); if (seqCallback != null) { - seqCallback.accept(resp.getContent()); + callbackPool.sendCallback(seqCallback, resp.getContent()); } else { Consumer pathCallback = pathCallbacks.remove(found.getContent().getExecutionPath()); if (pathCallback != null) { - pathCallback.accept(resp.getContent().getExecutionPath()); + callbackPool.sendCallback(pathCallback, resp.getContent().getExecutionPath()); } else { - unknownResponseHandler.accept(resp); + callbackPool.sendCallback(unknownResponseHandler, resp); } } } diff --git a/clients/java-client-lib/client-core/src/main/java/net/zscript/javaclient/nodes/ZscriptNode.java b/clients/java-client-lib/client-core/src/main/java/net/zscript/javaclient/nodes/ZscriptNode.java index 3a3a9f558..75199243a 100644 --- a/clients/java-client-lib/client-core/src/main/java/net/zscript/javaclient/nodes/ZscriptNode.java +++ b/clients/java-client-lib/client-core/src/main/java/net/zscript/javaclient/nodes/ZscriptNode.java @@ -19,8 +19,8 @@ static ZscriptNode newNode(Connection parentConnection) { } static ZscriptNode newNode(Connection parentConnection, int bufferSize, long minSegmentChangeTime, TimeUnit unit) { - ZscriptBasicNode node = new ZscriptBasicNode(parentConnection, bufferSize, minSegmentChangeTime, unit); ZscriptWorkerThread thread = parentConnection.getAssociatedThread(); + ZscriptBasicNode node = new ZscriptBasicNode(thread.getCallbackPool(), parentConnection, bufferSize, minSegmentChangeTime, unit); thread.addTimeoutCheck(node::checkTimeouts); return (ZscriptNode) Proxy.newProxyInstance(ZscriptNode.class.getClassLoader(), new Class[] { ZscriptNode.class }, (obj, method, params) -> thread.moveOntoThread(() -> method.invoke(node, params))); diff --git a/clients/java-client-lib/client-core/src/main/java/net/zscript/javaclient/threading/ZscriptCallbackThreadpool.java b/clients/java-client-lib/client-core/src/main/java/net/zscript/javaclient/threading/ZscriptCallbackThreadpool.java new file mode 100644 index 000000000..64fa799d3 --- /dev/null +++ b/clients/java-client-lib/client-core/src/main/java/net/zscript/javaclient/threading/ZscriptCallbackThreadpool.java @@ -0,0 +1,25 @@ +package net.zscript.javaclient.threading; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Consumer; + +public class ZscriptCallbackThreadpool { + private final ExecutorService callbackPool; + + public ZscriptCallbackThreadpool() { + this.callbackPool = Executors.newCachedThreadPool(); + } + + public ZscriptCallbackThreadpool(ExecutorService callbackPool) { + this.callbackPool = callbackPool; + } + + public void sendCallback(Consumer callback, T content) { + callbackPool.submit(() -> callback.accept(content)); + } + + public void sendCallback(Runnable callback) { + callbackPool.submit(callback); + } +} diff --git a/clients/java-client-lib/client-core/src/main/java/net/zscript/javaclient/threading/ZscriptWorkerThread.java b/clients/java-client-lib/client-core/src/main/java/net/zscript/javaclient/threading/ZscriptWorkerThread.java index a1f7ecc00..bb400810d 100644 --- a/clients/java-client-lib/client-core/src/main/java/net/zscript/javaclient/threading/ZscriptWorkerThread.java +++ b/clients/java-client-lib/client-core/src/main/java/net/zscript/javaclient/threading/ZscriptWorkerThread.java @@ -1,7 +1,5 @@ package net.zscript.javaclient.threading; -import java.lang.ref.PhantomReference; -import java.lang.ref.SoftReference; import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Iterator; @@ -15,11 +13,18 @@ import java.util.concurrent.TimeUnit; public class ZscriptWorkerThread { - private final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); + private final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); + + private final ZscriptCallbackThreadpool threadpool; private final Thread execThread; private final List> timeoutChecks = new ArrayList<>(); public ZscriptWorkerThread() { + this(new ZscriptCallbackThreadpool()); + } + + public ZscriptWorkerThread(ZscriptCallbackThreadpool threadpool) { + this.threadpool = threadpool; try { execThread = exec.submit(Thread::currentThread).get(); } catch (ExecutionException | InterruptedException e) { @@ -118,4 +123,8 @@ public Future startOnThread(Callable task) { return exec.submit(task); } } + + public ZscriptCallbackThreadpool getCallbackPool() { + return threadpool; + } } diff --git a/clients/java-client-lib/client-main/src/main/java/net/zscript/javaclient/connectors/RawConnection.java b/clients/java-client-lib/client-main/src/main/java/net/zscript/javaclient/connectors/RawConnection.java index 9db454228..4438d1746 100644 --- a/clients/java-client-lib/client-main/src/main/java/net/zscript/javaclient/connectors/RawConnection.java +++ b/clients/java-client-lib/client-main/src/main/java/net/zscript/javaclient/connectors/RawConnection.java @@ -3,20 +3,15 @@ import static java.nio.charset.StandardCharsets.UTF_8; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.security.Provider; -import java.util.concurrent.Callable; import java.util.function.Consumer; import net.zscript.javaclient.addressing.AddressedCommand; import net.zscript.javaclient.addressing.AddressedResponse; import net.zscript.javaclient.addressing.CompleteAddressedResponse; -import net.zscript.javaclient.connectors.serial.SerialConnection; import net.zscript.javaclient.nodes.Connection; import net.zscript.javaclient.threading.ZscriptWorkerThread; import net.zscript.javareceiver.tokenizer.TokenExtendingBuffer;