Skip to content

Commit

Permalink
Update TestStreamClose to repro stream not found
Browse files Browse the repository at this point in the history
Relates to #187
  • Loading branch information
enobufs authored and jerry-tao committed Oct 13, 2022
1 parent c546e1d commit 247fc1d
Showing 1 changed file with 96 additions and 109 deletions.
205 changes: 96 additions & 109 deletions vnet_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sctp

import (
"bytes"
"fmt"
"math/rand"
"net"
Expand Down Expand Up @@ -386,91 +387,91 @@ func TestRwndFull(t *testing.T) {
}

func TestStreamClose(t *testing.T) {
lim := test.TimeOut(time.Second * 10)
defer lim.Stop()

loggerFactory := logging.NewDefaultLoggerFactory()
log := loggerFactory.NewLogger("test")

venv, err := buildVNetEnv(&vNetEnvConfig{
loggerFactory: loggerFactory,
log: log,
})
if !assert.NoError(t, err, "should succeed") {
return
}
if !assert.NotNil(t, venv, "should not be nil") {
return
}
defer venv.wan.Stop() // nolint:errcheck
loopBackTest := func(t *testing.T, dropReconfigChunk bool) {
lim := test.TimeOut(time.Second * 10)
defer lim.Stop()

serverStreamReady := make(chan struct{})
clientStreamReady := make(chan struct{})
clientStartClose := make(chan struct{})
serverStreamClosed := make(chan struct{})
shutDownClient := make(chan struct{})
clientShutDown := make(chan struct{})
serverShutDown := make(chan struct{})
loggerFactory := logging.NewDefaultLoggerFactory()
log := loggerFactory.NewLogger("test")

go func() {
defer close(serverShutDown)
// connected UDP conn for server
conn, err := venv.net0.DialUDP("udp4",
&net.UDPAddr{IP: net.ParseIP("1.1.1.1"), Port: 5000},
&net.UDPAddr{IP: net.ParseIP("2.2.2.2"), Port: 5000},
)
venv, err := buildVNetEnv(&vNetEnvConfig{
loggerFactory: loggerFactory,
log: log,
})
if !assert.NoError(t, err, "should succeed") {
return
}
defer conn.Close() // nolint:errcheck

// server association
assoc, err := Server(Config{
NetConn: conn,
LoggerFactory: loggerFactory,
})
if !assert.NoError(t, err, "should succeed") {
if !assert.NotNil(t, venv, "should not be nil") {
return
}
defer assoc.Close() // nolint:errcheck
defer venv.wan.Stop() // nolint:errcheck

log.Info("server handshake complete")
clientShutDown := make(chan struct{})
serverShutDown := make(chan struct{})

stream, err := assoc.AcceptStream()
if !assert.NoError(t, err, "should succeed") {
return
}
assert.Equal(t, StreamStateOpen, stream.State())
const numMessages = 10
const messageSize = 1024
var messages [][]byte
var numServerReceived int
var numClientReceived int

buf := make([]byte, 1500)
for {
n, err := stream.Read(buf)
if err != nil {
log.Infof("server: Read returned %v", err)
stream.Close() // nolint:errcheck
for i := 0; i < numMessages; i++ {
bytes := make([]byte, messageSize)
messages = append(messages, bytes)
}

assert.Equal(t, StreamStateClosed, stream.State())
go func() {
defer close(serverShutDown)
// connected UDP conn for server
conn, err := venv.net0.DialUDP("udp4",
&net.UDPAddr{IP: net.ParseIP("1.1.1.1"), Port: 5000},
&net.UDPAddr{IP: net.ParseIP("2.2.2.2"), Port: 5000},
)
if !assert.NoError(t, err, "should succeed") {
return
}
defer conn.Close() // nolint:errcheck

// give a bit of time for the OutgoingResetRequest so that
// the client side stream.Read() will return with EOF.
time.Sleep(100 * time.Millisecond)
break
// server association
assoc, err := Server(Config{
NetConn: conn,
LoggerFactory: loggerFactory,
})
if !assert.NoError(t, err, "should succeed") {
return
}
defer assoc.Close() // nolint:errcheck

log.Info("server handshake complete")

if !assert.Equal(t, "HELLO", string(buf[:n]), "should receive HELLO") {
continue
stream, err := assoc.AcceptStream()
if !assert.NoError(t, err, "should succeed") {
return
}
assert.Equal(t, StreamStateOpen, stream.State())

log.Info("server stream ready")
close(serverStreamReady)
}
buf := make([]byte, 1500)
for {
n, err := stream.Read(buf)
if err != nil {
log.Infof("server: Read returned %v", err)
stream.Close() // nolint:errcheck
assert.Equal(t, StreamStateClosed, stream.State())
break
}

close(serverStreamClosed)
log.Info("server closing")
}()
log.Infof("server: received %d bytes (%d)", n, numServerReceived)
assert.Equal(t, 0, bytes.Compare(buf[:n], messages[numServerReceived]), "should receive HELLO")

_, err = stream.Write(buf[:n])
assert.NoError(t, err, "should succeed")

numServerReceived++
}
// don't close association until the client's stream routine is complete
<-clientShutDown
}()

go func() {
defer close(clientShutDown)
// connected UDP conn for client
conn, err := venv.net1.DialUDP("udp4",
&net.UDPAddr{IP: net.ParseIP("2.2.2.2"), Port: 5000},
Expand All @@ -479,6 +480,7 @@ func TestStreamClose(t *testing.T) {
if !assert.NoError(t, err, "should succeed") {
return
}
defer conn.Close() // nolint:errcheck

// client association
assoc, err := Client(Config{
Expand All @@ -488,11 +490,7 @@ func TestStreamClose(t *testing.T) {
if !assert.NoError(t, err, "should succeed") {
return
}
defer func() {
log.Info("closing client side association")
assoc.Close() // nolint:errcheck
log.Info("closed client side association")
}()
defer assoc.Close() // nolint:errcheck

log.Info("client handshake complete")

Expand All @@ -501,46 +499,46 @@ func TestStreamClose(t *testing.T) {
return
}
assert.Equal(t, StreamStateOpen, stream.State())

stream.SetReliabilityParams(false, ReliabilityTypeReliable, 0)

// Send a message to let server side stream to open
_, err = stream.Write([]byte("HELLO"))
if !assert.NoError(t, err, "should succeed") {
return
}

// begin client read-loop
buf := make([]byte, 1500)
done := make(chan struct{})
go func() {
defer close(clientShutDown)
for {
log.Info("client read")
_, err2 := stream.Read(buf)
n, err2 := stream.Read(buf)
if err2 != nil {
log.Infof("client: Read returned %v", err2)
assert.Equal(t, StreamStateClosed, stream.State())
break
}

log.Infof("client: received %d bytes (%d)", n, numClientReceived)
assert.Equal(t, 0, bytes.Compare(buf[:n], messages[numClientReceived]), "should receive HELLO")
numClientReceived++
}
close(done)
}()

log.Info("client stream ready")
close(clientStreamReady)

<-clientStartClose
// Send messages to the server
for i := 0; i < numMessages; i++ {
_, err = stream.Write(messages[i])
assert.NoError(t, err, "should succeed")
}

// drop next 1 RECONFIG chunk
venv.dropNextReconfigChunk(1)
if dropReconfigChunk {
venv.dropNextReconfigChunk(1)
}

// Immediately close the stream
err = stream.Close()
assert.NoError(t, err, "should succeed")
assert.Equal(t, StreamStateClosing, stream.State())

log.Info("client wait for exit reading..")
<-done
<-clientShutDown

<-shutDownClient
assert.Equal(t, numMessages, numServerReceived, "all messages should be received")
assert.Equal(t, numMessages, numClientReceived, "all messages should be received")

// Check if RECONFIG was actually dropped
assert.Equal(t, 0, venv.numToDropReconfig, "should be zero")
Expand All @@ -553,26 +551,15 @@ func TestStreamClose(t *testing.T) {
pendingReconfigs := len(assoc.reconfigs)
assoc.lock.RUnlock()
assert.Equal(t, 0, pendingReconfigs, "should be zero")
}

log.Info("client closing")
}()

// wait until both establish a stream
<-clientStreamReady
<-serverStreamReady

log.Info("stream ready")

// let client begin writing
log.Info("client start closing")
close(clientStartClose)

<-serverStreamClosed
close(shutDownClient)
t.Run("without dropping Reconfig", func(t *testing.T) {
loopBackTest(t, false)
})

<-clientShutDown
<-serverShutDown
log.Info("all done")
t.Run("with dropping Reconfig", func(t *testing.T) {
loopBackTest(t, true)
})
}

// this test case reproduces the issue mentioned in
Expand Down

0 comments on commit 247fc1d

Please sign in to comment.