Skip to content

Commit

Permalink
adjusted queue sizes, uniform TCPnodelay
Browse files Browse the repository at this point in the history
  • Loading branch information
neocoretechs committed Jun 11, 2016
1 parent af06002 commit d6ddeda
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 124 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/ros/internal/node/server/BaseServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void run() {
try {
Socket datasocket = server.accept();
// disable Nagles algoritm; do not combine small packets into larger ones
datasocket.setTcpNoDelay(true);
//datasocket.setTcpNoDelay(true);
// wait 1 second before close; close blocks for 1 sec. and data can be sent
datasocket.setSoLinger(true, 1);
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class IncomingMessageQueue<T> {
* {@link IncomingMessageQueue#addListener(MessageListener, int)} which are
* consumed by user provided {@link MessageListener}s.
*/
private static final int DEQUE_CAPACITY = 16384;
private static final int DEQUE_CAPACITY = 8192;

private final MessageReceiver<T> messageReceiver;
private final MessageDispatcher<T> messageDispatcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
*/
public class MessageReceiver<T> extends AbstractNamedChannelHandler {

private static final boolean DEBUG = true;
private static final boolean DEBUG = false;
private static final Log log = LogFactory.getLog(MessageReceiver.class);

private final CircularBlockingDeque<T> lazyMessages;
Expand All @@ -45,7 +45,6 @@ public Object channelRead(ChannelHandlerContext ctx, Object msg) throws Exceptio
}



@Override
public void exceptionCaught(ChannelHandlerContext arg0, Throwable arg1)
throws Exception {
Expand Down Expand Up @@ -94,8 +93,6 @@ public void channelReadComplete(ChannelHandlerContext arg0) throws Exception {

}



@Override
public void userEventTriggered(ChannelHandlerContext arg0, Object arg1)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
*/
public class OutgoingMessageQueue<T> {

private static final boolean DEBUG = true;
private static final boolean DEBUG = false;
private static final Log log = LogFactory.getLog(OutgoingMessageQueue.class);

private static final int DEQUE_CAPACITY = 16384;
private static final int DEQUE_CAPACITY = 8192;

private final CircularBlockingDeque<T> deque;
private final Writer writer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void run() {
//(/*(AsynchronousSocketChannel)*/channel/*.get()*/).setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.setSendBufferSize(4096000);
channel.setReceiveBufferSize(4096000);
channel.setTcpNoDelay(true);
//channel.setTcpNoDelay(true);
ChannelHandlerContext ctx = new ChannelHandlerContextImpl(channelGroup, channel/*.get()*/, exc);
tcpserver.getSubscribers().add(ctx);
// inject the handlers, start handshake
Expand Down
84 changes: 8 additions & 76 deletions src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@

import java.io.IOException;

import java.nio.ByteBuffer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.ros.internal.message.MessageBuffers;
import org.ros.internal.system.Utility;
import org.ros.internal.transport.ChannelHandlerContext;

/**
Expand All @@ -19,12 +15,11 @@
*
*/
public class AsynchTCPWorker implements Runnable {
private static final boolean DEBUG = true;
private static final boolean DEBUG = false;
private static final Log log = LogFactory.getLog(AsynchTCPWorker.class);
public boolean shouldRun = true;
private ChannelHandlerContext ctx;
private Object waitHalt = new Object();
//private MessageBufferPool pool = new MessageBufferPool();

public AsynchTCPWorker(ChannelHandlerContext ctx) throws IOException {
this.ctx = ctx;
Expand All @@ -37,84 +32,21 @@ public AsynchTCPWorker(ChannelHandlerContext ctx) throws IOException {
*/
@Override
public void run() {
//final Object waitFinish = ctx.getChannelCompletionMutex();
try {
while(shouldRun) {
//final ByteBuffer buf = MessageBuffers.dynamicBuffer();//pool.acquire();
// initiate asynch read
// If we get a read pending exception, try again
//final ByteBuffer buf = MessageBuffers.dynamicBuffer();//pool.acquire();
//buf.clear();
//final CountDownLatch cdl = new CountDownLatch(1);
//int res = ctx.read(buf);
// seems like a -1 is generated when channel breaks, so stop
// this worker on that case
//if( res == -1) {
// shouldRun = false;
// if( DEBUG )
// log.info("ROS AsynchTCPWorker CHANNEL BREAK, TERMINATING for "+ctx);
//} else {
// buf.flip();
// Object reso = Utility.deserialize(buf);
Object reso = ctx.read();

if( DEBUG )
if( DEBUG )
log.info("ROS AsynchTCPWorker COMPLETED READ for "+ctx+" Object:"+reso);
try {
try {
ctx.pipeline().fireChannelRead(reso);
} catch (Exception e) {
if( DEBUG) {
} catch (Exception e) {
if( DEBUG) {
log.info("Exception out of fireChannelRead",e);
e.printStackTrace();
}
ctx.pipeline().fireExceptionCaught(e);
}
//}
/*
ctx.read(buf, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer arg0, Void arg1) {
buf.flip();
Object res = Utility.deserialize(buf);
//if( res == null ) {
// cdl.countDown();
// return;
//}
if( DEBUG )
log.info("ROS AsynchTCPWorker COMPLETED READ for "+ctx+" buffer:"+buf+" Object:"+res+" Result:"+arg0+","+arg1);
try {
ctx.pipeline().fireChannelRead(res);
} catch (Exception e) {
if( DEBUG) {
log.info("Exception out of fireChannelRead",e);
e.printStackTrace();
}
}
cdl.countDown();
}
@Override
public void failed(Throwable arg0, Void arg1) {
if( DEBUG ){
log.info("AsynchTcpWorker read op failed:",arg0);
arg0.printStackTrace();
}
try {
ctx.pipeline().fireExceptionCaught(arg0);
} catch (Exception e) {
e.printStackTrace();
}
if( arg0 instanceof ClosedChannelException ) {
shouldRun = false;
}
cdl.countDown();
}
});
*/
//cdl.await(); // readpendingexception if we overlap operations
} // shouldRun

ctx.pipeline().fireExceptionCaught(e);
}
} // shouldRun
} catch(Exception se) {
log.error("AsynchTCPWorker terminating due to ",se);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void addAllNamedChannelHandlers(List<NamedChannelHandler> namedChannelHan
public Socket connect(String connectionName, SocketAddress socketAddress) throws Exception {
//channel = /*Asynchronous*/SocketChannel.open(/*channelGroup*/);
channel = new Socket();
channel.setTcpNoDelay(false);
//channel.setTcpNoDelay(true);
channel.setSendBufferSize(4096000);
channel.setSendBufferSize(4096000);
//((/*Asynchronous*/SocketChannel)channel).setOption(StandardSocketOptions.SO_RCVBUF, 4096000);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
package org.ros.internal.transport.tcp;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.nio.channels.SocketChannel;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -24,6 +19,8 @@

/**
* A {@link ChannelHandler} which will process the TCP server handshake.
* Once an incoming channel read takes place the handshake handler is removed and the traffic
* handler is placed in the pipeline
*
* @author jg
*/
Expand All @@ -43,10 +40,13 @@ public TcpServerHandshakeHandler(TopicParticipantManager topicParticipantManager

@Override
public void channelActive(ChannelHandlerContext ctx) {
log.info("Channel active");
if(DEBUG)
log.info("Channel active");
}
/**
* Channel read initiated by pipeline generated message
* We make the assumption that the inbound object is of type ConnectionHeader
* when talking to this handler
*/
@Override
public Object channelRead(ChannelHandlerContext ctx, Object e) throws Exception {
Expand All @@ -63,7 +63,7 @@ public Object channelRead(ChannelHandlerContext ctx, Object e) throws Exception
return e;
}
/**
* Handle the handshake for a service
* Handle the handshake for a service in response to channel read
* @param ctx
* @param incomingHeader
* @throws IOException
Expand All @@ -87,7 +87,7 @@ private void handleServiceHandshake(ChannelHandlerContext ctx, ConnectionHeader
}
}
/**
* Handle the handshake for a typical (not a service) subscriber.
* Handle the handshake for a typical (not a service) subscriber in response to a channel read.
* @param ctx
* @param incomingConnectionHeader
* @throws InterruptedException
Expand All @@ -112,61 +112,51 @@ private void handleSubscriberHandshake(final ChannelHandlerContext ctx, final Co

ctx.write(outgoingBuffer);

/*
ctx.write(outgoingBuffer, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer arg0, Void arg1) {
*/
String nodeName = incomingConnectionHeader.getField(ConnectionHeaderFields.CALLER_ID);
publisher.addSubscriber(new SubscriberIdentifier(NodeIdentifier.forName(nodeName), new TopicIdentifier(topicName)), ctx);
// Once the handshake is complete, there will be nothing incoming on the
// channel as we are only queueing outbound traffic to the subscriber, which is done by the OutgoingMessgequeue.
// So, we remove the handler
ctx.pipeline().remove(TcpServerPipelineFactory.HANDSHAKE_HANDLER);
// Set this context ready to receive the message type specified
synchronized(ctx.getMessageTypes()) {
String nodeName = incomingConnectionHeader.getField(ConnectionHeaderFields.CALLER_ID);
publisher.addSubscriber(new SubscriberIdentifier(NodeIdentifier.forName(nodeName), new TopicIdentifier(topicName)), ctx);
// Once the handshake is complete, there will be nothing incoming on the
// channel as we are only queueing outbound traffic to the subscriber, which is done by the OutgoingMessgequeue.
// So, we remove the handler
ctx.pipeline().remove(TcpServerPipelineFactory.HANDSHAKE_HANDLER);
// Set this context ready to receive the message type specified
synchronized(ctx.getMessageTypes()) {
ctx.getMessageTypes().add(incomingConnectionHeader.getField(ConnectionHeaderFields.TYPE));
}
// The handshake is complete and the only task is to set the context ready, which will allow
// the outbound queue to start sending messages.
ctx.setReady(true);
}
// The handshake is complete and the only task is to set the context ready, which will allow
// the outbound queue to start sending messages.
ctx.setReady(true);

if( DEBUG ) {
log.info("subscriber complete:"+outgoingBuffer);
}
/*
}
@Override
public void failed(Throwable arg0, Void arg1) {
log.info("Failed to perform handshake for:"+ctx);
}
*/
//});


if( DEBUG ) {
log.info("subscriber complete:"+outgoingBuffer);
}

}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if( DEBUG )
log.info(this+" Handler added "+ctx);

}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
if(DEBUG)
log.info("Handler removed "+ctx);

}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if(DEBUG)
log.info("Channel inactive "+ctx);

}


@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if(DEBUG)
log.info("channel read complete "+ctx);

}
Expand All @@ -180,6 +170,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable msg)throws Exce
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object event)
throws Exception {
if(DEBUG)
log.info("User event triggered "+ctx+" "+event);

}
Expand Down

0 comments on commit d6ddeda

Please sign in to comment.