diff --git a/go.mod b/go.mod index c951f4a986..b60e7a84ba 100644 --- a/go.mod +++ b/go.mod @@ -44,10 +44,10 @@ require ( github.com/multiformats/go-varint v0.0.7 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/pion/datachannel v1.5.5 - github.com/pion/ice/v2 v2.3.6 + github.com/pion/ice/v2 v2.3.11 github.com/pion/logging v0.2.2 - github.com/pion/stun v0.6.0 - github.com/pion/webrtc/v3 v3.2.9 + github.com/pion/stun v0.6.1 + github.com/pion/webrtc/v3 v3.2.23 github.com/prometheus/client_golang v1.14.0 github.com/prometheus/client_model v0.4.0 github.com/quic-go/quic-go v0.41.0 @@ -87,7 +87,7 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b // indirect - github.com/google/uuid v1.3.0 // indirect + github.com/google/uuid v1.4.0 // indirect github.com/huin/goupnp v1.3.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/goprocess v0.1.4 // indirect @@ -101,17 +101,17 @@ require ( github.com/multiformats/go-base36 v0.2.0 // indirect github.com/onsi/ginkgo/v2 v2.13.0 // indirect github.com/opencontainers/runtime-spec v1.1.0 // indirect - github.com/pion/dtls/v2 v2.2.7 // indirect - github.com/pion/interceptor v0.1.17 // indirect - github.com/pion/mdns v0.0.7 // indirect + github.com/pion/dtls/v2 v2.2.8 // indirect + github.com/pion/interceptor v0.1.25 // indirect + github.com/pion/mdns v0.0.9 // indirect github.com/pion/randutil v0.1.0 // indirect - github.com/pion/rtcp v1.2.10 // indirect - github.com/pion/rtp v1.7.13 // indirect - github.com/pion/sctp v1.8.7 // indirect + github.com/pion/rtcp v1.2.13 // indirect + github.com/pion/rtp v1.8.3 // indirect + github.com/pion/sctp v1.8.9 // indirect github.com/pion/sdp/v3 v3.0.6 // indirect - github.com/pion/srtp/v2 v2.0.15 // indirect - github.com/pion/transport/v2 v2.2.1 // indirect - github.com/pion/turn/v2 v2.1.0 // indirect + github.com/pion/srtp/v2 v2.0.18 // indirect + github.com/pion/transport/v2 v2.2.4 // indirect + github.com/pion/turn/v2 v2.1.4 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/common v0.37.0 // indirect diff --git a/go.sum b/go.sum index 3dc547dd8b..6fc99e55e4 100644 --- a/go.sum +++ b/go.sum @@ -215,8 +215,9 @@ github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b h1:RMpPgZTSApbPf7xaVe github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY= github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= @@ -400,43 +401,51 @@ github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhM github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pion/datachannel v1.5.5 h1:10ef4kwdjije+M9d7Xm9im2Y3O6A6ccQb0zcqZcJew8= github.com/pion/datachannel v1.5.5/go.mod h1:iMz+lECmfdCMqFRhXhcA/219B0SQlbpoR2V118yimL0= -github.com/pion/dtls/v2 v2.2.7 h1:cSUBsETxepsCSFSxC3mc/aDo14qQLMSL+O6IjG28yV8= github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= -github.com/pion/ice/v2 v2.3.6 h1:Jgqw36cAud47iD+N6rNX225uHvrgWtAlHfVyOQc3Heg= -github.com/pion/ice/v2 v2.3.6/go.mod h1:9/TzKDRwBVAPsC+YOrKH/e3xDrubeTRACU9/sHQarsU= -github.com/pion/interceptor v0.1.17 h1:prJtgwFh/gB8zMqGZoOgJPHivOwVAp61i2aG61Du/1w= -github.com/pion/interceptor v0.1.17/go.mod h1:SY8kpmfVBvrbUzvj2bsXz7OJt5JvmVNZ+4Kjq7FcwrI= +github.com/pion/dtls/v2 v2.2.8 h1:BUroldfiIbV9jSnC6cKOMnyiORRWrWWpV11JUyEu5OA= +github.com/pion/dtls/v2 v2.2.8/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= +github.com/pion/ice/v2 v2.3.11 h1:rZjVmUwyT55cmN8ySMpL7rsS8KYsJERsrxJLLxpKhdw= +github.com/pion/ice/v2 v2.3.11/go.mod h1:hPcLC3kxMa+JGRzMHqQzjoSj3xtE9F+eoncmXLlCL4E= +github.com/pion/interceptor v0.1.25 h1:pwY9r7P6ToQ3+IF0bajN0xmk/fNw/suTgaTdlwTDmhc= +github.com/pion/interceptor v0.1.25/go.mod h1:wkbPYAak5zKsfpVDYMtEfWEy8D4zL+rpxCxPImLOg3Y= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= -github.com/pion/mdns v0.0.7 h1:P0UB4Sr6xDWEox0kTVxF0LmQihtCbSAdW0H2nEgkA3U= -github.com/pion/mdns v0.0.7/go.mod h1:4iP2UbeFhLI/vWju/bw6ZfwjJzk0z8DNValjGxR/dD8= +github.com/pion/mdns v0.0.8/go.mod h1:hYE72WX8WDveIhg7fmXgMKivD3Puklk0Ymzog0lSyaI= +github.com/pion/mdns v0.0.9 h1:7Ue5KZsqq8EuqStnpPWV33vYYEH0+skdDN5L7EiEsI4= +github.com/pion/mdns v0.0.9/go.mod h1:2JA5exfxwzXiCihmxpTKgFUpiQws2MnipoPK09vecIc= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= -github.com/pion/rtcp v1.2.10 h1:nkr3uj+8Sp97zyItdN60tE/S6vk4al5CPRR6Gejsdjc= github.com/pion/rtcp v1.2.10/go.mod h1:ztfEwXZNLGyF1oQDttz/ZKIBaeeg/oWbRYqzBM9TL1I= -github.com/pion/rtp v1.7.13 h1:qcHwlmtiI50t1XivvoawdCGTP4Uiypzfrsap+bijcoA= -github.com/pion/rtp v1.7.13/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= +github.com/pion/rtcp v1.2.12/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4= +github.com/pion/rtcp v1.2.13 h1:+EQijuisKwm/8VBs8nWllr0bIndR7Lf7cZG200mpbNo= +github.com/pion/rtcp v1.2.13/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4= +github.com/pion/rtp v1.8.2/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= +github.com/pion/rtp v1.8.3 h1:VEHxqzSVQxCkKDSHro5/4IUUG1ea+MFdqR2R3xSpNU8= +github.com/pion/rtp v1.8.3/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= github.com/pion/sctp v1.8.5/go.mod h1:SUFFfDpViyKejTAdwD1d/HQsCu+V/40cCs2nZIvC3s0= -github.com/pion/sctp v1.8.7 h1:JnABvFakZueGAn4KU/4PSKg+GWbF6QWbKTWZOSGJjXw= -github.com/pion/sctp v1.8.7/go.mod h1:g1Ul+ARqZq5JEmoFy87Q/4CePtKnTJ1QCL9dBBdN6AU= +github.com/pion/sctp v1.8.8/go.mod h1:igF9nZBrjh5AtmKc7U30jXltsFHicFCXSmWA2GWRaWs= +github.com/pion/sctp v1.8.9 h1:TP5ZVxV5J7rz7uZmbyvnUvsn7EJ2x/5q9uhsTtXbI3g= +github.com/pion/sctp v1.8.9/go.mod h1:cMLT45jqw3+jiJCrtHVwfQLnfR0MGZ4rgOJwUOIqLkI= github.com/pion/sdp/v3 v3.0.6 h1:WuDLhtuFUUVpTfus9ILC4HRyHsW6TdugjEX/QY9OiUw= github.com/pion/sdp/v3 v3.0.6/go.mod h1:iiFWFpQO8Fy3S5ldclBkpXqmWy02ns78NOKoLLL0YQw= -github.com/pion/srtp/v2 v2.0.15 h1:+tqRtXGsGwHC0G0IUIAzRmdkHvriF79IHVfZGfHrQoA= -github.com/pion/srtp/v2 v2.0.15/go.mod h1:b/pQOlDrbB0HEH5EUAQXzSYxikFbNcNuKmF8tM0hCtw= -github.com/pion/stun v0.4.0/go.mod h1:QPsh1/SbXASntw3zkkrIk3ZJVKz4saBY2G7S10P3wCw= -github.com/pion/stun v0.6.0 h1:JHT/2iyGDPrFWE8NNC15wnddBN8KifsEDw8swQmrEmU= -github.com/pion/stun v0.6.0/go.mod h1:HPqcfoeqQn9cuaet7AOmB5e5xkObu9DwBdurwLKO9oA= +github.com/pion/srtp/v2 v2.0.18 h1:vKpAXfawO9RtTRKZJbG4y0v1b11NZxQnxRl85kGuUlo= +github.com/pion/srtp/v2 v2.0.18/go.mod h1:0KJQjA99A6/a0DOVTu1PhDSw0CXF2jTkqOoMg3ODqdA= +github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4= +github.com/pion/stun v0.6.1/go.mod h1:/hO7APkX4hZKu/D0f2lHzNyvdkTGtIy3NDmLR7kSz/8= github.com/pion/transport v0.14.1 h1:XSM6olwW+o8J4SCmOBb/BpwZypkHeyM0PGFCxNQBr40= github.com/pion/transport v0.14.1/go.mod h1:4tGmbk00NeYA3rUa9+n+dzCCoKkcy3YlYb99Jn2fNnI= -github.com/pion/transport/v2 v2.0.0/go.mod h1:HS2MEBJTwD+1ZI2eSXSvHJx/HnzQqRy2/LXxt6eVMHc= -github.com/pion/transport/v2 v2.1.0/go.mod h1:AdSw4YBZVDkZm8fpoz+fclXyQwANWmZAlDuQdctTThQ= -github.com/pion/transport/v2 v2.2.0/go.mod h1:AdSw4YBZVDkZm8fpoz+fclXyQwANWmZAlDuQdctTThQ= -github.com/pion/transport/v2 v2.2.1 h1:7qYnCBlpgSJNYMbLCKuSY9KbQdBFoETvPNETv0y4N7c= github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g= -github.com/pion/turn/v2 v2.1.0 h1:5wGHSgGhJhP/RpabkUb/T9PdsAjkGLS6toYz5HNzoSI= -github.com/pion/turn/v2 v2.1.0/go.mod h1:yrT5XbXSGX1VFSF31A3c1kCNB5bBZgk/uu5LET162qs= -github.com/pion/webrtc/v3 v3.2.9 h1:U8NSjQDlZZ+Iy/hg42Q/u6mhEVSXYvKrOIZiZwYTfLc= -github.com/pion/webrtc/v3 v3.2.9/go.mod h1:gjQLMZeyN3jXBGdxGmUYCyKjOuYX/c99BDjGqmadq0A= +github.com/pion/transport/v2 v2.2.2/go.mod h1:OJg3ojoBJopjEeECq2yJdXH9YVrUJ1uQ++NjXLOUorc= +github.com/pion/transport/v2 v2.2.3/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0= +github.com/pion/transport/v2 v2.2.4 h1:41JJK6DZQYSeVLxILA2+F4ZkKb4Xd/tFJZRFZQ9QAlo= +github.com/pion/transport/v2 v2.2.4/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0= +github.com/pion/transport/v3 v3.0.1 h1:gDTlPJwROfSfz6QfSi0ZmeCSkFcnWWiiR9ES0ouANiM= +github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9SzK5f5xE0= +github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= +github.com/pion/turn/v2 v2.1.4 h1:2xn8rduI5W6sCZQkEnIUDAkrBQNl2eYIBCHMZ3QMmP8= +github.com/pion/turn/v2 v2.1.4/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= +github.com/pion/webrtc/v3 v3.2.23 h1:GbqEuxBbVLFhXk0GwxKAoaIJYiEa9TyoZPEZC+2HZxM= +github.com/pion/webrtc/v3 v3.2.23/go.mod h1:1CaT2fcZzZ6VZA+O1i9yK2DU4EOcXVvSbWG9pr5jefs= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -536,7 +545,6 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= @@ -598,7 +606,10 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= -golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= +golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I= +golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= +golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= +golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -684,11 +695,13 @@ golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= -golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= +golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= +golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= +golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -778,21 +791,26 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.9.0/go.mod h1:M6DEAAIenWoTxdKrOltXcmDY3rSplQUkrvaDU5FcQyo= +golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o= +golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= +golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -801,10 +819,12 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/p2p/test/transport/transport_test.go b/p2p/test/transport/transport_test.go index 19209a48f4..e39b72a71a 100644 --- a/p2p/test/transport/transport_test.go +++ b/p2p/test/transport/transport_test.go @@ -314,9 +314,6 @@ func TestManyStreams(t *testing.T) { const streamCount = 128 for _, tc := range transportsToTest { t.Run(tc.Name, func(t *testing.T) { - if strings.Contains(tc.Name, "WebRTC") { - t.Skip("Pion doesn't correctly handle large queues of streams.") - } h1 := tc.HostGenerator(t, TransportTestCaseOpts{NoRcmgr: true}) h2 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true, NoRcmgr: true}) defer h1.Close() diff --git a/p2p/transport/webrtc/connection.go b/p2p/transport/webrtc/connection.go index fd31f8351a..f220360ed4 100644 --- a/p2p/transport/webrtc/connection.go +++ b/p2p/transport/webrtc/connection.go @@ -25,9 +25,7 @@ import ( var _ tpt.CapableConn = &connection{} -const maxAcceptQueueLen = 10 - -const maxDataChannelID = 1 << 10 +const maxAcceptQueueLen = 256 type errConnectionTimeout struct{} @@ -47,7 +45,8 @@ type connection struct { transport *WebRTCTransport scope network.ConnManagementScope - closeErr error + closeOnce sync.Once + closeErr error localPeer peer.ID localMultiaddr ma.Multiaddr @@ -107,15 +106,6 @@ func newConnection( pc.OnConnectionStateChange(c.onConnectionStateChange) pc.OnDataChannel(func(dc *webrtc.DataChannel) { - if c.IsClosed() { - return - } - // Limit the number of streams, since we're not able to actually properly close them. - // See https://github.com/libp2p/specs/issues/575 for details. - if *dc.ID() > maxDataChannelID { - c.Close() - return - } dc.OnOpen(func() { rwc, err := dc.Detach() if err != nil { @@ -133,7 +123,6 @@ func newConnection( } }) }) - return c, nil } @@ -144,25 +133,31 @@ func (c *connection) ConnState() network.ConnectionState { // Close closes the underlying peerconnection. func (c *connection) Close() error { - if c.IsClosed() { - return nil - } + c.closeOnce.Do(func() { c.closeWithError(errors.New("connection closed")) }) + return nil +} + +// closeWithError is used to Close the connection when the underlying DTLS connection fails +func (c *connection) closeWithError(err error) { + c.closeErr = err + // cancel must be called after closeErr is set. This ensures interested goroutines waiting on + // ctx.Done can read closeErr without holding the conn lock. + c.cancel() + // closing peerconnection will close the datachannels associated with the streams + c.pc.Close() c.m.Lock() - defer c.m.Unlock() + streams := c.streams + c.streams = nil + c.m.Unlock() + for _, s := range streams { + s.closeForShutdown(err) + } c.scope.Done() - c.closeErr = errors.New("connection closed") - c.cancel() - return c.pc.Close() } func (c *connection) IsClosed() bool { - select { - case <-c.ctx.Done(): - return true - default: - return false - } + return c.ctx.Err() != nil } func (c *connection) OpenStream(ctx context.Context) (network.MuxedStream, error) { @@ -174,13 +169,6 @@ func (c *connection) OpenStream(ctx context.Context) (network.MuxedStream, error if id > math.MaxUint16 { return nil, errors.New("exhausted stream ID space") } - // Limit the number of streams, since we're not able to actually properly close them. - // See https://github.com/libp2p/specs/issues/575 for details. - if id > maxDataChannelID { - c.Close() - return c.OpenStream(ctx) - } - streamID := uint16(id) dc, err := c.pc.CreateDataChannel("", &webrtc.DataChannelInit{ID: &streamID}) if err != nil { @@ -188,12 +176,13 @@ func (c *connection) OpenStream(ctx context.Context) (network.MuxedStream, error } rwc, err := c.detachChannel(ctx, dc) if err != nil { - return nil, fmt.Errorf("open stream: %w", err) + dc.Close() + return nil, fmt.Errorf("detach channel failed for stream(%d): %w", streamID, err) } str := newStream(dc, rwc, func() { c.removeStream(streamID) }) if err := c.addStream(str); err != nil { - str.Close() - return nil, err + str.Reset() + return nil, fmt.Errorf("failed to add stream(%d) to connection: %w", streamID, err) } return str, nil } @@ -205,7 +194,7 @@ func (c *connection) AcceptStream() (network.MuxedStream, error) { case dc := <-c.acceptQueue: str := newStream(dc.channel, dc.stream, func() { c.removeStream(*dc.channel.ID()) }) if err := c.addStream(str); err != nil { - str.Close() + str.Reset() return nil, err } return str, nil @@ -223,6 +212,9 @@ func (c *connection) Transport() tpt.Transport { return c.transport } func (c *connection) addStream(str *stream) error { c.m.Lock() defer c.m.Unlock() + if c.streams == nil { + return c.closeErr + } if _, ok := c.streams[str.id]; ok { return errors.New("stream ID already exists") } @@ -238,20 +230,9 @@ func (c *connection) removeStream(id uint16) { func (c *connection) onConnectionStateChange(state webrtc.PeerConnectionState) { if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateClosed { - // reset any streams - if c.IsClosed() { - return - } - c.m.Lock() - defer c.m.Unlock() - c.closeErr = errConnectionTimeout{} - for k, str := range c.streams { - str.setCloseError(c.closeErr) - delete(c.streams, k) - } - c.cancel() - c.scope.Done() - c.pc.Close() + c.closeOnce.Do(func() { + c.closeWithError(errConnectionTimeout{}) + }) } } @@ -272,8 +253,11 @@ func (c *connection) onConnectionStateChange(state webrtc.PeerConnectionState) { // This was desired because it was not feasible to introduce backpressure // with the OnMessage callbacks. The tradeoff is a change in the semantics of // the OnOpen callback, and having to force close Read locally. -func (c *connection) detachChannel(ctx context.Context, dc *webrtc.DataChannel) (rwc datachannel.ReadWriteCloser, err error) { +func (c *connection) detachChannel(ctx context.Context, dc *webrtc.DataChannel) (datachannel.ReadWriteCloser, error) { done := make(chan struct{}) + + var rwc datachannel.ReadWriteCloser + var err error // OnOpen will return immediately for detached datachannels // refer: https://github.com/pion/webrtc/blob/7ab3174640b3ce15abebc2516a2ca3939b5f105f/datachannel.go#L278-L282 dc.OnOpen(func() { @@ -287,8 +271,8 @@ func (c *connection) detachChannel(ctx context.Context, dc *webrtc.DataChannel) case <-ctx.Done(): return nil, ctx.Err() case <-done: + return rwc, err } - return } // A note on these setters and why they are needed: diff --git a/p2p/transport/webrtc/datachannel.go b/p2p/transport/webrtc/datachannel.go deleted file mode 100644 index aedca6d5b0..0000000000 --- a/p2p/transport/webrtc/datachannel.go +++ /dev/null @@ -1,28 +0,0 @@ -package libp2pwebrtc - -import ( - "context" - - "github.com/pion/datachannel" - "github.com/pion/webrtc/v3" -) - -// only use this if the datachannels are detached, since the OnOpen callback -// will be called immediately. Only use after the peerconnection is open. -// The context should close if the peerconnection underlying the datachannel -// is closed. -func getDetachedChannel(ctx context.Context, dc *webrtc.DataChannel) (rwc datachannel.ReadWriteCloser, err error) { - done := make(chan struct{}) - dc.OnOpen(func() { - defer close(done) - rwc, err = dc.Detach() - }) - // this is safe since for detached datachannels, the peerconnection runs the onOpen - // callback immediately if the SCTP transport is also connected. - select { - case <-done: - case <-ctx.Done(): - return nil, ctx.Err() - } - return -} diff --git a/p2p/transport/webrtc/listener.go b/p2p/transport/webrtc/listener.go index 0b29bf655d..7848bd5c6f 100644 --- a/p2p/transport/webrtc/listener.go +++ b/p2p/transport/webrtc/listener.go @@ -264,7 +264,7 @@ func (l *listener) setupConnection( } } - rwc, err := getDetachedChannel(ctx, rawDatachannel) + rwc, err := detachHandshakeDataChannel(ctx, rawDatachannel) if err != nil { return nil, err } diff --git a/p2p/transport/webrtc/pb/message.pb.go b/p2p/transport/webrtc/pb/message.pb.go index fffc025f7f..384bddd289 100644 --- a/p2p/transport/webrtc/pb/message.pb.go +++ b/p2p/transport/webrtc/pb/message.pb.go @@ -31,6 +31,10 @@ const ( // The sender abruptly terminates the sending part of the stream. The // receiver can discard any data that it already received on that stream. Message_RESET Message_Flag = 2 + // Sending the FIN_ACK flag acknowledges the previous receipt of a message + // with the FIN flag set. Receiving a FIN_ACK flag gives the recipient + // confidence that the remote has received all sent messages. + Message_FIN_ACK Message_Flag = 3 ) // Enum value maps for Message_Flag. @@ -39,11 +43,13 @@ var ( 0: "FIN", 1: "STOP_SENDING", 2: "RESET", + 3: "FIN_ACK", } Message_Flag_value = map[string]int32{ "FIN": 0, "STOP_SENDING": 1, "RESET": 2, + "FIN_ACK": 3, } ) @@ -143,17 +149,18 @@ var File_message_proto protoreflect.FileDescriptor var file_message_proto_rawDesc = []byte{ 0x0a, 0x0d, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, - 0x74, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x21, 0x0a, 0x04, 0x66, 0x6c, - 0x61, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0d, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x2e, 0x46, 0x6c, 0x61, 0x67, 0x52, 0x04, 0x66, 0x6c, 0x61, 0x67, 0x12, 0x18, 0x0a, - 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x2c, 0x0a, 0x04, 0x46, 0x6c, 0x61, 0x67, 0x12, - 0x07, 0x0a, 0x03, 0x46, 0x49, 0x4e, 0x10, 0x00, 0x12, 0x10, 0x0a, 0x0c, 0x53, 0x54, 0x4f, 0x50, - 0x5f, 0x53, 0x45, 0x4e, 0x44, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x52, 0x45, - 0x53, 0x45, 0x54, 0x10, 0x02, 0x42, 0x35, 0x5a, 0x33, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, - 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x62, 0x70, 0x32, 0x70, 0x2f, 0x67, 0x6f, 0x2d, 0x6c, 0x69, - 0x62, 0x70, 0x32, 0x70, 0x2f, 0x70, 0x32, 0x70, 0x2f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, - 0x72, 0x74, 0x2f, 0x77, 0x65, 0x62, 0x72, 0x74, 0x63, 0x2f, 0x70, 0x62, + 0x81, 0x01, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x21, 0x0a, 0x04, 0x66, + 0x6c, 0x61, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0d, 0x2e, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x2e, 0x46, 0x6c, 0x61, 0x67, 0x52, 0x04, 0x66, 0x6c, 0x61, 0x67, 0x12, 0x18, + 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x39, 0x0a, 0x04, 0x46, 0x6c, 0x61, 0x67, + 0x12, 0x07, 0x0a, 0x03, 0x46, 0x49, 0x4e, 0x10, 0x00, 0x12, 0x10, 0x0a, 0x0c, 0x53, 0x54, 0x4f, + 0x50, 0x5f, 0x53, 0x45, 0x4e, 0x44, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x52, + 0x45, 0x53, 0x45, 0x54, 0x10, 0x02, 0x12, 0x0b, 0x0a, 0x07, 0x46, 0x49, 0x4e, 0x5f, 0x41, 0x43, + 0x4b, 0x10, 0x03, 0x42, 0x35, 0x5a, 0x33, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x6c, 0x69, 0x62, 0x70, 0x32, 0x70, 0x2f, 0x67, 0x6f, 0x2d, 0x6c, 0x69, 0x62, 0x70, + 0x32, 0x70, 0x2f, 0x70, 0x32, 0x70, 0x2f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, + 0x2f, 0x77, 0x65, 0x62, 0x72, 0x74, 0x63, 0x2f, 0x70, 0x62, } var ( diff --git a/p2p/transport/webrtc/pb/message.proto b/p2p/transport/webrtc/pb/message.proto index d6b1957beb..aab885b0da 100644 --- a/p2p/transport/webrtc/pb/message.proto +++ b/p2p/transport/webrtc/pb/message.proto @@ -12,6 +12,10 @@ message Message { // The sender abruptly terminates the sending part of the stream. The // receiver can discard any data that it already received on that stream. RESET = 2; + // Sending the FIN_ACK flag acknowledges the previous receipt of a message + // with the FIN flag set. Receiving a FIN_ACK flag gives the recipient + // confidence that the remote has received all sent messages. + FIN_ACK = 3; } optional Flag flag=1; diff --git a/p2p/transport/webrtc/stream.go b/p2p/transport/webrtc/stream.go index 0358dce56c..45641321dc 100644 --- a/p2p/transport/webrtc/stream.go +++ b/p2p/transport/webrtc/stream.go @@ -1,6 +1,8 @@ package libp2pwebrtc import ( + "errors" + "os" "sync" "time" @@ -19,6 +21,12 @@ const ( // We can change this value in the SettingEngine before creating the peerconnection. // https://github.com/pion/webrtc/blob/v3.1.49/sctptransport.go#L341 maxBufferedAmount = 2 * maxMessageSize + // maxTotalControlMessagesSize is the maximum total size of all control messages we will + // write on this stream. + // 4 control messages of size 10 bytes + 10 bytes buffer. This number doesn't need to be + // exact. In the worst case, we enqueue these many bytes more in the webrtc peer connection + // send queue. + maxTotalControlMessagesSize = 50 // bufferedAmountLowThreshold and maxBufferedAmount are bound // to a stream but congestion control is done on the whole // SCTP association. This means that a single stream can monopolize @@ -37,6 +45,9 @@ const ( // is less than or equal to 2 ^ 14, the varint will not be more than // 2 bytes in length. varintOverhead = 2 + // maxFINACKWait is the maximum amount of time a stream will wait to read + // FIN_ACK before closing the data channel + maxFINACKWait = 10 * time.Second ) type receiveState uint8 @@ -52,6 +63,7 @@ type sendState uint8 const ( sendStateSending sendState = iota sendStateDataSent + sendStateDataReceived sendStateReset ) @@ -59,32 +71,36 @@ const ( // and then a network.MuxedStream type stream struct { mx sync.Mutex - // pbio.Reader is not thread safe, - // and while our Read is not promised to be thread safe, - // we ourselves internally read from multiple routines... - reader pbio.Reader + + // readerMx ensures that only a single goroutine reads from the reader. Read is not threadsafe + // But we may need to read from reader for control messages from a different goroutine. + readerMx sync.Mutex + reader pbio.Reader + // this buffer is limited up to a single message. Reason we need it // is because a reader might read a message midway, and so we need a // wait to buffer that for as long as the remaining part is not (yet) read nextMessage *pb.Message receiveState receiveState - // The public Write API is not promised to be thread safe, - // but we need to be able to write control messages. - writer pbio.Writer - sendStateChanged chan struct{} - sendState sendState - controlMsgQueue []*pb.Message - writeDeadline time.Time - writeDeadlineUpdated chan struct{} - writeAvailable chan struct{} - - readLoopOnce sync.Once - - onDone func() - id uint16 // for logging purposes - dataChannel *datachannel.DataChannel - closeErr error + writer pbio.Writer // concurrent writes prevented by mx + writeStateChanged chan struct{} + sendState sendState + writeDeadline time.Time + + controlMessageReaderOnce sync.Once + // controlMessageReaderEndTime is the end time for reading FIN_ACK from the control + // message reader. We cannot rely on SetReadDeadline to do this since that is prone to + // race condition where a previous deadline timer fires after the latest call to + // SetReadDeadline + // See: https://github.com/pion/sctp/pull/290 + controlMessageReaderEndTime time.Time + controlMessageReaderDone sync.WaitGroup + + onDone func() + id uint16 // for logging purposes + dataChannel *datachannel.DataChannel + closeForShutdownErr error } var _ network.MuxedStream = &stream{} @@ -95,76 +111,74 @@ func newStream( onDone func(), ) *stream { s := &stream{ - reader: pbio.NewDelimitedReader(rwc, maxMessageSize), - writer: pbio.NewDelimitedWriter(rwc), - - sendStateChanged: make(chan struct{}, 1), - writeDeadlineUpdated: make(chan struct{}, 1), - writeAvailable: make(chan struct{}, 1), - - id: *channel.ID(), - dataChannel: rwc.(*datachannel.DataChannel), - onDone: onDone, + reader: pbio.NewDelimitedReader(rwc, maxMessageSize), + writer: pbio.NewDelimitedWriter(rwc), + writeStateChanged: make(chan struct{}, 1), + id: *channel.ID(), + dataChannel: rwc.(*datachannel.DataChannel), + onDone: onDone, } + // released when the controlMessageReader goroutine exits + s.controlMessageReaderDone.Add(1) + s.dataChannel.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold) + s.dataChannel.OnBufferedAmountLow(func() { + s.notifyWriteStateChanged() - channel.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold) - channel.OnBufferedAmountLow(func() { - s.mx.Lock() - defer s.mx.Unlock() - // first send out queued control messages - for len(s.controlMsgQueue) > 0 { - msg := s.controlMsgQueue[0] - available := s.availableSendSpace() - if controlMsgSize < available { - s.writer.WriteMsg(msg) // TODO: handle error - s.controlMsgQueue = s.controlMsgQueue[1:] - } else { - return - } - } - - if s.isDone() { - // onDone removes the stream from the connection and requires the connection lock. - // This callback(onBufferedAmountLow) is executing in the sctp readLoop goroutine. - // If Connection.Close is called concurrently, the closing goroutine will acquire - // the connection lock and wait for sctp readLoop to exit, the sctp readLoop will - // wait for the connection lock before exiting, causing a deadlock. - // Run this in a different goroutine to avoid the deadlock. - go func() { - s.mx.Lock() - defer s.mx.Unlock() - // TODO: we should be closing the underlying datachannel, but this resets the stream - // See https://github.com/libp2p/specs/issues/575 for details. - // _ = s.dataChannel.Close() - // TODO: write for the spawned reader to return - s.onDone() - }() - } - - select { - case s.writeAvailable <- struct{}{}: - default: - } }) return s } func (s *stream) Close() error { + s.mx.Lock() + isClosed := s.closeForShutdownErr != nil + s.mx.Unlock() + if isClosed { + return nil + } + closeWriteErr := s.CloseWrite() closeReadErr := s.CloseRead() - if closeWriteErr != nil { - return closeWriteErr + if closeWriteErr != nil || closeReadErr != nil { + s.Reset() + return errors.Join(closeWriteErr, closeReadErr) } - return closeReadErr + + s.mx.Lock() + if s.controlMessageReaderEndTime.IsZero() { + s.controlMessageReaderEndTime = time.Now().Add(maxFINACKWait) + s.setDataChannelReadDeadline(time.Now().Add(-1 * time.Hour)) + go func() { + s.controlMessageReaderDone.Wait() + s.cleanup() + }() + } + s.mx.Unlock() + return nil } func (s *stream) Reset() error { + s.mx.Lock() + isClosed := s.closeForShutdownErr != nil + s.mx.Unlock() + if isClosed { + return nil + } + + defer s.cleanup() cancelWriteErr := s.cancelWrite() closeReadErr := s.CloseRead() - if cancelWriteErr != nil { - return cancelWriteErr - } - return closeReadErr + s.setDataChannelReadDeadline(time.Now().Add(-1 * time.Hour)) + return errors.Join(closeReadErr, cancelWriteErr) +} + +func (s *stream) closeForShutdown(closeErr error) { + defer s.cleanup() + + s.mx.Lock() + defer s.mx.Unlock() + + s.closeForShutdownErr = closeErr + s.notifyWriteStateChanged() } func (s *stream) SetDeadline(t time.Time) error { @@ -173,8 +187,6 @@ func (s *stream) SetDeadline(t time.Time) error { } // processIncomingFlag process the flag on an incoming message -// It needs to be called with msg.Flag, not msg.GetFlag(), -// otherwise we'd misinterpret the default value. // It needs to be called while the mutex is locked. func (s *stream) processIncomingFlag(flag *pb.Message_Flag) { if flag == nil { @@ -182,50 +194,101 @@ func (s *stream) processIncomingFlag(flag *pb.Message_Flag) { } switch *flag { + case pb.Message_STOP_SENDING: + // We must process STOP_SENDING after sending a FIN(sendStateDataSent). Remote peer + // may not send a FIN_ACK once it has sent a STOP_SENDING + if s.sendState == sendStateSending || s.sendState == sendStateDataSent { + s.sendState = sendStateReset + } + s.notifyWriteStateChanged() + case pb.Message_FIN_ACK: + s.sendState = sendStateDataReceived + s.notifyWriteStateChanged() case pb.Message_FIN: if s.receiveState == receiveStateReceiving { s.receiveState = receiveStateDataRead } - case pb.Message_STOP_SENDING: - if s.sendState == sendStateSending { - s.sendState = sendStateReset - } - select { - case s.sendStateChanged <- struct{}{}: - default: + if err := s.writer.WriteMsg(&pb.Message{Flag: pb.Message_FIN_ACK.Enum()}); err != nil { + log.Debugf("failed to send FIN_ACK: %s", err) + // Remote has finished writing all the data It'll stop waiting for the + // FIN_ACK eventually or will be notified when we close the datachannel } + s.spawnControlMessageReader() case pb.Message_RESET: if s.receiveState == receiveStateReceiving { s.receiveState = receiveStateReset } + s.spawnControlMessageReader() } - s.maybeDeclareStreamDone() } -// maybeDeclareStreamDone is used to force reset a stream. It should be called with -// the stream lock acquired. It calls stream.onDone which requires the connection lock. -func (s *stream) maybeDeclareStreamDone() { - if s.isDone() { - _ = s.SetReadDeadline(time.Now().Add(-1 * time.Hour)) // pion ignores zero times - // TODO: we should be closing the underlying datachannel, but this resets the stream - // See https://github.com/libp2p/specs/issues/575 for details. - // _ = s.dataChannel.Close() - // TODO: write for the spawned reader to return - s.onDone() - } -} +// spawnControlMessageReader is used for processing control messages after the reader is closed. +func (s *stream) spawnControlMessageReader() { + s.controlMessageReaderOnce.Do(func() { + // Spawn a goroutine to ensure that we're not holding any locks + go func() { + defer s.controlMessageReaderDone.Done() + // cleanup the sctp deadline timer goroutine + defer s.setDataChannelReadDeadline(time.Time{}) -// isDone indicates whether the stream is completed and all the control messages have also been -// flushed. It must be called with the stream lock acquired. -func (s *stream) isDone() bool { - return (s.sendState == sendStateReset || s.sendState == sendStateDataSent) && - (s.receiveState == receiveStateReset || s.receiveState == receiveStateDataRead) && - len(s.controlMsgQueue) == 0 -} + setDeadline := func() bool { + if s.controlMessageReaderEndTime.IsZero() || time.Now().Before(s.controlMessageReaderEndTime) { + s.setDataChannelReadDeadline(s.controlMessageReaderEndTime) + return true + } + return false + } -func (s *stream) setCloseError(e error) { - s.mx.Lock() - defer s.mx.Unlock() + // Unblock any Read call waiting on reader.ReadMsg + s.setDataChannelReadDeadline(time.Now().Add(-1 * time.Hour)) + + s.readerMx.Lock() + // We have the lock any readers blocked on reader.ReadMsg have exited. + // From this point onwards only this goroutine will do reader.ReadMsg. + + //lint:ignore SA2001 we just want to ensure any exising readers have exited. + // Read calls from this point onwards will exit immediately on checking + // s.readState + s.readerMx.Unlock() - s.closeErr = e + s.mx.Lock() + defer s.mx.Unlock() + + if s.nextMessage != nil { + s.processIncomingFlag(s.nextMessage.Flag) + s.nextMessage = nil + } + for s.closeForShutdownErr == nil && + s.sendState != sendStateDataReceived && s.sendState != sendStateReset { + var msg pb.Message + if !setDeadline() { + return + } + s.mx.Unlock() + err := s.reader.ReadMsg(&msg) + s.mx.Lock() + if err != nil { + // We have to manually manage deadline exceeded errors since pion/sctp can + // return deadline exceeded error for cancelled deadlines + // see: https://github.com/pion/sctp/pull/290/files + if errors.Is(err, os.ErrDeadlineExceeded) { + continue + } + return + } + s.processIncomingFlag(msg.Flag) + } + }() + }) +} + +func (s *stream) cleanup() { + // Even if we close the datachannel pion keeps a reference to the datachannel around. + // Remove the onBufferedAmountLow callback to ensure that we at least garbage collect + // memory we allocated for this stream. + s.dataChannel.OnBufferedAmountLow(nil) + s.dataChannel.Close() + if s.onDone != nil { + s.onDone() + } } diff --git a/p2p/transport/webrtc/stream_read.go b/p2p/transport/webrtc/stream_read.go index e064c8558b..80d99ea91c 100644 --- a/p2p/transport/webrtc/stream_read.go +++ b/p2p/transport/webrtc/stream_read.go @@ -1,7 +1,6 @@ package libp2pwebrtc import ( - "errors" "io" "time" @@ -10,15 +9,14 @@ import ( ) func (s *stream) Read(b []byte) (int, error) { - if len(b) == 0 { - return 0, nil - } + s.readerMx.Lock() + defer s.readerMx.Unlock() s.mx.Lock() defer s.mx.Unlock() - if s.closeErr != nil { - return 0, s.closeErr + if s.closeForShutdownErr != nil { + return 0, s.closeForShutdownErr } switch s.receiveState { case receiveStateDataRead: @@ -27,29 +25,43 @@ func (s *stream) Read(b []byte) (int, error) { return 0, network.ErrReset } + if len(b) == 0 { + return 0, nil + } + var read int for { if s.nextMessage == nil { // load the next message s.mx.Unlock() var msg pb.Message - if err := s.reader.ReadMsg(&msg); err != nil { - s.mx.Lock() + err := s.reader.ReadMsg(&msg) + s.mx.Lock() + if err != nil { + // connection was closed + if s.closeForShutdownErr != nil { + return 0, s.closeForShutdownErr + } if err == io.EOF { // if the channel was properly closed, return EOF if s.receiveState == receiveStateDataRead { return 0, io.EOF } - // This case occurs when the remote node closes the stream without writing a FIN message - // There's little we can do here - return 0, errors.New("didn't receive final state for stream") + // This case occurs when remote closes the datachannel without writing a FIN + // message. Some implementations discard the buffered data on closing the + // datachannel. For these implementations a stream reset will be observed as an + // abrupt closing of the datachannel. + s.receiveState = receiveStateReset + return 0, network.ErrReset } if s.receiveState == receiveStateReset { return 0, network.ErrReset } + if s.receiveState == receiveStateDataRead { + return 0, io.EOF + } return 0, err } - s.mx.Lock() s.nextMessage = &msg } @@ -63,38 +75,39 @@ func (s *stream) Read(b []byte) (int, error) { // process flags on the message after reading all the data s.processIncomingFlag(s.nextMessage.Flag) s.nextMessage = nil - if s.closeErr != nil { - return read, s.closeErr + if s.closeForShutdownErr != nil { + return read, s.closeForShutdownErr } switch s.receiveState { case receiveStateDataRead: return read, io.EOF case receiveStateReset: - s.dataChannel.SetReadDeadline(time.Time{}) return read, network.ErrReset } } } -func (s *stream) SetReadDeadline(t time.Time) error { return s.dataChannel.SetReadDeadline(t) } +func (s *stream) SetReadDeadline(t time.Time) error { + s.mx.Lock() + defer s.mx.Unlock() + if s.receiveState == receiveStateReceiving { + s.setDataChannelReadDeadline(t) + } + return nil +} + +func (s *stream) setDataChannelReadDeadline(t time.Time) error { + return s.dataChannel.SetReadDeadline(t) +} func (s *stream) CloseRead() error { s.mx.Lock() defer s.mx.Unlock() - - if s.nextMessage != nil { - s.processIncomingFlag(s.nextMessage.Flag) - s.nextMessage = nil - } var err error - if s.receiveState == receiveStateReceiving && s.closeErr == nil { - err = s.sendControlMessage(&pb.Message{Flag: pb.Message_STOP_SENDING.Enum()}) + if s.receiveState == receiveStateReceiving && s.closeForShutdownErr == nil { + err = s.writer.WriteMsg(&pb.Message{Flag: pb.Message_STOP_SENDING.Enum()}) + s.receiveState = receiveStateReset } - s.receiveState = receiveStateReset - s.maybeDeclareStreamDone() - - // make any calls to Read blocking on ReadMsg return immediately - s.dataChannel.SetReadDeadline(time.Now()) - + s.spawnControlMessageReader() return err } diff --git a/p2p/transport/webrtc/stream_test.go b/p2p/transport/webrtc/stream_test.go index f1442b9bfd..8f1ec165cf 100644 --- a/p2p/transport/webrtc/stream_test.go +++ b/p2p/transport/webrtc/stream_test.go @@ -5,15 +5,18 @@ import ( "errors" "io" "os" + "sync/atomic" "testing" "time" "github.com/libp2p/go-libp2p/p2p/transport/webrtc/pb" + "github.com/libp2p/go-msgio/pbio" "github.com/libp2p/go-libp2p/core/network" "github.com/pion/datachannel" "github.com/pion/webrtc/v3" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -24,19 +27,20 @@ type detachedChan struct { func getDetachedDataChannels(t *testing.T) (detachedChan, detachedChan) { s := webrtc.SettingEngine{} + s.SetIncludeLoopbackCandidate(true) s.DetachDataChannels() api := webrtc.NewAPI(webrtc.WithSettingEngine(s)) offerPC, err := api.NewPeerConnection(webrtc.Configuration{}) require.NoError(t, err) t.Cleanup(func() { offerPC.Close() }) - offerRWCChan := make(chan datachannel.ReadWriteCloser, 1) + offerRWCChan := make(chan detachedChan, 1) offerDC, err := offerPC.CreateDataChannel("data", nil) require.NoError(t, err) offerDC.OnOpen(func() { rwc, err := offerDC.Detach() require.NoError(t, err) - offerRWCChan <- rwc + offerRWCChan <- detachedChan{rwc: rwc, dc: offerDC} }) answerPC, err := api.NewPeerConnection(webrtc.Configuration{}) @@ -91,15 +95,15 @@ func getDetachedDataChannels(t *testing.T) (detachedChan, detachedChan) { require.NoError(t, offerPC.SetRemoteDescription(answer)) require.NoError(t, answerPC.SetLocalDescription(answer)) - return <-answerChan, detachedChan{rwc: <-offerRWCChan, dc: offerDC} + return <-answerChan, <-offerRWCChan } func TestStreamSimpleReadWriteClose(t *testing.T) { client, server := getDetachedDataChannels(t) - var clientDone, serverDone bool - clientStr := newStream(client.dc, client.rwc, func() { clientDone = true }) - serverStr := newStream(server.dc, server.rwc, func() { serverDone = true }) + var clientDone, serverDone atomic.Bool + clientStr := newStream(client.dc, client.rwc, func() { clientDone.Store(true) }) + serverStr := newStream(server.dc, server.rwc, func() { serverDone.Store(true) }) // send a foobar from the client n, err := clientStr.Write([]byte("foobar")) @@ -109,7 +113,7 @@ func TestStreamSimpleReadWriteClose(t *testing.T) { // writing after closing should error _, err = clientStr.Write([]byte("foobar")) require.Error(t, err) - require.False(t, clientDone) + require.False(t, clientDone.Load()) // now read all the data on the server side b, err := io.ReadAll(serverStr) @@ -119,19 +123,26 @@ func TestStreamSimpleReadWriteClose(t *testing.T) { n, err = serverStr.Read(make([]byte, 10)) require.Zero(t, n) require.ErrorIs(t, err, io.EOF) - require.False(t, serverDone) + require.False(t, serverDone.Load()) // send something back _, err = serverStr.Write([]byte("lorem ipsum")) require.NoError(t, err) require.NoError(t, serverStr.CloseWrite()) - require.True(t, serverDone) + // and read it at the client - require.False(t, clientDone) + require.False(t, clientDone.Load()) b, err = io.ReadAll(clientStr) require.NoError(t, err) require.Equal(t, []byte("lorem ipsum"), b) - require.True(t, clientDone) + + // stream is only cleaned up on calling Close or Reset + clientStr.Close() + serverStr.Close() + require.Eventually(t, func() bool { return clientDone.Load() }, 5*time.Second, 100*time.Millisecond) + // Need to call Close for cleanup. Otherwise the FIN_ACK is never read + require.NoError(t, serverStr.Close()) + require.Eventually(t, func() bool { return serverDone.Load() }, 5*time.Second, 100*time.Millisecond) } func TestStreamPartialReads(t *testing.T) { @@ -201,7 +212,7 @@ func TestStreamReadReturnsOnClose(t *testing.T) { _, err := clientStr.Read([]byte{0}) errChan <- err }() - time.Sleep(50 * time.Millisecond) // give the Read call some time to hit the loop + time.Sleep(100 * time.Millisecond) // give the Read call some time to hit the loop require.NoError(t, clientStr.Close()) select { case err := <-errChan: @@ -209,14 +220,17 @@ func TestStreamReadReturnsOnClose(t *testing.T) { case <-time.After(500 * time.Millisecond): t.Fatal("timeout") } + + _, err := clientStr.Read([]byte{0}) + require.ErrorIs(t, err, network.ErrReset) } func TestStreamResets(t *testing.T) { client, server := getDetachedDataChannels(t) - var clientDone, serverDone bool - clientStr := newStream(client.dc, client.rwc, func() { clientDone = true }) - serverStr := newStream(server.dc, server.rwc, func() { serverDone = true }) + var clientDone, serverDone atomic.Bool + clientStr := newStream(client.dc, client.rwc, func() { clientDone.Store(true) }) + serverStr := newStream(server.dc, server.rwc, func() { serverDone.Store(true) }) // send a foobar from the client _, err := clientStr.Write([]byte("foobar")) @@ -224,7 +238,7 @@ func TestStreamResets(t *testing.T) { _, err = serverStr.Write([]byte("lorem ipsum")) require.NoError(t, err) require.NoError(t, clientStr.Reset()) // resetting resets both directions - require.True(t, clientDone) + require.True(t, clientDone.Load()) // attempting to write more data should result in a reset error _, err = clientStr.Write([]byte("foobar")) require.ErrorIs(t, err, network.ErrReset) @@ -234,7 +248,7 @@ func TestStreamResets(t *testing.T) { require.ErrorIs(t, err, network.ErrReset) // read the data on the server side - require.False(t, serverDone) + require.False(t, serverDone.Load()) b, err = io.ReadAll(serverStr) require.Equal(t, []byte("foobar"), b) require.ErrorIs(t, err, network.ErrReset) @@ -242,7 +256,10 @@ func TestStreamResets(t *testing.T) { _, err := serverStr.Write([]byte("foobar")) return errors.Is(err, network.ErrReset) }, time.Second, 50*time.Millisecond) - require.True(t, serverDone) + serverStr.Close() + require.Eventually(t, func() bool { + return serverDone.Load() + }, time.Second, 50*time.Millisecond) } func TestStreamReadDeadlineAsync(t *testing.T) { @@ -305,3 +322,207 @@ func TestStreamWriteDeadlineAsync(t *testing.T) { require.GreaterOrEqual(t, took, timeout) require.LessOrEqual(t, took, timeout*3/2) } + +func TestStreamReadAfterClose(t *testing.T) { + client, server := getDetachedDataChannels(t) + + clientStr := newStream(client.dc, client.rwc, func() {}) + serverStr := newStream(server.dc, server.rwc, func() {}) + + serverStr.Close() + b := make([]byte, 1) + _, err := clientStr.Read(b) + require.Equal(t, io.EOF, err) + _, err = clientStr.Read(nil) + require.Equal(t, io.EOF, err) + + client, server = getDetachedDataChannels(t) + + clientStr = newStream(client.dc, client.rwc, func() {}) + serverStr = newStream(server.dc, server.rwc, func() {}) + + serverStr.Reset() + b = make([]byte, 1) + _, err = clientStr.Read(b) + require.ErrorIs(t, err, network.ErrReset) + _, err = clientStr.Read(nil) + require.ErrorIs(t, err, network.ErrReset) +} + +func TestStreamCloseAfterFINACK(t *testing.T) { + client, server := getDetachedDataChannels(t) + + done := make(chan bool, 1) + clientStr := newStream(client.dc, client.rwc, func() { done <- true }) + serverStr := newStream(server.dc, server.rwc, func() {}) + + go func() { + done <- true + err := clientStr.Close() + assert.NoError(t, err) + }() + <-done + + select { + case <-done: + t.Fatalf("Close should not have completed without processing FIN_ACK") + case <-time.After(2 * time.Second): + } + + b := make([]byte, 1) + _, err := serverStr.Read(b) + require.Error(t, err) + require.ErrorIs(t, err, io.EOF) + select { + case <-done: + case <-time.After(3 * time.Second): + t.Errorf("Close should have completed") + } +} + +// TestStreamFinAckAfterStopSending tests that FIN_ACK is sent even after the write half +// of the stream is closed. +func TestStreamFinAckAfterStopSending(t *testing.T) { + client, server := getDetachedDataChannels(t) + + done := make(chan bool, 1) + clientStr := newStream(client.dc, client.rwc, func() { done <- true }) + serverStr := newStream(server.dc, server.rwc, func() {}) + + go func() { + clientStr.CloseRead() + clientStr.Write([]byte("hello world")) + done <- true + err := clientStr.Close() + assert.NoError(t, err) + }() + <-done + + select { + case <-done: + t.Errorf("Close should not have completed without processing FIN_ACK") + case <-time.After(500 * time.Millisecond): + } + + // serverStr has write half closed and read half open + // serverStr should still send FIN_ACK + b := make([]byte, 24) + _, err := serverStr.Read(b) + require.NoError(t, err) + serverStr.Close() // Sends stop_sending, fin + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatalf("Close should have completed") + } +} + +func TestStreamConcurrentClose(t *testing.T) { + client, server := getDetachedDataChannels(t) + + start := make(chan bool, 2) + done := make(chan bool, 2) + clientStr := newStream(client.dc, client.rwc, func() { done <- true }) + serverStr := newStream(server.dc, server.rwc, func() { done <- true }) + + go func() { + start <- true + clientStr.Close() + }() + go func() { + start <- true + serverStr.Close() + }() + <-start + <-start + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatalf("concurrent close should succeed quickly") + } + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatalf("concurrent close should succeed quickly") + } +} + +func TestStreamResetAfterClose(t *testing.T) { + client, _ := getDetachedDataChannels(t) + + done := make(chan bool, 2) + clientStr := newStream(client.dc, client.rwc, func() { done <- true }) + clientStr.Close() + + select { + case <-done: + t.Fatalf("Close shouldn't run cleanup immediately") + case <-time.After(500 * time.Millisecond): + } + + clientStr.Reset() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatalf("Reset should run callback immediately") + } +} + +func TestStreamDataChannelCloseOnFINACK(t *testing.T) { + client, server := getDetachedDataChannels(t) + + done := make(chan bool, 1) + clientStr := newStream(client.dc, client.rwc, func() { done <- true }) + + clientStr.Close() + + select { + case <-done: + t.Fatalf("Close shouldn't run cleanup immediately") + case <-time.After(500 * time.Millisecond): + } + + serverWriter := pbio.NewDelimitedWriter(server.rwc) + err := serverWriter.WriteMsg(&pb.Message{Flag: pb.Message_FIN_ACK.Enum()}) + require.NoError(t, err) + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatalf("Callback should be run on reading FIN_ACK") + } + b := make([]byte, 100) + N := 0 + for { + n, err := server.rwc.Read(b) + N += n + if err != nil { + require.ErrorIs(t, err, io.EOF) + break + } + } + require.Less(t, N, 10) +} + +func TestStreamChunking(t *testing.T) { + client, server := getDetachedDataChannels(t) + + clientStr := newStream(client.dc, client.rwc, func() {}) + serverStr := newStream(server.dc, server.rwc, func() {}) + + const N = (16 << 10) + 1000 + go func() { + data := make([]byte, N) + _, err := clientStr.Write(data) + require.NoError(t, err) + }() + + data := make([]byte, N) + n, err := serverStr.Read(data) + require.NoError(t, err) + require.LessOrEqual(t, n, 16<<10) + + nn, err := serverStr.Read(data) + require.NoError(t, err) + require.Equal(t, nn+n, N) +} diff --git a/p2p/transport/webrtc/stream_write.go b/p2p/transport/webrtc/stream_write.go index 698af9c4d6..82d4ac287d 100644 --- a/p2p/transport/webrtc/stream_write.go +++ b/p2p/transport/webrtc/stream_write.go @@ -19,22 +19,16 @@ func (s *stream) Write(b []byte) (int, error) { s.mx.Lock() defer s.mx.Unlock() - if s.closeErr != nil { - return 0, s.closeErr + if s.closeForShutdownErr != nil { + return 0, s.closeForShutdownErr } switch s.sendState { case sendStateReset: return 0, network.ErrReset - case sendStateDataSent: + case sendStateDataSent, sendStateDataReceived: return 0, errWriteAfterClose } - // Check if there is any message on the wire. This is used for control - // messages only when the read side of the stream is closed - if s.receiveState != receiveStateReceiving { - s.readLoopOnce.Do(s.spawnControlMessageReader) - } - if !s.writeDeadline.IsZero() && time.Now().After(s.writeDeadline) { return 0, os.ErrDeadlineExceeded } @@ -47,14 +41,15 @@ func (s *stream) Write(b []byte) (int, error) { }() var n int + var msg pb.Message for len(b) > 0 { - if s.closeErr != nil { - return n, s.closeErr + if s.closeForShutdownErr != nil { + return n, s.closeForShutdownErr } switch s.sendState { case sendStateReset: return n, network.ErrReset - case sendStateDataSent: + case sendStateDataSent, sendStateDataReceived: return n, errWriteAfterClose } @@ -81,12 +76,10 @@ func (s *stream) Write(b []byte) (int, error) { if availableSpace < minMessageSize { s.mx.Unlock() select { - case <-s.writeAvailable: case <-writeDeadlineChan: s.mx.Lock() return n, os.ErrDeadlineExceeded - case <-s.sendStateChanged: - case <-s.writeDeadlineUpdated: + case <-s.writeStateChanged: } s.mx.Lock() continue @@ -99,8 +92,8 @@ func (s *stream) Write(b []byte) (int, error) { if end > len(b) { end = len(b) } - msg := &pb.Message{Message: b[:end]} - if err := s.writer.WriteMsg(msg); err != nil { + msg = pb.Message{Message: b[:end]} + if err := s.writer.WriteMsg(&msg); err != nil { return n, err } n += end @@ -109,79 +102,37 @@ func (s *stream) Write(b []byte) (int, error) { return n, nil } -// used for reading control messages while writing, in case the reader is closed, -// as to ensure we do still get control messages. This is important as according to the spec -// our data and control channels are intermixed on the same conn. -func (s *stream) spawnControlMessageReader() { - if s.nextMessage != nil { - s.processIncomingFlag(s.nextMessage.Flag) - s.nextMessage = nil - } - - go func() { - // no deadline needed, Read will return once there's a new message, or an error occurred - _ = s.dataChannel.SetReadDeadline(time.Time{}) - for { - var msg pb.Message - if err := s.reader.ReadMsg(&msg); err != nil { - return - } - s.mx.Lock() - s.processIncomingFlag(msg.Flag) - s.mx.Unlock() - } - }() -} - func (s *stream) SetWriteDeadline(t time.Time) error { s.mx.Lock() defer s.mx.Unlock() s.writeDeadline = t - select { - case s.writeDeadlineUpdated <- struct{}{}: - default: - } + s.notifyWriteStateChanged() return nil } func (s *stream) availableSendSpace() int { buffered := int(s.dataChannel.BufferedAmount()) availableSpace := maxBufferedAmount - buffered - if availableSpace < 0 { // this should never happen, but better check + if availableSpace+maxTotalControlMessagesSize < 0 { // this should never happen, but better check log.Errorw("data channel buffered more data than the maximum amount", "max", maxBufferedAmount, "buffered", buffered) } return availableSpace } -// There's no way to determine the size of a Protobuf message in the pbio package. -// Setting the size to 100 works as long as the control messages (incl. the varint prefix) are smaller than that value. -const controlMsgSize = 100 - -func (s *stream) sendControlMessage(msg *pb.Message) error { - available := s.availableSendSpace() - if controlMsgSize < available { - return s.writer.WriteMsg(msg) - } - s.controlMsgQueue = append(s.controlMsgQueue, msg) - return nil -} - func (s *stream) cancelWrite() error { s.mx.Lock() defer s.mx.Unlock() - if s.sendState != sendStateSending { + // There's no need to reset the write half if the write half has been closed + // successfully or has been reset previously + if s.sendState == sendStateDataReceived || s.sendState == sendStateReset { return nil } s.sendState = sendStateReset - select { - case s.sendStateChanged <- struct{}{}: - default: - } - if err := s.sendControlMessage(&pb.Message{Flag: pb.Message_RESET.Enum()}); err != nil { + s.notifyWriteStateChanged() + if err := s.writer.WriteMsg(&pb.Message{Flag: pb.Message_RESET.Enum()}); err != nil { return err } - s.maybeDeclareStreamDone() return nil } @@ -193,9 +144,16 @@ func (s *stream) CloseWrite() error { return nil } s.sendState = sendStateDataSent - if err := s.sendControlMessage(&pb.Message{Flag: pb.Message_FIN.Enum()}); err != nil { + s.notifyWriteStateChanged() + if err := s.writer.WriteMsg(&pb.Message{Flag: pb.Message_FIN.Enum()}); err != nil { return err } - s.maybeDeclareStreamDone() return nil } + +func (s *stream) notifyWriteStateChanged() { + select { + case s.writeStateChanged <- struct{}{}: + default: + } +} diff --git a/p2p/transport/webrtc/transport.go b/p2p/transport/webrtc/transport.go index c13ad19b1e..6102669dd9 100644 --- a/p2p/transport/webrtc/transport.go +++ b/p2p/transport/webrtc/transport.go @@ -42,6 +42,7 @@ import ( manet "github.com/multiformats/go-multiaddr/net" "github.com/multiformats/go-multihash" + "github.com/pion/datachannel" pionlogger "github.com/pion/logging" "github.com/pion/webrtc/v3" ) @@ -315,9 +316,7 @@ func (t *WebRTCTransport) dial(ctx context.Context, scope network.ConnManagement // This is disallowed in the ICE specification. However, implementations // do not strictly follow this, for eg. Chrome gathers TCP loopback candidates. // If you run pion on a system with only the loopback interface UP, - // it will not connect to anything. However, if it has any other interface - // (even a private one, eg. 192.168.0.0/16), it will gather candidates on it and - // will be able to connect to other pion instances on the same interface. + // it will not connect to anything. settingEngine.SetIncludeLoopbackCandidate(true) api := webrtc.NewAPI(webrtc.WithSettingEngine(settingEngine)) @@ -371,7 +370,7 @@ func (t *WebRTCTransport) dial(ctx context.Context, scope network.ConnManagement return nil, errors.New("peerconnection opening timed out") } - detached, err := getDetachedChannel(ctx, rawHandshakeChannel) + detached, err := detachHandshakeDataChannel(ctx, rawHandshakeChannel) if err != nil { return nil, err } @@ -414,10 +413,9 @@ func (t *WebRTCTransport) dial(ctx context.Context, scope network.ConnManagement remotePubKey, err := t.noiseHandshake(ctx, pc, channel, p, remoteHashFunction, false) if err != nil { - return conn, err + return nil, err } if t.gater != nil && !t.gater.InterceptSecured(network.DirOutbound, p, conn) { - conn.Close() return nil, fmt.Errorf("secured connection gated") } conn.setRemotePublicKey(remotePubKey) @@ -497,7 +495,7 @@ func (t *WebRTCTransport) generateNoisePrologue(pc *webrtc.PeerConnection, hash return result, nil } -func (t *WebRTCTransport) noiseHandshake(ctx context.Context, pc *webrtc.PeerConnection, datachannel *stream, peer peer.ID, hash crypto.Hash, inbound bool) (ic.PubKey, error) { +func (t *WebRTCTransport) noiseHandshake(ctx context.Context, pc *webrtc.PeerConnection, s *stream, peer peer.ID, hash crypto.Hash, inbound bool) (ic.PubKey, error) { prologue, err := t.generateNoisePrologue(pc, hash, inbound) if err != nil { return nil, fmt.Errorf("generate prologue: %w", err) @@ -513,12 +511,12 @@ func (t *WebRTCTransport) noiseHandshake(ctx context.Context, pc *webrtc.PeerCon } var secureConn sec.SecureConn if inbound { - secureConn, err = sessionTransport.SecureOutbound(ctx, fakeStreamConn{datachannel}, peer) + secureConn, err = sessionTransport.SecureOutbound(ctx, netConnWrapper{s}, peer) if err != nil { return nil, fmt.Errorf("failed to secure inbound connection: %w", err) } } else { - secureConn, err = sessionTransport.SecureInbound(ctx, fakeStreamConn{datachannel}, peer) + secureConn, err = sessionTransport.SecureInbound(ctx, netConnWrapper{s}, peer) if err != nil { return nil, fmt.Errorf("failed to secure outbound connection: %w", err) } @@ -526,7 +524,34 @@ func (t *WebRTCTransport) noiseHandshake(ctx context.Context, pc *webrtc.PeerCon return secureConn.RemotePublicKey(), nil } -type fakeStreamConn struct{ *stream } +type netConnWrapper struct { + *stream +} -func (fakeStreamConn) LocalAddr() net.Addr { return nil } -func (fakeStreamConn) RemoteAddr() net.Addr { return nil } +func (netConnWrapper) LocalAddr() net.Addr { return nil } +func (netConnWrapper) RemoteAddr() net.Addr { return nil } +func (w netConnWrapper) Close() error { + // Close called while running the security handshake is an error and we should Reset the + // stream in that case rather than gracefully closing + w.stream.Reset() + return nil +} + +// detachHandshakeDataChannel detaches the handshake data channel +func detachHandshakeDataChannel(ctx context.Context, dc *webrtc.DataChannel) (datachannel.ReadWriteCloser, error) { + done := make(chan struct{}) + var rwc datachannel.ReadWriteCloser + var err error + dc.OnOpen(func() { + defer close(done) + rwc, err = dc.Detach() + }) + // this is safe since for detached datachannels, the peerconnection runs the onOpen + // callback immediately if the SCTP transport is also connected. + select { + case <-done: + return rwc, err + case <-ctx.Done(): + return nil, ctx.Err() + } +} diff --git a/p2p/transport/webrtc/transport_test.go b/p2p/transport/webrtc/transport_test.go index 334bfe63d4..3bfcaaf5a7 100644 --- a/p2p/transport/webrtc/transport_test.go +++ b/p2p/transport/webrtc/transport_test.go @@ -2,31 +2,29 @@ package libp2pwebrtc import ( "context" + "crypto/rand" "encoding/hex" "fmt" "io" "net" "os" + "strings" "sync" "sync/atomic" "testing" "time" - manet "github.com/multiformats/go-multiaddr/net" - - quicproxy "github.com/quic-go/quic-go/integrationtests/tools/proxy" - - "golang.org/x/crypto/sha3" - "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" - ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" "github.com/multiformats/go-multibase" "github.com/multiformats/go-multihash" + quicproxy "github.com/quic-go/quic-go/integrationtests/tools/proxy" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/crypto/sha3" ) func getTransport(t *testing.T, opts ...Option) (*WebRTCTransport, peer.ID) { @@ -291,63 +289,88 @@ func TestTransportWebRTC_DialerCanCreateStreams(t *testing.T) { } func TestTransportWebRTC_DialerCanCreateStreamsMultiple(t *testing.T) { - count := 5 tr, listeningPeer := getTransport(t) listenMultiaddr := ma.StringCast("/ip4/127.0.0.1/udp/0/webrtc-direct") listener, err := tr.Listen(listenMultiaddr) require.NoError(t, err) tr1, connectingPeer := getTransport(t) - done := make(chan struct{}) + readerDone := make(chan struct{}) + + const ( + numListeners = 10 + numStreams = 100 + numWriters = 10 + size = 20 << 10 + ) go func() { lconn, err := listener.Accept() require.NoError(t, err) require.Equal(t, connectingPeer, lconn.RemotePeer()) var wg sync.WaitGroup - - for i := 0; i < count; i++ { - stream, err := lconn.AcceptStream() - require.NoError(t, err) + var doneStreams atomic.Int32 + for i := 0; i < numListeners; i++ { wg.Add(1) go func() { defer wg.Done() - buf := make([]byte, 100) - n, err := stream.Read(buf) - require.NoError(t, err) - require.Equal(t, "test", string(buf[:n])) - _, err = stream.Write([]byte("test")) - require.NoError(t, err) + for { + var nn int32 + if nn = doneStreams.Add(1); nn > int32(numStreams) { + return + } + s, err := lconn.AcceptStream() + require.NoError(t, err) + n, err := io.Copy(s, s) + require.Equal(t, n, int64(size)) + require.NoError(t, err) + s.Close() + } }() } - wg.Wait() - done <- struct{}{} + readerDone <- struct{}{} }() conn, err := tr1.Dial(context.Background(), listener.Multiaddr(), listeningPeer) require.NoError(t, err) - t.Logf("dialer opened connection") - for i := 0; i < count; i++ { - idx := i + var writerWG sync.WaitGroup + var cnt atomic.Int32 + var streamsStarted atomic.Int32 + for i := 0; i < numWriters; i++ { + writerWG.Add(1) go func() { - stream, err := conn.OpenStream(context.Background()) - require.NoError(t, err) - t.Logf("dialer opened stream: %d", idx) - buf := make([]byte, 100) - _, err = stream.Write([]byte("test")) - require.NoError(t, err) - n, err := stream.Read(buf) - require.NoError(t, err) - require.Equal(t, "test", string(buf[:n])) + defer writerWG.Done() + buf := make([]byte, size) + for { + var nn int32 + if nn = streamsStarted.Add(1); nn > int32(numStreams) { + return + } + rand.Read(buf) + + s, err := conn.OpenStream(context.Background()) + require.NoError(t, err) + n, err := s.Write(buf) + require.Equal(t, n, size) + require.NoError(t, err) + s.CloseWrite() + resp := make([]byte, size+10) + n, err = io.ReadFull(s, resp) + require.ErrorIs(t, err, io.ErrUnexpectedEOF) + require.Equal(t, n, size) + if string(buf) != string(resp[:size]) { + t.Errorf("bytes not equal: %d %d", len(buf), len(resp)) + } + s.Close() + t.Log("completed stream: ", cnt.Add(1), s.(*stream).id) + } }() - if i%10 == 0 && i > 0 { - time.Sleep(100 * time.Millisecond) - } } + writerWG.Wait() select { - case <-done: + case <-readerDone: case <-time.After(100 * time.Second): t.Fatal("timed out") } @@ -491,7 +514,6 @@ func TestTransportWebRTC_RemoteReadsAfterClose(t *testing.T) { require.NoError(t, err) // require write and close to complete require.NoError(t, <-done) - stream.SetReadDeadline(time.Now().Add(5 * time.Second)) buf := make([]byte, 10) @@ -656,17 +678,30 @@ func TestConnectionTimeoutOnListener(t *testing.T) { // start dropping all packets drop.Store(true) start := time.Now() - // TODO: return timeout errors here for { if _, err := str.Write([]byte("test")); err != nil { - require.True(t, os.IsTimeout(err)) + if os.IsTimeout(err) { + break + } + // If we write when a connection timeout happens, sctp provides + // a "stream closed" error. This occurs concurrently with the + // callback we receive for connection timeout. + // Test once more after sleep that we provide the correct error. + if strings.Contains(err.Error(), "stream closed") { + time.Sleep(50 * time.Millisecond) + _, err = str.Write([]byte("test")) + require.True(t, os.IsTimeout(err), "invalid error type: %v", err) + } else { + t.Fatal("invalid error type", err) + } break } + if time.Since(start) > 5*time.Second { t.Fatal("timeout") } // make sure to not write too often, we don't want to fill the flow control window - time.Sleep(5 * time.Millisecond) + time.Sleep(20 * time.Millisecond) } // make sure that accepting a stream also returns an error... _, err = conn.AcceptStream() diff --git a/p2p/transport/webrtc/udpmux/mux.go b/p2p/transport/webrtc/udpmux/mux.go index ca54c18593..30cc61edab 100644 --- a/p2p/transport/webrtc/udpmux/mux.go +++ b/p2p/transport/webrtc/udpmux/mux.go @@ -17,7 +17,10 @@ import ( var log = logging.Logger("webrtc-udpmux") -const ReceiveMTU = 1500 +// ReceiveBufSize is the size of the buffer used to receive packets from the PacketConn. +// It is fine for this number to be higher than the actual path MTU as this value is not +// used to decide the packet size on the write path. +const ReceiveBufSize = 1500 type Candidate struct { Ufrag string @@ -134,7 +137,7 @@ func (mux *UDPMux) readLoop() { default: } - buf := pool.Get(ReceiveMTU) + buf := pool.Get(ReceiveBufSize) n, addr, err := mux.socket.ReadFrom(buf) if err != nil { diff --git a/test-plans/go.mod b/test-plans/go.mod index b98ba1cbfa..c7ac811290 100644 --- a/test-plans/go.mod +++ b/test-plans/go.mod @@ -28,7 +28,7 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b // indirect - github.com/google/uuid v1.3.0 // indirect + github.com/google/uuid v1.4.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/huin/goupnp v1.3.0 // indirect github.com/ipfs/go-cid v0.4.1 // indirect @@ -67,21 +67,21 @@ require ( github.com/opencontainers/runtime-spec v1.1.0 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pion/datachannel v1.5.5 // indirect - github.com/pion/dtls/v2 v2.2.7 // indirect - github.com/pion/ice/v2 v2.3.6 // indirect - github.com/pion/interceptor v0.1.17 // indirect + github.com/pion/dtls/v2 v2.2.8 // indirect + github.com/pion/ice/v2 v2.3.11 // indirect + github.com/pion/interceptor v0.1.25 // indirect github.com/pion/logging v0.2.2 // indirect - github.com/pion/mdns v0.0.7 // indirect + github.com/pion/mdns v0.0.9 // indirect github.com/pion/randutil v0.1.0 // indirect - github.com/pion/rtcp v1.2.10 // indirect - github.com/pion/rtp v1.7.13 // indirect - github.com/pion/sctp v1.8.7 // indirect + github.com/pion/rtcp v1.2.13 // indirect + github.com/pion/rtp v1.8.3 // indirect + github.com/pion/sctp v1.8.9 // indirect github.com/pion/sdp/v3 v3.0.6 // indirect - github.com/pion/srtp/v2 v2.0.15 // indirect - github.com/pion/stun v0.6.0 // indirect - github.com/pion/transport/v2 v2.2.1 // indirect - github.com/pion/turn/v2 v2.1.0 // indirect - github.com/pion/webrtc/v3 v3.2.9 // indirect + github.com/pion/srtp/v2 v2.0.18 // indirect + github.com/pion/stun v0.6.1 // indirect + github.com/pion/transport/v2 v2.2.4 // indirect + github.com/pion/turn/v2 v2.1.4 // indirect + github.com/pion/webrtc/v3 v3.2.23 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.14.0 // indirect diff --git a/test-plans/go.sum b/test-plans/go.sum index ece77f373f..5022925b9d 100644 --- a/test-plans/go.sum +++ b/test-plans/go.sum @@ -198,8 +198,9 @@ github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b h1:RMpPgZTSApbPf7xaVel+QkoGPRLFLrwFO89uDUHEGf0= github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY= github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= @@ -353,43 +354,51 @@ github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2D github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= github.com/pion/datachannel v1.5.5 h1:10ef4kwdjije+M9d7Xm9im2Y3O6A6ccQb0zcqZcJew8= github.com/pion/datachannel v1.5.5/go.mod h1:iMz+lECmfdCMqFRhXhcA/219B0SQlbpoR2V118yimL0= -github.com/pion/dtls/v2 v2.2.7 h1:cSUBsETxepsCSFSxC3mc/aDo14qQLMSL+O6IjG28yV8= github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= -github.com/pion/ice/v2 v2.3.6 h1:Jgqw36cAud47iD+N6rNX225uHvrgWtAlHfVyOQc3Heg= -github.com/pion/ice/v2 v2.3.6/go.mod h1:9/TzKDRwBVAPsC+YOrKH/e3xDrubeTRACU9/sHQarsU= -github.com/pion/interceptor v0.1.17 h1:prJtgwFh/gB8zMqGZoOgJPHivOwVAp61i2aG61Du/1w= -github.com/pion/interceptor v0.1.17/go.mod h1:SY8kpmfVBvrbUzvj2bsXz7OJt5JvmVNZ+4Kjq7FcwrI= +github.com/pion/dtls/v2 v2.2.8 h1:BUroldfiIbV9jSnC6cKOMnyiORRWrWWpV11JUyEu5OA= +github.com/pion/dtls/v2 v2.2.8/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= +github.com/pion/ice/v2 v2.3.11 h1:rZjVmUwyT55cmN8ySMpL7rsS8KYsJERsrxJLLxpKhdw= +github.com/pion/ice/v2 v2.3.11/go.mod h1:hPcLC3kxMa+JGRzMHqQzjoSj3xtE9F+eoncmXLlCL4E= +github.com/pion/interceptor v0.1.25 h1:pwY9r7P6ToQ3+IF0bajN0xmk/fNw/suTgaTdlwTDmhc= +github.com/pion/interceptor v0.1.25/go.mod h1:wkbPYAak5zKsfpVDYMtEfWEy8D4zL+rpxCxPImLOg3Y= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= -github.com/pion/mdns v0.0.7 h1:P0UB4Sr6xDWEox0kTVxF0LmQihtCbSAdW0H2nEgkA3U= -github.com/pion/mdns v0.0.7/go.mod h1:4iP2UbeFhLI/vWju/bw6ZfwjJzk0z8DNValjGxR/dD8= +github.com/pion/mdns v0.0.8/go.mod h1:hYE72WX8WDveIhg7fmXgMKivD3Puklk0Ymzog0lSyaI= +github.com/pion/mdns v0.0.9 h1:7Ue5KZsqq8EuqStnpPWV33vYYEH0+skdDN5L7EiEsI4= +github.com/pion/mdns v0.0.9/go.mod h1:2JA5exfxwzXiCihmxpTKgFUpiQws2MnipoPK09vecIc= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= -github.com/pion/rtcp v1.2.10 h1:nkr3uj+8Sp97zyItdN60tE/S6vk4al5CPRR6Gejsdjc= github.com/pion/rtcp v1.2.10/go.mod h1:ztfEwXZNLGyF1oQDttz/ZKIBaeeg/oWbRYqzBM9TL1I= -github.com/pion/rtp v1.7.13 h1:qcHwlmtiI50t1XivvoawdCGTP4Uiypzfrsap+bijcoA= -github.com/pion/rtp v1.7.13/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= +github.com/pion/rtcp v1.2.12/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4= +github.com/pion/rtcp v1.2.13 h1:+EQijuisKwm/8VBs8nWllr0bIndR7Lf7cZG200mpbNo= +github.com/pion/rtcp v1.2.13/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4= +github.com/pion/rtp v1.8.2/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= +github.com/pion/rtp v1.8.3 h1:VEHxqzSVQxCkKDSHro5/4IUUG1ea+MFdqR2R3xSpNU8= +github.com/pion/rtp v1.8.3/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= github.com/pion/sctp v1.8.5/go.mod h1:SUFFfDpViyKejTAdwD1d/HQsCu+V/40cCs2nZIvC3s0= -github.com/pion/sctp v1.8.7 h1:JnABvFakZueGAn4KU/4PSKg+GWbF6QWbKTWZOSGJjXw= -github.com/pion/sctp v1.8.7/go.mod h1:g1Ul+ARqZq5JEmoFy87Q/4CePtKnTJ1QCL9dBBdN6AU= +github.com/pion/sctp v1.8.8/go.mod h1:igF9nZBrjh5AtmKc7U30jXltsFHicFCXSmWA2GWRaWs= +github.com/pion/sctp v1.8.9 h1:TP5ZVxV5J7rz7uZmbyvnUvsn7EJ2x/5q9uhsTtXbI3g= +github.com/pion/sctp v1.8.9/go.mod h1:cMLT45jqw3+jiJCrtHVwfQLnfR0MGZ4rgOJwUOIqLkI= github.com/pion/sdp/v3 v3.0.6 h1:WuDLhtuFUUVpTfus9ILC4HRyHsW6TdugjEX/QY9OiUw= github.com/pion/sdp/v3 v3.0.6/go.mod h1:iiFWFpQO8Fy3S5ldclBkpXqmWy02ns78NOKoLLL0YQw= -github.com/pion/srtp/v2 v2.0.15 h1:+tqRtXGsGwHC0G0IUIAzRmdkHvriF79IHVfZGfHrQoA= -github.com/pion/srtp/v2 v2.0.15/go.mod h1:b/pQOlDrbB0HEH5EUAQXzSYxikFbNcNuKmF8tM0hCtw= -github.com/pion/stun v0.4.0/go.mod h1:QPsh1/SbXASntw3zkkrIk3ZJVKz4saBY2G7S10P3wCw= -github.com/pion/stun v0.6.0 h1:JHT/2iyGDPrFWE8NNC15wnddBN8KifsEDw8swQmrEmU= -github.com/pion/stun v0.6.0/go.mod h1:HPqcfoeqQn9cuaet7AOmB5e5xkObu9DwBdurwLKO9oA= +github.com/pion/srtp/v2 v2.0.18 h1:vKpAXfawO9RtTRKZJbG4y0v1b11NZxQnxRl85kGuUlo= +github.com/pion/srtp/v2 v2.0.18/go.mod h1:0KJQjA99A6/a0DOVTu1PhDSw0CXF2jTkqOoMg3ODqdA= +github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4= +github.com/pion/stun v0.6.1/go.mod h1:/hO7APkX4hZKu/D0f2lHzNyvdkTGtIy3NDmLR7kSz/8= github.com/pion/transport v0.14.1 h1:XSM6olwW+o8J4SCmOBb/BpwZypkHeyM0PGFCxNQBr40= github.com/pion/transport v0.14.1/go.mod h1:4tGmbk00NeYA3rUa9+n+dzCCoKkcy3YlYb99Jn2fNnI= -github.com/pion/transport/v2 v2.0.0/go.mod h1:HS2MEBJTwD+1ZI2eSXSvHJx/HnzQqRy2/LXxt6eVMHc= -github.com/pion/transport/v2 v2.1.0/go.mod h1:AdSw4YBZVDkZm8fpoz+fclXyQwANWmZAlDuQdctTThQ= -github.com/pion/transport/v2 v2.2.0/go.mod h1:AdSw4YBZVDkZm8fpoz+fclXyQwANWmZAlDuQdctTThQ= -github.com/pion/transport/v2 v2.2.1 h1:7qYnCBlpgSJNYMbLCKuSY9KbQdBFoETvPNETv0y4N7c= github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g= -github.com/pion/turn/v2 v2.1.0 h1:5wGHSgGhJhP/RpabkUb/T9PdsAjkGLS6toYz5HNzoSI= -github.com/pion/turn/v2 v2.1.0/go.mod h1:yrT5XbXSGX1VFSF31A3c1kCNB5bBZgk/uu5LET162qs= -github.com/pion/webrtc/v3 v3.2.9 h1:U8NSjQDlZZ+Iy/hg42Q/u6mhEVSXYvKrOIZiZwYTfLc= -github.com/pion/webrtc/v3 v3.2.9/go.mod h1:gjQLMZeyN3jXBGdxGmUYCyKjOuYX/c99BDjGqmadq0A= +github.com/pion/transport/v2 v2.2.2/go.mod h1:OJg3ojoBJopjEeECq2yJdXH9YVrUJ1uQ++NjXLOUorc= +github.com/pion/transport/v2 v2.2.3/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0= +github.com/pion/transport/v2 v2.2.4 h1:41JJK6DZQYSeVLxILA2+F4ZkKb4Xd/tFJZRFZQ9QAlo= +github.com/pion/transport/v2 v2.2.4/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0= +github.com/pion/transport/v3 v3.0.1 h1:gDTlPJwROfSfz6QfSi0ZmeCSkFcnWWiiR9ES0ouANiM= +github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9SzK5f5xE0= +github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= +github.com/pion/turn/v2 v2.1.4 h1:2xn8rduI5W6sCZQkEnIUDAkrBQNl2eYIBCHMZ3QMmP8= +github.com/pion/turn/v2 v2.1.4/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= +github.com/pion/webrtc/v3 v3.2.23 h1:GbqEuxBbVLFhXk0GwxKAoaIJYiEa9TyoZPEZC+2HZxM= +github.com/pion/webrtc/v3 v3.2.23/go.mod h1:1CaT2fcZzZ6VZA+O1i9yK2DU4EOcXVvSbWG9pr5jefs= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -482,7 +491,6 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= @@ -535,7 +543,10 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= -golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= +golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I= +golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= +golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= +golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -619,11 +630,13 @@ golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= -golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= +golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= +golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= +golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -709,21 +722,26 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.9.0/go.mod h1:M6DEAAIenWoTxdKrOltXcmDY3rSplQUkrvaDU5FcQyo= +golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o= +golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= +golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -732,10 +750,12 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=