Skip to content

Commit

Permalink
Netty removal. full fucntionality. Partial message issue.
Browse files Browse the repository at this point in the history
  • Loading branch information
neocoretechs committed May 27, 2016
1 parent 9c9d53e commit 19afda9
Show file tree
Hide file tree
Showing 23 changed files with 292 additions and 286 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void onMasterUnregistrationFailure(ServiceServer<T, S> registrant) {
});
}

public ByteBuffer finishHandshake(ConnectionHeader incomingConnectionHeader) {
public ConnectionHeader finishHandshake(ConnectionHeader incomingConnectionHeader) {
if (DEBUG) {
log.info("Client handshake header: " + incomingConnectionHeader);
}
Expand All @@ -88,7 +88,7 @@ public ByteBuffer finishHandshake(ConnectionHeader incomingConnectionHeader) {
if (DEBUG) {
log.info("Server handshake header: " + connectionHeader);
}
return connectionHeader.encode();
return connectionHeader;
}

@Override
Expand Down
29 changes: 11 additions & 18 deletions src/main/java/org/ros/internal/node/topic/DefaultPublisher.java
Original file line number Diff line number Diff line change
@@ -1,26 +1,12 @@
/*
* Copyright (C) 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.ros.internal.node.topic;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.ros.concurrent.ListenerGroup;
import org.ros.concurrent.SignalRunnable;
import org.ros.internal.message.MessageBuffers;
import org.ros.internal.node.server.NodeIdentifier;
import org.ros.internal.system.Utility;
import org.ros.internal.transport.ChannelHandlerContext;
import org.ros.internal.transport.ConnectionHeader;
import org.ros.internal.transport.ConnectionHeaderFields;
Expand All @@ -40,8 +26,13 @@

/**
* Default implementation of a {@link Publisher}.
* An outgoing message queue is constructed to deliver outbound messages.
* A ListenerGroup of PublisherListeners.
* A MessageFactory
* A list of subscribers as ChannelHandlerContexts.
* A DefaultPublisherListener is constructed as a default entry in the list.
*
* @author [email protected] (Damon Kohler)
* @author jg
*/
public class DefaultPublisher<T> extends DefaultTopicParticipant implements Publisher<T> {

Expand Down Expand Up @@ -180,7 +171,9 @@ public ByteBuffer finishHandshake(ConnectionHeader incomingHeader) {
// TODO(damonkohler): Force latch mode to be consistent throughout the life
// of the publisher.
outgoingConnectionHeader.addField(ConnectionHeaderFields.LATCHING, getLatchMode() ? "1" : "0");
return (ByteBuffer) outgoingConnectionHeader.encode();
ByteBuffer buffer = MessageBuffers.dynamicBuffer();
Utility.serialize(outgoingConnectionHeader, buffer);
return buffer;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
*/
public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Subscriber<T> {

private static final Log log = LogFactory.getLog(DefaultPublisher.class);
private static final Log log = LogFactory.getLog(DefaultSubscriber.class);

/**
* The maximum delay before shutdown will begin even if all
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
/*
* Copyright (C) 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.ros.internal.node.topic;

import org.ros.internal.transport.ConnectionHeader;
Expand All @@ -23,9 +7,13 @@
import java.util.List;

/**
* Base definition of a {@link TopicSystemState}.
* Abstract class and Base definition of a {@link TopicSystemState}.
* Primarily operates on TopicDeclaration supplied in constructor.
* During handshake, the topic declaration ConnectionHeader and message type are used to set up
* the class of responses to subscriber which is then stored in the ChannelHandlercontext to filter traffic.
* Provides master signaling methods.
*
* @author [email protected] (Damon Kohler)
* @author jg
*/
public abstract class DefaultTopicParticipant implements TopicParticipant {

Expand Down
17 changes: 11 additions & 6 deletions src/main/java/org/ros/internal/system/Utility.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,22 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
//import java.nio.channels.Channels;
//import java.nio.channels.ReadableByteChannel;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.ros.internal.message.field.DirectByteArrayOutputStream;

/**
* Static methods to serialize and deserialize ByteBuffer to/from Object.
* We are using java serialization. Traditional ROS uses XML/RPC so in general
* we are using ROS generated messages, with all the associated ROS fields and formats
* in a Java serialization context. If we need a ROS gateway the bindings should remain straightforward.
* @author jg
*
*/
public class Utility {
private static boolean DEBUG = true;
private static boolean DEBUG = false;
private static final Log log = LogFactory.getLog(Utility.class);
public static <T> void serialize(T value, ByteBuffer buffer) {
//serializer.serialize((Message) value, buffer);
Expand All @@ -36,8 +43,6 @@ public static <T> void serialize(T value, ByteBuffer buffer) {
public static Object deserialize(ByteBuffer buffer) {
//return deserializer.deserialize(buffer);
byte[] obuf = buffer.array();
if( DEBUG )
log.info("Deserialize:"+obuf.length);
Object Od = null;
try {
ObjectInputStream s;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
import org.apache.commons.logging.LogFactory;
import org.ros.concurrent.ListenerGroup;
import org.ros.concurrent.SignalRunnable;
import org.ros.internal.message.MessageBuffers;
import org.ros.internal.system.Utility;
import org.ros.internal.transport.tcp.AbstractNamedChannelHandler;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutorService;

/**
Expand Down Expand Up @@ -35,7 +38,20 @@ public void addListener(ClientHandshakeListener clientHandshakeListener) {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.write(clientHandshake.getOutgoingConnectionHeader().encode());
ByteBuffer bb = MessageBuffers.dynamicBuffer();
Utility.serialize(clientHandshake.getOutgoingConnectionHeader(), bb);
ctx.write(bb, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer arg0, Void arg1) {
if( DEBUG )
log.info("BaseClientHandshakeHandler channelActive reply to master complete");
}
@Override
public void failed(Throwable arg0, Void arg1) {
log.info("BaseClientHandshakeHandler channelActive reply to master failed with:"+arg0);
}

});
}

@Override
Expand All @@ -45,16 +61,15 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {

@Override
public Object channelRead(ChannelHandlerContext ctx, Object buff) throws Exception {
ByteBuffer buffer = (ByteBuffer) buff;
ConnectionHeader connectionHeader = ConnectionHeader.decode(buffer);
ConnectionHeader connectionHeader = (ConnectionHeader)buff;
if (clientHandshake.handshake(connectionHeader)) {
onSuccess(connectionHeader, ctx);
signalOnSuccess(connectionHeader);
} else {
onFailure(clientHandshake.getErrorMessage(), ctx);
signalOnFailure(clientHandshake.getErrorMessage());
}
return buffer;
return buff;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.Channel;
import java.nio.channels.CompletionHandler;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;

Expand Down Expand Up @@ -157,8 +158,24 @@ public interface ChannelHandlerContext {
*/
boolean isReady();

/**
* Set this channel and its context ready or not for traffic.
* @param ready
*/
void setReady(boolean ready);

/**
* Get the Object representing a mutex to use for completion of operation if necessary.
* @return
*/
Object getChannelCompletionMutex();

/**
* Each successive handshake completion will add another message type to this synchronized set.
* This set is used to determine whether a message placed on the outbound queue will be sent to the
* channel in the context.
* @return The synchronized hash set of message type strings
*/
Set<String> getMessageTypes();

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,24 @@
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.Channel;
import java.nio.channels.CompletionHandler;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;

/**
* A handler context contains all the executor, the channel group, the channel, and the pipeline with the handlers.
* There is one channel per context.
* There is one pipeline context.
* There is one pipeline per context.
* There is one executor per group of contexts.
* The pipeline is a stateful collection of handlers that represent the current channel state and means
* of executing functions in the process of connecting, disconnecting, reading, failing etc.
* The pipeline is configured by means of factories that create ChannelInitializers, inserting
* them in order in the pipeline deque.
* The functions of the system move data through the pipeline, triggering the handlers in the sequence they were
* added.
* Traffic is filtered to subscriber channels via the hash set of requested message types
* @author jg
*
*/
Expand All @@ -31,13 +35,15 @@ public class ChannelHandlerContextImpl implements ChannelHandlerContext {
ChannelPipeline pipeline;
boolean ready = false;
Object mutex = new Object();
Set<String> outboundMessageTypes;


public ChannelHandlerContextImpl(AsynchronousChannelGroup grp, AsynchronousSocketChannel ch, Executor exc) {
channelGroup = grp;
channel = ch;
executor = exc;
pipeline = new ChannelPipelineImpl(this);
outboundMessageTypes = (Set<String>) new HashSet<String>();
}

public void setChannel(AsynchronousSocketChannel sock) {
Expand Down Expand Up @@ -130,11 +136,24 @@ public boolean isReady() {
return ready;
}

/**
* Sets this context ready or not to receive traffic
*/
@Override
public void setReady(boolean ready) { this.ready = ready;}

/**
* Object to synchronize read and write completion for the channel in this context, since we will have
* multiple outbound writers accessing the same channel
*/
public Object getChannelCompletionMutex() { return mutex; }

/**
* Get the type of messages we want to send to the attached subscriber, based on the handshakes
* received.
* @return The HashSet of message type as String
*/
public Set<String> getMessageTypes() { return outboundMessageTypes; }


}
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ public ChannelPipeline fireChannelRead(Object msg) throws Exception {
Map.Entry<String,ChannelHandler> me = null;
while(it.hasNext()) {
me = it.next();
msg = me.getValue().channelRead(ctx, msg);
me.getValue().channelRead(ctx, msg);
}
return this;
}
Expand Down
Loading

0 comments on commit 19afda9

Please sign in to comment.