-
Notifications
You must be signed in to change notification settings - Fork 205
/
Copy pathreorder.go
77 lines (72 loc) · 1.65 KB
/
reorder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package util
var RTPReorderBufferLen uint16 = 50
// RTPReorder RTP包乱序重排
type RTPReorder[T comparable] struct {
lastSeq uint16 //最新收到的rtp包序号
queue []T // 缓存队列,0号元素位置代表lastReq+1,永远保持为空
Total uint32 // 总共收到的包数量
Drop uint32 // 丢弃的包数量
}
func (p *RTPReorder[T]) Push(seq uint16, v T) (result T) {
p.Total++
// 初始化
if len(p.queue) == 0 {
p.lastSeq = seq
p.queue = make([]T, RTPReorderBufferLen)
return v
}
if seq < p.lastSeq && p.lastSeq-seq < 0x8000 {
// 旧的包直接丢弃
p.Drop++
return
}
delta := seq - p.lastSeq
if delta == 0 {
// 重复包
p.Drop++
return
}
if delta == 1 {
// 正常顺序,无需缓存
p.lastSeq = seq
p.pop()
return v
}
if RTPReorderBufferLen < delta {
//超过缓存最大范围,无法挽回,只能造成丢包(序号断裂)
for {
p.lastSeq++
delta--
head := p.pop()
// 可以放得进去了
if delta == RTPReorderBufferLen {
p.queue[RTPReorderBufferLen-1] = v
p.queue[0] = result
return head
} else if head != result {
p.Drop++
}
}
} else {
// 出现后面的包先到达,缓存起来
p.queue[delta-1] = v
return
}
}
func (p *RTPReorder[T]) pop() (result T) {
copy(p.queue, p.queue[1:]) //整体数据向前移动一位,保持0号元素代表lastSeq+1
p.queue[RTPReorderBufferLen-1] = result
return p.queue[0]
}
// Pop 从缓存中取出一个包,需要连续调用直到返回nil
func (p *RTPReorder[T]) Pop() (result T) {
if len(p.queue) == 0 {
return
}
if next := p.queue[0]; next != result {
result = next
p.lastSeq++
p.pop()
}
return
}