diff --git a/src/main/java/org/ros/internal/node/service/DefaultServiceClient.java b/src/main/java/org/ros/internal/node/service/DefaultServiceClient.java index b3c6795..1cc69e9 100644 --- a/src/main/java/org/ros/internal/node/service/DefaultServiceClient.java +++ b/src/main/java/org/ros/internal/node/service/DefaultServiceClient.java @@ -2,7 +2,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.ros.internal.message.MessageBufferPool; +//import org.ros.internal.message.MessageBufferPool; import org.ros.internal.transport.ClientHandshakeListener; import org.ros.internal.transport.ConnectionHeader; import org.ros.internal.transport.ConnectionHeaderFields; @@ -70,7 +70,7 @@ public void reset() { private final ServiceDeclaration serviceDeclaration; private final MessageFactory messageFactory; - private final MessageBufferPool messageBufferPool; + //private final MessageBufferPool messageBufferPool; private final Queue> responseListeners; private final ConnectionHeader connectionHeader; private final TcpClientManager tcpClientManager; @@ -89,7 +89,7 @@ private DefaultServiceClient(GraphName nodeName, ServiceDeclaration serviceDecla MessageFactory messageFactory, ScheduledExecutorService executorService) throws IOException { this.serviceDeclaration = serviceDeclaration; this.messageFactory = messageFactory; - messageBufferPool = new MessageBufferPool(); + //messageBufferPool = new MessageBufferPool(); responseListeners = new LinkedList>(); connectionHeader = new ConnectionHeader(); connectionHeader.addField(ConnectionHeaderFields.CALLER_ID, nodeName.toString()); diff --git a/src/main/java/org/ros/internal/node/service/ServiceRequestHandler.java b/src/main/java/org/ros/internal/node/service/ServiceRequestHandler.java index 4d04f84..deff2e6 100644 --- a/src/main/java/org/ros/internal/node/service/ServiceRequestHandler.java +++ b/src/main/java/org/ros/internal/node/service/ServiceRequestHandler.java @@ -1,5 +1,6 @@ package org.ros.internal.node.service; +import org.apache.commons.logging.LogFactory; //import org.jboss.netty.buffer.ChannelBuffer; //import org.jboss.netty.buffer.ChannelBuffers; //import org.jboss.netty.channel.ChannelHandlerContext; @@ -7,13 +8,16 @@ //import org.jboss.netty.channel.SimpleChannelHandler; import org.ros.exception.RosRuntimeException; import org.ros.exception.ServiceException; -import org.ros.internal.message.MessageBufferPool; +import org.ros.internal.message.MessageBuffers; +//import org.ros.internal.message.MessageBufferPool; import org.ros.internal.system.Utility; import org.ros.internal.transport.ChannelHandler; import org.ros.internal.transport.ChannelHandlerContext; import org.ros.message.MessageFactory; import org.ros.node.service.ServiceResponseBuilder; +import rosgraph_msgs.Log; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.Charset; @@ -23,13 +27,14 @@ * @author jg */ class ServiceRequestHandler implements ChannelHandler { - + private static final org.apache.commons.logging.Log log = LogFactory.getLog(ServiceRequestHandler.class); private final ServiceDeclaration serviceDeclaration; private final ServiceResponseBuilder responseBuilder; private final MessageFactory messageFactory; private final ExecutorService executorService; - private final MessageBufferPool messageBufferPool; + //private final ByteBuffer messageBuffer = MessageBuffers.dynamicBuffer(); + //private final MessageBufferPool messageBufferPool; public ServiceRequestHandler(ServiceDeclaration serviceDeclaration, ServiceResponseBuilder responseBuilder, MessageFactory messageFactory, @@ -38,26 +43,35 @@ public ServiceRequestHandler(ServiceDeclaration serviceDeclaration, this.responseBuilder = responseBuilder; this.messageFactory = messageFactory; this.executorService = executorService; - messageBufferPool = new MessageBufferPool(); + // messageBufferPool = new MessageBufferPool(); } - private void handleRequest(T request, ByteBuffer responseBuffer) throws ServiceException { + private S handleRequest(T request) throws ServiceException { S response = messageFactory.newFromType(serviceDeclaration.getType()); responseBuilder.build(request, response); - Utility.serialize(response, responseBuffer); + return response; } - private void handleSuccess(final ChannelHandlerContext ctx, ServiceServerResponse response, ByteBuffer responseBuffer) { + private void handleSuccess(final ChannelHandlerContext ctx, S result, ServiceServerResponse response, ByteBuffer responseBuffer) { response.setErrorCode(1); - response.setMessageLength(responseBuffer.limit()); - response.setMessageBytes(responseBuffer.array()); - ByteBuffer resbuf = messageBufferPool.acquire(); - Utility.serialize(response, resbuf); + ByteBuffer resbuf = MessageBuffers.dynamicBuffer(); // allocate for serialized result of service method + Utility.serialize(result, resbuf); + byte[] b = new byte[resbuf.limit()]; + resbuf.get(b); + response.setMessageBytes(b); + response.setMessageLength(response.getMessageBytes().length); + responseBuffer.putInt(response.getErrorCode()); + responseBuffer.putInt(response.getMessageLength()); + log.info("Response to be serialized:"+response); + Utility.serialize(response, responseBuffer); + //log.info("ServiceRequestHandler serializing message buffer "+responseBuffer+ + // " with payload "+response.getMessageBytes().length); try { - ctx.write(resbuf.array()); + ctx.write(responseBuffer.array()); } catch (IOException e) { throw new RosRuntimeException(e); } + MessageBuffers.returnBuffer(resbuf); } private void handleError(final ChannelHandlerContext ctx, ServiceServerResponse response, String message, ByteBuffer responseBuffer) { @@ -66,6 +80,8 @@ private void handleError(final ChannelHandlerContext ctx, ServiceServerResponse response.setMessageLength(encodedMessage.limit()); response.setMessage(encodedMessage); response.setMessageBytes(encodedMessage.array()); + responseBuffer.putInt(response.getErrorCode()); + responseBuffer.putInt(response.getMessageLength()); Utility.serialize(response, responseBuffer); try { ctx.write(responseBuffer.array()); @@ -84,19 +100,21 @@ public Object channelRead(final ChannelHandlerContext ctx, Object e) throws Exce @Override public void run() { ServiceServerResponse response = new ServiceServerResponse(); - ByteBuffer responseBuffer = messageBufferPool.acquire(); + ByteBuffer responseBuffer = MessageBuffers.dynamicBuffer(); boolean success; + S result = null; try { - handleRequest(requestBuffer, responseBuffer); + result = handleRequest(requestBuffer); success = true; } catch (ServiceException ex) { handleError(ctx, response, ex.getMessage(), responseBuffer); success = false; } if (success) { - handleSuccess(ctx, response, responseBuffer); + handleSuccess(ctx, result, response, responseBuffer); } - messageBufferPool.release(responseBuffer); + //messageBufferPool.release(responseBuffer); + MessageBuffers.returnBuffer(responseBuffer); } }); return requestBuffer; diff --git a/src/main/java/org/ros/internal/node/service/ServiceResponseDecoder.java b/src/main/java/org/ros/internal/node/service/ServiceResponseDecoder.java index b3d68d0..0afc450 100644 --- a/src/main/java/org/ros/internal/node/service/ServiceResponseDecoder.java +++ b/src/main/java/org/ros/internal/node/service/ServiceResponseDecoder.java @@ -33,15 +33,36 @@ protected void decode(int code, ChannelHandlerContext ctx, ByteBuffer buffer, Li switch (code) { case ERROR_CODE: response.setErrorCode(buffer.getInt()); + try { + if(rstate.size() > 0) + rstate.add(0, response); + else + rstate.add(response); + return; + } finally { + reset(); + } //checkpoint(ServiceResponseDecoderState.MESSAGE_LENGTH); case MESSAGE_LENGTH: response.setMessageLength(buffer.getInt()); + try { + if(rstate.size() > 1) + rstate.add(1, response); + else + rstate.add(response); + return; + } finally { + reset(); + } // checkpoint(ServiceResponseDecoderState.MESSAGE); case MESSAGE: response.setMessage(buffer); try { - //return response; - rstate.add(response); + if(rstate.size() > 2) + rstate.add(2, response); + else + rstate.add(response); + return; } finally { reset(); } diff --git a/src/main/java/org/ros/internal/node/service/ServiceResponseHandler.java b/src/main/java/org/ros/internal/node/service/ServiceResponseHandler.java index 8d5e047..076d2e8 100644 --- a/src/main/java/org/ros/internal/node/service/ServiceResponseHandler.java +++ b/src/main/java/org/ros/internal/node/service/ServiceResponseHandler.java @@ -8,6 +8,10 @@ //import org.jboss.netty.channel.SimpleChannelHandler; import org.ros.exception.RemoteException; import org.ros.internal.message.MessageBuffers; +import org.ros.internal.message.service.ServiceDefinitionResourceProvider; +import org.ros.internal.message.service.ServiceRequestMessageFactory; +import org.ros.internal.message.service.ServiceResponseMessageFactory; +import org.ros.internal.node.response.Response; import org.ros.internal.node.response.StatusCode; import org.ros.internal.system.Utility; import org.ros.internal.transport.ChannelHandler; @@ -23,6 +27,13 @@ /** * A handler for service responses.

