-
Notifications
You must be signed in to change notification settings - Fork 53
/
Copy pathfsshell.go
284 lines (245 loc) · 7.55 KB
/
fsshell.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
package gowfs
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"path"
)
// Transfer size limits, in bytes.
const (
	// MAX_UP_CHUNK (1 GiB): local files at or above this size are not read
	// into memory by slirpLocalFile.
	MAX_UP_CHUNK int64 = 1 << 30
	// MAX_DOWN_CHUNK (500 MiB): Cat only streams HDFS files smaller than this.
	MAX_DOWN_CHUNK int64 = 500 << 20
)
// FsShell bundles file-system shell commands (AppendToFile, Cat, Chgrp,
// Chown, Chmod, Exists, Put, Get, ...) that are implemented on top of a
// FileSystem client.
type FsShell struct {
// FileSystem is the client that every FsShell command delegates to.
FileSystem *FileSystem
// WorkingPath is not read by any of the FsShell methods visible here.
// NOTE(review): presumably intended as a base for relative paths — confirm.
WorkingPath string
}
// AppendToFile appends the contents of each local file in filePaths, in order,
// to the HDFS file at hdfsPath. contenttype is passed through unchanged to
// FileSystem.Append.
//
// It returns (true, nil) once every file has been appended, or (false, err) at
// the first failure; files appended before the failure stay appended. Local
// files of MAX_UP_CHUNK bytes or more are rejected with an error instead of
// being appended as empty data.
func (shell FsShell) AppendToFile(filePaths []string, hdfsPath string, contenttype string) (bool, error) {
	for _, localPath := range filePaths {
		if err := shell.appendLocalFile(localPath, hdfsPath, contenttype); err != nil {
			return false, err
		}
	}
	return true, nil
}

