Skip to content

Commit

Permalink
fix(plc4j/genericcan) Fix of generic CAN driver and CAN transports af…
Browse files Browse the repository at this point in the history
…ter recent releases.

Signed-off-by: Łukasz Dywicki <[email protected]>
  • Loading branch information
splatch committed Mar 11, 2024
1 parent e22fde2 commit eaa0fe1
Show file tree
Hide file tree
Showing 16 changed files with 282 additions and 344 deletions.
6 changes: 5 additions & 1 deletion plc4j/drivers/can/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
</dependency>
<dependency>
<groupId>org.apache.plc4x</groupId>
<artifactId>plc4j-transport-socketcan</artifactId>
<artifactId>plc4j-transport-test</artifactId>
<version>0.13.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
Expand All @@ -122,6 +122,10 @@
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>

<!-- Logging -->
<dependency>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public enum GenericCANDataType {
INTEGER56((long) 16L, (String) "RAW_BYTE_ARRAY", (short) 56),
INTEGER64((long) 17L, (String) "LINT", (short) 64),
REAL32((long) 18L, (String) "REAL", (short) 32),
REAL64((long) 19L, (String) "LREAL", (short) 64);
REAL64((long) 19L, (String) "LREAL", (short) 64),
RAW((long) 20L, (String) "RAW_BYTE_ARRAY", (short) 8);
private static final Map<Long, GenericCANDataType> map;

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,11 @@ public void decode(ConversationContext<GenericFrame> context, GenericFrame msg)
ReadBuffer buffer = new ReadBufferByteBased(msg.getData(), ByteOrder.LITTLE_ENDIAN);
buffer.pullContext("readTags");
if (subscription.matches(msg.getNodeId())) {
byte[] data = msg.getData();
ReadBufferByteBased readBuffer = new ReadBufferByteBased(data, ByteOrder.LITTLE_ENDIAN);
for (Entry<String, GenericCANTag> tag : subscription.getTags().entrySet()) {
try {
PlcValue value = read(buffer, tag.getValue());
PlcValue value = read(readBuffer, tag.getValue(), data.length);
if (value == null) {
tags.put(tag.getKey(), new ResponseItem<>(PlcResponseCode.INTERNAL_ERROR, null));
} else {
Expand All @@ -112,20 +114,20 @@ public void decode(ConversationContext<GenericFrame> context, GenericFrame msg)
}
}

private PlcValue read(ReadBuffer buffer, GenericCANTag tag) throws ParseException {
private PlcValue read(ReadBuffer buffer, GenericCANTag tag, int length) throws ParseException {
try {
buffer.pullContext("read-" + tag);
return DataItem.staticParse(buffer, tag.getDataType());
return DataItem.staticParse(buffer, tag.getDataType(), length);
} finally {
buffer.closeContext("read-" + tag);
}
}

private void write(WriteBuffer buffer, GenericCANTag tag, PlcValue value) throws SerializationException {
WriteBufferByteBased writeBuffer = new WriteBufferByteBased(DataItem.getLengthInBytes(value, tag.getDataType()));
DataItem.staticSerialize(writeBuffer, value, tag.getDataType());
try {
buffer.pushContext("write-" + tag);
WriteBufferByteBased writeBuffer = new WriteBufferByteBased(DataItem.getLengthInBytes(value, tag.getDataType(), value.getLength()), ByteOrder.LITTLE_ENDIAN);
DataItem.staticSerialize(writeBuffer, value, tag.getDataType(), value.getLength(), ByteOrder.LITTLE_ENDIAN);
buffer.writeByteArray(writeBuffer.getBytes());
} finally {
buffer.popContext("write-" + tag);
Expand Down Expand Up @@ -193,7 +195,7 @@ public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionReque
Map<String, ResponseItem<PlcSubscriptionHandle>> answers = new LinkedHashMap<>();
DefaultPlcSubscriptionResponse response = new DefaultPlcSubscriptionResponse(rq, answers);

Map<Integer, GenericCANSubscriptionHandle> handles = new HashMap<>();
Map<Integer, GenericCANSubscriptionHandle> handles = new LinkedHashMap<>();
for (String key : rq.getTagNames()) {
DefaultPlcSubscriptionTag subscription = (DefaultPlcSubscriptionTag) rq.getTag(key);
if (subscription.getPlcSubscriptionType() != PlcSubscriptionType.EVENT) {
Expand All @@ -215,7 +217,7 @@ public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionReque

@Override
public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> handles) {
final DefaultPlcConsumerRegistration consumerRegistration = new DefaultPlcConsumerRegistration(this, consumer, handles.toArray(new DefaultPlcSubscriptionHandle[0]));
final DefaultPlcConsumerRegistration consumerRegistration = new DefaultPlcConsumerRegistration(this, consumer, handles.toArray(new PlcSubscriptionHandle[0]));
consumers.put(consumerRegistration, consumer);
return consumerRegistration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
import org.apache.plc4x.java.transport.can.CANTransport;
import org.apache.plc4x.java.transport.can.FrameData;

/**
* Generic CAN frame handler turn a wire level message @{@link FrameData} into a wrapper which
* does not have any specific other than node id and data.
*
* Because it is used by generic purpose driver it can not assume any semantics on message role.
*/
public class GenericCANFrameDataHandler implements CANTransport.FrameHandler<Message, GenericFrame> {

private final Supplier<CANFrameBuilder<Message>> frameBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
*/
package org.apache.plc4x.java.can.generic.transport;

import org.apache.plc4x.java.spi.generation.Message;
import org.apache.plc4x.java.spi.generation.SerializationException;
import org.apache.plc4x.java.spi.generation.WriteBuffer;

public class GenericFrame implements Message {
/**
* Wrapper for wire level data.
*/
public class GenericFrame {

private final int nodeId;
private final byte[] data;
Expand All @@ -40,27 +39,4 @@ public byte[] getData() {
return data;
}

@Override
public void serialize(WriteBuffer writeBuffer) throws SerializationException {
// TODO: Is this correct?
writeBuffer.writeUnsignedShort("length", 8, (short) data.length);
writeBuffer.writeUnsignedInt("nodeId", 32, nodeId);
writeBuffer.writeByteArray("data", data);
}

@Override
public int getLengthInBytes() {
return 0;
}

@Override
public int getLengthInBits() {
return 0;
}

/*@Override
public MessageIO<? extends Message, ? extends Message> getMessageIO() {
return null;
}*/

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,50 @@
*/
package org.apache.plc4x.java.can.generic;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledDirectByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.embedded.Plc4xEmbeddedChannel;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.plc4x.java.DefaultPlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest.Builder;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.java.api.model.PlcTag;
import org.apache.plc4x.java.spi.connection.ChannelExposingConnection;
import org.apache.plc4x.java.spi.values.PlcBYTE;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.mockito.internal.util.Primitives;

import static java.util.Map.entry;
import static org.junit.jupiter.api.Assertions.*;

/**
* Test of generic can driver with virtual can transport.
*
* This test have additional role of confirming end to end behavior of driver and transport layer.
* The virtual can transport rely on netty channel/pipeline infrastructure, hence it does not have
* any backend such as memory queue. It simply converts message to a stream and expect it to be read.
*/
public class GenericCANDriverTest {

@Test
Expand All @@ -44,51 +76,112 @@ void testConnection() throws PlcConnectionException {
}

@Test
@Disabled("This test requires working virtual CAN transport to be truly platform independent")
void testSubscribeAndWrite() throws Exception {
// PlcConnection connection1 = new PlcDriverManager().getConnection("genericcan:socketcan://vcan0");
// PlcConnection connection2 = new PlcDriverManager().getConnection("genericcan:socketcan://vcan0");
PlcConnection connection1 = new DefaultPlcDriverManager().getConnection("genericcan:virtualcan://");
PlcConnection connection2 = connection1;
Map<String, Entry<String, Object>> tagMap = Map.ofEntries(
entry("tag1", entry("200:BYTE", (short) 0x0A)),
entry("tag2", entry("200:INTEGER8", (byte) 20)),
entry("tag3", entry("200:INTEGER8", (byte) 30))
);

subscribeAndWrite(tagMap);
}

@Test
void testSubscribeAndWriteRawArray() throws Exception {
Map<String, Entry<String, Object>> tagMap = Map.ofEntries(
entry("arr1", entry("201:RAW", new byte[] {
(short) 0, (short) 1, (short) 2, (short) 3, (short) 4, (short) 5, (short) 6, (short) 7}
))
);

subscribeAndWrite(tagMap);
}

@Test
@Disabled("Writing arrays requires use of RAW type")
void testSubscribeAndWriteByteArray() throws Exception {
Map<String, Entry<String, Object>> tagMap = Map.ofEntries(
entry("arr1", entry("201:BYTE[8]", new byte[] {
(short) 0, (short) 1, (short) 2, (short) 3, (short) 4, (short) 5, (short) 6, (short) 7}
))
);

subscribeAndWrite(tagMap);
}


void subscribeAndWrite(Map<String, Entry<String, Object>> entries) throws Exception {
PlcConnection connection = new DefaultPlcDriverManager().getConnection("genericcan:virtualcan://");

Plc4xEmbeddedChannel subscribeChannel = null;
Plc4xEmbeddedChannel writeChannel = null;
if (connection instanceof ChannelExposingConnection) {
Channel channel = ((ChannelExposingConnection) connection).getChannel();
if (channel instanceof Plc4xEmbeddedChannel) {
subscribeChannel = (Plc4xEmbeddedChannel) channel;
writeChannel = (Plc4xEmbeddedChannel) channel;
}
}
if (subscribeChannel == null) {
throw new IllegalArgumentException("Invalid configuration");
}

CountDownLatch latch = new CountDownLatch(1);
Byte tag1 = 0x55;
short tag2 = 10;
short tag3 = 50;

final AtomicReference<PlcSubscriptionEvent> plcEvent = new AtomicReference<>();
connection1.subscriptionRequestBuilder()
.addEventTagAddress("tag1", "200:BYTE")
.addEventTagAddress("tag2", "200:UNSIGNED8")
.addEventTagAddress("tag3", "200:UNSIGNED8")
.build().execute().whenComplete((reply, error) -> {
if (error != null) {
fail(error);
return;
}

reply.getSubscriptionHandle("tag1").register(event -> {
plcEvent.set(event);
latch.countDown();
});
Builder subscriptionRequestBuilder = connection.subscriptionRequestBuilder();
entries.forEach((k, v) -> {
subscriptionRequestBuilder.addEventTagAddress(k, v.getKey());
});

subscriptionRequestBuilder.build().execute().whenComplete((reply, error) -> {
if (error != null) {
fail(error);
return;
}

reply.getSubscriptionHandle(entries.keySet().iterator().next()).register(event -> {
plcEvent.set(event);
latch.countDown();
});
});

connection2.writeRequestBuilder()
.addTagAddress("f1", "200:BYTE", tag1)
.addTagAddress("f2", "200:UNSIGNED8", tag2)
.addTagAddress("f3", "200:UNSIGNED8", tag3)
.build().execute().whenComplete((reply, error) -> {
if (error != null) {
fail(error);
}
}).get();
PlcWriteRequest.Builder writeRequestBuilder = connection.writeRequestBuilder();
entries.forEach((k, v) -> {
writeRequestBuilder.addTagAddress(k, v.getKey(), v.getValue());
});
writeRequestBuilder.build().execute().whenComplete((reply, error) -> {
if (error != null) {
fail(error);
}
}).get();

// copy outbound message to inbound queue to confirm that transport API works and subscription
// is properly matched against incoming message
ByteBuf outgoing = writeChannel.flushOutbound().readOutbound();
subscribeChannel.writeInbound(outgoing);
subscribeChannel.flushInbound();

latch.await();

PlcSubscriptionEvent event = plcEvent.get();
assertEquals(tag1, event.getByte("tag1"));
assertEquals(tag2, event.getShort("tag2"));
assertEquals(tag3, event.getShort("tag3"));
entries.forEach((k, v) -> {
Object object = event.getObject(k);
if (!v.getValue().getClass().isArray()) {
assertEquals(v.getValue(), object);
return;
}

// comparing arrays is a bit of nightmare due to primitives
int length = Array.getLength(v.getValue());
int readLength = Array.getLength(object);
if (readLength != length) {
throw new IllegalArgumentException("Return value length do not match reference value");
}
for (int index = 0; index < length; index++) {
assertEquals(Array.get(v.getValue(), index), Array.get(object, index));
}
});

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public enum CANOpenDataType {
INTEGER64((long) 16L, (String) "LINT", (short) 64),
REAL32((long) 17L, (String) "REAL", (short) 32),
REAL64((long) 18L, (String) "LREAL", (short) 64),
RECORD((long) 19L, (String) "BYTE", (short) 8),
OCTET_STRING((long) 20L, (String) "BYTE", (short) 8),
RECORD((long) 19L, (String) "RAW_BYTE_ARRAY", (short) 8),
OCTET_STRING((long) 20L, (String) "RAW_BYTE_ARRAY", (short) 8),
VISIBLE_STRING((long) 21L, (String) "CHAR", (short) 8),
UNICODE_STRING((long) 22L, (String) "WCHAR", (short) 16),
TIME_OF_DAY((long) 23L, (String) "TIME_OF_DAY", (short) 48),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,15 +339,6 @@ public void decode(ConversationContext<CANOpenFrame> context, CANOpenFrame msg)
logger.debug("Decoded CANOpen {} from {}, message {}", service, nodeId, payload);
}
}

// int identifier = msg.getIdentifier();
// CANOpenService service = CANOpenService.valueOf((byte) (identifier >> 7));
// if (service != null) {
// ReadBuffer buffer = new ReadBuffer(msg.getData());
// CANOpenPayload payload = CANOpenPayloadIO.staticParse(buffer, service);
//
//
// }
}

private void publishEvent(CANOpenService service, int nodeId, CANOpenPayload payload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,13 @@ public PlcValueType getPlcValueType() {
}

@Override
public String toString() {
return Hex.encodeHexString(value);
public byte[] getRaw() {
return value;
}

public byte[] getBytes() {
return value;
@Override
public String toString() {
return Hex.encodeHexString(value);
}

@Override
Expand Down
Loading

0 comments on commit eaa0fe1

Please sign in to comment.