This repository has been archived by the owner on May 13, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 344
/
Copy pathring_mutex.go
115 lines (96 loc) · 2.53 KB
/
ring_mutex.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package sync
import (
"sync"
"hash"
"encoding/binary"
"github.com/OneOfOne/xxhash"
)
type RingMutex struct {
mutexes []sync.RWMutex
values []Value
hash func(address []byte) uint64
mutexCount uint64
}
type Value struct {
set bool
value interface{}
}
func (v *Value) IsSet() bool {
return v.set
}
func (v *Value) Set(value interface{}) {
v.value = value
v.set = true
}
func (v *Value) Get() interface{} {
return v.value
}
// Create a RW mutex that provides a pseudo-independent set of mutexes for addresses
// where the address space is mapped into possibly much smaller set of backing
// mutexes using the xxhash (non-cryptographic)
// hash function // modulo size. If some addresses collide modulo size they will be unnecessary
// contention between those addresses, but you can trade space against contention
// as desired.
func NewRingMutex(mutexCount int, hashMaker func() hash.Hash64) *RingMutex {
ringMutex := &RingMutex{
mutexCount: uint64(mutexCount),
// max slice length is bounded by max(int) thus the argument type
mutexes: make([]sync.RWMutex, mutexCount),
values: make([]Value, mutexCount),
hash: func(address []byte) uint64 {
buf := make([]byte, 8)
copy(buf, address)
return binary.LittleEndian.Uint64(buf)
},
}
if hashMaker != nil {
hasherPool := &sync.Pool{
New: func() interface{} {
return hashMaker()
},
}
ringMutex.hash = func(address []byte) uint64 {
h := hasherPool.Get().(hash.Hash64)
defer func() {
h.Reset()
hasherPool.Put(h)
}()
h.Write(address)
return h.Sum64()
}
}
return ringMutex
}
func NewRingMutexNoHash(mutexCount int) *RingMutex {
return NewRingMutex(mutexCount, nil)
}
func NewRingMutexXXHash(mutexCount int) *RingMutex {
return NewRingMutex(mutexCount, func() hash.Hash64 {
return xxhash.New64()
})
}
func (mtx *RingMutex) Lock(address []byte) (value *Value) {
index := mtx.index(address)
mtx.mutexes[index].Lock()
return &mtx.values[index]
}
func (mtx *RingMutex) Unlock(address []byte) {
index := mtx.index(address)
mtx.mutexes[index].Unlock()
}
func (mtx *RingMutex) RLock(address []byte) {
mtx.Mutex(address).RLock()
}
func (mtx *RingMutex) RUnlock(address []byte) {
mtx.Mutex(address).RUnlock()
}
// Return the size of the underlying array of mutexes
func (mtx *RingMutex) MutexCount() uint64 {
return mtx.mutexCount
}
func (mtx *RingMutex) Mutex(address []byte) *sync.RWMutex {
return &mtx.mutexes[mtx.index(address)]
}
func (mtx *RingMutex) index(address []byte) uint64 {
return mtx.hash(address) % mtx.mutexCount
}