From 52c5db33a39b0a95b57a6e32475415b7df46a61f Mon Sep 17 00:00:00 2001 From: NeoCoreTechs - New Core Technologies Date: Sat, 28 May 2016 09:38:01 -0700 Subject: [PATCH] Operational Beta release. Bug with image data over 65k, tends not to receive entire buffer sometimes. channel bug? --- .../node/service/DefaultServiceServer.java | 8 ++- .../node/service/ServiceRequestHandler.java | 28 +++++++---- .../java/org/ros/internal/system/Utility.java | 8 ++- .../transport/BaseClientHandshakeHandler.java | 3 +- .../transport/ChannelHandlerContext.java | 19 ++++--- .../transport/ChannelHandlerContextImpl.java | 50 ++++++++++++------- .../transport/queue/OutgoingMessageQueue.java | 10 ++-- .../transport/tcp/AsynchBaseServer.java | 13 ++--- .../transport/tcp/AsynchTCPServer.java | 28 +++++++---- .../transport/tcp/AsynchTCPWorker.java | 18 +++++-- .../internal/transport/tcp/ChannelGroup.java | 5 ++ .../transport/tcp/ChannelGroupImpl.java | 17 +++++++ ...nectionTrackingChannelPipelineFactory.java | 4 +- .../transport/tcp/NamedChannelHandler.java | 6 +-- .../ros/internal/transport/tcp/TcpClient.java | 24 +++++---- .../transport/tcp/TcpClientManager.java | 6 +-- .../tcp/TcpClientPipelineFactory.java | 6 +-- .../internal/transport/tcp/TcpRosServer.java | 10 ++-- .../tcp/TcpServerHandshakeHandler.java | 6 +-- .../tcp/TcpServerPipelineFactory.java | 6 +-- .../MessageQueueIntegrationTest.java | 32 ++++++------ 21 files changed, 195 insertions(+), 112 deletions(-) create mode 100644 src/main/java/org/ros/internal/transport/tcp/ChannelGroup.java create mode 100644 src/main/java/org/ros/internal/transport/tcp/ChannelGroupImpl.java diff --git a/src/main/java/org/ros/internal/node/service/DefaultServiceServer.java b/src/main/java/org/ros/internal/node/service/DefaultServiceServer.java index cbe256e..6b7438a 100644 --- a/src/main/java/org/ros/internal/node/service/DefaultServiceServer.java +++ b/src/main/java/org/ros/internal/node/service/DefaultServiceServer.java @@ -8,8 +8,10 @@ import org.ros.address.AdvertiseAddress; import org.ros.concurrent.ListenerGroup; import org.ros.concurrent.SignalRunnable; +import org.ros.internal.message.MessageBuffers; import org.ros.internal.message.service.ServiceDescription; import org.ros.internal.node.topic.DefaultPublisher; +import org.ros.internal.system.Utility; import org.ros.internal.transport.ChannelHandler; import org.ros.internal.transport.ConnectionHeader; import org.ros.internal.transport.ConnectionHeaderFields; @@ -75,7 +77,7 @@ public void onMasterUnregistrationFailure(ServiceServer registrant) { }); } - public ConnectionHeader finishHandshake(ConnectionHeader incomingConnectionHeader) { + public ByteBuffer finishHandshake(ConnectionHeader incomingConnectionHeader) { if (DEBUG) { log.info("Client handshake header: " + incomingConnectionHeader); } @@ -88,7 +90,9 @@ public ConnectionHeader finishHandshake(ConnectionHeader incomingConnectionHeade if (DEBUG) { log.info("Server handshake header: " + connectionHeader); } - return connectionHeader; + ByteBuffer headbuf = MessageBuffers.dynamicBuffer(); + Utility.serialize(connectionHeader, headbuf); + return headbuf; } @Override diff --git a/src/main/java/org/ros/internal/node/service/ServiceRequestHandler.java b/src/main/java/org/ros/internal/node/service/ServiceRequestHandler.java index c89ca81..97fce2d 100644 --- a/src/main/java/org/ros/internal/node/service/ServiceRequestHandler.java +++ b/src/main/java/org/ros/internal/node/service/ServiceRequestHandler.java @@ -5,6 +5,7 @@ //import org.jboss.netty.channel.ChannelHandlerContext; //import org.jboss.netty.channel.MessageEvent; //import org.jboss.netty.channel.SimpleChannelHandler; +import org.ros.exception.RosRuntimeException; import org.ros.exception.ServiceException; import org.ros.internal.message.MessageBufferPool; import org.ros.internal.system.Utility; @@ -13,6 +14,7 @@ import org.ros.message.MessageFactory; import org.ros.node.service.ServiceResponseBuilder; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.concurrent.ExecutorService; @@ -39,9 +41,7 @@ public ServiceRequestHandler(ServiceDeclaration serviceDeclaration, messageBufferPool = new MessageBufferPool(); } - private void handleRequest(ByteBuffer requestBuffer, ByteBuffer responseBuffer) - throws ServiceException { - T request = (T) Utility.deserialize(requestBuffer); + private void handleRequest(T request, ByteBuffer responseBuffer) throws ServiceException { S response = messageFactory.newFromType(serviceDeclaration.getType()); responseBuilder.build(request, response); Utility.serialize(response, responseBuffer); @@ -51,16 +51,26 @@ private void handleSuccess(final ChannelHandlerContext ctx, ServiceServerRespons response.setErrorCode(1); response.setMessageLength(responseBuffer.limit()); response.setMessage(responseBuffer); - ctx.write(response); + ByteBuffer resbuf = messageBufferPool.acquire(); + Utility.serialize(response, resbuf); + try { + ctx.write(resbuf); + } catch (IOException e) { + throw new RosRuntimeException(e); + } } - private void handleError(final ChannelHandlerContext ctx, ServiceServerResponse response, - String message) { + private void handleError(final ChannelHandlerContext ctx, ServiceServerResponse response, String message, ByteBuffer responseBuffer) { response.setErrorCode(0); ByteBuffer encodedMessage = Charset.forName("US-ASCII").encode(message); response.setMessageLength(encodedMessage.limit()); response.setMessage(encodedMessage); - ctx.write(response); + Utility.serialize(response, responseBuffer); + try { + ctx.write(responseBuffer); + } catch (IOException e) { + throw new RosRuntimeException(e); + } } @Override @@ -68,7 +78,7 @@ public Object channelRead(final ChannelHandlerContext ctx, Object e) throws Exce // Although the ChannelHandlerContext is explicitly documented as being safe // to keep for later use, the MessageEvent is not. So, we make a defensive // copy of the buffer. - final ByteBuffer requestBuffer = ((ByteBuffer) e); + final T requestBuffer = ((T) e); this.executorService.execute(new Runnable() { @Override public void run() { @@ -79,7 +89,7 @@ public void run() { handleRequest(requestBuffer, responseBuffer); success = true; } catch (ServiceException ex) { - handleError(ctx, response, ex.getMessage()); + handleError(ctx, response, ex.getMessage(), responseBuffer); success = false; } if (success) { diff --git a/src/main/java/org/ros/internal/system/Utility.java b/src/main/java/org/ros/internal/system/Utility.java index 5e1d746..652b3f2 100644 --- a/src/main/java/org/ros/internal/system/Utility.java +++ b/src/main/java/org/ros/internal/system/Utility.java @@ -20,10 +20,10 @@ * */ public class Utility { - private static boolean DEBUG = false; + private static boolean DEBUG = true; private static final Log log = LogFactory.getLog(Utility.class); + public static void serialize(T value, ByteBuffer buffer) { - //serializer.serialize((Message) value, buffer); DirectByteArrayOutputStream dbaos = new DirectByteArrayOutputStream(); ObjectOutputStream oos; try { @@ -41,7 +41,6 @@ public static void serialize(T value, ByteBuffer buffer) { public static Object deserialize(ByteBuffer buffer) { - //return deserializer.deserialize(buffer); byte[] obuf = buffer.array(); Object Od = null; try { @@ -53,8 +52,7 @@ public static Object deserialize(ByteBuffer buffer) { s.close(); bais.close(); //rbc.close(); - } catch (IOException ioe) { - } catch (ClassNotFoundException cnf) { + } catch (IOException | ClassNotFoundException cnf) { log.error("Class cannot be deserialized, may have been modified beyond version compatibility"); } if( DEBUG ) diff --git a/src/main/java/org/ros/internal/transport/BaseClientHandshakeHandler.java b/src/main/java/org/ros/internal/transport/BaseClientHandshakeHandler.java index 4b36936..6cf0dbf 100644 --- a/src/main/java/org/ros/internal/transport/BaseClientHandshakeHandler.java +++ b/src/main/java/org/ros/internal/transport/BaseClientHandshakeHandler.java @@ -14,8 +14,9 @@ import java.util.concurrent.ExecutorService; /** - * Abstraction of top level ChannelHandler interface + * Abstraction of top level ChannelHandler interface. * Common functionality for {@link ClientHandshake} handlers. + * @author jg * */ public abstract class BaseClientHandshakeHandler extends AbstractNamedChannelHandler { diff --git a/src/main/java/org/ros/internal/transport/ChannelHandlerContext.java b/src/main/java/org/ros/internal/transport/ChannelHandlerContext.java index 0f567af..effddf2 100644 --- a/src/main/java/org/ros/internal/transport/ChannelHandlerContext.java +++ b/src/main/java/org/ros/internal/transport/ChannelHandlerContext.java @@ -7,10 +7,13 @@ import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.Channel; import java.nio.channels.CompletionHandler; +import java.nio.channels.SocketChannel; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.Future; +import org.ros.internal.transport.tcp.ChannelGroup; + public interface ChannelHandlerContext { /** * Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}. @@ -39,7 +42,7 @@ public interface ChannelHandlerContext { * called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. */ - AsynchronousSocketChannel bind(SocketAddress localAddress) throws IOException; + /*Asynchronous*/SocketChannel bind(SocketAddress localAddress) throws IOException; /** * Request to connect to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation @@ -53,8 +56,9 @@ public interface ChannelHandlerContext { * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, CompletionHandler)} * method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. + * @throws IOException */ - void connect(SocketAddress remoteAddress); + void connect(SocketAddress remoteAddress) throws IOException; /** * Request to connect to the given {@link SocketAddress} while bind to the localAddress and notify the @@ -109,8 +113,9 @@ public interface ChannelHandlerContext { * {@link ChannelHandler#read(ChannelHandlerContext)} * method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. + * @throws IOException */ - Future read(ByteBuffer buf); + /*Future*/int read(ByteBuffer buf) throws IOException; /** * Request to Read data from the {@link Channel} into the first inbound buffer, triggers an * {@link ChannelHandler#channelRead(ChannelHandlerContext, Object)} event if data was @@ -122,6 +127,7 @@ public interface ChannelHandlerContext { * {@link ChannelHandler#read(ChannelHandlerContext)} * method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. + * @throws IOException */ void read(ByteBuffer buf,CompletionHandler completionHandler); @@ -129,15 +135,16 @@ public interface ChannelHandlerContext { * Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}. * This method will not request to actual flush, so be sure to call {@link #flush()} * once you want to request to flush all pending data to the actual transport. + * @throws IOException */ - Future write(Object msg); + /*Future*/int write(ByteBuffer msg) throws IOException; /** * Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}. * This method will not request to actual flush, so be sure to call {@link #flush()} * once you want to request to flush all pending data to the actual transport. */ - void write(Object msg, CompletionHandler handler); + void write(ByteBuffer msg, CompletionHandler handler); /** @@ -149,7 +156,7 @@ public interface ChannelHandlerContext { * Return the channel group * */ - AsynchronousChannelGroup getChannelGroup(); + /*Asynchronous*/ChannelGroup getChannelGroup(); /** * Determine if this channel is ready for processing, it is configured, has a socket diff --git a/src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java b/src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java index 898e5a9..9463d16 100644 --- a/src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java +++ b/src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java @@ -1,17 +1,23 @@ package org.ros.internal.transport; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousChannelGroup; +//import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.Channel; import java.nio.channels.CompletionHandler; +import java.nio.channels.SocketChannel; import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.ros.internal.transport.tcp.ChannelGroup; /** * A handler context contains all the executor, the channel group, the channel, and the pipeline with the handlers. @@ -29,24 +35,24 @@ * */ public class ChannelHandlerContextImpl implements ChannelHandlerContext { - AsynchronousChannelGroup channelGroup; + /*Asynchronous*/ChannelGroup channelGroup; Executor executor; - AsynchronousSocketChannel channel; + /*Asynchronous*/SocketChannel channel; ChannelPipeline pipeline; boolean ready = false; Object mutex = new Object(); Set outboundMessageTypes; - public ChannelHandlerContextImpl(AsynchronousChannelGroup grp, AsynchronousSocketChannel ch, Executor exc) { - channelGroup = grp; - channel = ch; + public ChannelHandlerContextImpl(/*Asynchronous*/ChannelGroup channelGroup2, /*Asynchronous*/SocketChannel channel2, Executor exc) { + channelGroup = channelGroup2; + channel = channel2; executor = exc; pipeline = new ChannelPipelineImpl(this); outboundMessageTypes = (Set) new HashSet(); } - public void setChannel(AsynchronousSocketChannel sock) { + public void setChannel(/*Asynchronous*/SocketChannel sock) { this.channel = sock; } @@ -63,12 +69,12 @@ public String name() { @Override - public AsynchronousSocketChannel bind(SocketAddress localAddress) throws IOException { + public /*Asynchronous*/SocketChannel bind(SocketAddress localAddress) throws IOException { return channel.bind(localAddress); } @Override - public void connect(SocketAddress remoteAddress) { + public void connect(SocketAddress remoteAddress) throws IOException { channel.connect(remoteAddress); } @@ -91,23 +97,33 @@ public void close() throws IOException { @Override - public Future read(ByteBuffer buf) { - return channel.read(buf); + public /*Future*/ int read(ByteBuffer buf) throws IOException { + return channel.read(buf); } @Override - public Future write(Object msg) { - return channel.write((ByteBuffer) msg); + public /*Future*/int write(ByteBuffer msg) throws IOException { + return channel.write(msg); } @Override - public void write(Object msg, CompletionHandler handler) { - channel.write((ByteBuffer)msg, null, handler); + public void write(ByteBuffer msg, CompletionHandler handler) { + try { + int res = channel.write(msg/*, null, handler*/); + handler.completed(res, null); + } catch (IOException e) { + handler.failed(e, null); + } } @Override public void read(ByteBuffer buf, CompletionHandler handler) { - channel.read(buf, null, handler); + try { + int res = channel.read(buf/*, null, handler*/); + handler.completed(res, null); + } catch (IOException e) { + handler.failed(e, null); + } } @@ -117,7 +133,7 @@ public ChannelPipeline pipeline() { } @Override - public AsynchronousChannelGroup getChannelGroup() { + public /*Asynchronous*/ChannelGroup getChannelGroup() { return channelGroup; } diff --git a/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java b/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java index 8e5087a..82d3d6d 100644 --- a/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java +++ b/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java @@ -14,6 +14,7 @@ import org.apache.commons.logging.LogFactory; import org.ros.concurrent.CancellableLoop; import org.ros.concurrent.CircularBlockingDeque; +import org.ros.exception.RosRuntimeException; import org.ros.internal.message.Message; import org.ros.internal.message.MessageBufferPool; import org.ros.internal.message.MessageBuffers; @@ -37,7 +38,6 @@ public class OutgoingMessageQueue { private final CircularBlockingDeque deque; private final Writer writer; - private final MessageBufferPool messageBufferPool; private final ByteBuffer latchedBuffer; private final Object mutex; @@ -105,7 +105,7 @@ public void failed(Throwable arg0, Void arg1) { public OutgoingMessageQueue(ExecutorService executorService, List ctxs) throws IOException { deque = new CircularBlockingDeque(DEQUE_CAPACITY); writer = new Writer(); - messageBufferPool = new MessageBufferPool(); + //messageBufferPool = new MessageBufferPool(); latchedBuffer = MessageBuffers.dynamicBuffer(); mutex = new Object(); latchMode = false; @@ -151,7 +151,11 @@ private void writeLatchedMessage() { Iterator it = channels.iterator(); while(it.hasNext()) { ChannelHandlerContext ctx = it.next(); - ctx.write(latchedBuffer); + try { + ctx.write(latchedBuffer); + } catch (IOException e) { + throw new RosRuntimeException(e); + } } } } diff --git a/src/main/java/org/ros/internal/transport/tcp/AsynchBaseServer.java b/src/main/java/org/ros/internal/transport/tcp/AsynchBaseServer.java index c2f8551..98afceb 100644 --- a/src/main/java/org/ros/internal/transport/tcp/AsynchBaseServer.java +++ b/src/main/java/org/ros/internal/transport/tcp/AsynchBaseServer.java @@ -7,6 +7,7 @@ import java.net.SocketOption; import java.net.StandardSocketOptions; import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.SocketChannel; import java.util.concurrent.Future; import org.apache.commons.logging.Log; @@ -45,15 +46,15 @@ public void startServer() throws IOException { public void run() { while(!shouldStop) { try { - Future channel = server.accept(); + /*Future*/ channel = server.accept(); if( DEBUG ) { - log.info("Accept "+channel.get()); + log.info("Accept "+channel/*.get()*/); } - ((AsynchronousSocketChannel)channel.get()).setOption(StandardSocketOptions.SO_RCVBUF, 4096000); - ((AsynchronousSocketChannel)channel.get()).setOption(StandardSocketOptions.SO_SNDBUF, 4096000); - ((AsynchronousSocketChannel)channel.get()).setOption(StandardSocketOptions.TCP_NODELAY, true); - ChannelHandlerContext ctx = new ChannelHandlerContextImpl(channelGroup, channel.get(), exc); + (/*(AsynchronousSocketChannel)*/channel/*.get()*/).setOption(StandardSocketOptions.SO_RCVBUF, 4096000); + (/*(AsynchronousSocketChannel)*/channel/*.get()*/).setOption(StandardSocketOptions.SO_SNDBUF, 4096000); + (/*(AsynchronousSocketChannel)*/channel/*.get()*/).setOption(StandardSocketOptions.TCP_NODELAY, true); + ChannelHandlerContext ctx = new ChannelHandlerContextImpl(channelGroup, channel/*.get()*/, exc); tcpserver.getSubscribers().add(ctx); // inject the handlers, start handshake tcpserver.getFactoryStack().inject(ctx); diff --git a/src/main/java/org/ros/internal/transport/tcp/AsynchTCPServer.java b/src/main/java/org/ros/internal/transport/tcp/AsynchTCPServer.java index 969dc43..61a5b20 100644 --- a/src/main/java/org/ros/internal/transport/tcp/AsynchTCPServer.java +++ b/src/main/java/org/ros/internal/transport/tcp/AsynchTCPServer.java @@ -1,9 +1,11 @@ package org.ros.internal.transport.tcp; import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.channels.AsynchronousChannelGroup; -import java.nio.channels.AsynchronousServerSocketChannel; -import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +//import java.nio.channels.AsynchronousChannelGroup; +//import java.nio.channels.AsynchronousServerSocketChannel; +//import java.nio.channels.AsynchronousSocketChannel; import java.util.concurrent.Executor; import org.apache.commons.logging.Log; @@ -16,28 +18,34 @@ public abstract class AsynchTCPServer implements Cloneable, Runnable { private static boolean DEBUG = true; private static final Log log = LogFactory.getLog(AsynchTCPServer.class); - AsynchronousServerSocketChannel server = null; - AsynchronousChannelGroup channelGroup; - AsynchronousSocketChannel data = null; + //AsynchronousServerSocketChannel server = null; + //AsynchronousChannelGroup channelGroup; + //AsynchronousSocketChannel data = null; + ServerSocketChannel server = null; + ChannelGroup channelGroup; + SocketChannel data = null; Executor exc; boolean shouldStop = false; - public synchronized void startServer(AsynchronousChannelGroup group, Executor exc, int port) throws IOException { + //public synchronized void startServer(AsynchronousChannelGroup group, Executor exc, int port) throws IOException { + public synchronized void startServer(ChannelGroup group, Executor exc, int port) throws IOException { this.exc = exc; if( server == null ) { if( DEBUG ) log.info("AsynchTCPServer attempt local bind port "+port); channelGroup = group; - server = AsynchronousServerSocketChannel.open(channelGroup); + //server = AsynchronousServerSocketChannel.open(channelGroup); + server = ServerSocketChannel.open();//channelGroup); server.bind(new InetSocketAddress(port)); exc.execute(this); } } - public synchronized void startServer(AsynchronousChannelGroup group, Executor exc, InetSocketAddress binder) throws IOException { + //public synchronized void startServer(AsynchronousChannelGroup group, Executor exc, InetSocketAddress binder) throws IOException { + public synchronized void startServer(ChannelGroup group, Executor exc, InetSocketAddress binder) throws IOException { if( server == null ) { if( DEBUG ) log.info("AsynchTCPServer attempt bind "+binder); channelGroup = group; - server = AsynchronousServerSocketChannel.open(channelGroup); + server = ServerSocketChannel.open();//channelGroup); server.bind(binder); exc.execute(this); } diff --git a/src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java b/src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java index 9fb8a1e..652b740 100644 --- a/src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java +++ b/src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java @@ -52,13 +52,24 @@ public void run() { final ByteBuffer buf = MessageBuffers.dynamicBuffer();//pool.acquire(); buf.clear(); final CountDownLatch cdl = new CountDownLatch(1); + int res = ctx.read(buf); + buf.flip(); + Object reso = Utility.deserialize(buf); + if( DEBUG ) + log.info("ROS AsynchTCPWorker COMPLETED READ for "+ctx+" buffer:"+buf+" result:"+res+" Object:"+reso); + /* ctx.read(buf, new CompletionHandler() { @Override public void completed(Integer arg0, Void arg1) { buf.flip(); - if( DEBUG ) - log.info("ROS AsynchTCPWorker COMPLETED READ for "+ctx+" command received:"+buf+" Result:"+arg0+","+arg1); Object res = Utility.deserialize(buf); + //if( res == null ) { + // cdl.countDown(); + // return; + //} + + if( DEBUG ) + log.info("ROS AsynchTCPWorker COMPLETED READ for "+ctx+" buffer:"+buf+" Object:"+res+" Result:"+arg0+","+arg1); try { ctx.pipeline().fireChannelRead(res); } catch (Exception e) { @@ -87,7 +98,8 @@ public void failed(Throwable arg0, Void arg1) { cdl.countDown(); } }); - cdl.await(); // readpendingexception if we overlap operations + */ + //cdl.await(); // readpendingexception if we overlap operations } // shouldRun } catch(Exception se) { diff --git a/src/main/java/org/ros/internal/transport/tcp/ChannelGroup.java b/src/main/java/org/ros/internal/transport/tcp/ChannelGroup.java new file mode 100644 index 0000000..1102568 --- /dev/null +++ b/src/main/java/org/ros/internal/transport/tcp/ChannelGroup.java @@ -0,0 +1,5 @@ +package org.ros.internal.transport.tcp; + +public interface ChannelGroup { + public void shutdown(); +} diff --git a/src/main/java/org/ros/internal/transport/tcp/ChannelGroupImpl.java b/src/main/java/org/ros/internal/transport/tcp/ChannelGroupImpl.java new file mode 100644 index 0000000..133cfcc --- /dev/null +++ b/src/main/java/org/ros/internal/transport/tcp/ChannelGroupImpl.java @@ -0,0 +1,17 @@ +package org.ros.internal.transport.tcp; + +import java.util.concurrent.ExecutorService; +/** + * Manipulate groups of channels al la AsynchronousChannelGroup + * @author jg + * + */ +public class ChannelGroupImpl implements ChannelGroup { + private ExecutorService executorService; + public ChannelGroupImpl(ExecutorService executorService) { + this.executorService = executorService; + } + public void shutdown() { + executorService.shutdown(); + } +} diff --git a/src/main/java/org/ros/internal/transport/tcp/ConnectionTrackingChannelPipelineFactory.java b/src/main/java/org/ros/internal/transport/tcp/ConnectionTrackingChannelPipelineFactory.java index d3e7bac..e6c25ef 100644 --- a/src/main/java/org/ros/internal/transport/tcp/ConnectionTrackingChannelPipelineFactory.java +++ b/src/main/java/org/ros/internal/transport/tcp/ConnectionTrackingChannelPipelineFactory.java @@ -1,6 +1,6 @@ package org.ros.internal.transport.tcp; -import java.nio.channels.AsynchronousChannelGroup; +//import java.nio.channels.AsynchronousChannelGroup; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -16,7 +16,7 @@ public class ConnectionTrackingChannelPipelineFactory extends ChannelInitializer private final ConnectionTrackingHandler connectionTrackingHandler; - public ConnectionTrackingChannelPipelineFactory(AsynchronousChannelGroup channelGroup){ + public ConnectionTrackingChannelPipelineFactory(/*Asynchronous*/ChannelGroup channelGroup){ this.connectionTrackingHandler = new ConnectionTrackingHandler(); if(DEBUG) log.info("ConnectionTrackingChannelPipelineFactory ctor"+channelGroup); diff --git a/src/main/java/org/ros/internal/transport/tcp/NamedChannelHandler.java b/src/main/java/org/ros/internal/transport/tcp/NamedChannelHandler.java index 9b61ef8..22640ce 100644 --- a/src/main/java/org/ros/internal/transport/tcp/NamedChannelHandler.java +++ b/src/main/java/org/ros/internal/transport/tcp/NamedChannelHandler.java @@ -2,12 +2,8 @@ import org.ros.internal.transport.ChannelHandler; - -//import org.jboss.netty.channel.ChannelDownstreamHandler; -//import org.jboss.netty.channel.ChannelHandler; -//import org.jboss.netty.channel.ChannelUpstreamHandler; - /** + * Adds a name property to a channelhandler contract * @author jg */ public interface NamedChannelHandler extends ChannelHandler { diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpClient.java b/src/main/java/org/ros/internal/transport/tcp/TcpClient.java index 3ec5773..4c2b08b 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpClient.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpClient.java @@ -2,6 +2,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.ros.exception.RosRuntimeException; import org.ros.internal.transport.ChannelHandlerContext; import org.ros.internal.transport.ChannelHandlerContextImpl; @@ -12,6 +13,7 @@ import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.Channel; +import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executor; @@ -36,12 +38,12 @@ public class TcpClient { private final List namedChannelHandlers; private Executor executor; - private AsynchronousSocketChannel channel; - private AsynchronousChannelGroup channelGroup; + private /*Asynchronous*/SocketChannel channel; + private /*Asynchronous*/ChannelGroup channelGroup; private ChannelInitializerFactoryStack factoryStack; // Stack of ChannelInitializer factories to load ChannelHandlers - public TcpClient( Executor executor, AsynchronousChannelGroup channelGroup, List namedChannelHandlers) { + public TcpClient( Executor executor, /*Asynchronous*/ChannelGroup channelGroup, List namedChannelHandlers) { this.executor = executor; this.channelGroup = channelGroup; this.namedChannelHandlers = namedChannelHandlers; @@ -68,10 +70,10 @@ public void addAllNamedChannelHandlers(List namedChannelHan public ChannelHandlerContext getContext() { return ctx; } public Channel connect(String connectionName, SocketAddress socketAddress) throws Exception { - channel = AsynchronousSocketChannel.open(channelGroup); - ((AsynchronousSocketChannel)channel).setOption(StandardSocketOptions.SO_RCVBUF, 4096000); - ((AsynchronousSocketChannel)channel).setOption(StandardSocketOptions.SO_SNDBUF, 4096000); - ((AsynchronousSocketChannel)channel).setOption(StandardSocketOptions.TCP_NODELAY, true); + channel = /*Asynchronous*/SocketChannel.open(/*channelGroup*/); + ((/*Asynchronous*/SocketChannel)channel).setOption(StandardSocketOptions.SO_RCVBUF, 4096000); + ((/*Asynchronous*/SocketChannel)channel).setOption(StandardSocketOptions.SO_SNDBUF, 4096000); + ((/*Asynchronous*/SocketChannel)channel).setOption(StandardSocketOptions.TCP_NODELAY, true); ctx = new ChannelHandlerContextImpl(channelGroup, channel, executor); TcpClientPipelineFactory tcpClientPipelineFactory = new TcpClientPipelineFactory(ctx.getChannelGroup(), namedChannelHandlers); // add handler pipeline factory to stack @@ -99,9 +101,13 @@ public Channel connect(String connectionName, SocketAddress socketAddress) throw return channel; } - public Future write(ByteBuffer buffer) { + public /*Future*/ int write(ByteBuffer buffer) { assert(channel != null); assert(buffer != null); - return channel.write(buffer); + try { + return channel.write(buffer); + } catch (IOException e) { + throw new RosRuntimeException(e); + } } } \ No newline at end of file diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java b/src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java index a9ac80e..89cca25 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java @@ -6,7 +6,7 @@ import java.io.IOException; import java.net.SocketAddress; -import java.nio.channels.AsynchronousChannelGroup; +//import java.nio.channels.AsynchronousChannelGroup; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -30,14 +30,14 @@ public class TcpClientManager { public static boolean DEBUG = true; private static final Log log = LogFactory.getLog(TcpClientManager.class); - private final AsynchronousChannelGroup channelGroup; + private final /*Asynchronous*/ChannelGroup channelGroup; private final Collection tcpClients; private final List namedChannelHandlers; private final Executor executor; public TcpClientManager(ExecutorService executor) throws IOException { this.executor = executor; - this.channelGroup = AsynchronousChannelGroup.withThreadPool(executor); + this.channelGroup = new ChannelGroupImpl(executor);/*AsynchronousChannelGroup.withThreadPool(executor);*/ this.tcpClients = new ArrayList(); this.namedChannelHandlers = new ArrayList(); diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpClientPipelineFactory.java b/src/main/java/org/ros/internal/transport/tcp/TcpClientPipelineFactory.java index 1d13792..7687045 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpClientPipelineFactory.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpClientPipelineFactory.java @@ -18,11 +18,11 @@ public class TcpClientPipelineFactory extends ConnectionTrackingChannelPipelineF public static final String LENGTH_FIELD_PREPENDER = "LengthFieldPrepender"; private List namedChannelHandlers; - public TcpClientPipelineFactory(AsynchronousChannelGroup channelGroup, List namedChannelHandlers) { - super(channelGroup); + public TcpClientPipelineFactory(/*Asynchronous*/ChannelGroup asynchronousChannelGroup, List namedChannelHandlers) { + super(asynchronousChannelGroup); this.namedChannelHandlers = namedChannelHandlers; if( DEBUG ) - log.info("TcpClientPipelineFactory:"+channelGroup); + log.info("TcpClientPipelineFactory:"+asynchronousChannelGroup); } @Override diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpRosServer.java b/src/main/java/org/ros/internal/transport/tcp/TcpRosServer.java index aff0ada..94ac3ac 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpRosServer.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpRosServer.java @@ -57,8 +57,10 @@ public class TcpRosServer implements Serializable { private transient ServiceManager serviceManager; private transient ScheduledExecutorService executorService; - private transient AsynchronousChannelGroup outgoingChannelGroup; // publisher with connected subscribers - private transient AsynchronousChannelGroup incomingChannelGroup; // subscriber connected to publishers + //private transient AsynchronousChannelGroup outgoingChannelGroup; // publisher with connected subscribers + //private transient AsynchronousChannelGroup incomingChannelGroup; // subscriber connected to publishers + private transient ChannelGroup outgoingChannelGroup; // publisher with connected subscribers + private transient ChannelGroup incomingChannelGroup; // subscriber connected to publishers private transient TcpServerPipelineFactory serverPipelineFactory; private transient ChannelInitializerFactoryStack factoryStack; // Stack of ChannelInitializer factories to load ChannelHandlers private transient List contexts; @@ -79,7 +81,7 @@ public TcpRosServer(BindAddress bindAddress, AdvertiseAddress advertiseAddress, public void start() { //assert(outgoingChannel == null); try { - incomingChannelGroup = AsynchronousChannelGroup.withThreadPool(executorService); + incomingChannelGroup = new ChannelGroupImpl(executorService);//AsynchronousChannelGroup.withThreadPool(executorService); advertiseAddress.setPort(bindAddress.toInetSocketAddress().getPort()); factoryStack = new ChannelInitializerFactoryStack(); serverPipelineFactory = @@ -154,7 +156,7 @@ public List getSubscribers() { public ExecutorService getExecutor() { return executorService; } - public AsynchronousChannelGroup getChannelGroup() { return incomingChannelGroup; } + public /*Asynchronous*/ChannelGroup getChannelGroup() { return incomingChannelGroup; } } diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java b/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java index 09c2093..ba21617 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java @@ -3,22 +3,19 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; -import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.ros.exception.RosRuntimeException; + import org.ros.internal.node.server.NodeIdentifier; import org.ros.internal.node.service.DefaultServiceServer; import org.ros.internal.node.service.ServiceManager; -import org.ros.internal.node.service.ServiceResponseEncoder; import org.ros.internal.node.topic.DefaultPublisher; import org.ros.internal.node.topic.SubscriberIdentifier; import org.ros.internal.node.topic.TopicIdentifier; import org.ros.internal.node.topic.TopicParticipantManager; import org.ros.internal.transport.ChannelHandler; import org.ros.internal.transport.ChannelHandlerContext; -import org.ros.internal.transport.ChannelPipeline; import org.ros.internal.transport.ConnectionHeader; import org.ros.internal.transport.ConnectionHeaderFields; import org.ros.namespace.GraphName; @@ -27,7 +24,6 @@ * A {@link ChannelHandler} which will process the TCP server handshake. * * @author jg - */ public class TcpServerHandshakeHandler implements ChannelHandler { private static final boolean DEBUG = true ; diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpServerPipelineFactory.java b/src/main/java/org/ros/internal/transport/tcp/TcpServerPipelineFactory.java index 4a533fd..bd41b95 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpServerPipelineFactory.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpServerPipelineFactory.java @@ -27,11 +27,11 @@ public class TcpServerPipelineFactory extends ConnectionTrackingChannelPipelineF private final TopicParticipantManager topicParticipantManager; private final ServiceManager serviceManager; - public TcpServerPipelineFactory(AsynchronousChannelGroup channelGroup, + public TcpServerPipelineFactory(ChannelGroup incomingChannelGroup, TopicParticipantManager topicParticipantManager, ServiceManager serviceManager) { - super(channelGroup); + super(incomingChannelGroup); if( DEBUG ) - log.info("TcpServerPipeLineFactory ctor:"+channelGroup+" "+topicParticipantManager+" "+serviceManager); + log.info("TcpServerPipeLineFactory ctor:"+incomingChannelGroup+" "+topicParticipantManager+" "+serviceManager); this.topicParticipantManager = topicParticipantManager; this.serviceManager = serviceManager; } diff --git a/src/test/java/org/ros/internal/transport/MessageQueueIntegrationTest.java b/src/test/java/org/ros/internal/transport/MessageQueueIntegrationTest.java index a67c0e9..8ceb309 100644 --- a/src/test/java/org/ros/internal/transport/MessageQueueIntegrationTest.java +++ b/src/test/java/org/ros/internal/transport/MessageQueueIntegrationTest.java @@ -28,6 +28,8 @@ import org.ros.internal.node.topic.TopicParticipantManager; import org.ros.internal.transport.queue.IncomingMessageQueue; import org.ros.internal.transport.queue.OutgoingMessageQueue; +import org.ros.internal.transport.tcp.ChannelGroup; +import org.ros.internal.transport.tcp.ChannelGroupImpl; import org.ros.internal.transport.tcp.ChannelInitializerFactoryStack; import org.ros.internal.transport.tcp.NamedChannelHandler; import org.ros.internal.transport.tcp.TcpClient; @@ -43,6 +45,8 @@ import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -175,13 +179,8 @@ private ChannelHandlerContext buildServerChannel() { isock = new InetSocketAddress(0); TopicParticipantManager topicParticipantManager = new TopicParticipantManager(); ServiceManager serviceManager = new ServiceManager(); - AsynchronousChannelGroup incomingChannelGroup = null; - try { - incomingChannelGroup = AsynchronousChannelGroup.withThreadPool(executorService); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } + /*Asynchronous*/ChannelGroup incomingChannelGroup = null; + incomingChannelGroup = new ChannelGroupImpl(executorService);/* AsynchronousChannelGroup.withThreadPool(executorService);*/ ChannelInitializerFactoryStack factoryStack = new ChannelInitializerFactoryStack(); TcpServerPipelineFactory serverPipelineFactory = @@ -193,9 +192,9 @@ protected void initChannel(ChannelHandlerContext ch) { } }; factoryStack.addLast(serverPipelineFactory); - AsynchronousServerSocketChannel listener = null; + /*AsynchronousServer*/ServerSocketChannel listener = null; try { - listener = AsynchronousServerSocketChannel.open(incomingChannelGroup); + listener = /*Asynchronous*/ServerSocketChannel.open(/*incomingChannelGroup*/); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); @@ -207,17 +206,18 @@ protected void initChannel(ChannelHandlerContext ch) { e.printStackTrace(); } - Future channel = listener.accept(); + /*Future