Skip to content

Commit

Permalink
Merge pull request #114 from SUSE/master-support-partial-websocket-me…
Browse files Browse the repository at this point in the history
…ssages

Buffer partial websocket messages
  • Loading branch information
renner committed Sep 21, 2015
2 parents f38171c + 04da237 commit 9283ad1
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,9 @@ public Future<Key.Names> keysAsync() {
* {@code GET /events}
*
* @return the event stream
* @throws SaltStackException in case of an error during websocket stream initialization
*/
public EventStream events() {
public EventStream events() throws SaltStackException {
return new EventStream(config);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ public class ClientConfig {
public static final Key<String> PROXY_USERNAME = new Key<>();
public static final Key<String> PROXY_PASSWORD = new Key<>();

/**
* Maximum websocket message length in characters. The default value corresponds to
* 10 MB, assuming that one character takes up 2 bytes. This limit can be disabled
* by setting a value less than or equal to 0.
*/
public static final Key<Integer> WEBSOCKET_MAX_MESSAGE_LENGTH = new Key<>(0x500000);

/**
* A key to use with {@link ClientConfig}.
* @param <T> The type of the value associated with this key.
Expand Down
119 changes: 81 additions & 38 deletions src/main/java/com/suse/saltstack/netapi/event/EventStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.suse.saltstack.netapi.config.ClientConfig;
import com.suse.saltstack.netapi.datatypes.Event;
import com.suse.saltstack.netapi.exception.MessageTooBigException;
import com.suse.saltstack.netapi.exception.SaltStackException;
import com.suse.saltstack.netapi.parser.JsonParser;

Expand Down Expand Up @@ -39,6 +40,21 @@ public class EventStream implements AutoCloseable {
*/
private final List<EventListener> listeners = new ArrayList<>();

/**
* Default message buffer size in characters.
*/
private final int defaultBufferSize = 0x400;

/**
* Maximum message length in characters, configurable via {@link ClientConfig}.
*/
private final int maxMessageLength;

/**
* Buffer for partial messages.
*/
private final StringBuilder messageBuffer = new StringBuilder(defaultBufferSize);

/**
* The {@link WebSocketContainer} object for a @ClientEndpoint implementation.
*/
Expand All @@ -48,29 +64,20 @@ public class EventStream implements AutoCloseable {
/**
* The WebSocket {@link Session}.
*/
public Session session;

/**
* A default constructor used to create this object empty. It prepare the WebSocket
* implementation, but it does not start the connection to the server
* and then the event processing too. This constructor is used for unit testing.
*/
public EventStream() {
}
private Session session;

/**
* Constructor used to create this object.
* Automatically open a WebSocket and start event processing.
*
* @param config Contains the necessary details such as EndPoint URL and
* authentication token required to create the WebSocket.
* @throws SaltStackException in case of an error during stream initialization
*/
public EventStream(ClientConfig config) {
try {
initializeStream(config);
} catch (SaltStackException e) {
e.printStackTrace();
}
public EventStream(ClientConfig config) throws SaltStackException {
maxMessageLength = config.get(ClientConfig.WEBSOCKET_MAX_MESSAGE_LENGTH) > 0 ?
config.get(ClientConfig.WEBSOCKET_MAX_MESSAGE_LENGTH) : Integer.MAX_VALUE;
initializeStream(config);
}

/**
Expand All @@ -93,6 +100,9 @@ public void processEvents(URI uri, ClientConfig config)

/**
* Connect the WebSocket to the server pointing to /ws/{token} to receive events.
*
* @param config the client configuration
* @throws SaltStackException in case of an error during stream initialization
*/
private void initializeStream (ClientConfig config) throws SaltStackException {
try {
Expand Down Expand Up @@ -153,16 +163,24 @@ public boolean isEventStreamClosed() {

/**
* Close the WebSocket {@link Session}.
*
* @throws IOException in case of an error when closing the session
*/
@Override
public void close() {
public void close() throws IOException {
close(new CloseReason(CloseCodes.GOING_AWAY,
"The listener has closed the event stream"));
}

/**
* Close the WebSocket {@link Session} with a given close reason.
*
* @param closeReason the reason for the websocket closure
* @throws IOException in case of an error when closing the session
*/
public void close(CloseReason closeReason) throws IOException {
if (!isEventStreamClosed()) {
try {
this.session.close(new CloseReason(CloseCodes.GOING_AWAY,
"The listener has closed the event stream"));
} catch (IOException e) {
e.printStackTrace();
}
session.close(closeReason);
}
}

Expand All @@ -173,8 +191,7 @@ public void close() {
*
* @param session The just started WebSocket {@link Session}.
* @param config The {@link EndpointConfig} containing the handshake informations.
* @throws IOException Exception thrown if something goes wrong sending message
* to the remote peer.
* @throws IOException if something goes wrong sending message to the remote peer
*/
@OnOpen
public void onOpen(Session session, EndpointConfig config) throws IOException {
Expand All @@ -183,30 +200,56 @@ public void onOpen(Session session, EndpointConfig config) throws IOException {
}

/**
* Notify listeners on each event received on the WebSocket.
* Notify listeners on each event received on the websocket and buffer partial messages.
*
* @param message The message received on this WebSocket
* @param partialMessage partial message received on this websocket
* @param last indicate the last part of a message
* @throws MessageTooBigException in case the message is longer than maxMessageLength
*/
@OnMessage
public void onMessage(String message) {
if (message != null && !message.equals("server received message")) {
// Salt API adds a "data: " prefix that we need to ignore
Event event = JsonParser.EVENTS.parse(message.substring(6));
synchronized (listeners) {
listeners.stream().forEach(l -> l.notify(event));
public void onMessage(String partialMessage, boolean last)
throws MessageTooBigException {
if (partialMessage.length() > maxMessageLength - messageBuffer.length()) {
throw new MessageTooBigException(maxMessageLength);
}

if (last) {
String message;
if (messageBuffer.length() == 0) {
message = partialMessage;
} else {
messageBuffer.append(partialMessage);
message = messageBuffer.toString();

// Reset the size to the defaultBufferSize and empty the buffer
messageBuffer.setLength(defaultBufferSize);
messageBuffer.trimToSize();
messageBuffer.setLength(0);
}

// Notify all registered listeners
if (!message.equals("server received message")) {
// Salt API adds a "data: " prefix that we need to ignore
Event event = JsonParser.EVENTS.parse(message.substring(6));
synchronized (listeners) {
listeners.stream().forEach(listener -> listener.notify(event));
}
}
} else {
messageBuffer.append(partialMessage);
}
}

/**
* On error, close all objects of this class.
* On error, convert {@link Throwable} into {@link CloseReason} and close the session.
*
* @param t The Throwable object received on the current error.
* @param throwable The Throwable object received on the current error.
* @throws IOException in case of an error when closing the session
*/
@OnError
public void onError(Throwable t) {
this.close();
t.printStackTrace();
public void onError(Throwable throwable) throws IOException {
close(new CloseReason(throwable instanceof MessageTooBigException ?
CloseCodes.TOO_BIG : CloseCodes.CLOSED_ABNORMALLY, throwable.getMessage()));
}

/**
Expand All @@ -222,7 +265,7 @@ public void onClose(Session session, CloseReason closeReason) {

// Notify all the listeners and cleanup
synchronized (listeners) {
listeners.stream().forEach(l -> l.eventStreamClosed(closeReason));
listeners.stream().forEach(listener -> listener.eventStreamClosed(closeReason));

// Clear out the listeners
listeners.clear();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.suse.saltstack.netapi.exception;

/**
* Exception to be thrown in case of a websocket message exceeding the configurable
* maximum message length.
*/
public class MessageTooBigException extends SaltStackException {

/**
* Constructor.
*
* @param maxMessageLength the maximum message length to be mentioned in the message
*/
public MessageTooBigException(int maxMessageLength) {
super("Message length exceeded the configured maximum (" +
maxMessageLength + " characters)");
}
}
Loading

0 comments on commit 9283ad1

Please sign in to comment.