diff --git a/plc4j/drivers/opcua/pom.xml b/plc4j/drivers/opcua/pom.xml
index 7a5c6b44a44..dab62b94f57 100644
--- a/plc4j/drivers/opcua/pom.xml
+++ b/plc4j/drivers/opcua/pom.xml
@@ -98,6 +98,26 @@
+
+
+ org.apache.rat
+ apache-rat-plugin
+
+
+ license-check
+ verify
+
+ check
+
+
+
+
+
+
+ src/test/resources/chunk-calculation*.csv
+
+
+
@@ -127,6 +147,10 @@
org.apache.commons
commons-lang3
+
+ commons-codec
+ commons-codec
+
io.vavr
diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/config/Limits.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/config/Limits.java
new file mode 100644
index 00000000000..4fdaee87967
--- /dev/null
+++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/config/Limits.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.opcua.config;
+
+import org.apache.plc4x.java.spi.configuration.Configuration;
+import org.apache.plc4x.java.spi.configuration.annotations.ConfigurationParameter;
+
+public class Limits implements Configuration {
+
+ private static final int DEFAULT_RECEIVE_BUFFER_SIZE = 65535;
+ private static final int DEFAULT_SEND_BUFFER_SIZE = 65535;
+ private static final int DEFAULT_MAX_MESSAGE_SIZE = 2097152;
+ private static final int DEFAULT_MAX_CHUNK_COUNT = 64;
+
+ @ConfigurationParameter("receiveBufferSize")
+ private int receiveBufferSize;
+ @ConfigurationParameter("sendBufferSize")
+ private int sendBufferSize;
+ @ConfigurationParameter("maxMessageSize")
+ private int maxMessageSize;
+ @ConfigurationParameter("maxChunkCount")
+ private int maxChunkCount;
+
+ public Limits() {
+ this(DEFAULT_RECEIVE_BUFFER_SIZE, DEFAULT_SEND_BUFFER_SIZE, DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_CHUNK_COUNT);
+ }
+
+ public Limits(int receiveBufferSize, int sendBufferSize, int maxMessageSize, int maxChunkCount) {
+ this.receiveBufferSize = receiveBufferSize;
+ this.sendBufferSize = sendBufferSize;
+ this.maxMessageSize = maxMessageSize;
+ this.maxChunkCount = maxChunkCount;
+ }
+
+ public int getReceiveBufferSize() {
+ return receiveBufferSize;
+ }
+
+ public int getSendBufferSize() {
+ return sendBufferSize;
+ }
+
+ public int getMaxMessageSize() {
+ return maxMessageSize;
+ }
+
+ public int getMaxChunkCount() {
+ return maxChunkCount;
+ }
+
+ @Override
+ public String toString() {
+ return "Limits{" +
+ " receiveBufferSize=" + receiveBufferSize +
+ ", sendBufferSize=" + sendBufferSize +
+ ", maxMessageSize=" + maxMessageSize +
+ ", maxChunkCount=" + maxChunkCount +
+ '}';
+ }
+}
diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/config/OpcuaConfiguration.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/config/OpcuaConfiguration.java
index f9c8caf0cee..d3357289820 100644
--- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/config/OpcuaConfiguration.java
+++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/config/OpcuaConfiguration.java
@@ -18,15 +18,29 @@
*/
package org.apache.plc4x.java.opcua.config;
-import org.apache.plc4x.java.opcua.readwrite.PascalByteString;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.KeyStore;
+import java.security.cert.X509Certificate;
+import org.apache.plc4x.java.opcua.context.SecureChannel;
+import org.apache.plc4x.java.opcua.security.MessageSecurity;
import org.apache.plc4x.java.opcua.security.SecurityPolicy;
import org.apache.plc4x.java.spi.configuration.Configuration;
+import org.apache.plc4x.java.spi.configuration.annotations.ComplexConfigurationParameter;
import org.apache.plc4x.java.spi.configuration.annotations.ConfigurationParameter;
import org.apache.plc4x.java.spi.configuration.annotations.defaults.BooleanDefaultValue;
-import org.apache.plc4x.java.spi.configuration.annotations.defaults.StringDefaultValue;
public class OpcuaConfiguration implements Configuration {
+ public static final long DEFAULT_CHANNEL_LIFETIME = 3600000;
+
+ public static final long DEFAULT_SESSION_TIMEOUT = 120000;
+ public static final long DEFAULT_NEGOTIATION_TIMEOUT = 60000;
+
+ public static final long DEFAULT_REQUEST_TIMEOUT = 30000;
+
@ConfigurationParameter("protocolCode")
private String protocolCode;
@@ -49,16 +63,47 @@ public class OpcuaConfiguration implements Configuration {
@ConfigurationParameter("securityPolicy")
private SecurityPolicy securityPolicy = SecurityPolicy.NONE;
+ @ConfigurationParameter("messageSecurity")
+ private MessageSecurity messageSecurity = MessageSecurity.SIGN_ENCRYPT;
+
@ConfigurationParameter("keyStoreFile")
private String keyStoreFile;
- @ConfigurationParameter("certDirectory")
- private String certDirectory;
+ @ConfigurationParameter("keyStoreType")
+ private String keyStoreType = KeyStore.getDefaultType();
@ConfigurationParameter("keyStorePassword")
private String keyStorePassword;
- private byte[] senderCertificate;
- private PascalByteString thumbprint;
+
+ @ConfigurationParameter("serverCertificateFile")
+ private String serverCertificateFile;
+
+ @ConfigurationParameter("trustStoreFile")
+ private String trustStoreFile;
+
+ @ConfigurationParameter("trustStoreType")
+ private String trustStoreType = KeyStore.getDefaultType();
+
+ @ConfigurationParameter("trustStorePassword")
+ private String trustStorePassword;
+
+ // the discovered certificate when discovery is enabled
+ private X509Certificate serverCertificate;
+
+ @ConfigurationParameter("channelLifetime")
+ private long channelLifetime = DEFAULT_CHANNEL_LIFETIME;
+
+ @ConfigurationParameter("sessionTimeout")
+ private long sessionTimeout = DEFAULT_SESSION_TIMEOUT;
+
+ @ConfigurationParameter("negotiationTimeout")
+ private long negotiationTimeout = DEFAULT_NEGOTIATION_TIMEOUT;
+
+ @ConfigurationParameter("requestTimeout")
+ private long requestTimeout = DEFAULT_REQUEST_TIMEOUT;
+
+ @ComplexConfigurationParameter(prefix = "encoding", defaultOverrides = {}, requiredOverrides = {})
+ private Limits limits = new Limits();
public String getProtocolCode() {
return protocolCode;
@@ -84,20 +129,73 @@ public String getPassword() {
return password;
}
- public String getCertDirectory() {
- return certDirectory;
- }
-
public SecurityPolicy getSecurityPolicy() {
return securityPolicy;
}
+ public MessageSecurity getMessageSecurity() {
+ return messageSecurity;
+ }
+
public String getKeyStoreFile() {
return keyStoreFile;
}
- public String getKeyStorePassword() {
- return keyStorePassword;
+ public String getKeyStoreType() {
+ return keyStoreType;
+ }
+
+ public char[] getKeyStorePassword() {
+ return keyStorePassword == null ? null : keyStorePassword.toCharArray();
+ }
+
+ public String getTrustStoreFile() {
+ return trustStoreFile;
+ }
+
+ public String getTrustStoreType() {
+ return trustStoreType;
+ }
+
+ public char[] getTrustStorePassword() {
+ return trustStorePassword == null ? null : trustStorePassword.toCharArray();
+ }
+
+ public Limits getEncodingLimits() {
+ return limits;
+ }
+
+ public X509Certificate getServerCertificate() {
+ if (serverCertificate == null && serverCertificateFile != null) {
+ // initialize server certificate from configured file
+ try {
+ byte[] certificateBytes = Files.readAllBytes(Path.of(serverCertificateFile));
+ serverCertificate = SecureChannel.getX509Certificate(certificateBytes);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return serverCertificate;
+ }
+
+ public void setServerCertificate(X509Certificate serverCertificate) {
+ this.serverCertificate = serverCertificate;
+ }
+
+ public long getChannelLifetime() {
+ return channelLifetime;
+ }
+
+ public long getSessionTimeout() {
+ return sessionTimeout;
+ }
+
+ public long getRequestTimeout() {
+ return requestTimeout;
+ }
+
+ public long getNegotiationTimeout() {
+ return negotiationTimeout;
}
@Override
@@ -108,26 +206,9 @@ public String toString() {
", password='" + (password != null ? "******" : null) + '\'' +
", securityPolicy='" + securityPolicy + '\'' +
", keyStoreFile='" + keyStoreFile + '\'' +
- ", certDirectory='" + certDirectory + '\'' +
", keyStorePassword='" + (keyStorePassword != null ? "******" : null) + '\'' +
+ ", limits=" + limits +
'}';
}
-
- public byte[] getSenderCertificate() {
- return senderCertificate;
- }
-
- public void setSenderCertificate(byte[] senderCertificate) {
- this.senderCertificate = senderCertificate;
- }
-
- public PascalByteString getThumbprint() {
- return this.thumbprint;
- }
-
- public void setThumbprint(PascalByteString thumbprint) {
- this.thumbprint = thumbprint;
- }
-
}
diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/AsymmetricEncryptionHandler.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/AsymmetricEncryptionHandler.java
index e537bc1b551..7d8ba1009a4 100644
--- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/AsymmetricEncryptionHandler.java
+++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/AsymmetricEncryptionHandler.java
@@ -1,208 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.plc4x.java.opcua.context;
-import org.apache.plc4x.java.opcua.readwrite.BinaryPayload;
-import org.apache.plc4x.java.opcua.readwrite.MessagePDU;
-import org.apache.plc4x.java.opcua.readwrite.OpcuaAPU;
-import org.apache.plc4x.java.opcua.readwrite.OpcuaOpenResponse;
-import org.apache.plc4x.java.opcua.security.SecurityPolicy;
-import org.apache.plc4x.java.spi.generation.*;
-
-import javax.crypto.Cipher;
-import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
-import java.security.PublicKey;
import java.security.Signature;
-import java.security.cert.Certificate;
-import java.security.cert.X509Certificate;
-import java.security.interfaces.RSAPublicKey;
-
-public class AsymmetricEncryptionHandler {
- private static final int SECURE_MESSAGE_HEADER_SIZE = 12;
- private static final int SEQUENCE_HEADER_SIZE = 8;
-
- private final SecurityPolicy policy;
+import java.security.SignatureException;
+import javax.crypto.Cipher;
+import org.apache.plc4x.java.opcua.protocol.chunk.Chunk;
+import org.apache.plc4x.java.opcua.security.SecurityPolicy;
+import org.apache.plc4x.java.spi.generation.WriteBufferByteBased;
- private final X509Certificate serverCertificate;
- private final X509Certificate clientCertificate;
+public class AsymmetricEncryptionHandler extends BaseEncryptionHandler {
- private final PrivateKey clientPrivateKey;
+ private final PrivateKey senderPrivateKey;
- public AsymmetricEncryptionHandler(X509Certificate serverCertificate, X509Certificate clientCertificate, PrivateKey clientPrivateKey, PublicKey clientPublicKey, SecurityPolicy policy) {
- this.serverCertificate = serverCertificate;
- this.clientCertificate = clientCertificate;
- this.clientPrivateKey = clientPrivateKey;
- this.policy = policy;
+ public AsymmetricEncryptionHandler(Conversation conversation, SecurityPolicy securityPolicy, PrivateKey senderPrivateKey) {
+ super(conversation, securityPolicy);
+ this.senderPrivateKey = senderPrivateKey;
}
- /**
- * Docs: https://reference.opcfoundation.org/Core/Part6/v104/docs/6.7
- *
- * @param pdu
- * @param message
- * @return
- */
- public ReadBuffer encodeMessage(MessagePDU pdu, byte[] message) {
- int unencryptedLength = pdu.getLengthInBytes();
- int messageLength = message.length;
-
- int beforeBodyLength = unencryptedLength - messageLength; // message header, security header, sequence header
-
- int cipherTextBlockSize = (getAsymmetricKeyLength(serverCertificate) + 7) / 8;
- int plainTextBlockSize = (getAsymmetricKeyLength(serverCertificate) + 7) / 8 - policy.getAsymmetricPlainBlock();
- int signatureSize = (getAsymmetricKeyLength(clientCertificate) + 7) / 8;
-
-
- int maxChunkSize = 8196;
- int paddingOverhead = cipherTextBlockSize > 256 ? 2 : 1;
-
-
- int securityHeaderSize = beforeBodyLength - SEQUENCE_HEADER_SIZE - SECURE_MESSAGE_HEADER_SIZE;
- int maxCipherTextSize = maxChunkSize - securityHeaderSize;
- int maxCipherTextBlocks = maxCipherTextSize / cipherTextBlockSize;
- int maxPlainTextSize = maxCipherTextBlocks * plainTextBlockSize;
- int maxBodySize = maxPlainTextSize - SEQUENCE_HEADER_SIZE - paddingOverhead - signatureSize;
-
- int bodySize = Math.min(message.length, maxBodySize);
-
- int plainTextSize = SEQUENCE_HEADER_SIZE + bodySize + paddingOverhead + signatureSize;
- int remaining = plainTextSize % plainTextBlockSize;
- int paddingSize = remaining > 0 ? plainTextBlockSize - remaining : 0;
-
- int plainTextContentSize = SEQUENCE_HEADER_SIZE + bodySize +
- signatureSize + paddingSize + paddingOverhead;
-
- int frameSize = SECURE_MESSAGE_HEADER_SIZE + securityHeaderSize +
- (plainTextContentSize / plainTextBlockSize) * cipherTextBlockSize;
-
- try {
- WriteBufferByteBased buf = new WriteBufferByteBased(frameSize, ByteOrder.LITTLE_ENDIAN);
- OpcuaAPU opcuaAPU = new OpcuaAPU(pdu);
- opcuaAPU.serialize(buf);
-
- writePadding(paddingSize, buf);
- updateFrameSize(frameSize, buf);
-
- byte[] sign = sign(buf.getBytes());
- buf.writeByteArray(sign);
-
- buf.setPos(SECURE_MESSAGE_HEADER_SIZE + securityHeaderSize);
-
- int blockCount = (frameSize - buf.getPos()) / plainTextBlockSize;// -> plainTextContentSize / plainTextBlockSize
-
- byte[] encrypted = encrypt(plainTextBlockSize, securityHeaderSize, frameSize, buf, blockCount);
- buf.writeByteArray(encrypted);
+ protected void verify(WriteBufferByteBased buffer, Chunk chunk, int messageLength) throws Exception {
+ int signatureStart = messageLength - chunk.getSignatureSize();
+ byte[] message = buffer.getBytes(0, signatureStart);
+ byte[] signatureData = buffer.getBytes(signatureStart, signatureStart + chunk.getSignatureSize());
- return new ReadBufferByteBased(buf.getBytes(), ByteOrder.LITTLE_ENDIAN);
- } catch (Exception e) {
- throw new RuntimeException(e);
+ Signature signature = securityPolicy.getAsymmetricSignatureAlgorithm().getSignature();
+ signature.initVerify(conversation.getRemoteCertificate().getPublicKey());
+ signature.update(message);
+ if (signature.verify(signatureData)) {
+ throw new IllegalArgumentException("Invalid signature");
}
}
- public OpcuaAPU decodeMessage(OpcuaAPU pdu) {
- MessagePDU message = pdu.getMessage();
+ protected int decrypt(WriteBufferByteBased chunkBuffer, Chunk chunk, int messageLength) throws Exception {
+ int bodyStart = 12 + chunk.getSecurityHeaderSize();
- OpcuaOpenResponse a = (OpcuaOpenResponse) message;
+ int bodySize = messageLength - bodyStart;
+ int blockCount = bodySize / chunk.getCipherTextBlockSize();
+ assert(bodySize % chunk.getCipherTextBlockSize() == 0);
+ byte[] encrypted = chunkBuffer.getBytes(bodyStart, bodyStart + bodySize);
+ byte[] plainText = new byte[chunk.getCipherTextBlockSize() * blockCount];
- int cipherTextBlockSize = (getAsymmetricKeyLength(serverCertificate) + 7) / 8;
- int signatureSize = (getAsymmetricKeyLength(clientCertificate) + 7) / 8;
+ Cipher cipher = securityPolicy.getAsymmetricEncryptionAlgorithm().getCipher();
+ cipher.init(Cipher.DECRYPT_MODE, senderPrivateKey);
- if (!(a.getMessage() instanceof BinaryPayload)) {
- throw new IllegalArgumentException("Unexpected payload");
- }
- byte[] textMessage = ((BinaryPayload) a.getMessage()).getPayload();
-
- int blockCount = (SEQUENCE_HEADER_SIZE + textMessage.length) / cipherTextBlockSize;
- int plainTextBufferSize = cipherTextBlockSize * blockCount;
-
- try {
- WriteBufferByteBased buf = new WriteBufferByteBased(pdu.getLengthInBytes(), ByteOrder.LITTLE_ENDIAN);
- pdu.serialize(buf);
-
- Cipher cipher = policy.getAsymmetricEncryptionAlgorithm().getCipher();
- cipher.init(Cipher.DECRYPT_MODE, clientPrivateKey);
-
- ByteBuffer buffer = ByteBuffer.allocate(plainTextBufferSize);
- byte[] bytes = buf.getBytes(pdu.getLengthInBytes() - plainTextBufferSize, pdu.getLengthInBytes());
- //byte[] bytes = textMessage;
- ByteBuffer originalMessage = ByteBuffer.wrap(bytes);
-
- for (int blockNumber = 0; blockNumber < blockCount; blockNumber++) {
- originalMessage.limit(originalMessage.position() + cipherTextBlockSize);
- cipher.doFinal(originalMessage, buffer);
- }
-
- buffer.flip();
- buf.setPos(pdu.getLengthInBytes() - plainTextBufferSize);
- buf.writeByteArray(buffer.array());
- int frameSize = pdu.getLengthInBytes() - plainTextBufferSize + buffer.limit();
- updateFrameSize(frameSize, buf);
-
- byte[] decryptedMessage = buf.getBytes(0, frameSize);
+ int bodyLength = 0;
+ for (int block = 0; block < blockCount; block++) {
+ int pos = block * chunk.getCipherTextBlockSize();
- ReadBuffer readBuffer = new ReadBufferByteBased(decryptedMessage, ByteOrder.LITTLE_ENDIAN);
- OpcuaAPU opcuaAPU = OpcuaAPU.staticParse(readBuffer, true);
- return opcuaAPU;
- } catch (Exception e) {
- throw new RuntimeException(e);
+ bodyLength += cipher.doFinal(encrypted, pos, chunk.getCipherTextBlockSize(), plainText, pos);
}
-
+ chunkBuffer.setPos(bodyStart);
+ byte[] decrypted = new byte[bodyLength];
+ System.arraycopy(plainText, 0, decrypted, 0, bodyLength);
+ chunkBuffer.writeByteArray("payload", decrypted);
+ return bodyLength;
}
- private byte[] encrypt(int plainTextBlockSize, int securityHeaderSize, int frameSize, WriteBufferByteBased buf, int blockCount) throws Exception {
- ByteBuffer buffer = ByteBuffer.allocate(frameSize - buf.getPos());
- ByteBuffer originalMessage = ByteBuffer.wrap(buf.getBytes(SECURE_MESSAGE_HEADER_SIZE + securityHeaderSize, frameSize));
-
+ protected void encrypt(WriteBufferByteBased buffer, int securityHeaderSize, int plainTextBlockSize, int cipherTextBlockSize, int blockCount) throws Exception {
+ int bodyStart = 12 + securityHeaderSize;
+ byte[] copy = buffer.getBytes(bodyStart, bodyStart + (plainTextBlockSize * blockCount));
+ byte[] encrypted = new byte[cipherTextBlockSize * blockCount];
- Cipher cipher = policy.getAsymmetricEncryptionAlgorithm().getCipher();
- cipher.init(Cipher.ENCRYPT_MODE, serverCertificate.getPublicKey());
+ // copy of bytes from sequence header over payload, padding bytes and signature
+ Cipher cipher = securityPolicy.getAsymmetricEncryptionAlgorithm().getCipher();
+ cipher.init(Cipher.ENCRYPT_MODE, conversation.getRemoteCertificate().getPublicKey());
for (int block = 0; block < blockCount; block++) {
- int position = block * plainTextBlockSize;
- int limit = (block + 1) * plainTextBlockSize;
- originalMessage.position(position);
- originalMessage.limit(limit);
-
- cipher.doFinal(originalMessage, buffer);
+ int pos = block * plainTextBlockSize;
+ int target = block * cipherTextBlockSize;
+ cipher.doFinal(copy, pos, plainTextBlockSize, encrypted, target);
}
- return buffer.array();
- }
-
- private static void updateFrameSize(int frameSize, WriteBufferByteBased buf) throws SerializationException {
- int initPosition = buf.getPos();
- buf.setPos(4);
- buf.writeInt(32, frameSize);
- buf.setPos(initPosition);
- }
-
- public byte[] sign(byte[] data) {
- try {
- Signature signature = policy.getAsymmetricSignatureAlgorithm().getSignature();
- signature.initSign(clientPrivateKey);
- signature.update(data);
- return signature.sign();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ buffer.setPos(bodyStart);
+ buffer.writeByteArray("encrypted", encrypted);
}
- private void writePadding(int paddingSize, WriteBufferByteBased buffer) throws Exception {
- buffer.writeByte((byte) paddingSize);
- for (int i = 0; i < paddingSize; i++) {
- buffer.writeByte((byte) paddingSize);
- }
- }
-
-
- static int getAsymmetricKeyLength(Certificate certificate) {
- PublicKey publicKey = certificate != null ?
- certificate.getPublicKey() : null;
-
- return (publicKey instanceof RSAPublicKey) ?
- ((RSAPublicKey) publicKey).getModulus().bitLength() : 0;
+ public byte[] sign(byte[] contentsToSign) throws GeneralSecurityException {
+ Signature signature = securityPolicy.getAsymmetricSignatureAlgorithm().getSignature();
+ signature.initSign(senderPrivateKey);
+ signature.update(contentsToSign);
+ return signature.sign();
}
}
diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/BaseEncryptionHandler.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/BaseEncryptionHandler.java
new file mode 100644
index 00000000000..91b7a686ff3
--- /dev/null
+++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/BaseEncryptionHandler.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.opcua.context;
+
+import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.plc4x.java.opcua.protocol.chunk.Chunk;
+import org.apache.plc4x.java.opcua.protocol.chunk.PayloadConverter;
+import org.apache.plc4x.java.opcua.readwrite.ChunkType;
+import org.apache.plc4x.java.opcua.readwrite.MessagePDU;
+import org.apache.plc4x.java.opcua.security.SecurityPolicy;
+import org.apache.plc4x.java.spi.generation.ByteOrder;
+import org.apache.plc4x.java.spi.generation.SerializationException;
+import org.apache.plc4x.java.spi.generation.WriteBufferByteBased;
+
+abstract class BaseEncryptionHandler {
+
+ protected static final int SECURE_MESSAGE_HEADER_SIZE = 12;
+ protected static final int SEQUENCE_HEADER_SIZE = 8;
+
+ protected final Conversation conversation;
+ protected final SecurityPolicy securityPolicy;
+
+ public BaseEncryptionHandler(Conversation conversation, SecurityPolicy securityPolicy) {
+ this.conversation = conversation;
+ this.securityPolicy = securityPolicy;
+ }
+ public final List encodeMessage(Chunk chunk, MessagePDU message, Supplier sequenceSupplier) {
+
+ try {
+ ByteBuffer messageBuffer = ByteBuffer.wrap(PayloadConverter.toStream(message));
+ int sequenceStart = SECURE_MESSAGE_HEADER_SIZE + chunk.getSecurityHeaderSize();
+
+ // processed parts of frame
+ byte[] messageHeader = new byte[SECURE_MESSAGE_HEADER_SIZE];
+ messageBuffer.get(messageHeader);
+ byte[] securityHeader = new byte[chunk.getSecurityHeaderSize()];
+ messageBuffer.get(securityHeader);
+ byte[] sequenceHeader = new byte[SEQUENCE_HEADER_SIZE];
+ messageBuffer.get(sequenceHeader);
+
+ ByteBuffer bodyBuffer = messageBuffer.slice();
+ List messages = new ArrayList<>();
+ boolean first = true;
+ while (bodyBuffer.hasRemaining()) {
+ int bodySize = Math.min(bodyBuffer.remaining(), chunk.getMaxBodySize());
+ int paddingSize = 0;
+ if (chunk.isEncrypted()) {
+ int plainTextSize = SEQUENCE_HEADER_SIZE + bodySize + chunk.getPaddingOverhead() + chunk.getSignatureSize();
+ int gap = plainTextSize % chunk.getPlainTextBlockSize();
+ paddingSize = gap > 0 ? chunk.getPlainTextBlockSize() - gap : 0;
+ }
+
+ int plainTextContentSize = SEQUENCE_HEADER_SIZE + bodySize + chunk.getSignatureSize() + paddingSize + chunk.getPaddingOverhead();
+ if (chunk.isEncrypted()) {
+ assert ((plainTextContentSize % chunk.getPlainTextBlockSize()) == 0);
+ }
+
+ int chunkSize = SECURE_MESSAGE_HEADER_SIZE + chunk.getSecurityHeaderSize() + (plainTextContentSize / chunk.getPlainTextBlockSize()) * chunk.getCipherTextBlockSize();
+
+ WriteBufferByteBased chunkBuffer = new WriteBufferByteBased(chunkSize, ByteOrder.LITTLE_ENDIAN);
+ chunkBuffer.writeByteArray("messageHeader", messageHeader);
+ chunkBuffer.writeByteArray("securityHeader", securityHeader);
+ chunkBuffer.writeByteArray("sequenceHeader", sequenceHeader);
+ updateFrameSize(chunkBuffer, chunkSize);
+ ChunkType chunkType = bodyBuffer.remaining() - bodySize > 0 ? ChunkType.CONTINUE : ChunkType.FINAL;
+ updateFrame(first, chunkBuffer, chunk, chunkType, sequenceSupplier); // populate headers
+ first = false;
+
+ byte[] chunkContents = new byte[bodySize];
+ bodyBuffer.get(chunkContents);
+ // copy part of message not larger than body size into chunk buffer
+ chunkBuffer.writeByteArray("payload", chunkContents);
+
+ if (chunk.isEncrypted()) {
+ for (int index = 0, limit = paddingSize + chunk.getPaddingOverhead(); index < limit; index++) {
+ chunkBuffer.writeByte("padding", (byte) paddingSize);
+ }
+ if (chunk.getPaddingOverhead() > 1) {
+ // override extra padding byte with MSB of padding size
+ chunkBuffer.setPos(bodySize + paddingSize + chunk.getPaddingOverhead());
+ chunkBuffer.writeByte("paddingMSB", (byte) ((paddingSize >> 8) & 0xFF));
+ }
+ }
+
+ if (chunk.isSigned()) {
+ byte[] signatureData = sign(chunkBuffer.getBytes(0, chunkBuffer.getPos()));
+ chunkBuffer.writeByteArray("signature", signatureData);
+ }
+ if (chunk.isEncrypted()) {
+ encrypt(chunkBuffer, chunk.getSecurityHeaderSize(), chunk.getPlainTextBlockSize(),
+ chunk.getCipherTextBlockSize(), plainTextContentSize / chunk.getPlainTextBlockSize()
+ );
+ }
+
+ MessagePDU chunkedMessage = PayloadConverter.pduFromStream(chunkBuffer.getBytes(), message.getResponse());
+ messages.add(chunkedMessage);
+ }
+ return messages;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public final MessagePDU decodeMessage(Chunk chunk, MessagePDU message) {
+ try {
+ if (!chunk.isEncrypted() && !chunk.isSigned()) {
+ return message;
+ }
+
+ int messageLength = message.getLengthInBytes();
+ WriteBufferByteBased chunkBuffer = new WriteBufferByteBased(messageLength, ByteOrder.LITTLE_ENDIAN);
+ message.serialize(chunkBuffer);
+
+ int bodySize = messageLength - chunk.getSecurityHeaderSize() - SECURE_MESSAGE_HEADER_SIZE;
+ if (chunk.isEncrypted()) {
+ bodySize = decrypt(chunkBuffer, chunk, messageLength);
+ }
+
+ if (chunk.isSigned()) {
+ verify(chunkBuffer, chunk, messageLength);
+ }
+
+ int encryptionOverhead = getEncryptionOverhead(chunk, messageLength);
+ int paddingSize = getPaddingSize(chunkBuffer, chunk, messageLength);
+
+ int payloadStart = SECURE_MESSAGE_HEADER_SIZE + chunk.getSecurityHeaderSize();
+ int payloadEnd = payloadStart + bodySize - paddingSize - chunk.getSignatureSize() - chunk.getPaddingOverhead();
+ int expectedPaddingSize = messageLength - payloadEnd - chunk.getSignatureSize() - encryptionOverhead - chunk.getPaddingOverhead();
+
+ if (paddingSize != expectedPaddingSize) {
+ throw new IllegalArgumentException("Malformed data detected - expected padding size do not match");
+ }
+
+ if (chunk.isEncrypted()) {
+ byte[] paddingBytes = chunkBuffer.getBytes(payloadEnd, payloadEnd + expectedPaddingSize);
+ byte paddingByte = (byte) (paddingSize & 0xff);
+ for (int index = 0; index < paddingBytes.length; index++) {
+ if (paddingBytes[index] != paddingByte) {
+ throw new IllegalArgumentException("Malformed padding byte at index " + index);
+ }
+ }
+ }
+
+ int overhead = paddingSize + chunk.getSignatureSize() + chunk.getPaddingOverhead() + encryptionOverhead;
+ updateFrameSize(chunkBuffer, messageLength - overhead);
+
+ return PayloadConverter.pduFromStream(chunkBuffer.getBytes(), message.getResponse());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void updateFrame(boolean first, WriteBufferByteBased messageBuffer, Chunk chunk, ChunkType chunkType, Supplier sequenceSupplier) throws SerializationException {
+ int payloadStart = SECURE_MESSAGE_HEADER_SIZE + chunk.getSecurityHeaderSize();
+ if (chunkType != ChunkType.FINAL) {
+ messageBuffer.setPos(3);
+ messageBuffer.writeString("chunkType", 8, chunkType.getValue());
+ }
+
+ if (!first) {
+ messageBuffer.setPos(payloadStart);
+ messageBuffer.writeUnsignedLong("sequenceId", 32, sequenceSupplier.get());
+ }
+
+ // leave buffer at beginning of message body
+ messageBuffer.setPos(payloadStart + 8);
+ }
+
+ private void updateFrameSize(WriteBufferByteBased messageBuffer, long frameSize) throws SerializationException {
+ int position = messageBuffer.getPos();
+ try {
+ messageBuffer.setPos(4);
+ messageBuffer.writeUnsignedLong("totalLength", 32, frameSize);
+ } finally {
+ messageBuffer.setPos(position);
+ }
+ }
+
+ private int getEncryptionOverhead(Chunk chunk, int messageLength) {
+ if (!chunk.isEncrypted()) {
+ return 0;
+ }
+
+ int bodyStart = SECURE_MESSAGE_HEADER_SIZE + chunk.getSecurityHeaderSize();
+ int bodySize = messageLength - bodyStart;
+ int blockCount = bodySize / chunk.getCipherTextBlockSize();
+ // bytes we "lost" after payload got decrypted
+ return (chunk.getCipherTextBlockSize() * blockCount) - (chunk.getPlainTextBlockSize() * blockCount);
+ }
+
+ private short getPaddingSize(WriteBufferByteBased chunkBuffer, Chunk chunk, int messageLength) {
+ if (!chunk.isEncrypted()) {
+ return 0;
+ }
+
+ int bodyStart = SECURE_MESSAGE_HEADER_SIZE + chunk.getSecurityHeaderSize();
+ int bodySize = messageLength - bodyStart;
+ int blockCount = bodySize / chunk.getCipherTextBlockSize();
+ // bytes we "lost" after payload got decrypted
+ int encryptionOverhead = (chunk.getCipherTextBlockSize() * blockCount) - (chunk.getPlainTextBlockSize() * blockCount);
+
+ int paddingEnd = messageLength - chunk.getSignatureSize() - encryptionOverhead - chunk.getPaddingOverhead();
+ byte[] padding = chunkBuffer.getBytes(paddingEnd, paddingEnd + chunk.getPaddingOverhead());
+ if (padding.length > 2) { // cipher block size exceeds 256 bytes
+ return (short)(((padding[1] & 0xFF) << 8) | (padding[0] & 0xFF));
+ }
+ return padding[0];
+ }
+
+ protected abstract void verify(WriteBufferByteBased buffer, Chunk chunk, int messageLength) throws Exception;
+
+ protected abstract int decrypt(WriteBufferByteBased chunkBuffer, Chunk chunk, int messageLength) throws Exception;
+
+ protected abstract void encrypt(WriteBufferByteBased buffer, int securityHeaderSize, int plainTextBlockSize, int cipherTextBlockSize, int blockCount) throws Exception;
+
+ protected abstract byte[] sign(byte[] contentsToSign) throws GeneralSecurityException;
+
+}
diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/CallContext.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/CallContext.java
new file mode 100644
index 00000000000..266d9531fa6
--- /dev/null
+++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/CallContext.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.opcua.context;
+
+import java.util.function.Supplier;
+import org.apache.plc4x.java.opcua.protocol.chunk.ChunkStorage;
+import org.apache.plc4x.java.opcua.readwrite.SecurityHeader;
+
+public class CallContext {
+
+ private final SecurityHeader sequenceHeader;
+ private final Supplier sequenceSupplier;
+ private final int requestId;
+
+ public CallContext(SecurityHeader sequenceHeader, Supplier sequenceSupplier, int requestId) {
+ this.sequenceHeader = sequenceHeader;
+ this.sequenceSupplier = sequenceSupplier;
+ this.requestId = requestId;
+ }
+
+ public SecurityHeader getSecurityHeader() {
+ return sequenceHeader;
+ }
+
+ public int getNextSequenceNumber() {
+ return sequenceSupplier.get();
+ }
+
+ public int getRequestId() {
+ return requestId;
+ }
+}
diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/CertificateKeyPair.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/CertificateKeyPair.java
index 9133a68bf44..29caf9fd557 100644
--- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/CertificateKeyPair.java
+++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/CertificateKeyPair.java
@@ -19,6 +19,8 @@
package org.apache.plc4x.java.opcua.context;
import io.vavr.control.Try;
+import java.security.GeneralSecurityException;
+import java.security.PrivateKey;
import org.bouncycastle.asn1.x509.GeneralName;
import java.security.KeyPair;
@@ -34,7 +36,7 @@ public class CertificateKeyPair {
private final X509Certificate certificate;
private final byte[] thumbprint;
- public CertificateKeyPair(KeyPair keyPair, X509Certificate certificate) throws Exception {
+ public CertificateKeyPair(KeyPair keyPair, X509Certificate certificate) throws GeneralSecurityException {
this.keyPair = keyPair;
this.certificate = certificate;
MessageDigest messageDigest = MessageDigest.getInstance("SHA-1");
@@ -49,6 +51,10 @@ public X509Certificate getCertificate() {
return certificate;
}
+ public PrivateKey getPrivateKey() {
+ return keyPair.getPrivate();
+ }
+
public byte[] getThumbPrint() {
return thumbprint;
}
diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/Conversation.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/Conversation.java
new file mode 100644
index 00000000000..d398d76951e
--- /dev/null
+++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/Conversation.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.opcua.context;
+
+import static org.apache.plc4x.java.opcua.readwrite.ChunkType.ABORT;
+import static org.apache.plc4x.java.opcua.readwrite.ChunkType.FINAL;
+
+import java.security.GeneralSecurityException;
+import java.security.cert.X509Certificate;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
+import org.apache.plc4x.java.opcua.config.Limits;
+import org.apache.plc4x.java.opcua.config.OpcuaConfiguration;
+import org.apache.plc4x.java.opcua.protocol.chunk.ChunkStorage;
+import org.apache.plc4x.java.opcua.protocol.chunk.MemoryChunkStorage;
+import org.apache.plc4x.java.opcua.readwrite.BinaryPayload;
+import org.apache.plc4x.java.opcua.readwrite.ChunkType;
+import org.apache.plc4x.java.opcua.readwrite.ExpandedNodeId;
+import org.apache.plc4x.java.opcua.readwrite.ExtensiblePayload;
+import org.apache.plc4x.java.opcua.readwrite.ExtensionObject;
+import org.apache.plc4x.java.opcua.readwrite.ExtensionObjectDefinition;
+import org.apache.plc4x.java.opcua.readwrite.ExtensionObjectEncodingMask;
+import org.apache.plc4x.java.opcua.readwrite.MessagePDU;
+import org.apache.plc4x.java.opcua.readwrite.NodeId;
+import org.apache.plc4x.java.opcua.readwrite.NodeIdFourByte;
+import org.apache.plc4x.java.opcua.readwrite.NodeIdTwoByte;
+import org.apache.plc4x.java.opcua.readwrite.NodeIdTypeDefinition;
+import org.apache.plc4x.java.opcua.readwrite.NullExtension;
+import org.apache.plc4x.java.opcua.readwrite.OpcuaAPU;
+import org.apache.plc4x.java.opcua.readwrite.OpcuaAcknowledgeResponse;
+import org.apache.plc4x.java.opcua.readwrite.OpcuaCloseRequest;
+import org.apache.plc4x.java.opcua.readwrite.OpcuaConstants;
+import org.apache.plc4x.java.opcua.readwrite.OpcuaHelloRequest;
+import org.apache.plc4x.java.opcua.readwrite.OpcuaMessageRequest;
+import org.apache.plc4x.java.opcua.readwrite.OpcuaMessageResponse;
+import org.apache.plc4x.java.opcua.readwrite.OpcuaOpenRequest;
+import org.apache.plc4x.java.opcua.readwrite.OpcuaOpenResponse;
+import org.apache.plc4x.java.opcua.readwrite.OpcuaProtocolLimits;
+import org.apache.plc4x.java.opcua.readwrite.OpcuaStatusCode;
+import org.apache.plc4x.java.opcua.readwrite.PascalString;
+import org.apache.plc4x.java.opcua.readwrite.Payload;
+import org.apache.plc4x.java.opcua.readwrite.RequestHeader;
+import org.apache.plc4x.java.opcua.readwrite.ResponseHeader;
+import org.apache.plc4x.java.opcua.readwrite.SecurityHeader;
+import org.apache.plc4x.java.opcua.readwrite.SequenceHeader;
+import org.apache.plc4x.java.opcua.readwrite.ServiceFault;
+import org.apache.plc4x.java.opcua.readwrite.SignatureData;
+import org.apache.plc4x.java.opcua.security.MessageSecurity;
+import org.apache.plc4x.java.opcua.security.SecurityPolicy;
+import org.apache.plc4x.java.spi.ConversationContext;
+import org.apache.plc4x.java.spi.ConversationContext.SendRequestContext;
+import org.apache.plc4x.java.spi.generation.ParseException;
+import org.apache.plc4x.java.spi.generation.ReadBufferByteBased;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Conversation {
+ private static final long EPOCH_OFFSET = 116444736000000000L; //Offset between OPC UA epoch time and linux epoch time.
+
+ private static final ExpandedNodeId NULL_EXPANDED_NODE_ID = new ExpandedNodeId(false,
+ false,
+ new NodeIdTwoByte((short) 0),
+ null,
+ null
+ );
+
+ protected static final ExtensionObject NULL_EXTENSION_OBJECT = new ExtensionObject(
+ NULL_EXPANDED_NODE_ID,
+ new ExtensionObjectEncodingMask(false, false, false),
+ new NullExtension()); // Body
+
+
+ private final Logger logger = LoggerFactory.getLogger(Conversation.class);
+ private final AtomicReference securityHeader = new AtomicReference<>(new SecurityHeader(1, 1));
+ private final AtomicLong senderSequenceNumber = new AtomicLong(-1);
+
+ private final AtomicReference authenticationToken = new AtomicReference<>(new NodeIdTwoByte((short) 0));
+
+ private final ConversationContext context;
+ private final SecureChannelTransactionManager tm;
+
+ private final SecurityPolicy securityPolicy;
+ private final MessageSecurity messageSecurity;
+ private final EncryptionHandler encryptionHandler;
+ private final OpcuaDriverContext driverContext;
+ private final OpcuaConfiguration configuration;
+
+ private OpcuaProtocolLimits limits;
+
+ private X509Certificate localCertificate = null;
+ private X509Certificate remoteCertificate = null;
+ private byte[] remoteNonce;
+ private byte[] localNonce;
+
+ private final BiPredicate> sequenceValidator = (sequenceHeader, callback) -> {
+ if (senderSequenceNumber.get() == -1L) {
+ senderSequenceNumber.set(sequenceHeader.getSequenceNumber());
+ return true;
+ }
+ int expectedSequence = sequenceHeader.getSequenceNumber() - 1;
+ if (!senderSequenceNumber.compareAndSet(expectedSequence, sequenceHeader.getSequenceNumber())) {
+ callback.completeExceptionally(
+ new PlcProtocolException("Lost sequence, expected " + expectedSequence + " but received " + sequenceHeader.getSequenceNumber())
+ );
+ return false;
+ }
+ return true;
+ };
+
+ public Conversation(ConversationContext context, OpcuaDriverContext driverContext, OpcuaConfiguration configuration) {
+ this.context = context;
+ this.tm = new SecureChannelTransactionManager();
+ this.driverContext = driverContext;
+ this.configuration = configuration;
+
+ this.securityPolicy = determineSecurityPolicy(configuration);
+ CertificateKeyPair senderKeyPair = driverContext.getCertificateKeyPair();
+
+ if (this.securityPolicy != SecurityPolicy.NONE) {
+ //Sender Certificate gets populated during the 'discover' phase when encryption is enabled.
+ this.messageSecurity = configuration.getMessageSecurity();
+ this.remoteCertificate = configuration.getServerCertificate();
+ this.encryptionHandler = new EncryptionHandler(this, senderKeyPair.getPrivateKey());
+ this.localCertificate = senderKeyPair.getCertificate();
+ this.localNonce = createNonce();
+ } else {
+ this.messageSecurity = MessageSecurity.NONE;
+ this.encryptionHandler = new EncryptionHandler(this, null);
+ }
+
+ Limits encodingLimits = configuration.getEncodingLimits();
+ limits = new OpcuaProtocolLimits(
+ encodingLimits.getReceiveBufferSize(),
+ encodingLimits.getSendBufferSize(),
+ encodingLimits.getMaxMessageSize(),
+ encodingLimits.getMaxChunkCount()
+ );
+ }
+
+ public CompletableFuture requestHello() {
+ logger.debug("Sending hello message to {}", this.driverContext.getEndpoint());
+ OpcuaHelloRequest request = new OpcuaHelloRequest(FINAL,
+ OpcuaConstants.PROTOCOLVERSION,
+ new OpcuaProtocolLimits(
+ limits.getReceiveBufferSize(),
+ limits.getSendBufferSize(),
+ limits.getMaxMessageSize(),
+ limits.getMaxChunkCount()
+ ),
+ new PascalString(driverContext.getEndpoint())
+ );
+
+ // open messages are guaranteed to fit into 8192 bytes limit
+ //CompletableFuture future = new CompletableFuture<>();
+
+ CompletableFuture future = new CompletableFuture<>();
+ sendRequest(request, future, configuration.getNegotiationTimeout())
+ .unwrap(OpcuaAPU::getMessage)
+ .check(OpcuaAcknowledgeResponse.class::isInstance)
+ .unwrap(OpcuaAcknowledgeResponse.class::cast)
+ .handle(opcuaAcknowledgeResponse -> {
+ OpcuaProtocolLimits limits = opcuaAcknowledgeResponse.getLimits();
+ // merge encoding limits to match common minimum:
+ // our receipt buffer should not exceed server send buffer size,
+ // our send buffer size should not exceed server receive buffer size
+ // chunks and message sizes should match too
+ this.limits = new OpcuaProtocolLimits(
+ Math.min(this.limits.getReceiveBufferSize(), limits.getSendBufferSize()),
+ Math.min(this.limits.getSendBufferSize(), limits.getReceiveBufferSize()),
+ Math.min(this.limits.getMaxMessageSize(), limits.getMaxMessageSize()),
+ Math.min(this.limits.getMaxChunkCount(), limits.getMaxChunkCount())
+ );
+ future.complete(opcuaAcknowledgeResponse);
+ });
+ return future;
+ }
+
+ public CompletableFuture requestChannelOpen(Function request) {
+ return request(
+ OpcuaOpenResponse.class, request,
+ (rsp, chunk) -> new OpcuaOpenResponse(rsp.getChunk(), rsp.getOpenResponse(), chunk),
+ (rsp) -> rsp.getMessage().getSequenceHeader(),
+ OpcuaOpenResponse::getMessage
+ );
+ }
+
+ public CompletableFuture requestChannelClose(Function request) {
+ logger.trace("Got close secure channel request");
+ return request(
+ OpcuaMessageResponse.class, request,
+ (rsp, chunk) -> new OpcuaMessageResponse(rsp.getChunk(), rsp.getSecurityHeader(), chunk),
+ (rsp) -> rsp.getMessage().getSequenceHeader(),
+ OpcuaMessageResponse::getMessage
+ ).whenComplete((r, e) -> {
+ context.fireDisconnected();
+ }).thenApply(r -> null);
+ }
+
+ private CompletableFuture request(
+ Class replyType, Function request,
+ BiFunction chunkAssembler,
+ Function sequenceHeaderExtractor,
+ Function chunkExtractor
+ ) {
+ int requestId = tm.getTransactionIdentifier();
+ logger.debug("Firing request {}", requestId);
+ T messagePDU = request.apply(
+ new CallContext(securityHeader.get(), tm.getSequenceSupplier(), requestId)
+ );
+
+ MemoryChunkStorage chunkStorage = new MemoryChunkStorage();
+ List chunks = encryptionHandler.encodeMessage(messagePDU, tm.getSequenceSupplier());
+ CompletableFuture future = new CompletableFuture<>();
+ for (int count = chunks.size(), index = 0; index < count; index++) {
+ boolean last = index + 1 == count;
+ if (last) {
+ sendRequest(chunks.get(index), future, configuration.getNegotiationTimeout())
+ .unwrap(OpcuaAPU::getMessage)
+ .check(replyType::isInstance)
+ .unwrap(replyType::cast)
+ .unwrap(msg -> encryptionHandler.decodeMessage(msg))
+ .check(replyType::isInstance)
+ .unwrap(replyType::cast)
+ .check(reply -> requestId == sequenceHeaderExtractor.apply(reply).getRequestId())
+ .check(reply -> sequenceValidator.test(sequenceHeaderExtractor.apply(reply), future))
+ .check(msg -> accumulateChunkUntilFinal(chunkStorage, msg.getChunk(), chunkExtractor.apply(msg)))
+ .unwrap(msg -> mergeChunks(chunkStorage, msg, sequenceHeaderExtractor.apply(msg), chunkAssembler))
+ .handle(response -> {
+ future.complete(response);
+ });
+ } else {
+ context.sendToWire(new OpcuaAPU(chunks.get(index)));
+ }
+ }
+ return future;
+ }
+
+ public CompletableFuture submit(T object, Class replyType) {
+ return submit(object).thenApply(response -> {
+ if (replyType.isInstance(response)) {
+ return replyType.cast(response);
+ }
+ throw new IllegalStateException("Received reply of unexpected type " + response.getClass().getName() + " while " + replyType.getName() + " has been expected");
+ });
+ }
+
+ private CompletableFuture