Skip to content

Commit

Permalink
fix(plc4j/opcua): Make sure UA subscription acknowledges are retained…
Browse files Browse the repository at this point in the history
… over publish cycles.

Closes #1364.

Signed-off-by: Łukasz Dywicki <[email protected]>
  • Loading branch information
splatch committed Apr 12, 2024
1 parent 930cf67 commit bacd928
Showing 1 changed file with 4 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;

import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -65,6 +66,8 @@ public class OpcuaSubscriptionHandle extends DefaultPlcSubscriptionHandle {

private final AtomicLong clientHandles = new AtomicLong(1L);
private final RequestTransactionManager tm;

private final List<SubscriptionAcknowledgement> outstandingAcknowledgements = new CopyOnWriteArrayList();
private ScheduledFuture<?> publishTask;

public OpcuaSubscriptionHandle(OpcuaProtocolLogic plcSubscriber, RequestTransactionManager tm,
Expand Down Expand Up @@ -166,7 +169,6 @@ public CompletableFuture<OpcuaSubscriptionHandle> onSubscribeCreateMonitoredItem
* @return
*/
private void sendPublishRequest() {
List<ExtensionObjectDefinition> outstandingAcknowledgements = new LinkedList<>();
List<Long> outstandingRequests = new LinkedList<>();

//If we are waiting on a response and haven't received one, just wait until we do. A keep alive will be sent out eventually
Expand All @@ -184,6 +186,7 @@ private void sendPublishRequest() {
// we work in external thread - we need to coordinate access to conversation pipeline
RequestTransaction transaction = tm.startRequest();
transaction.submit(() -> {
LOGGER.trace("Sent publish request with {} acks", ackLength);
// Create Consumer for the response message, error and timeout to be sent to the Secure Channel
conversation.submit(publishRequest, PublishResponse.class).thenAccept(responseMessage -> {
outstandingRequests.remove(((ResponseHeader) responseMessage.getResponseHeader()).getRequestHandle());
Expand Down

0 comments on commit bacd928

Please sign in to comment.