From 975cbada601ced5c291940d07930d2dcecd25f08 Mon Sep 17 00:00:00 2001
From: Alex Hughes
Date: Thu, 7 Mar 2024 16:27:00 -0800
Subject: [PATCH] Implement logic that allows uploading and downloading empty
 folders

These are zero-length S3 objects whose names end in a slash. We also
record ownership and timestamp metadata for these folders.
---
 command/cp.go         |  84 +++++++++++++++++++--------
 command/expand.go     |   2 +-
 command/rm.go         |   2 +-
 command/sync.go       |   6 +-
 storage/fs.go         |  20 ++++++-
 storage/fs_windows.go |  23 +++++++-
 storage/s3.go         | 132 +++++++++++++++++++++++++++++++++++++++++-
 7 files changed, 239 insertions(+), 30 deletions(-)

diff --git a/command/cp.go b/command/cp.go
index ee9410ec7..d140ce8bb 100644
--- a/command/cp.go
+++ b/command/cp.go
@@ -491,7 +491,7 @@ func (c Copy) Run(ctx context.Context) error {
 	}
 
 	for object := range objch {
-		if errorpkg.IsCancelation(object.Err) || object.Type.IsDir() {
+		if errorpkg.IsCancelation(object.Err) {
 			continue
 		}
 
@@ -541,7 +541,7 @@ func (c Copy) Run(ctx context.Context) error {
 		case srcurl.Type == c.dst.Type: // local->local or remote->remote
 			task = c.prepareCopyTask(ctx, srcurl, c.dst, isBatch, c.metadata)
 		case srcurl.IsRemote(): // remote->local
-			task = c.prepareDownloadTask(ctx, srcurl, c.dst, isBatch)
+			task = c.prepareDownloadTask(ctx, srcurl, c.dst, isBatch, object.Type.IsDir())
 		case c.dst.IsRemote(): // local->remote
 			task = c.prepareUploadTask(ctx, srcurl, c.dst, isBatch, c.metadata)
 		default:
@@ -583,9 +583,10 @@ func (c Copy) prepareDownloadTask(
 	srcurl *url.URL,
 	dsturl *url.URL,
 	isBatch bool,
+	srcIsDir bool,
 ) func() error {
 	return func() error {
-		dsturl, err := prepareLocalDestination(ctx, srcurl, dsturl, c.flatten, isBatch, c.storageOpts)
+		dsturl, err := prepareLocalDestination(ctx, srcurl, dsturl, c.flatten, isBatch, c.storageOpts, srcIsDir)
 		if err != nil {
 			return err
 		}
@@ -644,35 +645,53 @@ func (c Copy) doDownload(ctx context.Context, srcurl *url.URL, dsturl *url.URL)
 		}
 		return err
 	}
+	// Stat the source to learn whether it is a directory marker, so that a
+	// local directory can be created instead of a file. A missing object is
+	// treated as a regular file download.
+	srcObj, err := srcClient.Stat(ctx, srcurl)
+	isDir := false
+	if err != nil {
+		var objNotFound *storage.ErrGivenObjectNotFound
+		if !errors.As(err, &objNotFound) {
+			return err
+		}
+	} else {
+		isDir = srcObj.Type.IsDir()
+	}
 
 	dstPath := filepath.Dir(dsturl.Absolute())
 	dstFile := filepath.Base(dsturl.Absolute())
-	file, err := dstClient.CreateTemp(dstPath, dstFile)
-	if err != nil {
-		return err
-	}
 
-	writer := newCountingReaderWriter(file, c.progressbar)
-	size, err := srcClient.Get(ctx, srcurl, writer, c.concurrency, c.partSize)
-	file.Close()
+	var size int64
+	if isDir {
+		err = dstClient.CreateDir(ctx, dsturl.Absolute(), storage.Metadata{})
+		if err != nil {
+			return err
+		}
+	} else {
+		file, err := dstClient.CreateTemp(dstPath, dstFile)
+		if err != nil {
+			return err
+		}
 
-	if err != nil {
-		dErr := dstClient.Delete(ctx, &url.URL{Path: file.Name(), Type: dsturl.Type})
-		if dErr != nil {
-			printDebug(c.op, dErr, srcurl, dsturl)
+		writer := newCountingReaderWriter(file, c.progressbar)
+		size, err = srcClient.Get(ctx, srcurl, writer, c.concurrency, c.partSize)
+
+		file.Close()
+		if err != nil {
+			dErr := dstClient.Delete(ctx, &url.URL{Path: file.Name(), Type: dsturl.Type})
+			if dErr != nil {
+				printDebug(c.op, dErr, srcurl, dsturl)
+			}
+			return err
+		}
+		err = dstClient.Rename(file, dsturl.Absolute())
+		if err != nil {
+			return err
 		}
-		return err
 	}
 
 	if c.deleteSource {
 		_ = srcClient.Delete(ctx, srcurl)
 	}
 
-	err = dstClient.Rename(file, dsturl.Absolute())
-	if err != nil {
-		return err
-	}
 
 	if c.preserveOwnership {
 		obj, err := srcClient.Stat(ctx, srcurl)
 		if err != nil {
@@ -786,7 +805,15 @@ func (c Copy) doUpload(ctx context.Context, srcurl *url.URL, dsturl *url.URL, ex
 	}
 
 	reader := newCountingReaderWriter(file, c.progressbar)
-	err = dstClient.Put(ctx, reader, dsturl, metadata, c.concurrency, c.partSize)
+	fi, err := file.Stat()
+	if err != nil {
+		return err
+	}
+	if fi.IsDir() {
+		err = dstClient.CreateDir(ctx, dsturl, metadata)
+	} else {
+		err = dstClient.Put(ctx, reader, dsturl, metadata, c.concurrency, c.partSize)
+	}
 
 	if err != nil {
 		return err
@@ -957,6 +984,10 @@ func prepareRemoteDestination(
 		objname = srcurl.Relative()
 	}
 
+	if objname == "." {
+		return dsturl
+	}
+
 	if dsturl.IsPrefix() || dsturl.IsBucket() {
 		dsturl = dsturl.Join(objname)
 	}
@@ -972,6 +1003,7 @@ func prepareLocalDestination(
 	flatten bool,
 	isBatch bool,
 	storageOpts storage.Options,
+	srcIsDir bool,
 ) (*url.URL, error) {
 	objname := srcurl.Base()
 	if isBatch && !flatten {
@@ -1008,11 +1040,11 @@
 		if err != nil {
 			return nil, err
 		}
-		if strings.HasSuffix(dsturl.Absolute(), "/") {
+		if strings.HasSuffix(dsturl.Absolute(), "/") && !srcIsDir {
 			dsturl = dsturl.Join(objname)
 		}
 	} else {
-		if obj.Type.IsDir() {
+		if obj.Type.IsDir() && !srcIsDir {
 			dsturl = obj.URL.Join(objname)
 		}
 	}
@@ -1058,7 +1090,7 @@ func validateCopyCommand(c *cli.Context) error {
 	}
 
-	// we don't operate on S3 prefixes for copy and delete operations.
-	if srcurl.IsBucket() || srcurl.IsPrefix() {
+	// we don't operate on bare S3 buckets for copy and delete operations.
+	if srcurl.IsBucket() {
 		return fmt.Errorf("source argument must contain wildcard character")
 	}
 
@@ -1107,6 +1139,10 @@ func validateUpload(ctx context.Context, srcurl, dsturl *url.URL, storageOpts st
 		return err
 	}
 
+	// directory uploads to a bucket or prefix are allowed as-is; other
+	// directory destinations still fall through to the trailing-slash check
+	if obj.Type.IsDir() && (dsturl.IsBucket() || dsturl.IsPrefix()) {
+		return nil
+	}
+
 	// 'cp dir/ s3://bucket/prefix-without-slash': expect a trailing slash to
 	// avoid any surprises.
 	if obj.Type.IsDir() && !dsturl.IsBucket() && !dsturl.IsPrefix() {
diff --git a/command/expand.go b/command/expand.go
index d079c2251..a446db194 100644
--- a/command/expand.go
+++ b/command/expand.go
@@ -23,7 +23,7 @@ func expandSource(
-	// if the source is local, we send a Stat call to know if we have
-	// directory or file to walk. For remote storage, we don't want to send
-	// Stat since it doesn't have any folder semantics.
-	if !srcurl.IsWildcard() && !srcurl.IsRemote() {
+	// send a Stat call to know if we have a directory or file to walk.
+	// remote sources are Stat'ed as well, now that folder marker objects
+	// give remote storage folder semantics.
+	if !srcurl.IsWildcard() {
 		obj, err := client.Stat(ctx, srcurl)
 		if err != nil {
 			return nil, err
diff --git a/command/rm.go b/command/rm.go
index 3ba092b23..9e25588e7 100644
--- a/command/rm.go
+++ b/command/rm.go
@@ -275,7 +275,7 @@ func validateRMCommand(c *cli.Context) error {
 	)
 	for i, srcurl := range srcurls {
-		// we don't operate on S3 prefixes for copy and delete operations.
-		if srcurl.IsBucket() || srcurl.IsPrefix() {
+		// we don't operate on bare S3 buckets for copy and delete operations.
+		if srcurl.IsBucket() {
 			return fmt.Errorf("s3 bucket/prefix cannot be used for delete operations (forgot wildcard character?)")
 		}
diff --git a/command/sync.go b/command/sync.go
index f0ff738e2..1343f507d 100644
--- a/command/sync.go
+++ b/command/sync.go
@@ -547,6 +547,10 @@ func generateDestinationURL(srcurl, dsturl *url.URL, isBatch bool) *url.URL {
 		objname = srcurl.Relative()
 	}
 
+	// preserve the trailing slash of folder sources so the destination is
+	// created as a folder too
+	if strings.HasSuffix(srcurl.Absolute(), "/") && !strings.HasSuffix(objname, "/") {
+		objname += "/"
+	}
+
 	if dsturl.IsRemote() {
 		if dsturl.IsPrefix() || dsturl.IsBucket() {
 			return dsturl.Join(objname)
@@ -560,7 +564,7 @@
 
-// shouldSkipObject checks is object should be skipped.
+// shouldSkipObject checks if the object should be skipped.
 func (s Sync) shouldSkipObject(object *storage.Object, verbose bool) bool {
-	if object.Type.IsDir() || errorpkg.IsCancelation(object.Err) {
+	if errorpkg.IsCancelation(object.Err) {
 		return true
 	}
diff --git a/storage/fs.go b/storage/fs.go
index c85fad0d4..a05206f96 100644
--- a/storage/fs.go
+++ b/storage/fs.go
@@ -130,7 +130,7 @@ func walkDir(ctx context.Context, fs *Filesystem, src *url.URL, followSymlinks b
 		Callback: func(pathname string, dirent *godirwalk.Dirent) error {
-			// we're interested in files
+			// mark directories with a trailing separator instead of skipping them
 			if dirent.IsDir() {
-				return nil
+				pathname += string(os.PathSeparator)
 			}
 
 			fileurl, err := url.New(pathname)
@@ -230,6 +230,24 @@ func (f *Filesystem) Create(path string) (*os.File, error) {
 	return os.Create(path)
 }
 
+// CreateDir creates a local directory at path unless it already exists.
+// It is a no-op in dry-run mode.
+func (f *Filesystem) CreateDir(
+	ctx context.Context,
+	path string,
+	metadata Metadata,
+) error {
+	if f.dryRun {
+		return nil
+	}
+	if _, err := os.Stat(path); err != nil {
+		if os.IsNotExist(err) {
+			return os.Mkdir(path, os.ModePerm)
+		}
+		return err
+	}
+	return nil
+}
+
 // Open opens the given source.
 func (f *Filesystem) Open(path string) (*os.File, error) {
 	file, err := os.OpenFile(path, os.O_RDONLY, 0644)
diff --git a/storage/fs_windows.go b/storage/fs_windows.go
index 8074fcb47..f4aa2cc47 100644
--- a/storage/fs_windows.go
+++ b/storage/fs_windows.go
@@ -59,7 +59,13 @@ func SetFileTime(filename string, accessTime, modificationTime, creationTime tim
 	mft := syscall.NsecToFiletime(modificationTime.UnixNano())
 	cft := syscall.NsecToFiletime(creationTime.UnixNano())
 
-	fd, err := syscall.Open(filename, os.O_RDWR, 0775)
+	var fd syscall.Handle
+	fi, err := os.Stat(filename)
+	if err != nil {
+		return err
+	}
+	if fi.IsDir() {
+		fd, err = getDirectoryHandle(filename)
+	} else {
+		fd, err = syscall.Open(filename, os.O_RDWR, 0775)
+	}
 	if err != nil {
 		return err
 	}
@@ -73,6 +79,21 @@ func SetFileTime(filename string, accessTime, modificationTime, creationTime tim
 	return nil
 }
 
+// getDirectoryHandle opens a directory for attribute writes; unlike regular
+// files, directories must be opened with FILE_FLAG_BACKUP_SEMANTICS.
+func getDirectoryHandle(filename string) (syscall.Handle, error) {
+	pathp, err := syscall.UTF16PtrFromString(filename)
+	if err != nil {
+		return syscall.InvalidHandle, err
+	}
+
+	h, err := syscall.CreateFile(pathp,
+		syscall.FILE_WRITE_ATTRIBUTES, syscall.FILE_SHARE_WRITE, nil,
+		syscall.OPEN_EXISTING, syscall.FILE_FLAG_BACKUP_SEMANTICS, 0)
+	if err != nil {
+		return syscall.InvalidHandle, err
+	}
+	return h, nil
+}
+
 // GetFileUserGroup will take a filename and return the userId and groupId associated with it.
 //
 // On windows this is in the format of a SID, on linux/darwin this is in the format of a UID/GID.
diff --git a/storage/s3.go b/storage/s3.go
index 33c6612d9..77ad11cba 100644
--- a/storage/s3.go
+++ b/storage/s3.go
@@ -50,6 +50,10 @@ const (
 
 	// the key of the object metadata which is used to handle retry decision on NoSuchUpload error
 	metadataKeyRetryID = "s5cmd-upload-retry-id"
+
+	// the ETag of a zero-length object (the MD5 of the empty string), used
+	// to infer folder markers. The limitation is that genuinely empty
+	// objects are indistinguishable from folder markers.
+	folderETag = "d41d8cd98f00b204e9800998ecf8427e"
 )
 
 // Re-used AWS sessions dramatically improve performance.
@@ -149,6 +153,10 @@ func (s *S3) Stat(ctx context.Context, url *url.URL) (*Object, error) {
 		GroupID: groupID,
 	}
 
+	if strings.Trim(etag, `"`) == folderETag && strings.HasSuffix(url.Absolute(), "/") {
+		obj.Type = ObjectType{mode: os.ModeDir}
+	}
+
 	cTimeS := aws.StringValue(output.Metadata["file-ctime"])
 	if cTimeS != "" {
 		ctime, err := strconv.ParseInt(cTimeS, 10, 64)
@@ -375,7 +383,7 @@ func (s *S3) listObjectsV2(ctx context.Context, url *url.URL) <-chan *Object {
 		for _, c := range p.Contents {
 			key := aws.StringValue(c.Key)
 
-			if !url.Match(key) {
+			if !url.Match(key) && key != url.Path {
 				continue
 			}
 
@@ -841,6 +849,128 @@ func (s *S3) Select(ctx context.Context, url *url.URL, query *SelectQuery, resul
 	return resp.EventStream.Reader.Err()
 }
 
+// emptyDirPath caches the path of the zero-length temp file that is used as
+// the body of folder marker objects.
+var emptyDirPath string
+
+func getEmptyDirFile() (*os.File, error) {
+	if emptyDirPath == "" {
+		file, err := os.CreateTemp(os.TempDir(), "s5cmd_tmp")
+		if err != nil {
+			return nil, err
+		}
+		emptyDirPath = file.Name()
+		return file, nil
+	}
+	return os.Open(emptyDirPath)
+}
+
+// CreateDir uploads a zero-length object whose key ends with a slash, which
+// S3 clients conventionally treat as a folder.
+func (s *S3) CreateDir(
+	ctx context.Context,
+	to *url.URL,
+	metadata Metadata,
+) error {
+	file, err := getEmptyDirFile()
+	if err != nil {
+		return err
+	}
+	defer file.Close()
+
+	path := to.Path
+	if !strings.HasSuffix(path, "/") {
+		path += "/"
+	}
+	input := &s3.PutObjectInput{
+		Bucket:       aws.String(to.Bucket),
+		Key:          aws.String(path),
+		Metadata:     make(map[string]*string),
+		RequestPayer: s.RequestPayer(),
+		Body:         file,
+	}
+
+	storageClass := metadata.StorageClass
+	if storageClass != "" {
+		input.StorageClass = aws.String(storageClass)
+	}
+
+	acl := metadata.ACL
+	if acl != "" {
+		input.ACL = aws.String(acl)
+	}
+
+	cacheControl := metadata.CacheControl
+	if cacheControl != "" {
+		input.CacheControl = aws.String(cacheControl)
+	}
+
+	expires := metadata.Expires
+	if expires != "" {
+		t, err := time.Parse(time.RFC3339, expires)
+		if err != nil {
+			return err
+		}
+		input.Expires = aws.Time(t)
+	}
+
+	sseEncryption := metadata.EncryptionMethod
+	if sseEncryption != "" {
+		input.ServerSideEncryption = aws.String(sseEncryption)
+		sseKmsKeyID := metadata.EncryptionKeyID
+		if sseKmsKeyID != "" {
+			input.SSEKMSKeyId = aws.String(sseKmsKeyID)
+		}
+	}
+
+	contentEncoding := metadata.ContentEncoding
+	if contentEncoding != "" {
+		input.ContentEncoding = aws.String(contentEncoding)
+	}
+
+	contentDisposition := metadata.ContentDisposition
+	if contentDisposition != "" {
+		input.ContentDisposition = aws.String(contentDisposition)
+	}
+
+	// add retry ID to the object metadata
+	if s.noSuchUploadRetryCount > 0 {
+		input.Metadata[metadataKeyRetryID] = generateRetryID()
+	}
+
+	ctime := metadata.FileCtime
+	if ctime != "" {
+		input.Metadata["file-ctime"] = aws.String(ctime)
+	}
+
+	mtime := metadata.FileMtime
+	if mtime != "" {
+		input.Metadata["file-mtime"] = aws.String(mtime)
+	}
+
+	atime := metadata.FileAtime
+	if atime != "" {
+		input.Metadata["file-atime"] = aws.String(atime)
+	}
+
+	userID := metadata.FileUID
+	if userID != "" {
+		input.Metadata["file-owner"] = aws.String(userID)
+	}
+
+	groupID := metadata.FileGID
+	if groupID != "" {
+		input.Metadata["file-group"] = aws.String(groupID)
+	}
+
+	// merge user-defined metadata without clobbering the entries set above
+	for k, v := range metadata.UserDefined {
+		input.Metadata[k] = aws.String(v)
+	}
+
+	_, err = s.api.PutObjectWithContext(ctx, input)
+	return err
+}
+
 // Put is a multipart upload operation to upload resources, which implements
 // io.Reader interface, into S3 destination.
 func (s *S3) Put(
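
Note for reviewers (not part of the patch): the folder-detection heuristic
that the Stat change relies on can be exercised in isolation. A minimal
sketch using only the standard library; isFolderMarker is an illustrative
stand-in for the check added to (*S3).Stat:

	package main

	import (
		"crypto/md5"
		"encoding/hex"
		"fmt"
		"strings"
	)

	// isFolderMarker mirrors the Stat heuristic: a folder marker is an
	// object whose ETag is the MD5 of zero bytes and whose key ends
	// with a slash.
	func isFolderMarker(key, etag string) bool {
		emptyMD5 := hex.EncodeToString(md5.New().Sum(nil)) // d41d8cd98f00b204e9800998ecf8427e
		return strings.Trim(etag, `"`) == emptyMD5 && strings.HasSuffix(key, "/")
	}

	func main() {
		fmt.Println(isFolderMarker("photos/2024/", `"d41d8cd98f00b204e9800998ecf8427e"`))   // true
		fmt.Println(isFolderMarker("photos/cat.jpg", `"9e107d9d372bb6826bd81d3542a419d6"`)) // false
	}

This also makes the limitation in the folderETag comment concrete: a
genuinely empty object stored under a trailing-slash key cannot be told
apart from a folder marker.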
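Likewise, the marker-object convention that CreateDir implements can be
reproduced with plain aws-sdk-go v1 calls. A sketch under assumptions:
bucket and key are placeholders, and default credential/region resolution
is assumed:

	package main

	import (
		"bytes"
		"log"

		"github.com/aws/aws-sdk-go/aws"
		"github.com/aws/aws-sdk-go/aws/session"
		"github.com/aws/aws-sdk-go/service/s3"
	)

	func main() {
		sess := session.Must(session.NewSession())
		api := s3.New(sess)

		// an S3 "folder" is nothing more than a zero-length object whose
		// key ends in a slash; listing tools render it as a directory
		_, err := api.PutObject(&s3.PutObjectInput{
			Bucket: aws.String("example-bucket"), // placeholder
			Key:    aws.String("some/prefix/"),   // trailing slash marks the folder
			Body:   bytes.NewReader(nil),         // zero-length body
		})
		if err != nil {
			log.Fatal(err)
		}
	}

Downloading that key through the patched cp path should then produce an
empty local directory rather than a zero-byte file.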