Skip to content

Commit

Permalink
address review comments 2
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Jan 29, 2025
1 parent 798258c commit 146812c
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 32 deletions.
8 changes: 3 additions & 5 deletions core/network/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,14 @@ func (c *ConnError) Error() string {
}

func (c *ConnError) Is(target error) bool {
if target == ErrReset {
return true
}
if tce, ok := target.(*ConnError); ok {
return tce.ErrorCode == c.ErrorCode && tce.Remote == c.Remote
}
return false
}

func (c *ConnError) Unwrap() error {
return c.TransportError
func (c *ConnError) Unwrap() []error {
return []error{ErrReset, c.TransportError}
}

const (
Expand All @@ -56,6 +53,7 @@ const (
ConnGarbageCollected ConnErrorCode = 1006
ConnShutdown ConnErrorCode = 1007
ConnGated ConnErrorCode = 1008
ConnCodeOutOfRange ConnErrorCode = 1009
)

// Conn is a connection to a remote peer. It multiplexes streams.
Expand Down
8 changes: 3 additions & 5 deletions core/network/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,14 @@ func (s *StreamError) Error() string {
}

func (s *StreamError) Is(target error) bool {
if target == ErrReset {
return true
}
if tse, ok := target.(*StreamError); ok {
return tse.ErrorCode == s.ErrorCode && tse.Remote == s.Remote
}
return false
}

func (s *StreamError) Unwrap() error {
return s.TransportError
func (s *StreamError) Unwrap() []error {
return []error{ErrReset, s.TransportError}
}

const (
Expand All @@ -55,6 +52,7 @@ const (
StreamGarbageCollected StreamErrorCode = 1006
StreamShutdown StreamErrorCode = 1007
StreamGated StreamErrorCode = 1008
StreamCodeOutOfRange StreamErrorCode = 1009
)

// MuxedStream is a bidirectional io pipe within a connection.
Expand Down
13 changes: 8 additions & 5 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,11 +850,14 @@ func (c *connWithMetrics) Close() error {
}

func (c *connWithMetrics) CloseWithError(errCode network.ConnErrorCode) error {
c.metricsTracer.ClosedConnection(c.dir, time.Since(c.opened), c.ConnState(), c.LocalMultiaddr())
if ce, ok := c.CapableConn.(network.CloseWithErrorer); ok {
return ce.CloseWithError(errCode)
}
return c.CapableConn.Close()
c.once.Do(func() {
c.metricsTracer.ClosedConnection(c.dir, time.Since(c.opened), c.ConnState(), c.LocalMultiaddr())
if ce, ok := c.CapableConn.(network.CloseWithErrorer); ok {
c.closeErr = ce.CloseWithError(errCode)
}
c.closeErr = c.CapableConn.Close()
})
return c.closeErr
}

func (c *connWithMetrics) Stat() network.ConnStats {
Expand Down
1 change: 0 additions & 1 deletion p2p/protocol/circuitv2/relay/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,6 @@ func TestRelayLimitData(t *testing.T) {

rc := relay.DefaultResources()
rc.Limit.Duration = time.Second
// Due to yamux framing, 4 blocks of 1024 bytes will exceed the data limit
rc.Limit.Data = 4096

r, err := relay.New(hosts[1], relay.WithResources(rc))
Expand Down
39 changes: 32 additions & 7 deletions p2p/test/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,6 @@ func TestConnClosedWhenRemoteCloses(t *testing.T) {
}

func TestErrorCodes(t *testing.T) {

assertStreamErrors := func(s network.Stream, expectedError error) {
buf := make([]byte, 10)
_, err := s.Read(buf)
Expand Down Expand Up @@ -925,20 +924,46 @@ func TestErrorCodes(t *testing.T) {
require.NoError(t, err)
pingPong(s)

remoteStream := <-remoteStreamQ
defer remoteStream.Reset()

err = s.ResetWithError(42)
require.NoError(t, err)
assertStreamErrors(s, &network.StreamError{
ErrorCode: 42,
Remote: false,
})

assertStreamErrors(remoteStream, &network.StreamError{
ErrorCode: 42,
Remote: true,
})
})
t.Run("StreamResetWithErrorByRemote", func(t *testing.T) {
if tc.Name == "WebTransport" {
t.Skipf("skipping: %s, not implemented", tc.Name)
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
s, err := client.NewStream(ctx, server.ID(), "/test")
require.NoError(t, err)
pingPong(s)

remoteStream := <-remoteStreamQ
defer remoteStream.Reset()

assertStreamErrors(remoteStream, &network.StreamError{
err = remoteStream.ResetWithError(42)
require.NoError(t, err)

assertStreamErrors(s, &network.StreamError{
ErrorCode: 42,
Remote: true,
})

assertStreamErrors(remoteStream, &network.StreamError{
ErrorCode: 42,
Remote: false,
})
})

t.Run("StreamResetByConnCloseWithError", func(t *testing.T) {
Expand All @@ -952,6 +977,9 @@ func TestErrorCodes(t *testing.T) {
require.NoError(t, err)
pingPong(s)

remoteStream := <-remoteStreamQ
defer remoteStream.Reset()

err = s.Conn().CloseWithError(42)
require.NoError(t, err)

Expand All @@ -960,16 +988,13 @@ func TestErrorCodes(t *testing.T) {
Remote: false,
})

remoteStream := <-remoteStreamQ
defer remoteStream.Reset()

assertStreamErrors(remoteStream, &network.ConnError{
ErrorCode: 42,
Remote: true,
})
})

t.Run("StreamResetByConnCloseWithError", func(t *testing.T) {
t.Run("NewStreamErrorByConnCloseWithError", func(t *testing.T) {
if tc.Name == "WebTransport" || tc.Name == "WebRTC" {
t.Skipf("skipping: %s, not implemented", tc.Name)
return
Expand Down
20 changes: 13 additions & 7 deletions p2p/transport/quic/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,28 @@ func parseStreamError(err error) error {
}
se := &quic.StreamError{}
if errors.As(err, &se) {
code := se.ErrorCode
if code > math.MaxUint32 {
// TODO(sukunrt): do we need this?
code = reset
var code network.StreamErrorCode
if se.ErrorCode > math.MaxUint32 {
code = network.StreamCodeOutOfRange
} else {
code = network.StreamErrorCode(se.ErrorCode)
}
err = &network.StreamError{
ErrorCode: network.StreamErrorCode(code),
ErrorCode: code,
Remote: se.Remote,
TransportError: se,
}
}
ae := &quic.ApplicationError{}
if errors.As(err, &ae) {
code := ae.ErrorCode
var code network.ConnErrorCode
if ae.ErrorCode > math.MaxUint32 {
code = network.ConnCodeOutOfRange
} else {
code = network.ConnErrorCode(ae.ErrorCode)
}
err = &network.ConnError{
ErrorCode: network.ConnErrorCode(code),
ErrorCode: code,
Remote: ae.Remote,
TransportError: ae,
}
Expand Down
4 changes: 3 additions & 1 deletion p2p/transport/webrtc/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ func (c *connection) Close() error {
return nil
}

func (c *connection) CloseWithError(errCode network.ConnErrorCode) error {
// CloseWithError closes the connection ignoring the error code. As there's no way to signal
// the remote peer on closing the underlying peerconnection, we ignore the error code.
func (c *connection) CloseWithError(_ network.ConnErrorCode) error {
return c.Close()
}

Expand Down
2 changes: 1 addition & 1 deletion p2p/transport/webtransport/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (c *conn) Close() error {
return err
}

func (c *conn) CloseWithError(errCode network.ConnErrorCode) error {
func (c *conn) CloseWithError(_ network.ConnErrorCode) error {
return c.Close()
}

Expand Down
5 changes: 5 additions & 0 deletions p2p/transport/webtransport/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ func (s *stream) Reset() error {
return nil
}

// ResetWithError resets the stream ignoring the error code. Error codes aren't
// specified for WebTransport as the current implementation of WebTransport in
// browsers(https://www.ietf.org/archive/id/draft-kinnear-webtransport-http2-02.html)
// only supports 1 byte error codes. For more details, see
// https://github.com/libp2p/specs/blob/4eca305185c7aef219e936bef76c48b1ab0a8b43/error-codes/README.md?plain=1#L84
func (s *stream) ResetWithError(_ network.StreamErrorCode) error {
s.Stream.CancelRead(reset)
s.Stream.CancelWrite(reset)
Expand Down

0 comments on commit 146812c

Please sign in to comment.