Skip to content

Commit

Permalink
fabric8io#71 Add closing message to websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
speedfl committed Feb 1, 2022
1 parent 482fbb1 commit 2446543
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 25 deletions.
10 changes: 10 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,16 @@ To support mock of web sockets this wrapper allows you to either specify a ``req
.waitFor(1500).andEmit("root - DELETED")
.done()
.once()
#### Closing Web Socket messages ####

server.expect().withPath("/api/v1/users/watch")
.andUpgradeToWebSocket()
.open()
.waitFor(1000).andEmit("root - CREATED")
.waitFor(1500).andEmit(new WebsocketCloseReason(1000, "Bye bye"))
.done()
.once()

### CRUD Mocking ###

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -20,17 +20,9 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.mockwebserver.Context;
import io.fabric8.mockwebserver.MockServerException;
import io.fabric8.mockwebserver.dsl.Emitable;
import io.fabric8.mockwebserver.dsl.EventDoneable;
import io.fabric8.mockwebserver.dsl.Function;
import io.fabric8.mockwebserver.dsl.TimesOrOnceable;
import io.fabric8.mockwebserver.dsl.WebSocketSessionBuilder;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import io.fabric8.mockwebserver.dsl.*;

import java.util.*;

public class InlineWebSocketSessionBuilder<T> implements WebSocketSessionBuilder<T>, EventDoneable<T> {

Expand Down Expand Up @@ -171,7 +163,10 @@ private WebSocketMessage toWebSocketMessage(Object content, Boolean toBeRemoved)
}

