Skip to content

Commit

Permalink
Operational Beta release. Bug with image data over 65k, tends not to …
Browse files Browse the repository at this point in the history
…receive entire buffer sometimes. channel bug?
  • Loading branch information
neocoretechs committed May 28, 2016
1 parent 19afda9 commit 52c5db3
Show file tree
Hide file tree
Showing 21 changed files with 195 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
import org.ros.address.AdvertiseAddress;
import org.ros.concurrent.ListenerGroup;
import org.ros.concurrent.SignalRunnable;
import org.ros.internal.message.MessageBuffers;
import org.ros.internal.message.service.ServiceDescription;
import org.ros.internal.node.topic.DefaultPublisher;
import org.ros.internal.system.Utility;
import org.ros.internal.transport.ChannelHandler;
import org.ros.internal.transport.ConnectionHeader;
import org.ros.internal.transport.ConnectionHeaderFields;
Expand Down Expand Up @@ -75,7 +77,7 @@ public void onMasterUnregistrationFailure(ServiceServer<T, S> registrant) {
});
}

public ConnectionHeader finishHandshake(ConnectionHeader incomingConnectionHeader) {
public ByteBuffer finishHandshake(ConnectionHeader incomingConnectionHeader) {
if (DEBUG) {
log.info("Client handshake header: " + incomingConnectionHeader);
}
Expand All @@ -88,7 +90,9 @@ public ConnectionHeader finishHandshake(ConnectionHeader incomingConnectionHeade
if (DEBUG) {
log.info("Server handshake header: " + connectionHeader);
}
return connectionHeader;
ByteBuffer headbuf = MessageBuffers.dynamicBuffer();
Utility.serialize(connectionHeader, headbuf);
return headbuf;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//import org.jboss.netty.channel.ChannelHandlerContext;
//import org.jboss.netty.channel.MessageEvent;
//import org.jboss.netty.channel.SimpleChannelHandler;
import org.ros.exception.RosRuntimeException;
import org.ros.exception.ServiceException;
import org.ros.internal.message.MessageBufferPool;
import org.ros.internal.system.Utility;
Expand All @@ -13,6 +14,7 @@
import org.ros.message.MessageFactory;
import org.ros.node.service.ServiceResponseBuilder;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
Expand All @@ -39,9 +41,7 @@ public ServiceRequestHandler(ServiceDeclaration serviceDeclaration,
messageBufferPool = new MessageBufferPool();
}

private void handleRequest(ByteBuffer requestBuffer, ByteBuffer responseBuffer)
throws ServiceException {
T request = (T) Utility.deserialize(requestBuffer);
private void handleRequest(T request, ByteBuffer responseBuffer) throws ServiceException {
S response = messageFactory.newFromType(serviceDeclaration.getType());
responseBuilder.build(request, response);
Utility.serialize(response, responseBuffer);
Expand All @@ -51,24 +51,34 @@ private void handleSuccess(final ChannelHandlerContext ctx, ServiceServerRespons
response.setErrorCode(1);
response.setMessageLength(responseBuffer.limit());
response.setMessage(responseBuffer);
ctx.write(response);
ByteBuffer resbuf = messageBufferPool.acquire();
Utility.serialize(response, resbuf);
try {
ctx.write(resbuf);
} catch (IOException e) {
throw new RosRuntimeException(e);
}
}

private void handleError(final ChannelHandlerContext ctx, ServiceServerResponse response,
String message) {
private void handleError(final ChannelHandlerContext ctx, ServiceServerResponse response, String message, ByteBuffer responseBuffer) {
response.setErrorCode(0);
ByteBuffer encodedMessage = Charset.forName("US-ASCII").encode(message);
response.setMessageLength(encodedMessage.limit());
response.setMessage(encodedMessage);
ctx.write(response);
Utility.serialize(response, responseBuffer);
try {
ctx.write(responseBuffer);
} catch (IOException e) {
throw new RosRuntimeException(e);
}
}

@Override
public Object channelRead(final ChannelHandlerContext ctx, Object e) throws Exception {
// Although the ChannelHandlerContext is explicitly documented as being safe
// to keep for later use, the MessageEvent is not. So, we make a defensive
// copy of the buffer.
final ByteBuffer requestBuffer = ((ByteBuffer) e);
final T requestBuffer = ((T) e);
this.executorService.execute(new Runnable() {
@Override
public void run() {
Expand All @@ -79,7 +89,7 @@ public void run() {
handleRequest(requestBuffer, responseBuffer);
success = true;
} catch (ServiceException ex) {
handleError(ctx, response, ex.getMessage());
handleError(ctx, response, ex.getMessage(), responseBuffer);
success = false;
}
if (success) {
Expand Down
8 changes: 3 additions & 5 deletions src/main/java/org/ros/internal/system/Utility.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
*
*/
public class Utility {
private static boolean DEBUG = false;
private static boolean DEBUG = true;
private static final Log log = LogFactory.getLog(Utility.class);

public static <T> void serialize(T value, ByteBuffer buffer) {
//serializer.serialize((Message) value, buffer);
DirectByteArrayOutputStream dbaos = new DirectByteArrayOutputStream();
ObjectOutputStream oos;
try {
Expand All @@ -41,7 +41,6 @@ public static <T> void serialize(T value, ByteBuffer buffer) {


public static Object deserialize(ByteBuffer buffer) {
//return deserializer.deserialize(buffer);
byte[] obuf = buffer.array();
Object Od = null;
try {
Expand All @@ -53,8 +52,7 @@ public static Object deserialize(ByteBuffer buffer) {
s.close();
bais.close();
//rbc.close();
} catch (IOException ioe) {
} catch (ClassNotFoundException cnf) {
} catch (IOException | ClassNotFoundException cnf) {
log.error("Class cannot be deserialized, may have been modified beyond version compatibility");
}
if( DEBUG )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
import java.util.concurrent.ExecutorService;

/**
* Abstraction of top level ChannelHandler interface
* Abstraction of top level ChannelHandler interface.
* Common functionality for {@link ClientHandshake} handlers.
* @author jg
*
*/
public abstract class BaseClientHandshakeHandler extends AbstractNamedChannelHandler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.Channel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;

import org.ros.internal.transport.tcp.ChannelGroup;

public interface ChannelHandlerContext {
/**
* Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}.
Expand Down Expand Up @@ -39,7 +42,7 @@ public interface ChannelHandlerContext {
* called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*/
AsynchronousSocketChannel bind(SocketAddress localAddress) throws IOException;
/*Asynchronous*/SocketChannel bind(SocketAddress localAddress) throws IOException;

/**
* Request to connect to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
Expand All @@ -53,8 +56,9 @@ public interface ChannelHandlerContext {
* {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, CompletionHandler)}
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
* @throws IOException
*/
void connect(SocketAddress remoteAddress);
void connect(SocketAddress remoteAddress) throws IOException;

/**
* Request to connect to the given {@link SocketAddress} while bind to the localAddress and notify the
Expand Down Expand Up @@ -109,8 +113,9 @@ public interface ChannelHandlerContext {
* {@link ChannelHandler#read(ChannelHandlerContext)}
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
* @throws IOException
*/
Future<Integer> read(ByteBuffer buf);
/*Future<Integer>*/int read(ByteBuffer buf) throws IOException;
/**
* Request to Read data from the {@link Channel} into the first inbound buffer, triggers an
* {@link ChannelHandler#channelRead(ChannelHandlerContext, Object)} event if data was
Expand All @@ -122,22 +127,24 @@ public interface ChannelHandlerContext {
* {@link ChannelHandler#read(ChannelHandlerContext)}
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
* @throws IOException
*/
void read(ByteBuffer buf,CompletionHandler<Integer, Void> completionHandler);

/**
* Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}.
* This method will not request to actual flush, so be sure to call {@link #flush()}
* once you want to request to flush all pending data to the actual transport.
* @throws IOException
*/
Future<Integer> write(Object msg);
/*Future<Integer>*/int write(ByteBuffer msg) throws IOException;

/**
* Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}.
* This method will not request to actual flush, so be sure to call {@link #flush()}
* once you want to request to flush all pending data to the actual transport.
*/
void write(Object msg, CompletionHandler<Integer,Void> handler);
void write(ByteBuffer msg, CompletionHandler<Integer,Void> handler);


/**
Expand All @@ -149,7 +156,7 @@ public interface ChannelHandlerContext {
* Return the channel group
*
*/
AsynchronousChannelGroup getChannelGroup();
/*Asynchronous*/ChannelGroup getChannelGroup();

/**
* Determine if this channel is ready for processing, it is configured, has a socket
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
package org.ros.internal.transport;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
//import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.Channel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.ros.internal.transport.tcp.ChannelGroup;

/**
* A handler context contains all the executor, the channel group, the channel, and the pipeline with the handlers.
Expand All @@ -29,24 +35,24 @@
*
*/
public class ChannelHandlerContextImpl implements ChannelHandlerContext {
AsynchronousChannelGroup channelGroup;
/*Asynchronous*/ChannelGroup channelGroup;
Executor executor;
AsynchronousSocketChannel channel;
/*Asynchronous*/SocketChannel channel;
ChannelPipeline pipeline;
boolean ready = false;
Object mutex = new Object();
Set<String> outboundMessageTypes;


public ChannelHandlerContextImpl(AsynchronousChannelGroup grp, AsynchronousSocketChannel ch, Executor exc) {
channelGroup = grp;
channel = ch;
public ChannelHandlerContextImpl(/*Asynchronous*/ChannelGroup channelGroup2, /*Asynchronous*/SocketChannel channel2, Executor exc) {
channelGroup = channelGroup2;
channel = channel2;
executor = exc;
pipeline = new ChannelPipelineImpl(this);
outboundMessageTypes = (Set<String>) new HashSet<String>();
}

public void setChannel(AsynchronousSocketChannel sock) {
public void setChannel(/*Asynchronous*/SocketChannel sock) {
this.channel = sock;
}

Expand All @@ -63,12 +69,12 @@ public String name() {


@Override
public AsynchronousSocketChannel bind(SocketAddress localAddress) throws IOException {
public /*Asynchronous*/SocketChannel bind(SocketAddress localAddress) throws IOException {
return channel.bind(localAddress);
}

@Override
public void connect(SocketAddress remoteAddress) {
public void connect(SocketAddress remoteAddress) throws IOException {
channel.connect(remoteAddress);
}

Expand All @@ -91,23 +97,33 @@ public void close() throws IOException {


@Override
public Future<Integer> read(ByteBuffer buf) {
return channel.read(buf);
public /*Future<Integer>*/ int read(ByteBuffer buf) throws IOException {
return channel.read(buf);
}

@Override
public Future<Integer> write(Object msg) {
return channel.write((ByteBuffer) msg);
public /*Future<Integer>*/int write(ByteBuffer msg) throws IOException {
return channel.write(msg);
}

@Override
public void write(Object msg, CompletionHandler<Integer, Void> handler) {
channel.write((ByteBuffer)msg, null, handler);
public void write(ByteBuffer msg, CompletionHandler<Integer, Void> handler) {
try {
int res = channel.write(msg/*, null, handler*/);
handler.completed(res, null);
} catch (IOException e) {
handler.failed(e, null);
}
}

@Override
public void read(ByteBuffer buf, CompletionHandler<Integer, Void> handler) {
channel.read(buf, null, handler);
try {
int res = channel.read(buf/*, null, handler*/);
handler.completed(res, null);
} catch (IOException e) {
handler.failed(e, null);
}
}


Expand All @@ -117,7 +133,7 @@ public ChannelPipeline pipeline() {
}

@Override
public AsynchronousChannelGroup getChannelGroup() {
public /*Asynchronous*/ChannelGroup getChannelGroup() {
return channelGroup;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.commons.logging.LogFactory;
import org.ros.concurrent.CancellableLoop;
import org.ros.concurrent.CircularBlockingDeque;
import org.ros.exception.RosRuntimeException;
import org.ros.internal.message.Message;
import org.ros.internal.message.MessageBufferPool;
import org.ros.internal.message.MessageBuffers;
Expand All @@ -37,7 +38,6 @@ public class OutgoingMessageQueue<T> {

private final CircularBlockingDeque<T> deque;
private final Writer writer;
private final MessageBufferPool messageBufferPool;
private final ByteBuffer latchedBuffer;
private final Object mutex;

Expand Down Expand Up @@ -105,7 +105,7 @@ public void failed(Throwable arg0, Void arg1) {
public OutgoingMessageQueue(ExecutorService executorService, List<ChannelHandlerContext> ctxs) throws IOException {
deque = new CircularBlockingDeque<T>(DEQUE_CAPACITY);
writer = new Writer();
messageBufferPool = new MessageBufferPool();
//messageBufferPool = new MessageBufferPool();
latchedBuffer = MessageBuffers.dynamicBuffer();
mutex = new Object();
latchMode = false;
Expand Down Expand Up @@ -151,7 +151,11 @@ private void writeLatchedMessage() {
Iterator<ChannelHandlerContext> it = channels.iterator();
while(it.hasNext()) {
ChannelHandlerContext ctx = it.next();
ctx.write(latchedBuffer);
try {
ctx.write(latchedBuffer);
} catch (IOException e) {
throw new RosRuntimeException(e);
}
}
}
}
Expand Down
Loading

0 comments on commit 52c5db3

Please sign in to comment.