+ * This handler lives on the pipe line and can be retrieved as follows: + * ChannelHandler ch = ctx.pipeline().get("ResponseHandler"); + * {@code ServiceResponseHandler srh = (ServiceResponseHandler)ch;} + * It arrives there by a successful handshake via the {@code ServiceClientHandshakeHandler.onSuccess()} + * which is called with the ConnectionHeader and ChannelHandlerContext as parameters.

+ * The ctx.pipeLine() method retrieves the pipe line and and addList is invoked using the literal "ResponseHandler" + * as the key and a new instance of ServiceResponseHandler is created to use as value.

* The handler revolves around the encoder and the decoder much like the pub/sub model. * The encoder and decoder work with the channel handler context and the event model to deliver * requests and responses on and off the bus. @@ -96,35 +107,33 @@ public ServiceResponseHandler(Queue> messa */ @Override public Object channelRead(ChannelHandlerContext ctx, Object e) throws Exception { - log.info("ServiceResponseHandler channelRead for ChannelHandlerContext:"+ctx+" using Object:"+e); final ServiceResponseListener listener = responseListeners.poll(); assert(listener != null) : "No listener for incoming service response."; final ByteBuffer buffer = ByteBuffer.wrap((byte[]) e); + ServiceServerResponse response = (ServiceServerResponse) Utility.deserialize(buffer); + log.info("ServiceResponseHandler channelRead for ChannelHandlerContext:"+ctx+" with ServiceServerResponse:"+response); final ServiceResponseDecoder decoder = new ServiceResponseDecoder(); - final List rstate = new ArrayList(); - ServiceServerResponse response = new ServiceServerResponse(); + //final List rstate = new ArrayList(); executorService.execute(new Runnable() { @Override public void run() { try { - decoder.decode(ServiceResponseDecoderState.ERROR_CODE.ordinal(), ctx, buffer, rstate); - decoder.decode(ServiceResponseDecoderState.MESSAGE_LENGTH.ordinal(), ctx, buffer, rstate); - decoder.decode(ServiceResponseDecoderState.MESSAGE.ordinal(), ctx, buffer, rstate); - ServiceServerResponse sresponse = (ServiceServerResponse) rstate.get(0); - if (sresponse.getErrorCode() != ServiceResponseDecoderState.ERROR_CODE.ordinal()) { - sresponse = (ServiceServerResponse) rstate.get(2); + //decoder.decode(ServiceResponseDecoderState.ERROR_CODE.ordinal(), ctx, buffer, rstate); + //decoder.decode(ServiceResponseDecoderState.MESSAGE_LENGTH.ordinal(), ctx, buffer, rstate); + //decoder.decode(ServiceResponseDecoderState.MESSAGE.ordinal(), ctx, buffer, rstate); + //ServiceServerResponse sresponse = (ServiceServerResponse) rstate.get(0); + if (response.getErrorCode() != ServiceResponseDecoderState.ERROR_CODE.ordinal()) { // TODO UDP transport? //sresponse = (ServiceServerResponse) rstate.get(1); //int messageLength = sresponse.getMessageLength(); - listener.onSuccess((ResponseType) sresponse); + //Response.fromListChecked(sresponse, resultFactory) + Object o = Utility.deserialize(ByteBuffer.wrap(response.getMessageBytes())); + log.info("About to fire successful call with response from service. Class:"+o.getClass()+" payload:"+o); + listener.onSuccess( (ResponseType) o); } else { - sresponse = (ServiceServerResponse) rstate.get(2); - String message = Charset.forName("US-ASCII").decode(sresponse.getMessage()).toString(); + String message = Charset.forName("US-ASCII").decode(response.getMessage()).toString(); listener.onFailure(new RemoteException(StatusCode.ERROR, message)); } - response.setErrorCode(sresponse.getErrorCode()); - response.setMessageLength(sresponse.getMessageLength()); - response.setMessage(sresponse.getMessage()); } catch (Exception e1) { log.error("Error:"+e1+" decoding ServiceResponse for context:"+ctx+" using proposed ServiceResponse:"+e); e1.printStackTrace(); @@ -133,7 +142,8 @@ public void run() { }); return response; } - + + @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub diff --git a/src/main/java/org/ros/internal/node/service/ServiceServerResponse.java b/src/main/java/org/ros/internal/node/service/ServiceServerResponse.java index 4aabe79..234affd 100644 --- a/src/main/java/org/ros/internal/node/service/ServiceServerResponse.java +++ b/src/main/java/org/ros/internal/node/service/ServiceServerResponse.java @@ -48,5 +48,17 @@ public int getMessageLength() { public byte[] getMessageBytes() { return messageBytes; } public void setMessageBytes(byte[] b) { messageBytes = b; } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(this.getClass().getName()); + sb.append(" Error Code:"); + sb.append(errorCode); + sb.append(" Message Length:"); + sb.append(messageLength); + sb.append(" Message size:"); + sb.append(messageBytes.length); + return sb.toString(); + } } diff --git a/src/main/java/org/ros/node/NodeConfiguration.java b/src/main/java/org/ros/node/NodeConfiguration.java index 5eb9ce5..0514337 100644 --- a/src/main/java/org/ros/node/NodeConfiguration.java +++ b/src/main/java/org/ros/node/NodeConfiguration.java @@ -51,7 +51,7 @@ * @author ethan.rublee@gmail.com (Ethan Rublee) * @author kwc@willowgarage.com (Ken Conley) * @author damonkohler@google.com (Damon Kohler) - * @author jg + * @author groffj@neocoretechs.com (Jonathan Neville Groff) */ public class NodeConfiguration { @@ -61,7 +61,7 @@ public class NodeConfiguration { public static final InetSocketAddress DEFAULT_MASTER_URI; static { - DEFAULT_MASTER_URI = new InetSocketAddress("172.16.0.101", 8090); + DEFAULT_MASTER_URI = new InetSocketAddress("127.0.0.1", 8090); } private NameResolver parentResolver;