-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsmart_ptr.go
185 lines (156 loc) · 6.58 KB
/
smart_ptr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package main
import (
"fmt"
"sync"
"sync/atomic"
"unsafe"
)
// Observation: using this local-global pattern to minimize locking on reads is not very useful in Go,
// because locking is incurred at the first read of each goroutine, and a goroutine often has a short lifetime in Go,
// e.g. as short as an HTTP request or a TCP connection.
// However, this pattern works well in languages that have a notion of a thread pool, like C++ and Java. In such
// languages, a thread is long-lived and might serve many requests.
// ImmRscPtr is used to safely and efficiently share a multi-version immutable resource that needs to be cleaned up
// whenever it is no longer used by anyone (e.g. a snapshot in a DB) across multiple goroutines. It has the following features:
//
// (1) If the call of UpdateResource() in a writer happens-before the call of GetResource() in a reader,
// then the reader sees what the writer put in.
// (2) If the ImmutableResource has not been changed since the last GetResource(), then GetResource() has a cost
// of only two atomic operations.
// (3) As long as the caller of GetResource() manages to call Unref() on the returned ImmRscPtr, no ImmutableResource
// will be leaked. By "leaked", we mean that Delete() is never called on an obsolete ImmutableResource even
// though no one is using it.
// (4) As long as the caller of GetResource() has not called Unref() on the returned ImmRscPtr, the resource will
// not be Delete()d even if it becomes obsolete.
type ImmutableResource interface {
// Delete releases the underlying resource. ImmRscHandle.Unref invokes it when the
// handle's reference count drops to zero.
Delete()
}
// maxGID is the highest goroutine ID handed out so far by AllocateGLocalImmRscHandle;
// -1 means no ID has been allocated yet.
var maxGID int32 = -1
// gLocalImmRscHandles holds one handle slot per goroutine ID, indexed by that ID (at most 1024 IDs).
// In this file a slot holds nil, the kInuse sentinel, or a handle cached by the owning goroutine.
var gLocalImmRscHandles [1024]*ImmRscHandle
// kInuse is a sentinel handle (wrapping a nil resource) that GetResouce stores into a slot while
// the owning goroutine has taken the slot's value out. It is only compared by pointer identity here.
var kInuse *ImmRscHandle = newImmRscHandle(nil)
// atomicSwapGLocalImmRscHandle atomically stores new into the slot of goroutine gID and
// returns the handle that occupied the slot before the store.
//
// The swap must be the only write to the slot: a plain store ahead of it would make the
// swap observe (and return) new instead of the previous occupant, and it would race with
// the other party competing for the slot.
func atomicSwapGLocalImmRscHandle(gID int32, new *ImmRscHandle) *ImmRscHandle {
	slot := (*unsafe.Pointer)(unsafe.Pointer(&gLocalImmRscHandles[gID]))
	return (*ImmRscHandle)(atomic.SwapPointer(slot, unsafe.Pointer(new)))
}
// atomicCmpAndSwapGLocalImmRscHandles atomically replaces the slot of goroutine gID with new
// if and only if the slot currently holds old. It reports whether the replacement happened.
//
// The comparison and the store are done by a single atomic instruction; a separate
// "check, then plain store" would let a concurrent swap slip in between the two steps.
func atomicCmpAndSwapGLocalImmRscHandles(gID int32, old, new *ImmRscHandle) bool {
	slot := (*unsafe.Pointer)(unsafe.Pointer(&gLocalImmRscHandles[gID]))
	return atomic.CompareAndSwapPointer(slot, unsafe.Pointer(old), unsafe.Pointer(new))
}
// AllocateGLocalImmRscHandle reserves the next goroutine ID (IDs start from 0) and returns it.
// Each goroutine can call this function at most once.
//
// It panics when every slot of gLocalImmRscHandles is already taken. The capacity check is
// done before the new ID is published in maxGID, so a failed allocation never leaves maxGID
// pointing past the end of the slot table.
func AllocateGLocalImmRscHandle() int32 {
	for {
		cur := atomic.LoadInt32(&maxGID)
		if int(cur)+1 >= len(gLocalImmRscHandles) {
			panic("too many goroutine IDs allocated")
		}
		newID := cur + 1
		if !atomic.CompareAndSwapInt32(&maxGID, cur, newID) {
			continue // another goroutine took this ID first; retry with the fresh value
		}
		// The ID is already visible to writers iterating up to maxGID, so the slot is reset
		// with an atomic store rather than a plain assignment.
		atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&gLocalImmRscHandles[newID])), nil)
		return newID
	}
}
// latestImmRscHandle is the handle of the most recently published resource version.
// It starts out as a handle wrapping a nil resource and is replaced by UpdateResouce.
var latestImmRscHandle *ImmRscHandle = newImmRscHandle(nil)
// latestImmRscHandleMutex is held by GetResouce while it takes a reference on latestImmRscHandle,
// and by UpdateResouce while it replaces latestImmRscHandle and invalidates the per-goroutine slots.
var latestImmRscHandleMutex sync.Mutex
// ImmRscHandleWrap embeds an *ImmRscHandle together with a boolean flag.
// NOTE(review): nothing in the visible part of this file uses this type; presumably the flag is
// meant to record the `mightShare` mode passed to GetResouce — confirm before relying on it.
type ImmRscHandleWrap struct {
*ImmRscHandle
mightPaasToOtherGoroutine bool
}
// GetResouce returns a handle to the resource version currently visible to goroutine gID.
//
// If mightShare is true, the caller is allowed to call Ref() to make copies of the ownership
// and pass those copies to other goroutines. Used this way, every copy (including the
// returned handle) must be released with Unref().
//
// Otherwise the caller must not
//  1. call Ref(),
//  2. call GetResouce() again before DoneUsingResource(),
//  3. share the handle with other goroutines,
// and must call DoneUsingResource() to release the ownership.
//
// The first mode is the user-friendly one, but it is slower.
//
// GetResouce panics if gID has not been allocated, or if the goroutine's slot is found
// already marked as in use.
func GetResouce(gID int32, mightShare bool) *ImmRscHandle {
	// maxGID is updated atomically by AllocateGLocalImmRscHandle, so it is read atomically
	// here as well; a plain read would be a data race.
	if gID < 0 || gID > atomic.LoadInt32(&maxGID) {
		panic("unallocated goroutine ID")
	}

	// gLocalImmRscHandles[gID] is shared between this goroutine and the writer (there can be
	// only one writer at any given time), so this is a one-reader-one-writer conflict. Both
	// sides compete with a swap instruction: the winner sees the original value while the
	// loser sees the special mark (kInuse or nil).
	//
	// This is also why consistency is only eventual. Assume UpdateResouce() is called once,
	// simultaneously with this function: if this goroutine swaps before the writer, this
	// function returns the second latest version; otherwise it returns the latest version.
	var res *ImmRscHandle
	switch local := atomicSwapGLocalImmRscHandle(gID, kInuse); local { // A
	case nil:
		// First call for this goroutine, or the writer invalidated the slot after an update.
		latestImmRscHandleMutex.Lock()
		// Ref() on the global handle needs the mutex, otherwise it might race with the
		// writer's final Unref().
		res = latestImmRscHandle.Ref()
		latestImmRscHandleMutex.Unlock()
	case kInuse:
		panic("gLocalImmRscHandles[" + fmt.Sprint(gID) + "] must be either a valid ptr or a nil set by writer")
	default:
		res = local
	}

	if mightShare {
		// Put a copy back into the local slot right away; otherwise the next call would have
		// to lock the mutex to read the global handle again.
		if !atomicCmpAndSwapGLocalImmRscHandles(gID, kInuse, res.Ref()) {
			// The writer changed the slot to nil since A, so this goroutine, rather than the
			// next writer, is responsible for dropping the extra reference.
			res.Unref()
		}
		// Otherwise the next writer will Unref() when it invalidates the local slot.
	}
	return res
}
// DoneUsingResource gives back a handle previously returned by GetResouce for goroutine gID.
//
// It first tries to park the handle in the goroutine's local slot (which is expected to still
// hold kInuse). If the slot no longer holds kInuse, the handle cannot be parked and its
// reference is dropped with Unref() instead.
func DoneUsingResource(gID int32, gotFromLocal *ImmRscHandle) {
	parked := atomicCmpAndSwapGLocalImmRscHandles(gID, kInuse, gotFromLocal)
	if parked {
		// The slot owns the reference now; the next writer will Unref() it when it
		// invalidates the local slot.
		return
	}
	gotFromLocal.Unref()
}
// UpdateResouce publishes r as the latest resource version and invalidates every
// per-goroutine cached handle. It can be called by any goroutine without any external
// synchronization; concurrent callers are serialized by latestImmRscHandleMutex.
//
// It panics if a local slot caches a handle other than the version being replaced, or if
// dropping a slot's reference unexpectedly releases the last reference.
func UpdateResouce(r ImmutableResource) {
	latestImmRscHandleMutex.Lock()
	old := latestImmRscHandle
	// Overwritten with the mutex held and before the local slots are invalidated.
	latestImmRscHandle = newImmRscHandle(r)

	// maxGID is updated atomically by AllocateGLocalImmRscHandle, so it is read atomically
	// here; a plain read would be a data race.
	for i := int32(0); i <= atomic.LoadInt32(&maxGID); i++ {
		local := atomicSwapGLocalImmRscHandle(i, nil)
		if local == nil || local == kInuse {
			continue // nothing cached in this slot
		}
		if local != old {
			panic("gLocalImmRscHandles[" + fmt.Sprint(i) + "] does not hold the latest version")
		}
		// This cannot be the last reference: `old` still holds one until the final Unref()
		// at the end of this function.
		if local.Unref() {
			panic("bad refcnt")
		}
	}
	latestImmRscHandleMutex.Unlock()

	// This might call the resource's Delete(), which might be time-consuming, therefore it
	// is done here without the mutex held.
	old.Unref()
}
// Internal helpers and types:
// newImmRscHandle allocates a handle wrapping rsc. The handle starts with a reference
// count of one, which belongs to the caller.
func newImmRscHandle(rsc ImmutableResource) *ImmRscHandle {
	handle := new(ImmRscHandle)
	handle.R = rsc
	handle.refcnt = 1
	return handle
}
// ImmRscHandle is a reference-counted handle to one ImmutableResource.
// ImmRscHandle must be allocated on the heap and must not be copied using the = operator.
type ImmRscHandle struct {
// refcnt is the number of outstanding references. newImmRscHandle sets it to 1;
// Ref and Unref adjust it with atomic adds.
refcnt int32
// R is the wrapped resource; Unref calls R.Delete() when refcnt reaches zero.
R ImmutableResource
}
// Ref adds one reference to the handle and returns the same handle.
//
// The goroutine that creates an ImmRscHandle is the first owner of the underlying resource.
// An owner is allowed to call Ref() without any external synchronization.
//
// Ref panics with "bad refcnt" if the incremented count is not positive.
func (p *ImmRscHandle) Ref() *ImmRscHandle {
	if count := atomic.AddInt32(&p.refcnt, 1); count > 0 {
		return p
	}
	panic("bad refcnt")
}
// Unref drops one reference from the handle. When the last reference is dropped it calls
// Delete() on the wrapped resource, if there is one, and reports deleted == true; otherwise
// it reports false.
//
// Handles may legitimately wrap a nil resource (the initial latestImmRscHandle is created
// with nil), so the resource is checked before Delete() is called; calling a method on a
// nil interface would crash.
//
// Unref panics with "bad refcnt" if the count drops below zero.
func (p *ImmRscHandle) Unref() (deleted bool) {
	after := atomic.AddInt32(&p.refcnt, -1)
	if after < 0 {
		panic("bad refcnt")
	}
	if after > 0 {
		return false
	}
	if p.R != nil {
		p.R.Delete()
	}
	return true
}