Skip to content

Commit

Permalink
Tightened up message reception fault handling, decreased incoming cir…
Browse files Browse the repository at this point in the history
…cular queue size. Cleaned up ChannelHandlerContext docs.
  • Loading branch information
neocoretechs committed Dec 9, 2017
1 parent d6ddeda commit 9ced957
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 114 deletions.
2 changes: 2 additions & 0 deletions src/main/java/org/ros/concurrent/CircularBlockingDeque.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ public T peekLast() {
public boolean isEmpty() {
return length == 0;
}

public int length() { return length; }

/**
* Returns an iterator over the queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.ros.concurrent.ListenerGroup;
import org.ros.concurrent.SignalRunnable;
import org.ros.exception.RosRuntimeException;
import org.ros.internal.node.server.NodeIdentifier;
import org.ros.internal.transport.ProtocolNames;
import org.ros.internal.transport.queue.IncomingMessageQueue;
Expand Down Expand Up @@ -128,12 +128,7 @@ public void addPublisher(PublisherIdentifier publisherIdentifier, InetSocketAddr
if (knownPublishers.contains(publisherIdentifier)) {
return;
}
try {
tcpClientManager.connect(toString(), address);
} catch (IOException e) {
log.error("Failure attempting to add publisher "+toString()+" "+address);
throw new RosRuntimeException(e);
}
tcpClientManager.connect(toString(), address);
// TODO(damonkohler): knownPublishers is duplicate information that is
// already available to the TopicParticipantManager.
knownPublishers.add(publisherIdentifier);
Expand Down
97 changes: 36 additions & 61 deletions src/main/java/org/ros/internal/transport/ChannelHandlerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@

import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;

import org.ros.internal.transport.tcp.ChannelGroup;

/**
* This is the ChannelHandlerContext that links the underlying TCP Socket 'channel' to the {@link ChannelPipeline} and the {@link ChannelGroup}
* and provides access to the {@link Executor} to spin up event dispatcher.
* @author jg (C) NeoCoreTechs 2017
*
*/
public interface ChannelHandlerContext {
/**
* Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}.
/**
* Return the {@link Socket} which is bound to the {@link ChannelHandlerContext}.
*/
Socket channel();

Expand All @@ -33,99 +38,59 @@ public interface ChannelHandlerContext {
String name();

/**
* Request to bind to the given {@link SocketAddress} and notify the {@link Future} once the operation
* completes, either because the operation was successful or because of an error.
* Request to bind to the given {@link SocketAddress}
* <p>
* This will result in having the
* {@link ChannelOutboundHandler#bind(ChannelHandlerContext, SocketAddress, CompletionHandler)} method
* called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
* This will result in having the socket bound to the local address.
*/
void bind(SocketAddress localAddress) throws IOException;

/**
* Request to connect to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
* completes, either because the operation was successful or because of an error.
* <p>
* If the connection fails because of a connection timeout, the {@link ChannelFuture} will get failed with
* a {@link ConnectTimeoutException}. If it fails because of connection refused a {@link ConnectException}
* will be used.
* Request to connect to the given {@link SocketAddress}.
* <p>
* This will result in having the
* {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, CompletionHandler)}
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
* If the connection fails because of a connection timeout, the exception will be thrown
* This will result in having the socket connected and the input and output streams initialized.
* @throws IOException
*/
void connect(SocketAddress remoteAddress) throws IOException;

/**
* Request to connect to the given {@link SocketAddress} while bind to the localAddress and notify the
* {@link Future} once the operation completes, either because the operation was successful or because of
* an error.
* <p>
* This will result in having the
* {@link ChannelHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, CompletionHandler)}
* method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
* Request to connect to the given remote {@link SocketAddress} while bind to the localAddress.
* This will result in having the socket bound and streams ready.
* @throws IOException
*/
void connect(SocketAddress remoteAddress, SocketAddress localAddress) throws IOException;



/**
* Request to disconnect from the remote peer and notify the {@link Future} once the operation completes,
* either because the operation was successful or because of an error.
* <p>
* This will result in having the
* {@link ChannelHandler#disconnect(ChannelHandlerContext, CompletionHandler)}
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
* Request to disconnect from the remote peer.
* This will result in having the Socket closed.
* @throws IOException
*/
void disconnect() throws IOException;

/**
* Request to close the {@link Channel} and notify the {@link Future} once the operation completes,
* either because the operation was successful or because of
* an error.
*
* Request to close the {@link Channel}.
* After it is closed it is not possible to reuse it again.
* <p>
* This will result in having the
* {@link ChannelHandler#close(ChannelHandlerContext, CompletionHandler)}
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
* This will result in having the {@link Socket} closed.
* @throws IOException
*/
void close() throws IOException;


/**
* Request to Read data from the {@link Channel} into the first inbound buffer, triggers an
* {@link ChannelHandler#channelRead(ChannelHandlerContext, Object)} event if data was
* read, and triggers a
* {@link ChannelHandler#channelReadComplete(ChannelHandlerContext) channelReadComplete} event so the
* handler can decide to continue reading. If there's a pending read operation already, this method does nothing.
* <p>
* This will result in having the
* {@link ChannelHandler#read(ChannelHandlerContext)}
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
* @throws IOException
* Request to Read data from the {@link InputStream} into the first inbound buffer.
* It is up to the client, i.e. {@link AsynchTCPWorker}, to trigger a read event if data was
* read, and it does this through the pipeline {@link ChannelPipeline#fireChannelRead(Object)} and
* triggers an event through the pipeline via the {@link ChannelPipeline#fireChannelReadComplete()}
* if successful. If there's a pending read operation already, this method does nothing.
* @throws IOException Generates {@link ChannelPipeline#fireExceptionCaught(Throwable)}
*/
Object read() throws IOException;

/**
* Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}.
* This method will not request to actual flush, so be sure to call {@link #flush()}
* once you want to request to flush all pending data to the actual transport.
* @throws IOException
*/
void write(Object msg) throws IOException;


/**
* Return the assigned {@link ChannelPipeline}
*/
Expand Down Expand Up @@ -164,10 +129,20 @@ public interface ChannelHandlerContext {
*/
Set<String> getMessageTypes();

/**
* Write with the named {@link CompletionHandler}
* @param msg
* @param handler
*/
void write(Object msg, CompletionHandler<Integer, Void> handler);

/**
* Request to Read data from the {@link InputStream} into the first inbound buffer.
* <p>
* This will result in having the Socket read and {@link CompletionHandler#completed(Object, Object)}
* On IOException {@link CompletionHandler#failed(Throwable, Object)}
*/
Object read(CompletionHandler<Integer, Void> handler);



}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ public class ChannelHandlerContextImpl implements ChannelHandlerContext {
boolean ready = false;
Object mutex = new Object();
Set<String> outboundMessageTypes;


InputStream is = null;
OutputStream os = null;

public ChannelHandlerContextImpl(/*Asynchronous*/ChannelGroup channelGroup2, /*Asynchronous*/Socket channel2, Executor exc) {
channelGroup = channelGroup2;
channel = channel2;
Expand Down Expand Up @@ -73,6 +74,9 @@ public String name() {
@Override
public void connect(SocketAddress remoteAddress) throws IOException {
channel.connect(remoteAddress);
is = channel.getInputStream();
os = channel.getOutputStream();

}

@Override
Expand All @@ -95,7 +99,6 @@ public void close() throws IOException {

@Override
public Object read() throws IOException {
InputStream is = channel.getInputStream();
ObjectInputStream ois = new ObjectInputStream(is);
try {
return ois.readObject();
Expand All @@ -106,7 +109,6 @@ public Object read() throws IOException {

@Override
public void write(Object msg) throws IOException {
OutputStream os = channel.getOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(os);
oos.writeObject(msg);
oos.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
/**
* Created in default subscriber to handle the incoming messages.
* Creates a MessageReceiver and a MessageDispatcher.
* The MessageDispatcher is spun up by the ExecutorService.
* @author jg
*/
public class IncomingMessageQueue<T> {
Expand All @@ -18,17 +19,17 @@ public class IncomingMessageQueue<T> {
* The maximum number of incoming messages that will be queued.
* <p>
* This limit applies to dispatching {@link LazyMessage}s as they arrive over
* the network. It is independent of {@link MessageDispatcher} queue
* capacities specified by
* the network. It is independent of {@link MessageDispatcher} queue capacities specified by
* {@link IncomingMessageQueue#addListener(MessageListener, int)} which are
* consumed by user provided {@link MessageListener}s.
* @author Groff (C) NeoCoreTechs 2017
*/
private static final int DEQUE_CAPACITY = 8192;
private static final int DEQUE_CAPACITY = 256;

private final MessageReceiver<T> messageReceiver;
private final MessageDispatcher<T> messageDispatcher;

public IncomingMessageQueue( ExecutorService executorService) {
public IncomingMessageQueue(ExecutorService executorService) {
CircularBlockingDeque<T> lazyMessages =
new CircularBlockingDeque<T>(DEQUE_CAPACITY);
messageReceiver = new MessageReceiver<T>(lazyMessages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
import java.util.concurrent.ExecutorService;

/**
* @author jg
*
* @param <T>
* the message type
* The IncomingMessageQueue creates this and spins it up via the ExecutorService.
* It shares the CircularBlockingDeque with the MessageReceiver.
* It services the MessageListeners with received messages from the queue.
* @param <T> the message type
* @author jg (C) NeoCoreTechs 2017
*/
public class MessageDispatcher<T> extends CancellableLoop {

Expand All @@ -34,8 +35,7 @@ public class MessageDispatcher<T> extends CancellableLoop {
private boolean latchMode;
private T latchedMessage;

public MessageDispatcher(CircularBlockingDeque<T> lazyMessages,
ExecutorService executorService) {
public MessageDispatcher(CircularBlockingDeque<T> lazyMessages, ExecutorService executorService) {
this.lazyMessages = lazyMessages;
messageListeners = new ListenerGroup<MessageListener<T>>(executorService);
mutex = new Object();
Expand All @@ -55,8 +55,7 @@ public void addListener(MessageListener<T> messageListener, int limit) {
log.info("Adding listener.");
}
synchronized (mutex) {
EventDispatcher<MessageListener<T>> eventDispatcher =
messageListeners.add(messageListener, limit);
EventDispatcher<MessageListener<T>> eventDispatcher = messageListeners.add(messageListener, limit);
if (latchMode && latchedMessage != null) {
eventDispatcher.signal(newSignalRunnable(latchedMessage));
}
Expand All @@ -67,8 +66,7 @@ public void addListener(MessageListener<T> messageListener, int limit) {
* Returns a newly allocated {@link SignalRunnable} for the specified
* {@link LazyMessage}.
*
* @param lazyMessage
* the {@link LazyMessage} to signal {@link MessageListener}s with
* @param lazyMessage the {@link LazyMessage} to signal {@link MessageListener}s with
* @return the newly allocated {@link SignalRunnable}
*/
private SignalRunnable<MessageListener<T>> newSignalRunnable(final T lazyMessage) {
Expand All @@ -81,9 +79,7 @@ public void run(MessageListener<T> messageListener) {
}

/**
* @param enabled
* {@code true} if latch mode should be enabled, {@code false}
* otherwise
* @param enabled {@code true} if latch mode should be enabled, {@code false} otherwise
*/
public void setLatchMode(boolean enabled) {
latchMode = enabled;
Expand Down
Loading

0 comments on commit 9ced957

Please sign in to comment.