Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement pacing interceptor #309

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/pion/interceptor

go 1.20
go 1.21

Check failure on line 3 in go.mod

View workflow job for this annotation

GitHub Actions / lint / Metadata

Invalid Go version

Found 1.21. Expected 1.20

require (
github.com/pion/logging v0.2.3
Expand Down
144 changes: 144 additions & 0 deletions pkg/pacing/interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package pacing

Check failure on line 1 in pkg/pacing/interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

package-comments: should have a package comment (revive)

import (
"context"
"errors"
"log"
"sync"
"time"

"github.com/pion/interceptor"
"github.com/pion/logging"
"github.com/pion/rtp"
)

type Pacer interface {

Check failure on line 15 in pkg/pacing/interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

exported: exported type Pacer should have comment or be unexported (revive)
Budget(t time.Time) int
OnSent(t time.Time, size int)
}

type PacerFactory func() Pacer

Check failure on line 20 in pkg/pacing/interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

exported: exported type PacerFactory should have comment or be unexported (revive)

type InterceptorFactory struct {
pf PacerFactory
opts []Option
}

type Option func(*Interceptor) error

func NewInterceptor(pf PacerFactory) (*InterceptorFactory, error) {
return &InterceptorFactory{
pf: pf,
}, nil

Check warning on line 32 in pkg/pacing/interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/pacing/interceptor.go#L29-L32

Added lines #L29 - L32 were not covered by tests
}

func Interval(interval time.Duration) Option {
return func(i *Interceptor) error {
i.interval = interval
return nil
}

Check warning on line 39 in pkg/pacing/interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/pacing/interceptor.go#L35-L39

Added lines #L35 - L39 were not covered by tests
}

func QueueSize(size int) Option {
return func(i *Interceptor) error {
i.queueSize = size
return nil
}

Check warning on line 46 in pkg/pacing/interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/pacing/interceptor.go#L42-L46

Added lines #L42 - L46 were not covered by tests
}

func (f *InterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) {

Check failure on line 49 in pkg/pacing/interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

unused-parameter: parameter 'id' seems to be unused, consider removing or renaming it as _ (revive)
ctx, cancel := context.WithCancel(context.Background())
i := &Interceptor{
NoOp: interceptor.NoOp{},
lock: sync.Mutex{},
log: logging.NewDefaultLoggerFactory().NewLogger("pacer_interceptor"),
interval: 5 * time.Millisecond,
pacer: f.pf(),
queue: nil,
close: ctx,
cancelFn: cancel,
}
for _, opt := range f.opts {
if err := opt(i); err != nil {
return nil, err
}

Check warning on line 64 in pkg/pacing/interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/pacing/interceptor.go#L49-L64

Added lines #L49 - L64 were not covered by tests
}
i.queue = make(chan packetToSend, i.queueSize)
go i.run()
return i, nil

Check warning on line 68 in pkg/pacing/interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/pacing/interceptor.go#L66-L68

Added lines #L66 - L68 were not covered by tests
}

type Interceptor struct {
interceptor.NoOp
lock sync.Mutex
log logging.LeveledLogger
interval time.Duration
pacer Pacer
queueSize int
queue chan packetToSend
close context.Context
cancelFn context.CancelFunc
}

type packetToSend struct {
header *rtp.Header
payload []byte
attributes interceptor.Attributes
writer interceptor.RTPWriter
}

// BindLocalStream implements interceptor.Interceptor.
func (i *Interceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {

Check failure on line 91 in pkg/pacing/interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

unused-parameter: parameter 'info' seems to be unused, consider removing or renaming it as _ (revive)
return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
ch := header.Clone()
buf := make([]byte, len(payload))
if n := copy(buf, payload); n != len(payload) {
return n, errors.New("copied wrong payload length")
}
select {

Check warning on line 98 in pkg/pacing/interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/pacing/interceptor.go#L91-L98

Added lines #L91 - L98 were not covered by tests
case i.queue <- packetToSend{
header: &ch,
payload: buf,
attributes: attributes,
writer: writer,
}:
default:
return header.MarshalSize() + len(payload), errors.New("pacer dropped packet due to queue overflow")

Check warning on line 106 in pkg/pacing/interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/pacing/interceptor.go#L104-L106

Added lines #L104 - L106 were not covered by tests
}
return header.MarshalSize() + len(payload), nil

Check warning on line 108 in pkg/pacing/interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/pacing/interceptor.go#L108

Added line #L108 was not covered by tests
})
}

