forked from wal-g/wal-g
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathextract.go
181 lines (153 loc) · 3.79 KB
/
extract.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package walg
import (
"archive/tar"
"github.com/pkg/errors"
"io"
)
func min(a, b int) int {
if a < b {
return a
}
return b
}
func max(a, b int) int {
if a > b {
return a
}
return b
}
// EmptyWriteIgnorer handles 0 byte write in LZ4 package
// to stop pipe reader/writer from blocking.
type EmptyWriteIgnorer struct {
io.WriteCloser
}
func (e EmptyWriteIgnorer) Write(p []byte) (int, error) {
if len(p) == 0 {
return 0, nil
}
return e.WriteCloser.Write(p)
}
// Extract exactly one tar bundle. Returns an error
// upon failure. Able to configure behavior by passing
// in different TarInterpreters.
func extractOne(ti TarInterpreter, s io.Reader) error {
tr := tar.NewReader(s)
for {
cur, err := tr.Next()
if err == io.EOF {
break
}
if err != nil {
return errors.Wrap(err, "extractOne: tar extract failed")
}
err = ti.Interpret(tr, cur)
if err != nil {
return errors.Wrap(err, "extractOne: Interpret failed")
}
}
return nil
}
// Ensures that file extension is valid. Any subsequent behavior
// depends on file type.
func tarHandler(wc io.WriteCloser, rm ReaderMaker, crypter Crypter) error {
defer wc.Close()
r, err := rm.Reader()
if err != nil {
return errors.Wrap(err, "ExtractAll: failed to create new reader")
}
defer r.Close()
if crypter.IsUsed() {
var reader io.Reader
reader, err = crypter.Decrypt(r)
if err != nil {
return errors.Wrap(err, "ExtractAll: decrypt failed")
}
r = ReadCascadeClose{reader, r}
}
if rm.Format() == "lzo" {
err = DecompressLzo(wc, r)
if err != nil {
return errors.Wrap(err, "ExtractAll: lzo decompress failed. Is archive encrypted?")
}
} else if rm.Format() == "lz4" {
_, err = DecompressLz4(wc, r)
if err != nil {
return errors.Wrap(err, "ExtractAll: lz4 decompress failed. Is archive encrypted?")
}
} else if rm.Format() == "tar" {
_, err = io.Copy(wc, r)
if err != nil {
return errors.Wrap(err, "ExtractAll: tar extract failed")
}
} else if rm.Format() == "nop" {
} else {
return errors.Wrap(UnsupportedFileTypeError{rm.Path(), rm.Format()}, "ExtractAll:")
}
return nil
}
// ExtractAll Handles all files passed in. Supports `.lzo`, `.lz4, and `.tar`.
// File type `.nop` is used for testing purposes. Each file is extracted
// in its own goroutine and ExtractAll will wait for all goroutines to finish.
// Returns the first error encountered.
func ExtractAll(ti TarInterpreter, files []ReaderMaker) error {
if len(files) < 1 {
return errors.New("ExtractAll: did not provide files to extract")
}
var err error
sem := make(chan Empty, len(files))
collectAll := make(chan error)
defer close(collectAll)
go func() {
for e := range collectAll {
if e != nil {
err = e
}
}
}()
// Set maximum number of goroutines spun off by ExtractAll
var con = getMaxDownloadConcurrency(min(len(files), 10))
concurrent := make(chan Empty, con)
for i := 0; i < con; i++ {
concurrent <- Empty{}
}
var crypter OpenPGPCrypter
for i, val := range files {
<-concurrent
go func(i int, val ReaderMaker) {
defer func() {
concurrent <- Empty{}
sem <- Empty{}
}()
pr, tempW := io.Pipe()
pw := &EmptyWriteIgnorer{tempW}
// Collect errors returned by tarHandler or parsing.
collectLow := make(chan error)
go func() {
collectLow <- tarHandler(pw, val, &crypter)
}()
// Collect errors returned by extractOne.
collectTop := make(chan error)
go func() {
defer pr.Close()
err := extractOne(ti, pr)
collectTop <- err
}()
finishedTop := false
finishedLow := false
for !(finishedTop && finishedLow) {
select {
case err := <-collectTop:
finishedTop = true
collectAll <- err
case err := <-collectLow:
finishedLow = true
collectAll <- err
}
}
}(i, val)
}
for i := 0; i < len(files); i++ {
<-sem
}
return err
}