From 024f7956b64f4d66eaf48bdc08787e1e38176f1f Mon Sep 17 00:00:00 2001 From: NeoCoreTechs - New Core Technologies Date: Mon, 21 Feb 2022 10:14:43 -0800 Subject: [PATCH] Refactor and Debug Fixed multiple publisher/subscriber bug --- .gitignore | 3 + .../org/ros/internal/node/DefaultNode.java | 2 +- .../ros/internal/node/server/SlaveServer.java | 8 +- .../internal/node/topic/DefaultPublisher.java | 20 +++- .../node/topic/DefaultSubscriber.java | 5 +- .../internal/node/topic/PublisherFactory.java | 5 +- .../transport/ChannelHandlerContext.java | 13 --- .../transport/ChannelHandlerContextImpl.java | 98 ++++++++----------- .../transport/queue/OutgoingMessageQueue.java | 11 +-- .../transport/tcp/AsynchBaseServer.java | 13 ++- .../transport/tcp/AsynchTCPServer.java | 22 ++--- .../transport/tcp/AsynchTCPWorker.java | 4 +- .../internal/transport/tcp/ChannelGroup.java | 14 --- .../transport/tcp/ChannelGroupImpl.java | 32 ------ .../ros/internal/transport/tcp/TcpClient.java | 13 +-- .../transport/tcp/TcpClientManager.java | 9 +- .../tcp/TcpClientPipelineFactory.java | 5 +- .../internal/transport/tcp/TcpRosServer.java | 26 ++--- .../tcp/TcpServerPipelineFactory.java | 4 +- .../MessageQueueIntegrationTest.java | 8 +- 20 files changed, 111 insertions(+), 204 deletions(-) create mode 100644 .gitignore delete mode 100644 src/main/java/org/ros/internal/transport/tcp/ChannelGroup.java delete mode 100644 src/main/java/org/ros/internal/transport/tcp/ChannelGroupImpl.java diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0275e3c --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ + +*.class +*.jar diff --git a/src/main/java/org/ros/internal/node/DefaultNode.java b/src/main/java/org/ros/internal/node/DefaultNode.java index 9422e70..ae01ad3 100644 --- a/src/main/java/org/ros/internal/node/DefaultNode.java +++ b/src/main/java/org/ros/internal/node/DefaultNode.java @@ -258,7 +258,7 @@ public Publisher newPublisher(GraphName topicName, String messageType) { TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription); Publisher publisher = null; try { - publisher = publisherFactory.newOrExisting(topicDeclaration, slaveServer.getSubscribers()); + publisher = publisherFactory.newOrExisting(topicDeclaration); } catch(IOException e) { throw new RosRuntimeException(e); } return publisher; } diff --git a/src/main/java/org/ros/internal/node/server/SlaveServer.java b/src/main/java/org/ros/internal/node/server/SlaveServer.java index 1aae47e..1502efd 100644 --- a/src/main/java/org/ros/internal/node/server/SlaveServer.java +++ b/src/main/java/org/ros/internal/node/server/SlaveServer.java @@ -79,13 +79,7 @@ public SlaveServer(GraphName nodeName, BindAddress tcpRosBindAddress, public AdvertiseAddress getTcpRosAdvertiseAddress() { return tcpRosServer.getAdvertiseAddress(); } - /** - * Return the ChannelHandlerContext array of subscribers - * @return - */ - public ArrayBlockingQueue getSubscribers() { - return tcpRosServer.getSubscribers(); - } + /** * Start the RPC server. This start() routine requires that the * {@link TcpRosServer} is initialized first so that the slave server returns diff --git a/src/main/java/org/ros/internal/node/topic/DefaultPublisher.java b/src/main/java/org/ros/internal/node/topic/DefaultPublisher.java index b165495..fc7112d 100644 --- a/src/main/java/org/ros/internal/node/topic/DefaultPublisher.java +++ b/src/main/java/org/ros/internal/node/topic/DefaultPublisher.java @@ -59,12 +59,12 @@ public class DefaultPublisher extends DefaultTopicParticipant implements Publ private final ArrayBlockingQueue subscribers; public DefaultPublisher(NodeIdentifier nodeIdentifier, TopicDeclaration topicDeclaration, - MessageFactory messageFactory, ScheduledExecutorService executorService, ArrayBlockingQueue arrayBlockingQueue) throws IOException { + MessageFactory messageFactory, ScheduledExecutorService executorService) throws IOException { super(topicDeclaration); this.nodeIdentifier = nodeIdentifier; this.messageFactory = messageFactory; - this.subscribers = arrayBlockingQueue; - outgoingMessageQueue = new OutgoingMessageQueue(executorService, arrayBlockingQueue); + this.subscribers = new ArrayBlockingQueue(1024); + outgoingMessageQueue = new OutgoingMessageQueue(executorService, subscribers); if(DEBUG) log.info("DefaultPublisher contructed with "+outgoingMessageQueue.getNumberOfChannels()+" channels."); listeners = new ListenerGroup>(executorService); @@ -203,7 +203,19 @@ public void addSubscriber(SubscriberIdentifier subscriberIdentifer, ChannelHandl //outgoingMessageQueue.addChannel(ctx); subscribers.add(ctx); if (DEBUG) { - log.info("Current number of subscribers:"+subscribers.size()); + StringBuilder sb = new StringBuilder(); + sb.append(subscribers.size()); + sb.append("Subscribers for publisher "); + sb.append(toDeclaration()); + sb.append(":\r\n"); + Object[] sa = subscribers.toArray(); + for(int i = 0; i < subscribers.size(); i++) { + sb.append(i); + sb.append(") "); + sb.append(sa[i]); + sb.append("\r\n"); + } + log.info(sb.toString()); } signalOnNewSubscriber(subscriberIdentifer); } diff --git a/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java b/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java index 77d6b00..f81ca37 100644 --- a/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java +++ b/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java @@ -157,10 +157,7 @@ public void addPublisher(PublisherIdentifier publisherIdentifier, InetSocketAddr log.info("Defaultsubscriber addPublisher topicParticipantManager DOES NOT CONTAIN "+publisherIdentifier+" at "+address); topicParticipantManager.addSubscriberConnection(this, publisherIdentifier); } - tcpClientManager.connect(toString(), address); - // TODO(damonkohler): knownPublishers is duplicate information that is - // already available to the TopicParticipantManager. - //knownPublishers.add(publisherIdentifier); + tcpClientManager.connect(toString(), address); signalOnNewPublisher(publisherIdentifier); } } diff --git a/src/main/java/org/ros/internal/node/topic/PublisherFactory.java b/src/main/java/org/ros/internal/node/topic/PublisherFactory.java index 894596a..553a3a3 100644 --- a/src/main/java/org/ros/internal/node/topic/PublisherFactory.java +++ b/src/main/java/org/ros/internal/node/topic/PublisherFactory.java @@ -41,20 +41,19 @@ public PublisherFactory(NodeIdentifier nodeIdentifier, * * @param the message type associated with the {@link Publisher} * @param topicDeclaration {@link TopicDeclaration} that is being published - * @param arrayBlockingQueue * @param messageSerializer the {@link MessageSerializer} used for published messages * @return a new or cached {@link Publisher} instance * @throws IOException */ @SuppressWarnings("unchecked") - public Publisher newOrExisting(TopicDeclaration topicDeclaration, ArrayBlockingQueue arrayBlockingQueue) throws IOException { + public Publisher newOrExisting(TopicDeclaration topicDeclaration) throws IOException { GraphName topicName = topicDeclaration.getName(); synchronized (mutex) { if (topicParticipantManager.hasPublisher(topicName)) { return (DefaultPublisher) topicParticipantManager.getPublisher(topicName); } else { DefaultPublisher publisher = - new DefaultPublisher(nodeIdentifier, topicDeclaration, messageFactory, executorService, arrayBlockingQueue); + new DefaultPublisher(nodeIdentifier, topicDeclaration, messageFactory, executorService); publisher.addListener(new DefaultPublisherListener() { @Override public void onNewSubscriber(Publisher publisher, SubscriberIdentifier subscriberIdentifier) { diff --git a/src/main/java/org/ros/internal/transport/ChannelHandlerContext.java b/src/main/java/org/ros/internal/transport/ChannelHandlerContext.java index ceab23a..9c8289a 100644 --- a/src/main/java/org/ros/internal/transport/ChannelHandlerContext.java +++ b/src/main/java/org/ros/internal/transport/ChannelHandlerContext.java @@ -10,7 +10,6 @@ import java.util.Set; import java.util.concurrent.Executor; -import org.ros.internal.transport.tcp.ChannelGroup; /** * This is the ChannelHandlerContext that links the underlying TCP Socket 'channel' to the {@link ChannelPipeline} and the {@link ChannelGroup} @@ -96,12 +95,6 @@ public interface ChannelHandlerContext { */ ChannelPipeline pipeline(); - /** - * Return the channel group - * - */ - /*Asynchronous*/ChannelGroup getChannelGroup(); - /** * Determine if this channel is ready for processing, it is configured, has a socket * and the communication is sound. If the socket breaks this goes false and no writes are @@ -114,12 +107,6 @@ public interface ChannelHandlerContext { * @param ready */ void setReady(boolean ready); - - /** - * Get the Object representing a mutex to use for completion of operation if necessary. - * @return - */ - Object getChannelCompletionMutex(); /** * Each successive handshake completion will add another message type to this synchronized set. diff --git a/src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java b/src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java index cb6d5d7..e8a2979 100644 --- a/src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java +++ b/src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java @@ -15,8 +15,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.ros.internal.transport.tcp.ChannelGroup; - /** * A handler context contains all the executor, the channel group, the channel, and the pipeline with the handlers. @@ -30,27 +28,24 @@ * The functions of the system move data through the pipeline, triggering the handlers in the sequence they were * added. * Traffic is filtered to subscriber channels via the hash set of requested message types - * @author jg + * @author Jonathan Groff Copyright (C) NeoCoreTechs 2017,2022 * */ public class ChannelHandlerContextImpl implements ChannelHandlerContext { private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(ChannelHandlerContextImpl.class); - /*Asynchronous*/ChannelGroup channelGroup; /*Asynchronous*/Socket/*Channel*/ channel; + private Executor executor; ChannelPipeline pipeline; boolean ready = false; - private Object mutex = new Object(); Set outboundMessageTypes; - InputStream is = null; - OutputStream os = null; - ObjectInputStream ois = null; - public ChannelHandlerContextImpl(/*Asynchronous*/ChannelGroup channelGroup2, /*Asynchronous*/Socket channel2) { - channelGroup = channelGroup2; - channel = channel2; - pipeline = new ChannelPipelineImpl(this); - outboundMessageTypes = (Set) new HashSet(); + + public ChannelHandlerContextImpl(Executor executor, /*Asynchronous*/Socket channel) { + this.executor = executor; + this.channel = channel; + this.pipeline = new ChannelPipelineImpl(this); + this.outboundMessageTypes = (Set) new HashSet(); } public void setChannel(/*Asynchronous*/Socket/*Channel*/ sock) { @@ -59,7 +54,7 @@ public void setChannel(/*Asynchronous*/Socket/*Channel*/ sock) { @Override public Executor executor() { - return channelGroup.getExecutorService(); + return executor; } @Override @@ -115,52 +110,51 @@ public void close() throws IOException { @Override public Object read() throws IOException { - is = channel.getInputStream(); - //ObjectInputStream ois = new ObjectInputStream(is); - ois = new ObjectInputStream(is); - try { - return ois.readObject(); - } catch (ClassNotFoundException e) { - throw new IOException(e); - } catch(StreamCorruptedException sce) { - is = channel.getInputStream(); - ois = new ObjectInputStream(is); + InputStream is = channel.getInputStream(); + ObjectInputStream ois = new ObjectInputStream(is); try { return ois.readObject(); - } catch (ClassNotFoundException cnf) { - throw new IOException(cnf); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } catch(StreamCorruptedException sce) { + is = channel.getInputStream(); + ois = new ObjectInputStream(is); + try { + return ois.readObject(); + } catch (ClassNotFoundException cnf) { + throw new IOException(cnf); + } } - } } @Override public void write(Object msg) throws IOException { - os = channel.getOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(os); - oos.writeObject(msg); - oos.flush(); + OutputStream os = channel.getOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(os); + oos.writeObject(msg); + oos.flush(); } @Override public void write(Object msg, CompletionHandler handler) { - try { - write(msg); - handler.completed(0, null); - } catch (IOException e) { - handler.failed(e, null); - } + try { + write(msg); + handler.completed(0, null); + } catch (IOException e) { + handler.failed(e, null); + } } @Override public Object read(CompletionHandler handler) { - try { - Object o = read(); - handler.completed(0, null); - return o; - } catch (IOException e) { - handler.failed(e, null); - } - return null; + try { + Object o = read(); + handler.completed(0, null); + return o; + } catch (IOException e) { + handler.failed(e, null); + } + return null; } @@ -169,11 +163,6 @@ public ChannelPipeline pipeline() { return pipeline; } - @Override - public /*Asynchronous*/ChannelGroup getChannelGroup() { - return channelGroup; - } - @Override public Socket channel() { return channel; @@ -181,7 +170,7 @@ public Socket channel() { @Override public String toString() { - return new String("ChannelHandlerContext:"+channel+" ChannelGroup:"+channelGroup+" ChannelPipeline:"+pipeline+" ready:"+ready); + return new String("ChannelHandlerContext:"+channel+" ChannelPipeline:"+pipeline+" ready:"+ready); } @Override @@ -194,13 +183,6 @@ public boolean isReady() { */ @Override public void setReady(boolean ready) { this.ready = ready;} - - /** - * Object to synchronize read and write completion for the channel in this context, since we will have - * multiple outbound writers accessing the same channel - */ - @Override - public Object getChannelCompletionMutex() { return mutex; } /** * Get the type of messages we want to send to the attached subscriber, based on the handshakes diff --git a/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java b/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java index 6216e86..a16b74b 100644 --- a/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java +++ b/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java @@ -3,12 +3,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.Channel; -import java.nio.channels.CompletionHandler; -import java.nio.channels.WritePendingException; + import java.util.Iterator; -import java.util.List; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import org.apache.commons.logging.Log; @@ -30,7 +27,7 @@ */ public class OutgoingMessageQueue { - private static final boolean DEBUG = false; + private static final boolean DEBUG = true; private static final Log log = LogFactory.getLog(OutgoingMessageQueue.class); private static final int DEQUE_CAPACITY = 16; @@ -119,14 +116,14 @@ public void failed(Throwable arg0, Void arg1) { } // loop method } - public OutgoingMessageQueue(ExecutorService executorService, ArrayBlockingQueue arrayBlockingQueue) throws IOException { + public OutgoingMessageQueue(ExecutorService executorService, ArrayBlockingQueue subscriberQueue) throws IOException { deque = new CircularBlockingDeque(DEQUE_CAPACITY); writer = new Writer(); //messageBufferPool = new MessageBufferPool(); latchedBuffer = MessageBuffers.dynamicBuffer(); mutex = new Object(); latchMode = false; - channels = arrayBlockingQueue; + channels = subscriberQueue; executorService.execute(writer); } diff --git a/src/main/java/org/ros/internal/transport/tcp/AsynchBaseServer.java b/src/main/java/org/ros/internal/transport/tcp/AsynchBaseServer.java index ee532af..6a08005 100644 --- a/src/main/java/org/ros/internal/transport/tcp/AsynchBaseServer.java +++ b/src/main/java/org/ros/internal/transport/tcp/AsynchBaseServer.java @@ -3,6 +3,8 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -13,7 +15,7 @@ * Functionally, this class Extends AsynchTCPServer, takes connections and spins the worker thread to handle each * incoming connection.

* The critical ChannelhandlerContext is created here after the server socket accepts a connection.

- * After the ChannelHandlerContext is creates, an AsynchTcpWorker is constructed using that context.

+ * After the ChannelHandlerContext is created, an AsynchTcpWorker is constructed using that context.

* @author Jonathan Groff Copyright (C) NeoCoreTechs 2016,2021 */ public final class AsynchBaseServer extends AsynchTCPServer { @@ -23,18 +25,19 @@ public final class AsynchBaseServer extends AsynchTCPServer { public InetSocketAddress address = null; private TcpRosServer tcpserver = null; AsynchTCPWorker uworker = null; + ExecutorService executor; public AsynchBaseServer(TcpRosServer tcpserver) throws IOException { super(); this.address = tcpserver.getAddress(); this.tcpserver = tcpserver; - this.channelGroup = tcpserver.getChannelGroup(); + this.executor = tcpserver.getExecutor(); } public void startServer() throws IOException { if( address == null ) throw new IOException("Server address not defined, can not start Base Server"); - startServer(channelGroup, address); + startServer(executor, address); } public void run() { @@ -50,7 +53,7 @@ public void run() { channel.setSendBufferSize(4096000); channel.setReceiveBufferSize(4096000); //channel.setTcpNoDelay(true); - ChannelHandlerContext ctx = new ChannelHandlerContextImpl(channelGroup, channel/*.get()*/); + ChannelHandlerContext ctx = new ChannelHandlerContextImpl(executor, channel/*.get()*/); //if(DEBUG) // log.info("Adding new ChannelHandlerContext to subscribers array, subscribers="+tcpserver.getSubscribers().size()+" "+ctx); //tcpserver.getSubscribers().add(ctx); @@ -66,7 +69,7 @@ public void run() { // After it gets it the thread terminates and a new handler is inserted to generate outbound traffic uworker = new AsynchTCPWorker(ctx); // and send it all to executor for running - channelGroup.getExecutorService().execute(uworker); + ctx.executor().execute(uworker); ctx.pipeline().fireChannelActive(); if( DEBUG ) { log.info("AsynchBaseServer worker starting for context:"+ctx); diff --git a/src/main/java/org/ros/internal/transport/tcp/AsynchTCPServer.java b/src/main/java/org/ros/internal/transport/tcp/AsynchTCPServer.java index 601fd84..ed74f9e 100644 --- a/src/main/java/org/ros/internal/transport/tcp/AsynchTCPServer.java +++ b/src/main/java/org/ros/internal/transport/tcp/AsynchTCPServer.java @@ -2,13 +2,11 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; -import java.net.Socket; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; //import java.nio.channels.AsynchronousChannelGroup; //import java.nio.channels.AsynchronousServerSocketChannel; //import java.nio.channels.AsynchronousSocketChannel; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -16,6 +14,7 @@ /** * AsynchTCPServer is the superclass of all objects using AsynchServerSockets.

* Extended by AsynchBaseServer which takes a TcpRosServer and does a ServerSocket.accept

+* The executor service is shut down here. * @author Jonathan Groff Copyright (C) NeoCoreTechs 2015,2021 */ public abstract class AsynchTCPServer implements Cloneable, Runnable { @@ -25,36 +24,36 @@ public abstract class AsynchTCPServer implements Cloneable, Runnable { //AsynchronousChannelGroup channelGroup; //AsynchronousSocketChannel data = null; ServerSocket server = null; - ChannelGroup channelGroup; + ExecutorService executor; volatile boolean shouldStop = false; /** * Construct a ServerSocket bound to the specified port using the supplied channel group.

* The primary purpose of the channel group is to provide the executor to start the server. - * @param group + * @param executor The executor for task running * @param port * @throws IOException */ - public synchronized void startServer(ChannelGroup group, int port) throws IOException { + public synchronized void startServer(Executor executor, int port) throws IOException { if( server == null ) { if( DEBUG ) log.info("AsynchTCPServer attempt local bind port "+port); - channelGroup = group; + this.executor = (ExecutorService) executor; //server = AsynchronousServerSocketChannel.open(channelGroup); server = new ServerSocket();//channelGroup); server.bind(new InetSocketAddress(port)); - group.getExecutorService().execute(this); + executor.execute(this); } } //public synchronized void startServer(AsynchronousChannelGroup group, InetSocketAddress binder) throws IOException { - public synchronized void startServer(ChannelGroup group, InetSocketAddress binder) throws IOException { + public synchronized void startServer(Executor executor, InetSocketAddress binder) throws IOException { if( server == null ) { if( DEBUG ) log.info("AsynchTCPServer attempt bind "+binder); - channelGroup = group; + this.executor = (ExecutorService) executor; server = new ServerSocket(); server.bind(binder); - group.getExecutorService().execute(this); + executor.execute(this); } } @@ -63,6 +62,7 @@ public synchronized void shutdown() throws IOException { shouldStop = true; server.close(); server = null; + ((ExecutorService)executor).shutdown(); } } diff --git a/src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java b/src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java index e2afad9..9780966 100644 --- a/src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java +++ b/src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java @@ -14,7 +14,7 @@ * */ public class AsynchTCPWorker implements Runnable { - private static final boolean DEBUG = false; + private static final boolean DEBUG = true; private static final Log log = LogFactory.getLog(AsynchTCPWorker.class); public volatile boolean shouldRun = true; private ChannelHandlerContext ctx; @@ -40,7 +40,7 @@ public void run() { ctx.pipeline().fireChannelRead(reso); ctx.pipeline().fireChannelReadComplete(); } catch(StreamCorruptedException sce) { - log.info("Stream was corrupted on read:"+sce); + log.info("Thread "+Thread.currentThread().getId()+" context:"+ctx+" stream was corrupted on read:"+sce); } } // shouldRun } catch(Exception se) { diff --git a/src/main/java/org/ros/internal/transport/tcp/ChannelGroup.java b/src/main/java/org/ros/internal/transport/tcp/ChannelGroup.java deleted file mode 100644 index f98e136..0000000 --- a/src/main/java/org/ros/internal/transport/tcp/ChannelGroup.java +++ /dev/null @@ -1,14 +0,0 @@ -package org.ros.internal.transport.tcp; - -import java.util.concurrent.ExecutorService; -/** - * The ChannelGroup interface specifies an executor service and a shutdown method.

- * In essence, a way to start a server and stop it, and in doing so, - * a group of channels is thereby manifest. - * @author Jonathan Groff Copyright (C) NeoCoreTechs 2015,2021 - * - */ -public interface ChannelGroup { - public void shutdown(); - public ExecutorService getExecutorService(); -} diff --git a/src/main/java/org/ros/internal/transport/tcp/ChannelGroupImpl.java b/src/main/java/org/ros/internal/transport/tcp/ChannelGroupImpl.java deleted file mode 100644 index fad583f..0000000 --- a/src/main/java/org/ros/internal/transport/tcp/ChannelGroupImpl.java +++ /dev/null @@ -1,32 +0,0 @@ -package org.ros.internal.transport.tcp; - -import java.util.concurrent.ExecutorService; -/** - * Manipulate groups of channels al la AsynchronousChannelGroup. - * Primarily contains the ExecutorService by which threads are put to work. - * @author Jonathan Groff Copyright (C) NeoCoreTechs 2015,2021 - * - */ -public class ChannelGroupImpl implements ChannelGroup { - private ExecutorService executorService; - public ChannelGroupImpl(ExecutorService executorService) { - this.executorService = executorService; - } - public void shutdown() { - executorService.shutdown(); - } - public ExecutorService getExecutorService() { return executorService; } - - @Override - public boolean equals(Object o) { - return (executorService == ((ChannelGroup)o).getExecutorService()); - } - @Override - public int hashCode() { - return executorService.hashCode(); - } - @Override - public String toString() { - return "ChannelGroup executor "+executorService; - } -} diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpClient.java b/src/main/java/org/ros/internal/transport/tcp/TcpClient.java index 1349275..cb7432c 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpClient.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpClient.java @@ -15,6 +15,7 @@ import java.nio.channels.SocketChannel; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; /** @@ -37,11 +38,11 @@ public class TcpClient { private final List namedChannelHandlers; private Socket channel; - private /*Asynchronous*/ChannelGroup channelGroup; + private ExecutorService executor; private ChannelInitializerFactoryStack factoryStack; // Stack of ChannelInitializer factories to load ChannelHandlers - public TcpClient( /*Asynchronous*/ChannelGroup channelGroup, List namedChannelHandlers) { - this.channelGroup = channelGroup; + public TcpClient( ExecutorService executor, List namedChannelHandlers) { + this.executor = executor; this.namedChannelHandlers = namedChannelHandlers; this.factoryStack = new ChannelInitializerFactoryStack(); } @@ -84,11 +85,11 @@ public Socket connect(String connectionName, SocketAddress socketAddress) throws //((/*Asynchronous*/SocketChannel)channel).setOption(StandardSocketOptions.SO_RCVBUF, 4096000); //((/*Asynchronous*/SocketChannel)channel).setOption(StandardSocketOptions.SO_SNDBUF, 4096000); //((/*Asynchronous*/SocketChannel)channel).setOption(StandardSocketOptions.TCP_NODELAY, false); - ctx = new ChannelHandlerContextImpl(channelGroup, channel); + ctx = new ChannelHandlerContextImpl(executor, channel); // connect outbound to pub ctx.connect(socketAddress); // - TcpClientPipelineFactory tcpClientPipelineFactory = new TcpClientPipelineFactory(channelGroup, namedChannelHandlers); + TcpClientPipelineFactory tcpClientPipelineFactory = new TcpClientPipelineFactory(namedChannelHandlers); // add handler pipeline factory to stack factoryStack.addLast(tcpClientPipelineFactory); // load the handlers from the pipeline factories @@ -99,7 +100,7 @@ public Socket connect(String connectionName, SocketAddress socketAddress) throws // connect outbound to pub //ctx.connect(socketAddress); AsynchTCPWorker uworker = new AsynchTCPWorker(ctx); - channelGroup.getExecutorService().execute(uworker); + executor.execute(uworker); // notify pipeline we connected (or failed via exceptionCaught and runtime exception) ctx.pipeline().fireChannelActive(); // recall we keep the list of contexts in TcpClientManager diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java b/src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java index bc1f6b1..e200951 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java @@ -23,9 +23,9 @@ public class TcpClientManager { public static boolean DEBUG = false; private static final Log log = LogFactory.getLog(TcpClientManager.class); - private final /*Asynchronous*/ChannelGroup channelGroup; private final Collection tcpClients; private final List namedChannelHandlers; + private final ExecutorService executor; //private static ConcurrentHashMap executors = new ConcurrentHashMap(1024); @@ -41,11 +41,9 @@ public class TcpClientManager { //} public TcpClientManager(ExecutorService executor) { - this.channelGroup = new ChannelGroupImpl(executor);/*AsynchronousChannelGroup.withThreadPool(executor);*/ + this.executor = executor; this.tcpClients = new ArrayList(); this.namedChannelHandlers = new ArrayList(); - if( DEBUG ) - log.info("TcpClientManager:"+executor+" "+channelGroup); } public void addNamedChannelHandler(NamedChannelHandler namedChannelHandler) { @@ -71,7 +69,7 @@ public void addAllNamedChannelHandlers(List namedChannelHan public TcpClient connect(String connectionName, SocketAddress socketAddress) throws Exception { if( DEBUG ) log.info("TcpClient connect:"+connectionName+" "+socketAddress); - TcpClient tcpClient = new TcpClient(channelGroup, namedChannelHandlers); + TcpClient tcpClient = new TcpClient(executor, namedChannelHandlers); tcpClient.connect(connectionName, socketAddress); tcpClients.add(tcpClient); return tcpClient; @@ -84,7 +82,6 @@ public TcpClient connect(String connectionName, SocketAddress socketAddress) thr public void shutdown() { if( DEBUG ) log.info("TcpClient shutdown:"); - channelGroup.shutdown(); tcpClients.clear(); // We don't call channelFactory.releaseExternalResources() or // bootstrap.releaseExternalResources() since the only external resource is diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpClientPipelineFactory.java b/src/main/java/org/ros/internal/transport/tcp/TcpClientPipelineFactory.java index 76f9f0e..201bd03 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpClientPipelineFactory.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpClientPipelineFactory.java @@ -19,13 +19,12 @@ public class TcpClientPipelineFactory extends ChannelInitializer { private List namedChannelHandlers; /** * ChannelGroup has shutdown method and access to ExecutorService - * @param asynchronousChannelGroup * @param namedChannelHandlers */ - public TcpClientPipelineFactory(/*Asynchronous*/ChannelGroup asynchronousChannelGroup, List namedChannelHandlers) { + public TcpClientPipelineFactory(List namedChannelHandlers) { this.namedChannelHandlers = namedChannelHandlers; if( DEBUG ) - log.info("TcpClientPipelineFactory:"+asynchronousChannelGroup+" constructing with "+namedChannelHandlers.size()+" NamedChannelHandler(s)."); + log.info("TcpClientPipelineFactory: constructing with "+namedChannelHandlers.size()+" NamedChannelHandler(s)."); } @Override diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpRosServer.java b/src/main/java/org/ros/internal/transport/tcp/TcpRosServer.java index 298d4ff..0898cbf 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpRosServer.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpRosServer.java @@ -50,7 +50,7 @@ */ public class TcpRosServer implements Serializable { private static final long serialVersionUID = 1298495789043968855L; - private static final boolean DEBUG = false; + private static final boolean DEBUG = true; private static final Log log = LogFactory.getLog(TcpRosServer.class); private BindAddress bindAddress; @@ -61,8 +61,7 @@ public class TcpRosServer implements Serializable { //private transient AsynchronousChannelGroup outgoingChannelGroup; // publisher with connected subscribers //private transient AsynchronousChannelGroup incomingChannelGroup; // subscriber connected to publishers - private transient ChannelGroup outgoingChannelGroup; // publisher with connected subscribers - private transient ChannelGroup incomingChannelGroup; // subscriber connected to publishers + private transient TcpServerPipelineFactory serverPipelineFactory; private transient ChannelInitializerFactoryStack factoryStack; // Stack of ChannelInitializer factories to load ChannelHandlers private transient ArrayBlockingQueue contexts; @@ -85,14 +84,12 @@ public TcpRosServer(BindAddress bindAddress, AdvertiseAddress advertiseAddress, public void start() { //assert(outgoingChannel == null); try { - incomingChannelGroup = new ChannelGroupImpl(executorService);//AsynchronousChannelGroup.withThreadPool(executorService); advertiseAddress.setPort(bindAddress.toInetSocketAddress().getPort()); factoryStack = new ChannelInitializerFactoryStack(); - serverPipelineFactory = - new TcpServerPipelineFactory(incomingChannelGroup, topicParticipantManager, serviceManager); + serverPipelineFactory = new TcpServerPipelineFactory(topicParticipantManager, serviceManager); factoryStack.addLast(serverPipelineFactory); server = new AsynchBaseServer(this); - server.startServer(incomingChannelGroup, bindAddress.toInetSocketAddress()); + server.startServer(executorService, bindAddress.toInetSocketAddress()); if (DEBUG) { log.info("TcpRosServer starting and Bound to:" + bindAddress + " with advertise address:"+advertiseAddress); } @@ -106,11 +103,10 @@ public void start() { /** * Close all incoming connections and the server socket.

- * Calls shutdown on the AsynchBaseServer. The incoming and outgoing channel group executors - * are shut down first.

+ * Calls shutdown on the AsynchBaseServer. executors are shut down first.

* The only external resources are the ExecutorService and control of that must remain with the overall * application. Although the executor services should shut down threads in the servers executed by - * the executors in the channel groups, we do explicit shutdowns of the various servers as well. + * the executors, we do explicit shutdowns of the various servers as well. *

* Calling this method more than once has no effect. * @throws IOException @@ -119,14 +115,6 @@ public void shutdown() throws IOException { if (DEBUG) { log.info("TcpRosServer Shutting down address: " + getAddress()); } - if (outgoingChannelGroup != null) { - outgoingChannelGroup.shutdown(); - outgoingChannelGroup = null; - } - if( incomingChannelGroup != null) { - incomingChannelGroup.shutdown(); - incomingChannelGroup = null; - } server.shutdown(); @@ -160,8 +148,6 @@ public ArrayBlockingQueue getSubscribers() { public ChannelInitializerFactoryStack getFactoryStack() { return factoryStack; } public ExecutorService getExecutor() { return executorService; } - - public /*Asynchronous*/ChannelGroup getChannelGroup() { return incomingChannelGroup; } } diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpServerPipelineFactory.java b/src/main/java/org/ros/internal/transport/tcp/TcpServerPipelineFactory.java index bacf185..53681d4 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpServerPipelineFactory.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpServerPipelineFactory.java @@ -1,6 +1,7 @@ package org.ros.internal.transport.tcp; import java.util.Collection; +import java.util.concurrent.ScheduledExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -28,8 +29,7 @@ public class TcpServerPipelineFactory extends ChannelInitializer { private final TopicParticipantManager topicParticipantManager; private final ServiceManager serviceManager; - public TcpServerPipelineFactory(ChannelGroup incomingChannelGroup, - TopicParticipantManager topicParticipantManager, ServiceManager serviceManager) { + public TcpServerPipelineFactory(TopicParticipantManager topicParticipantManager, ServiceManager serviceManager) { //if( DEBUG ) // log.info("TcpServerPipeLineFactory ctor:"+incomingChannelGroup+" "+topicParticipantManager+" "+serviceManager); this.topicParticipantManager = topicParticipantManager; diff --git a/src/test/java/org/ros/internal/transport/MessageQueueIntegrationTest.java b/src/test/java/org/ros/internal/transport/MessageQueueIntegrationTest.java index 82647b4..7295cae 100644 --- a/src/test/java/org/ros/internal/transport/MessageQueueIntegrationTest.java +++ b/src/test/java/org/ros/internal/transport/MessageQueueIntegrationTest.java @@ -28,8 +28,6 @@ import org.ros.internal.node.topic.TopicParticipantManager; import org.ros.internal.transport.queue.IncomingMessageQueue; import org.ros.internal.transport.queue.OutgoingMessageQueue; -import org.ros.internal.transport.tcp.ChannelGroup; -import org.ros.internal.transport.tcp.ChannelGroupImpl; import org.ros.internal.transport.tcp.ChannelInitializerFactoryStack; import org.ros.internal.transport.tcp.NamedChannelHandler; import org.ros.internal.transport.tcp.TcpClient; @@ -182,12 +180,10 @@ private ChannelHandlerContext buildServerChannel() { isock = new InetSocketAddress(0); TopicParticipantManager topicParticipantManager = new TopicParticipantManager(); ServiceManager serviceManager = new ServiceManager(); - /*Asynchronous*/ChannelGroup incomingChannelGroup = null; - incomingChannelGroup = new ChannelGroupImpl(executorService);/* AsynchronousChannelGroup.withThreadPool(executorService);*/ ChannelInitializerFactoryStack factoryStack = new ChannelInitializerFactoryStack(); TcpServerPipelineFactory serverPipelineFactory = - new TcpServerPipelineFactory(incomingChannelGroup, topicParticipantManager, serviceManager) { + new TcpServerPipelineFactory(topicParticipantManager, serviceManager) { @Override protected void initChannel(ChannelHandlerContext ch) { ch.pipeline().remove(TcpServerPipelineFactory.HANDSHAKE_HANDLER); @@ -220,7 +216,7 @@ protected void initChannel(ChannelHandlerContext ch) { log.debug("Accept "+channel); } ChannelHandlerContextImpl ctx = null; - ctx = new ChannelHandlerContextImpl(incomingChannelGroup,channel/*.get()*/); + ctx = new ChannelHandlerContextImpl(executorService, channel/*.get()*/); return ctx; }