diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index d4f6e52..fefc46e 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -13,11 +13,17 @@ jobs: runs-on: ubuntu-latest services: - phoenix: + backend: image: braverhq/phoenix-dart-server ports: - 4001:4001 - 4002:4002 + + proxy: + image: ghcr.io/shopify/toxiproxy + ports: + - 8474:8474 + - 4004:4004 steps: diff --git a/example/backend/Dockerfile b/example/backend/Dockerfile index 83acde2..9b7f2a1 100644 --- a/example/backend/Dockerfile +++ b/example/backend/Dockerfile @@ -1,4 +1,15 @@ -FROM bitwalker/alpine-elixir-phoenix +FROM hexpm/elixir:1.15.7-erlang-26.2.2-alpine-3.19.1 + +RUN apk update && apk upgrade && \ + apk add --no-cache \ + ca-certificates \ + bash \ + curl \ + git + +RUN echo America/Montreal > /etc/timezone +RUN mix local.hex --force && \ + mix local.rebar --force EXPOSE 4001 diff --git a/example/backend/compose.yaml b/example/backend/compose.yaml new file mode 100644 index 0000000..ce49b47 --- /dev/null +++ b/example/backend/compose.yaml @@ -0,0 +1,14 @@ +services: + backend: + build: + context: . + dockerfile: Dockerfile + ports: + - 4001:4001 + - 4002:4002 + + proxy: + image: ghcr.io/shopify/toxiproxy + ports: + - 8474:8474 + - 4004:4004 diff --git a/lib/src/channel.dart b/lib/src/channel.dart index 2f0e484..e62c823 100644 --- a/lib/src/channel.dart +++ b/lib/src/channel.dart @@ -278,12 +278,16 @@ class PhoenixChannel { ); if (canPush) { + _logger.finest(() => 'Sending out push ${pushEvent.ref}'); pushEvent.send(); } else { - if (_state == PhoenixChannelState.closed) { - throw ChannelClosedError('Can\'t push event on a closed channel'); + if (_state == PhoenixChannelState.closed || + _state == PhoenixChannelState.errored) { + throw ChannelClosedError('Can\'t push event on a $_state channel'); } + _logger.finest( + () => 'Buffering push ${pushEvent.ref} for later send ($_state)'); pushBuffer.add(pushEvent); } diff --git a/lib/src/events.dart b/lib/src/events.dart index 28e7ea0..b2566b2 100644 --- a/lib/src/events.dart +++ b/lib/src/events.dart @@ -133,8 +133,7 @@ class PhoenixChannelEvent { value.startsWith(__replyEventName); /// Whether the event name is a 'channel reply' event - bool get isChannelReply => - value.startsWith(__chanReplyEventName); + bool get isChannelReply => value.startsWith(__chanReplyEventName); @override bool operator ==(Object other) => @@ -142,4 +141,7 @@ class PhoenixChannelEvent { @override int get hashCode => Object.hash(runtimeType, value); + + @override + String toString() => 'PhoenixChannelEvent($value)'; } diff --git a/lib/src/push.dart b/lib/src/push.dart index 55a75ba..8e03622 100644 --- a/lib/src/push.dart +++ b/lib/src/push.dart @@ -202,7 +202,7 @@ class Push { _timeoutTimer ??= Timer(timeout!, () { _timeoutTimer = null; - _logger.warning('Push $ref timed out'); + _logger.warning(() => 'Push $ref timed out'); _channel.trigger(Message.timeoutFor(ref)); }); } @@ -268,6 +268,7 @@ class Push { // Remove existing waiters and reset completer void cleanUp() { if (_sent) { + _logger.fine('Cleaning up completer'); clearReceivers(); _responseCompleter = Completer(); } @@ -279,7 +280,9 @@ class Push { if (response.event == replyEvent) { trigger(PushResponse.fromMessage(response)); } - } else { + } else if (event != PhoenixChannelEvent.join) { + _logger.finest( + () => "Completing with error: ${_responseCompleter.hashCode}"); if (!_responseCompleter.isCompleted) { _responseCompleter.completeError(response); clearReceivers(); diff --git a/phoenix_socket.code-workspace b/phoenix_socket.code-workspace index c28bc7a..b324dbf 100644 --- a/phoenix_socket.code-workspace +++ b/phoenix_socket.code-workspace @@ -1,14 +1,29 @@ { - "folders": [ - { - "path": "." - }, - { - "path": "example/backend" - } - ], - "settings": { - "editor.formatOnSave": true, - "elixirLS.fetchDeps": true - } + "folders": [ + { + "path": "." + }, + { + "path": "example/backend" + } + ], + "settings": { + "editor.formatOnSave": true, + "elixirLS.fetchDeps": true, + "[dart]": { + "editor.tabSize": 2, + "editor.insertSpaces": true, + "editor.detectIndentation": false, + "editor.suggest.insertMode": "replace", + "editor.formatOnSave": true, + "editor.codeActionsOnSave": { + "source.organizeImports": "explicit", + "source.fixAll": "explicit" + } + }, + "dart.env": { + "LOG_ALL_LEVELS": "y" + }, + "dart.testAdditionalArgs": ["--chain-stack-traces"] + } } diff --git a/test/channel_integration_test.dart b/test/channel_integration_test.dart index 11a10fd..79b1501 100644 --- a/test/channel_integration_test.dart +++ b/test/channel_integration_test.dart @@ -3,14 +3,25 @@ import 'dart:async'; import 'package:phoenix_socket/phoenix_socket.dart'; import 'package:test/test.dart'; -import 'control.dart'; +import 'helpers/logging.dart'; +import 'helpers/proxy.dart'; -void main() { - const addr = 'ws://localhost:4001/socket/websocket'; +Set usedPorts = {}; +void main() { group('PhoenixChannel', () { + const addr = 'ws://localhost:4004/socket/websocket'; + + setUpAll(() { + maybeActivateAllLogLevels(); + }); + setUp(() async { - await restartBackend(); + await prepareProxy(); + }); + + tearDown(() async { + await destroyProxy(); }); test('can join a channel through a socket', () async { @@ -28,10 +39,11 @@ void main() { test('can join a channel through a socket that starts closed then connects', () async { + await haltThenResumeProxy(); + final socket = PhoenixSocket(addr); final completer = Completer(); - await stopThenRestartBackend(); await socket.connect(); socket.addChannel(topic: 'channel1').join().onReply('ok', (reply) { @@ -50,10 +62,10 @@ void main() { await socket.connect(); - await stopBackend(); + await haltProxy(); final joinFuture = socket.addChannel(topic: 'channel1').join(); Future.delayed(const Duration(milliseconds: 300)) - .then((value) => restartBackend()); + .then((value) => resumeProxy()); joinFuture.onReply('ok', (reply) { expect(reply.status, equals('ok')); @@ -63,6 +75,32 @@ void main() { await completer.future; }); + test( + 'can join a channel through a socket that gets a "peer reset" before join but reconnects', + () async { + final socket = PhoenixSocket(addr); + final completer = Completer(); + + await socket.connect(); + addTearDown(() { + socket.close(); + }); + await resetPeer(); + + runZonedGuarded(() { + final joinFuture = socket.addChannel(topic: 'channel1').join(); + joinFuture.onReply('ok', (reply) { + expect(reply.status, equals('ok')); + completer.complete(); + }); + }, (error, stack) {}); + + Future.delayed(const Duration(milliseconds: 1000)) + .then((value) => resetPeer(enable: false)); + + await completer.future; + }); + test('can join a channel through an unawaited socket', () async { final socket = PhoenixSocket(addr); final completer = Completer(); @@ -137,8 +175,8 @@ void main() { }); test( - 'can send messages to channels that got transiently disconnected and receive a reply', - () async { + 'can send messages to channels that got transiently ' + 'disconnected and receive a reply', () async { final socket = PhoenixSocket(addr); await socket.connect(); @@ -146,13 +184,91 @@ void main() { final channel1 = socket.addChannel(topic: 'channel1'); await channel1.join().future; - await stopThenRestartBackend(); + await haltThenResumeProxy(); + await socket.openStream.first; final reply = await channel1.push('hello!', {'foo': 'bar'}).future; expect(reply.status, equals('ok')); expect(reply.response, equals({'name': 'bar'})); }); + test( + 'can send messages to channels that got "peer reset" ' + 'and receive a reply', () async { + final socket = PhoenixSocket(addr); + + await socket.connect(); + + final channel1 = socket.addChannel(topic: 'channel1'); + await channel1.join().future; + + await resetPeerThenResumeProxy(); + + final push = channel1.push('hello!', {'foo': 'bar'}); + final reply = await push.future; + + expect(reply.status, equals('ok')); + expect(reply.response, equals({'name': 'bar'})); + }); + + test( + 'throws when sending messages to channels that got "peer reset" ' + 'and that have not recovered yet', () async { + final socket = PhoenixSocket(addr); + + await socket.connect(); + + final channel1 = socket.addChannel(topic: 'channel1'); + await channel1.join().future; + + await resetPeer(); + + final Completer errorCompleter = Completer(); + + runZonedGuarded(() async { + final push = channel1.push('hello!', {'foo': 'bar'}); + try { + await push.future; + } catch (err) { + errorCompleter.complete(err); + } + }, (error, stack) {}); + + final Object exception; + expect(exception = await errorCompleter.future, isA()); + expect((exception as PhoenixException).socketClosed, isNotNull); + }); + + test( + 'throws when sending messages to channels that got disconnected ' + 'and that have not recovered yet', + () async { + final socket = PhoenixSocket(addr); + + await socket.connect(); + + final channel1 = socket.addChannel(topic: 'channel1'); + await channel1.join().future; + + await haltProxy(); + + final Completer errorCompleter = Completer(); + runZonedGuarded(() async { + try { + final push = channel1.push('hello!', {'foo': 'bar'}); + await push.future; + } catch (err) { + errorCompleter.complete(err); + } + }, (error, stack) {}); + + expect(await errorCompleter.future, isA()); + }, + timeout: Timeout( + Duration(seconds: 5), + ), + ); + test('only emits reply messages that are channel replies', () async { final socket = PhoenixSocket(addr); @@ -195,6 +311,11 @@ void main() { final channel2 = socket2.addChannel(topic: 'channel3'); await channel2.join().future; + addTearDown(() { + socket1.close(); + socket2.close(); + }); + expect( channel1.messages, emitsInOrder([ @@ -249,6 +370,11 @@ void main() { final channel2 = socket2.addChannel(topic: 'channel3'); await channel2.join().future; + addTearDown(() { + socket1.close(); + socket2.close(); + }); + channel1.push('ping', {'from': 'socket1'}); expect( @@ -278,6 +404,11 @@ void main() { final channel2 = socket2.addChannel(topic: 'channel3'); await channel2.join().future; + addTearDown(() { + socket1.close(); + socket2.close(); + }); + channel1.push('ping', {'from': 'socket1'}); expect( @@ -315,8 +446,8 @@ void main() { final socket = PhoenixSocket(addr); await socket.connect(); final channel = socket.addChannel(topic: 'channel3'); - await channel.join().future; + await channel.join().future; await channel.leave().future; expect( diff --git a/test/control.dart b/test/control.dart deleted file mode 100644 index 6291b17..0000000 --- a/test/control.dart +++ /dev/null @@ -1,27 +0,0 @@ -import 'package:http/http.dart'; - -Future stopBackend() { - return get(Uri.parse('http://localhost:4002/stop')).then((response) { - if (response.statusCode != 200) { - throw Exception('Failed to stop backend'); - } - }); -} - -Future restartBackend() { - return get(Uri.parse('http://localhost:4002/start')).then((response) { - if (response.statusCode != 200) { - throw Exception('Failed to start backend'); - } - }); -} - -Future stopThenRestartBackend( - [Duration delay = const Duration(milliseconds: 200)]) { - return get(Uri.parse('http://localhost:4002/stop')).then((response) { - if (response.statusCode != 200) { - throw Exception('Failed to stop backend'); - } - Future.delayed(delay).then((_) => restartBackend()); - }); -} diff --git a/test/helpers/env/_io.dart b/test/helpers/env/_io.dart new file mode 100644 index 0000000..c177908 --- /dev/null +++ b/test/helpers/env/_io.dart @@ -0,0 +1,7 @@ +import 'dart:io'; + +final _logAllLevels = Platform.environment['LOG_ALL_LEVELS']; + +bool shouldPrintAllLogs() { + return _logAllLevels == 'y'; +} diff --git a/test/helpers/env/base.dart b/test/helpers/env/base.dart new file mode 100644 index 0000000..9345c88 --- /dev/null +++ b/test/helpers/env/base.dart @@ -0,0 +1,3 @@ +bool shouldPrintAllLogs() { + return true; +} diff --git a/test/helpers/logging.dart b/test/helpers/logging.dart new file mode 100644 index 0000000..0b6385a --- /dev/null +++ b/test/helpers/logging.dart @@ -0,0 +1,21 @@ +import 'package:logging/logging.dart'; + +import 'env/base.dart' if (dart.library.io) 'env/_io.dart'; + +void maybeActivateAllLogLevels() { + if (!shouldPrintAllLogs()) { + return; + } + + Logger.root.level = Level.ALL; + Logger.root.onRecord.forEach((record) { + print( + '[${record.loggerName}] ${record.level.name} ${record.time}: ${record.message}'); + if (record.error != null) { + print('${record.error}'); + if (record.stackTrace != null) { + record.stackTrace.toString().split('\n').forEach(print); + } + } + }); +} diff --git a/test/helpers/proxy.dart b/test/helpers/proxy.dart new file mode 100644 index 0000000..246b7e6 --- /dev/null +++ b/test/helpers/proxy.dart @@ -0,0 +1,74 @@ +import 'dart:convert'; + +import 'package:http/http.dart'; + +const toxiproxy = 'http://localhost:8474'; + +Future prepareProxy() { + return post( + Uri.parse('$toxiproxy/proxies'), + body: jsonEncode( + { + 'name': 'backend', + 'listen': '0.0.0.0:4004', + 'upstream': 'backend:4001', + 'enabled': true, + }, + ), + ); +} + +Future haltProxy() { + return patch( + Uri.parse('$toxiproxy/proxies/backend'), + body: jsonEncode( + {'enabled': false}, + ), + ); +} + +Future resumeProxy() { + return patch( + Uri.parse('$toxiproxy/proxies/backend'), + body: jsonEncode( + {'enabled': true}, + ), + ); +} + +Future resetPeer({bool enable = true}) { + return enable + ? post( + Uri.parse('$toxiproxy/proxies/backend/toxics'), + body: jsonEncode( + {'name': 'reset-peer', 'type': 'reset_peer'}, + ), + ) + : delete( + Uri.parse('$toxiproxy/proxies/backend/toxics/reset-peer'), + ); +} + +Future destroyProxy() { + return delete(Uri.parse('$toxiproxy/proxies/backend')); +} + +Future haltThenResumeProxy([ + Duration delay = const Duration(milliseconds: 500), +]) { + return haltProxy().then((response) { + return Future.delayed(delay).then((_) async { + await resumeProxy(); + }); + }); +} + +Future resetPeerThenResumeProxy([ + Duration delay = const Duration(milliseconds: 500), +]) { + return resetPeer().then((response) { + return Future.delayed(delay).then((_) async { + await resetPeer(enable: false); + }); + }); +}