diff --git a/src/main/java/org/ros/address/AdvertiseAddress.java b/src/main/java/org/ros/address/AdvertiseAddress.java index d9c393f..441c648 100644 --- a/src/main/java/org/ros/address/AdvertiseAddress.java +++ b/src/main/java/org/ros/address/AdvertiseAddress.java @@ -68,7 +68,7 @@ public AdvertiseAddress(String host, final int port) { } public AdvertiseAddress(InetAddress host, int port) { - this.host = host.getCanonicalHostName(); + this.host = host.getHostName(); this.port = port; } diff --git a/src/main/java/org/ros/internal/node/client/Registrar.java b/src/main/java/org/ros/internal/node/client/Registrar.java index ddddacc..594d980 100644 --- a/src/main/java/org/ros/internal/node/client/Registrar.java +++ b/src/main/java/org/ros/internal/node/client/Registrar.java @@ -27,7 +27,15 @@ /** * Manages topic, and service registrations of a {@link SlaveServer} with the - * {@link MasterServer}. + * {@link MasterServer}.
+ * The primary activity here is to respond to events that occur when a publisher or subscriber is added + * or removed. The fact that it is constructed with a MasterClient and an ExecutorService is a clue. + * For instance, in the onSubscriberAdded event handler method, we contact the remote master to + * register the subscriber. To do this we call updatePublishers on the subscriber + * using the collection of publisherUris returned in the response to masterClient.
+ * Passed DefaultSubscriber class creates an UpdatePublisherRunnable thread + * which creates a SlaveClient of type SlaveRpcEndpointImpl to + * contact the publishers. If successful, call signalOnMasterRegistrationSuccess for the subscriber. * * @author jg */ @@ -170,7 +178,12 @@ public void run() { }); } } - + /** + * Contact the remote master to register the subscriber. Call updatePublishers on the subscriber + * using the collection of publisherUris returned in the response. Passed DefaultSubscriber class creates an + * UpdatePublisherRunnable thread which creates a SlaveClient of type SlaveRpcEndpointImpl to + * contact the publishers. If successful, signalOnMasterRegistrationSuccess for the subscriber. + */ @Override public void onSubscriberAdded(final DefaultSubscriber subscriber) { if (DEBUG) { diff --git a/src/main/java/org/ros/internal/node/server/SlaveServer.java b/src/main/java/org/ros/internal/node/server/SlaveServer.java index 27f8045..6bced91 100644 --- a/src/main/java/org/ros/internal/node/server/SlaveServer.java +++ b/src/main/java/org/ros/internal/node/server/SlaveServer.java @@ -35,7 +35,9 @@ * A remote client can request connections, get collections of publishers and subscriber and information * about the state of the bus including connections between publishers and subscribers. TcpRosServer is * the class that does most of the work and is wrapped by this class, which is here to provide the subset of - * remotely invokable methods. + * remotely invokable methods via reflection using the ServerInvokeMethod class. + * @see ServerInvokeMethod + * @see TcpRosServer * @author jg */ public class SlaveServer extends RpcServer { @@ -111,7 +113,7 @@ public List getBusStats(String callerId) { * The fourth element is the protocol
* The fifth element is the topic name.
* @param callerId - * @return + * @return A List of Objects representing the above, packaged thusly to enable remote serialization delivery. */ public List getBusInfo(String callerId) { List busInfo = new ArrayList(); @@ -183,10 +185,13 @@ public int paramUpdate(GraphName parameterName, Object parameterValue) { return parameterManager.updateParameter(parameterName, parameterValue); } /** - * + * If there is a subscriber subscribed to this topicName, call updatePublishers on the subscriber + * using the collection of publisherUris. DefaultSubscriber class creates an + * UpdatePublisherRunnable thread which creates a SlaveClient of type SlaveRpcEndpointImpl to + * contact the publisher. * @param callerId * @param topicName - * @param publisherUris + * @param publisherUris collection of InetSocketAddress of remote publishers to be updated. */ public void publisherUpdate(String callerId, String topicName, Collection publisherUris) { GraphName graphName = GraphName.of(topicName); @@ -199,7 +204,7 @@ public void publisherUpdate(String callerId, String topicName, Collection + * Primary players are knownPublishers, which is a Set of PublisherIdentifiers,
+ * and TcpClientManager, which has the NamedChannelHandlers.
+ * Here, we also maintain the incomingMessageQueue, which contains MessageListeners of the type this + * class is parameterized with.
+ * * * @author jg */ public class DefaultSubscriber extends DefaultTopicParticipant implements Subscriber { - - private static final Log log = LogFactory.getLog(DefaultSubscriber.class); + private static boolean DEBUG = true; + private static final Log log = LogFactory.getLog(DefaultSubscriber.class); /** * The maximum delay before shutdown will begin even if all @@ -43,8 +47,9 @@ public class DefaultSubscriber extends DefaultTopicParticipant implements Sub private final NodeIdentifier nodeIdentifier; private final ScheduledExecutorService executorService; private final IncomingMessageQueue incomingMessageQueue; - private final Set knownPublishers; + // private final Set knownPublishers; private final TcpClientManager tcpClientManager; + private final TopicParticipantManager topicParticipantManager; private final Object mutex; /** @@ -52,18 +57,25 @@ public class DefaultSubscriber extends DefaultTopicParticipant implements Sub */ private final ListenerGroup> subscriberListeners; + //public static DefaultSubscriber newDefault(NodeIdentifier nodeIdentifier, + // TopicDeclaration description, ScheduledExecutorService executorService) throws IOException { + // return new DefaultSubscriber(nodeIdentifier, description, executorService); + //} public static DefaultSubscriber newDefault(NodeIdentifier nodeIdentifier, - TopicDeclaration description, ScheduledExecutorService executorService) throws IOException { - return new DefaultSubscriber(nodeIdentifier, description, executorService); + TopicDeclaration description, + TopicParticipantManager topicParticipantManager, + ScheduledExecutorService executorService) throws IOException { + return new DefaultSubscriber(nodeIdentifier, description, topicParticipantManager, executorService); } - - private DefaultSubscriber(NodeIdentifier nodeIdentifier, TopicDeclaration topicDeclaration, ScheduledExecutorService executorService) throws IOException { + private DefaultSubscriber(NodeIdentifier nodeIdentifier, TopicDeclaration topicDeclaration, + TopicParticipantManager topicParticipantManager, ScheduledExecutorService executorService) throws IOException { super(topicDeclaration); this.nodeIdentifier = nodeIdentifier; this.executorService = executorService; incomingMessageQueue = new IncomingMessageQueue(executorService); - knownPublishers = new HashSet(); + //knownPublishers = new HashSet(); tcpClientManager = TcpClientManager.getInstance(executorService); + this.topicParticipantManager = topicParticipantManager; mutex = new Object(); SubscriberHandshakeHandler subscriberHandshakeHandler = new SubscriberHandshakeHandler(toDeclaration().toConnectionHeader(), @@ -73,22 +85,26 @@ private DefaultSubscriber(NodeIdentifier nodeIdentifier, TopicDeclaration topicD subscriberListeners.add(new DefaultSubscriberListener() { @Override public void onMasterRegistrationSuccess(Subscriber registrant) { - log.info("Subscriber registered: " + DefaultSubscriber.this); + if(DEBUG) + log.info("Subscriber registered: " + DefaultSubscriber.this); } @Override public void onMasterRegistrationFailure(Subscriber registrant) { - log.info("Subscriber registration failed: " + DefaultSubscriber.this); + if(DEBUG) + log.info("Subscriber registration failed: " + DefaultSubscriber.this); } @Override public void onMasterUnregistrationSuccess(Subscriber registrant) { - log.info("Subscriber unregistered: " + DefaultSubscriber.this); + if(DEBUG) + log.info("Subscriber unregistered: " + DefaultSubscriber.this); } @Override public void onMasterUnregistrationFailure(Subscriber registrant) { - log.info("Subscriber unregistration failed: " + DefaultSubscriber.this); + if(DEBUG) + log.info("Subscriber unregistration failed: " + DefaultSubscriber.this); } }); } @@ -119,27 +135,44 @@ public void addMessageListener(MessageListener messageListener, int limit) { public void addMessageListener(MessageListener messageListener) { addMessageListener(messageListener, 1); } - - + /** + * When the SlaveClient requests a topic from the publisher in UpdatePublisherRunnable, as + * happens when the method updatePublishers is called here, this method is called back on reply from master. + * TcpClientManager calls connect to the passed InetSocketAddress. After that, all the SubscriberListeners are + * signaled with the new Publisher. + * @param publisherIdentifier + * @param address + * @throws Exception + */ public void addPublisher(PublisherIdentifier publisherIdentifier, InetSocketAddress address) throws Exception { synchronized (mutex) { // TODO(damonkohler): If the connection is dropped, knownPublishers should // be updated. - if (knownPublishers.contains(publisherIdentifier)) { - return; - } - tcpClientManager.connect(toString(), address); + //if (knownPublishers.contains(publisherIdentifier)) { + // return; + //} + Collection pubs = topicParticipantManager.getSubscriberConnections(this); + if(pubs != null && pubs.contains(publisherIdentifier)) { + log.info("Defaultsubscriber addPublisher topicParticipantManager CONTAINS "+publisherIdentifier+" at "+address); + } else { + log.info("Defaultsubscriber addPublisher topicParticipantManager DOES NOT CONTAIN "+publisherIdentifier+" at "+address); + topicParticipantManager.addSubscriberConnection(this, publisherIdentifier); + } + tcpClientManager.connect(toString(), address); // TODO(damonkohler): knownPublishers is duplicate information that is // already available to the TopicParticipantManager. - knownPublishers.add(publisherIdentifier); + //knownPublishers.add(publisherIdentifier); signalOnNewPublisher(publisherIdentifier); } } /** * Updates the list of {@link Publisher}s for the topic that this - * {@link Subscriber} is interested in. - * + * {@link Subscriber} is interested in.

+ * Creates UpdatePublisherRunnable of this classes generic type for each PublisherIdentifier. + * Using executorService, spin the runnable which creates SlaveClient of type SlaveRpcEndpoint. + * This is invoked from client Registrar when the onSubscriberAdded event occurs, and + * from the SlaveServer when publisherUpdate is called.
* @param publisherIdentifiers * {@link Collection} of {@link PublisherIdentifier}s for the * subscribed topic @@ -279,4 +312,5 @@ public void run(SubscriberListener listener) { public String toString() { return "Subscriber<" + getTopicDeclaration() + ">"; } + } diff --git a/src/main/java/org/ros/internal/node/topic/SubscriberFactory.java b/src/main/java/org/ros/internal/node/topic/SubscriberFactory.java index a0b3aaa..5b69091 100644 --- a/src/main/java/org/ros/internal/node/topic/SubscriberFactory.java +++ b/src/main/java/org/ros/internal/node/topic/SubscriberFactory.java @@ -65,7 +65,7 @@ public Subscriber newOrExisting(TopicDeclaration topicDeclaration) throws return (DefaultSubscriber) topicParticipantManager.getSubscriber(topicName); } else { DefaultSubscriber subscriber = - DefaultSubscriber.newDefault(nodeIdentifier, topicDeclaration, executorService); + DefaultSubscriber.newDefault(nodeIdentifier, topicDeclaration, topicParticipantManager, executorService); subscriber.addSubscriberListener(new DefaultSubscriberListener() { @Override public void onNewPublisher(Subscriber subscriber, PublisherIdentifier publisherIdentifier) { diff --git a/src/main/java/org/ros/internal/node/topic/TopicParticipantManager.java b/src/main/java/org/ros/internal/node/topic/TopicParticipantManager.java index f5c4b9b..4d6f226 100644 --- a/src/main/java/org/ros/internal/node/topic/TopicParticipantManager.java +++ b/src/main/java/org/ros/internal/node/topic/TopicParticipantManager.java @@ -37,13 +37,13 @@ public class TopicParticipantManager { * A mapping from {@link Subscriber} to its connected * {@link PublisherIdentifier}s. */ - private final HashMap, List> subscriberConnections; + private final Map, List> subscriberConnections; /** * A mapping from {@link Publisher} to its connected * {@link SubscriberIdentifier}s. */ - private final HashMap,List> publisherConnections; + private final Map,List> publisherConnections; // TODO(damonkohler): Change to ListenerGroup. private TopicParticipantManagerListener listener; @@ -51,8 +51,8 @@ public class TopicParticipantManager { public TopicParticipantManager() { publishers = new ConcurrentHashMap>(); subscribers = new ConcurrentHashMap>(); - subscriberConnections = new HashMap, List>(); - publisherConnections = new HashMap, List>(); + subscriberConnections = new ConcurrentHashMap, List>(); + publisherConnections = new ConcurrentHashMap, List>(); } public void setListener(TopicParticipantManagerListener listener) { diff --git a/src/main/java/org/ros/internal/node/topic/UpdatePublisherRunnable.java b/src/main/java/org/ros/internal/node/topic/UpdatePublisherRunnable.java index a268cbe..7b8c33f 100644 --- a/src/main/java/org/ros/internal/node/topic/UpdatePublisherRunnable.java +++ b/src/main/java/org/ros/internal/node/topic/UpdatePublisherRunnable.java @@ -20,7 +20,7 @@ * @author jg */ class UpdatePublisherRunnable implements Runnable { - private static boolean DEBUG = false; + private static boolean DEBUG = true; private static final Log log = LogFactory.getLog(UpdatePublisherRunnable.class); private final DefaultSubscriber subscriber; @@ -48,24 +48,24 @@ public void run() { try { if(DEBUG) log.info("Attempting to create SlaveClient:"+nodeIdentifier.getName()+" pub:"+publisherIdentifier.getNodeUri()); - slaveClient = new SlaveClient(nodeIdentifier.getName(), publisherIdentifier.getNodeUri()); + slaveClient = new SlaveClient(nodeIdentifier.getName(), publisherIdentifier.getNodeUri()); if(DEBUG) { log.info("SlaveClient created "+nodeIdentifier.getName()+" pub:"+publisherIdentifier.getNodeUri()); log.info("Requesting topic name "+subscriber.getTopicName()); } - Response response = + Response response = slaveClient.requestTopic(subscriber.getTopicName(), ProtocolNames.SUPPORTED); - // If null there is no publisher for the requested topic - if( response != null ) { + // If null there is no publisher for the requested topic + if( response != null ) { ProtocolDescription selected = response.getResult(); if (ProtocolNames.SUPPORTED.contains(selected.getName())) { subscriber.addPublisher(publisherIdentifier, selected.getAddress()); } else { log.error("Publisher returned unsupported protocol selection: " + response); } - } else { + } else { log.error("There are NO publishers available for topic "+subscriber.getTopicName()); - } + } } catch (Exception e) { // TODO(damonkohler): Retry logic is needed at the RPC layer. log.error(e); diff --git a/src/main/java/org/ros/internal/transport/tcp/TcpClient.java b/src/main/java/org/ros/internal/transport/tcp/TcpClient.java index 96db034..9bbf53b 100644 --- a/src/main/java/org/ros/internal/transport/tcp/TcpClient.java +++ b/src/main/java/org/ros/internal/transport/tcp/TcpClient.java @@ -94,7 +94,7 @@ public Socket connect(String connectionName, SocketAddress socketAddress) throws AsynchTCPWorker uworker = new AsynchTCPWorker(ctx); channelGroup.getExecutorService().execute(uworker); // notify pipeline we connected (or failed via exceptionCaught and runtime exception) - ctx.pipeline().fireChannelActive(); + //ctx.pipeline().fireChannelActive(); // recall we keep the list of contexts in TcpClientManager if (DEBUG) { log.info("TcpClient Connected with ChannelHandlerContext "+ctx);