From 6073d4bf4755c31600e858745bac45a12e937d6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rajmund=20Tak=C3=A1cs?= Date: Wed, 25 Oct 2023 09:09:15 +0200 Subject: [PATCH] feat(plc4j/spi): Add option to synchronously await response from PLC (#1163) There might be cases when the driver needs to wait until the device responds our last request, before sending the next request. This feature attempts to make this more convenient. --- .../plc4x/java/spi/ConversationContext.java | 3 + .../plc4x/java/spi/Plc4xNettyWrapper.java | 19 +++-- .../spi/internal/DefaultContextHandler.java | 17 +++-- .../internal/DefaultExpectRequestContext.java | 2 +- .../internal/DefaultSendRequestContext.java | 2 +- .../spi/internal/HandlerRegistration.java | 75 +++++++++++++++++-- .../plc4x/java/spi/Plc4xNettyWrapperTest.java | 3 +- 7 files changed, 99 insertions(+), 22 deletions(-) diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/ConversationContext.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/ConversationContext.java index 94b09a282bc..72508b54847 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/ConversationContext.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/ConversationContext.java @@ -19,6 +19,7 @@ package org.apache.plc4x.java.spi; import io.netty.channel.Channel; +import java.util.concurrent.ExecutionException; import org.apache.plc4x.java.api.authentication.PlcAuthentication; import org.apache.plc4x.java.api.exceptions.PlcRuntimeException; import org.apache.plc4x.java.spi.configuration.Configuration; @@ -92,6 +93,8 @@ interface ContextHandler { void cancel(); + void awaitResponse() throws InterruptedException, ExecutionException; + } } diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java index 27abd9324df..e1f708072d9 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java @@ -152,6 +152,11 @@ protected void decode(ChannelHandlerContext channelHandlerContext, T t, List onTimeout(AtomicReference reference, Consumer onTimeoutConsumer) { - return new Consumer() { - @Override - public void accept(TimeoutException e) { - registeredHandlers.remove(reference.get()); - onTimeoutConsumer.accept(e); - } + return timeoutException -> { + final HandlerRegistration registration = reference.get(); + registeredHandlers.remove(registration); + onTimeoutConsumer.accept(timeoutException); + registration.confirmError(); }; } diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultContextHandler.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultContextHandler.java index e7e9e163e72..e99ed509429 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultContextHandler.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultContextHandler.java @@ -18,27 +18,32 @@ */ package org.apache.plc4x.java.spi.internal; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.apache.plc4x.java.spi.ConversationContext; -import java.util.function.BooleanSupplier; - class DefaultContextHandler implements ConversationContext.ContextHandler { - private final BooleanSupplier getDone; + private final Future awaitable; private final Runnable cancel; - public DefaultContextHandler(BooleanSupplier getDone, Runnable cancel) { - this.getDone = getDone; + public DefaultContextHandler(Future awaitable, Runnable cancel) { + this.awaitable = awaitable; this.cancel = cancel; } @Override public boolean isDone() { - return this.getDone.getAsBoolean(); + return this.awaitable.isDone(); } @Override public void cancel() { this.cancel.run(); } + + @Override + public void awaitResponse() throws InterruptedException, ExecutionException { + this.awaitable.get(); + } } diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultExpectRequestContext.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultExpectRequestContext.java index 74bf96df9bf..ec9be896c35 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultExpectRequestContext.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultExpectRequestContext.java @@ -82,7 +82,7 @@ public ConversationContext.ContextHandler handle(Consumer packetConsumer) { this.packetConsumer = packetConsumer; registration = new HandlerRegistration(commands, expectClazz, packetConsumer, onTimeoutConsumer, errorConsumer, timeout); finisher.accept(registration); - return new DefaultContextHandler(registration::hasHandled, registration::cancel); + return new DefaultContextHandler(registration, registration::cancel); } @Override diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java index cfa448806fd..c975a65149e 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java @@ -95,7 +95,7 @@ public DefaultContextHandler handle(Consumer packetConsumer) { onTimeoutConsumer, errorConsumer, timeout); finisher.accept(registration); context.sendToWire(request); - return new DefaultContextHandler(registration::hasHandled, registration::cancel); + return new DefaultContextHandler(registration, registration::cancel); } @Override diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java index c71b203a3d8..8ada15fee9a 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java @@ -22,13 +22,17 @@ import java.time.Duration; import java.util.Deque; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; -public class HandlerRegistration { +public class HandlerRegistration implements Future { private static int counter = 0; @@ -43,17 +47,36 @@ public class HandlerRegistration { private final Consumer onTimeoutConsumer; private final BiConsumer errorConsumer; + private final Runnable onHandled; + private final Runnable onError; + private final Runnable onCancelled; private final Duration timeout; - private volatile boolean cancelled = false; - private volatile boolean handled = false; + private final CompletableFuture handled = new CompletableFuture<>(); public HandlerRegistration(Deque, Predicate>> commands, Class expectClazz, Consumer packetConsumer, Consumer onTimeoutConsumer, BiConsumer errorConsumer, Duration timeout) { + this( + commands, + expectClazz, + packetConsumer, + onTimeoutConsumer, + errorConsumer, + () -> {}, + () -> {}, + () -> {}, + timeout + ); + } + + public HandlerRegistration(Deque, Predicate>> commands, Class expectClazz, Consumer packetConsumer, Consumer onTimeoutConsumer, BiConsumer errorConsumer, Runnable onHandled, Runnable onError, Runnable onCancelled, Duration timeout) { this.commands = commands; this.expectClazz = expectClazz; this.packetConsumer = packetConsumer; this.onTimeoutConsumer = onTimeoutConsumer; this.errorConsumer = errorConsumer; + this.onHandled = onHandled; + this.onError = onError; + this.onCancelled = onCancelled; this.timeout = timeout; } @@ -82,21 +105,59 @@ public Duration getTimeout() { } public void cancel() { - this.cancelled = true; + handled.cancel(true); + onCancelled.run(); + } + + @Override + public boolean cancel(boolean ignored) { + if (isCancelled()) { + return false; + } else { + cancel(); + return true; + } } public boolean isCancelled() { - return this.cancelled; + return handled.isCancelled(); + } + + @Override + public boolean isDone() { + return hasHandled(); + } + + @Override + public Void get() throws InterruptedException, ExecutionException { + return handled.get(); + } + + @Override + public Void get(long amount, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException { + return handled.get(amount, timeUnit); } public void confirmHandled() { - this.handled = true; + confirmCompleted(); + this.onHandled.run(); + } + + public void confirmError() { + confirmCompleted(); + this.onError.run(); + } + + public void confirmCompleted() { + this.handled.complete(null); } public boolean hasHandled() { - return this.handled; + return this.handled.isDone(); } + + @Override public String toString() { return "HandlerRegistration#" + id; diff --git a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java index 07aa5a0a1ce..90695013d63 100644 --- a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java +++ b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java @@ -86,7 +86,7 @@ void conversationTimeoutTest() throws Exception { .onError((value, throwable) -> error.set(true)) .handle((answer) -> handled.set(true)); - Thread.sleep(750); + handler.awaitResponse(); verify(true, false, false); wrapper.decode(channelHandlerContext, new Date(), new ArrayList<>()); @@ -104,6 +104,7 @@ void conversationWithNoTimeoutTest() throws Exception { verify(false, false, false); wrapper.decode(channelHandlerContext, new Date(), new ArrayList<>()); + handler.awaitResponse(); verify(false, false, true); }