Skip to content

Commit

Permalink
Funcional service invocation!
Browse files Browse the repository at this point in the history
  • Loading branch information
neocoretechs committed Dec 16, 2020
1 parent 99755e2 commit 3fb5a3c
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.ros.internal.message.MessageBufferPool;
//import org.ros.internal.message.MessageBufferPool;
import org.ros.internal.transport.ClientHandshakeListener;
import org.ros.internal.transport.ConnectionHeader;
import org.ros.internal.transport.ConnectionHeaderFields;
Expand Down Expand Up @@ -70,7 +70,7 @@ public void reset() {
private final ServiceDeclaration serviceDeclaration;

private final MessageFactory messageFactory;
private final MessageBufferPool messageBufferPool;
//private final MessageBufferPool messageBufferPool;
private final Queue<ServiceResponseListener<S>> responseListeners;
private final ConnectionHeader connectionHeader;
private final TcpClientManager tcpClientManager;
Expand All @@ -89,7 +89,7 @@ private DefaultServiceClient(GraphName nodeName, ServiceDeclaration serviceDecla
MessageFactory messageFactory, ScheduledExecutorService executorService) throws IOException {
this.serviceDeclaration = serviceDeclaration;
this.messageFactory = messageFactory;
messageBufferPool = new MessageBufferPool();
//messageBufferPool = new MessageBufferPool();
responseListeners = new LinkedList<ServiceResponseListener<S>>();
connectionHeader = new ConnectionHeader();
connectionHeader.addField(ConnectionHeaderFields.CALLER_ID, nodeName.toString());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
package org.ros.internal.node.service;

import org.apache.commons.logging.LogFactory;
//import org.jboss.netty.buffer.ChannelBuffer;
//import org.jboss.netty.buffer.ChannelBuffers;
//import org.jboss.netty.channel.ChannelHandlerContext;
//import org.jboss.netty.channel.MessageEvent;
//import org.jboss.netty.channel.SimpleChannelHandler;
import org.ros.exception.RosRuntimeException;
import org.ros.exception.ServiceException;
import org.ros.internal.message.MessageBufferPool;
import org.ros.internal.message.MessageBuffers;
//import org.ros.internal.message.MessageBufferPool;
import org.ros.internal.system.Utility;
import org.ros.internal.transport.ChannelHandler;
import org.ros.internal.transport.ChannelHandlerContext;
import org.ros.message.MessageFactory;
import org.ros.node.service.ServiceResponseBuilder;

import rosgraph_msgs.Log;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
Expand All @@ -23,13 +27,14 @@
* @author jg
*/
class ServiceRequestHandler<T, S> implements ChannelHandler {

private static final org.apache.commons.logging.Log log = LogFactory.getLog(ServiceRequestHandler.class);
private final ServiceDeclaration serviceDeclaration;
private final ServiceResponseBuilder<T, S> responseBuilder;

private final MessageFactory messageFactory;
private final ExecutorService executorService;
private final MessageBufferPool messageBufferPool;
//private final ByteBuffer messageBuffer = MessageBuffers.dynamicBuffer();
//private final MessageBufferPool messageBufferPool;

public ServiceRequestHandler(ServiceDeclaration serviceDeclaration,
ServiceResponseBuilder<T, S> responseBuilder, MessageFactory messageFactory,
Expand All @@ -38,26 +43,35 @@ public ServiceRequestHandler(ServiceDeclaration serviceDeclaration,
this.responseBuilder = responseBuilder;
this.messageFactory = messageFactory;
this.executorService = executorService;
messageBufferPool = new MessageBufferPool();
// messageBufferPool = new MessageBufferPool();
}

private void handleRequest(T request, ByteBuffer responseBuffer) throws ServiceException {
private S handleRequest(T request) throws ServiceException {
S response = messageFactory.newFromType(serviceDeclaration.getType());
responseBuilder.build(request, response);
Utility.serialize(response, responseBuffer);
return response;
}

private void handleSuccess(final ChannelHandlerContext ctx, ServiceServerResponse response, ByteBuffer responseBuffer) {
private void handleSuccess(final ChannelHandlerContext ctx, S result, ServiceServerResponse response, ByteBuffer responseBuffer) {
response.setErrorCode(1);
response.setMessageLength(responseBuffer.limit());
response.setMessageBytes(responseBuffer.array());
ByteBuffer resbuf = messageBufferPool.acquire();
Utility.serialize(response, resbuf);
ByteBuffer resbuf = MessageBuffers.dynamicBuffer(); // allocate for serialized result of service method
Utility.serialize(result, resbuf);
byte[] b = new byte[resbuf.limit()];
resbuf.get(b);
response.setMessageBytes(b);
response.setMessageLength(response.getMessageBytes().length);
responseBuffer.putInt(response.getErrorCode());
responseBuffer.putInt(response.getMessageLength());
log.info("Response to be serialized:"+response);
Utility.serialize(response, responseBuffer);
//log.info("ServiceRequestHandler serializing message buffer "+responseBuffer+
// " with payload "+response.getMessageBytes().length);
try {
ctx.write(resbuf.array());
ctx.write(responseBuffer.array());
} catch (IOException e) {
throw new RosRuntimeException(e);
}
MessageBuffers.returnBuffer(resbuf);
}

private void handleError(final ChannelHandlerContext ctx, ServiceServerResponse response, String message, ByteBuffer responseBuffer) {
Expand All @@ -66,6 +80,8 @@ private void handleError(final ChannelHandlerContext ctx, ServiceServerResponse
response.setMessageLength(encodedMessage.limit());
response.setMessage(encodedMessage);
response.setMessageBytes(encodedMessage.array());
responseBuffer.putInt(response.getErrorCode());
responseBuffer.putInt(response.getMessageLength());
Utility.serialize(response, responseBuffer);
try {
ctx.write(responseBuffer.array());
Expand All @@ -84,19 +100,21 @@ public Object channelRead(final ChannelHandlerContext ctx, Object e) throws Exce
@Override
public void run() {
ServiceServerResponse response = new ServiceServerResponse();
ByteBuffer responseBuffer = messageBufferPool.acquire();
ByteBuffer responseBuffer = MessageBuffers.dynamicBuffer();
boolean success;
S result = null;
try {
handleRequest(requestBuffer, responseBuffer);
result = handleRequest(requestBuffer);
success = true;
} catch (ServiceException ex) {
handleError(ctx, response, ex.getMessage(), responseBuffer);
success = false;
}
if (success) {
handleSuccess(ctx, response, responseBuffer);
handleSuccess(ctx, result, response, responseBuffer);
}
messageBufferPool.release(responseBuffer);
//messageBufferPool.release(responseBuffer);
MessageBuffers.returnBuffer(responseBuffer);
}
});
return requestBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,36 @@ protected void decode(int code, ChannelHandlerContext ctx, ByteBuffer buffer, Li
switch (code) {
case ERROR_CODE:
response.setErrorCode(buffer.getInt());
try {
if(rstate.size() > 0)
rstate.add(0, response);
else
rstate.add(response);
return;
} finally {
reset();
}
//checkpoint(ServiceResponseDecoderState.MESSAGE_LENGTH);
case MESSAGE_LENGTH:
response.setMessageLength(buffer.getInt());
try {
if(rstate.size() > 1)
rstate.add(1, response);
else
rstate.add(response);
return;
} finally {
reset();
}
// checkpoint(ServiceResponseDecoderState.MESSAGE);
case MESSAGE:
response.setMessage(buffer);
try {
//return response;
rstate.add(response);
if(rstate.size() > 2)
rstate.add(2, response);
else
rstate.add(response);
return;
} finally {
reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
//import org.jboss.netty.channel.SimpleChannelHandler;
import org.ros.exception.RemoteException;
import org.ros.internal.message.MessageBuffers;
import org.ros.internal.message.service.ServiceDefinitionResourceProvider;
import org.ros.internal.message.service.ServiceRequestMessageFactory;
import org.ros.internal.message.service.ServiceResponseMessageFactory;
import org.ros.internal.node.response.Response;
import org.ros.internal.node.response.StatusCode;
import org.ros.internal.system.Utility;
import org.ros.internal.transport.ChannelHandler;
Expand All @@ -23,6 +27,13 @@

/**
* A handler for service responses.<p/>
* This handler lives on the pipe line and can be retrieved as follows:
* ChannelHandler ch = ctx.pipeline().get("ResponseHandler");
* {@code ServiceResponseHandler<ResponseType> srh = (ServiceResponseHandler<ResponseType>)ch;}
* It arrives there by a successful handshake via the {@code ServiceClientHandshakeHandler.onSuccess()}
* which is called with the ConnectionHeader and ChannelHandlerContext as parameters.<p/>
* The ctx.pipeLine() method retrieves the pipe line and and addList is invoked using the literal "ResponseHandler"
* as the key and a new instance of ServiceResponseHandler<S> is created to use as value.<p/>
* The handler revolves around the encoder and the decoder much like the pub/sub model.
* The encoder and decoder work with the channel handler context and the event model to deliver
* requests and responses on and off the bus.
Expand Down Expand Up @@ -96,35 +107,33 @@ public ServiceResponseHandler(Queue<ServiceResponseListener<ResponseType>> messa
*/
@Override
public Object channelRead(ChannelHandlerContext ctx, Object e) throws Exception {
log.info("ServiceResponseHandler channelRead for ChannelHandlerContext:"+ctx+" using Object:"+e);
final ServiceResponseListener<ResponseType> listener = responseListeners.poll();
assert(listener != null) : "No listener for incoming service response.";
final ByteBuffer buffer = ByteBuffer.wrap((byte[]) e);
ServiceServerResponse response = (ServiceServerResponse) Utility.deserialize(buffer);
log.info("ServiceResponseHandler channelRead for ChannelHandlerContext:"+ctx+" with ServiceServerResponse:"+response);
final ServiceResponseDecoder decoder = new ServiceResponseDecoder();
final List<Object> rstate = new ArrayList<Object>();
ServiceServerResponse response = new ServiceServerResponse();
//final List<Object> rstate = new ArrayList<Object>();
executorService.execute(new Runnable() {
@Override
public void run() {
try {
decoder.decode(ServiceResponseDecoderState.ERROR_CODE.ordinal(), ctx, buffer, rstate);
decoder.decode(ServiceResponseDecoderState.MESSAGE_LENGTH.ordinal(), ctx, buffer, rstate);
decoder.decode(ServiceResponseDecoderState.MESSAGE.ordinal(), ctx, buffer, rstate);
ServiceServerResponse sresponse = (ServiceServerResponse) rstate.get(0);
if (sresponse.getErrorCode() != ServiceResponseDecoderState.ERROR_CODE.ordinal()) {
sresponse = (ServiceServerResponse) rstate.get(2);
//decoder.decode(ServiceResponseDecoderState.ERROR_CODE.ordinal(), ctx, buffer, rstate);
//decoder.decode(ServiceResponseDecoderState.MESSAGE_LENGTH.ordinal(), ctx, buffer, rstate);
//decoder.decode(ServiceResponseDecoderState.MESSAGE.ordinal(), ctx, buffer, rstate);
//ServiceServerResponse sresponse = (ServiceServerResponse) rstate.get(0);
if (response.getErrorCode() != ServiceResponseDecoderState.ERROR_CODE.ordinal()) {
// TODO UDP transport?
//sresponse = (ServiceServerResponse) rstate.get(1);
//int messageLength = sresponse.getMessageLength();
listener.onSuccess((ResponseType) sresponse);
//Response.fromListChecked(sresponse, resultFactory)
Object o = Utility.deserialize(ByteBuffer.wrap(response.getMessageBytes()));
log.info("About to fire successful call with response from service. Class:"+o.getClass()+" payload:"+o);
listener.onSuccess( (ResponseType) o);
} else {
sresponse = (ServiceServerResponse) rstate.get(2);
String message = Charset.forName("US-ASCII").decode(sresponse.getMessage()).toString();
String message = Charset.forName("US-ASCII").decode(response.getMessage()).toString();
listener.onFailure(new RemoteException(StatusCode.ERROR, message));
}
response.setErrorCode(sresponse.getErrorCode());
response.setMessageLength(sresponse.getMessageLength());
response.setMessage(sresponse.getMessage());
} catch (Exception e1) {
log.error("Error:"+e1+" decoding ServiceResponse for context:"+ctx+" using proposed ServiceResponse:"+e);
e1.printStackTrace();
Expand All @@ -133,7 +142,8 @@ public void run() {
});
return response;
}



@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,17 @@ public int getMessageLength() {
public byte[] getMessageBytes() { return messageBytes; }

public void setMessageBytes(byte[] b) { messageBytes = b; }

public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClass().getName());
sb.append(" Error Code:");
sb.append(errorCode);
sb.append(" Message Length:");
sb.append(messageLength);
sb.append(" Message size:");
sb.append(messageBytes.length);
return sb.toString();
}

}
4 changes: 2 additions & 2 deletions src/main/java/org/ros/node/NodeConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
* @author [email protected] (Ethan Rublee)
* @author [email protected] (Ken Conley)
* @author [email protected] (Damon Kohler)
* @author jg
* @author [email protected] (Jonathan Neville Groff)
*/
public class NodeConfiguration {

Expand All @@ -61,7 +61,7 @@ public class NodeConfiguration {
public static final InetSocketAddress DEFAULT_MASTER_URI;

static {
DEFAULT_MASTER_URI = new InetSocketAddress("172.16.0.101", 8090);
DEFAULT_MASTER_URI = new InetSocketAddress("127.0.0.1", 8090);
}

private NameResolver parentResolver;
Expand Down

0 comments on commit 3fb5a3c

Please sign in to comment.