From d5e67bc602408651616d7a569a151525fa26023f Mon Sep 17 00:00:00 2001 From: NeoCoreTechs - New Core Technologies Date: Thu, 2 Jun 2016 19:50:31 -0700 Subject: [PATCH] Minimal debug, cleaned up handshake --- .../org/ros/internal/node/DefaultNode.java | 4 +- .../internal/node/client/ParameterClient.java | 31 +++------------ .../ros/internal/node/client/Registrar.java | 23 ++--------- .../node/rpc/SlaveRpcEndpointImpl.java | 26 +++---------- .../ros/internal/node/server/BaseServer.java | 2 +- .../internal/node/server/ParameterServer.java | 2 +- .../ros/internal/node/server/RpcServer.java | 2 +- .../ros/internal/node/server/TCPServer.java | 2 +- .../ros/internal/node/server/TCPWorker.java | 2 +- .../node/server/ThreadPoolManager.java | 2 +- .../master/MasterRegistrationManagerImpl.java | 2 +- .../node/server/master/MasterServer.java | 21 +--------- .../node/service/DefaultServiceClient.java | 5 +-- .../internal/node/topic/DefaultPublisher.java | 2 +- .../node/topic/DefaultSubscriber.java | 2 +- .../node/topic/SubscriberHandshake.java | 2 +- .../topic/SubscriberHandshakeHandler.java | 27 ++++++++++--- .../java/org/ros/internal/system/Utility.java | 2 +- .../transport/BaseClientHandshakeHandler.java | 4 +- .../transport/ChannelHandlerContextImpl.java | 9 +---- .../internal/transport/ConnectionHeader.java | 2 +- .../transport/ConnectionTrackingHandler.java | 2 +- .../transport/queue/MessageDispatcher.java | 2 +- .../transport/queue/MessageReceiver.java | 2 +- .../transport/queue/OutgoingMessageQueue.java | 9 +---- .../tcp/AbstractNamedChannelHandler.java | 8 ++-- .../transport/tcp/AsynchBaseServer.java | 8 +--- .../transport/tcp/AsynchTCPServer.java | 2 +- .../transport/tcp/AsynchTCPWorker.java | 12 ++---- .../transport/tcp/AsynchTempTCPWorker.java | 32 +++++----------- .../tcp/ChannelInitializerFactoryStack.java | 2 +- ...nectionTrackingChannelPipelineFactory.java | 2 +- .../ros/internal/transport/tcp/TcpClient.java | 12 +++--- .../transport/tcp/TcpClientManager.java | 38 +++++++++++++------ .../tcp/TcpClientPipelineFactory.java | 4 +- .../internal/transport/tcp/TcpRosServer.java | 2 +- .../tcp/TcpServerHandshakeHandler.java | 32 +++++++++++----- .../tcp/TcpServerPipelineFactory.java | 2 +- .../org/ros/node/DefaultNodeMainExecutor.java | 23 ++--------- .../MessageQueueIntegrationTest.java | 4 +- 40 files changed, 147 insertions(+), 225 deletions(-) diff --git a/src/main/java/org/ros/internal/node/DefaultNode.java b/src/main/java/org/ros/internal/node/DefaultNode.java index be9ec4d..a58d305 100644 --- a/src/main/java/org/ros/internal/node/DefaultNode.java +++ b/src/main/java/org/ros/internal/node/DefaultNode.java @@ -27,7 +27,7 @@ import org.ros.internal.node.topic.SubscriberFactory; import org.ros.internal.node.topic.TopicDeclaration; import org.ros.internal.node.topic.TopicParticipantManager; -import org.ros.internal.transport.tcp.TcpRosServer; + import org.ros.message.MessageFactory; import org.ros.message.Time; import org.ros.namespace.GraphName; @@ -62,7 +62,7 @@ * */ public class DefaultNode implements ConnectedNode { - private static final boolean DEBUG = true; + private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(DefaultNode.class); /** * The maximum delay before shutdown will begin even if all diff --git a/src/main/java/org/ros/internal/node/client/ParameterClient.java b/src/main/java/org/ros/internal/node/client/ParameterClient.java index f67f873..fc75a62 100644 --- a/src/main/java/org/ros/internal/node/client/ParameterClient.java +++ b/src/main/java/org/ros/internal/node/client/ParameterClient.java @@ -1,19 +1,3 @@ -/* - * Copyright (C) 2011 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - package org.ros.internal.node.client; @@ -24,35 +8,30 @@ import org.ros.internal.node.response.IntegerResultFactory; import org.ros.internal.node.response.ObjectResultFactory; import org.ros.internal.node.response.Response; -import org.ros.internal.node.response.ResultFactory; -import org.ros.internal.node.response.StringListResultFactory; + import org.ros.internal.node.response.StringResultFactory; import org.ros.internal.node.response.VoidResultFactory; import org.ros.internal.node.rpc.MasterRpcEndpointImpl; import org.ros.internal.node.rpc.ParameterServerRpcEndpoint; import org.ros.internal.node.server.NodeIdentifier; import org.ros.internal.node.server.ParameterServer; -import org.ros.internal.transport.tcp.TcpRosServer; + import org.ros.namespace.GraphName; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.URI; import java.net.UnknownHostException; -import java.util.ArrayList; import java.util.List; -import java.util.Map; + /** * Provide access to the RPC API for a ROS {@link ParameterServer}. + * The ParameterServer typically starts one port higher than the master. * - * @author kwc@willowgarage.com (Ken Conley) - * @author damonkohler@google.com (Damon Kohler) * @author jg */ public class ParameterClient extends Client { - private static final boolean DEBUG = true; + private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(ParameterClient.class); private final NodeIdentifier nodeIdentifier; private final String nodeName; diff --git a/src/main/java/org/ros/internal/node/client/Registrar.java b/src/main/java/org/ros/internal/node/client/Registrar.java index 3969394..ddddacc 100644 --- a/src/main/java/org/ros/internal/node/client/Registrar.java +++ b/src/main/java/org/ros/internal/node/client/Registrar.java @@ -1,19 +1,3 @@ -/* - * Copyright (C) 2011 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - package org.ros.internal.node.client; import org.apache.commons.logging.Log; @@ -34,7 +18,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.URI; + import java.util.Collection; import java.util.List; import java.util.concurrent.Callable; @@ -45,12 +29,11 @@ * Manages topic, and service registrations of a {@link SlaveServer} with the * {@link MasterServer}. * - * @author kwc@willowgarage.com (Ken Conley) - * @author damonkohler@google.com (Damon Kohler) + * @author jg */ public class Registrar implements TopicParticipantManagerListener, ServiceManagerListener { - private static final boolean DEBUG = true; + private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(Registrar.class); private static final int SHUTDOWN_TIMEOUT = 5; diff --git a/src/main/java/org/ros/internal/node/rpc/SlaveRpcEndpointImpl.java b/src/main/java/org/ros/internal/node/rpc/SlaveRpcEndpointImpl.java index e5e23e7..fcf2a31 100644 --- a/src/main/java/org/ros/internal/node/rpc/SlaveRpcEndpointImpl.java +++ b/src/main/java/org/ros/internal/node/rpc/SlaveRpcEndpointImpl.java @@ -1,19 +1,3 @@ -/* - * Copyright (C) 2011 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - package org.ros.internal.node.rpc; @@ -35,16 +19,18 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; -import java.util.Map; + import java.util.Set; -import java.util.Vector; + /** - * @author damonkohler@google.com (Damon Kohler) + * Facility for contacting the remote master and issuing commands via the remote invokable methods. + * The transport is accomplished via the RemoteRequestinterface implementors. + * @author jg */ public class SlaveRpcEndpointImpl implements SlaveRpcEndpoint { - private static final boolean DEBUG = true; + private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(SlaveRpcEndpointImpl.class); //private final SlaveServer slave; diff --git a/src/main/java/org/ros/internal/node/server/BaseServer.java b/src/main/java/org/ros/internal/node/server/BaseServer.java index 56f95d0..0d9847f 100644 --- a/src/main/java/org/ros/internal/node/server/BaseServer.java +++ b/src/main/java/org/ros/internal/node/server/BaseServer.java @@ -13,7 +13,7 @@ * */ public final class BaseServer extends TCPServer { - private static boolean DEBUG = true; + private static boolean DEBUG = false; private static final Log log = LogFactory.getLog(BaseServer.class); public int WORKBOOTPORT = 8090; public InetAddress address = null; diff --git a/src/main/java/org/ros/internal/node/server/ParameterServer.java b/src/main/java/org/ros/internal/node/server/ParameterServer.java index aef24e8..4b7a34f 100644 --- a/src/main/java/org/ros/internal/node/server/ParameterServer.java +++ b/src/main/java/org/ros/internal/node/server/ParameterServer.java @@ -26,7 +26,7 @@ * @author Groff */ public class ParameterServer extends RpcServer { - private static final boolean DEBUG = true; + private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(ParameterServer.class); private final Map tree; diff --git a/src/main/java/org/ros/internal/node/server/RpcServer.java b/src/main/java/org/ros/internal/node/server/RpcServer.java index 5330aab..89304bc 100644 --- a/src/main/java/org/ros/internal/node/server/RpcServer.java +++ b/src/main/java/org/ros/internal/node/server/RpcServer.java @@ -22,7 +22,7 @@ */ public abstract class RpcServer { - private static final boolean DEBUG = true; + private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(RpcServer.class); private final AdvertiseAddress advertiseAddress; diff --git a/src/main/java/org/ros/internal/node/server/TCPServer.java b/src/main/java/org/ros/internal/node/server/TCPServer.java index ebb46ce..1e0ec6b 100644 --- a/src/main/java/org/ros/internal/node/server/TCPServer.java +++ b/src/main/java/org/ros/internal/node/server/TCPServer.java @@ -9,7 +9,7 @@ * TCPServer is the superclass of all objects using ServerSockets. */ public abstract class TCPServer implements Cloneable, Runnable { - private static boolean DEBUG = true; + private static boolean DEBUG = false; private static final Log log = LogFactory.getLog(TCPServer.class); ServerSocket server = null; Socket data = null; diff --git a/src/main/java/org/ros/internal/node/server/TCPWorker.java b/src/main/java/org/ros/internal/node/server/TCPWorker.java index da44acb..182b9dc 100644 --- a/src/main/java/org/ros/internal/node/server/TCPWorker.java +++ b/src/main/java/org/ros/internal/node/server/TCPWorker.java @@ -17,7 +17,7 @@ * */ public class TCPWorker implements Runnable { - private static final boolean DEBUG = true; + private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(TCPWorker.class); public boolean shouldRun = true; private Socket dataSocket; diff --git a/src/main/java/org/ros/internal/node/server/ThreadPoolManager.java b/src/main/java/org/ros/internal/node/server/ThreadPoolManager.java index ffd308d..089d3a8 100644 --- a/src/main/java/org/ros/internal/node/server/ThreadPoolManager.java +++ b/src/main/java/org/ros/internal/node/server/ThreadPoolManager.java @@ -26,7 +26,7 @@ * */ public class ThreadPoolManager { - private static final boolean DEBUG = true; + private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(ThreadPoolManager.class); private static String DEFAULT_THREAD_POOL = "RPCSERVER"; private int threadNum = 0; diff --git a/src/main/java/org/ros/internal/node/server/master/MasterRegistrationManagerImpl.java b/src/main/java/org/ros/internal/node/server/master/MasterRegistrationManagerImpl.java index 072276a..9bade9a 100644 --- a/src/main/java/org/ros/internal/node/server/master/MasterRegistrationManagerImpl.java +++ b/src/main/java/org/ros/internal/node/server/master/MasterRegistrationManagerImpl.java @@ -26,7 +26,7 @@ * @author jg */ public class MasterRegistrationManagerImpl { - private static boolean DEBUG = true; + private static boolean DEBUG = false; private static final Log log = LogFactory.getLog(MasterRegistrationManagerImpl.class); /** diff --git a/src/main/java/org/ros/internal/node/server/master/MasterServer.java b/src/main/java/org/ros/internal/node/server/master/MasterServer.java index fb69baf..d1088a3 100644 --- a/src/main/java/org/ros/internal/node/server/master/MasterServer.java +++ b/src/main/java/org/ros/internal/node/server/master/MasterServer.java @@ -1,19 +1,3 @@ -/* - * Copyright (C) 2011 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - package org.ros.internal.node.server.master; @@ -40,7 +24,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URI; -import java.net.UnknownHostException; + import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -55,8 +39,7 @@ * * @see Master documentation * - * @author damonkohler@google.com (Damon Kohler) - * @author khughes@google.com (Keith M. Hughes) + * @author jg */ public class MasterServer extends RpcServer implements MasterRegistrationListener { diff --git a/src/main/java/org/ros/internal/node/service/DefaultServiceClient.java b/src/main/java/org/ros/internal/node/service/DefaultServiceClient.java index 91e3b97..3e83c4b 100644 --- a/src/main/java/org/ros/internal/node/service/DefaultServiceClient.java +++ b/src/main/java/org/ros/internal/node/service/DefaultServiceClient.java @@ -1,4 +1,3 @@ - package org.ros.internal.node.service; @@ -27,7 +26,7 @@ /** * Default implementation of a {@link ServiceClient}. * - * @author damonkohler@google.com (Damon Kohler) + * @author jg */ public class DefaultServiceClient implements ServiceClient { @@ -96,7 +95,7 @@ private DefaultServiceClient(GraphName nodeName, ServiceDeclaration serviceDecla // TODO(damonkohler): Support non-persistent connections. connectionHeader.addField(ConnectionHeaderFields.PERSISTENT, "1"); connectionHeader.merge(serviceDeclaration.toConnectionHeader()); - tcpClientManager = new TcpClientManager(executorService); + tcpClientManager = TcpClientManager.getInstance(executorService); ServiceClientHandshakeHandler serviceClientHandshakeHandler = new ServiceClientHandshakeHandler(connectionHeader, responseListeners, executorService); handshakeLatch = new HandshakeLatch(); diff --git a/src/main/java/org/ros/internal/node/topic/DefaultPublisher.java b/src/main/java/org/ros/internal/node/topic/DefaultPublisher.java index 46314f6..d22de50 100644 --- a/src/main/java/org/ros/internal/node/topic/DefaultPublisher.java +++ b/src/main/java/org/ros/internal/node/topic/DefaultPublisher.java @@ -36,7 +36,7 @@ */ public class DefaultPublisher extends DefaultTopicParticipant implements Publisher { - private static final boolean DEBUG = true; + private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(DefaultPublisher.class); /** diff --git a/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java b/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java index 23b738b..bf497f2 100644 --- a/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java +++ b/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java @@ -63,7 +63,7 @@ private DefaultSubscriber(NodeIdentifier nodeIdentifier, TopicDeclaration topicD this.executorService = executorService; incomingMessageQueue = new IncomingMessageQueue(executorService); knownPublishers = new HashSet(); - tcpClientManager = new TcpClientManager(executorService); + tcpClientManager = TcpClientManager.getInstance(executorService); mutex = new Object(); SubscriberHandshakeHandler subscriberHandshakeHandler = new SubscriberHandshakeHandler(toDeclaration().toConnectionHeader(), diff --git a/src/main/java/org/ros/internal/node/topic/SubscriberHandshake.java b/src/main/java/org/ros/internal/node/topic/SubscriberHandshake.java index c056362..bc900fe 100644 --- a/src/main/java/org/ros/internal/node/topic/SubscriberHandshake.java +++ b/src/main/java/org/ros/internal/node/topic/SubscriberHandshake.java @@ -16,7 +16,7 @@ */ public class SubscriberHandshake extends BaseClientHandshake { - private static final boolean DEBUG = true; + private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(SubscriberHandshake.class); public SubscriberHandshake(ConnectionHeader outgoingConnectionHeader) { diff --git a/src/main/java/org/ros/internal/node/topic/SubscriberHandshakeHandler.java b/src/main/java/org/ros/internal/node/topic/SubscriberHandshakeHandler.java index 75d3346..0a5f14c 100644 --- a/src/main/java/org/ros/internal/node/topic/SubscriberHandshakeHandler.java +++ b/src/main/java/org/ros/internal/node/topic/SubscriberHandshakeHandler.java @@ -8,6 +8,8 @@ import org.ros.internal.transport.ConnectionHeader; import org.ros.internal.transport.ConnectionHeaderFields; import org.ros.internal.transport.queue.IncomingMessageQueue; +import org.ros.internal.transport.tcp.AsynchTCPWorker; +import org.ros.internal.transport.tcp.AsynchTempTCPWorker; import org.ros.internal.transport.tcp.NamedChannelHandler; import org.ros.node.topic.Publisher; import org.ros.node.topic.Subscriber; @@ -28,7 +30,7 @@ * the {@link Subscriber} may only subscribe to messages of this type */ class SubscriberHandshakeHandler extends BaseClientHandshakeHandler { - private static boolean DEBUG = true; + private static boolean DEBUG = false; private static final Log log = LogFactory.getLog(SubscriberHandshakeHandler.class); private final IncomingMessageQueue incomingMessageQueue; @@ -40,25 +42,40 @@ public SubscriberHandshakeHandler(ConnectionHeader outgoingConnectionHeader, if( DEBUG ) log.info("subscriberhandshakeHandler ctor:"+this); } - + /** + * Triggered from BaseClientHandshakeHandler channelRead + */ @Override protected void onSuccess(ConnectionHeader incomingConnectionHeader, ChannelHandlerContext ctx) { if( DEBUG ) log.info("SubscriberHandshakeHandler.onSuccess:"+ctx+" "+incomingConnectionHeader); ChannelPipeline pipeline = ctx.pipeline(); - pipeline.remove(SubscriberHandshakeHandler.this); + pipeline.remove(getName()); NamedChannelHandler namedChannelHandler = incomingMessageQueue.getMessageReceiver(); pipeline.addLast(namedChannelHandler.getName(), namedChannelHandler); String latching = incomingConnectionHeader.getField(ConnectionHeaderFields.LATCHING); if (latching != null && latching.equals("1")) { incomingMessageQueue.setLatchMode(true); } - ctx.setReady(true); + // the termination of the AsynchTempTCPWorker after successful handshake will set context ready + // Launch the permanent worker + AsynchTCPWorker uworker = null; + try { + uworker = new AsynchTCPWorker(ctx); + } catch (IOException e) { + log.error("Cannot start worker for context:"+ctx); + e.printStackTrace(); + return; + } + executor.execute(uworker); } - + /** + * Triggered from BaseClientHandshakeHandler + */ @Override protected void onFailure(String errorMessage, ChannelHandlerContext ctx) throws IOException { log.info("Subscriber handshake failed: " + errorMessage); + ctx.setReady(false); ctx.close(); } diff --git a/src/main/java/org/ros/internal/system/Utility.java b/src/main/java/org/ros/internal/system/Utility.java index 652b3f2..56e1c20 100644 --- a/src/main/java/org/ros/internal/system/Utility.java +++ b/src/main/java/org/ros/internal/system/Utility.java @@ -20,7 +20,7 @@ * */ public class Utility { - private static boolean DEBUG = true; + private static boolean DEBUG = false; private static final Log log = LogFactory.getLog(Utility.class); public static void serialize(T value, ByteBuffer buffer) { diff --git a/src/main/java/org/ros/internal/transport/BaseClientHandshakeHandler.java b/src/main/java/org/ros/internal/transport/BaseClientHandshakeHandler.java index 6cf0dbf..37afa17 100644 --- a/src/main/java/org/ros/internal/transport/BaseClientHandshakeHandler.java +++ b/src/main/java/org/ros/internal/transport/BaseClientHandshakeHandler.java @@ -20,12 +20,14 @@ * */ public abstract class BaseClientHandshakeHandler extends AbstractNamedChannelHandler { - protected static boolean DEBUG = true; + protected static boolean DEBUG = false; private static final Log log = LogFactory.getLog(BaseClientHandshakeHandler.class); private final ClientHandshake clientHandshake; private final ListenerGroup clientHandshakeListeners; + protected final ExecutorService executor; public BaseClientHandshakeHandler(ClientHandshake clientHandshake, ExecutorService executorService) { + this.executor = executorService; this.clientHandshake = clientHandshake; clientHandshakeListeners = new ListenerGroup(executorService); } diff --git a/src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java b/src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java index 9463d16..8a4d65e 100644 --- a/src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java +++ b/src/main/java/org/ros/internal/transport/ChannelHandlerContextImpl.java @@ -1,21 +1,16 @@ package org.ros.internal.transport; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.net.SocketAddress; + import java.nio.ByteBuffer; -//import java.nio.channels.AsynchronousChannelGroup; -import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.Channel; import java.nio.channels.CompletionHandler; import java.nio.channels.SocketChannel; -import java.util.Collections; + import java.util.HashSet; import java.util.Set; import java.util.concurrent.Executor; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import org.ros.internal.transport.tcp.ChannelGroup; diff --git a/src/main/java/org/ros/internal/transport/ConnectionHeader.java b/src/main/java/org/ros/internal/transport/ConnectionHeader.java index f010678..9145740 100644 --- a/src/main/java/org/ros/internal/transport/ConnectionHeader.java +++ b/src/main/java/org/ros/internal/transport/ConnectionHeader.java @@ -16,7 +16,7 @@ */ public class ConnectionHeader implements Serializable { private static final long serialVersionUID = -7508346673596180951L; - private static final boolean DEBUG = true; + private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(ConnectionHeader.class); private Map fields = new ConcurrentHashMap(); diff --git a/src/main/java/org/ros/internal/transport/ConnectionTrackingHandler.java b/src/main/java/org/ros/internal/transport/ConnectionTrackingHandler.java index e42d1e4..091f11f 100644 --- a/src/main/java/org/ros/internal/transport/ConnectionTrackingHandler.java +++ b/src/main/java/org/ros/internal/transport/ConnectionTrackingHandler.java @@ -21,7 +21,7 @@ */ public class ConnectionTrackingHandler extends AbstractNamedChannelHandler { - private static final boolean DEBUG = true; + private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(ConnectionTrackingHandler.class); public ConnectionTrackingHandler() { diff --git a/src/main/java/org/ros/internal/transport/queue/MessageDispatcher.java b/src/main/java/org/ros/internal/transport/queue/MessageDispatcher.java index 4fe1a85..e408a30 100644 --- a/src/main/java/org/ros/internal/transport/queue/MessageDispatcher.java +++ b/src/main/java/org/ros/internal/transport/queue/MessageDispatcher.java @@ -19,7 +19,7 @@ */ public class MessageDispatcher extends CancellableLoop { - private static final boolean DEBUG = true; + private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(MessageDispatcher.class); private final CircularBlockingDeque lazyMessages; diff --git a/src/main/java/org/ros/internal/transport/queue/MessageReceiver.java b/src/main/java/org/ros/internal/transport/queue/MessageReceiver.java index 24d5e59..31a11c8 100644 --- a/src/main/java/org/ros/internal/transport/queue/MessageReceiver.java +++ b/src/main/java/org/ros/internal/transport/queue/MessageReceiver.java @@ -19,7 +19,7 @@ */ public class MessageReceiver extends AbstractNamedChannelHandler { - private static final boolean DEBUG = true; + private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(MessageReceiver.class); private final CircularBlockingDeque lazyMessages; diff --git a/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java b/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java index 82d3d6d..8bfc0aa 100644 --- a/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java +++ b/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java @@ -15,8 +15,7 @@ import org.ros.concurrent.CancellableLoop; import org.ros.concurrent.CircularBlockingDeque; import org.ros.exception.RosRuntimeException; -import org.ros.internal.message.Message; -import org.ros.internal.message.MessageBufferPool; + import org.ros.internal.message.MessageBuffers; import org.ros.internal.system.Utility; import org.ros.internal.transport.ChannelHandlerContext; @@ -31,7 +30,7 @@ */ public class OutgoingMessageQueue { - private static final boolean DEBUG = true; + private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(OutgoingMessageQueue.class); private static final int DEQUE_CAPACITY = 16384; @@ -52,13 +51,10 @@ public class OutgoingMessageQueue { * */ private final class Writer extends CancellableLoop { - //final Object waitFinish = new Object(); @Override public void loop() throws InterruptedException { T message = deque.takeFirst(); final ByteBuffer buffer = (ByteBuffer) latchedBuffer.clear();//messageBufferPool.acquire(); - //messageBufferPool.release(buffer); - //latchedBuffer.clear(); Utility.serialize(message, buffer); //if(DEBUG) { // log.info(String.format("Writing %d bytes.", buffer.position())); @@ -67,7 +63,6 @@ public void loop() throws InterruptedException { while(it.hasNext()) { final ChannelHandlerContext ctx = it.next(); final CountDownLatch cdl = new CountDownLatch(1); - //final Object waitFinish = ctx.getChannelCompletionMutex(); boolean sendMessage; synchronized( ctx.getMessageTypes() ) { sendMessage = ctx.getMessageTypes().contains(message.getClass().getName().replace('.', '/')); diff --git a/src/main/java/org/ros/internal/transport/tcp/AbstractNamedChannelHandler.java b/src/main/java/org/ros/internal/transport/tcp/AbstractNamedChannelHandler.java index 11d0aea..a5e3022 100644 --- a/src/main/java/org/ros/internal/transport/tcp/AbstractNamedChannelHandler.java +++ b/src/main/java/org/ros/internal/transport/tcp/AbstractNamedChannelHandler.java @@ -1,17 +1,17 @@ package org.ros.internal.transport.tcp; -import java.io.IOException; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.ros.exception.RosRuntimeException; + import org.ros.internal.transport.ChannelHandlerContext; /** + * Provides operations on a named channel + * @author jg * */ public abstract class AbstractNamedChannelHandler implements NamedChannelHandler { - private static final boolean DEBUG = true; + private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(AbstractNamedChannelHandler.class); @Override diff --git a/src/main/java/org/ros/internal/transport/tcp/AsynchBaseServer.java b/src/main/java/org/ros/internal/transport/tcp/AsynchBaseServer.java index 98afceb..fca4129 100644 --- a/src/main/java/org/ros/internal/transport/tcp/AsynchBaseServer.java +++ b/src/main/java/org/ros/internal/transport/tcp/AsynchBaseServer.java @@ -1,14 +1,10 @@ package org.ros.internal.transport.tcp; import java.io.IOException; -import java.net.InetAddress; + import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketOption; import java.net.StandardSocketOptions; -import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.SocketChannel; -import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -21,7 +17,7 @@ * */ public final class AsynchBaseServer extends AsynchTCPServer { - private static boolean DEBUG = true; + private static boolean DEBUG = false; private static final Log log = LogFactory.getLog(AsynchBaseServer.class); public int WORKBOOTPORT = 0; public InetSocketAddress address = null; diff --git a/src/main/java/org/ros/internal/transport/tcp/AsynchTCPServer.java b/src/main/java/org/ros/internal/transport/tcp/AsynchTCPServer.java index 61a5b20..5748b40 100644 --- a/src/main/java/org/ros/internal/transport/tcp/AsynchTCPServer.java +++ b/src/main/java/org/ros/internal/transport/tcp/AsynchTCPServer.java @@ -16,7 +16,7 @@ * @author jg */ public abstract class AsynchTCPServer implements Cloneable, Runnable { - private static boolean DEBUG = true; + private static boolean DEBUG = false; private static final Log log = LogFactory.getLog(AsynchTCPServer.class); //AsynchronousServerSocketChannel server = null; //AsynchronousChannelGroup channelGroup; diff --git a/src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java b/src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java index f630db9..a64feca 100644 --- a/src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java +++ b/src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java @@ -1,18 +1,12 @@ package org.ros.internal.transport.tcp; import java.io.IOException; -import java.net.SocketException; + import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousSocketChannel; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.CompletionHandler; -import java.nio.channels.ReadPendingException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.ros.internal.message.MessageBufferPool; + import org.ros.internal.message.MessageBuffers; import org.ros.internal.system.Utility; import org.ros.internal.transport.ChannelHandlerContext; @@ -25,7 +19,7 @@ * */ public class AsynchTCPWorker implements Runnable { - private static final boolean DEBUG = true; + private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(AsynchTCPWorker.class); public boolean shouldRun = true; private ChannelHandlerContext ctx; diff --git a/src/main/java/org/ros/internal/transport/tcp/AsynchTempTCPWorker.java b/src/main/java/org/ros/internal/transport/tcp/AsynchTempTCPWorker.java index 3cca8a4..bbac9b6 100644 --- a/src/main/java/org/ros/internal/transport/tcp/AsynchTempTCPWorker.java +++ b/src/main/java/org/ros/internal/transport/tcp/AsynchTempTCPWorker.java @@ -3,22 +3,20 @@ import java.io.IOException; import java.net.SocketException; import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.ClosedChannelException; import java.nio.channels.CompletionHandler; import java.nio.channels.ReadPendingException; -import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.ros.internal.message.MessageBufferPool; + import org.ros.internal.message.MessageBuffers; import org.ros.internal.system.Utility; import org.ros.internal.transport.ChannelHandlerContext; /** * This AsynchTCPWorker is spawned for servicing traffic for publisher during handshake. - * After handshake we dont need to get trqffic FROM subscribers, only send it to them. + * After handshake we dont need to get traffic FROM subscribers, only send it to them. * Due to that we need a worker that does not fiddle with sockets and try to close them if this thread * is terminated. The thread is designed to be terminated after handshake and replaced with * the outbound message processor. @@ -29,12 +27,11 @@ * */ public class AsynchTempTCPWorker implements Runnable { - private static final boolean DEBUG = true; + private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(AsynchTempTCPWorker.class); public boolean shouldRun = true; private ChannelHandlerContext ctx; - private Object waitHalt = new Object(); - //private MessageBufferPool pool = new MessageBufferPool(); + public AsynchTempTCPWorker(ChannelHandlerContext ctx) throws IOException { this.ctx = ctx; @@ -43,14 +40,12 @@ public AsynchTempTCPWorker(ChannelHandlerContext ctx) throws IOException { } /** - * Client (Slave port) sends data to our master in the following loop + * Wait for completion of the handshake via polling the context read, repeatedly if necessary. + * Once we have success, terminate the thread after setting the context active. */ @Override public void run() { - //final Object waitFinish = ctx.getChannelCompletionMutex(); try { - //while(shouldRun) { - //final ByteBuffer buf = MessageBuffers.dynamicBuffer();//pool.acquire(); // initiate asynch read // If we get a read pending exception, try again final ByteBuffer buf = MessageBuffers.dynamicBuffer();//pool.acquire(); @@ -93,7 +88,8 @@ public void failed(Throwable arg0, Void arg1) { } catch(ReadPendingException rpe) { Thread.sleep(1); continue; } break; }// while - //} // shouldRun + // set as ready for channel write loop in OutogingMessageQueue + ctx.setReady(true); } catch(Exception se) { if( se instanceof SocketException ) { @@ -102,19 +98,9 @@ public void failed(Throwable arg0, Void arg1) { log.error("Remote invocation failure ",se); } } - synchronized(waitHalt) { - waitHalt.notify(); - } + } - public void close() { - synchronized(waitHalt) { - try { - shouldRun = false; - waitHalt.wait(); - } catch (InterruptedException ie) {} - } - } } diff --git a/src/main/java/org/ros/internal/transport/tcp/ChannelInitializerFactoryStack.java b/src/main/java/org/ros/internal/transport/tcp/ChannelInitializerFactoryStack.java index d7f0f76..e2f02b6 100644 --- a/src/main/java/org/ros/internal/transport/tcp/ChannelInitializerFactoryStack.java +++ b/src/main/java/org/ros/internal/transport/tcp/ChannelInitializerFactoryStack.java @@ -17,7 +17,7 @@ * */ public class ChannelInitializerFactoryStack { - private static boolean DEBUG = true; + private static boolean DEBUG = false; private static final Log log = LogFactory.getLog(ChannelInitializerFactoryStack.class); private LinkedBlockingDeque queue = new LinkedBlockingDeque(); diff --git a/src/main/java/org/ros/internal/transport/tcp/ConnectionTrackingChannelPipelineFactory.java b/src/main/java/org/ros/internal/transport/tcp/ConnectionTrackingChannelPipelineFactory.java index e6c25ef..817f825 100644 --- a/src/main/java/org/ros/internal/transport/tcp/ConnectionTrackingChannelPipelineFactory.java +++ b/src/main/java/org/ros/internal/transport/tcp/ConnectionTrackingChannelPipelineFactory.java @@ -10,7 +10,7 @@ /** */ public class ConnectionTrackingChannelPipelineFactory extends ChannelInitializer { - public static boolean DEBUG = true; + public static boolean DEBUG = false; private static final Log log = LogFactory.getLog(ConnectionTrackingChannelPipelineFactory.class); public static final String CONNECTION_TRACKING_HANDLER = "ConnectionTrackingHandler"; diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpClient.java b/src/main/java/org/ros/internal/transport/tcp/TcpClient.java index 4c2b08b..a74136a 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpClient.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpClient.java @@ -10,14 +10,12 @@ import java.net.SocketAddress; import java.net.StandardSocketOptions; import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousChannelGroup; -import java.nio.channels.AsynchronousSocketChannel; + import java.nio.channels.Channel; import java.nio.channels.SocketChannel; -import java.util.ArrayList; + import java.util.List; import java.util.concurrent.Executor; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** @@ -27,7 +25,7 @@ */ public class TcpClient { - private static final boolean DEBUG = true; + private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(TcpClient.class); private static final int DEFAULT_CONNECTION_TIMEOUT_DURATION = 5; @@ -73,7 +71,7 @@ public Channel connect(String connectionName, SocketAddress socketAddress) throw channel = /*Asynchronous*/SocketChannel.open(/*channelGroup*/); ((/*Asynchronous*/SocketChannel)channel).setOption(StandardSocketOptions.SO_RCVBUF, 4096000); ((/*Asynchronous*/SocketChannel)channel).setOption(StandardSocketOptions.SO_SNDBUF, 4096000); - ((/*Asynchronous*/SocketChannel)channel).setOption(StandardSocketOptions.TCP_NODELAY, true); + ((/*Asynchronous*/SocketChannel)channel).setOption(StandardSocketOptions.TCP_NODELAY, false); ctx = new ChannelHandlerContextImpl(channelGroup, channel, executor); TcpClientPipelineFactory tcpClientPipelineFactory = new TcpClientPipelineFactory(ctx.getChannelGroup(), namedChannelHandlers); // add handler pipeline factory to stack @@ -85,7 +83,7 @@ public Channel connect(String connectionName, SocketAddress socketAddress) throw // connect outbound to pub ctx.connect(socketAddress); - AsynchTCPWorker uworker = new AsynchTCPWorker(ctx); + AsynchTempTCPWorker uworker = new AsynchTempTCPWorker(ctx); executor.execute(uworker); // notify pipeline we connected (or failed via exceptionCaught and runtime exception) ctx.pipeline().fireChannelActive(); diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java b/src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java index 89cca25..0f5403a 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java @@ -1,46 +1,60 @@ package org.ros.internal.transport.tcp; -//import org.jboss.netty.channel.Channel; -//import org.jboss.netty.channel.group.ChannelGroup; -//import org.jboss.netty.channel.group.DefaultChannelGroup; - import java.io.IOException; import java.net.SocketAddress; //import java.nio.channels.AsynchronousChannelGroup; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.ros.internal.transport.ChannelHandlerContext; - /** * TcpClientManager manages TCP clients which are the subscriber and service clients that communicate with * remote peers outside master domain. - * It requires the context with the channel group, and executor service constructed. + * It requires the executor service constructed. * For each TcpClient constructed there will be an associated ChannelHandlerContext. * We maintain a list of the ChannelHandlerContexts here (as TcpClients) such that we may perform the necessary ops on them. + * This class is set up as a singleton returning instances for each executor. * @author jg */ public class TcpClientManager { - public static boolean DEBUG = true; + public static boolean DEBUG = false; private static final Log log = LogFactory.getLog(TcpClientManager.class); private final /*Asynchronous*/ChannelGroup channelGroup; private final Collection tcpClients; private final List namedChannelHandlers; - private final Executor executor; + private final ExecutorService executor; + + private static ArrayBlockingQueue executors = new ArrayBlockingQueue(16); + private static ArrayBlockingQueue instances = new ArrayBlockingQueue(16); - public TcpClientManager(ExecutorService executor) throws IOException { - this.executor = executor; + public static TcpClientManager getInstance(ExecutorService exc) { + if( executors.contains(exc) ) { + Object[] exe = executors.toArray(); + Object[] ths = instances.toArray(); + for(int i = 0; i < exe.length; i++) { + if( exe.equals(exc) ) + return (TcpClientManager) ths[i]; + } + } + executors.add(exc); + TcpClientManager tcp = new TcpClientManager(exc); + instances.add(tcp); + return tcp; + } + + private TcpClientManager(ExecutorService executor) { + this.executor = executor; this.channelGroup = new ChannelGroupImpl(executor);/*AsynchronousChannelGroup.withThreadPool(executor);*/ this.tcpClients = new ArrayList(); this.namedChannelHandlers = new ArrayList(); - if( DEBUG ) log.info("TcpClientManager:"+executor+" "+channelGroup); } diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpClientPipelineFactory.java b/src/main/java/org/ros/internal/transport/tcp/TcpClientPipelineFactory.java index 7687045..8d69cd0 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpClientPipelineFactory.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpClientPipelineFactory.java @@ -1,7 +1,5 @@ package org.ros.internal.transport.tcp; -import java.nio.channels.AsynchronousChannelGroup; -import java.nio.channels.Channel; import java.util.List; import org.apache.commons.logging.Log; @@ -12,7 +10,7 @@ * @author jg */ public class TcpClientPipelineFactory extends ConnectionTrackingChannelPipelineFactory { - public static boolean DEBUG = true; + public static boolean DEBUG = false; private static final Log log = LogFactory.getLog(TcpClientPipelineFactory.class); public static final String LENGTH_FIELD_BASED_FRAME_DECODER = "LengthFieldBasedFrameDecoder"; public static final String LENGTH_FIELD_PREPENDER = "LengthFieldPrepender"; diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpRosServer.java b/src/main/java/org/ros/internal/transport/tcp/TcpRosServer.java index 94ac3ac..c1881eb 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpRosServer.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpRosServer.java @@ -48,7 +48,7 @@ */ public class TcpRosServer implements Serializable { private static final long serialVersionUID = 1298495789043968855L; - private static final boolean DEBUG = true ; + private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(TcpRosServer.class); private BindAddress bindAddress; diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java b/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java index 4ca8bf9..3432c31 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java @@ -26,7 +26,7 @@ * @author jg */ public class TcpServerHandshakeHandler implements ChannelHandler { - private static final boolean DEBUG = true ; + private static final boolean DEBUG = false; private static final Log log = LogFactory.getLog(TcpServerHandshakeHandler.class); private final TopicParticipantManager topicParticipantManager; private final ServiceManager serviceManager; @@ -44,7 +44,9 @@ public TcpServerHandshakeHandler(TopicParticipantManager topicParticipantManager public void channelActive(ChannelHandlerContext ctx) { log.info("Channel active"); } - + /** + * Channel read initiated by pipeline generated message + */ @Override public Object channelRead(ChannelHandlerContext ctx, Object e) throws Exception { if( DEBUG ) { @@ -59,7 +61,12 @@ public Object channelRead(ChannelHandlerContext ctx, Object e) throws Exception } return e; } - + /** + * Handle the handshake for a service + * @param ctx + * @param incomingHeader + * @throws IOException + */ private void handleServiceHandshake(ChannelHandlerContext ctx, ConnectionHeader incomingHeader) throws IOException { if( DEBUG ) { log.info("service handshake:"+ctx+" header:"+incomingHeader); @@ -78,7 +85,13 @@ private void handleServiceHandshake(ChannelHandlerContext ctx, ConnectionHeader ctx.pipeline().addLast("ServiceRequestHandler", serviceServer.newRequestHandler()); } } - + /** + * Handle the handshake for a typical (not a service) subscriber. + * @param ctx + * @param incomingConnectionHeader + * @throws InterruptedException + * @throws Exception + */ private void handleSubscriberHandshake(final ChannelHandlerContext ctx, final ConnectionHeader incomingConnectionHeader) throws InterruptedException, Exception { if( DEBUG ) { @@ -90,10 +103,10 @@ private void handleSubscriberHandshake(final ChannelHandlerContext ctx, final Co GraphName.of(incomingConnectionHeader.getField(ConnectionHeaderFields.TOPIC)); assert(topicParticipantManager.hasPublisher(topicName)) : "No publisher for topic: " + topicName; + final DefaultPublisher publisher = topicParticipantManager.getPublisher(topicName); final ByteBuffer outgoingBuffer = publisher.finishHandshake(incomingConnectionHeader); - // Write the handshake data back to client and upon completion set this channel - // ready for write queue + // Write the handshake data back to client ctx.write(outgoingBuffer, new CompletionHandler() { @Override public void completed(Integer arg0, Void arg1) { @@ -101,15 +114,14 @@ public void completed(Integer arg0, Void arg1) { publisher.addSubscriber(new SubscriberIdentifier(NodeIdentifier.forName(nodeName), new TopicIdentifier(topicName)), ctx); // Once the handshake is complete, there will be nothing incoming on the // channel as we are only queueing outbound traffic to the subscriber, which is done by the OutgoingMessgequeue. - // So, we replace the handshake handler with a handler which will - // drop everything. + // So, we remove the handler ctx.pipeline().remove(TcpServerPipelineFactory.HANDSHAKE_HANDLER); // Set this context ready to receive the message type specified synchronized(ctx.getMessageTypes()) { ctx.getMessageTypes().add(incomingConnectionHeader.getField(ConnectionHeaderFields.TYPE)); } - // set as ready for channel write loop in OutogingMessageQueue - ctx.setReady(true); + // The handshake is complete and the only task is to set the context ready, which will allow + // the outbound queue to start sending messages. if( DEBUG ) { log.info("subscriber complete:"+outgoingBuffer); } diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpServerPipelineFactory.java b/src/main/java/org/ros/internal/transport/tcp/TcpServerPipelineFactory.java index bd41b95..ea85f2e 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpServerPipelineFactory.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpServerPipelineFactory.java @@ -18,7 +18,7 @@ * @author jg */ public class TcpServerPipelineFactory extends ConnectionTrackingChannelPipelineFactory { - public static boolean DEBUG = true; + public static boolean DEBUG = false; private static final Log log = LogFactory.getLog(TcpServerPipelineFactory.class); public static final String LENGTH_FIELD_BASED_FRAME_DECODER = "LengthFieldBasedFrameDecoder"; public static final String LENGTH_FIELD_PREPENDER = "LengthFieldPrepender"; diff --git a/src/main/java/org/ros/node/DefaultNodeMainExecutor.java b/src/main/java/org/ros/node/DefaultNodeMainExecutor.java index f4e8fb5..ec72887 100644 --- a/src/main/java/org/ros/node/DefaultNodeMainExecutor.java +++ b/src/main/java/org/ros/node/DefaultNodeMainExecutor.java @@ -1,22 +1,5 @@ -/* - * Copyright (C) 2011 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - package org.ros.node; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.ros.concurrent.DefaultScheduledExecutorService; @@ -31,9 +14,11 @@ import java.util.concurrent.ScheduledExecutorService; /** - * Executes {@link NodeMain}s in separate threads. + * Executes {@link NodeMain}s in a multithreaded environment using the threads + * provided an ExecutorService. + * Methods here to execute: create, start, register, shutdown instances of NodeMain. * - * @author damonkohler@google.com (Damon Kohler) + * @author jg */ public class DefaultNodeMainExecutor implements NodeMainExecutor { diff --git a/src/test/java/org/ros/internal/transport/MessageQueueIntegrationTest.java b/src/test/java/org/ros/internal/transport/MessageQueueIntegrationTest.java index 8ceb309..cc8b0b3 100644 --- a/src/test/java/org/ros/internal/transport/MessageQueueIntegrationTest.java +++ b/src/test/java/org/ros/internal/transport/MessageQueueIntegrationTest.java @@ -151,9 +151,9 @@ public void setup() { new IncomingMessageQueue(executorService); secondIncomingMessageQueue = new IncomingMessageQueue(executorService); - firstTcpClientManager = new TcpClientManager(executorService); + firstTcpClientManager = TcpClientManager.getInstance(executorService); firstTcpClientManager.addNamedChannelHandler(firstIncomingMessageQueue.getMessageReceiver()); - secondTcpClientManager = new TcpClientManager(executorService); + secondTcpClientManager = TcpClientManager.getInstance(executorService); secondTcpClientManager.addNamedChannelHandler(secondIncomingMessageQueue.getMessageReceiver()); } catch(Exception e) { throw new RosRuntimeException(e); } }