From c5459ae44d51d625c51e8e5ed6fc4e0fac9d386e Mon Sep 17 00:00:00 2001 From: Yutaka Takeda Date: Sun, 31 Dec 2023 20:46:20 -0800 Subject: [PATCH] Use dumbConn2 in createAssocs This is avoid the issue with unexpected errors occur on Ubuntu linux. Relates to #270 --- association_test.go | 139 ++++++++++++++++++++++++-------------------- 1 file changed, 77 insertions(+), 62 deletions(-) diff --git a/association_test.go b/association_test.go index 38108233..2b539e0b 100644 --- a/association_test.go +++ b/association_test.go @@ -2564,88 +2564,104 @@ func TestAssocMaxMessageSize(t *testing.T) { }) } -// udpConnWrapper wraps a *net.UDPConn and implements net.Conn interface. -type udpConnWrapper struct { - conn *net.UDPConn - remoteAddr net.Addr +type dumbConnInboundHandler func([]byte) + +type dumbConn2 struct { + net.Conn + packets [][]byte + closed bool + localAddr net.Addr + remoteAddr net.Addr + remoteInboundHandler dumbConnInboundHandler + mutex sync.Mutex + cond *sync.Cond } -func newUDPConnWrapper(conn *net.UDPConn, remoteAddr net.Addr) net.Conn { - return &udpConnWrapper{ - conn: conn, +func newDumConn2(localAddr, remoteAddr net.Addr) *dumbConn2 { + c := &dumbConn2{ + packets: [][]byte{}, + localAddr: localAddr, remoteAddr: remoteAddr, } + c.cond = sync.NewCond(&c.mutex) + return c } // Implement the net.Conn interface methods -func (w *udpConnWrapper) Read(b []byte) (n int, err error) { - // w.conn.ReadFrom(b) - n, _, err = w.conn.ReadFrom(b) - return n, err -} +func (c *dumbConn2) Read(b []byte) (n int, err error) { + c.mutex.Lock() + defer c.mutex.Unlock() -func (w *udpConnWrapper) Write(b []byte) (n int, err error) { - return w.conn.WriteTo(b, w.remoteAddr) -} + for { + if len(c.packets) > 0 { + packet := c.packets[0] + c.packets = c.packets[1:] + n := copy(b, packet) + return n, nil + } -func (w *udpConnWrapper) Close() error { - return w.conn.Close() -} + if c.closed { + return 0, io.EOF + } -func (w *udpConnWrapper) LocalAddr() net.Addr { - return w.conn.LocalAddr() + c.cond.Wait() + } } -func (w *udpConnWrapper) RemoteAddr() net.Addr { - return w.remoteAddr -} +func (c *dumbConn2) Write(b []byte) (int, error) { + c.mutex.Lock() + closed := c.closed + c.mutex.Unlock() -func (w *udpConnWrapper) SetDeadline(t time.Time) error { - return w.conn.SetDeadline(t) + if closed { + return 0, &net.OpError{Op: "write", Net: "udp", Addr: c.remoteAddr, Err: net.ErrClosed} + } + c.remoteInboundHandler(b) + return len(b), nil } -func (w *udpConnWrapper) SetReadDeadline(t time.Time) error { - return w.conn.SetReadDeadline(t) -} +func (c *dumbConn2) Close() error { + c.mutex.Lock() + defer c.mutex.Unlock() -func (w *udpConnWrapper) SetWriteDeadline(t time.Time) error { - return w.conn.SetWriteDeadline(t) + c.closed = true + c.cond.Signal() + return nil } -// crateUDPConnPair creates a pair of net.UDPConn objects that are connected with each other -func createUDPConnPair(t *testing.T) (net.Conn, net.Conn, error) { - udp1, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1")}) - if err != nil { - return nil, nil, err - } - addr1, ok := udp1.LocalAddr().(*net.UDPAddr) - require.True(t, ok) +func (c *dumbConn2) LocalAddr() net.Addr { + // Unused by Association + return c.localAddr +} - udp2, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1")}) - if err != nil { - return nil, nil, err - } - addr2, ok := udp2.LocalAddr().(*net.UDPAddr) - require.True(t, ok) +func (c *dumbConn2) RemoteAddr() net.Addr { + // Unused by Association + return c.remoteAddr +} - conn1 := newUDPConnWrapper(udp1, addr2) - if err != nil { - return nil, nil, err - } +func (c *dumbConn2) inboundHandler(packet []byte) { + c.mutex.Lock() + defer c.mutex.Unlock() - conn2 := newUDPConnWrapper(udp2, addr1) - if err != nil { - return nil, nil, err + if !c.closed { + c.packets = append(c.packets, packet) + c.cond.Signal() } +} - return conn1, conn2, nil +// crateUDPConnPair creates a pair of net.UDPConn objects that are connected with each other +func createUDPConnPair() (net.Conn, net.Conn) { + addr1 := &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 1234} + addr2 := &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5678} + conn1 := newDumConn2(addr1, addr2) + conn2 := newDumConn2(addr2, addr1) + conn1.remoteInboundHandler = conn2.inboundHandler + conn2.remoteInboundHandler = conn1.inboundHandler + return conn1, conn2 } -func createAssocs(t *testing.T) (*Association, *Association, error) { - udp1, udp2, err := createUDPConnPair(t) - if err != nil { - return nil, nil, err - } +func createAssocs() (*Association, *Association, error) { + udp1, udp2 := createUDPConnPair() loggerFactory := logging.NewDefaultLoggerFactory() loggerFactory.DefaultLogLevel = logging.LogLevelDebug @@ -2714,7 +2730,7 @@ loop: func TestAssociation_Shutdown(t *testing.T) { checkGoroutineLeaks(t) - a1, a2, err := createAssocs(t) + a1, a2, err := createAssocs() require.NoError(t, err) s11, err := a1.OpenStream(1, PayloadTypeWebRTCString) @@ -2752,7 +2768,7 @@ func TestAssociation_Shutdown(t *testing.T) { func TestAssociation_ShutdownDuringWrite(t *testing.T) { checkGoroutineLeaks(t) - a1, a2, err := createAssocs(t) + a1, a2, err := createAssocs() require.NoError(t, err) s11, err := a1.OpenStream(1, PayloadTypeWebRTCString) @@ -2963,7 +2979,7 @@ func TestAssociation_HandlePacketInCookieWaitState(t *testing.T) { func TestAssociation_Abort(t *testing.T) { checkGoroutineLeaks(t) - a1, a2, err := createAssocs(t) + a1, a2, err := createAssocs() require.NoError(t, err) s11, err := a1.OpenStream(1, PayloadTypeWebRTCString) @@ -3002,8 +3018,7 @@ func TestAssociation_Abort(t *testing.T) { func TestAssociation_createClientWithContext(t *testing.T) { checkGoroutineLeaks(t) - udp1, udp2, err := createUDPConnPair(t) - require.NoError(t, err) + udp1, udp2 := createUDPConnPair() loggerFactory := logging.NewDefaultLoggerFactory()