Skip to content

Commit

Permalink
Minimal debug, cleaned up handshake
Browse files Browse the repository at this point in the history
  • Loading branch information
neocoretechs committed Jun 3, 2016
1 parent 9c92ad5 commit d5e67bc
Show file tree
Hide file tree
Showing 40 changed files with 147 additions and 225 deletions.
4 changes: 2 additions & 2 deletions src/main/java/org/ros/internal/node/DefaultNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.ros.internal.node.topic.SubscriberFactory;
import org.ros.internal.node.topic.TopicDeclaration;
import org.ros.internal.node.topic.TopicParticipantManager;
import org.ros.internal.transport.tcp.TcpRosServer;

import org.ros.message.MessageFactory;
import org.ros.message.Time;
import org.ros.namespace.GraphName;
Expand Down Expand Up @@ -62,7 +62,7 @@
*
*/
public class DefaultNode implements ConnectedNode {
private static final boolean DEBUG = true;
private static final boolean DEBUG = false;
private static final Log log = LogFactory.getLog(DefaultNode.class);
/**
* The maximum delay before shutdown will begin even if all
Expand Down
31 changes: 5 additions & 26 deletions src/main/java/org/ros/internal/node/client/ParameterClient.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
/*
* Copyright (C) 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.ros.internal.node.client;


Expand All @@ -24,35 +8,30 @@
import org.ros.internal.node.response.IntegerResultFactory;
import org.ros.internal.node.response.ObjectResultFactory;
import org.ros.internal.node.response.Response;
import org.ros.internal.node.response.ResultFactory;
import org.ros.internal.node.response.StringListResultFactory;

import org.ros.internal.node.response.StringResultFactory;
import org.ros.internal.node.response.VoidResultFactory;
import org.ros.internal.node.rpc.MasterRpcEndpointImpl;
import org.ros.internal.node.rpc.ParameterServerRpcEndpoint;
import org.ros.internal.node.server.NodeIdentifier;
import org.ros.internal.node.server.ParameterServer;
import org.ros.internal.transport.tcp.TcpRosServer;

import org.ros.namespace.GraphName;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;


/**
* Provide access to the RPC API for a ROS {@link ParameterServer}.
* The ParameterServer typically starts one port higher than the master.
*
* @author [email protected] (Ken Conley)
* @author [email protected] (Damon Kohler)
* @author jg
*/
public class ParameterClient extends Client<ParameterServerRpcEndpoint> {
private static final boolean DEBUG = true;
private static final boolean DEBUG = false;
private static final Log log = LogFactory.getLog(ParameterClient.class);
private final NodeIdentifier nodeIdentifier;
private final String nodeName;
Expand Down
23 changes: 3 additions & 20 deletions src/main/java/org/ros/internal/node/client/Registrar.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
/*
* Copyright (C) 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.ros.internal.node.client;

import org.apache.commons.logging.Log;
Expand All @@ -34,7 +18,7 @@

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
Expand All @@ -45,12 +29,11 @@
* Manages topic, and service registrations of a {@link SlaveServer} with the
* {@link MasterServer}.
*
* @author [email protected] (Ken Conley)
* @author [email protected] (Damon Kohler)
* @author jg
*/
public class Registrar implements TopicParticipantManagerListener, ServiceManagerListener {

private static final boolean DEBUG = true;
private static final boolean DEBUG = false;
private static final Log log = LogFactory.getLog(Registrar.class);

private static final int SHUTDOWN_TIMEOUT = 5;
Expand Down
26 changes: 6 additions & 20 deletions src/main/java/org/ros/internal/node/rpc/SlaveRpcEndpointImpl.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
/*
* Copyright (C) 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.ros.internal.node.rpc;


Expand All @@ -35,16 +19,18 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import java.util.Set;
import java.util.Vector;


/**
* @author [email protected] (Damon Kohler)
* Facility for contacting the remote master and issuing commands via the remote invokable methods.
* The transport is accomplished via the RemoteRequestinterface implementors.
* @author jg
*/
public class SlaveRpcEndpointImpl implements SlaveRpcEndpoint {

private static final boolean DEBUG = true;
private static final boolean DEBUG = false;
private static final Log log = LogFactory.getLog(SlaveRpcEndpointImpl.class);

//private final SlaveServer slave;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/ros/internal/node/server/BaseServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*
*/
public final class BaseServer extends TCPServer {
private static boolean DEBUG = true;
private static boolean DEBUG = false;
private static final Log log = LogFactory.getLog(BaseServer.class);
public int WORKBOOTPORT = 8090;
public InetAddress address = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* @author Groff
*/
public class ParameterServer extends RpcServer {
private static final boolean DEBUG = true;
private static final boolean DEBUG = false;
private static final Log log = LogFactory.getLog(ParameterServer.class);

private final Map<String, Object> tree;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/ros/internal/node/server/RpcServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
*/
public abstract class RpcServer {

private static final boolean DEBUG = true;
private static final boolean DEBUG = false;
private static final Log log = LogFactory.getLog(RpcServer.class);

private final AdvertiseAddress advertiseAddress;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/ros/internal/node/server/TCPServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* TCPServer is the superclass of all objects using ServerSockets.
*/
public abstract class TCPServer implements Cloneable, Runnable {
private static boolean DEBUG = true;
private static boolean DEBUG = false;
private static final Log log = LogFactory.getLog(TCPServer.class);
ServerSocket server = null;
Socket data = null;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/ros/internal/node/server/TCPWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
*/
public class TCPWorker implements Runnable {
private static final boolean DEBUG = true;
private static final boolean DEBUG = false;
private static final Log log = LogFactory.getLog(TCPWorker.class);
public boolean shouldRun = true;
private Socket dataSocket;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*
*/
public class ThreadPoolManager {
private static final boolean DEBUG = true;
private static final boolean DEBUG = false;
private static final Log log = LogFactory.getLog(ThreadPoolManager.class);
private static String DEFAULT_THREAD_POOL = "RPCSERVER";
private int threadNum = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* @author jg
*/
public class MasterRegistrationManagerImpl {
private static boolean DEBUG = true;
private static boolean DEBUG = false;
private static final Log log = LogFactory.getLog(MasterRegistrationManagerImpl.class);

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
/*
* Copyright (C) 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.ros.internal.node.server.master;


Expand All @@ -40,7 +24,7 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.UnknownHostException;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand All @@ -55,8 +39,7 @@
*
* @see <a href="http://www.ros.org/wiki/Master">Master documentation</a>
*
* @author [email protected] (Damon Kohler)
* @author [email protected] (Keith M. Hughes)
* @author jg
*/
public class MasterServer extends RpcServer implements MasterRegistrationListener {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

package org.ros.internal.node.service;


Expand Down Expand Up @@ -27,7 +26,7 @@
/**
* Default implementation of a {@link ServiceClient}.
*
* @author [email protected] (Damon Kohler)
* @author jg
*/
public class DefaultServiceClient<T, S> implements ServiceClient<T, S> {

Expand Down Expand Up @@ -96,7 +95,7 @@ private DefaultServiceClient(GraphName nodeName, ServiceDeclaration serviceDecla
// TODO(damonkohler): Support non-persistent connections.
connectionHeader.addField(ConnectionHeaderFields.PERSISTENT, "1");
connectionHeader.merge(serviceDeclaration.toConnectionHeader());
tcpClientManager = new TcpClientManager(executorService);
tcpClientManager = TcpClientManager.getInstance(executorService);
ServiceClientHandshakeHandler<T, S> serviceClientHandshakeHandler =
new ServiceClientHandshakeHandler<T, S>(connectionHeader, responseListeners, executorService);
handshakeLatch = new HandshakeLatch();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
*/
public class DefaultPublisher<T> extends DefaultTopicParticipant implements Publisher<T> {

private static final boolean DEBUG = true;
private static final boolean DEBUG = false;
private static final Log log = LogFactory.getLog(DefaultPublisher.class);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ private DefaultSubscriber(NodeIdentifier nodeIdentifier, TopicDeclaration topicD
this.executorService = executorService;
incomingMessageQueue = new IncomingMessageQueue<T>(executorService);
knownPublishers = new HashSet<PublisherIdentifier>();
tcpClientManager = new TcpClientManager(executorService);
tcpClientManager = TcpClientManager.getInstance(executorService);
mutex = new Object();
SubscriberHandshakeHandler<T> subscriberHandshakeHandler =
new SubscriberHandshakeHandler<T>(toDeclaration().toConnectionHeader(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
public class SubscriberHandshake extends BaseClientHandshake {

private static final boolean DEBUG = true;
private static final boolean DEBUG = false;
private static final Log log = LogFactory.getLog(SubscriberHandshake.class);

public SubscriberHandshake(ConnectionHeader outgoingConnectionHeader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.ros.internal.transport.ConnectionHeader;
import org.ros.internal.transport.ConnectionHeaderFields;
import org.ros.internal.transport.queue.IncomingMessageQueue;
import org.ros.internal.transport.tcp.AsynchTCPWorker;
import org.ros.internal.transport.tcp.AsynchTempTCPWorker;
import org.ros.internal.transport.tcp.NamedChannelHandler;
import org.ros.node.topic.Publisher;
import org.ros.node.topic.Subscriber;
Expand All @@ -28,7 +30,7 @@
* the {@link Subscriber} may only subscribe to messages of this type
*/
class SubscriberHandshakeHandler<T> extends BaseClientHandshakeHandler {
private static boolean DEBUG = true;
private static boolean DEBUG = false;
private static final Log log = LogFactory.getLog(SubscriberHandshakeHandler.class);

private final IncomingMessageQueue<T> incomingMessageQueue;
Expand All @@ -40,25 +42,40 @@ public SubscriberHandshakeHandler(ConnectionHeader outgoingConnectionHeader,
if( DEBUG )
log.info("subscriberhandshakeHandler ctor:"+this);
}

/**
* Triggered from BaseClientHandshakeHandler channelRead
*/
@Override
protected void onSuccess(ConnectionHeader incomingConnectionHeader, ChannelHandlerContext ctx) {
if( DEBUG )
log.info("SubscriberHandshakeHandler.onSuccess:"+ctx+" "+incomingConnectionHeader);
ChannelPipeline pipeline = ctx.pipeline();
pipeline.remove(SubscriberHandshakeHandler.this);
pipeline.remove(getName());
NamedChannelHandler namedChannelHandler = incomingMessageQueue.getMessageReceiver();
pipeline.addLast(namedChannelHandler.getName(), namedChannelHandler);
String latching = incomingConnectionHeader.getField(ConnectionHeaderFields.LATCHING);
if (latching != null && latching.equals("1")) {
incomingMessageQueue.setLatchMode(true);
}
ctx.setReady(true);
// the termination of the AsynchTempTCPWorker after successful handshake will set context ready
// Launch the permanent worker
AsynchTCPWorker uworker = null;
try {
uworker = new AsynchTCPWorker(ctx);
} catch (IOException e) {
log.error("Cannot start worker for context:"+ctx);
e.printStackTrace();
return;
}
executor.execute(uworker);
}

/**
* Triggered from BaseClientHandshakeHandler
*/
@Override
protected void onFailure(String errorMessage, ChannelHandlerContext ctx) throws IOException {
log.info("Subscriber handshake failed: " + errorMessage);
ctx.setReady(false);
ctx.close();
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/ros/internal/system/Utility.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
*
*/
public class Utility {
private static boolean DEBUG = true;
private static boolean DEBUG = false;
private static final Log log = LogFactory.getLog(Utility.class);

public static <T> void serialize(T value, ByteBuffer buffer) {
Expand Down
Loading

0 comments on commit d5e67bc

Please sign in to comment.