Skip to content

Commit

Permalink
Multiple handshake bug fix. Completion of original TODO of using Topi…
Browse files Browse the repository at this point in the history
…cParticipantManager in DefaultSubscriber.
  • Loading branch information
neocoretechs committed Feb 4, 2020
1 parent 332cb7b commit 9b50e2a
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 44 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/ros/address/AdvertiseAddress.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public AdvertiseAddress(String host, final int port) {
}

public AdvertiseAddress(InetAddress host, int port) {
this.host = host.getCanonicalHostName();
this.host = host.getHostName();
this.port = port;
}

Expand Down
17 changes: 15 additions & 2 deletions src/main/java/org/ros/internal/node/client/Registrar.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,15 @@

/**
* Manages topic, and service registrations of a {@link SlaveServer} with the
* {@link MasterServer}.
* {@link MasterServer}.<br/>
* The primary activity here is to respond to events that occur when a publisher or subscriber is added
* or removed. The fact that it is constructed with a MasterClient and an ExecutorService is a clue.
* For instance, in the onSubscriberAdded event handler method, we contact the remote master to
* register the subscriber. To do this we call updatePublishers on the subscriber
* using the collection of publisherUris returned in the response to masterClient.<br/>
* Passed DefaultSubscriber class creates an UpdatePublisherRunnable thread
* which creates a SlaveClient of type SlaveRpcEndpointImpl to
* contact the publishers. If successful, call signalOnMasterRegistrationSuccess for the subscriber.
*
* @author jg
*/
Expand Down Expand Up @@ -170,7 +178,12 @@ public void run() {
});
}
}

