From 94328b0ad6115ae657040ee6f167d2d54b5c7b57 Mon Sep 17 00:00:00 2001 From: Sukun Date: Wed, 6 Dec 2023 23:05:46 +0530 Subject: [PATCH] webrtc: don't test for ManyStreams in transport integration test --- p2p/test/transport/transport_test.go | 19 ++++++------------- p2p/transport/webrtc/connection.go | 1 - p2p/transport/webrtc/stream.go | 8 ++------ p2p/transport/webrtc/stream_read.go | 2 +- p2p/transport/webrtc/stream_write.go | 13 +++---------- p2p/transport/webrtc/transport_test.go | 1 - 6 files changed, 12 insertions(+), 32 deletions(-) diff --git a/p2p/test/transport/transport_test.go b/p2p/test/transport/transport_test.go index bc51328c6c..bfbd2c0bce 100644 --- a/p2p/test/transport/transport_test.go +++ b/p2p/test/transport/transport_test.go @@ -314,9 +314,6 @@ func TestManyStreams(t *testing.T) { const streamCount = 128 for _, tc := range transportsToTest { t.Run(tc.Name, func(t *testing.T) { - if strings.Contains(tc.Name, "WebRTC") { - t.Skip("Pion doesn't correctly handle large queues of streams.") - } h1 := tc.HostGenerator(t, TransportTestCaseOpts{NoRcmgr: true}) h2 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true, NoRcmgr: true}) defer h1.Close() @@ -382,6 +379,9 @@ func TestMoreStreamsThanOurLimits(t *testing.T) { const streamCount = 1024 for _, tc := range transportsToTest { t.Run(tc.Name, func(t *testing.T) { + if strings.Contains(tc.Name, "WebRTC") { + t.Skip("This test potentially exhausts the uint16 WebRTC stream ID space.") + } listenerLimits := rcmgr.PartialLimitConfig{ PeerDefault: rcmgr.ResourceLimits{ Streams: 32, @@ -425,9 +425,7 @@ func TestMoreStreamsThanOurLimits(t *testing.T) { workerCount := 4 var startWorker func(workerIdx int) - var wCount atomic.Int32 startWorker = func(workerIdx int) { - fmt.Println("worker count", wCount.Add(1)) wg.Add(1) defer wg.Done() for { @@ -439,10 +437,7 @@ func TestMoreStreamsThanOurLimits(t *testing.T) { // Inline function so we can use defer func() { var didErr bool - defer func() { - x := completedStreams.Add(1) - fmt.Println("completed streams", x) - }() + defer completedStreams.Add(1) defer func() { // Only the first worker adds more workers if workerIdx == 0 && !didErr && !sawFirstErr.Load() { @@ -485,6 +480,7 @@ func TestMoreStreamsThanOurLimits(t *testing.T) { return } err = func(s network.Stream) error { + defer s.Close() err = s.SetDeadline(time.Now().Add(100 * time.Millisecond)) if err != nil { return err @@ -512,12 +508,8 @@ func TestMoreStreamsThanOurLimits(t *testing.T) { return nil }(s) if err != nil && shouldRetry(err) { - fmt.Println("failed to write stream!", err) - s.Reset() time.Sleep(50 * time.Millisecond) continue - } else { - s.Close() } return @@ -684,6 +676,7 @@ func TestDiscoverPeerIDFromSecurityNegotiation(t *testing.T) { // Try connecting with the bogus peer ID err = h2.Connect(ctx, *ai) require.Error(t, err, "somehow we successfully connected to a bogus peerID!") + // Extract the actual peer ID from the error newPeerId, err := extractPeerIDFromError(err) require.NoError(t, err) diff --git a/p2p/transport/webrtc/connection.go b/p2p/transport/webrtc/connection.go index 3241ce46cd..5ac6cb8d44 100644 --- a/p2p/transport/webrtc/connection.go +++ b/p2p/transport/webrtc/connection.go @@ -185,7 +185,6 @@ func (c *connection) OpenStream(ctx context.Context) (network.MuxedStream, error if err != nil { return nil, fmt.Errorf("open stream: %w", err) } - fmt.Println("opened dc with ID: ", *dc.ID()) str := newStream(dc, rwc, func() { c.removeStream(*dc.ID()) }) if err := c.addStream(str); err != nil { str.Reset() diff --git a/p2p/transport/webrtc/stream.go b/p2p/transport/webrtc/stream.go index db7109e4bf..abc3deef6c 100644 --- a/p2p/transport/webrtc/stream.go +++ b/p2p/transport/webrtc/stream.go @@ -74,11 +74,7 @@ type stream struct { nextMessage *pb.Message receiveState receiveState - // writerMx ensures that only a single goroutine is calling WriteMsg on writer. writer is a - // pbio.uvarintWriter which is not thread safe. The public Write API is not promised to be - // thread safe, but we need to be able to write control messages concurrently - writerMx sync.Mutex - writer pbio.Writer + writer pbio.Writer // concurrent writes prevented by mx sendStateChanged chan struct{} sendState sendState writeDeadline time.Time @@ -232,7 +228,7 @@ func (s *stream) processIncomingFlag(flag *pb.Message_Flag) { if s.receiveState == receiveStateReceiving { s.receiveState = receiveStateDataRead } - if err := s.writeMsgOnWriter(&pb.Message{Flag: pb.Message_FIN_ACK.Enum()}); err != nil { + if err := s.writer.WriteMsg(&pb.Message{Flag: pb.Message_FIN_ACK.Enum()}); err != nil { log.Debugf("failed to send FIN_ACK: %s", err) // Remote has finished writing all the data It'll stop waiting for the // FIN_ACK eventually or will be notified when we close the datachannel diff --git a/p2p/transport/webrtc/stream_read.go b/p2p/transport/webrtc/stream_read.go index 209af0a634..1ad171564c 100644 --- a/p2p/transport/webrtc/stream_read.go +++ b/p2p/transport/webrtc/stream_read.go @@ -90,7 +90,7 @@ func (s *stream) CloseRead() error { defer s.mx.Unlock() var err error if s.receiveState == receiveStateReceiving && s.closeErr == nil { - err = s.writeMsgOnWriter(&pb.Message{Flag: pb.Message_STOP_SENDING.Enum()}) + err = s.writer.WriteMsg(&pb.Message{Flag: pb.Message_STOP_SENDING.Enum()}) s.receiveState = receiveStateReset } s.controlMessageReaderOnce.Do(s.spawnControlMessageReader) diff --git a/p2p/transport/webrtc/stream_write.go b/p2p/transport/webrtc/stream_write.go index 7a99957288..ec0feadb58 100644 --- a/p2p/transport/webrtc/stream_write.go +++ b/p2p/transport/webrtc/stream_write.go @@ -7,7 +7,6 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/p2p/transport/webrtc/pb" - "google.golang.org/protobuf/proto" ) var errWriteAfterClose = errors.New("write after close") @@ -95,7 +94,7 @@ func (s *stream) Write(b []byte) (int, error) { end = len(b) } msg := &pb.Message{Message: b[:end]} - if err := s.writeMsgOnWriter(msg); err != nil { + if err := s.writer.WriteMsg(msg); err != nil { return n, err } n += end @@ -137,7 +136,7 @@ func (s *stream) cancelWrite() error { case s.sendStateChanged <- struct{}{}: default: } - if err := s.writeMsgOnWriter(&pb.Message{Flag: pb.Message_RESET.Enum()}); err != nil { + if err := s.writer.WriteMsg(&pb.Message{Flag: pb.Message_RESET.Enum()}); err != nil { return err } return nil @@ -155,14 +154,8 @@ func (s *stream) CloseWrite() error { case s.sendStateChanged <- struct{}{}: default: } - if err := s.writeMsgOnWriter(&pb.Message{Flag: pb.Message_FIN.Enum()}); err != nil { + if err := s.writer.WriteMsg(&pb.Message{Flag: pb.Message_FIN.Enum()}); err != nil { return err } return nil } - -func (s *stream) writeMsgOnWriter(msg proto.Message) error { - s.writerMx.Lock() - defer s.writerMx.Unlock() - return s.writer.WriteMsg(msg) -} diff --git a/p2p/transport/webrtc/transport_test.go b/p2p/transport/webrtc/transport_test.go index c596851007..983e03c00f 100644 --- a/p2p/transport/webrtc/transport_test.go +++ b/p2p/transport/webrtc/transport_test.go @@ -481,7 +481,6 @@ func TestTransportWebRTC_RemoteReadsAfterClose(t *testing.T) { return } err = stream.Close() - fmt.Println("closed!") if err != nil { done <- err return