Skip to content

Commit

Permalink
Fix subscription tests.
Browse files Browse the repository at this point in the history
Signed-off-by: Łukasz Dywicki <[email protected]>
  • Loading branch information
splatch committed Dec 15, 2023
1 parent 5b5caf1 commit 624162b
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.plc4x.java.opcua.context.SecureChannel.getX509Certificate;

import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
Expand Down Expand Up @@ -77,7 +78,7 @@ public class OpcuaProtocolLogic extends Plc4xProtocolBase<OpcuaAPU> implements H
new NullExtension()); // Body

private static final long EPOCH_OFFSET = 116444736000000000L; //Offset between OPC UA epoch time and linux epoch time.
private final Map<Long, OpcuaSubscriptionHandle> subscriptions = new HashMap<>();
private final Map<Long, OpcuaSubscriptionHandle> subscriptions = new ConcurrentHashMap<>();
private final RequestTransactionManager tm = new RequestTransactionManager();

private OpcuaConfiguration configuration;
Expand Down Expand Up @@ -760,7 +761,8 @@ public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionReque
// bridge(transaction, future, response, error)
onSubscribeCreateSubscription(cycleTime).thenApply(response -> {
long subscriptionId = response.getSubscriptionId();
OpcuaSubscriptionHandle handle = new OpcuaSubscriptionHandle(this, conversation, subscriptionRequest, subscriptionId, cycleTime);
OpcuaSubscriptionHandle handle = new OpcuaSubscriptionHandle(this, tm,
conversation, subscriptionRequest, subscriptionId, cycleTime);
subscriptions.put(handle.getSubscriptionId(), handle);
return handle;
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
Expand All @@ -35,6 +36,8 @@
import org.apache.plc4x.java.spi.model.DefaultPlcConsumerRegistration;
import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionTag;
import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle;
import org.apache.plc4x.java.spi.transaction.RequestTransactionManager;
import org.apache.plc4x.java.spi.transaction.RequestTransactionManager.RequestTransaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -56,16 +59,19 @@ public class OpcuaSubscriptionHandle extends DefaultPlcSubscriptionHandle {
private final List<String> tagNames;
private final Conversation conversation;
private final PlcSubscriptionRequest subscriptionRequest;
private final AtomicBoolean destroy = new AtomicBoolean(false);
private final OpcuaProtocolLogic plcSubscriber;
private final Long subscriptionId;
private final long cycleTime;
private final long revisedCycleTime;

private final AtomicLong clientHandles = new AtomicLong(1L);
private final RequestTransactionManager tm;
private ScheduledFuture<?> publishTask;

public OpcuaSubscriptionHandle(OpcuaProtocolLogic plcSubscriber, Conversation conversation, PlcSubscriptionRequest subscriptionRequest, Long subscriptionId, long cycleTime) {
public OpcuaSubscriptionHandle(OpcuaProtocolLogic plcSubscriber, RequestTransactionManager tm,
Conversation conversation, PlcSubscriptionRequest subscriptionRequest, Long subscriptionId, long cycleTime) {
super(plcSubscriber);
this.tm = tm;
this.consumers = new HashSet<>();
this.subscriptionRequest = subscriptionRequest;
this.tagNames = new ArrayList<>(subscriptionRequest.getTagNames());
Expand Down Expand Up @@ -146,32 +152,20 @@ public CompletableFuture<OpcuaSubscriptionHandle> onSubscribeCreateMonitoredItem
LOGGER.debug("Tag {} was added to the subscription", tagNames.get(index));
}
}
schedulePublishRequest();

LOGGER.trace("Scheduling publish event for subscription {}", subscriptionId);
publishTask = EXECUTOR.scheduleAtFixedRate(this::sendPublishRequest, revisedCycleTime / 2, revisedCycleTime, TimeUnit.MILLISECONDS);
return this;
});
}

private void sleep(long length) {
try {
Thread.sleep(length);
} catch (InterruptedException e) {
LOGGER.trace("Interrupted Exception");
}
}

/**
* Main subscriber loop. For subscription, we still need to send a request the server on every cycle.
* Which includes a request for an update of the previously agreed upon list of tags.
* The server will respond at most once every cycle.
*
* @return
*/
public void schedulePublishRequest() {
LOGGER.trace("Scheduling publish event for subscription {}", subscriptionId);

EXECUTOR.schedule(this::sendPublishRequest, revisedCycleTime, TimeUnit.MILLISECONDS);
}

