diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java index f6b5434bca0..88d6e06c674 100644 --- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java +++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java @@ -21,6 +21,7 @@ import static org.apache.plc4x.java.opcua.context.SecureChannel.getX509Certificate; import java.nio.ByteBuffer; +import java.util.concurrent.ConcurrentHashMap; import org.apache.plc4x.java.api.authentication.PlcAuthentication; import org.apache.plc4x.java.api.exceptions.PlcConnectionException; import org.apache.plc4x.java.api.exceptions.PlcRuntimeException; @@ -77,7 +78,7 @@ public class OpcuaProtocolLogic extends Plc4xProtocolBase implements H new NullExtension()); // Body private static final long EPOCH_OFFSET = 116444736000000000L; //Offset between OPC UA epoch time and linux epoch time. - private final Map subscriptions = new HashMap<>(); + private final Map subscriptions = new ConcurrentHashMap<>(); private final RequestTransactionManager tm = new RequestTransactionManager(); private OpcuaConfiguration configuration; @@ -760,7 +761,8 @@ public CompletableFuture subscribe(PlcSubscriptionReque // bridge(transaction, future, response, error) onSubscribeCreateSubscription(cycleTime).thenApply(response -> { long subscriptionId = response.getSubscriptionId(); - OpcuaSubscriptionHandle handle = new OpcuaSubscriptionHandle(this, conversation, subscriptionRequest, subscriptionId, cycleTime); + OpcuaSubscriptionHandle handle = new OpcuaSubscriptionHandle(this, tm, + conversation, subscriptionRequest, subscriptionId, cycleTime); subscriptions.put(handle.getSubscriptionId(), handle); return handle; }) diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java index 0472ca10bfd..d7029b42d2e 100644 --- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java +++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java @@ -22,6 +22,7 @@ import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent; import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest; @@ -35,6 +36,8 @@ import org.apache.plc4x.java.spi.model.DefaultPlcConsumerRegistration; import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionTag; import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle; +import org.apache.plc4x.java.spi.transaction.RequestTransactionManager; +import org.apache.plc4x.java.spi.transaction.RequestTransactionManager.RequestTransaction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,16 +59,19 @@ public class OpcuaSubscriptionHandle extends DefaultPlcSubscriptionHandle { private final List tagNames; private final Conversation conversation; private final PlcSubscriptionRequest subscriptionRequest; - private final AtomicBoolean destroy = new AtomicBoolean(false); private final OpcuaProtocolLogic plcSubscriber; private final Long subscriptionId; private final long cycleTime; private final long revisedCycleTime; private final AtomicLong clientHandles = new AtomicLong(1L); + private final RequestTransactionManager tm; + private ScheduledFuture publishTask; - public OpcuaSubscriptionHandle(OpcuaProtocolLogic plcSubscriber, Conversation conversation, PlcSubscriptionRequest subscriptionRequest, Long subscriptionId, long cycleTime) { + public OpcuaSubscriptionHandle(OpcuaProtocolLogic plcSubscriber, RequestTransactionManager tm, + Conversation conversation, PlcSubscriptionRequest subscriptionRequest, Long subscriptionId, long cycleTime) { super(plcSubscriber); + this.tm = tm; this.consumers = new HashSet<>(); this.subscriptionRequest = subscriptionRequest; this.tagNames = new ArrayList<>(subscriptionRequest.getTagNames()); @@ -146,19 +152,13 @@ public CompletableFuture onSubscribeCreateMonitoredItem LOGGER.debug("Tag {} was added to the subscription", tagNames.get(index)); } } - schedulePublishRequest(); + + LOGGER.trace("Scheduling publish event for subscription {}", subscriptionId); + publishTask = EXECUTOR.scheduleAtFixedRate(this::sendPublishRequest, revisedCycleTime / 2, revisedCycleTime, TimeUnit.MILLISECONDS); return this; }); } - private void sleep(long length) { - try { - Thread.sleep(length); - } catch (InterruptedException e) { - LOGGER.trace("Interrupted Exception"); - } - } - /** * Main subscriber loop. For subscription, we still need to send a request the server on every cycle. * Which includes a request for an update of the previously agreed upon list of tags. @@ -166,12 +166,6 @@ private void sleep(long length) { * * @return */ - public void schedulePublishRequest() { - LOGGER.trace("Scheduling publish event for subscription {}", subscriptionId); - - EXECUTOR.schedule(this::sendPublishRequest, revisedCycleTime, TimeUnit.MILLISECONDS); - } - private void sendPublishRequest() { List outstandingAcknowledgements = new LinkedList<>(); List outstandingRequests = new LinkedList<>(); @@ -181,36 +175,43 @@ private void sendPublishRequest() { RequestHeader requestHeader = conversation.createRequestHeader(this.revisedCycleTime * 10); //Make a copy of the outstanding requests, so it isn't modified while we are putting the ack list together. - List acks = ((List) ((LinkedList) outstandingAcknowledgements).clone()); - int ackLength = acks.size() == 0 ? -1 : acks.size(); + List acks = new ArrayList<>(outstandingAcknowledgements); + int ackLength = acks.isEmpty() ? -1 : acks.size(); outstandingAcknowledgements.removeAll(acks); - PublishRequest publishRequest = new PublishRequest( - requestHeader, - ackLength, - acks - ); + PublishRequest publishRequest = new PublishRequest(requestHeader, ackLength, acks); + // we work in external thread - we need to coordinate access to conversation pipeline + RequestTransaction transaction = tm.startRequest(); + transaction.submit(() -> { + // Create Consumer for the response message, error and timeout to be sent to the Secure Channel + conversation.submit(publishRequest, PublishResponse.class).thenAccept(responseMessage -> { + outstandingRequests.remove(((ResponseHeader) responseMessage.getResponseHeader()).getRequestHandle()); - // Create Consumer for the response message, error and timeout to be sent to the Secure Channel - conversation.submit(publishRequest, PublishResponse.class).thenAccept(responseMessage -> { - outstandingRequests.remove(((ResponseHeader) responseMessage.getResponseHeader()).getRequestHandle()); - - for (long availableSequenceNumber : responseMessage.getAvailableSequenceNumbers()) { - outstandingAcknowledgements.add(new SubscriptionAcknowledgement(this.subscriptionId, availableSequenceNumber)); - } + for (long availableSequenceNumber : responseMessage.getAvailableSequenceNumbers()) { + outstandingAcknowledgements.add(new SubscriptionAcknowledgement(this.subscriptionId, availableSequenceNumber)); + } - for (ExtensionObject notificationMessage : ((NotificationMessage) responseMessage.getNotificationMessage()).getNotificationData()) { - ExtensionObjectDefinition notification = notificationMessage.getBody(); - if (notification instanceof DataChangeNotification) { - LOGGER.trace("Found a Data Change notification"); - List items = ((DataChangeNotification) notification).getMonitoredItems(); - onSubscriptionValue(items.toArray(new MonitoredItemNotification[0])); + for (ExtensionObject notificationMessage : ((NotificationMessage) responseMessage.getNotificationMessage()).getNotificationData()) { + ExtensionObjectDefinition notification = notificationMessage.getBody(); + if (notification instanceof DataChangeNotification) { + LOGGER.trace("Found a Data Change notification"); + List items = ((DataChangeNotification) notification).getMonitoredItems(); + onSubscriptionValue(items.toArray(new MonitoredItemNotification[0])); + } else { + LOGGER.warn("Unsupported Notification type"); + } + } + }).whenComplete((result, error) -> { + if (error != null) { + LOGGER.warn("Publish request of subscription {} resulted in error reported by server", subscriptionId, error); + transaction.failRequest(error); } else { - LOGGER.warn("Unsupported Notification type"); + LOGGER.trace("Completed publish request for subscription {}", subscriptionId); + transaction.endRequest(); } - } + }); + outstandingRequests.add(requestHeader.getRequestHandle()); }); - outstandingRequests.add(requestHeader.getRequestHandle()); } } @@ -221,36 +222,29 @@ private void sendPublishRequest() { * @return */ public void stopSubscriber() { - this.destroy.set(true); - - /* - RequestHeader requestHeader = channel.createRequestHeader(this.revisedCycleTime * 10); - List subscriptions = new ArrayList<>(1); - subscriptions.add(subscriptionId); + RequestHeader requestHeader = conversation.createRequestHeader(this.revisedCycleTime * 10); + List subscriptions = Collections.singletonList(subscriptionId); DeleteSubscriptionsRequest deleteSubscriptionRequest = new DeleteSubscriptionsRequest(requestHeader, 1, subscriptions ); - ExpandedNodeId extExpandedNodeId = new ExpandedNodeId(false, //Namespace Uri Specified - false, //Server Index Specified - new NodeIdFourByte((short) 0, Integer.parseInt(deleteSubscriptionRequest.getIdentifier())), - null, - null); - - ExtensionObject extObject = new ExtensionObject( - extExpandedNodeId, - null, - deleteSubscriptionRequest); - - // Create Consumer for the response message, error and timeout to be sent to the Secure Channel - channel.submit(DeleteSubscriptionsResponse.class, extObject).thenAccept(responseMessage -> { - // TODO shall we clean resources here? + // subscription suspend can be invoked from multiple places, hence we manage transaction side of it + RequestTransaction transaction = tm.startRequest(); + transaction.submit(() -> { + // Create Consumer for the response message, error and timeout to be sent to the Secure Channel + conversation.submit(deleteSubscriptionRequest, DeleteSubscriptionsResponse.class) + .thenAccept(responseMessage -> publishTask.cancel(true)) + .whenComplete((result, error) -> { + if (error != null) { + LOGGER.error("Deletion of subscription resulted in error", error); + transaction.failRequest(error); + } else { + transaction.endRequest(); + } + plcSubscriber.removeSubscription(subscriptionId); + }); }); - - sleep(500); - plcSubscriber.removeSubscription(subscriptionId); - */ } /** @@ -287,4 +281,5 @@ public PlcConsumerRegistration register(Consumer consumer) public Long getSubscriptionId() { return subscriptionId; } + }