diff --git a/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java b/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java deleted file mode 100644 index 548fab0f..00000000 --- a/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java +++ /dev/null @@ -1,179 +0,0 @@ -package org.apache.activemq.transport.tcp; - -import java.io.IOException; -import java.net.Socket; -import java.net.SocketException; -import java.net.URI; -import java.net.UnknownHostException; -import java.security.cert.X509Certificate; -import java.util.Collections; - -import javax.net.ssl.SNIHostName; -import javax.net.ssl.SSLParameters; -import javax.net.ssl.SSLPeerUnverifiedException; -import javax.net.ssl.SSLSession; -import javax.net.ssl.SSLSocket; -import javax.net.ssl.SSLSocketFactory; - -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.wireformat.WireFormat; -public class SslTransport extends TcpTransport { - - /** - * Default to null as there are different defaults between server and client, initialiseSocket - * for more details - */ - private Boolean verifyHostName = null; - - /** - * Connect to a remote node such as a Broker. - * - * @param wireFormat The WireFormat to be used. - * @param socketFactory The socket factory to be used. Forcing SSLSockets - * for obvious reasons. - * @param remoteLocation The remote location. - * @param localLocation The local location. - * @param needClientAuth If set to true, the underlying socket will need - * client certificate authentication. - * @throws UnknownHostException If TcpTransport throws. - * @throws IOException If TcpTransport throws. - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - public SslTransport(WireFormat wireFormat, SSLSocketFactory socketFactory, URI remoteLocation, URI localLocation, boolean needClientAuth) throws IOException { - super(wireFormat, socketFactory, remoteLocation, localLocation); - if (this.socket != null) { - ((SSLSocket)this.socket).setNeedClientAuth(needClientAuth); - } - } - - @Override - protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException { - /** - * This needs to default to null because this transport class is used for both a server transport - * and a client connection and we have different defaults for both. - * If we default it to a value it might override the transport server setting - * that was configured inside TcpTransportServer (which sets a default to false for server side) - * - * The idea here is that if this is a server transport then verifyHostName will be set by the setter - * and not be null as TcpTransportServer will set a default value of false (or a user will set it - * using transport.verifyHostName) but if this is a client connection the value will be null by default - * and will stay null if the user uses socket.verifyHostName to set the value or doesn't use the setter - * If it is null then we can check socketOptions for the value and if not set there then we can - * just set a default of true as this will be a client - * - * Unfortunately we have to do this to stay consistent because every other SSL option on the client - * side can be configured using socket. but this particular option isn't actually part of the socket - * so it makes it tricky from a user standpoint. For consistency sake I think it makes sense to allow - * using the socket. prefix that has been established so users do not get confused (as well as - * allow using no prefix which just calls the setter directly) - * - * Because of this there are actually two ways a client can configure this value, the client can either use - * socket.verifyHostName= as mentioned or just simply use verifyHostName= without using the socket. - * prefix and that will also work as the value will be set using the setter on the transport - * - * example server transport config: - * ssl://localhost:61616?transport.verifyHostName=true - * - * example from client: - * ssl://localhost:61616?verifyHostName=true - * OR - * ssl://localhost:61616?socket.verifyHostName=true - * - */ - if (verifyHostName == null) { - //Check to see if the user included the value as part of socket options and if so then use that value - if (socketOptions != null && socketOptions.containsKey("verifyHostName")) { - verifyHostName = Boolean.parseBoolean(socketOptions.get("verifyHostName").toString()); - socketOptions.remove("verifyHostName"); - } else { - //If null and not set then this is a client so default to true - verifyHostName = true; - } - } - - // Lets try to configure the SSL SNI field. Handy in case your using - // a single proxy to route to different messaging apps. - final SSLParameters sslParams = new SSLParameters(); - if (remoteLocation != null) { - sslParams.setServerNames(Collections.singletonList(new SNIHostName(remoteLocation.getHost()))); - } - - if (verifyHostName) { - sslParams.setEndpointIdentificationAlgorithm("HTTPS"); - } - - if (remoteLocation != null || verifyHostName) { - // AMQ-8445 only set SSLParameters if it has been populated before - ((SSLSocket) this.socket).setSSLParameters(sslParams); - } - - super.initialiseSocket(sock); - } - - /** - * Initialize from a ServerSocket. No access to needClientAuth is given - * since it is already set within the provided socket. - * - * @param wireFormat The WireFormat to be used. - * @param socket The Socket to be used. Forcing SSL. - * @throws IOException If TcpTransport throws. - */ - public SslTransport(WireFormat wireFormat, SSLSocket socket) throws IOException { - super(wireFormat, socket); - } - - public SslTransport(WireFormat format, SSLSocket socket, - InitBuffer initBuffer) throws IOException { - super(format, socket, initBuffer); - } - - /** - * Overriding in order to add the client's certificates to ConnectionInfo - * Commmands. - * - * @param command The Command coming in. - */ - @Override - public void doConsume(Object command) { - // The instanceof can be avoided, but that would require modifying the - // Command clas tree and that would require too much effort right - // now. - if (command instanceof ConnectionInfo) { - ConnectionInfo connectionInfo = (ConnectionInfo)command; - connectionInfo.setTransportContext(getPeerCertificates()); - } - super.doConsume(command); - } - - public void setVerifyHostName(Boolean verifyHostName) { - this.verifyHostName = verifyHostName; - } - - /** - * @return peer certificate chain associated with the ssl socket - */ - @Override - public X509Certificate[] getPeerCertificates() { - - SSLSocket sslSocket = (SSLSocket)this.socket; - - SSLSession sslSession = sslSocket.getSession(); - - X509Certificate[] clientCertChain; - try { - clientCertChain = (X509Certificate[])sslSession.getPeerCertificates(); - } catch (SSLPeerUnverifiedException e) { - clientCertChain = null; - } - - return clientCertChain; - } - - /** - * @return pretty print of 'this' - */ - @Override - public String toString() { - return "ssl://" + socket.getInetAddress() + ":" + socket.getPort(); - } -} \ No newline at end of file diff --git a/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java deleted file mode 100644 index 069deeb9..00000000 --- a/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java +++ /dev/null @@ -1,767 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.tcp; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; -import java.net.SocketException; -import java.net.SocketTimeoutException; -import java.net.URI; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.security.cert.X509Certificate; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import javax.net.SocketFactory; - -import org.apache.activemq.Service; -import org.apache.activemq.TransportLoggerSupport; -import org.apache.activemq.thread.TaskRunnerFactory; -import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.TransportThreadSupport; -import org.apache.activemq.util.InetAddressUtil; -import org.apache.activemq.util.IntrospectionSupport; -import org.apache.activemq.util.ServiceStopper; -import org.apache.activemq.wireformat.WireFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An implementation of the {@link Transport} interface using raw tcp/ip - */ -public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class); - - protected final URI remoteLocation; - protected final URI localLocation; - protected final WireFormat wireFormat; - - protected int connectionTimeout = 30000; - protected int soTimeout; - protected int socketBufferSize = 64 * 1024; - protected int ioBufferSize = 8 * 1024; - protected boolean closeAsync=true; - protected Socket socket; - protected DataOutputStream dataOut; - protected DataInputStream dataIn; - protected TimeStampStream buffOut = null; - - protected final InitBuffer initBuffer; - - /** - * The Traffic Class to be set on the socket. - */ - protected int trafficClass = 0; - /** - * Keeps track of attempts to set the Traffic Class on the socket. - */ - private boolean trafficClassSet = false; - /** - * Prevents setting both the Differentiated Services and Type of Service - * transport options at the same time, since they share the same spot in - * the TCP/IP packet headers. - */ - protected boolean diffServChosen = false; - protected boolean typeOfServiceChosen = false; - /** - * trace=true -> the Transport stack where this TcpTransport - * object will be, will have a TransportLogger layer - * trace=false -> the Transport stack where this TcpTransport - * object will be, will NOT have a TransportLogger layer, and therefore - * will never be able to print logging messages. - * This parameter is most probably set in Connection or TransportConnector URIs. - */ - protected boolean trace = false; - /** - * Name of the LogWriter implementation to use. - * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory. - * This parameter is most probably set in Connection or TransportConnector URIs. - */ - protected String logWriterName = TransportLoggerSupport.defaultLogWriterName; - /** - * Specifies if the TransportLogger will be manageable by JMX or not. - * Also, as long as there is at least 1 TransportLogger which is manageable, - * a TransportLoggerControl MBean will me created. - */ - protected boolean dynamicManagement = false; - /** - * startLogging=true -> the TransportLogger object of the Transport stack - * will initially write messages to the log. - * startLogging=false -> the TransportLogger object of the Transport stack - * will initially NOT write messages to the log. - * This parameter only has an effect if trace == true. - * This parameter is most probably set in Connection or TransportConnector URIs. - */ - protected boolean startLogging = true; - /** - * Specifies the port that will be used by the JMX server to manage - * the TransportLoggers. - * This should only be set in an URI by a client (producer or consumer) since - * a broker will already create a JMX server. - * It is useful for people who test a broker and clients in the same machine - * and want to control both via JMX; a different port will be needed. - */ - protected int jmxPort = 1099; - protected boolean useLocalHost = false; - protected int minmumWireFormatVersion; - protected SocketFactory socketFactory; - protected final AtomicReference stoppedLatch = new AtomicReference(); - protected volatile int receiveCounter; - - protected Map socketOptions; - private int soLinger = Integer.MIN_VALUE; - private Boolean keepAlive; - private Boolean tcpNoDelay; - private Thread runnerThread; - - /** - * Connect to a remote Node - e.g. a Broker - * - * @param wireFormat - * @param socketFactory - * @param remoteLocation - * @param localLocation - e.g. local InetAddress and local port - * @throws IOException - * @throws UnknownHostException - */ - public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, - URI localLocation) throws UnknownHostException, IOException { - this.wireFormat = wireFormat; - this.socketFactory = socketFactory; - try { - this.socket = socketFactory.createSocket(); - } catch (SocketException e) { - this.socket = null; - } - this.remoteLocation = remoteLocation; - this.localLocation = localLocation; - this.initBuffer = null; - setDaemon(false); - } - - /** - * Initialize from a server Socket - * - * @param wireFormat - * @param socket - * @throws IOException - */ - public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException { - this(wireFormat, socket, null); - } - - public TcpTransport(WireFormat wireFormat, Socket socket, InitBuffer initBuffer) throws IOException { - this.wireFormat = wireFormat; - this.socket = socket; - this.remoteLocation = null; - this.localLocation = null; - this.initBuffer = initBuffer; - setDaemon(true); - } - - /** - * A one way asynchronous send - */ - @Override - public void oneway(Object command) throws IOException { - checkStarted(); - wireFormat.marshal(command, dataOut); - dataOut.flush(); - } - - /** - * @return pretty print of 'this' - */ - @Override - public String toString() { - return "" + (socket.isConnected() ? "tcp://" + socket.getInetAddress() + ":" + socket.getPort() + "@" + socket.getLocalPort() - : (localLocation != null ? localLocation : remoteLocation)) ; - } - - /** - * reads packets from a Socket - */ - @Override - public void run() { - LOG.trace("TCP consumer thread for " + this + " starting"); - this.runnerThread=Thread.currentThread(); - try { - while (!isStopped() && !isStopping()) { - doRun(); - } - } catch (IOException e) { - stoppedLatch.get().countDown(); - onException(e); - } catch (Throwable e){ - stoppedLatch.get().countDown(); - IOException ioe=new IOException("Unexpected error occurred: " + e); - ioe.initCause(e); - onException(ioe); - }finally { - stoppedLatch.get().countDown(); - } - } - - protected void doRun() throws IOException { - try { - Object command = readCommand(); - doConsume(command); - } catch (SocketTimeoutException e) { - } catch (InterruptedIOException e) { - } - } - - protected Object readCommand() throws IOException { - return wireFormat.unmarshal(dataIn); - } - - // Properties - // ------------------------------------------------------------------------- - public String getDiffServ() { - // This is the value requested by the user by setting the Tcp Transport - // options. If the socket hasn't been created, then this value may not - // reflect the value returned by Socket.getTrafficClass(). - return Integer.toString(this.trafficClass); - } - - public void setDiffServ(String diffServ) throws IllegalArgumentException { - this.trafficClass = QualityOfServiceUtils.getDSCP(diffServ); - this.diffServChosen = true; - } - - public int getTypeOfService() { - // This is the value requested by the user by setting the Tcp Transport - // options. If the socket hasn't been created, then this value may not - // reflect the value returned by Socket.getTrafficClass(). - return this.trafficClass; - } - - public void setTypeOfService(int typeOfService) { - this.trafficClass = QualityOfServiceUtils.getToS(typeOfService); - this.typeOfServiceChosen = true; - } - - public boolean isTrace() { - return trace; - } - - public void setTrace(boolean trace) { - this.trace = trace; - } - - public String getLogWriterName() { - return logWriterName; - } - - public void setLogWriterName(String logFormat) { - this.logWriterName = logFormat; - } - - public boolean isDynamicManagement() { - return dynamicManagement; - } - - public void setDynamicManagement(boolean useJmx) { - this.dynamicManagement = useJmx; - } - - public boolean isStartLogging() { - return startLogging; - } - - public void setStartLogging(boolean startLogging) { - this.startLogging = startLogging; - } - - public int getJmxPort() { - return jmxPort; - } - - public void setJmxPort(int jmxPort) { - this.jmxPort = jmxPort; - } - - public int getMinmumWireFormatVersion() { - return minmumWireFormatVersion; - } - - public void setMinmumWireFormatVersion(int minmumWireFormatVersion) { - this.minmumWireFormatVersion = minmumWireFormatVersion; - } - - public boolean isUseLocalHost() { - return useLocalHost; - } - - /** - * Sets whether 'localhost' or the actual local host name should be used to - * make local connections. On some operating systems such as Macs its not - * possible to connect as the local host name so localhost is better. - */ - public void setUseLocalHost(boolean useLocalHost) { - this.useLocalHost = useLocalHost; - } - - public int getSocketBufferSize() { - return socketBufferSize; - } - - /** - * Sets the buffer size to use on the socket - */ - public void setSocketBufferSize(int socketBufferSize) { - this.socketBufferSize = socketBufferSize; - } - - public int getSoTimeout() { - return soTimeout; - } - - /** - * Sets the socket timeout - */ - public void setSoTimeout(int soTimeout) { - this.soTimeout = soTimeout; - } - - public int getConnectionTimeout() { - return connectionTimeout; - } - - /** - * Sets the timeout used to connect to the socket - */ - public void setConnectionTimeout(int connectionTimeout) { - this.connectionTimeout = connectionTimeout; - } - - public Boolean getKeepAlive() { - return keepAlive; - } - - /** - * Enable/disable TCP KEEP_ALIVE mode - */ - public void setKeepAlive(Boolean keepAlive) { - this.keepAlive = keepAlive; - } - - /** - * Enable/disable soLinger - * @param soLinger enabled if > -1, disabled if == -1, system default otherwise - */ - public void setSoLinger(int soLinger) { - this.soLinger = soLinger; - } - - public int getSoLinger() { - return soLinger; - } - - public Boolean getTcpNoDelay() { - return tcpNoDelay; - } - - /** - * Enable/disable the TCP_NODELAY option on the socket - */ - public void setTcpNoDelay(Boolean tcpNoDelay) { - this.tcpNoDelay = tcpNoDelay; - } - - /** - * @return the ioBufferSize - */ - public int getIoBufferSize() { - return this.ioBufferSize; - } - - /** - * @param ioBufferSize the ioBufferSize to set - */ - public void setIoBufferSize(int ioBufferSize) { - this.ioBufferSize = ioBufferSize; - } - - /** - * @return the closeAsync - */ - public boolean isCloseAsync() { - return closeAsync; - } - - /** - * @param closeAsync the closeAsync to set - */ - public void setCloseAsync(boolean closeAsync) { - this.closeAsync = closeAsync; - } - - // Implementation methods - // ------------------------------------------------------------------------- - protected String resolveHostName(String host) throws UnknownHostException { - if (isUseLocalHost()) { - String localName = InetAddressUtil.getLocalHostName(); - if (localName != null && localName.equals(host)) { - return "localhost"; - } - } - return host; - } - - /** - * Configures the socket for use - * - * @param sock the socket - * @throws SocketException, IllegalArgumentException if setting the options - * on the socket failed. - */ - protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException { - if (socketOptions != null) { - // copy the map as its used values is being removed when calling setProperties - // and we need to be able to set the options again in case socket is re-initailized - Map copy = new HashMap(socketOptions); - IntrospectionSupport.setProperties(socket, copy); - if (!copy.isEmpty()) { - throw new IllegalArgumentException("Invalid socket parameters: " + copy); - } - } - - try { - //only positive values are legal - if (socketBufferSize > 0) { - sock.setReceiveBufferSize(socketBufferSize); - sock.setSendBufferSize(socketBufferSize); - } else { - LOG.warn("Socket buffer size was set to {}; Skipping this setting as the size must be a positive number.", socketBufferSize); - } - } catch (SocketException se) { - LOG.warn("Cannot set socket buffer size = " + socketBufferSize); - LOG.debug("Cannot set socket buffer size. Reason: " + se.getMessage() + ". This exception is ignored.", se); - } - sock.setSoTimeout(soTimeout); - - if (keepAlive != null) { - sock.setKeepAlive(keepAlive.booleanValue()); - } - - if (soLinger > -1) { - sock.setSoLinger(true, soLinger); - } else if (soLinger == -1) { - sock.setSoLinger(false, 0); - } - if (tcpNoDelay != null) { - sock.setTcpNoDelay(tcpNoDelay.booleanValue()); - } - if (!this.trafficClassSet) { - this.trafficClassSet = setTrafficClass(sock); - } - } - - @Override - protected void doStart() throws Exception { - connect(); - stoppedLatch.set(new CountDownLatch(1)); - super.doStart(); - } - - protected void connect() throws Exception { - - if (socket == null && socketFactory == null) { - throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set"); - } - - InetSocketAddress localAddress = null; - InetSocketAddress remoteAddress = null; - - if (localLocation != null) { - localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), - localLocation.getPort()); - } - - if (remoteLocation != null) { - String host = resolveHostName(remoteLocation.getHost()); - remoteAddress = new InetSocketAddress(host, remoteLocation.getPort()); - } - // Set the traffic class before the socket is connected when possible so - // that the connection packets are given the correct traffic class. - this.trafficClassSet = setTrafficClass(socket); - - if (socket != null) { - - if (localAddress != null) { - socket.bind(localAddress); - } - - // If it's a server accepted socket.. we don't need to connect it - // to a remote address. - if (remoteAddress != null) { - if (connectionTimeout >= 0) { - socket.connect(remoteAddress, connectionTimeout); - } else { - socket.connect(remoteAddress); - } - } - - } else { - // For SSL sockets.. you can't create an unconnected socket :( - // This means the timout option are not supported either. - if (localAddress != null) { - socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), - localAddress.getAddress(), localAddress.getPort()); - } else { - socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort()); - } - } - - initialiseSocket(socket); - initializeStreams(); - } - - @Override - protected void doStop(ServiceStopper stopper) throws Exception { - if (LOG.isDebugEnabled()) { - LOG.debug("Stopping transport " + this); - } - - // Closing the streams flush the sockets before closing.. if the socket - // is hung.. then this hangs the close. - // closeStreams(); - if (socket != null) { - if (closeAsync) { - //closing the socket can hang also - final CountDownLatch latch = new CountDownLatch(1); - - // need a async task for this - final TaskRunnerFactory taskRunnerFactory = new TaskRunnerFactory(); - taskRunnerFactory.execute(new Runnable() { - @Override - public void run() { - LOG.trace("Closing socket {}", socket); - try { - socket.close(); - LOG.debug("Closed socket {}", socket); - } catch (IOException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e); - } - } finally { - latch.countDown(); - } - } - }); - - try { - latch.await(1,TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } finally { - taskRunnerFactory.shutdownNow(); - } - - } else { - // close synchronously - LOG.trace("Closing socket {}", socket); - try { - socket.close(); - LOG.debug("Closed socket {}", socket); - } catch (IOException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e); - } - } - } - } - } - - /** - * Override so that stop() blocks until the run thread is no longer running. - */ - @Override - public void stop() throws Exception { - super.stop(); - CountDownLatch countDownLatch = stoppedLatch.get(); - if (countDownLatch != null && Thread.currentThread() != this.runnerThread) { - countDownLatch.await(1,TimeUnit.SECONDS); - } - } - - protected void initializeStreams() throws Exception { - TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) { - @Override - public int read() throws IOException { - receiveCounter++; - return super.read(); - } - @Override - public int read(byte[] b, int off, int len) throws IOException { - receiveCounter++; - return super.read(b, off, len); - } - @Override - public long skip(long n) throws IOException { - receiveCounter++; - return super.skip(n); - } - @Override - protected void fill() throws IOException { - receiveCounter++; - super.fill(); - } - }; - //Unread the initBuffer that was used for protocol detection if it exists - //so the stream can start over - if (initBuffer != null) { - buffIn.unread(initBuffer.buffer.array()); - } - this.dataIn = new DataInputStream(buffIn); - TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize); - this.dataOut = new DataOutputStream(outputStream); - this.buffOut = outputStream; - - } - - protected void closeStreams() throws IOException { - if (dataOut != null) { - dataOut.close(); - } - if (dataIn != null) { - dataIn.close(); - } - } - - public void setSocketOptions(Map socketOptions) { - this.socketOptions = new HashMap(socketOptions); - } - - @Override - public String getRemoteAddress() { - if (socket != null) { - SocketAddress address = socket.getRemoteSocketAddress(); - if (address instanceof InetSocketAddress) { - return "tcp://" + ((InetSocketAddress)address).getAddress().getHostAddress() + ":" + ((InetSocketAddress)address).getPort(); - } else { - return "" + socket.getRemoteSocketAddress(); - } - } - return null; - } - - @Override - public T narrow(Class target) { - if (target == Socket.class) { - return target.cast(socket); - } else if ( target == TimeStampStream.class) { - return target.cast(buffOut); - } - return super.narrow(target); - } - - @Override - public int getReceiveCounter() { - return receiveCounter; - } - - public static class InitBuffer { - public final int readSize; - public final ByteBuffer buffer; - - public InitBuffer(int readSize, ByteBuffer buffer) { - if (buffer == null) { - throw new IllegalArgumentException("Null buffer not allowed."); - } - this.readSize = readSize; - this.buffer = buffer; - } - } - - /** - * @param sock The socket on which to set the Traffic Class. - * @return Whether or not the Traffic Class was set on the given socket. - * @throws SocketException if the system does not support setting the - * Traffic Class. - * @throws IllegalArgumentException if both the Differentiated Services and - * Type of Services transport options have been set on the same - * connection. - */ - private boolean setTrafficClass(Socket sock) throws SocketException, - IllegalArgumentException { - if (sock == null - || (!this.diffServChosen && !this.typeOfServiceChosen)) { - return false; - } - if (this.diffServChosen && this.typeOfServiceChosen) { - throw new IllegalArgumentException("Cannot set both the " - + " Differentiated Services and Type of Services transport " - + " options on the same connection."); - } - - sock.setTrafficClass(this.trafficClass); - - int resultTrafficClass = sock.getTrafficClass(); - if (this.trafficClass != resultTrafficClass) { - // In the case where the user has specified the ECN bits (e.g. in - // Type of Service) but the system won't allow the ECN bits to be - // set or in the case where setting the traffic class failed for - // other reasons, emit a warning. - if ((this.trafficClass >> 2) == (resultTrafficClass >> 2) - && (this.trafficClass & 3) != (resultTrafficClass & 3)) { - LOG.warn("Attempted to set the Traffic Class to " - + this.trafficClass + " but the result Traffic Class was " - + resultTrafficClass + ". Please check that your system " - + "allows you to set the ECN bits (the first two bits)."); - } else { - LOG.warn("Attempted to set the Traffic Class to " - + this.trafficClass + " but the result Traffic Class was " - + resultTrafficClass + ". Please check that your system " - + "supports java.net.setTrafficClass."); - } - return false; - } - // Reset the guards that prevent both the Differentiated Services - // option and the Type of Service option from being set on the same - // connection. - this.diffServChosen = false; - this.typeOfServiceChosen = false; - return true; - } - - @Override - public WireFormat getWireFormat() { - return wireFormat; - } - - @Override - public X509Certificate[] getPeerCertificates() { - return null; - } - - @Override - public void setPeerCertificates(X509Certificate[] certificates) { - } -} \ No newline at end of file diff --git a/src/main/java/org/mule/extensions/jms/internal/connection/provider/activemq/ActiveMQConnectionProvider.java b/src/main/java/org/mule/extensions/jms/internal/connection/provider/activemq/ActiveMQConnectionProvider.java index bbd94594..3f6aea28 100644 --- a/src/main/java/org/mule/extensions/jms/internal/connection/provider/activemq/ActiveMQConnectionProvider.java +++ b/src/main/java/org/mule/extensions/jms/internal/connection/provider/activemq/ActiveMQConnectionProvider.java @@ -22,7 +22,6 @@ import org.mule.extensions.jms.internal.ExcludeFromGeneratedCoverage; import org.mule.extensions.jms.internal.connection.exception.ActiveMQException; import org.mule.extensions.jms.internal.connection.provider.BaseConnectionProvider; -import org.mule.extensions.jms.internal.connection.provider.loader.FirewallLoader; import org.mule.jms.commons.internal.connection.JmsConnection; import org.mule.jms.commons.internal.connection.JmsTransactionalConnection; import org.mule.runtime.api.connection.ConnectionException; @@ -89,6 +88,8 @@ public class ActiveMQConnectionProvider extends BaseConnectionProvider implement static final String ACTIVEMQ_VERSION = "5.15.16"; static final String BROKER_GA = "org.apache.activemq:activemq-broker"; static final String KAHA_DB_GA = "org.apache.activemq:activemq-kahadb-store"; + public static final String FIPS_140_2 = "fips140-2"; + public static final String MULE_SECURITY_MODEL = "mule.security.model"; /** * a provider for an {@link ActiveMQConnectionFactory} @@ -268,14 +269,10 @@ private boolean shouldUseSsl() { } protected void configureSSLContext() { + try { if (tlsConfiguration != null) { SSLContext sslContext = tlsConfiguration.createSslContext(); - ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader(); - // force loading of class from connector instead of the one from the library, because it uses reflection - ClassLoader firewallLoader = new FirewallLoader(currentClassLoader); - ClassLoader loader = new URLClassLoader(new URL[]{this.getClass().getProtectionDomain().getCodeSource().getLocation()}, firewallLoader); - Thread.currentThread().setContextClassLoader(loader); SslContext activeMQSslContext = new SslContext(); activeMQSslContext.setSSLContext(sslContext); diff --git a/src/main/java/org/mule/extensions/jms/internal/connection/provider/loader/FirewallLoader.java b/src/main/java/org/mule/extensions/jms/internal/connection/provider/loader/FirewallLoader.java deleted file mode 100644 index 254ec108..00000000 --- a/src/main/java/org/mule/extensions/jms/internal/connection/provider/loader/FirewallLoader.java +++ /dev/null @@ -1,13 +0,0 @@ -package org.mule.extensions.jms.internal.connection.provider.loader; - -public class FirewallLoader extends ClassLoader { - public FirewallLoader(ClassLoader parent) { - super(parent); - } - public Class loadClass(String name, boolean resolve) throws ClassNotFoundException { - if (name.equals("org.apache.activemq.transport.tcp.SslTransport") || name.equals("org.apache.activemq.transport.tcp.TcpTransport")) { - throw new ClassNotFoundException(); - } - return super.loadClass(name, resolve); - } -} \ No newline at end of file diff --git a/src/test/munit/activemq-over-ssl-test-case.xml b/src/test/munit/activemq-over-ssl-test-case.xml index 82f47181..ae76ab11 100644 --- a/src/test/munit/activemq-over-ssl-test-case.xml +++ b/src/test/munit/activemq-over-ssl-test-case.xml @@ -40,16 +40,11 @@ - - - - - - - + + @@ -92,20 +87,5 @@ - - - - - - - - - - - - -