private void sendPublishRequest() {
List<ExtensionObjectDefinition> outstandingAcknowledgements = new LinkedList<>();
List<Long> outstandingRequests = new LinkedList<>();
Expand All @@ -181,36 +175,43 @@ private void sendPublishRequest() {
RequestHeader requestHeader = conversation.createRequestHeader(this.revisedCycleTime * 10);

//Make a copy of the outstanding requests, so it isn't modified while we are putting the ack list together.
List<ExtensionObjectDefinition> acks = ((List<ExtensionObjectDefinition>) ((LinkedList) outstandingAcknowledgements).clone());
int ackLength = acks.size() == 0 ? -1 : acks.size();
List<ExtensionObjectDefinition> acks = new ArrayList<>(outstandingAcknowledgements);
int ackLength = acks.isEmpty() ? -1 : acks.size();
outstandingAcknowledgements.removeAll(acks);

PublishRequest publishRequest = new PublishRequest(
requestHeader,
ackLength,
acks
);
PublishRequest publishRequest = new PublishRequest(requestHeader, ackLength, acks);
// we work in external thread - we need to coordinate access to conversation pipeline
RequestTransaction transaction = tm.startRequest();
transaction.submit(() -> {
// Create Consumer for the response message, error and timeout to be sent to the Secure Channel
conversation.submit(publishRequest, PublishResponse.class).thenAccept(responseMessage -> {
outstandingRequests.remove(((ResponseHeader) responseMessage.getResponseHeader()).getRequestHandle());

// Create Consumer for the response message, error and timeout to be sent to the Secure Channel
conversation.submit(publishRequest, PublishResponse.class).thenAccept(responseMessage -> {
outstandingRequests.remove(((ResponseHeader) responseMessage.getResponseHeader()).getRequestHandle());

for (long availableSequenceNumber : responseMessage.getAvailableSequenceNumbers()) {
outstandingAcknowledgements.add(new SubscriptionAcknowledgement(this.subscriptionId, availableSequenceNumber));
}
for (long availableSequenceNumber : responseMessage.getAvailableSequenceNumbers()) {
outstandingAcknowledgements.add(new SubscriptionAcknowledgement(this.subscriptionId, availableSequenceNumber));
}

for (ExtensionObject notificationMessage : ((NotificationMessage) responseMessage.getNotificationMessage()).getNotificationData()) {
ExtensionObjectDefinition notification = notificationMessage.getBody();
if (notification instanceof DataChangeNotification) {
LOGGER.trace("Found a Data Change notification");
List<ExtensionObjectDefinition> items = ((DataChangeNotification) notification).getMonitoredItems();
onSubscriptionValue(items.toArray(new MonitoredItemNotification[0]));
for (ExtensionObject notificationMessage : ((NotificationMessage) responseMessage.getNotificationMessage()).getNotificationData()) {
ExtensionObjectDefinition notification = notificationMessage.getBody();
if (notification instanceof DataChangeNotification) {
LOGGER.trace("Found a Data Change notification");
List<ExtensionObjectDefinition> items = ((DataChangeNotification) notification).getMonitoredItems();
onSubscriptionValue(items.toArray(new MonitoredItemNotification[0]));
} else {
LOGGER.warn("Unsupported Notification type");
}
}
}).whenComplete((result, error) -> {
if (error != null) {
LOGGER.warn("Publish request of subscription {} resulted in error reported by server", subscriptionId, error);
transaction.failRequest(error);
} else {
LOGGER.warn("Unsupported Notification type");
LOGGER.trace("Completed publish request for subscription {}", subscriptionId);
transaction.endRequest();
}
}
});
outstandingRequests.add(requestHeader.getRequestHandle());
});
outstandingRequests.add(requestHeader.getRequestHandle());
}
}

Expand All @@ -221,36 +222,29 @@ private void sendPublishRequest() {
* @return
*/
public void stopSubscriber() {
this.destroy.set(true);

/*
RequestHeader requestHeader = channel.createRequestHeader(this.revisedCycleTime * 10);
List<Long> subscriptions = new ArrayList<>(1);
subscriptions.add(subscriptionId);
RequestHeader requestHeader = conversation.createRequestHeader(this.revisedCycleTime * 10);
List<Long> subscriptions = Collections.singletonList(subscriptionId);
DeleteSubscriptionsRequest deleteSubscriptionRequest = new DeleteSubscriptionsRequest(requestHeader,
1,
subscriptions
);

ExpandedNodeId extExpandedNodeId = new ExpandedNodeId(false, //Namespace Uri Specified
false, //Server Index Specified
new NodeIdFourByte((short) 0, Integer.parseInt(deleteSubscriptionRequest.getIdentifier())),
null,
null);
ExtensionObject extObject = new ExtensionObject(
extExpandedNodeId,
null,
deleteSubscriptionRequest);
// Create Consumer for the response message, error and timeout to be sent to the Secure Channel
channel.submit(DeleteSubscriptionsResponse.class, extObject).thenAccept(responseMessage -> {
// TODO shall we clean resources here?
// subscription suspend can be invoked from multiple places, hence we manage transaction side of it
RequestTransaction transaction = tm.startRequest();
transaction.submit(() -> {
// Create Consumer for the response message, error and timeout to be sent to the Secure Channel
conversation.submit(deleteSubscriptionRequest, DeleteSubscriptionsResponse.class)
.thenAccept(responseMessage -> publishTask.cancel(true))
.whenComplete((result, error) -> {
if (error != null) {
LOGGER.error("Deletion of subscription resulted in error", error);
transaction.failRequest(error);
} else {
transaction.endRequest();
}
plcSubscriber.removeSubscription(subscriptionId);
});
});
sleep(500);
plcSubscriber.removeSubscription(subscriptionId);
*/
}

/**
Expand Down Expand Up @@ -287,4 +281,5 @@ public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer)
public Long getSubscriptionId() {
return subscriptionId;
}

}

0 comments on commit 624162b

Please sign in to comment.