From 302458dd51da9f4d3eb62ae7f390660363da5a49 Mon Sep 17 00:00:00 2001 From: NeoCoreTechs - New Core Technologies Date: Tue, 4 Feb 2020 16:57:28 -0800 Subject: [PATCH] RE-architect TcpClientManager, handshake bug FIXED! --- .../node/service/DefaultServiceClient.java | 2 +- .../node/topic/DefaultSubscriber.java | 2 +- .../tcp/ChannelInitializerFactoryStack.java | 10 +++---- .../ros/internal/transport/tcp/TcpClient.java | 17 +++++++---- .../transport/tcp/TcpClientManager.java | 29 ++++++++----------- .../tcp/TcpClientPipelineFactory.java | 2 +- .../MessageQueueIntegrationTest.java | 4 +-- 7 files changed, 34 insertions(+), 32 deletions(-) diff --git a/src/main/java/org/ros/internal/node/service/DefaultServiceClient.java b/src/main/java/org/ros/internal/node/service/DefaultServiceClient.java index 2643bc3..b3229fd 100644 --- a/src/main/java/org/ros/internal/node/service/DefaultServiceClient.java +++ b/src/main/java/org/ros/internal/node/service/DefaultServiceClient.java @@ -95,7 +95,7 @@ private DefaultServiceClient(GraphName nodeName, ServiceDeclaration serviceDecla // TODO(damonkohler): Support non-persistent connections. connectionHeader.addField(ConnectionHeaderFields.PERSISTENT, "1"); connectionHeader.merge(serviceDeclaration.toConnectionHeader()); - tcpClientManager = TcpClientManager.getInstance(executorService); + tcpClientManager = new TcpClientManager/*.getInstance*/(executorService); ServiceClientHandshakeHandler serviceClientHandshakeHandler = new ServiceClientHandshakeHandler(connectionHeader, responseListeners, executorService); handshakeLatch = new HandshakeLatch(); diff --git a/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java b/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java index c83e474..fddd808 100644 --- a/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java +++ b/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java @@ -74,7 +74,7 @@ private DefaultSubscriber(NodeIdentifier nodeIdentifier, TopicDeclaration topicD this.executorService = executorService; incomingMessageQueue = new IncomingMessageQueue(executorService); //knownPublishers = new HashSet(); - tcpClientManager = TcpClientManager.getInstance(executorService); + tcpClientManager = new TcpClientManager/*.getInstance*/(executorService); this.topicParticipantManager = topicParticipantManager; mutex = new Object(); SubscriberHandshakeHandler subscriberHandshakeHandler = diff --git a/src/main/java/org/ros/internal/transport/tcp/ChannelInitializerFactoryStack.java b/src/main/java/org/ros/internal/transport/tcp/ChannelInitializerFactoryStack.java index 9ebc9ed..a054a85 100644 --- a/src/main/java/org/ros/internal/transport/tcp/ChannelInitializerFactoryStack.java +++ b/src/main/java/org/ros/internal/transport/tcp/ChannelInitializerFactoryStack.java @@ -17,19 +17,19 @@ * */ public class ChannelInitializerFactoryStack { - private static boolean DEBUG = false; + private static boolean DEBUG = true; private static final Log log = LogFactory.getLog(ChannelInitializerFactoryStack.class); private LinkedBlockingDeque queue = new LinkedBlockingDeque(); public void addFirst(ChannelInitializer ch) { if(DEBUG) - log.info("Adding First ChannelInitializer "+ch+" queue size="+queue.size()); + log.info("Adding First ChannelInitializer:"+ch+" queue size="+queue.size()); queue.addFirst(ch); } public void addLast(ChannelInitializer ch) { if(DEBUG) - log.info("Adding Last ChannelInitializer "+ch+" queue size="+queue.size()); + log.info("Adding Last ChannelInitializer:"+ch+" queue size="+queue.size()); queue.addLast(ch); } /** @@ -40,12 +40,12 @@ public void addLast(ChannelInitializer ch) { */ public void inject(ChannelHandlerContext ctx) throws Exception { if(DEBUG) - log.info("Injecting ChannelHandlerContext "+ctx+" queue size="+queue.size()); + log.info("Injecting ChannelHandlerContext:"+ctx+" queue size="+queue.size()); Iterator it = queue.iterator(); while(it.hasNext()) { ChannelInitializer ch = it.next(); if(DEBUG) - log.info("ChannelInitializer.initChannel "+ch); + log.info("ChannelInitializer initChannel for ChannelHandlerContext:"+ctx+" ChannelInitializer:"+ch); ch.initChannel(ctx); } } diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpClient.java b/src/main/java/org/ros/internal/transport/tcp/TcpClient.java index 9bbf53b..c8edb87 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpClient.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpClient.java @@ -56,18 +56,25 @@ public void setKeepAlive(boolean value) throws IOException { } public void addNamedChannelHandler(NamedChannelHandler namedChannelHandler) { - namedChannelHandlers.add(namedChannelHandler); + if (DEBUG) { + log.info("TcpClient:"+this+" adding NamedChannelHandler:"+namedChannelHandler); + } + namedChannelHandlers.add(namedChannelHandler); } public void addAllNamedChannelHandlers(List namedChannelHandlers) { - this.namedChannelHandlers.addAll(namedChannelHandlers); + if (DEBUG) { + for(NamedChannelHandler n: namedChannelHandlers) + log.info("TcpClient:"+this+" will add NamedChannelHandler:"+n); + } + this.namedChannelHandlers.addAll(namedChannelHandlers); } public ChannelHandlerContext getContext() { return ctx; } public Socket connect(String connectionName, SocketAddress socketAddress) throws Exception { if (DEBUG) { - log.info("TcpClient attempting connection:"+connectionName+" to socket:" + socketAddress); + log.info("TcpClient:"+this+" attempting connection:"+connectionName+" to socket:" + socketAddress); } //channel = /*Asynchronous*/SocketChannel.open(/*channelGroup*/); channel = new Socket(); @@ -94,10 +101,10 @@ public Socket connect(String connectionName, SocketAddress socketAddress) throws AsynchTCPWorker uworker = new AsynchTCPWorker(ctx); channelGroup.getExecutorService().execute(uworker); // notify pipeline we connected (or failed via exceptionCaught and runtime exception) - //ctx.pipeline().fireChannelActive(); + ctx.pipeline().fireChannelActive(); // recall we keep the list of contexts in TcpClientManager if (DEBUG) { - log.info("TcpClient Connected with ChannelHandlerContext "+ctx); + log.info("TcpClient:"+this+" Connected with ChannelHandlerContext "+ctx); } //} else { // We expect the first connection to succeed. If not, fail fast. diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java b/src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java index 6a90790..d9a8d15 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java @@ -6,16 +6,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - /** * TcpClientManager manages TCP clients which are the subscriber and service clients that communicate with * remote peers outside master domain. @@ -32,20 +27,20 @@ public class TcpClientManager { private final Collection tcpClients; private final List namedChannelHandlers; - private static ConcurrentHashMap executors = new ConcurrentHashMap(1024); + //private static ConcurrentHashMap executors = new ConcurrentHashMap(1024); - public static TcpClientManager getInstance(ExecutorService exc) { - synchronized(TcpClientManager.class) { - TcpClientManager tcm = executors.get(exc); - if( tcm == null ) { - tcm = new TcpClientManager(exc); - executors.put(exc, tcm); - } - return tcm; - } - } + //public static TcpClientManager getInstance(ExecutorService exc) { + // synchronized(TcpClientManager.class) { + // TcpClientManager tcm = executors.get(exc); + // if( tcm == null ) { + // tcm = new TcpClientManager(exc); + // executors.put(exc, tcm); + // } + // return tcm; + // } + //} - private TcpClientManager(ExecutorService executor) { + public TcpClientManager(ExecutorService executor) { this.channelGroup = new ChannelGroupImpl(executor);/*AsynchronousChannelGroup.withThreadPool(executor);*/ this.tcpClients = new ArrayList(); this.namedChannelHandlers = new ArrayList(); diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpClientPipelineFactory.java b/src/main/java/org/ros/internal/transport/tcp/TcpClientPipelineFactory.java index 667793b..459cc45 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpClientPipelineFactory.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpClientPipelineFactory.java @@ -12,7 +12,7 @@ * @author jg */ public class TcpClientPipelineFactory extends ChannelInitializer { - public static boolean DEBUG = false; + public static boolean DEBUG = true; private static final Log log = LogFactory.getLog(TcpClientPipelineFactory.class); public static final String LENGTH_FIELD_BASED_FRAME_DECODER = "LengthFieldBasedFrameDecoder"; public static final String LENGTH_FIELD_PREPENDER = "LengthFieldPrepender"; diff --git a/src/test/java/org/ros/internal/transport/MessageQueueIntegrationTest.java b/src/test/java/org/ros/internal/transport/MessageQueueIntegrationTest.java index d7d5334..9ecd856 100644 --- a/src/test/java/org/ros/internal/transport/MessageQueueIntegrationTest.java +++ b/src/test/java/org/ros/internal/transport/MessageQueueIntegrationTest.java @@ -154,9 +154,9 @@ public void setup() { new IncomingMessageQueue(executorService); secondIncomingMessageQueue = new IncomingMessageQueue(executorService); - firstTcpClientManager = TcpClientManager.getInstance(executorService); + firstTcpClientManager = new TcpClientManager/*.getInstance*/(executorService); firstTcpClientManager.addNamedChannelHandler(firstIncomingMessageQueue.getMessageReceiver()); - secondTcpClientManager = TcpClientManager.getInstance(executorService); + secondTcpClientManager = new TcpClientManager/*.getInstance*/(executorService); secondTcpClientManager.addNamedChannelHandler(secondIncomingMessageQueue.getMessageReceiver()); } catch(Exception e) { throw new RosRuntimeException(e); } }