Skip to content

Commit

Permalink
Add NADA implementation
Browse files Browse the repository at this point in the history
This change adds the NADA congestion control implementation.
The binding to cc is left TODO.
  • Loading branch information
kevmo314 committed Apr 2, 2022
1 parent 09051cd commit a1771d6
Show file tree
Hide file tree
Showing 10 changed files with 611 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pkg/nada/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# [RFC 8698] NADA: A Unified Congestion Control Scheme for Real-Time Media

Notes:

* The receiver in this implementation assumes a monotonically ordered sequence of packets.
97 changes: 97 additions & 0 deletions pkg/nada/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package rfc8698

import "time"

type Bits uint32

type BitsPerSecond float64

const Kbps = BitsPerSecond(1_000)
const Mbps = BitsPerSecond(1_000_000)

type Config struct {
// Weight of priority of the flow
Priority float64
// Minimum rate of the application supported by the media encoder
MinimumRate BitsPerSecond // RMIN
// Maximum rate of the application supported by media encoder
MaximumRate BitsPerSecond // RMAX
// Reference congestion level
ReferenceCongestionLevel time.Duration // XREF
// Scaling parameter for gradual rate update calculation
κ float64
// Scaling parameter for gradual rate update calculation
η float64
// Upper bound of RTT in gradual rate update calculation
τ time.Duration
// Target feedback interval
δ time.Duration

// Observation window in time for calculating packet summary statistics at receiver
LogWindow time.Duration // LOGWIN
// Threshold for determining queuing delay build up at receiver
QueueingDelayThreshold time.Duration
// Bound on filtering delay
FilteringDelay time.Duration // DFILT
// Upper bound on rate increase ratio for accelerated ramp-up
γ_max float64
// Upper bound on self-inflicted queueing delay during ramp up
QueueBound time.Duration // QBOUND

// Multiplier for self-scaling the expiration threshold of the last observed loss
// (loss_exp) based on measured average loss interval (loss_int)
LossMultiplier float64 // MULTILOSS
// Delay threshold for invoking non-linear warping
DelayThreshold time.Duration // QTH
// Scaling parameter in the exponent of non-linear warping
λ float64

// Reference packet loss ratio
ReferencePacketLossRatio float64 // PLRREF
// Reference packet marking ratio
ReferencePacketMarkingRatio float64 // PMRREF
// Reference delay penalty for loss when lacket loss ratio is at least PLRREF
ReferenceDelayLoss time.Duration // DLOSS
// Reference delay penalty for ECN marking when packet marking is at PMRREF
ReferenceDelayMarking time.Duration // DMARK

// Frame rate of incoming video
FrameRate float64 // FRAMERATE
// Scaling parameter for modulating outgoing sending rate
β_s float64
// Scaling parameter for modulating video encoder target rate
β_v float64
// Smoothing factor in exponential smoothing of packet loss and marking rate
α float64
}

var DefaultConfig = Config{
Priority: 1.0,
MinimumRate: 150 * Kbps,
MaximumRate: 1500 * Kbps,
ReferenceCongestionLevel: 10 * time.Millisecond,
κ: 0.5,
η: 2.0,
τ: 500 * time.Millisecond,
δ: 100 * time.Millisecond,

LogWindow: 500 * time.Millisecond,
QueueingDelayThreshold: 10 * time.Millisecond,
FilteringDelay: 120 * time.Millisecond,
γ_max: 0.5,
QueueBound: 50 * time.Millisecond,

LossMultiplier: 7.0,
DelayThreshold: 50 * time.Millisecond,
λ: 0.5,

ReferencePacketLossRatio: 0.01,
ReferencePacketMarkingRatio: 0.01,
ReferenceDelayLoss: 10 * time.Millisecond,
ReferenceDelayMarking: 2 * time.Millisecond,

FrameRate: 30.0,
β_s: 0.1,
β_v: 0.1,
α: 0.1,
}
20 changes: 20 additions & 0 deletions pkg/nada/ecn/ecn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package ecn

import (
"errors"
"syscall"
)

// CheckExplicitCongestionNotification checks if the given oob data includes an ECN bit set.
func CheckExplicitCongestionNotification(oob []byte) (uint8, error) {
ctrlMsgs, err := syscall.ParseSocketControlMessage(oob)
if err != nil {
return 0, err
}
for _, ctrlMsg := range ctrlMsgs {
if ctrlMsg.Header.Type == syscall.IP_TOS {
return (ctrlMsg.Data[0] & 0x3), nil
}
}
return 0, errors.New("no ECN control message")
}
10 changes: 10 additions & 0 deletions pkg/nada/ecn/ecn_darwin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package ecn

