From 9ced9571cdb28a0c4bbcdab893b10b5659bc1f0c Mon Sep 17 00:00:00 2001 From: NeoCoreTechs - New Core Technologies <groffj@neocoretechs.com> Date: Sat, 9 Dec 2017 12:01:54 -0800 Subject: [PATCH] Tightened up message reception fault handling, decreased incoming circular queue size. Cleaned up ChannelHandlerContext docs. --- .../ros/concurrent/CircularBlockingDeque.java | 2 + .../node/topic/DefaultSubscriber.java | 9 +- .../transport/ChannelHandlerContext.java | 97 +++++++------------ .../transport/ChannelHandlerContextImpl.java | 10 +- .../transport/queue/IncomingMessageQueue.java | 9 +- .../transport/queue/MessageDispatcher.java | 22 ++--- .../transport/queue/MessageReceiver.java | 29 ++---- .../transport/queue/OutgoingMessageQueue.java | 6 +- .../transport/tcp/AsynchTCPWorker.java | 5 +- 9 files changed, 75 insertions(+), 114 deletions(-) diff --git a/src/main/java/org/ros/concurrent/CircularBlockingDeque.java b/src/main/java/org/ros/concurrent/CircularBlockingDeque.java index 20a8f26..cfdf65f 100644 --- a/src/main/java/org/ros/concurrent/CircularBlockingDeque.java +++ b/src/main/java/org/ros/concurrent/CircularBlockingDeque.java @@ -183,6 +183,8 @@ public T peekLast() { public boolean isEmpty() { return length == 0; } + + public int length() { return length; } /** * Returns an iterator over the queue. diff --git a/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java b/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java index f508a21..c54a750 100644 --- a/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java +++ b/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java @@ -2,9 +2,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.ros.concurrent.ListenerGroup; import org.ros.concurrent.SignalRunnable; -import org.ros.exception.RosRuntimeException; import org.ros.internal.node.server.NodeIdentifier; import org.ros.internal.transport.ProtocolNames; import org.ros.internal.transport.queue.IncomingMessageQueue; @@ -128,12 +128,7 @@ public void addPublisher(PublisherIdentifier publisherIdentifier, InetSocketAddr if (knownPublishers.contains(publisherIdentifier)) { return; } - try { - tcpClientManager.connect(toString(), address); - } catch (IOException e) { - log.error("Failure attempting to add publisher "+toString()+" "+address); - throw new RosRuntimeException(e); - } + tcpClientManager.connect(toString(), address); // TODO(damonkohler): knownPublishers is duplicate information that is // already available to the TopicParticipantManager. knownPublishers.add(publisherIdentifier); diff --git a/src/main/java/org/ros/internal/transport/ChannelHandlerContext.java b/src/main/java/org/ros/internal/transport/ChannelHandlerContext.java index ff5e126..ceab23a 100644 --- a/src/main/java/org/ros/internal/transport/ChannelHandlerContext.java +++ b/src/main/java/org/ros/internal/transport/ChannelHandlerContext.java @@ -9,13 +9,18 @@ import java.util.Set; import java.util.concurrent.Executor; -import java.util.concurrent.Future; import org.ros.internal.transport.tcp.ChannelGroup; +/** + * This is the ChannelHandlerContext that links the underlying TCP Socket 'channel' to the {@link ChannelPipeline} and the {@link ChannelGroup} + * and provides access to the {@link Executor} to spin up event dispatcher. + * @author jg (C) NeoCoreTechs 2017 + * + */ public interface ChannelHandlerContext { - /** - * Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}. + /** + * Return the {@link Socket} which is bound to the {@link ChannelHandlerContext}. */ Socket channel(); @@ -33,99 +38,59 @@ public interface ChannelHandlerContext { String name(); /** - * Request to bind to the given {@link SocketAddress} and notify the {@link Future} once the operation - * completes, either because the operation was successful or because of an error. + * Request to bind to the given {@link SocketAddress} * <p> - * This will result in having the - * {@link ChannelOutboundHandler#bind(ChannelHandlerContext, SocketAddress, CompletionHandler)} method - * called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the - * {@link Channel}. + * This will result in having the socket bound to the local address. */ void bind(SocketAddress localAddress) throws IOException; /** - * Request to connect to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation - * completes, either because the operation was successful or because of an error. - * <p> - * If the connection fails because of a connection timeout, the {@link ChannelFuture} will get failed with - * a {@link ConnectTimeoutException}. If it fails because of connection refused a {@link ConnectException} - * will be used. + * Request to connect to the given {@link SocketAddress}. * <p> - * This will result in having the - * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, CompletionHandler)} - * method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the - * {@link Channel}. + * If the connection fails because of a connection timeout, the exception will be thrown + * This will result in having the socket connected and the input and output streams initialized. * @throws IOException */ void connect(SocketAddress remoteAddress) throws IOException; /** - * Request to connect to the given {@link SocketAddress} while bind to the localAddress and notify the - * {@link Future} once the operation completes, either because the operation was successful or because of - * an error. - * <p> - * This will result in having the - * {@link ChannelHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, CompletionHandler)} - * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the - * {@link Channel}. + * Request to connect to the given remote {@link SocketAddress} while bind to the localAddress. + * This will result in having the socket bound and streams ready. * @throws IOException */ void connect(SocketAddress remoteAddress, SocketAddress localAddress) throws IOException; - - /** - * Request to disconnect from the remote peer and notify the {@link Future} once the operation completes, - * either because the operation was successful or because of an error. - * <p> - * This will result in having the - * {@link ChannelHandler#disconnect(ChannelHandlerContext, CompletionHandler)} - * method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the - * {@link Channel}. + * Request to disconnect from the remote peer. + * This will result in having the Socket closed. * @throws IOException */ void disconnect() throws IOException; /** - * Request to close the {@link Channel} and notify the {@link Future} once the operation completes, - * either because the operation was successful or because of - * an error. - * + * Request to close the {@link Channel}. * After it is closed it is not possible to reuse it again. - * <p> - * This will result in having the - * {@link ChannelHandler#close(ChannelHandlerContext, CompletionHandler)} - * method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the - * {@link Channel}. + * This will result in having the {@link Socket} closed. * @throws IOException */ void close() throws IOException; - /** - * Request to Read data from the {@link Channel} into the first inbound buffer, triggers an - * {@link ChannelHandler#channelRead(ChannelHandlerContext, Object)} event if data was - * read, and triggers a - * {@link ChannelHandler#channelReadComplete(ChannelHandlerContext) channelReadComplete} event so the - * handler can decide to continue reading. If there's a pending read operation already, this method does nothing. - * <p> - * This will result in having the - * {@link ChannelHandler#read(ChannelHandlerContext)} - * method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the - * {@link Channel}. - * @throws IOException + * Request to Read data from the {@link InputStream} into the first inbound buffer. + * It is up to the client, i.e. {@link AsynchTCPWorker}, to trigger a read event if data was + * read, and it does this through the pipeline {@link ChannelPipeline#fireChannelRead(Object)} and + * triggers an event through the pipeline via the {@link ChannelPipeline#fireChannelReadComplete()} + * if successful. If there's a pending read operation already, this method does nothing. + * @throws IOException Generates {@link ChannelPipeline#fireExceptionCaught(Throwable)} */ Object read() throws IOException; /** * Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}. - * This method will not request to actual flush, so be sure to call {@link #flush()} - * once you want to request to flush all pending data to the actual transport. * @throws IOException */ void write(Object msg) throws IOException; - /** * Return the assigned {@link ChannelPipeline} */ @@ -164,10 +129,20 @@ public interface ChannelHandlerContext { */ Set<String> getMessageTypes(); + /** + * Write with the named {@link CompletionHandler} + * @param msg + * @param handler + */ void write(Object msg, CompletionHandler<Integer, Void> handler); + /** + * Request to Read data from the {@link InputStream} into the first inbound buffer. + * <p> + * This will result in having the Socket read and {@link CompletionHandler#completed(Object, Object)} + * On IOException {@link CompletionHandler#failed(Throwable, Object)} + */ Object read(CompletionHandler<Integer, Void> handler); - } diff --git a/src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java b/src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java index be865eb..8ce8693 100644 --- a/src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java +++ b/src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java @@ -39,8 +39,9 @@ public class ChannelHandlerContextImpl implements ChannelHandlerContext { boolean ready = false; Object mutex = new Object(); Set<String> outboundMessageTypes; - - + InputStream is = null; + OutputStream os = null; + public ChannelHandlerContextImpl(/*Asynchronous*/ChannelGroup channelGroup2, /*Asynchronous*/Socket channel2, Executor exc) { channelGroup = channelGroup2; channel = channel2; @@ -73,6 +74,9 @@ public String name() { @Override public void connect(SocketAddress remoteAddress) throws IOException { channel.connect(remoteAddress); + is = channel.getInputStream(); + os = channel.getOutputStream(); + } @Override @@ -95,7 +99,6 @@ public void close() throws IOException { @Override public Object read() throws IOException { - InputStream is = channel.getInputStream(); ObjectInputStream ois = new ObjectInputStream(is); try { return ois.readObject(); @@ -106,7 +109,6 @@ public Object read() throws IOException { @Override public void write(Object msg) throws IOException { - OutputStream os = channel.getOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(os); oos.writeObject(msg); oos.flush(); diff --git a/src/main/java/org/ros/internal/transport/queue/IncomingMessageQueue.java b/src/main/java/org/ros/internal/transport/queue/IncomingMessageQueue.java index 7e08cef..51d0eb7 100644 --- a/src/main/java/org/ros/internal/transport/queue/IncomingMessageQueue.java +++ b/src/main/java/org/ros/internal/transport/queue/IncomingMessageQueue.java @@ -10,6 +10,7 @@ /** * Created in default subscriber to handle the incoming messages. * Creates a MessageReceiver and a MessageDispatcher. + * The MessageDispatcher is spun up by the ExecutorService. * @author jg */ public class IncomingMessageQueue<T> { @@ -18,17 +19,17 @@ public class IncomingMessageQueue<T> { * The maximum number of incoming messages that will be queued. * <p> * This limit applies to dispatching {@link LazyMessage}s as they arrive over - * the network. It is independent of {@link MessageDispatcher} queue - * capacities specified by + * the network. It is independent of {@link MessageDispatcher} queue capacities specified by * {@link IncomingMessageQueue#addListener(MessageListener, int)} which are * consumed by user provided {@link MessageListener}s. + * @author Groff (C) NeoCoreTechs 2017 */ - private static final int DEQUE_CAPACITY = 8192; + private static final int DEQUE_CAPACITY = 256; private final MessageReceiver<T> messageReceiver; private final MessageDispatcher<T> messageDispatcher; - public IncomingMessageQueue( ExecutorService executorService) { + public IncomingMessageQueue(ExecutorService executorService) { CircularBlockingDeque<T> lazyMessages = new CircularBlockingDeque<T>(DEQUE_CAPACITY); messageReceiver = new MessageReceiver<T>(lazyMessages); diff --git a/src/main/java/org/ros/internal/transport/queue/MessageDispatcher.java b/src/main/java/org/ros/internal/transport/queue/MessageDispatcher.java index e408a30..88a61ce 100644 --- a/src/main/java/org/ros/internal/transport/queue/MessageDispatcher.java +++ b/src/main/java/org/ros/internal/transport/queue/MessageDispatcher.java @@ -12,10 +12,11 @@ import java.util.concurrent.ExecutorService; /** - * @author jg - * - * @param <T> - * the message type + * The IncomingMessageQueue creates this and spins it up via the ExecutorService. + * It shares the CircularBlockingDeque with the MessageReceiver. + * It services the MessageListeners with received messages from the queue. + * @param <T> the message type + * @author jg (C) NeoCoreTechs 2017 */ public class MessageDispatcher<T> extends CancellableLoop { @@ -34,8 +35,7 @@ public class MessageDispatcher<T> extends CancellableLoop { private boolean latchMode; private T latchedMessage; - public MessageDispatcher(CircularBlockingDeque<T> lazyMessages, - ExecutorService executorService) { + public MessageDispatcher(CircularBlockingDeque<T> lazyMessages, ExecutorService executorService) { this.lazyMessages = lazyMessages; messageListeners = new ListenerGroup<MessageListener<T>>(executorService); mutex = new Object(); @@ -55,8 +55,7 @@ public void addListener(MessageListener<T> messageListener, int limit) { log.info("Adding listener."); } synchronized (mutex) { - EventDispatcher<MessageListener<T>> eventDispatcher = - messageListeners.add(messageListener, limit); + EventDispatcher<MessageListener<T>> eventDispatcher = messageListeners.add(messageListener, limit); if (latchMode && latchedMessage != null) { eventDispatcher.signal(newSignalRunnable(latchedMessage)); } @@ -67,8 +66,7 @@ public void addListener(MessageListener<T> messageListener, int limit) { * Returns a newly allocated {@link SignalRunnable} for the specified * {@link LazyMessage}. * - * @param lazyMessage - * the {@link LazyMessage} to signal {@link MessageListener}s with + * @param lazyMessage the {@link LazyMessage} to signal {@link MessageListener}s with * @return the newly allocated {@link SignalRunnable} */ private SignalRunnable<MessageListener<T>> newSignalRunnable(final T lazyMessage) { @@ -81,9 +79,7 @@ public void run(MessageListener<T> messageListener) { } /** - * @param enabled - * {@code true} if latch mode should be enabled, {@code false} - * otherwise + * @param enabled {@code true} if latch mode should be enabled, {@code false} otherwise */ public void setLatchMode(boolean enabled) { latchMode = enabled; diff --git a/src/main/java/org/ros/internal/transport/queue/MessageReceiver.java b/src/main/java/org/ros/internal/transport/queue/MessageReceiver.java index 534fc4a..fa259cf 100644 --- a/src/main/java/org/ros/internal/transport/queue/MessageReceiver.java +++ b/src/main/java/org/ros/internal/transport/queue/MessageReceiver.java @@ -10,12 +10,11 @@ /** - * A circular blocking deque type of ChannelHandler that takes channelRead events and queues them. + * A type of ChannelHandler that takes channelRead events and queues them. + * Contains the circular blocking deque shared by MessageDispatcher and managed by IncomingMessageQueue. * It is placed in the stack after handshake to be activated on read events. - * @author jg - * - * @param <T> - * the message type + * @author jg (C) NeoCoretechs 2017 + * @param <T> the message type */ public class MessageReceiver<T> extends AbstractNamedChannelHandler { @@ -24,10 +23,8 @@ public class MessageReceiver<T> extends AbstractNamedChannelHandler { private final CircularBlockingDeque<T> lazyMessages; - public MessageReceiver(CircularBlockingDeque<T> lazyMessages) { this.lazyMessages = lazyMessages; - } @Override @@ -46,10 +43,8 @@ public Object channelRead(ChannelHandlerContext ctx, Object msg) throws Exceptio @Override -public void exceptionCaught(ChannelHandlerContext arg0, Throwable arg1) - throws Exception { - log.error(arg1); - +public void exceptionCaught(ChannelHandlerContext arg0, Throwable arg1) throws Exception { + log.error(arg1); } @@ -57,8 +52,7 @@ public void exceptionCaught(ChannelHandlerContext arg0, Throwable arg1) public void handlerAdded(ChannelHandlerContext arg0) throws Exception { if( DEBUG ) { log.info(this+" handler added:"+arg0); - } - + } } @Override @@ -66,7 +60,6 @@ public void handlerRemoved(ChannelHandlerContext arg0) throws Exception { if( DEBUG ) { log.info("MessageReceiver handler removed:"+arg0); } - } @Override @@ -74,7 +67,6 @@ public void channelActive(ChannelHandlerContext arg0) throws Exception { if( DEBUG ) { log.info("MessageReceiver channel active:"+arg0); } - } @Override @@ -82,7 +74,6 @@ public void channelInactive(ChannelHandlerContext arg0) throws Exception { if( DEBUG ) { log.info("MessageReceiver channel inactive:"+arg0); } - } @Override @@ -90,18 +81,14 @@ public void channelReadComplete(ChannelHandlerContext arg0) throws Exception { if( DEBUG ) { log.info("MessageReceiver read complete:"+arg0); } - } @Override -public void userEventTriggered(ChannelHandlerContext arg0, Object arg1) - throws Exception { +public void userEventTriggered(ChannelHandlerContext arg0, Object arg1) throws Exception { if( DEBUG ) { log.debug("MessageReceiver user event triggered:"+arg0); } - } - } \ No newline at end of file diff --git a/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java b/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java index 45d001c..440bf72 100644 --- a/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java +++ b/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java @@ -137,8 +137,7 @@ public boolean getLatchMode() { } /** - * @param message - * the message to add to the queue + * @param message the message to add to the queue */ public void add(T message) { deque.addLast(message); @@ -159,7 +158,8 @@ public void shutdown() { } - private void writeLatchedMessage() { +@SuppressWarnings("unused") +private void writeLatchedMessage() { synchronized (mutex) { latchedBuffer.clear(); Utility.serialize(latchedMessage, latchedBuffer); diff --git a/src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java b/src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java index 230d656..f451691 100644 --- a/src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java +++ b/src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java @@ -4,7 +4,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.ros.internal.transport.ChannelHandlerContext; /** @@ -46,9 +45,13 @@ public void run() { } ctx.pipeline().fireExceptionCaught(e); } + ctx.pipeline().fireChannelReadComplete(); } // shouldRun } catch(Exception se) { log.error("AsynchTCPWorker terminating due to ",se); + try { + ctx.pipeline().fireExceptionCaught(se); + } catch (Exception e) {} } finally { try { if( DEBUG )