forked from wal-g/wal-g
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbguploader.go
134 lines (108 loc) · 2.98 KB
/
bguploader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package walg
import (
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
)
// BgUploader represents the state of concurrent WAL upload
type BgUploader struct {
// pg_[wals|xlog]
dir string
// count of running gorutines
parallelWorkers int32
// usually defined by WALG_DOWNLOAD_CONCURRENCY
maxParallelWorkers int32
// waitgroup to handle Stop gracefully
running sync.WaitGroup
// every file is attempted only once
started map[string]interface{}
// uploading structure
tu *TarUploader
// to control amount of work done in one cycle of archive_comand
totalUploaded int32
mutex sync.Mutex
pre *Prefix
verify bool
}
// Start up checking what's inside archive_status
func (u *BgUploader) Start(walFilePath string, maxParallelWorkers int32, tu *TarUploader, pre *Prefix, verify bool) {
if maxParallelWorkers < 1 {
return // Nothing to start
}
// prepare state
u.tu = tu
u.maxParallelWorkers = maxParallelWorkers
u.dir = filepath.Dir(walFilePath)
u.started = make(map[string]interface{})
u.started[filepath.Base(walFilePath)+readySuffix] = walFilePath
u.pre = pre
u.verify = verify
// This goroutine will spawn new if necessary
go scanOnce(u)
}
// Stop pipeline
func (u *BgUploader) Stop() {
for atomic.LoadInt32(&u.parallelWorkers) != 0 {
time.Sleep(50 * time.Millisecond)
} // Wait until noone works
u.mutex.Lock()
defer u.mutex.Unlock()
atomic.StoreInt32(&u.maxParallelWorkers, 0) // stop new jobs
u.running.Wait() // wait again for those how jumped to the closing door
}
var readySuffix = ".ready"
var archiveStatus = "archive_status"
var done = ".done"
func scanOnce(u *BgUploader) {
u.mutex.Lock()
defer u.mutex.Unlock()
files, err := ioutil.ReadDir(filepath.Join(u.dir, archiveStatus))
if err != nil {
log.Print("Error of parallel upload: ", err)
return
}
for _, f := range files {
if haveNoSlots(u) {
break
}
name := f.Name()
if !strings.HasSuffix(name, readySuffix) {
continue
}
if _, ok := u.started[name]; ok {
continue
}
u.started[name] = name
if shouldKeepScanning(u) {
u.running.Add(1)
atomic.AddInt32(&u.parallelWorkers, 1)
go u.Upload(f)
}
}
}
func shouldKeepScanning(u *BgUploader) bool {
return atomic.LoadInt32(&u.maxParallelWorkers) > 0 && atomic.LoadInt32(&u.totalUploaded) < 1024
}
func haveNoSlots(u *BgUploader) bool {
return atomic.LoadInt32(&u.parallelWorkers) >= atomic.LoadInt32(&u.maxParallelWorkers)
}
// Upload one WAL file
func (u *BgUploader) Upload(info os.FileInfo) {
walfilename := strings.TrimSuffix(info.Name(), readySuffix)
UploadWALFile(u.tu.Clone(), filepath.Join(u.dir, walfilename), u.pre, u.verify)
ready := filepath.Join(u.dir, archiveStatus, info.Name())
done := filepath.Join(u.dir, archiveStatus, walfilename+done)
err := os.Rename(ready, done)
if err != nil {
log.Print("Error renaming .ready to .done: ", err)
}
atomic.AddInt32(&u.totalUploaded, 1)
scanOnce(u)
atomic.AddInt32(&u.parallelWorkers, -1)
u.running.Done()
}