Skip to content

Commit

Permalink
webrtc: wait for fin_ack for closing datachannel
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Dec 1, 2023
1 parent 3089ac3 commit 07fd995
Show file tree
Hide file tree
Showing 11 changed files with 506 additions and 179 deletions.
7 changes: 7 additions & 0 deletions core/network/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ type MuxedStream interface {
SetWriteDeadline(time.Time) error
}

// AsyncCloser is implemented by streams that need to do expensive operations on close before
// releasing the resources. Closing the stream async avoids blocking the calling goroutine.
type AsyncCloser interface {
// AsyncClose closes the stream and executes onDone after the stream is closed
AsyncClose(onDone func()) error
}

// MuxedConn represents a connection to a remote peer that has been
// extended to support stream multiplexing.
//
Expand Down
6 changes: 6 additions & 0 deletions p2p/net/swarm/swarm_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ func (s *Stream) Write(p []byte) (int, error) {
// Close closes the stream, closing both ends and freeing all associated
// resources.
func (s *Stream) Close() error {
if as, ok := s.stream.(network.AsyncCloser); ok {
err := as.AsyncClose(func() {
s.closeAndRemoveStream()
})
return err
}
err := s.stream.Close()
s.closeAndRemoveStream()
return err
Expand Down
45 changes: 45 additions & 0 deletions p2p/net/swarm/swarm_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package swarm

import (
"context"
"sync/atomic"
"testing"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/stretchr/testify/require"
)

type asyncStreamWrapper struct {
network.MuxedStream
beforeClose func()
}

func (s *asyncStreamWrapper) AsyncClose(onDone func()) error {
s.beforeClose()
err := s.Close()
onDone()
return err
}

func TestStreamAsyncCloser(t *testing.T) {
s1 := makeSwarm(t)
s2 := makeSwarm(t)

s1.Peerstore().AddAddrs(s2.LocalPeer(), s2.ListenAddresses(), peerstore.TempAddrTTL)
s, err := s1.NewStream(context.Background(), s2.LocalPeer())
require.NoError(t, err)
ss, ok := s.(*Stream)
require.True(t, ok)

var called atomic.Bool
as := &asyncStreamWrapper{
MuxedStream: ss.stream,
beforeClose: func() {
called.Store(true)
},
}
ss.stream = as
ss.Close()
require.True(t, called.Load())
}
76 changes: 37 additions & 39 deletions p2p/transport/webrtc/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ var _ tpt.CapableConn = &connection{}

const maxAcceptQueueLen = 10

const maxDataChannelID = 1 << 10

type errConnectionTimeout struct{}

var _ net.Error = &errConnectionTimeout{}
Expand All @@ -47,7 +45,8 @@ type connection struct {
transport *WebRTCTransport
scope network.ConnManagementScope

closeErr error
closeOnce sync.Once
closeErr error

localPeer peer.ID
localMultiaddr ma.Multiaddr
Expand Down Expand Up @@ -110,12 +109,6 @@ func newConnection(
if c.IsClosed() {
return
}
// Limit the number of streams, since we're not able to actually properly close them.
// See https://github.com/libp2p/specs/issues/575 for details.
if *dc.ID() > maxDataChannelID {
c.Close()
return
}
dc.OnOpen(func() {
rwc, err := dc.Detach()
if err != nil {
Expand All @@ -133,7 +126,6 @@ func newConnection(
}
})
})

return c, nil
}

Expand All @@ -144,16 +136,41 @@ func (c *connection) ConnState() network.ConnectionState {

// Close closes the underlying peerconnection.
func (c *connection) Close() error {
if c.IsClosed() {
return nil
}
c.closeOnce.Do(func() {
c.closeErr = errors.New("connection closed")
// cancel must be called after closeErr is set. This ensures interested goroutines waiting on
// ctx.Done can read closeErr without holding the conn lock.
c.cancel()
c.m.Lock()
streams := c.streams
c.streams = nil
c.m.Unlock()
for _, str := range streams {
str.Reset()
}
c.pc.Close()
c.scope.Done()
})
return nil
}

c.m.Lock()
defer c.m.Unlock()
c.scope.Done()
c.closeErr = errors.New("connection closed")
c.cancel()
return c.pc.Close()
func (c *connection) closeTimedOut() error {
c.closeOnce.Do(func() {
c.closeErr = errConnectionTimeout{}
// cancel must be called after closeErr is set. This ensures interested goroutines waiting on
// ctx.Done can read closeErr without holding the conn lock.
c.cancel()
c.m.Lock()
streams := c.streams
c.streams = nil
c.m.Unlock()
for _, str := range streams {
str.closeWithError(errConnectionTimeout{})
}
c.pc.Close()
c.scope.Done()
})
return nil
}

func (c *connection) IsClosed() bool {
Expand All @@ -174,12 +191,6 @@ func (c *connection) OpenStream(ctx context.Context) (network.MuxedStream, error
if id > math.MaxUint16 {
return nil, errors.New("exhausted stream ID space")
}
// Limit the number of streams, since we're not able to actually properly close them.
// See https://github.com/libp2p/specs/issues/575 for details.
if id > maxDataChannelID {
c.Close()
return c.OpenStream(ctx)
}

streamID := uint16(id)
dc, err := c.pc.CreateDataChannel("", &webrtc.DataChannelInit{ID: &streamID})
Expand Down Expand Up @@ -238,20 +249,7 @@ func (c *connection) removeStream(id uint16) {

func (c *connection) onConnectionStateChange(state webrtc.PeerConnectionState) {
if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateClosed {
// reset any streams
if c.IsClosed() {
return
}
c.m.Lock()
defer c.m.Unlock()
c.closeErr = errConnectionTimeout{}
for k, str := range c.streams {
str.setCloseError(c.closeErr)
delete(c.streams, k)
}
c.cancel()
c.scope.Done()
c.pc.Close()
c.closeTimedOut()
}
}

Expand Down
29 changes: 18 additions & 11 deletions p2p/transport/webrtc/pb/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions p2p/transport/webrtc/pb/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ message Message {
// The sender abruptly terminates the sending part of the stream. The
// receiver can discard any data that it already received on that stream.
RESET = 2;
// Sending the FIN_ACK flag acknowledges the previous receipt of a message
// with the FIN flag set. Receiving a FIN_ACK flag gives the recipient
// confidence that the remote has received all sent messages.
FIN_ACK = 3;
}

optional Flag flag=1;
Expand Down
Loading

0 comments on commit 07fd995

Please sign in to comment.