Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable RTOMax - fix #181 #303

Merged
merged 2 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions association.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ type Config struct {
MaxMessageSize uint32
EnableZeroChecksum bool
LoggerFactory logging.LoggerFactory
// RTOMax is the maximum retransmission timeout in milliseconds
RTOMax float64
}

// Server accepts a SCTP stream over a conn
Expand Down Expand Up @@ -312,7 +314,7 @@ func createAssociation(config Config) *Association {
myNextRSN: tsn,
minTSN2MeasureRTT: tsn,
state: closed,
rtoMgr: newRTOManager(),
rtoMgr: newRTOManager(config.RTOMax),
streams: map[uint16]*Stream{},
reconfigs: map[uint32]*chunkReconfig{},
reconfigRequests: map[uint32]*paramOutgoingResetRequest{},
Expand Down Expand Up @@ -340,11 +342,11 @@ func createAssociation(config Config) *Association {
a.name, a.CWND(), a.ssthresh, a.inflightQueue.getNumBytes())

a.srtt.Store(float64(0))
a.t1Init = newRTXTimer(timerT1Init, a, maxInitRetrans)
a.t1Cookie = newRTXTimer(timerT1Cookie, a, maxInitRetrans)
a.t2Shutdown = newRTXTimer(timerT2Shutdown, a, noMaxRetrans) // retransmit forever
a.t3RTX = newRTXTimer(timerT3RTX, a, noMaxRetrans) // retransmit forever
a.tReconfig = newRTXTimer(timerReconfig, a, noMaxRetrans) // retransmit forever
a.t1Init = newRTXTimer(timerT1Init, a, maxInitRetrans, config.RTOMax)
a.t1Cookie = newRTXTimer(timerT1Cookie, a, maxInitRetrans, config.RTOMax)
a.t2Shutdown = newRTXTimer(timerT2Shutdown, a, noMaxRetrans, config.RTOMax)
a.t3RTX = newRTXTimer(timerT3RTX, a, noMaxRetrans, config.RTOMax)
a.tReconfig = newRTXTimer(timerReconfig, a, noMaxRetrans, config.RTOMax)
a.ackTimer = newAckTimer(a)

return a
Expand Down
32 changes: 23 additions & 9 deletions rtx_timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
const (
rtoInitial float64 = 1.0 * 1000 // msec
rtoMin float64 = 1.0 * 1000 // msec
rtoMax float64 = 60.0 * 1000 // msec
defaultRTOMax float64 = 60.0 * 1000 // msec
rtoAlpha float64 = 0.125
rtoBeta float64 = 0.25
maxInitRetrans uint = 8
Expand All @@ -28,13 +28,19 @@ type rtoManager struct {
rto float64
noUpdate bool
mutex sync.RWMutex
rtoMax float64
}

// newRTOManager creates a new rtoManager.
func newRTOManager() *rtoManager {
return &rtoManager{
rto: rtoInitial,
func newRTOManager(rtoMax float64) *rtoManager {
mgr := rtoManager{
rto: rtoInitial,
rtoMax: rtoMax,
}
if mgr.rtoMax == 0 {
mgr.rtoMax = defaultRTOMax
}
return &mgr
}

// setNewRTT takes a newly measured RTT then adjust the RTO in msec.
Expand All @@ -55,7 +61,7 @@ func (m *rtoManager) setNewRTT(rtt float64) float64 {
m.rttvar = (1-rtoBeta)*m.rttvar + rtoBeta*(math.Abs(m.srtt-rtt))
m.srtt = (1-rtoAlpha)*m.srtt + rtoAlpha*rtt
}
m.rto = math.Min(math.Max(m.srtt+4*m.rttvar, rtoMin), rtoMax)
m.rto = math.Min(math.Max(m.srtt+4*m.rttvar, rtoMin), m.rtoMax)
return m.srtt
}

Expand Down Expand Up @@ -106,19 +112,27 @@ type rtxTimer struct {
stopFunc stopTimerLoop
closed bool
mutex sync.RWMutex
rtoMax float64
}

type stopTimerLoop func()

// newRTXTimer creates a new retransmission timer.
// if maxRetrans is set to 0, it will keep retransmitting until stop() is called.
// (it will never make onRetransmissionFailure() callback.
func newRTXTimer(id int, observer rtxTimerObserver, maxRetrans uint) *rtxTimer {
return &rtxTimer{
func newRTXTimer(id int, observer rtxTimerObserver, maxRetrans uint,
rtoMax float64,
) *rtxTimer {
timer := rtxTimer{
id: id,
observer: observer,
maxRetrans: maxRetrans,
rtoMax: rtoMax,
}
if timer.rtoMax == 0 {
timer.rtoMax = defaultRTOMax
}
return &timer
}

// start starts the timer.
Expand Down Expand Up @@ -148,7 +162,7 @@ func (t *rtxTimer) start(rto float64) bool {
canceling := false

for !canceling {
timeout := calculateNextTimeout(rto, nRtos)
timeout := calculateNextTimeout(rto, nRtos, t.rtoMax)
timer := time.NewTimer(time.Duration(timeout) * time.Millisecond)

select {
Expand Down Expand Up @@ -208,7 +222,7 @@ func (t *rtxTimer) isRunning() bool {
return (t.stopFunc != nil)
}

func calculateNextTimeout(rto float64, nRtos uint) float64 {
func calculateNextTimeout(rto float64, nRtos uint, rtoMax float64) float64 {
// RFC 4096 sec 6.3.3. Handle T3-rtx Expiration
// E2) For the destination address for which the timer expires, set RTO
// <- RTO * 2 ("back off the timer"). The maximum value discussed
Expand Down
49 changes: 30 additions & 19 deletions rtx_timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

func TestRTOManager(t *testing.T) {
t.Run("initial values", func(t *testing.T) {
m := newRTOManager()
m := newRTOManager(0)
assert.Equal(t, rtoInitial, m.rto, "should be rtoInitial")
assert.Equal(t, rtoInitial, m.getRTO(), "should be rtoInitial")
assert.Equal(t, float64(0), m.srtt, "should be 0")
Expand All @@ -23,7 +23,7 @@ func TestRTOManager(t *testing.T) {

t.Run("RTO calculation (small RTT)", func(t *testing.T) {
var rto float64
m := newRTOManager()
m := newRTOManager(0)
exp := []int32{
1800,
1500,
Expand All @@ -41,7 +41,7 @@ func TestRTOManager(t *testing.T) {

t.Run("RTO calculation (large RTT)", func(t *testing.T) {
var rto float64
m := newRTOManager()
m := newRTOManager(0)
exp := []int32{
60000, // capped at RTO.Max
60000, // capped at RTO.Max
Expand All @@ -59,22 +59,33 @@ func TestRTOManager(t *testing.T) {

t.Run("calculateNextTimeout", func(t *testing.T) {
var rto float64
rto = calculateNextTimeout(1.0, 0)
rto = calculateNextTimeout(1.0, 0, defaultRTOMax)
assert.Equal(t, float64(1), rto, "should match")
rto = calculateNextTimeout(1.0, 1)
rto = calculateNextTimeout(1.0, 1, defaultRTOMax)
assert.Equal(t, float64(2), rto, "should match")
rto = calculateNextTimeout(1.0, 2)
rto = calculateNextTimeout(1.0, 2, defaultRTOMax)
assert.Equal(t, float64(4), rto, "should match")
rto = calculateNextTimeout(1.0, 30)
rto = calculateNextTimeout(1.0, 30, defaultRTOMax)
assert.Equal(t, float64(60000), rto, "should match")
rto = calculateNextTimeout(1.0, 63)
rto = calculateNextTimeout(1.0, 63, defaultRTOMax)
assert.Equal(t, float64(60000), rto, "should match")
rto = calculateNextTimeout(1.0, 64)
rto = calculateNextTimeout(1.0, 64, defaultRTOMax)
assert.Equal(t, float64(60000), rto, "should match")
})
t.Run("calculateNextTimeout w/ RTOMax", func(t *testing.T) {
var rto float64
rto = calculateNextTimeout(1.0, 0, 2.0)
assert.Equal(t, 1.0, rto, "should match")
rto = calculateNextTimeout(1.5, 1, 2.0)
assert.Equal(t, 2.0, rto, "should match")
rto = calculateNextTimeout(1.0, 10, 2.0)
assert.Equal(t, 2.0, rto, "should match")
rto = calculateNextTimeout(1.0, 31, 1000.0)
assert.Equal(t, 1000.0, rto, "should match")
})

t.Run("reset", func(t *testing.T) {
m := newRTOManager()
m := newRTOManager(0)
for i := 0; i < 10; i++ {
m.setNewRTT(200)
}
Expand Down Expand Up @@ -118,7 +129,7 @@ func TestRtxTimer(t *testing.T) {
assert.Equal(t, timerID, id, "unexpted timer ID: %d", id)
},
onRtxFailure: func(id int) {},
}, pathMaxRetrans)
}, pathMaxRetrans, 0)

assert.False(t, rt.isRunning(), "should not be running")

Expand All @@ -144,7 +155,7 @@ func TestRtxTimer(t *testing.T) {
assert.Equal(t, timerID, id, "unexpted timer ID: %d", id)
},
onRtxFailure: func(id int) {},
}, pathMaxRetrans)
}, pathMaxRetrans, 0)

interval := float64(30.0)
ok := rt.start(interval)
Expand All @@ -171,7 +182,7 @@ func TestRtxTimer(t *testing.T) {
assert.Equal(t, timerID, id, "unexpted timer ID: %d", id)
},
onRtxFailure: func(id int) {},
}, pathMaxRetrans)
}, pathMaxRetrans, 0)

interval := float64(30.0)
ok := rt.start(interval)
Expand All @@ -194,7 +205,7 @@ func TestRtxTimer(t *testing.T) {
assert.Equal(t, timerID, id, "unexpted timer ID: %d", id)
},
onRtxFailure: func(id int) {},
}, pathMaxRetrans)
}, pathMaxRetrans, 0)

interval := float64(30.0)
ok := rt.start(interval)
Expand All @@ -221,7 +232,7 @@ func TestRtxTimer(t *testing.T) {
assert.Equal(t, timerID, id, "unexpted timer ID: %d", id)
},
onRtxFailure: func(id int) {},
}, pathMaxRetrans)
}, pathMaxRetrans, 0)

for i := 0; i < 1000; i++ {
ok := rt.start(30)
Expand Down Expand Up @@ -253,7 +264,7 @@ func TestRtxTimer(t *testing.T) {
t.Logf("onRtxFailure: elapsed=%.03f\n", elapsed)
doneCh <- true
},
}, pathMaxRetrans)
}, pathMaxRetrans, 0)

// RTO(msec) Total(msec)
// 10 10 1st RTO
Expand Down Expand Up @@ -297,7 +308,7 @@ func TestRtxTimer(t *testing.T) {
onRtxFailure: func(id int) {
assert.Fail(t, "timer should not fail")
},
}, 0)
}, 0, 0)

// RTO(msec) Total(msec)
// 10 10 1st RTO
Expand Down Expand Up @@ -332,7 +343,7 @@ func TestRtxTimer(t *testing.T) {
doneCh <- true
},
onRtxFailure: func(id int) {},
}, pathMaxRetrans)
}, pathMaxRetrans, 0)

for i := 0; i < 10; i++ {
rt.stop()
Expand All @@ -355,7 +366,7 @@ func TestRtxTimer(t *testing.T) {
rtoCount++
},
onRtxFailure: func(id int) {},
}, pathMaxRetrans)
}, pathMaxRetrans, 0)

ok := rt.start(20)
assert.True(t, ok, "should be accepted")
Expand Down
Loading