diff --git a/src/main/java/org/ros/internal/node/client/SlaveClient.java b/src/main/java/org/ros/internal/node/client/SlaveClient.java index bea5159..dfa3ae3 100644 --- a/src/main/java/org/ros/internal/node/client/SlaveClient.java +++ b/src/main/java/org/ros/internal/node/client/SlaveClient.java @@ -1,19 +1,3 @@ -/* - * Copyright (C) 2011 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - package org.ros.internal.node.client; import org.ros.internal.node.response.IntegerResultFactory; @@ -34,10 +18,12 @@ import java.util.Collection; import java.util.List; -import java.util.Map; + /** - * @author damonkohler@google.com (Damon Kohler) + * Create a client to the RpcEndpoint via SlaveRpcEndpointImpl + * from the socket address passed in constructor. + * @author jg */ public class SlaveClient extends Client { diff --git a/src/main/java/org/ros/internal/node/server/RpcServer.java b/src/main/java/org/ros/internal/node/server/RpcServer.java index eccc689..5330aab 100644 --- a/src/main/java/org/ros/internal/node/server/RpcServer.java +++ b/src/main/java/org/ros/internal/node/server/RpcServer.java @@ -8,10 +8,8 @@ import org.ros.internal.system.Process; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URI; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -33,18 +31,16 @@ public abstract class RpcServer { public RpcServer(BindAddress bindAddress, AdvertiseAddress advertiseAddress) throws IOException { this.advertiseAddress = advertiseAddress; - /* - this.advertiseAddress.setPortCallable(new Callable() { - @Override - public Integer call() throws Exception { - return server.getPort(); - } - }); - */ } - + /** + * Invoke a method via remote call. + * @param rri The RemoteRequestInterface passed from remote client + * @return The Object result of invocation + * @throws Exception + */ public abstract Object invokeMethod(RemoteRequestInterface rri) throws Exception; + /** * Start up the remote calling server. * diff --git a/src/main/java/org/ros/internal/node/server/master/MasterRegistrationManagerImpl.java b/src/main/java/org/ros/internal/node/server/master/MasterRegistrationManagerImpl.java index e94c0a6..072276a 100644 --- a/src/main/java/org/ros/internal/node/server/master/MasterRegistrationManagerImpl.java +++ b/src/main/java/org/ros/internal/node/server/master/MasterRegistrationManagerImpl.java @@ -1,19 +1,3 @@ -/* - * Copyright (C) 2012 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - package org.ros.internal.node.server.master; import java.net.InetSocketAddress; @@ -21,10 +5,12 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; + import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.ros.internal.node.service.ServiceIdentifier; import org.ros.master.client.TopicSystemState; import org.ros.namespace.GraphName; @@ -37,10 +23,10 @@ *

* This class is not thread-safe. * - * @author khughes@google.com (Keith M. Hughes) + * @author jg */ public class MasterRegistrationManagerImpl { - + private static boolean DEBUG = true; private static final Log log = LogFactory.getLog(MasterRegistrationManagerImpl.class); /** @@ -65,6 +51,7 @@ public class MasterRegistrationManagerImpl { */ private final MasterRegistrationListener listener; + public MasterRegistrationManagerImpl(MasterRegistrationListener listener) { this.listener = listener; nodes = new HashMap(); @@ -425,21 +412,27 @@ private NodeRegistrationInfo obtainNodeRegistrationInfo(GraphName nodeName, Inet } log.info("Replacing node "+node.getNodeSlaveUri()+" with new requested "+nodeSlaveUri); // The node is switching slave URIs, so we need a new one. - potentiallyDeleteNode(node); + //potentiallyDeleteNode(node); + nodes.remove(nodeName); cleanupNode(node); + NodeRegistrationInfo newNode = new NodeRegistrationInfo(nodeName, nodeSlaveUri); + nodes.put(nodeName, newNode); + // Try to reach old node via SlaveClient to shut it down + /* try { listener.onNodeReplacement(node); } catch (Exception e) { // No matter what, we want to keep going log.error("Error during onNodeReplacement call", e); } + */ + return newNode; + } else { + // no existing node + node = new NodeRegistrationInfo(nodeName, nodeSlaveUri); + nodes.put(nodeName, node); + return node; } - - // Either no existing node, or the old node needs to go away - node = new NodeRegistrationInfo(nodeName, nodeSlaveUri); - nodes.put(nodeName, node); - - return node; } /** @@ -463,6 +456,39 @@ private void cleanupNode(NodeRegistrationInfo node) { } } + /** + * A node is being replaced. Change the NodeRegistrationInfo to the new address + * + * @param node + * the node being replaced + * @param newNode + * the new node + */ + private void replaceNode(NodeRegistrationInfo node, NodeRegistrationInfo newNode) { + boolean found = false; + for (TopicRegistrationInfo topic : node.getPublishers()) { + found = topic.removePublisher(node); + if( found ) { + if( DEBUG ) + log.info("Replacing publisher:"+node+" "+newNode+" "+topic); + topic.addPublisher(newNode, topic.getMessageType()); + } + } + + for (TopicRegistrationInfo topic : node.getSubscribers()) { + found = topic.removeSubscriber(node); + if( found ) { + if( DEBUG ) + log.info("Replacing subscriber:"+node+" "+newNode+" "+topic); + topic.addSubscriber(newNode, topic.getMessageType()); + } + } + + // TODO: service? + for (ServiceRegistrationInfo service : node.getServices()) { + services.remove(service.getServiceName()); + } + } /** * Remove a node from registration if it no longer has any registrations. * diff --git a/src/main/java/org/ros/internal/node/server/master/MasterServer.java b/src/main/java/org/ros/internal/node/server/master/MasterServer.java index 5ca1ac7..fb69baf 100644 --- a/src/main/java/org/ros/internal/node/server/master/MasterServer.java +++ b/src/main/java/org/ros/internal/node/server/master/MasterServer.java @@ -276,7 +276,7 @@ protected void contactSubscriberForPublisherUpdate(InetSocketAddress subscriberS try { client = new SlaveClient(MASTER_NODE_NAME, subscriberSlaveUri); } catch (IOException e) { - log.error("MasterServer cannot construct slave cleint to unknown host "+subscriberSlaveUri,e); + log.error("MasterServer cannot construct slave client to unknown host "+subscriberSlaveUri,e); throw new RosRuntimeException(e); } client.publisherUpdate(topicName, publisherUris); @@ -490,7 +490,10 @@ public List getPublishedTopics(GraphName caller, GraphName subgraph) { return result; } } - + /** + * Create a new SlaveClient with passed NodeRegistrationInfo + * Triggered after shutdown and removal of publishers and subscribers from old slave node + */ @Override public void onNodeReplacement(NodeRegistrationInfo nodeInfo) { // A node in the registration manager is being replaced. Contact the node diff --git a/src/main/java/org/ros/internal/node/server/master/NodeRegistrationInfo.java b/src/main/java/org/ros/internal/node/server/master/NodeRegistrationInfo.java index 8eb11e4..fd2380f 100644 --- a/src/main/java/org/ros/internal/node/server/master/NodeRegistrationInfo.java +++ b/src/main/java/org/ros/internal/node/server/master/NodeRegistrationInfo.java @@ -1,33 +1,15 @@ -/* - * Copyright (C) 2012 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - package org.ros.internal.node.server.master; import org.ros.namespace.GraphName; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.URI; import java.util.HashSet; import java.util.Set; /** * Information a master needs about a node. * - * @author khughes@google.com (Keith M. Hughes) + * @author jg */ public class NodeRegistrationInfo { diff --git a/src/main/java/org/ros/internal/node/server/master/TopicRegistrationInfo.java b/src/main/java/org/ros/internal/node/server/master/TopicRegistrationInfo.java index 3ce5a44..2664cd2 100644 --- a/src/main/java/org/ros/internal/node/server/master/TopicRegistrationInfo.java +++ b/src/main/java/org/ros/internal/node/server/master/TopicRegistrationInfo.java @@ -1,19 +1,3 @@ -/* - * Copyright (C) 2012 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - package org.ros.internal.node.server.master; @@ -27,6 +11,8 @@ /** * All information known to the manager about a topic. * + * @author jg + * */ public class TopicRegistrationInfo { diff --git a/src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java b/src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java index 652b740..f630db9 100644 --- a/src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java +++ b/src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java @@ -51,12 +51,29 @@ public void run() { // If we get a read pending exception, try again final ByteBuffer buf = MessageBuffers.dynamicBuffer();//pool.acquire(); buf.clear(); - final CountDownLatch cdl = new CountDownLatch(1); + //final CountDownLatch cdl = new CountDownLatch(1); int res = ctx.read(buf); - buf.flip(); - Object reso = Utility.deserialize(buf); - if( DEBUG ) - log.info("ROS AsynchTCPWorker COMPLETED READ for "+ctx+" buffer:"+buf+" result:"+res+" Object:"+reso); + // seems like a -1 is generated when channel breaks, so stop + // this worker on that case + if( res == -1) { + shouldRun = false; + if( DEBUG ) + log.info("ROS AsynchTCPWorker CHANNEL BREAK, TERMINATING for "+ctx); + } else { + buf.flip(); + Object reso = Utility.deserialize(buf); + if( DEBUG ) + log.info("ROS AsynchTCPWorker COMPLETED READ for "+ctx+" buffer:"+buf+" result:"+res+" Object:"+reso); + try { + ctx.pipeline().fireChannelRead(reso); + } catch (Exception e) { + if( DEBUG) { + log.info("Exception out of fireChannelRead",e); + e.printStackTrace(); + } + ctx.pipeline().fireExceptionCaught(e); + } + } /* ctx.read(buf, new CompletionHandler() { @Override @@ -103,17 +120,13 @@ public void failed(Throwable arg0, Void arg1) { } // shouldRun } catch(Exception se) { - if( se instanceof SocketException ) { - log.error("Received SocketException, connection reset.."); - } else { - log.error("Remote invocation failure ",se); - } + log.error("AsynchTCPWorker terminating due to ",se); } finally { try { if( DEBUG ) log.info("<<<<<<<<<< Datasocket closing >>>>>>>>"); - ctx.close(); ctx.setReady(false); + ctx.close(); } catch (IOException e) {} } synchronized(waitHalt) { diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java b/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java index ba21617..4ca8bf9 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java @@ -50,6 +50,7 @@ public Object channelRead(ChannelHandlerContext ctx, Object e) throws Exception if( DEBUG ) { log.info("TcpServerHandshakeHandler channelRead:"+e); } + // check for null, possible fault on bad connect ConnectionHeader incomingHeader = (ConnectionHeader)e; if (incomingHeader.hasField(ConnectionHeaderFields.SERVICE)) { handleServiceHandshake(ctx, incomingHeader);