-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcounter.go
130 lines (108 loc) · 2.89 KB
/
counter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package raccoon
import (
"context"
"encoding/binary"
"sync"
)
const key = "i"
const kvPrefix = "/counter"
// NewCounterStore returns a new counter store
func NewCounterStore(prefix string, kv KVStore, logger Logger) CounterStore {
if prefix != "" {
kv = NewWrapperKV(kv, []byte(prefix))
}
kv = NewWrapperKV(kv, []byte(kvPrefix))
return CounterStore{
kv: kv,
logger: logger,
prefix: prefix,
lock: sync.Mutex{},
}
}
// CounterStore wraps a KV Store and creates a monotomically increasing counter
type CounterStore struct {
kv KVStore
logger Logger
prefix string
lock sync.Mutex
}
func (r *CounterStore) getStore() KVStore {
return r.kv
}
// GetFree returns the next free number in the counter
func (r *CounterStore) GetNext(ctx context.Context) (uint64, error) {
kv := r.getStore()
var currID uint64 = 0
counter, err := kv.Get([]byte(key))
if err != nil {
return 0, err
}
if counter != nil {
currID = r.decode(counter)
}
freeID := currID + 1
return freeID, nil
}
// Increment updates the counter to the next free number
func (r *CounterStore) setCounter(ctx context.Context, counter uint64) error {
kv := r.getStore()
err := kv.Set([]byte(key), r.encode(counter))
if err != nil {
return err
}
return nil
}
// encode maps a uin64 to a big endian byte slice
func (r *CounterStore) encode(counter uint64) []byte {
buff := make([]byte, 8)
binary.BigEndian.PutUint64(buff, counter)
return buff
}
// decode converts a big endian byte slice into a uint64
func (r *CounterStore) decode(counter []byte) uint64 {
return binary.BigEndian.Uint64(counter)
}
// Increment increments the counter by 1
func (r *CounterStore) Increment(ctx context.Context) error {
_, err := r.GetNextAndIncrement(ctx)
return err
}
// GetNextAndIncrement atomically gets the next free counter and increments it
func (r *CounterStore) GetNextAndIncrement(ctx context.Context) (uint64, error) {
free, err := r.GetNext(ctx)
if err != nil {
return 0, err
}
err = r.setCounter(ctx, free)
if err != nil {
return 0, err
}
return free, nil
}
// Acquire does a blocking call to acquire exclusive rights to CounteStore.
// Acquire MUST be followed by a defer call to the returned Releaser `Release`.
// Failing to do so means the store will be indefinitely locked,
// potentially causing a deadlock.
func (r *CounterStore) Acquire() *Releaser {
r.lock.Lock()
r.logger.Debug("counter store %v locked", r)
return &Releaser{
called: false,
store: r,
}
}
// Releaser models a one-shot callback which can be used to release a store
type Releaser struct {
store *CounterStore
called bool
}
// Release frees up a CounterStore to be used by some other thread.
// Calling release more than once will result in a panic.
func (r *Releaser) Release() {
if r.called {
panic("Releaser was previously released!")
}
r.store.lock.Unlock()
r.called = true
r.store.logger.Debug("counter store %v unlocked", r)
}