import (
"net"
)

// EnableExplicitCongestionNotification enables ECN on the given connection.
func EnableExplicitCongestionNotification(conn *net.UDPConn) {
// noop.
}
17 changes: 17 additions & 0 deletions pkg/nada/ecn/ecn_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package ecn

import (
"net"
"reflect"
"syscall"
)

// EnableExplicitCongestionNotification enables ECN on the given connection.
func EnableExplicitCongestionNotification(conn *net.UDPConn) {
ptrVal := reflect.ValueOf(*conn)
fdmember := reflect.Indirect(ptrVal).FieldByName("fd")
pfdmember := reflect.Indirect(fdmember).FieldByName("pfd")
netfdmember := reflect.Indirect(pfdmember).FieldByName("Sysfd")
fd := int(netfdmember.Int())
syscall.SetsockoptInt(fd, syscall.IPPROTO_IP, syscall.IP_RECVTOS, 1)
}
104 changes: 104 additions & 0 deletions pkg/nada/packet_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package rfc8698

import (
"errors"
"fmt"
"sync"
"time"

"github.com/muxable/rtptools/pkg/x_range"
)

type packet struct {
ts time.Time
seq uint16
ecn bool
size Bits
queueingDelay bool
}

// String returns a string representation of the packet.
func (p *packet) String() string {
return fmt.Sprintf("%v@%v", p.seq, p.ts.Nanosecond()%1000)
}

type packetStream struct {
sync.Mutex

window time.Duration
packets []*packet
markCount uint16
totalSize Bits
queueingDelayCount uint16

tail uint16
}

func newPacketStream(window time.Duration) *packetStream {
return &packetStream{
window: window,
}
}

var errTimeOrder = errors.New("invalid packet timestamp ordering")

// add writes a packet to the underlying stream.
func (ps *packetStream) add(ts time.Time, seq uint16, ecn bool, size Bits, queueingDelay bool) error {
ps.Lock()
defer ps.Unlock()

if len(ps.packets) > 0 && ps.packets[len(ps.packets)-1].ts.After(ts) {
return errTimeOrder
}
// check if the packet seq already exists.
for _, p := range ps.packets {
if p.seq == seq {
return errTimeOrder
}
}
ps.packets = append(ps.packets, &packet{
ts: ts,
seq: seq,
ecn: ecn,
size: size,
queueingDelay: queueingDelay,
})
if ecn {
ps.markCount++
}
ps.totalSize += size
if queueingDelay {
ps.queueingDelayCount++
}
return nil
}

// prune removes packets that are older than the window and returns the loss and marking rate.
func (ps *packetStream) prune(now time.Time) (loss float64, marking float64, receivingRate BitsPerSecond, hasQueueingDelay bool) {
ps.Lock()
defer ps.Unlock()

startTs := now.Add(-ps.window)
start := 0
for ; start < len(ps.packets) && ps.packets[start].ts.Before(startTs); start++ {
// decrement mark count if ecn.
if ps.packets[start].ecn {
ps.markCount--
}
ps.totalSize -= ps.packets[start].size
if ps.packets[start].queueingDelay {
ps.queueingDelayCount--
}
}
if start > 0 {
ps.packets = ps.packets[start:]
}
seqs := make([]uint16, len(ps.packets))
for i, p := range ps.packets {
seqs[i] = p.seq
}
begin, end := x_range.GetSeqRange(seqs)
loss = 1 - float64(len(ps.packets))/float64(end-begin+1)
marking = float64(ps.markCount) / float64(end-begin+1)
return loss, marking, BitsPerSecond(float64(ps.totalSize) / ps.window.Seconds()), ps.queueingDelayCount > 0
}
124 changes: 124 additions & 0 deletions pkg/nada/receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package rfc8698

import (
"math"
"time"
)

type Receiver struct {
config Config
BaselineDelay time.Duration // d_base
EstimatedQueuingDelay time.Duration // d_queue
EstimatedPacketLossRatio float64
EstimatedPacketECNMarkingRatio float64
ReceivingRate BitsPerSecond
LastTimestamp time.Time
CurrentTimestamp time.Time
RecommendedRateAdaptionMode RateAdaptionMode

packetStream *packetStream
}

