Skip to content

Commit

Permalink
Fixed subscriber shutdown.
Browse files Browse the repository at this point in the history
  • Loading branch information
neocoretechs committed May 15, 2021
1 parent b936d6f commit c3092bc
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 7 deletions.
12 changes: 10 additions & 2 deletions src/main/java/org/ros/internal/node/client/Registrar.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
* which creates a SlaveClient of type SlaveRpcEndpointImpl to
* contact the publishers. If successful, call signalOnMasterRegistrationSuccess for the subscriber.
*
* @author jg
* @author Jonathan Groff Copyright (C) NeoCoreTechs 2015,2021
*/
public class Registrar implements TopicParticipantManagerListener, ServiceManagerListener {

Expand Down Expand Up @@ -252,7 +252,15 @@ public void run() {
});
}
}


@Override
public void onSubscriberRemoved(final DefaultSubscriber<?> subscriber, boolean remove) {
if (DEBUG) {
log.info("Unregistering subscriber for shutdown: " + subscriber);
}
masterClient.unregisterSubscriber(nodeIdentifier, subscriber);
}

@Override
public void onServiceServerAdded(final DefaultServiceServer<?, ?> serviceServer) {
if (DEBUG) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void onNewPublisher(Subscriber<T> subscriber, PublisherIdentifier publish
@Override
public void onShutdown(Subscriber<T> subscriber) {
assert(subscriber != null );
topicParticipantManager.removeSubscriber((DefaultSubscriber<T>) subscriber);
topicParticipantManager.removeSubscriber((DefaultSubscriber<T>) subscriber, true);
}
});
topicParticipantManager.addSubscriber(subscriber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -104,7 +103,16 @@ public void removeSubscriber(DefaultSubscriber<?> subscriber) {
listener.onSubscriberRemoved(subscriber);
}
}

/**
* Variation for removal of subscriber during executor service shutdown.<p/>
* Dont fire onSubscriberRemoved method of Registrar, which resubmits
* @param subscriber
* @param b
*/
public void removeSubscriber(DefaultSubscriber<?> subscriber, boolean b) {
subscribers.remove(subscriber.getTopicName());
}

public void addSubscriberConnection(DefaultSubscriber<?> subscriber, PublisherIdentifier publisherIdentifier) {
if(DEBUG)
log.info("Connecting subscriber:"+subscriber+" to publisher Identifier:"+publisherIdentifier+" for "+this);
Expand Down Expand Up @@ -163,4 +171,6 @@ public Collection<DefaultPublisher<?>> getPublishers() {
public Collection<SubscriberIdentifier> getPublisherConnections(DefaultPublisher<?> publisher) {
return publisherConnections.get(publisher);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,14 @@ public interface TopicParticipantManagerListener {
/**
* Called when a {@link Subscriber} is removed.
*
* @param subscriber
* the {@link Subscriber} that was removed
* @param subscriber the {@link Subscriber} that was removed
*/
void onSubscriberRemoved(DefaultSubscriber<?> subscriber);

/**
* Called when a {@link Subscriber} is removed for shutdown.
*
* @param subscriber the {@link Subscriber} that was removed
*/
void onSubscriberRemoved(DefaultSubscriber<?> subscriber, boolean remove);
}

0 comments on commit c3092bc

Please sign in to comment.