private WebSocketMessage toWebSocketMessage(Long delay, Object content, Boolean toBeRemoved) {
if (content instanceof String) {
if (content instanceof WebsocketCloseReason) {
WebsocketCloseReason closeReason = (WebsocketCloseReason) content;
return new WebSocketMessage(delay, closeReason.getReason(), toBeRemoved, closeReason.getCode());
} else if (content instanceof String) {
return new WebSocketMessage(delay, (String) content, toBeRemoved);
} else if (content instanceof WebSocketMessage) {
return (WebSocketMessage) content;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,40 +23,50 @@ public class WebSocketMessage {
private final byte[] body;
private final boolean toBeRemoved;
private final boolean binary;
private final Integer closingReason;

public WebSocketMessage(String body) {
this(0L, body, true);
}

public WebSocketMessage(byte[] body) {
this(0L, body, true, true);
this(0L, body, true, true, null);
}

public WebSocketMessage(String body, boolean toBeRemoved) {
this(0L, body.getBytes(StandardCharsets.UTF_8), toBeRemoved, false);
this(0L, body.getBytes(StandardCharsets.UTF_8), toBeRemoved, false, null);
}

public WebSocketMessage(byte[] body, boolean toBeRemoved) {
this(0L, body, toBeRemoved, true);
this(0L, body, toBeRemoved, true, null);
}

public WebSocketMessage(Long delay, String body, boolean toBeRemoved) {
this(delay, body.getBytes(StandardCharsets.UTF_8), toBeRemoved, false);
this(delay, body.getBytes(StandardCharsets.UTF_8), toBeRemoved, false, null);
}

public WebSocketMessage(Long delay, byte[] body, boolean toBeRemoved) {
this(delay, body, toBeRemoved, true);
this(delay, body, toBeRemoved, true, null);
}

public WebSocketMessage(Long delay, String body, boolean toBeRemoved, Integer closingReason) {
this(delay, body.getBytes(StandardCharsets.UTF_8), toBeRemoved, false, closingReason);
}

public WebSocketMessage(Long delay, String body, boolean toBeRemoved, boolean binary) {
this(delay, body.getBytes(StandardCharsets.UTF_8), toBeRemoved, binary);
public WebSocketMessage(Long delay, byte[] body, boolean toBeRemoved, Integer closingReason) {
this(delay, body, toBeRemoved, true, closingReason);
}

public WebSocketMessage(Long delay, String body, boolean toBeRemoved, boolean binary, Integer closingReason) {
this(delay, body.getBytes(StandardCharsets.UTF_8), toBeRemoved, binary, closingReason);
}

public WebSocketMessage(Long delay, byte[] body, boolean toBeRemoved, boolean binary) {
public WebSocketMessage(Long delay, byte[] body, boolean toBeRemoved, boolean binary, Integer closingReason) {
this.delay = delay;
this.body = body;
this.toBeRemoved = toBeRemoved;
this.binary = binary;
this.closingReason = closingReason;
}

public Long getDelay() {
Expand All @@ -78,4 +88,8 @@ public byte[] getBytes() {
public boolean isBinary() {
return binary;
}

public Integer getClosingReason() {
return closingReason;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import okhttp3.mockwebserver.RecordedRequest;
import okio.ByteString;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -161,7 +162,13 @@ private void send(final WebSocket ws, final WebSocketMessage message) {
pendingMessages.add(id);
executor.schedule(() -> {
if (ws != null) {
if (message.isBinary()) {
if (message.getClosingReason() != null) {
if (message.isBinary()) {
ws.close(message.getClosingReason(), new String(message.getBytes(), StandardCharsets.UTF_8));
} else {
ws.close(message.getClosingReason(), message.getBody());
}
} else if (message.isBinary()) {
ws.send(ByteString.of(message.getBytes()));
} else {
ws.send(message.getBody());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.fabric8.mockwebserver.internal;

public class WebsocketCloseReason {

private final int code;
private final String reason;

public WebsocketCloseReason(int code, String reason) {
super();
this.code = code;
this.reason = reason;
}

public int getCode() {
return code;
}

public String getReason() {
return reason;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit
import java.util.stream.Collectors
import java.util.stream.IntStream

import io.fabric8.mockwebserver.internal.WebsocketCloseReason

class DefaultMockServerWebSocketTest extends Specification {

DefaultMockServer server
Expand Down Expand Up @@ -119,4 +121,56 @@ class DefaultMockServerWebSocketTest extends Specification {
cleanup:
wss.forEach(ws -> ws.close(1000, "Test finished"))
}

// https://github.com/fabric8io/mockwebserver/pull/66#issuecomment-944289335
def "andUpgradeToWebSocket, with closing events, should close session"() {
given:
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket().open().waitFor(5L).andEmit("A text message").waitFor(10L).andEmit(new WebsocketCloseReason(1000, "Everything is ok")).done().always()
def future = new CompletableFuture<List<String>>()
when:
def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).build(), new WebSocketListener() {
List<String> messages = new ArrayList<>()
@Override
void onMessage(WebSocket webSocket, String text) {
messages.add(text)
}
@Override
void onClosing(WebSocket webSocket, int code, String reason) {
messages.add("Session closed with code " + code + " and reason " + reason)
future.complete(messages)
}
})
then:
def result = future.get(50L, TimeUnit.MILLISECONDS);
assert result.size() == 2
assert result.get(0) == "A text message"
assert result.get(1) == "Session closed with code 1000 and reason Everything is ok"
}

def "andUpgradeToWebSocket, with closing events, should close session, later message skipped"() {
given:
server.expect()
.withPath("/websocket")
.andUpgradeToWebSocket().open().waitFor(10L).andEmit("A text message").waitFor(5L).andEmit(new WebsocketCloseReason(1000, "Everything is ok")).done().always()
def future = new CompletableFuture<List<String>>()
when:
def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).build(), new WebSocketListener() {
List<String> messages = new ArrayList<>()
@Override
void onMessage(WebSocket webSocket, String text) {
messages.add(text)
}
@Override
void onClosing(WebSocket webSocket, int code, String reason) {
messages.add("Session closed with code " + code + " and reason " + reason)
future.complete(messages)
}
})
then:
def result = future.get(50L, TimeUnit.MILLISECONDS);
assert result.size() == 1
assert result.get(0) == "Session closed with code 1000 and reason Everything is ok"
}
}

0 comments on commit 2446543

Please sign in to comment.