Skip to content

Commit

Permalink
webrtc: don't test for ManyStreams in transport integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Dec 6, 2023
1 parent b2e6691 commit 94328b0
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 32 deletions.
19 changes: 6 additions & 13 deletions p2p/test/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,6 @@ func TestManyStreams(t *testing.T) {
const streamCount = 128
for _, tc := range transportsToTest {
t.Run(tc.Name, func(t *testing.T) {
if strings.Contains(tc.Name, "WebRTC") {
t.Skip("Pion doesn't correctly handle large queues of streams.")
}
h1 := tc.HostGenerator(t, TransportTestCaseOpts{NoRcmgr: true})
h2 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true, NoRcmgr: true})
defer h1.Close()
Expand Down Expand Up @@ -382,6 +379,9 @@ func TestMoreStreamsThanOurLimits(t *testing.T) {
const streamCount = 1024
for _, tc := range transportsToTest {
t.Run(tc.Name, func(t *testing.T) {
if strings.Contains(tc.Name, "WebRTC") {
t.Skip("This test potentially exhausts the uint16 WebRTC stream ID space.")
}
listenerLimits := rcmgr.PartialLimitConfig{
PeerDefault: rcmgr.ResourceLimits{
Streams: 32,
Expand Down Expand Up @@ -425,9 +425,7 @@ func TestMoreStreamsThanOurLimits(t *testing.T) {
workerCount := 4

var startWorker func(workerIdx int)
var wCount atomic.Int32
startWorker = func(workerIdx int) {
fmt.Println("worker count", wCount.Add(1))
wg.Add(1)
defer wg.Done()
for {
Expand All @@ -439,10 +437,7 @@ func TestMoreStreamsThanOurLimits(t *testing.T) {
// Inline function so we can use defer
func() {
var didErr bool
defer func() {
x := completedStreams.Add(1)
fmt.Println("completed streams", x)
}()
defer completedStreams.Add(1)
defer func() {
// Only the first worker adds more workers
if workerIdx == 0 && !didErr && !sawFirstErr.Load() {
Expand Down Expand Up @@ -485,6 +480,7 @@ func TestMoreStreamsThanOurLimits(t *testing.T) {
return
}
err = func(s network.Stream) error {
defer s.Close()
err = s.SetDeadline(time.Now().Add(100 * time.Millisecond))
if err != nil {
return err
Expand Down Expand Up @@ -512,12 +508,8 @@ func TestMoreStreamsThanOurLimits(t *testing.T) {
return nil
}(s)
if err != nil && shouldRetry(err) {
fmt.Println("failed to write stream!", err)
s.Reset()
time.Sleep(50 * time.Millisecond)
continue
} else {
s.Close()
}
return

Expand Down Expand Up @@ -684,6 +676,7 @@ func TestDiscoverPeerIDFromSecurityNegotiation(t *testing.T) {
// Try connecting with the bogus peer ID
err = h2.Connect(ctx, *ai)
require.Error(t, err, "somehow we successfully connected to a bogus peerID!")

// Extract the actual peer ID from the error
newPeerId, err := extractPeerIDFromError(err)
require.NoError(t, err)
Expand Down
1 change: 0 additions & 1 deletion p2p/transport/webrtc/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ func (c *connection) OpenStream(ctx context.Context) (network.MuxedStream, error
if err != nil {
return nil, fmt.Errorf("open stream: %w", err)
}
fmt.Println("opened dc with ID: ", *dc.ID())
str := newStream(dc, rwc, func() { c.removeStream(*dc.ID()) })
if err := c.addStream(str); err != nil {
str.Reset()
Expand Down
8 changes: 2 additions & 6 deletions p2p/transport/webrtc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,7 @@ type stream struct {
nextMessage *pb.Message
receiveState receiveState

// writerMx ensures that only a single goroutine is calling WriteMsg on writer. writer is a
// pbio.uvarintWriter which is not thread safe. The public Write API is not promised to be
// thread safe, but we need to be able to write control messages concurrently
writerMx sync.Mutex
writer pbio.Writer
writer pbio.Writer // concurrent writes prevented by mx
sendStateChanged chan struct{}
sendState sendState
writeDeadline time.Time
Expand Down Expand Up @@ -232,7 +228,7 @@ func (s *stream) processIncomingFlag(flag *pb.Message_Flag) {
if s.receiveState == receiveStateReceiving {
s.receiveState = receiveStateDataRead
}
if err := s.writeMsgOnWriter(&pb.Message{Flag: pb.Message_FIN_ACK.Enum()}); err != nil {
if err := s.writer.WriteMsg(&pb.Message{Flag: pb.Message_FIN_ACK.Enum()}); err != nil {
log.Debugf("failed to send FIN_ACK: %s", err)
// Remote has finished writing all the data It'll stop waiting for the
// FIN_ACK eventually or will be notified when we close the datachannel
Expand Down
2 changes: 1 addition & 1 deletion p2p/transport/webrtc/stream_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (s *stream) CloseRead() error {
defer s.mx.Unlock()
var err error
if s.receiveState == receiveStateReceiving && s.closeErr == nil {
err = s.writeMsgOnWriter(&pb.Message{Flag: pb.Message_STOP_SENDING.Enum()})
err = s.writer.WriteMsg(&pb.Message{Flag: pb.Message_STOP_SENDING.Enum()})
s.receiveState = receiveStateReset
}
s.controlMessageReaderOnce.Do(s.spawnControlMessageReader)
Expand Down
13 changes: 3 additions & 10 deletions p2p/transport/webrtc/stream_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/p2p/transport/webrtc/pb"
"google.golang.org/protobuf/proto"
)

var errWriteAfterClose = errors.New("write after close")
Expand Down Expand Up @@ -95,7 +94,7 @@ func (s *stream) Write(b []byte) (int, error) {
end = len(b)
}
msg := &pb.Message{Message: b[:end]}
if err := s.writeMsgOnWriter(msg); err != nil {
if err := s.writer.WriteMsg(msg); err != nil {
return n, err
}
n += end
Expand Down Expand Up @@ -137,7 +136,7 @@ func (s *stream) cancelWrite() error {
case s.sendStateChanged <- struct{}{}:
default:
}
if err := s.writeMsgOnWriter(&pb.Message{Flag: pb.Message_RESET.Enum()}); err != nil {
if err := s.writer.WriteMsg(&pb.Message{Flag: pb.Message_RESET.Enum()}); err != nil {
return err
}
return nil
Expand All @@ -155,14 +154,8 @@ func (s *stream) CloseWrite() error {
case s.sendStateChanged <- struct{}{}:
default:
}
if err := s.writeMsgOnWriter(&pb.Message{Flag: pb.Message_FIN.Enum()}); err != nil {
if err := s.writer.WriteMsg(&pb.Message{Flag: pb.Message_FIN.Enum()}); err != nil {
return err
}
return nil
}

func (s *stream) writeMsgOnWriter(msg proto.Message) error {
s.writerMx.Lock()
defer s.writerMx.Unlock()
return s.writer.WriteMsg(msg)
}
1 change: 0 additions & 1 deletion p2p/transport/webrtc/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,6 @@ func TestTransportWebRTC_RemoteReadsAfterClose(t *testing.T) {
return
}
err = stream.Close()
fmt.Println("closed!")
if err != nil {
done <- err
return
Expand Down

0 comments on commit 94328b0

Please sign in to comment.