// appendLocalFile appends one local file to hdfsPath. It is a separate function
// so that each file's deferred Close runs as soon as that file is done, instead
// of every handle staying open until AppendToFile returns.
func (shell FsShell) appendLocalFile(localPath, hdfsPath, contenttype string) error {
	file, err := os.Open(localPath)
	if err != nil {
		return err
	}
	defer file.Close()

	stat, err := file.Stat()
	if err != nil {
		return err
	}
	// slirpLocalFile does not read files this large; without this check an
	// empty buffer would be appended and reported as success.
	if stat.Size() >= MAX_UP_CHUNK {
		return fmt.Errorf("local file %s is %d bytes; files of %d bytes or more cannot be appended yet",
			localPath, stat.Size(), MAX_UP_CHUNK)
	}

	data, _, err := slirpLocalFile(*file, 0)
	if err != nil {
		return err
	}
	_, err = shell.FileSystem.Append(bytes.NewBuffer(data), Path{Name: hdfsPath}, 0, contenttype)
	return err
}
// Cat writes the contents of each HDFS file in hdfsPaths, in order, to writr.
//
// Each file is streamed with io.Copy, and its reader is closed before the next
// file is opened. Files of MAX_DOWN_CHUNK bytes or more are not supported yet
// and produce an error instead of being skipped silently. The first failure
// (status lookup, open, or copy) is returned; output already written to writr
// for earlier files is not undone.
func (shell FsShell) Cat(hdfsPaths []string, writr io.Writer) error {
	for _, hdfsPath := range hdfsPaths {
		stat, err := shell.FileSystem.GetFileStatus(Path{Name: hdfsPath})
		if err != nil {
			return err
		}
		//TODO add code to chunk super large files.
		if stat.Length >= MAX_DOWN_CHUNK {
			return fmt.Errorf("HDFS file %s is %d bytes; files of %d bytes or more cannot be read by Cat yet",
				hdfsPath, stat.Length, MAX_DOWN_CHUNK)
		}
		readr, err := shell.FileSystem.Open(Path{Name: hdfsPath}, 0, stat.Length, 4096)
		if err != nil {
			return err
		}
		_, copyErr := io.Copy(writr, readr)
		// Close explicitly (not deferred) so readers don't accumulate across
		// loop iterations.
		readr.Close()
		if copyErr != nil {
			return copyErr
		}
	}
	return nil
}
// Chgrp assigns group grpName to every path in hdfsPaths by calling SetOwner
// with an empty owner (presumably leaving the owner unchanged — confirm
// against FileSystem.SetOwner). It stops at the first failure and returns
// (false, err); paths handled before that keep the new group.
func (shell FsShell) Chgrp(hdfsPaths []string, grpName string) (bool, error) {
	for i := range hdfsPaths {
		target := Path{Name: hdfsPaths[i]}
		if _, err := shell.FileSystem.SetOwner(target, "", grpName); err != nil {
			return false, err
		}
	}
	return true, nil
}
// Chown assigns owner to every path in hdfsPaths by calling SetOwner with an
// empty group (presumably leaving the group unchanged — confirm against
// FileSystem.SetOwner). It stops at the first failure and returns (false, err).
func (shell FsShell) Chown(hdfsPaths []string, owner string) (bool, error) {
	for i := range hdfsPaths {
		if _, err := shell.FileSystem.SetOwner(Path{Name: hdfsPaths[i]}, owner, ""); err != nil {
			return false, err
		}
	}
	return true, nil
}
// Chmod applies permission perm to every path in hdfsPaths via
// FileSystem.SetPermission. It stops at the first failure and returns
// (false, err); otherwise it returns (true, nil).
func (shell FsShell) Chmod(hdfsPaths []string, perm os.FileMode) (bool, error) {
	for _, p := range hdfsPaths {
		_, setErr := shell.FileSystem.SetPermission(Path{Name: p}, perm)
		if setErr != nil {
			return false, setErr
		}
	}
	return true, nil
}
// Exists reports whether hdfsPath exists on the remote HDFS server.
//
// A successful status lookup means the path exists. A RemoteException whose
// JavaClassName is java.io.FileNotFoundException means it does not, and
// (false, nil) is returned. Any other error is returned unchanged with false.
func (shell FsShell) Exists(hdfsPath string) (bool, error) {
	_, statErr := shell.FileSystem.GetFileStatus(Path{Name: hdfsPath})
	if statErr == nil {
		return true, nil
	}
	remoteErr, isRemote := statErr.(RemoteException)
	if isRemote && remoteErr.JavaClassName == "java.io.FileNotFoundException" {
		return false, nil
	}
	// Some other failure: existence could not be determined.
	return false, statErr
}
// Put uploads the single local file localFile into the HDFS directory
// hdfsPath. The remote file keeps the local file's base name, i.e. it is
// written to hdfsPath + "/" + <base name of localFile>.
//
// The file is created with fixed settings (block size 134217728, replication 3,
// permission 0644); overwrite is passed through to FileSystem.Create. Returns
// (true, nil) on success and (false, err) otherwise, including when localFile
// does not exist.
func (shell FsShell) Put(localFile string, hdfsPath string, overwrite bool) (bool, error) {
	if _, statErr := os.Stat(localFile); os.IsNotExist(statErr) {
		return false, fmt.Errorf("File %v not found.", localFile)
	}

	src, err := os.Open(localFile)
	if err != nil {
		return false, err
	}
	defer src.Close()

	_, baseName := path.Split(localFile)
	target := Path{Name: hdfsPath + "/" + baseName}

	_, err = shell.FileSystem.Create(src, target, overwrite,
		134217728, // block size (128 MiB)
		3,         // replication
		0644,      // permission
		4096,      // NOTE(review): presumably the buffer size; confirm against FileSystem.Create
		"")
	if err != nil {
		return false, err
	}
	return true, nil
}
// PutMany uploads each local file in files into the HDFS directory hdfsPath,
// keeping each file's base name (see Put), so a file "a/b.txt" ends up at
// hdfsPath + "/b.txt".
//
// When more than one file is given, hdfsPath must be a directory. It is
// created (permission 0700) if missing, and an error is returned if it is a
// regular file. Any other status-lookup error is returned as-is. Uploading
// stops at the first file that fails and that error is returned; files
// uploaded before the failure remain in HDFS.
func (shell FsShell) PutMany(files []string, hdfsPath string, overwrite bool) (bool, error) {
	if len(files) > 1 {
		stat, err := shell.FileSystem.GetFileStatus(Path{Name: hdfsPath})
		// Comma-ok assertion: safe when err is nil or of another type.
		remoteErr, isRemote := err.(RemoteException)
		switch {
		case err == nil:
			if stat.Type == "FILE" {
				return false, fmt.Errorf("HDFS resource %s must be a directory in this context.", hdfsPath)
			}
		case isRemote && remoteErr.JavaClassName == "java.io.FileNotFoundException":
			// Remote directory is missing: create it.
			if _, err := shell.FileSystem.MkDirs(Path{Name: hdfsPath}, 0700); err != nil {
				return false, err
			}
		default:
			return false, err
		}
	}
	for _, file := range files {
		// Put appends the base name itself, so pass only the directory.
		if _, err := shell.Put(file, hdfsPath, overwrite); err != nil {
			return false, err
		}
	}
	return true, nil
}
// Get downloads the HDFS file at hdfsPath into localFile, creating or
// truncating the local file (os.Create).
//
// The remote file is opened before the local file is created, so a failed
// remote open does not leave an empty local file behind. Data is streamed with
// io.Copy rather than buffered whole in memory. The local file is synced
// before returning, and a sync or close failure is reported. Returns
// (true, nil) on success and (false, err) otherwise.
func (shell FsShell) Get(hdfsPath, localFile string) (bool, error) {
	reader, err := shell.FileSystem.Open(Path{Name: hdfsPath}, 0, 0, 0)
	if err != nil {
		return false, err
	}
	// Deferred immediately so the remote reader is closed on every path.
	defer reader.Close()

	file, err := os.Create(localFile)
	if err != nil {
		return false, err
	}
	// Safety net for early returns; the explicit Close below reports errors.
	defer file.Close()

	if _, err := io.Copy(file, reader); err != nil {
		return false, err
	}
	if err := file.Sync(); err != nil {
		return false, err
	}
	if err := file.Close(); err != nil {
		return false, err
	}
	return true, nil
}
// Merges content of remote HDFS path into a single local file.
// func (shell FsShell) GetMerge(hdfsPath, localFile string)(bool, error){
// return false, fmt.Errorf("Function is unimplemented.")
// }
// MoveFromLocal uploads localFile into the HDFS directory hdfsPath (via Put)
// and then deletes the local file.
//
// The local file is removed only after the remote file
// hdfsPath/<base name of localFile> is found and its length equals the local
// file's size. On any failure the local file is kept and (false, err) is
// returned.
func (shell FsShell) MoveFromLocal(localFile, hdfsPath string, overwrite bool) (bool, error) {
	ok, err := shell.Put(localFile, hdfsPath, overwrite)
	if !ok || err != nil {
		return false, err
	}

	// validate operation, then remove local
	hdfStat, err := shell.FileSystem.GetFileStatus(Path{Name: path.Join(hdfsPath, path.Base(localFile))})
	if err != nil {
		return false, fmt.Errorf("Unable to verify remote file. err is %v", err.Error())
	}
	// os.Stat avoids holding an open handle, which previously leaked on the
	// size-mismatch path, and surfaces Stat errors instead of ignoring them.
	localStat, err := os.Stat(localFile)
	if err != nil {
		return false, fmt.Errorf("Unable to validate operation. err is %v", err.Error())
	}
	if hdfStat.Length != localStat.Size() {
		return false, fmt.Errorf("Remote and local file size mismatch.")
	}
	if err := os.Remove(localFile); err != nil {
		return false, err
	}
	return true, nil
}
// MoveToLocal downloads the HDFS file at hdfsPath to localFile (via Get) and
// then deletes the remote file.
//
// The remote status is fetched first, and a lookup failure aborts the move.
// The remote file is deleted only when the local copy's size equals the remote
// length. The result of FileSystem.Delete is returned on success.
func (shell FsShell) MoveToLocal(hdfsPath, localFile string) (bool, error) {
	hdfStat, err := shell.FileSystem.GetFileStatus(Path{Name: hdfsPath})
	if err != nil {
		return false, err
	}
	if _, err := shell.Get(hdfsPath, localFile); err != nil {
		return false, err
	}

	// ensure file was copied all the way
	fileStat, err := os.Stat(localFile)
	if err != nil {
		return false, fmt.Errorf("Unable to access local file %s: %s", localFile, err.Error())
	}
	if hdfStat.Length != fileStat.Size() {
		return false, fmt.Errorf("Local file size does not match remote file size. Aborting.")
	}

	// remove remote File
	ok, err := shell.FileSystem.Delete(Path{Name: hdfsPath}, false)
	if err != nil {
		return false, fmt.Errorf("Unable to remove remote %s file: %s", hdfsPath, err.Error())
	}
	return ok, nil
}
// Rm is intended to remove the HDFS resource at hdfsPath, but it is not
// implemented yet. It never contacts the server and always returns false
// together with an "unimplemented" error.
func (shell FsShell) Rm(hdfsPath string) (bool, error) {
	unimplemented := fmt.Errorf("Function is unimplemented.")
	return false, unimplemented
}
// slirpLocalFile reads the entire contents of file into memory.
//
// The data is read by name (file.Name()), not through the handle, so the
// handle's read position is left untouched. offset is currently unused and the
// returned int64 is always 0; both are reserved for future chunked reads.
// Files of MAX_UP_CHUNK bytes or more are rejected with an error. Returning
// (nil, 0, nil) for them would let callers treat "nothing read" as success.
//
// TODO: slirp file in x Gbyte chunks when file.Stat() >> X.
// this is to avoid blow up memory on large files.
func slirpLocalFile(file os.File, offset int64) ([]byte, int64, error) {
	stat, err := file.Stat()
	if err != nil {
		return nil, 0, err
	}
	if stat.Size() >= MAX_UP_CHUNK {
		return nil, 0, fmt.Errorf("local file %s is %d bytes; files of %d bytes or more are not supported yet",
			file.Name(), stat.Size(), MAX_UP_CHUNK)
	}
	data, err := ioutil.ReadFile(file.Name())
	if err != nil {
		return nil, 0, err
	}
	return data, 0, nil
}
// TODO: slirp files from the server in X GB chunks to avoid blowing up the network.
// func slirpRemoteFile (hdfsPath string, offset int64, totalSize int64)([]byte, int64, error) {
// }