Skip to content

Commit

Permalink
feat: add a proper tracker management (#17)
Browse files Browse the repository at this point in the history
* feat: add a proper tracker management

* chore: remove newline

* fix: use closeWait instead of close

* chore: replace custom made checkTrackers by chronos checkLeaks

* chore: renamed UdpTransport.stop into close

* chore: remove flakyAsyncTest
  • Loading branch information
lchenut authored Jul 5, 2024
1 parent 11111e6 commit 81b91e3
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 87 deletions.
32 changes: 2 additions & 30 deletions tests/asyncunit.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import unittest2, chronos
import chronos/unittest2/asynctests

export unittest2, chronos
export unittest2, chronos, asynctests

template asyncTeardown*(body: untyped): untyped =
teardown:
Expand All @@ -15,32 +16,3 @@ template asyncSetup*(body: untyped): untyped =
proc() {.async, gcsafe.} =
body
)())

template asyncTest*(name: string, body: untyped): untyped =
test name:
waitFor((
proc() {.async, gcsafe.} =
body
)())

template flakyAsyncTest*(name: string, attempts: int, body: untyped): untyped =
test name:
var attemptNumber = 0
while attemptNumber < attempts:
let isLastAttempt = attemptNumber == attempts - 1
inc attemptNumber
try:
waitFor((
proc() {.async, gcsafe.} =
body
)())
except Exception as e:
if isLastAttempt: raise e
else: testStatusIMPL = TestStatus.FAILED
finally:
if not isLastAttempt:
if testStatusIMPL == TestStatus.FAILED:
# Retry
testStatusIMPL = TestStatus.OK
else:
break
48 changes: 0 additions & 48 deletions tests/helpers.nim

This file was deleted.

22 changes: 18 additions & 4 deletions tests/teststun.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

{.used.}

import chronos
import options
import bearssl
import ../webrtc/udp_transport
Expand All @@ -28,7 +29,10 @@ proc passwordProvEmpty(username: seq[byte]): seq[byte] {.raises: [], gcsafe.} =
proc passwordProvTest(username: seq[byte]): seq[byte] {.raises: [], gcsafe.} = @[1'u8, 2, 3, 4]

suite "Stun message encoding/decoding":
test "Get BindingRequest + encode & decode with a set username":
teardown:
checkLeaks()

asyncTest "Get BindingRequest + encode & decode with a set username":
var
udp = UdpTransport.new(AnyAddress)
conn = StunConn.new(
Expand All @@ -52,8 +56,9 @@ suite "Stun message encoding/decoding":
messageIntegrity.attributeType == AttrMessageIntegrity.uint16
fingerprint.attributeType == AttrFingerprint.uint16
conn.close()
await udp.close()

test "Get BindingResponse from BindingRequest + encode & decode":
asyncTest "Get BindingResponse from BindingRequest + encode & decode":
var
udp = UdpTransport.new(AnyAddress)
conn = StunConn.new(
Expand All @@ -77,9 +82,14 @@ suite "Stun message encoding/decoding":
bindingResponse == decoded
messageIntegrity.attributeType == AttrMessageIntegrity.uint16
fingerprint.attributeType == AttrFingerprint.uint16
conn.close()
await udp.close()

suite "Stun checkForError":
test "checkForError: Missing MessageIntegrity or Username":
teardown:
checkLeaks()

asyncTest "checkForError: Missing MessageIntegrity or Username":
var
udp = UdpTransport.new(AnyAddress)
conn = StunConn.new(
Expand All @@ -104,8 +114,10 @@ suite "Stun checkForError":

check:
errorMissUsername.getAttribute(ErrorCode).get().getErrorCode() == ECBadRequest
conn.close()
await udp.close()

test "checkForError: UsernameChecker returns false":
asyncTest "checkForError: UsernameChecker returns false":
var
udp = UdpTransport.new(AnyAddress)
conn = StunConn.new(
Expand All @@ -124,3 +136,5 @@ suite "Stun checkForError":

check:
error.getAttribute(ErrorCode).get().getErrorCode() == ECUnauthorized
conn.close()
await udp.close()
3 changes: 3 additions & 0 deletions webrtc/stun/stun_connection.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ logScope:
# - Need to implement ICE-CONTROLL(ED|ING) for browser to browser (not critical)

const
StunConnectionTracker* = "webrtc.stun.connection"
StunMaxQueuingMessages = 1024
StunBindingRequest* = 0x0001'u16
StunBindingResponse* = 0x0101'u16
Expand Down Expand Up @@ -211,6 +212,7 @@ proc new*(
rng: rng
)
self.handlesFut = self.stunMessageHandler()
trackCounter(StunConnectionTracker)
return self

proc join*(self: StunConn) {.async: (raises: [CancelledError]).} =
Expand All @@ -227,6 +229,7 @@ proc close*(self: StunConn) =
self.closeEvent.fire()
self.handlesFut.cancelSoon()
self.closed = true
untrackCounter(StunConnectionTracker)

proc write*(
self: StunConn,
Expand Down
3 changes: 3 additions & 0 deletions webrtc/stun/stun_transport.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ logScope:
topics = "webrtc stun stun_transport"

const
StunTransportTracker* = "webrtc.stun.transport"
StunMaxPendingConnections = 512

type
Expand Down Expand Up @@ -89,6 +90,7 @@ proc stop(self: Stun) =
for conn in self.connections.values():
conn.close()
self.readingLoop.cancelSoon()
untrackCounter(StunTransportTracker)

proc defaultUsernameProvider(): string = ""
proc defaultUsernameChecker(username: seq[byte]): bool = true
Expand All @@ -113,4 +115,5 @@ proc new*(
)
self.readingLoop = stunReadLoop()
self.pendingConn = newAsyncQueue[StunConn](StunMaxPendingConnections)
trackCounter(StunTransportTracker)
return self
14 changes: 9 additions & 5 deletions webrtc/udp_transport.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type
dataRecv: AsyncQueue[UdpPacketInfo]
closed: bool

const UdpTransportTrackerName* = "webrtc.udp.transport"

proc new*(T: type UdpTransport, laddr: TransportAddress): T =
## Initialize an Udp Transport
##
Expand All @@ -47,16 +49,18 @@ proc new*(T: type UdpTransport, laddr: TransportAddress): T =

self.dataRecv = newAsyncQueue[UdpPacketInfo]()
self.udp = newDatagramTransport(onReceive, local = laddr)
trackCounter(UdpTransportTrackerName)
return self

proc close*(self: UdpTransport) =
proc close*(self: UdpTransport) {.async: (raises: []).} =
## Close an Udp Transport
##
if self.closed:
debug "Trying to close an already closed UdpConn"
debug "Trying to stop an already stopped UdpTransport"
return
self.closed = true
self.udp.close()
await self.udp.closeWait()
untrackCounter(UdpTransportTrackerName)

proc write*(
self: UdpTransport,
Expand All @@ -66,7 +70,7 @@ proc write*(
## Write a message on Udp to a remote address `raddr`
##
if self.closed:
debug "Try to write on an already closed UdpConn"
debug "Try to write on an already closed UdpTransport"
return
trace "UDP write", msg
try:
Expand All @@ -79,7 +83,7 @@ proc read*(self: UdpTransport): Future[UdpPacketInfo] {.async: (raises: [Cancell
## Read the next received Udp message
##
if self.closed:
debug "Try to read on an already closed UdpConn"
debug "Try to read on an already closed UdpTransport"
return
trace "UDP read"
return await self.dataRecv.popFirst()

0 comments on commit 81b91e3

Please sign in to comment.