Skip to content

Commit

Permalink
Add more resilience over connectivity issues (braverhealth#79)
Browse files Browse the repository at this point in the history
* add toxiproxy service for testing

* add tests for certain connection issues

* fixes

* fix service name

* fix logging flag
  • Loading branch information
matehat authored and s6o committed May 23, 2024
1 parent f1964e8 commit 473d6bd
Show file tree
Hide file tree
Showing 13 changed files with 322 additions and 58 deletions.
8 changes: 7 additions & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@ jobs:
runs-on: ubuntu-latest

services:
phoenix:
backend:
image: braverhq/phoenix-dart-server
ports:
- 4001:4001
- 4002:4002

proxy:
image: ghcr.io/shopify/toxiproxy
ports:
- 8474:8474
- 4004:4004

steps:

Expand Down
13 changes: 12 additions & 1 deletion example/backend/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
FROM bitwalker/alpine-elixir-phoenix
FROM hexpm/elixir:1.15.7-erlang-26.2.2-alpine-3.19.1

RUN apk update && apk upgrade && \
apk add --no-cache \
ca-certificates \
bash \
curl \
git

RUN echo America/Montreal > /etc/timezone
RUN mix local.hex --force && \
mix local.rebar --force

EXPOSE 4001

Expand Down
14 changes: 14 additions & 0 deletions example/backend/compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
services:
backend:
build:
context: .
dockerfile: Dockerfile
ports:
- 4001:4001
- 4002:4002

proxy:
image: ghcr.io/shopify/toxiproxy
ports:
- 8474:8474
- 4004:4004
8 changes: 6 additions & 2 deletions lib/src/channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,16 @@ class PhoenixChannel {
);

if (canPush) {
_logger.finest(() => 'Sending out push ${pushEvent.ref}');
pushEvent.send();
} else {
if (_state == PhoenixChannelState.closed) {
throw ChannelClosedError('Can\'t push event on a closed channel');
if (_state == PhoenixChannelState.closed ||
_state == PhoenixChannelState.errored) {
throw ChannelClosedError('Can\'t push event on a $_state channel');
}

_logger.finest(
() => 'Buffering push ${pushEvent.ref} for later send ($_state)');
pushBuffer.add(pushEvent);
}

Expand Down
6 changes: 4 additions & 2 deletions lib/src/events.dart
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,15 @@ class PhoenixChannelEvent {
value.startsWith(__replyEventName);

/// Whether the event name is a 'channel reply' event
bool get isChannelReply =>
value.startsWith(__chanReplyEventName);
bool get isChannelReply => value.startsWith(__chanReplyEventName);

@override
bool operator ==(Object other) =>
other is PhoenixChannelEvent && other.value == value;

@override
int get hashCode => Object.hash(runtimeType, value);

@override
String toString() => 'PhoenixChannelEvent($value)';
}
7 changes: 5 additions & 2 deletions lib/src/push.dart
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ class Push {

_timeoutTimer ??= Timer(timeout!, () {
_timeoutTimer = null;
_logger.warning('Push $ref timed out');
_logger.warning(() => 'Push $ref timed out');
_channel.trigger(Message.timeoutFor(ref));
});
}
Expand Down Expand Up @@ -268,6 +268,7 @@ class Push {
// Remove existing waiters and reset completer
void cleanUp() {
if (_sent) {
_logger.fine('Cleaning up completer');
clearReceivers();
_responseCompleter = Completer();
}
Expand All @@ -279,7 +280,9 @@ class Push {
if (response.event == replyEvent) {
trigger(PushResponse.fromMessage(response));
}
} else {
} else if (event != PhoenixChannelEvent.join) {
_logger.finest(
() => "Completing with error: ${_responseCompleter.hashCode}");
if (!_responseCompleter.isCompleted) {
_responseCompleter.completeError(response);
clearReceivers();
Expand Down
39 changes: 27 additions & 12 deletions phoenix_socket.code-workspace
Original file line number Diff line number Diff line change
@@ -1,14 +1,29 @@
{
"folders": [
{
"path": "."
},
{
"path": "example/backend"
}
],
"settings": {
"editor.formatOnSave": true,
"elixirLS.fetchDeps": true
}
"folders": [
{
"path": "."
},
{
"path": "example/backend"
}
],
"settings": {
"editor.formatOnSave": true,
"elixirLS.fetchDeps": true,
"[dart]": {
"editor.tabSize": 2,
"editor.insertSpaces": true,
"editor.detectIndentation": false,
"editor.suggest.insertMode": "replace",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit",
"source.fixAll": "explicit"
}
},
"dart.env": {
"LOG_ALL_LEVELS": "y"
},
"dart.testAdditionalArgs": ["--chain-stack-traces"]
}
}
153 changes: 142 additions & 11 deletions test/channel_integration_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,25 @@ import 'dart:async';
import 'package:phoenix_socket/phoenix_socket.dart';
import 'package:test/test.dart';

import 'control.dart';
import 'helpers/logging.dart';
import 'helpers/proxy.dart';

void main() {
const addr = 'ws://localhost:4001/socket/websocket';
Set<int> usedPorts = {};

void main() {
group('PhoenixChannel', () {
const addr = 'ws://localhost:4004/socket/websocket';

setUpAll(() {
maybeActivateAllLogLevels();
});

setUp(() async {
await restartBackend();
await prepareProxy();
});

tearDown(() async {
await destroyProxy();
});

test('can join a channel through a socket', () async {
Expand All @@ -28,10 +39,11 @@ void main() {

test('can join a channel through a socket that starts closed then connects',
() async {
await haltThenResumeProxy();

final socket = PhoenixSocket(addr);
final completer = Completer<void>();

await stopThenRestartBackend();
await socket.connect();

socket.addChannel(topic: 'channel1').join().onReply('ok', (reply) {
Expand All @@ -50,10 +62,10 @@ void main() {

await socket.connect();

await stopBackend();
await haltProxy();
final joinFuture = socket.addChannel(topic: 'channel1').join();
Future.delayed(const Duration(milliseconds: 300))
.then((value) => restartBackend());
.then((value) => resumeProxy());

joinFuture.onReply('ok', (reply) {
expect(reply.status, equals('ok'));
Expand All @@ -63,6 +75,32 @@ void main() {
await completer.future;
});

test(
'can join a channel through a socket that gets a "peer reset" before join but reconnects',
() async {
final socket = PhoenixSocket(addr);
final completer = Completer<void>();

await socket.connect();
addTearDown(() {
socket.close();
});
await resetPeer();

runZonedGuarded(() {
final joinFuture = socket.addChannel(topic: 'channel1').join();
joinFuture.onReply('ok', (reply) {
expect(reply.status, equals('ok'));
completer.complete();
});
}, (error, stack) {});

Future.delayed(const Duration(milliseconds: 1000))
.then((value) => resetPeer(enable: false));

await completer.future;
});

test('can join a channel through an unawaited socket', () async {
final socket = PhoenixSocket(addr);
final completer = Completer<void>();
Expand Down Expand Up @@ -137,22 +175,100 @@ void main() {
});

test(
'can send messages to channels that got transiently disconnected and receive a reply',
() async {
'can send messages to channels that got transiently '
'disconnected and receive a reply', () async {
final socket = PhoenixSocket(addr);

await socket.connect();

final channel1 = socket.addChannel(topic: 'channel1');
await channel1.join().future;

await stopThenRestartBackend();
await haltThenResumeProxy();
await socket.openStream.first;

final reply = await channel1.push('hello!', {'foo': 'bar'}).future;
expect(reply.status, equals('ok'));
expect(reply.response, equals({'name': 'bar'}));
});

test(
'can send messages to channels that got "peer reset" '
'and receive a reply', () async {
final socket = PhoenixSocket(addr);

await socket.connect();

final channel1 = socket.addChannel(topic: 'channel1');
await channel1.join().future;

await resetPeerThenResumeProxy();

final push = channel1.push('hello!', {'foo': 'bar'});
final reply = await push.future;

expect(reply.status, equals('ok'));
expect(reply.response, equals({'name': 'bar'}));
});

test(
'throws when sending messages to channels that got "peer reset" '
'and that have not recovered yet', () async {
final socket = PhoenixSocket(addr);

await socket.connect();

final channel1 = socket.addChannel(topic: 'channel1');
await channel1.join().future;

await resetPeer();

final Completer<Object> errorCompleter = Completer();

runZonedGuarded(() async {
final push = channel1.push('hello!', {'foo': 'bar'});
try {
await push.future;
} catch (err) {
errorCompleter.complete(err);
}
}, (error, stack) {});

final Object exception;
expect(exception = await errorCompleter.future, isA<PhoenixException>());
expect((exception as PhoenixException).socketClosed, isNotNull);
});

test(
'throws when sending messages to channels that got disconnected '
'and that have not recovered yet',
() async {
final socket = PhoenixSocket(addr);

await socket.connect();

final channel1 = socket.addChannel(topic: 'channel1');
await channel1.join().future;

await haltProxy();

final Completer<Object> errorCompleter = Completer();
runZonedGuarded(() async {
try {
final push = channel1.push('hello!', {'foo': 'bar'});
await push.future;
} catch (err) {
errorCompleter.complete(err);
}
}, (error, stack) {});

expect(await errorCompleter.future, isA<ChannelClosedError>());
},
timeout: Timeout(
Duration(seconds: 5),
),
);

test('only emits reply messages that are channel replies', () async {
final socket = PhoenixSocket(addr);

Expand Down Expand Up @@ -195,6 +311,11 @@ void main() {
final channel2 = socket2.addChannel(topic: 'channel3');
await channel2.join().future;

addTearDown(() {
socket1.close();
socket2.close();
});

expect(
channel1.messages,
emitsInOrder([
Expand Down Expand Up @@ -249,6 +370,11 @@ void main() {
final channel2 = socket2.addChannel(topic: 'channel3');
await channel2.join().future;

addTearDown(() {
socket1.close();
socket2.close();
});

channel1.push('ping', {'from': 'socket1'});

expect(
Expand Down Expand Up @@ -278,6 +404,11 @@ void main() {
final channel2 = socket2.addChannel(topic: 'channel3');
await channel2.join().future;

addTearDown(() {
socket1.close();
socket2.close();
});

channel1.push('ping', {'from': 'socket1'});

expect(
Expand Down Expand Up @@ -315,8 +446,8 @@ void main() {
final socket = PhoenixSocket(addr);
await socket.connect();
final channel = socket.addChannel(topic: 'channel3');
await channel.join().future;

await channel.join().future;
await channel.leave().future;

expect(
Expand Down
Loading

0 comments on commit 473d6bd

Please sign in to comment.