-
Notifications
You must be signed in to change notification settings - Fork 205
/
Copy pathring-writer.go
139 lines (125 loc) · 3.09 KB
/
ring-writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package util
import (
"sync/atomic"
)
type emptyLocker struct{}
func (emptyLocker) Lock() {}
func (emptyLocker) Unlock() {}
var EmptyLocker emptyLocker
type IDataFrame[T any] interface {
Reset() // 重置数据,复用内存
Ready() // 标记为可读取
ReaderEnter() // 读取者数量+1
ReaderTryEnter() bool // 尝试读取
ReaderLeave() // 读取者数量-1
StartWrite() bool // 开始写入
SetSequence(uint32) // 设置序号
GetSequence() uint32 // 获取序号
IsDiscarded() bool // 是否已废弃
}
type RingWriter[T any, F IDataFrame[T]] struct {
*Ring[F] `json:"-" yaml:"-"`
ReaderCount atomic.Int32 `json:"-" yaml:"-"`
pool *Ring[F]
poolSize int
Size int
LastValue F
constructor func() F
disposeFlag atomic.Int32
}
func (rb *RingWriter[T, F]) create(n int) (ring *Ring[F]) {
ring = NewRing[F](n)
for p, i := ring, n; i > 0; p, i = p.Next(), i-1 {
p.Value = rb.constructor()
}
return
}
func (rb *RingWriter[T, F]) Init(n int, constructor func() F) *RingWriter[T, F] {
rb.constructor = constructor
rb.Ring = rb.create(n)
rb.Size = n
rb.LastValue = rb.Value
rb.Value.StartWrite()
return rb
}
func (rb *RingWriter[T, F]) Glow(size int) (newItem *Ring[F]) {
if size < rb.poolSize {
newItem = rb.pool.Unlink(size)
rb.poolSize -= size
} else if size == rb.poolSize {
newItem = rb.pool
rb.poolSize = 0
rb.pool = nil
} else {
newItem = rb.create(size - rb.poolSize).Link(rb.pool)
rb.poolSize = 0
rb.pool = nil
}
rb.Link(newItem)
rb.Size += size
return
}
func (rb *RingWriter[T, F]) recycle(r *Ring[F]) {
if rb.pool == nil {
rb.pool = r
} else {
rb.pool.Link(r)
}
}
func (rb *RingWriter[T, F]) Reduce(size int) {
p := rb.Unlink(size)
pSize := size
rb.Size -= size
// 遍历即将回收的节点,如果有读锁未释放,则丢弃,不回收该节点
for i := 0; i < size; i++ {
if !p.Value.IsDiscarded() && p.Value.StartWrite() { // 尝试加写锁,成功则说明该节点可正常回收
p.Value.Reset()
p.Value.Ready()
rb.poolSize++
} else {
p.Value.Reset()
if pSize == 1 {
// last one,无法删除最后一个节点,直接返回即可(不回收)
return
}
p = p.Prev()
p.Unlink(1) // 丢弃该节点,不回收
pSize--
}
p = p.Next()
}
rb.recycle(p)
}
func (rb *RingWriter[T, F]) Dispose() {
if rb.disposeFlag.Add(-2) == -2 {
rb.Value.Ready()
}
}
func (rb *RingWriter[T, F]) Step() (normal bool) {
if !rb.disposeFlag.CompareAndSwap(0, 1) {
// already disposed
return
}
rb.LastValue = rb.Value
nextSeq := rb.LastValue.GetSequence() + 1
next := rb.Next()
if normal = next.Value.StartWrite(); normal {
next.Value.Reset()
rb.Ring = next
} else {
rb.Reduce(1) //抛弃还有订阅者的节点
rb.Ring = rb.Glow(1) //补充一个新节点
if !rb.Value.StartWrite() {
panic("can't start write")
}
}
rb.Value.SetSequence(nextSeq)
rb.LastValue.Ready()
if !rb.disposeFlag.CompareAndSwap(1, 0) {
rb.Value.Ready()
}
return
}
func (rb *RingWriter[T, F]) GetReaderCount() int32 {
return rb.ReaderCount.Load()
}