Skip to content

Commit

Permalink
Daily commit
Browse files Browse the repository at this point in the history
  • Loading branch information
pyropy committed Nov 21, 2024
1 parent 079bd3e commit e3203f2
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 31 deletions.
2 changes: 0 additions & 2 deletions p2p/test/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/libp2p/go-libp2p/p2p/security/noise"
tls "github.com/libp2p/go-libp2p/p2p/security/tls"
libp2pmemory "github.com/libp2p/go-libp2p/p2p/transport/memory"
libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc"
"go.uber.org/mock/gomock"

ma "github.com/multiformats/go-multiaddr"
Expand Down
6 changes: 3 additions & 3 deletions p2p/transport/memory/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ func newConnection(

func (c *conn) Close() error {
c.closed.Store(true)
for _, s := range c.streams {
for id, s := range c.streams {
c.removeStream(id)
s.Close()
}

Expand All @@ -83,8 +84,7 @@ func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) {

func (c *conn) AcceptStream() (network.MuxedStream, error) {
in := <-c.streamC
id := streamCounter.Add(1)
c.addStream(id, in)
c.addStream(in.id, in)
return in, nil
}

Expand Down
58 changes: 34 additions & 24 deletions p2p/transport/memory/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ var ErrClosed = errors.New("stream closed")

func newStreamPair() (*stream, *stream) {
ra, rb := make(chan byte, 4096), make(chan byte, 4096)
wa, wb := rb, ra

in := newStream(rb, wb, network.DirInbound)
out := newStream(ra, wa, network.DirOutbound)
in := newStream(rb, ra, network.DirInbound)
out := newStream(ra, rb, network.DirOutbound)
return in, out
}

Expand All @@ -47,56 +46,67 @@ func newStream(r, w chan byte, _ network.Direction) *stream {
return s
}

// How to handle errors with writes?
func (s *stream) Write(p []byte) (n int, err error) {
if s.closed.Load() {
return 0, ErrClosed
}

for i := 0; i < len(p); i++ {
select {
case <-s.reset:
return 0, network.ErrReset
case <-s.closeWrite:
return 0, ErrClosed
default:
}

for n < len(p) {
select {
case <-s.reset:
err = network.ErrReset
return
case <-s.closeWrite:
err = ErrClosed
return
case s.write <- p[i]:
n = i
return n, ErrClosed
case <-s.reset:
return n, network.ErrReset
case s.write <- p[n]:
n++
default:
err = io.ErrClosedPipe
return
}
}

return n + 1, err
return
}

func (s *stream) Read(p []byte) (n int, err error) {
if s.closed.Load() {
return 0, ErrClosed
}

for n = 0; n < len(p); n++ {
select {
case <-s.reset:
return 0, network.ErrReset
case <-s.closeRead:
return 0, ErrClosed
default:
}

for n < len(p) {
select {
case <-s.reset:
err = network.ErrReset
return
case <-s.closeRead:
err = ErrClosed
return
return n, ErrClosed
case <-s.reset:
return n, network.ErrReset
case b, ok := <-s.read:
if !ok {
err = io.EOF
return
return n, ErrClosed
}
p[n] = b
n++
default:
err = io.EOF
return
return n, io.EOF
}
}

return
return n, nil
}

func (s *stream) CloseWrite() error {
Expand Down
24 changes: 23 additions & 1 deletion p2p/transport/memory/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
)

func TestStreamSimpleReadWriteClose(t *testing.T) {
// t.Parallel()
clientStr, serverStr := newStreamPair()

// send a foobar from the client
Expand All @@ -24,6 +25,7 @@ func TestStreamSimpleReadWriteClose(t *testing.T) {
b, err := io.ReadAll(serverStr)
require.NoError(t, err)
require.Equal(t, []byte("foobar"), b)

// reading again should give another io.EOF
n, err = serverStr.Read(make([]byte, 10))
require.Zero(t, n)
Expand All @@ -35,7 +37,6 @@ func TestStreamSimpleReadWriteClose(t *testing.T) {
require.NoError(t, serverStr.CloseWrite())

// and read it at the client
//require.False(t, clientDone.Load())
b, err = io.ReadAll(clientStr)
require.NoError(t, err)
require.Equal(t, []byte("lorem ipsum"), b)
Expand All @@ -46,3 +47,24 @@ func TestStreamSimpleReadWriteClose(t *testing.T) {
// Need to call Close for cleanup. Otherwise the FIN_ACK is never read
require.NoError(t, serverStr.Close())
}

func TestStreamPartialReads(t *testing.T) {
// t.Parallel()
clientStr, serverStr := newStreamPair()

_, err := serverStr.Write([]byte("foobar"))
require.NoError(t, err)
require.NoError(t, serverStr.CloseWrite())

n, err := clientStr.Read([]byte{}) // empty read
require.NoError(t, err)
require.Zero(t, n)
b := make([]byte, 3)
n, err = clientStr.Read(b)
require.Equal(t, 3, n)
require.NoError(t, err)
require.Equal(t, []byte("foo"), b)
b, err = io.ReadAll(clientStr)
require.NoError(t, err)
require.Equal(t, []byte("bar"), b)
}
2 changes: 1 addition & 1 deletion test-plans/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ toolchain go1.22.1
require (
github.com/go-redis/redis/v8 v8.11.5
github.com/libp2p/go-libp2p v0.0.0
github.com/multiformats/go-multiaddr v0.13.0
github.com/multiformats/go-multiaddr v0.14.0
)

require (
Expand Down
1 change: 1 addition & 0 deletions test-plans/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ github.com/multiformats/go-base36 v0.2.0/go.mod h1:qvnKE++v+2MWCfePClUEjE78Z7P2a
github.com/multiformats/go-multiaddr v0.1.1/go.mod h1:aMKBKNEYmzmDmxfX88/vz+J5IU55txyt0p4aiWVohjo=
github.com/multiformats/go-multiaddr v0.13.0 h1:BCBzs61E3AGHcYYTv8dqRH43ZfyrqM8RXVPT8t13tLQ=
github.com/multiformats/go-multiaddr v0.13.0/go.mod h1:sBXrNzucqkFJhvKOiwwLyqamGa/P5EIXNPLovyhQCII=
github.com/multiformats/go-multiaddr v0.14.0/go.mod h1:6EkVAxtznq2yC3QT5CM1UTAwG0GTP3EWAIcjHuzQ+r4=
github.com/multiformats/go-multiaddr-dns v0.4.0 h1:P76EJ3qzBXpUXZ3twdCDx/kvagMsNo0LMFXpyms/zgU=
github.com/multiformats/go-multiaddr-dns v0.4.0/go.mod h1:7hfthtB4E4pQwirrz+J0CcDUfbWzTqEzVyYKKIKpgkc=
github.com/multiformats/go-multiaddr-fmt v0.1.0 h1:WLEFClPycPkp4fnIzoFoV9FVd49/eQsuaL3/CWe167E=
Expand Down

0 comments on commit e3203f2

Please sign in to comment.