// Close implements interceptor.Interceptor.
func (i *Interceptor) Close() error {
i.cancelFn()
return nil

Check warning on line 115 in pkg/pacing/interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/pacing/interceptor.go#L113-L115

Added lines #L113 - L115 were not covered by tests
}

func (i *Interceptor) run() {
ticker := time.NewTicker(i.interval)
for {
select {
case <-i.close.Done():
return
case <-ticker.C:
for pkt := range i.queue {
size := pkt.header.MarshalSize() + len(pkt.payload)
now := time.Now()
budget := i.pacer.Budget(now)
if budget < size {
log.Printf("budget too small: %v, queuesize=%v", budget, len(i.queue))

Check failure on line 130 in pkg/pacing/interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

use of `log.Printf` forbidden by pattern `^log.(Panic|Fatal|Print)(f|ln)?$` (forbidigo)
}
n, err := pkt.writer.Write(pkt.header, pkt.payload, pkt.attributes)
if err != nil {
log.Printf("error while writing packet: %v", err)

Check failure on line 134 in pkg/pacing/interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

use of `log.Printf` forbidden by pattern `^log.(Panic|Fatal|Print)(f|ln)?$` (forbidigo)
}
if n != size {
log.Printf("copied wrong payload length")

Check failure on line 137 in pkg/pacing/interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

use of `log.Printf` forbidden by pattern `^log.(Panic|Fatal|Print)(f|ln)?$` (forbidigo)
}
i.pacer.OnSent(now, size)

Check warning on line 139 in pkg/pacing/interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/pacing/interceptor.go#L118-L139

Added lines #L118 - L139 were not covered by tests
}
ticker.Reset(i.interval)

Check warning on line 141 in pkg/pacing/interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/pacing/interceptor.go#L141

Added line #L141 was not covered by tests
}
}
}
62 changes: 62 additions & 0 deletions pkg/pacing/leaky_bucket_pacer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package pacing

import (
"math"
"sync"
"time"
)

type LeakyBucket struct {

Check failure on line 9 in pkg/pacing/leaky_bucket_pacer.go

View workflow job for this annotation

GitHub Actions / lint / Go

exported: exported type LeakyBucket should have comment or be unexported (revive)
lock sync.Mutex
burstSize int
rate int
lastSent time.Time
lastBudget int
}

func NewLeakyBucketPacer(burstSize int, rate int) *LeakyBucket {

Check failure on line 17 in pkg/pacing/leaky_bucket_pacer.go

View workflow job for this annotation

GitHub Actions / lint / Go

exported: exported function NewLeakyBucketPacer should have comment or be unexported (revive)
return &LeakyBucket{
lock: sync.Mutex{},
burstSize: burstSize,
rate: 0,
lastSent: time.Time{},
lastBudget: 0,
}

Check warning on line 24 in pkg/pacing/leaky_bucket_pacer.go

View check run for this annotation

Codecov / codecov/patch

pkg/pacing/leaky_bucket_pacer.go#L17-L24

Added lines #L17 - L24 were not covered by tests
}

func (b *LeakyBucket) setRate(rate int) {
b.lock.Lock()
defer b.lock.Unlock()

b.rate = rate

Check warning on line 31 in pkg/pacing/leaky_bucket_pacer.go

View check run for this annotation

Codecov / codecov/patch

pkg/pacing/leaky_bucket_pacer.go#L27-L31

Added lines #L27 - L31 were not covered by tests
}

func (b *LeakyBucket) onSent(t time.Time, size int) {
budget := b.budget(t)

b.lock.Lock()
defer b.lock.Unlock()

if size > budget {
b.lastBudget = 0
} else {
b.lastBudget = budget - size
}
b.lastSent = t

Check warning on line 45 in pkg/pacing/leaky_bucket_pacer.go

View check run for this annotation

Codecov / codecov/patch

pkg/pacing/leaky_bucket_pacer.go#L34-L45

Added lines #L34 - L45 were not covered by tests
}

func (b *LeakyBucket) budget(t time.Time) int {
b.lock.Lock()
defer b.lock.Unlock()

if b.lastSent.IsZero() {
return b.burstSize
}
td := t.Sub(b.lastSent)
budget := b.lastBudget + 8*int(float64(b.rate)*td.Seconds())
if budget < 0 {
budget = math.MaxInt
}
budget = min(budget, b.burstSize)
return budget

Check warning on line 61 in pkg/pacing/leaky_bucket_pacer.go

View check run for this annotation

Codecov / codecov/patch

pkg/pacing/leaky_bucket_pacer.go#L48-L61

Added lines #L48 - L61 were not covered by tests
}
Loading