func NewReceiver(now time.Time, config Config) *Receiver {
return &Receiver{
config: config,
BaselineDelay: time.Duration(1<<63 - 1),
EstimatedPacketLossRatio: 0.0,
EstimatedPacketECNMarkingRatio: 0.0,
ReceivingRate: 0.0,
LastTimestamp: now,
CurrentTimestamp: now,
packetStream: newPacketStream(config.LogWindow),
}
}

// OnReceiveMediaPacket implements the media receive algorithm.
func (r *Receiver) OnReceiveMediaPacket(now time.Time, sent time.Time, seq uint16, ecn bool, size Bits) error {
// obtain current timestamp t_curr from system clock
r.CurrentTimestamp = now

// obtain from packet header sending time stamp t_sent
t_sent := sent

// obtain one-way delay measurement: d_fwd = t_curr - t_sent
d_fwd := r.CurrentTimestamp.Sub(t_sent)

// update baseline delay: d_base = min(d_base, d_fwd)
if d_fwd < r.BaselineDelay {
r.BaselineDelay = d_fwd
}

// update queuing delay: d_queue = d_fwd - d_base
r.EstimatedQueuingDelay = d_fwd - r.BaselineDelay

if err := r.packetStream.add(now, seq, ecn, size, r.EstimatedQueuingDelay > r.config.QueueingDelayThreshold); err != nil {
return err
}

p_loss_inst, p_mark_inst, r_recv_inst, hasQueueingDelay := r.packetStream.prune(now)

// update packet loss ratio estimate p_loss
// r.config.α*p_loss_inst + (1-r.config.α)*r.EstimatedPacketLossRatio
r.EstimatedPacketLossRatio = r.config.α*(p_loss_inst-r.EstimatedPacketLossRatio) + r.EstimatedPacketLossRatio

// update packet marking ratio estimate p_mark
// r.config.α*p_mark_inst + (1-r.config.α)*r.EstimatedPacketECNMarkingRatio
r.EstimatedPacketECNMarkingRatio = r.config.α*(p_mark_inst-r.EstimatedPacketECNMarkingRatio) + r.EstimatedPacketECNMarkingRatio

// update measurement of receiving rate r_recv
r.ReceivingRate = r_recv_inst

// update recommended rate adaption mode.
if p_loss_inst == 0 && !hasQueueingDelay {
r.RecommendedRateAdaptionMode = RateAdaptionModeAcceleratedRampUp
} else {
r.RecommendedRateAdaptionMode = RateAdaptionModeGradualUpdate
}

return nil
}

// BuildFeedbackPacket creates a new feedback packet.
func (r *Receiver) BuildFeedbackReport() *FeedbackReport {
// calculate non-linear warping of delay d_tilde if packet loss exists
equivalentDelay := r.equivalentDelay()

// calculate current aggregate congestion signal x_curr
aggregatedCongestionSignal := equivalentDelay +
scale(r.config.ReferenceDelayMarking, math.Pow(r.EstimatedPacketECNMarkingRatio/r.config.ReferencePacketMarkingRatio, 2)) +
scale(r.config.ReferenceDelayLoss, math.Pow(r.EstimatedPacketLossRatio/r.config.ReferencePacketLossRatio, 2))

// determine mode of rate adaptation for sender: rmode
rmode := r.RecommendedRateAdaptionMode

// update t_last = t_curr
r.LastTimestamp = r.CurrentTimestamp

// send feedback containing values of: rmode, x_curr, and r_recv
return &FeedbackReport{
RecommendedRateAdaptionMode: rmode,
AggregatedCongestionSignal: aggregatedCongestionSignal,
ReceivingRate: r.ReceivingRate,
}
}

func scale(t time.Duration, x float64) time.Duration {
return time.Duration(float64(t) * x)
}

// d_tilde computes d_tilde as described by
//
// / d_queue, if d_queue<QTH;
// |
// d_tilde = < (1)
// | (d_queue-QTH)
// \ QTH exp(-LAMBDA ---------------) , otherwise.
// QTH
//
func (r *Receiver) equivalentDelay() time.Duration {
if r.EstimatedQueuingDelay < r.config.DelayThreshold {
return r.EstimatedQueuingDelay
}
scaling := math.Exp(-r.config.λ * float64((r.EstimatedQueuingDelay-r.config.DelayThreshold)/r.config.DelayThreshold))
return scale(r.config.DelayThreshold, scaling)
}
Loading

0 comments on commit a1771d6

Please sign in to comment.