/**
* Contact the remote master to register the subscriber. Call updatePublishers on the subscriber
* using the collection of publisherUris returned in the response. Passed DefaultSubscriber class creates an
* UpdatePublisherRunnable thread which creates a SlaveClient of type SlaveRpcEndpointImpl to
* contact the publishers. If successful, signalOnMasterRegistrationSuccess for the subscriber.
*/
@Override
public void onSubscriberAdded(final DefaultSubscriber<?> subscriber) {
if (DEBUG) {
Expand Down
15 changes: 10 additions & 5 deletions src/main/java/org/ros/internal/node/server/SlaveServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
* A remote client can request connections, get collections of publishers and subscriber and information
* about the state of the bus including connections between publishers and subscribers. TcpRosServer is
* the class that does most of the work and is wrapped by this class, which is here to provide the subset of
* remotely invokable methods.
* remotely invokable methods via reflection using the ServerInvokeMethod class.
* @see ServerInvokeMethod
* @see TcpRosServer
* @author jg
*/
public class SlaveServer extends RpcServer {
Expand Down Expand Up @@ -111,7 +113,7 @@ public List<Object> getBusStats(String callerId) {
* The fourth element is the protocol<br/>
* The fifth element is the topic name.<br/>
* @param callerId
* @return
* @return A List of Objects representing the above, packaged thusly to enable remote serialization delivery.
*/
public List<Object> getBusInfo(String callerId) {
List<Object> busInfo = new ArrayList<Object>();
Expand Down Expand Up @@ -183,10 +185,13 @@ public int paramUpdate(GraphName parameterName, Object parameterValue) {
return parameterManager.updateParameter(parameterName, parameterValue);
}
/**
*
* If there is a subscriber subscribed to this topicName, call updatePublishers on the subscriber
* using the collection of publisherUris. DefaultSubscriber class creates an
* UpdatePublisherRunnable thread which creates a SlaveClient of type SlaveRpcEndpointImpl to
* contact the publisher.
* @param callerId
* @param topicName
* @param publisherUris
* @param publisherUris collection of InetSocketAddress of remote publishers to be updated.
*/
public void publisherUpdate(String callerId, String topicName, Collection<InetSocketAddress> publisherUris) {
GraphName graphName = GraphName.of(topicName);
Expand All @@ -199,7 +204,7 @@ public void publisherUpdate(String callerId, String topicName, Collection<InetSo
if(DEBUG) {
log.info("Updating subscriber:"+subscriber);
for(InetSocketAddress i: publisherUris)
log.info("Publisher:"+i);
log.info("PUBLISHER:"+i+" for:"+this);
}
}
}
Expand Down
80 changes: 57 additions & 23 deletions src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.ros.concurrent.ListenerGroup;
import org.ros.concurrent.SignalRunnable;
import org.ros.internal.node.server.NodeIdentifier;
Expand All @@ -24,13 +23,18 @@
import java.util.concurrent.TimeUnit;

/**
* Default implementation of a {@link Subscriber}.
* Default implementation of a {@link Subscriber}.<br/>
* Primary players are knownPublishers, which is a Set of PublisherIdentifiers,<br/>
* and TcpClientManager, which has the NamedChannelHandlers.<br/>
* Here, we also maintain the incomingMessageQueue, which contains MessageListeners of the type this
* class is parameterized with.<br/>
*
*
* @author jg
*/
public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Subscriber<T> {

private static final Log log = LogFactory.getLog(DefaultSubscriber.class);
private static boolean DEBUG = true;
private static final Log log = LogFactory.getLog(DefaultSubscriber.class);

/**
* The maximum delay before shutdown will begin even if all
Expand All @@ -43,27 +47,35 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub
private final NodeIdentifier nodeIdentifier;
private final ScheduledExecutorService executorService;
private final IncomingMessageQueue<T> incomingMessageQueue;
private final Set<PublisherIdentifier> knownPublishers;
// private final Set<PublisherIdentifier> knownPublishers;
private final TcpClientManager tcpClientManager;
private final TopicParticipantManager topicParticipantManager;
private final Object mutex;

/**
* Manages the {@link SubscriberListener}s for this {@link Subscriber}.
*/
private final ListenerGroup<SubscriberListener<T>> subscriberListeners;

//public static <S> DefaultSubscriber<S> newDefault(NodeIdentifier nodeIdentifier,
// TopicDeclaration description, ScheduledExecutorService executorService) throws IOException {
// return new DefaultSubscriber<S>(nodeIdentifier, description, executorService);
//}
public static <S> DefaultSubscriber<S> newDefault(NodeIdentifier nodeIdentifier,
TopicDeclaration description, ScheduledExecutorService executorService) throws IOException {
return new DefaultSubscriber<S>(nodeIdentifier, description, executorService);
TopicDeclaration description,
TopicParticipantManager topicParticipantManager,
ScheduledExecutorService executorService) throws IOException {
return new DefaultSubscriber<S>(nodeIdentifier, description, topicParticipantManager, executorService);
}

private DefaultSubscriber(NodeIdentifier nodeIdentifier, TopicDeclaration topicDeclaration, ScheduledExecutorService executorService) throws IOException {
private DefaultSubscriber(NodeIdentifier nodeIdentifier, TopicDeclaration topicDeclaration,
TopicParticipantManager topicParticipantManager, ScheduledExecutorService executorService) throws IOException {
super(topicDeclaration);
this.nodeIdentifier = nodeIdentifier;
this.executorService = executorService;
incomingMessageQueue = new IncomingMessageQueue<T>(executorService);
knownPublishers = new HashSet<PublisherIdentifier>();
//knownPublishers = new HashSet<PublisherIdentifier>();
tcpClientManager = TcpClientManager.getInstance(executorService);
this.topicParticipantManager = topicParticipantManager;
mutex = new Object();
SubscriberHandshakeHandler<T> subscriberHandshakeHandler =
new SubscriberHandshakeHandler<T>(toDeclaration().toConnectionHeader(),
Expand All @@ -73,22 +85,26 @@ private DefaultSubscriber(NodeIdentifier nodeIdentifier, TopicDeclaration topicD
subscriberListeners.add(new DefaultSubscriberListener<T>() {
@Override
public void onMasterRegistrationSuccess(Subscriber<T> registrant) {
log.info("Subscriber registered: " + DefaultSubscriber.this);
if(DEBUG)
log.info("Subscriber registered: " + DefaultSubscriber.this);
}

@Override
public void onMasterRegistrationFailure(Subscriber<T> registrant) {
log.info("Subscriber registration failed: " + DefaultSubscriber.this);
if(DEBUG)
log.info("Subscriber registration failed: " + DefaultSubscriber.this);
}

@Override
public void onMasterUnregistrationSuccess(Subscriber<T> registrant) {
log.info("Subscriber unregistered: " + DefaultSubscriber.this);
if(DEBUG)
log.info("Subscriber unregistered: " + DefaultSubscriber.this);
}

@Override
public void onMasterUnregistrationFailure(Subscriber<T> registrant) {
log.info("Subscriber unregistration failed: " + DefaultSubscriber.this);
if(DEBUG)
log.info("Subscriber unregistration failed: " + DefaultSubscriber.this);
}
});
}
Expand Down Expand Up @@ -119,27 +135,44 @@ public void addMessageListener(MessageListener<T> messageListener, int limit) {
public void addMessageListener(MessageListener<T> messageListener) {
addMessageListener(messageListener, 1);
}


/**
* When the SlaveClient requests a topic from the publisher in UpdatePublisherRunnable, as
* happens when the method updatePublishers is called here, this method is called back on reply from master.
* TcpClientManager calls connect to the passed InetSocketAddress. After that, all the SubscriberListeners are
* signaled with the new Publisher.
* @param publisherIdentifier
* @param address
* @throws Exception
*/
public void addPublisher(PublisherIdentifier publisherIdentifier, InetSocketAddress address) throws Exception {
synchronized (mutex) {
// TODO(damonkohler): If the connection is dropped, knownPublishers should
// be updated.
if (knownPublishers.contains(publisherIdentifier)) {
return;
}
tcpClientManager.connect(toString(), address);
//if (knownPublishers.contains(publisherIdentifier)) {
// return;
//}
Collection<PublisherIdentifier> pubs = topicParticipantManager.getSubscriberConnections(this);
if(pubs != null && pubs.contains(publisherIdentifier)) {
log.info("Defaultsubscriber addPublisher topicParticipantManager CONTAINS "+publisherIdentifier+" at "+address);
} else {
log.info("Defaultsubscriber addPublisher topicParticipantManager DOES NOT CONTAIN "+publisherIdentifier+" at "+address);
topicParticipantManager.addSubscriberConnection(this, publisherIdentifier);
}
tcpClientManager.connect(toString(), address);
// TODO(damonkohler): knownPublishers is duplicate information that is
// already available to the TopicParticipantManager.
knownPublishers.add(publisherIdentifier);
//knownPublishers.add(publisherIdentifier);
signalOnNewPublisher(publisherIdentifier);
}
}

/**
* Updates the list of {@link Publisher}s for the topic that this
* {@link Subscriber} is interested in.
*
* {@link Subscriber} is interested in.<p/>
* Creates UpdatePublisherRunnable of this classes generic type for each PublisherIdentifier.
* Using executorService, spin the runnable which creates SlaveClient of type SlaveRpcEndpoint.
* This is invoked from client Registrar when the onSubscriberAdded event occurs, and
* from the SlaveServer when publisherUpdate is called.<br/>
* @param publisherIdentifiers
* {@link Collection} of {@link PublisherIdentifier}s for the
* subscribed topic
Expand Down Expand Up @@ -279,4 +312,5 @@ public void run(SubscriberListener<T> listener) {
public String toString() {
return "Subscriber<" + getTopicDeclaration() + ">";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public <T> Subscriber<T> newOrExisting(TopicDeclaration topicDeclaration) throws
return (DefaultSubscriber<T>) topicParticipantManager.getSubscriber(topicName);
} else {
DefaultSubscriber<T> subscriber =
DefaultSubscriber.newDefault(nodeIdentifier, topicDeclaration, executorService);
DefaultSubscriber.newDefault(nodeIdentifier, topicDeclaration, topicParticipantManager, executorService);
subscriber.addSubscriberListener(new DefaultSubscriberListener<T>() {
@Override
public void onNewPublisher(Subscriber<T> subscriber, PublisherIdentifier publisherIdentifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,22 @@ public class TopicParticipantManager {
* A mapping from {@link Subscriber} to its connected
* {@link PublisherIdentifier}s.
*/
private final HashMap<DefaultSubscriber<?>, List<PublisherIdentifier>> subscriberConnections;
private final Map<DefaultSubscriber<?>, List<PublisherIdentifier>> subscriberConnections;

/**
* A mapping from {@link Publisher} to its connected
* {@link SubscriberIdentifier}s.
*/
private final HashMap<DefaultPublisher<?>,List<SubscriberIdentifier>> publisherConnections;
private final Map<DefaultPublisher<?>,List<SubscriberIdentifier>> publisherConnections;

// TODO(damonkohler): Change to ListenerGroup.
private TopicParticipantManagerListener listener;

public TopicParticipantManager() {
publishers = new ConcurrentHashMap<GraphName, DefaultPublisher<?>>();
subscribers = new ConcurrentHashMap<GraphName, DefaultSubscriber<?>>();
subscriberConnections = new HashMap<DefaultSubscriber<?>, List<PublisherIdentifier>>();
publisherConnections = new HashMap<DefaultPublisher<?>, List<SubscriberIdentifier>>();
subscriberConnections = new ConcurrentHashMap<DefaultSubscriber<?>, List<PublisherIdentifier>>();
publisherConnections = new ConcurrentHashMap<DefaultPublisher<?>, List<SubscriberIdentifier>>();
}

public void setListener(TopicParticipantManagerListener listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
* @author jg
*/
class UpdatePublisherRunnable<MessageType> implements Runnable {
private static boolean DEBUG = false;
private static boolean DEBUG = true;
private static final Log log = LogFactory.getLog(UpdatePublisherRunnable.class);

private final DefaultSubscriber<MessageType> subscriber;
Expand Down Expand Up @@ -48,24 +48,24 @@ public void run() {
try {
if(DEBUG)
log.info("Attempting to create SlaveClient:"+nodeIdentifier.getName()+" pub:"+publisherIdentifier.getNodeUri());
slaveClient = new SlaveClient(nodeIdentifier.getName(), publisherIdentifier.getNodeUri());
slaveClient = new SlaveClient(nodeIdentifier.getName(), publisherIdentifier.getNodeUri());
if(DEBUG) {
log.info("SlaveClient created "+nodeIdentifier.getName()+" pub:"+publisherIdentifier.getNodeUri());
log.info("Requesting topic name "+subscriber.getTopicName());
}
Response<ProtocolDescription> response =
Response<ProtocolDescription> response =
slaveClient.requestTopic(subscriber.getTopicName(), ProtocolNames.SUPPORTED);
// If null there is no publisher for the requested topic
if( response != null ) {
// If null there is no publisher for the requested topic
if( response != null ) {
ProtocolDescription selected = response.getResult();
if (ProtocolNames.SUPPORTED.contains(selected.getName())) {
subscriber.addPublisher(publisherIdentifier, selected.getAddress());
} else {
log.error("Publisher returned unsupported protocol selection: " + response);
}
} else {
} else {
log.error("There are NO publishers available for topic "+subscriber.getTopicName());
}
}
} catch (Exception e) {
// TODO(damonkohler): Retry logic is needed at the RPC layer.
log.error(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public Socket connect(String connectionName, SocketAddress socketAddress) throws
AsynchTCPWorker uworker = new AsynchTCPWorker(ctx);
channelGroup.getExecutorService().execute(uworker);
// notify pipeline we connected (or failed via exceptionCaught and runtime exception)
ctx.pipeline().fireChannelActive();
//ctx.pipeline().fireChannelActive();
// recall we keep the list of contexts in TcpClientManager
if (DEBUG) {
log.info("TcpClient Connected with ChannelHandlerContext "+ctx);
Expand Down

0 comments on commit 9b50e2a

Please sign in to comment.