Skip to content

Commit

Permalink
Fixed node disconnect/reconnect.
Browse files Browse the repository at this point in the history
  • Loading branch information
neocoretechs committed Jun 1, 2016
1 parent 52c5db3 commit 9c92ad5
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 102 deletions.
22 changes: 4 additions & 18 deletions src/main/java/org/ros/internal/node/client/SlaveClient.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
/*
* Copyright (C) 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.ros.internal.node.client;

import org.ros.internal.node.response.IntegerResultFactory;
Expand All @@ -34,10 +18,12 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;


/**
* @author [email protected] (Damon Kohler)
* Create a client to the RpcEndpoint via SlaveRpcEndpointImpl
* from the socket address passed in constructor.
* @author jg
*/
public class SlaveClient extends Client<SlaveRpcEndpoint> {

Expand Down
18 changes: 7 additions & 11 deletions src/main/java/org/ros/internal/node/server/RpcServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@
import org.ros.internal.system.Process;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand All @@ -33,18 +31,16 @@ public abstract class RpcServer {

public RpcServer(BindAddress bindAddress, AdvertiseAddress advertiseAddress) throws IOException {
this.advertiseAddress = advertiseAddress;
/*
this.advertiseAddress.setPortCallable(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return server.getPort();
}
});
*/
}


/**
* Invoke a method via remote call.
* @param rri The RemoteRequestInterface passed from remote client
* @return The Object result of invocation
* @throws Exception
*/
public abstract Object invokeMethod(RemoteRequestInterface rri) throws Exception;

/**
* Start up the remote calling server.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,16 @@
/*
* Copyright (C) 2012 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.ros.internal.node.server.master;

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.ros.internal.node.service.ServiceIdentifier;
import org.ros.master.client.TopicSystemState;
import org.ros.namespace.GraphName;
Expand All @@ -37,10 +23,10 @@
* <p>
* This class is not thread-safe.
*
* @author [email protected] (Keith M. Hughes)
* @author jg
*/
public class MasterRegistrationManagerImpl {

private static boolean DEBUG = true;
private static final Log log = LogFactory.getLog(MasterRegistrationManagerImpl.class);

/**
Expand All @@ -65,6 +51,7 @@ public class MasterRegistrationManagerImpl {
*/
private final MasterRegistrationListener listener;


public MasterRegistrationManagerImpl(MasterRegistrationListener listener) {
this.listener = listener;
nodes = new HashMap<GraphName, NodeRegistrationInfo>();
Expand Down Expand Up @@ -425,21 +412,27 @@ private NodeRegistrationInfo obtainNodeRegistrationInfo(GraphName nodeName, Inet
}
log.info("Replacing node "+node.getNodeSlaveUri()+" with new requested "+nodeSlaveUri);
// The node is switching slave URIs, so we need a new one.
potentiallyDeleteNode(node);
//potentiallyDeleteNode(node);
nodes.remove(nodeName);
cleanupNode(node);
NodeRegistrationInfo newNode = new NodeRegistrationInfo(nodeName, nodeSlaveUri);
nodes.put(nodeName, newNode);
// Try to reach old node via SlaveClient to shut it down
/*
try {
listener.onNodeReplacement(node);
} catch (Exception e) {
// No matter what, we want to keep going
log.error("Error during onNodeReplacement call", e);
}
*/
return newNode;
} else {
// no existing node
node = new NodeRegistrationInfo(nodeName, nodeSlaveUri);
nodes.put(nodeName, node);
return node;
}

// Either no existing node, or the old node needs to go away
node = new NodeRegistrationInfo(nodeName, nodeSlaveUri);
nodes.put(nodeName, node);

return node;
}

/**
Expand All @@ -463,6 +456,39 @@ private void cleanupNode(NodeRegistrationInfo node) {
}
}

/**
* A node is being replaced. Change the NodeRegistrationInfo to the new address
*
* @param node
* the node being replaced
* @param newNode
* the new node
*/
private void replaceNode(NodeRegistrationInfo node, NodeRegistrationInfo newNode) {
boolean found = false;
for (TopicRegistrationInfo topic : node.getPublishers()) {
found = topic.removePublisher(node);
if( found ) {
if( DEBUG )
log.info("Replacing publisher:"+node+" "+newNode+" "+topic);
topic.addPublisher(newNode, topic.getMessageType());
}
}

for (TopicRegistrationInfo topic : node.getSubscribers()) {
found = topic.removeSubscriber(node);
if( found ) {
if( DEBUG )
log.info("Replacing subscriber:"+node+" "+newNode+" "+topic);
topic.addSubscriber(newNode, topic.getMessageType());
}
}

// TODO: service?
for (ServiceRegistrationInfo service : node.getServices()) {
services.remove(service.getServiceName());
}
}
/**
* Remove a node from registration if it no longer has any registrations.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ protected void contactSubscriberForPublisherUpdate(InetSocketAddress subscriberS
try {
client = new SlaveClient(MASTER_NODE_NAME, subscriberSlaveUri);
} catch (IOException e) {
log.error("MasterServer cannot construct slave cleint to unknown host "+subscriberSlaveUri,e);
log.error("MasterServer cannot construct slave client to unknown host "+subscriberSlaveUri,e);
throw new RosRuntimeException(e);
}
client.publisherUpdate(topicName, publisherUris);
Expand Down Expand Up @@ -490,7 +490,10 @@ public List<Object> getPublishedTopics(GraphName caller, GraphName subgraph) {
return result;
}
}

/**
* Create a new SlaveClient with passed NodeRegistrationInfo
* Triggered after shutdown and removal of publishers and subscribers from old slave node
*/
@Override
public void onNodeReplacement(NodeRegistrationInfo nodeInfo) {
// A node in the registration manager is being replaced. Contact the node
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,15 @@
/*
* Copyright (C) 2012 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.ros.internal.node.server.master;

import org.ros.namespace.GraphName;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;

/**
* Information a master needs about a node.
*
* @author [email protected] (Keith M. Hughes)
* @author jg
*/
public class NodeRegistrationInfo {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
/*
* Copyright (C) 2012 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.ros.internal.node.server.master;


Expand All @@ -27,6 +11,8 @@
/**
* All information known to the manager about a topic.
*
* @author jg
*
*/
public class TopicRegistrationInfo {

Expand Down
35 changes: 24 additions & 11 deletions src/main/java/org/ros/internal/transport/tcp/AsynchTCPWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,29 @@ public void run() {
// If we get a read pending exception, try again
final ByteBuffer buf = MessageBuffers.dynamicBuffer();//pool.acquire();
buf.clear();
final CountDownLatch cdl = new CountDownLatch(1);
//final CountDownLatch cdl = new CountDownLatch(1);
int res = ctx.read(buf);
buf.flip();
Object reso = Utility.deserialize(buf);
if( DEBUG )
log.info("ROS AsynchTCPWorker COMPLETED READ for "+ctx+" buffer:"+buf+" result:"+res+" Object:"+reso);
// seems like a -1 is generated when channel breaks, so stop
// this worker on that case
if( res == -1) {
shouldRun = false;
if( DEBUG )
log.info("ROS AsynchTCPWorker CHANNEL BREAK, TERMINATING for "+ctx);
} else {
buf.flip();
Object reso = Utility.deserialize(buf);
if( DEBUG )
log.info("ROS AsynchTCPWorker COMPLETED READ for "+ctx+" buffer:"+buf+" result:"+res+" Object:"+reso);
try {
ctx.pipeline().fireChannelRead(reso);
} catch (Exception e) {
if( DEBUG) {
log.info("Exception out of fireChannelRead",e);
e.printStackTrace();
}
ctx.pipeline().fireExceptionCaught(e);
}
}
/*
ctx.read(buf, new CompletionHandler<Integer, Void>() {
@Override
Expand Down Expand Up @@ -103,17 +120,13 @@ public void failed(Throwable arg0, Void arg1) {
} // shouldRun

} catch(Exception se) {
if( se instanceof SocketException ) {
log.error("Received SocketException, connection reset..");
} else {
log.error("Remote invocation failure ",se);
}
log.error("AsynchTCPWorker terminating due to ",se);
} finally {
try {
if( DEBUG )
log.info("<<<<<<<<<< Datasocket closing >>>>>>>>");
ctx.close();
ctx.setReady(false);
ctx.close();
} catch (IOException e) {}
}
synchronized(waitHalt) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public Object channelRead(ChannelHandlerContext ctx, Object e) throws Exception
if( DEBUG ) {
log.info("TcpServerHandshakeHandler channelRead:"+e);
}
// check for null, possible fault on bad connect
ConnectionHeader incomingHeader = (ConnectionHeader)e;
if (incomingHeader.hasField(ConnectionHeaderFields.SERVICE)) {
handleServiceHandshake(ctx, incomingHeader);
Expand Down

0 comments on commit 9c92ad5

Please sign in to comment.