-
Notifications
You must be signed in to change notification settings - Fork 30
/
Copy pathlock.go
260 lines (225 loc) · 7.31 KB
/
lock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
package backupstore
import (
"fmt"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/longhorn/backupstore/util"
)
// Names used to lay out lock files on the backupstore, and the timing
// parameters that drive lock expiry, refresh and acquisition.
const (
	// LOCKS_DIRECTORY is the directory, joined onto a volume's path, that holds its lock files.
	LOCKS_DIRECTORY = "locks"
	// LOCK_PREFIX is the prefix handed to util.GenerateName when a new lock is named.
	LOCK_PREFIX = "lock"
	// LOCK_SUFFIX is the extension appended to a lock name to form its file name.
	LOCK_SUFFIX = ".lck"

	// LOCK_DURATION is the age (measured from the lock's server time) beyond
	// which isExpired reports a lock as expired.
	LOCK_DURATION = 150 * time.Second
	// LOCK_REFRESH_INTERVAL is the ticker period at which an acquired lock is re-saved.
	LOCK_REFRESH_INTERVAL = 60 * time.Second
	// LOCK_CHECK_WAIT_TIME is how long Lock sleeps between writing its lock
	// file and listing the locks present on the backupstore.
	LOCK_CHECK_WAIT_TIME = 2 * time.Second
)
// LockType classifies a lock by the kind of operation it guards. canAcquire
// only treats locks whose types differ as conflicting.
type LockType int

const (
	// UNTYPED_LOCK is the zero value of LockType.
	UNTYPED_LOCK LockType = 0
	// BACKUP_LOCK and RESTORE_LOCK deliberately share the same value, so a
	// backup lock and a restore lock compare as the same type.
	BACKUP_LOCK  LockType = 1
	RESTORE_LOCK LockType = 1
	// DELETION_LOCK has a different value from the backup/restore locks.
	DELETION_LOCK LockType = 2
)
// Operation is a human-readable label for the kind of backup operation a lock
// is taken for; Lock uses it in its log and error messages.
type Operation string

