Skip to content

Commit

Permalink
RE-architect TcpClientManager, handshake bug FIXED!
Browse files Browse the repository at this point in the history
  • Loading branch information
neocoretechs committed Feb 5, 2020
1 parent 9b50e2a commit 302458d
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private DefaultServiceClient(GraphName nodeName, ServiceDeclaration serviceDecla
// TODO(damonkohler): Support non-persistent connections.
connectionHeader.addField(ConnectionHeaderFields.PERSISTENT, "1");
connectionHeader.merge(serviceDeclaration.toConnectionHeader());
tcpClientManager = TcpClientManager.getInstance(executorService);
tcpClientManager = new TcpClientManager/*.getInstance*/(executorService);
ServiceClientHandshakeHandler<T, S> serviceClientHandshakeHandler =
new ServiceClientHandshakeHandler<T, S>(connectionHeader, responseListeners, executorService);
handshakeLatch = new HandshakeLatch();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private DefaultSubscriber(NodeIdentifier nodeIdentifier, TopicDeclaration topicD
this.executorService = executorService;
incomingMessageQueue = new IncomingMessageQueue<T>(executorService);
//knownPublishers = new HashSet<PublisherIdentifier>();
tcpClientManager = TcpClientManager.getInstance(executorService);
tcpClientManager = new TcpClientManager/*.getInstance*/(executorService);
this.topicParticipantManager = topicParticipantManager;
mutex = new Object();
SubscriberHandshakeHandler<T> subscriberHandshakeHandler =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@
*
*/
public class ChannelInitializerFactoryStack {
private static boolean DEBUG = false;
private static boolean DEBUG = true;
private static final Log log = LogFactory.getLog(ChannelInitializerFactoryStack.class);

private LinkedBlockingDeque<ChannelInitializer> queue = new LinkedBlockingDeque<ChannelInitializer>();

public void addFirst(ChannelInitializer ch) {
if(DEBUG)
log.info("Adding First ChannelInitializer "+ch+" queue size="+queue.size());
log.info("Adding First ChannelInitializer:"+ch+" queue size="+queue.size());
queue.addFirst(ch);
}
public void addLast(ChannelInitializer ch) {
if(DEBUG)
log.info("Adding Last ChannelInitializer "+ch+" queue size="+queue.size());
log.info("Adding Last ChannelInitializer:"+ch+" queue size="+queue.size());
queue.addLast(ch);
}
/**
Expand All @@ -40,12 +40,12 @@ public void addLast(ChannelInitializer ch) {
*/
public void inject(ChannelHandlerContext ctx) throws Exception {
if(DEBUG)
log.info("Injecting ChannelHandlerContext "+ctx+" queue size="+queue.size());
log.info("Injecting ChannelHandlerContext:"+ctx+" queue size="+queue.size());
Iterator<ChannelInitializer> it = queue.iterator();
while(it.hasNext()) {
ChannelInitializer ch = it.next();
if(DEBUG)
log.info("ChannelInitializer.initChannel "+ch);
log.info("ChannelInitializer initChannel for ChannelHandlerContext:"+ctx+" ChannelInitializer:"+ch);
ch.initChannel(ctx);
}
}
Expand Down
17 changes: 12 additions & 5 deletions src/main/java/org/ros/internal/transport/tcp/TcpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,25 @@ public void setKeepAlive(boolean value) throws IOException {
}

public void addNamedChannelHandler(NamedChannelHandler namedChannelHandler) {
namedChannelHandlers.add(namedChannelHandler);
if (DEBUG) {
log.info("TcpClient:"+this+" adding NamedChannelHandler:"+namedChannelHandler);
}
namedChannelHandlers.add(namedChannelHandler);
}

public void addAllNamedChannelHandlers(List<NamedChannelHandler> namedChannelHandlers) {
this.namedChannelHandlers.addAll(namedChannelHandlers);
if (DEBUG) {
for(NamedChannelHandler n: namedChannelHandlers)
log.info("TcpClient:"+this+" will add NamedChannelHandler:"+n);
}
this.namedChannelHandlers.addAll(namedChannelHandlers);
}

public ChannelHandlerContext getContext() { return ctx; }

public Socket connect(String connectionName, SocketAddress socketAddress) throws Exception {
if (DEBUG) {
log.info("TcpClient attempting connection:"+connectionName+" to socket:" + socketAddress);
log.info("TcpClient:"+this+" attempting connection:"+connectionName+" to socket:" + socketAddress);
}
//channel = /*Asynchronous*/SocketChannel.open(/*channelGroup*/);
channel = new Socket();
Expand All @@ -94,10 +101,10 @@ public Socket connect(String connectionName, SocketAddress socketAddress) throws
AsynchTCPWorker uworker = new AsynchTCPWorker(ctx);
channelGroup.getExecutorService().execute(uworker);
// notify pipeline we connected (or failed via exceptionCaught and runtime exception)
//ctx.pipeline().fireChannelActive();
ctx.pipeline().fireChannelActive();
// recall we keep the list of contexts in TcpClientManager
if (DEBUG) {
log.info("TcpClient Connected with ChannelHandlerContext "+ctx);
log.info("TcpClient:"+this+" Connected with ChannelHandlerContext "+ctx);
}
//} else {
// We expect the first connection to succeed. If not, fail fast.
Expand Down
29 changes: 12 additions & 17 deletions src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,11 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;


/**
* TcpClientManager manages TCP clients which are the subscriber and service clients that communicate with
* remote peers outside master domain.
Expand All @@ -32,20 +27,20 @@ public class TcpClientManager {
private final Collection<TcpClient> tcpClients;
private final List<NamedChannelHandler> namedChannelHandlers;

private static ConcurrentHashMap<ExecutorService, TcpClientManager> executors = new ConcurrentHashMap<ExecutorService, TcpClientManager>(1024);
//private static ConcurrentHashMap<ExecutorService, TcpClientManager> executors = new ConcurrentHashMap<ExecutorService, TcpClientManager>(1024);

public static TcpClientManager getInstance(ExecutorService exc) {
synchronized(TcpClientManager.class) {
TcpClientManager tcm = executors.get(exc);
if( tcm == null ) {
tcm = new TcpClientManager(exc);
executors.put(exc, tcm);
}
return tcm;
}
}
//public static TcpClientManager getInstance(ExecutorService exc) {
// synchronized(TcpClientManager.class) {
// TcpClientManager tcm = executors.get(exc);
// if( tcm == null ) {
// tcm = new TcpClientManager(exc);
// executors.put(exc, tcm);
// }
// return tcm;
// }
//}

private TcpClientManager(ExecutorService executor) {
public TcpClientManager(ExecutorService executor) {
this.channelGroup = new ChannelGroupImpl(executor);/*AsynchronousChannelGroup.withThreadPool(executor);*/
this.tcpClients = new ArrayList<TcpClient>();
this.namedChannelHandlers = new ArrayList<NamedChannelHandler>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* @author jg
*/
public class TcpClientPipelineFactory extends ChannelInitializer {
public static boolean DEBUG = false;
public static boolean DEBUG = true;
private static final Log log = LogFactory.getLog(TcpClientPipelineFactory.class);
public static final String LENGTH_FIELD_BASED_FRAME_DECODER = "LengthFieldBasedFrameDecoder";
public static final String LENGTH_FIELD_PREPENDER = "LengthFieldPrepender";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,9 @@ public void setup() {
new IncomingMessageQueue<std_msgs.String>(executorService);
secondIncomingMessageQueue =
new IncomingMessageQueue<std_msgs.String>(executorService);
firstTcpClientManager = TcpClientManager.getInstance(executorService);
firstTcpClientManager = new TcpClientManager/*.getInstance*/(executorService);
firstTcpClientManager.addNamedChannelHandler(firstIncomingMessageQueue.getMessageReceiver());
secondTcpClientManager = TcpClientManager.getInstance(executorService);
secondTcpClientManager = new TcpClientManager/*.getInstance*/(executorService);
secondTcpClientManager.addNamedChannelHandler(secondIncomingMessageQueue.getMessageReceiver());
} catch(Exception e) { throw new RosRuntimeException(e); }
}
Expand Down

0 comments on commit 302458d

Please sign in to comment.