const (
	// BackupOperationCreateRestore labels locks of type BACKUP_LOCK / RESTORE_LOCK.
	BackupOperationCreateRestore Operation = "create/restore"
	// BackupOperationDelete labels locks of type DELETION_LOCK.
	BackupOperationDelete Operation = "delete"
	// BackupOperationUndefined labels locks of any other type.
	BackupOperationUndefined Operation = "undefined"
)
// FileLock is a lock on a backupstore volume that is represented by a lock
// file stored through a BackupStoreDriver.
//
// NOTE(review): presumably only the exported fields are persisted when the
// lock is saved - confirm against SaveConfigInBackupStore.
type FileLock struct {
	// Name identifies the lock; together with LOCK_SUFFIX it forms the lock file name.
	Name string
	// Type is the kind of lock; locks of differing types can block each other.
	Type LockType
	// Acquired is set by Lock once the lock is held and cleared by Unlock.
	Acquired bool

	// driver is the backupstore driver used to save, load and remove the lock file.
	driver BackupStoreDriver
	// volume is the name of the volume the lock belongs to.
	volume string
	// count tracks how many Lock calls are outstanding; Unlock releases the
	// lock when it drops to zero or below.
	count int32
	// serverTime is the lock file's time as reported by the driver.
	serverTime time.Time // UTC time
	// keepAlive is closed by Unlock to stop the refresh goroutine started by Lock.
	keepAlive chan struct{}
	// mutex serializes Lock, Unlock and the periodic refresh.
	mutex sync.Mutex
}
// New builds a FileLock of the given type for volumeName that talks to the
// backupstore through driver. The lock gets a freshly generated name with the
// LOCK_PREFIX prefix and is not acquired; call Lock to acquire it.
//
// The returned error is always nil.
func New(driver BackupStoreDriver, volumeName string, lockType LockType) (*FileLock, error) {
	fileLock := &FileLock{
		Name:   util.GenerateName(LOCK_PREFIX),
		Type:   lockType,
		driver: driver,
		volume: volumeName,
	}
	return fileLock, nil
}
// isExpired reports whether more than LOCK_DURATION has passed since the
// lock's serverTime.
func (lock *FileLock) isExpired() bool {
	// serverTime is kept in UTC, so the age is measured from the current UTC time.
	age := time.Now().UTC().Sub(lock.serverTime)
	return age > LOCK_DURATION
}
// String renders the lock's volume, name, type, acquired flag and server time
// for use in log output.
func (lock *FileLock) String() string {
	const layout = "{ volume: %v, name: %v, type: %v, acquired: %v, serverTime: %v }"
	return fmt.Sprintf(layout, lock.volume, lock.Name, lock.Type, lock.Acquired, lock.serverTime)
}
// canAcquire loads every lock currently stored for the volume and reports
// whether this lock may be taken. It returns false as soon as it finds a lock
// that has a different type, sorts before this lock according to
// compareLocks, and has not expired; otherwise it returns true.
func (lock *FileLock) canAcquire() bool {
	existing := getLocksForVolume(lock.volume, lock.driver)
	lockFile := getLockFilePath(lock.volume, lock.Name)
	log.WithField("lock", lock).Infof("Trying to acquire lock %v", lockFile)
	log.Infof("backupstore volume %v contains locks %v", lock.volume, existing)

	for _, other := range existing {
		// Locks of the same type never block each other.
		if other.Type == lock.Type {
			continue
		}
		// Only a lock that sorts ahead of ours has priority over it.
		if compareLocks(other, lock) >= 0 {
			continue
		}
		// An expired lock is ignored even if it would otherwise win.
		if other.isExpired() {
			continue
		}
		return false
	}
	return true
}
// Lock acquires the lock on the backupstore, or, if this FileLock already
// holds it, increments the hold count and re-saves the lock file.
//
// Acquisition is attempted exactly once: the lock file is written, the call
// sleeps for LOCK_CHECK_WAIT_TIME, and then canAcquire decides whether a
// conflicting lock has priority. On success a goroutine is started that
// re-saves the lock every LOCK_REFRESH_INTERVAL until Unlock closes the
// keep-alive channel.
//
// It returns an error if the lock file cannot be written or if a conflicting
// lock has priority. On every error path the FileLock is left unacquired with
// its hold count unchanged, so the caller can simply retry.
func (lock *FileLock) Lock() error {
	lock.mutex.Lock()
	defer lock.mutex.Unlock()

	// BACKUP_LOCK and RESTORE_LOCK share one value; both map to the same label.
	operation := BackupOperationUndefined
	if lock.Type == BACKUP_LOCK || lock.Type == RESTORE_LOCK {
		operation = BackupOperationCreateRestore
	} else if lock.Type == DELETION_LOCK {
		operation = BackupOperationDelete
	}

	if lock.Acquired {
		atomic.AddInt32(&lock.count, 1)
		// Best effort: the lock is already held, so a failed re-save is not
		// treated as a failure of this call.
		_ = saveLock(lock)
		return nil
	}

	file := getLockFilePath(lock.volume, lock.Name)

	// we create first then retrieve all locks
	// because this way if another client creates at the same time
	// one of us will be first in the times array
	// the servers modification time is only the initial lock creation time
	// and we do not need to start lock refreshing till after we acquired the lock
	// since lock expiration is based on the serverTime + LOCK_DURATION
	if err := saveLock(lock); err != nil {
		return err
	}

	// since the node times might not be perfectly in sync and the servers file time has second precision
	// we wait 2 seconds before retrieving the current set of locks, this eliminates a race condition
	// where 2 processes request a lock at the same time
	time.Sleep(LOCK_CHECK_WAIT_TIME)

	// we only try to acquire once, since backup operations generally take a long time
	// there is no point in trying to wait for lock acquisition, better to throw an error
	// and let the calling code retry with an exponential backoff.
	if !lock.canAcquire() {
		// Best-effort cleanup of the lock file we just wrote.
		_ = removeLock(lock)
		return fmt.Errorf("failed to acquire lock %v when performing backup %v, please try again later.", file, operation)
	}

	log.Infof("Acquired lock %v for backup %v on backupstore", file, operation)
	// Acquired must be set before saving so the stored lock reflects it.
	lock.Acquired = true
	atomic.AddInt32(&lock.count, 1)
	if err := saveLock(lock); err != nil {
		// Roll back the in-memory state: otherwise a later Lock call would
		// take the "already acquired" path even though this call failed and
		// no refresh goroutine is running.
		lock.Acquired = false
		atomic.AddInt32(&lock.count, -1)
		_ = removeLock(lock)
		return errors.Wrapf(err, "failed to store updated lock %v when performing backup %v, please try again later", file, operation)
	}

	// enable lock refresh
	// The goroutine waits on its own copy of the channel rather than re-reading
	// lock.keepAlive: the field is reassigned by a later Lock call, and reading
	// it without the mutex would be a data race that could also attach a stale
	// goroutine to the new channel.
	keepAlive := make(chan struct{})
	lock.keepAlive = keepAlive
	go func() {
		refreshTimer := time.NewTicker(LOCK_REFRESH_INTERVAL)
		defer refreshTimer.Stop()
		for {
			select {
			case <-keepAlive:
				return
			case <-refreshTimer.C:
				lock.mutex.Lock()
				if lock.Acquired {
					if err := saveLock(lock); err != nil {
						// nothing we can do here; LOCK_DURATION spans more than two
						// refresh intervals, so a single failed refresh is tolerated
						log.WithError(err).Warnf("Failed to refresh acquired lock %v when performing backup %v, please try again later", file, operation)
					}
				}
				lock.mutex.Unlock()
			}
		}
	}()
	return nil
}
// Unlock releases one hold on the lock. While other holds remain it only
// decrements the hold count. When the last hold is released it marks the lock
// as not acquired, signals the refresh goroutine to stop by closing the
// keep-alive channel, and removes the lock file from the backupstore.
//
// A redundant Unlock (one with no matching Lock) is safe: the keep-alive
// channel is closed at most once and the hold count never goes below zero.
// The lock file removal is still attempted, and its error, if any, is returned.
func (lock *FileLock) Unlock() error {
	lock.mutex.Lock()
	defer lock.mutex.Unlock()

	if atomic.AddInt32(&lock.count, -1) > 0 {
		// Other holders remain; keep the lock.
		return nil
	}

	// Clamp the count so an extra Unlock cannot leave it negative, which would
	// make a later re-entrant Lock/Unlock pair release the lock too early.
	atomic.StoreInt32(&lock.count, 0)
	lock.Acquired = false

	if lock.keepAlive != nil {
		// Nothing is ever sent on keepAlive, so a successful receive means the
		// channel was already closed by an earlier Unlock. Closing it again
		// would panic.
		select {
		case <-lock.keepAlive:
		default:
			close(lock.keepAlive)
		}
	}

	return removeLock(lock)
}
// loadLock reads the lock called name for volumeName from the backupstore and
// stamps it with the lock file's time as reported by the driver. Only the
// loaded fields and serverTime are populated; the returned lock's volume and
// driver are left at their zero values.
//
// It returns the error from LoadConfigInBackupStore if the lock cannot be read.
func loadLock(volumeName string, name string, driver BackupStoreDriver) (*FileLock, error) {
	lockFile := getLockFilePath(volumeName, name)

	var loaded FileLock
	if err := LoadConfigInBackupStore(driver, lockFile, &loaded); err != nil {
		return nil, err
	}

	loaded.serverTime = driver.FileTime(lockFile)
	log.Infof("Loaded lock %v type %v on backupstore", lockFile, loaded.Type)
	return &loaded, nil
}
// removeLock deletes the lock's file from the backupstore through the lock's
// driver and logs the removal. It returns the driver's error unchanged.
func removeLock(lock *FileLock) error {
	target := getLockFilePath(lock.volume, lock.Name)

	err := lock.driver.Remove(target)
	if err != nil {
		return err
	}

	log.Infof("Removed lock %v type %v on backupstore", target, lock.Type)
	return nil
}
// saveLock writes the lock to its file on the backupstore and then updates
// lock.serverTime from the file time reported by the driver. It returns the
// error from SaveConfigInBackupStore, in which case serverTime is untouched.
func saveLock(lock *FileLock) error {
	target := getLockFilePath(lock.volume, lock.Name)

	err := SaveConfigInBackupStore(lock.driver, target, lock)
	if err != nil {
		return err
	}

	lock.serverTime = lock.driver.FileTime(target)
	log.Infof("Stored lock %v type %v on backupstore", target, lock.Type)
	return nil
}
// compareLocks orders two locks and returns a negative value when a sorts
// before b, a positive value when it sorts after, and 0 when they tie.
//
// An acquired lock sorts before an unacquired one. When both have the same
// Acquired state the earlier serverTime (UTC) sorts first, and equal times
// fall back to comparing the lock names.
func compareLocks(a *FileLock, b *FileLock) int {
	if a.Acquired != b.Acquired {
		if a.Acquired {
			return -1
		}
		return 1
	}

	aTime, bTime := a.serverTime.UTC(), b.serverTime.UTC()
	switch {
	case aTime.Equal(bTime):
		return strings.Compare(a.Name, b.Name)
	case aTime.Before(bTime):
		return -1
	default:
		return 1
	}
}
// getLockNamesForVolume lists the volume's lock directory and returns the
// lock names, i.e. the listed entries passed through util.ExtractNames with
// LOCK_SUFFIX. Any listing error is treated as "no locks" and yields an empty
// slice.
func getLockNamesForVolume(volumeName string, driver BackupStoreDriver) []string {
	lockDir := getLockPath(volumeName)

	entries, err := driver.List(lockDir)
	if err != nil {
		// path doesn't exist
		return []string{}
	}

	return util.ExtractNames(entries, "", LOCK_SUFFIX)
}
// getLocksForVolume loads every lock stored for volumeName. A lock that fails
// to load is logged with a warning and left out of the result, so the
// returned slice may be shorter than the list of lock names.
func getLocksForVolume(volumeName string, driver BackupStoreDriver) []*FileLock {
	lockNames := getLockNamesForVolume(volumeName, driver)
	loaded := make([]*FileLock, 0, len(lockNames))

	for i := range lockNames {
		lockName := lockNames[i]
		fileLock, err := loadLock(volumeName, lockName, driver)
		if err == nil {
			loaded = append(loaded, fileLock)
			continue
		}
		lockFile := getLockFilePath(volumeName, lockName)
		log.WithError(err).Warnf("Failed to load lock %v on backupstore", lockFile)
	}

	return loaded
}
// getLockPath returns the volume's lock directory: LOCKS_DIRECTORY joined
// onto the volume path, with a trailing "/".
func getLockPath(volumeName string) string {
	locksDir := filepath.Join(getVolumePath(volumeName), LOCKS_DIRECTORY)
	return locksDir + "/"
}
// getLockFilePath returns the path of the lock file for the lock called name
// in volumeName: the volume's lock directory joined with name + LOCK_SUFFIX.
func getLockFilePath(volumeName string, name string) string {
	return filepath.Join(getLockPath(volumeName), name+LOCK_SUFFIX)
}