diff --git a/pkg/storage/fs/ocis/ocis.go b/pkg/storage/fs/ocis/ocis.go index 42ecdad030..15449e4622 100644 --- a/pkg/storage/fs/ocis/ocis.go +++ b/pkg/storage/fs/ocis/ocis.go @@ -20,7 +20,6 @@ package ocis import ( "path" - "path/filepath" "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/storage" @@ -48,7 +47,7 @@ func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) { return nil, err } - tusDataStore := tus.NewFileStore(filepath.Join(o.Root, "uploads")) + tusDataStore := tus.NewFileStore(o.Root) return decomposedfs.NewDefault(m, bs, tusDataStore, stream) } diff --git a/pkg/storage/fs/s3ng/option.go b/pkg/storage/fs/s3ng/option.go index cf8fc08ecc..ecc94d40f0 100644 --- a/pkg/storage/fs/s3ng/option.go +++ b/pkg/storage/fs/s3ng/option.go @@ -58,6 +58,9 @@ type Options struct { // ForcePathStyle for the s3 blobstore S3ForcePathStyle bool `mapstructure:"s3.force_path_style"` + + // Root for the upload sessions + Root string `mapstructure:"root"` } // S3ConfigComplete return true if all required s3 fields are set diff --git a/pkg/storage/fs/s3ng/s3ng.go b/pkg/storage/fs/s3ng/s3ng.go index fd337b442e..8e06db8be4 100644 --- a/pkg/storage/fs/s3ng/s3ng.go +++ b/pkg/storage/fs/s3ng/s3ng.go @@ -61,7 +61,7 @@ func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) { WithS3ForcePathStyle(o.S3ForcePathStyle). WithDisableSSL(o.S3DisableSSL) - tusDataStore := tus.NewS3Store(o.S3Bucket, s3.New(session.Must(session.NewSession()), s3Config)) + tusDataStore := tus.NewS3Store(o.Root, o.S3Bucket, s3.New(session.Must(session.NewSession()), s3Config)) tusDataStore.ObjectPrefix = o.S3UploadObjectPrefix tusDataStore.MetadataObjectPrefix = o.S3UploadMetadataPrefix tusDataStore.TemporaryDirectory = o.S3UploadTemporaryDirectory diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index d68d532488..b35b622c11 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -90,24 +90,23 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere usr := ctxpkg.ContextMustGetUser(ctx) newBlobID := uuid.New().String() - uploadSession := tus.Session{ - ID: tus.BuildUploadId(n.SpaceID, newBlobID), - MetaData: tusd.MetaData{}, - - BlobID: newBlobID, - Filename: n.Name, - SpaceRoot: n.SpaceRoot.ID, - SpaceOwnerOrManager: n.SpaceOwnerOrManager(ctx).GetOpaqueId(), - ProviderID: headers["providerID"], - MTime: time.Now().UTC().Format(time.RFC3339Nano), - NodeID: n.ID, - NodeParentID: n.ParentID, - ExecutantIdp: usr.Id.Idp, - ExecutantID: usr.Id.OpaqueId, - ExecutantType: utils.UserTypeToString(usr.Id.Type), - ExecutantUserName: usr.Username, - LogLevel: sublog.GetLevel().String(), - } + uploadSession := tus.NewSession(ctx, fs.o.Root) + uploadSession.ID = tus.BuildUploadId(n.SpaceID, newBlobID) + uploadSession.MetaData = tusd.MetaData{} + + uploadSession.BlobID = newBlobID + uploadSession.Filename = n.Name + uploadSession.SpaceRoot = n.SpaceRoot.ID + uploadSession.SpaceOwnerOrManager = n.SpaceOwnerOrManager(ctx).GetOpaqueId() + uploadSession.ProviderID = headers["providerID"] + uploadSession.MTime = time.Now().UTC().Format(time.RFC3339Nano) + uploadSession.NodeID = n.ID + uploadSession.NodeParentID = n.ParentID + uploadSession.ExecutantIdp = usr.Id.Idp + uploadSession.ExecutantID = usr.Id.OpaqueId + uploadSession.ExecutantType = utils.UserTypeToString(usr.Id.Type) + uploadSession.ExecutantUserName = usr.Username + uploadSession.LogLevel = 
sublog.GetLevel().String() // checksum is sent as tus Upload-Checksum header and should not magically become a metadata property if checksum, ok := headers["checksum"]; ok { @@ -226,22 +225,27 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere return nil, errtypes.Aborted(fmt.Sprintf("parent %s already has a child %s", n.ID, n.Name)) } - // keep track of upload - err = uploadSession.Persist(ctx) + // NewUploadWithSession will also call Persist, TODO it should not + // NewUploadWithSession may change the id? eg for s3 + nu, err := fs.tusDataStore.NewUploadWithSession(ctx, uploadSession) + if err != nil { + return nil, err + } + info, err := nu.GetInfo(ctx) if err != nil { return nil, err } - sublog.Debug().Interface("info", uploadSession).Msg("Decomposedfs: initiated upload") + sublog.Debug().Interface("info", info).Msg("Decomposedfs: initiated upload") return map[string]string{ - "simple": uploadSession.ID, - "tus": uploadSession.ID, + "simple": info.ID, + "tus": info.ID, }, nil } // GetDataStore returns the initialized Datastore -func (fs *Decomposedfs) GetDataStore() tus.DataStore { +func (fs *Decomposedfs) GetDataStore() tusd.DataStore { return fs.tusDataStore } @@ -258,21 +262,21 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error { return err } - uploadMetadata, err := upload.ReadMetadata(ctx, fs.lu, info.ID) + uploadSession, err := tus.ReadSession(ctx, fs.lu.InternalRoot(), info.ID) if err != nil { return err } // put lockID from upload back into context - if uploadMetadata.LockID != "" { - ctx = ctxpkg.ContextSetLockID(ctx, uploadMetadata.LockID) + if uploadSession.LockID != "" { + ctx = ctxpkg.ContextSetLockID(ctx, uploadSession.LockID) } // restore logger from file info log, err := logger.FromConfig(&logger.LogConf{ Output: "stderr", Mode: "json", - Level: uploadMetadata.LogLevel, + Level: uploadSession.LogLevel, }) if err != nil { return err @@ -321,9 +325,9 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error { // compare if they match the sent checksum // TODO the tus checksum extension would do this on every chunk, but I currently don't see an easy way to pass in the requested checksum. for now we do it in FinishUpload which is also called for chunked uploads - if uploadMetadata.Checksum != "" { + if uploadSession.Checksum != "" { var err error - parts := strings.SplitN(uploadMetadata.Checksum, " ", 2) + parts := strings.SplitN(uploadSession.Checksum, " ", 2) if len(parts) != 2 { return errtypes.BadRequest("invalid checksum format. 
must be '[algorithm] [checksum]'") } @@ -349,14 +353,14 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error { } // update checksums - uploadMetadata.ChecksumSHA1 = sha1h.Sum(nil) - uploadMetadata.ChecksumMD5 = md5h.Sum(nil) - uploadMetadata.ChecksumADLER32 = adler32h.Sum(nil) + uploadSession.ChecksumSHA1 = sha1h.Sum(nil) + uploadSession.ChecksumMD5 = md5h.Sum(nil) + uploadSession.ChecksumADLER32 = adler32h.Sum(nil) log.Debug().Str("id", info.ID).Msg("upload.UpdateMetadata") - uploadMetadata, n, err := upload.UpdateMetadata(ctx, fs.lu, info.ID, info.Size, uploadMetadata) + uploadSession, n, err := upload.UpdateMetadata(ctx, fs.lu, info.ID, info.Size, uploadSession) if err != nil { - upload.Cleanup(ctx, fs.lu, n, info.ID, uploadMetadata.MTime, true) + upload.Cleanup(ctx, fs.lu, n, info.ID, uploadSession.MTime, true) if tup, ok := up.(tusd.TerminatableUpload); ok { terr := tup.Terminate(ctx) if terr != nil { @@ -369,11 +373,11 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error { if fs.stream != nil { user := &userpb.User{ Id: &userpb.UserId{ - Type: userpb.UserType(userpb.UserType_value[uploadMetadata.ExecutantType]), - Idp: uploadMetadata.ExecutantIdp, - OpaqueId: uploadMetadata.ExecutantID, + Type: userpb.UserType(userpb.UserType_value[uploadSession.ExecutantType]), + Idp: uploadSession.ExecutantIdp, + OpaqueId: uploadSession.ExecutantID, }, - Username: uploadMetadata.ExecutantUserName, + Username: uploadSession.ExecutantUserName, } s, err := fs.downloadURL(ctx, info.ID) if err != nil { @@ -387,7 +391,7 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error { SpaceOwner: n.SpaceOwnerOrManager(ctx), ExecutingUser: user, ResourceID: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}, - Filename: uploadMetadata.Filename, // TODO what and when do we publish chunking v2 names? Currently, this uses the chunk name. + Filename: uploadSession.Filename, // TODO what and when do we publish chunking v2 names? Currently, this uses the chunk name. 
Filesize: uint64(info.Size), }); err != nil { return err @@ -430,23 +434,36 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error { if !fs.o.AsyncFileUploads { // handle postprocessing synchronously log.Debug().Str("id", info.ID).Msg("upload.Finalize") - err = upload.Finalize(ctx, fs.blobstore, uploadMetadata.MTime, info, n, uploadMetadata.BlobID) // moving or copying the blob only reads the blobid, no need to change the revision nodes nodeid + err = upload.Finalize(ctx, fs.blobstore, uploadSession.MTime, info, n, uploadSession.BlobID) // moving or copying the blob only reads the blobid, no need to change the revision nodes nodeid log.Debug().Str("id", info.ID).Msg("upload.Cleanup") - upload.Cleanup(ctx, fs.lu, n, info.ID, uploadMetadata.MTime, err != nil) - if tup, ok := up.(tusd.TerminatableUpload); ok { - log.Debug().Str("id", info.ID).Msg("tup.Terminate") - terr := tup.Terminate(ctx) - if terr != nil { - log.Error().Err(terr).Interface("info", info).Msg("failed to terminate upload") - } + upload.Cleanup(ctx, fs.lu, n, info.ID, uploadSession.MTime, err != nil) + cerr := uploadSession.CleanupMetadata(ctx) + if cerr != nil { + log.Error().Err(cerr).Interface("info", info).Msg("failed to cleanup upload session metadata") } + /* + if tup, ok := up.(tusd.TerminatableUpload); ok { + log.Debug().Str("id", info.ID).Msg("tup.Terminate") + terr := tup.Terminate(ctx) + if terr != nil { + log.Error().Err(terr).Interface("info", info).Msg("failed to terminate upload") + } + } + */ if err != nil { log.Error().Err(err).Msg("failed to upload") return err } log.Debug().Str("id", info.ID).Msg("upload.SetNodeToUpload") - sizeDiff, err = upload.SetNodeToUpload(ctx, fs.lu, n, uploadMetadata) + sizeDiff, err = upload.SetNodeToUpload(ctx, fs.lu, n, upload.RevisionMetadata{ + MTime: uploadSession.MTime, + BlobID: uploadSession.BlobID, + BlobSize: info.Size, + ChecksumSHA1: uploadSession.ChecksumSHA1, + ChecksumMD5: uploadSession.ChecksumMD5, + ChecksumADLER32: uploadSession.ChecksumADLER32, + }) if err != nil { log.Error().Err(err).Msg("failed update Node to revision") return err @@ -500,7 +517,9 @@ func checkHash(expected string, h hash.Hash) error { // Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated? func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, uff storage.UploadFinishedFunc) (provider.ResourceInfo, error) { sublog := appctx.GetLogger(ctx).With().Str("path", req.Ref.Path).Int64("uploadLength", req.Length).Logger() - up, err := fs.tusDataStore.GetUpload(ctx, req.Ref.GetPath()) + // TODO GetUpload, GetInfo and ReadSession are pretty redundant here ... 
+ uploadID := strings.TrimLeft(req.Ref.GetPath(), "/") + up, err := fs.tusDataStore.GetUpload(ctx, uploadID) if err != nil { sublog.Debug().Err(err).Msg("Decomposedfs: error retrieving upload") return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error retrieving upload") @@ -512,24 +531,24 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error retrieving upload info") } - uploadMetadata, err := upload.ReadMetadata(ctx, fs.lu, uploadInfo.ID) + uploadSession, err := tus.ReadSession(ctx, fs.lu.InternalRoot(), uploadInfo.ID) if err != nil { sublog.Debug().Err(err).Msg("Decomposedfs: error retrieving upload metadata") return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error retrieving upload metadata") } - if chunking.IsChunked(uploadMetadata.Chunk) { // check chunking v1, TODO, actually there is a 'OC-Chunked: 1' header, at least when the testsuite uses chunking v1 + if chunking.IsChunked(uploadSession.Chunk) { // check chunking v1, TODO, actually there is a 'OC-Chunked: 1' header, at least when the testsuite uses chunking v1 var assembledFile, p string - p, assembledFile, err = fs.chunkHandler.WriteChunk(uploadMetadata.Chunk, req.Body) + p, assembledFile, err = fs.chunkHandler.WriteChunk(uploadSession.Chunk, req.Body) if err != nil { sublog.Debug().Err(err).Msg("Decomposedfs: could not write chunk") return provider.ResourceInfo{}, err } if p == "" { - sublog.Debug().Err(err).Str("chunk", uploadMetadata.Chunk).Msg("Decomposedfs: wrote chunk") + sublog.Debug().Err(err).Str("chunk", uploadSession.Chunk).Msg("Decomposedfs: wrote chunk") return provider.ResourceInfo{}, errtypes.PartialContent(req.Ref.String()) } - uploadMetadata.Filename = p + uploadSession.Filename = p fd, err := os.Open(assembledFile) if err != nil { return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error opening assembled file") @@ -543,14 +562,14 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u } // fake a new upload with the correct size - uploadMetadata.Size = chunkStat.Size() - uploadMetadata.BlobSize = chunkStat.Size() - err = upload.WriteMetadata(ctx, fs.lu, uploadMetadata.ID, uploadMetadata) - if err != nil { - return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error writing upload metadata for legacy chunking") - } - - nup, err := fs.tusDataStore.NewUpload(ctx, uploadMetadata) + uploadSession.Size = chunkStat.Size() + uploadSession.BlobSize = chunkStat.Size() + //err = uploadSession.Persist(ctx) + //if err != nil { + // return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error writing upload metadata for legacy chunking") + //} + + nup, err := fs.tusDataStore.NewUploadWithSession(ctx, uploadSession) if err != nil { return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: could not create new tus upload for legacy chunking") } @@ -602,7 +621,7 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u return provider.ResourceInfo{}, err } - n, err := upload.ReadNode(ctx, fs.lu, uploadMetadata) + n, err := upload.ReadNode(ctx, fs.lu, uploadSession) if err != nil { sublog.Debug().Err(err).Msg("Decomposedfs: error reading node") return provider.ResourceInfo{}, err @@ -613,11 +632,11 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u // but then the search has to walk the path. it might be more efficient if search called GetPath itself ... 
or we send the path as additional metadata in the event uploadRef := &provider.Reference{ ResourceId: &provider.ResourceId{ - StorageId: uploadMetadata.ProviderID, + StorageId: uploadSession.ProviderID, SpaceId: n.SpaceID, OpaqueId: n.SpaceID, }, - Path: utils.MakeRelativePath(filepath.Join(uploadMetadata.Dir, uploadMetadata.Filename)), + Path: utils.MakeRelativePath(filepath.Join(uploadSession.Dir, uploadSession.Filename)), } executant, ok := ctxpkg.ContextGetUser(ctx) if !ok { @@ -639,7 +658,7 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u ri := provider.ResourceInfo{ // fill with at least fileid, mtime and etag Id: &provider.ResourceId{ - StorageId: uploadMetadata.ProviderID, + StorageId: uploadSession.ProviderID, SpaceId: n.SpaceID, OpaqueId: n.ID, }, @@ -761,7 +780,7 @@ func (fs *Decomposedfs) getUploadSession(ctx context.Context, path string) (stor return nil, fmt.Errorf("invalid upload path") } - metadata, err := upload.ReadMetadata(ctx, fs.lu, match[1]) + metadata, err := tus.ReadSession(ctx, fs.lu.InternalRoot(), match[1]) if err != nil { return nil, err } diff --git a/pkg/storage/utils/decomposedfs/upload/processing.go b/pkg/storage/utils/decomposedfs/upload/processing.go index 81d906e201..fd6f8b0588 100644 --- a/pkg/storage/utils/decomposedfs/upload/processing.go +++ b/pkg/storage/utils/decomposedfs/upload/processing.go @@ -67,7 +67,7 @@ func Postprocessing(lu *lookup.Lookup, propagator Propagator, cache cache.StatCa log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload info") continue // NOTE: since we can't get the upload, we can't delete the blob } - uploadMetadata, err := ReadMetadata(ctx, lu, info.ID) + uploadSession, err := tus.ReadSession(ctx, lu.InternalRoot(), info.ID) if err != nil { log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload metadata") continue // NOTE: since we can't get the upload, we can't delete the blob @@ -82,9 +82,9 @@ func Postprocessing(lu *lookup.Lookup, propagator Propagator, cache cache.StatCa var sizeDiff int64 // propagate sizeDiff after failed postprocessing - n, err := ReadNode(ctx, lu, uploadMetadata) + n, err := ReadNode(ctx, lu, uploadSession) if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("metadata", uploadMetadata).Msg("could not read revision node on postprocessing finished") + log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("metadata", uploadSession).Msg("could not read revision node on postprocessing finished") continue } @@ -96,12 +96,19 @@ func Postprocessing(lu *lookup.Lookup, propagator Propagator, cache cache.StatCa failed = true case events.PPOutcomeContinue: deleteMetadata = true - if err := Finalize(ctx, blobstore, uploadMetadata.MTime, info, n, uploadMetadata.BlobID); err != nil { + if err := Finalize(ctx, blobstore, uploadSession.MTime, info, n, uploadSession.BlobID); err != nil { log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not finalize upload") deleteMetadata = false failed = true } - sizeDiff, err = SetNodeToUpload(ctx, lu, n, uploadMetadata) + sizeDiff, err = SetNodeToUpload(ctx, lu, n, RevisionMetadata{ + MTime: uploadSession.MTime, + BlobID: uploadSession.BlobID, + BlobSize: info.Size, + ChecksumSHA1: uploadSession.ChecksumSHA1, + ChecksumMD5: uploadSession.ChecksumMD5, + ChecksumADLER32: uploadSession.ChecksumADLER32, + }) if err != nil { log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could set node to revision upload") deleteMetadata = false @@ -145,7 +152,7 @@ func 
Postprocessing(lu *lookup.Lookup, propagator Propagator, cache cache.StatCa p := Progress{ Upload: up, Path: lu.UploadPath(ev.UploadID), - Session: uploadMetadata, + Session: uploadSession, Processing: false, } if err := p.Purge(ctx); err != nil { @@ -153,7 +160,7 @@ func Postprocessing(lu *lookup.Lookup, propagator Propagator, cache cache.StatCa } } if deleteMetadata { - err := tusDataStore.CleanupMetadata(ctx, ev.UploadID) + err := uploadSession.CleanupMetadata(ctx) if err != nil { log.Info().Str("path", n.InternalPath()).Err(err).Msg("cleaning up the tus info file failed") } @@ -170,23 +177,23 @@ func Postprocessing(lu *lookup.Lookup, propagator Propagator, cache cache.StatCa Failed: failed, ExecutingUser: &user.User{ Id: &user.UserId{ - Type: user.UserType(user.UserType_value[uploadMetadata.ExecutantType]), - Idp: uploadMetadata.ExecutantIdp, - OpaqueId: uploadMetadata.ExecutantID, + Type: user.UserType(user.UserType_value[uploadSession.ExecutantType]), + Idp: uploadSession.ExecutantIdp, + OpaqueId: uploadSession.ExecutantID, }, - Username: uploadMetadata.ExecutantUserName, + Username: uploadSession.ExecutantUserName, }, Filename: ev.Filename, FileRef: &provider.Reference{ ResourceId: &provider.ResourceId{ - StorageId: uploadMetadata.ProviderID, - SpaceId: uploadMetadata.SpaceRoot, - OpaqueId: uploadMetadata.SpaceRoot, + StorageId: uploadSession.ProviderID, + SpaceId: uploadSession.SpaceRoot, + OpaqueId: uploadSession.SpaceRoot, }, // FIXME this seems wrong, path is not really relative to space root // actually it is: InitiateUpload calls fs.lu.Path to get the path relative to the root so soarch can index the path // hm is that robust? what if the file is moved? shouldn't we store the parent id, then? - Path: utils.MakeRelativePath(filepath.Join(uploadMetadata.Dir, uploadMetadata.Filename)), + Path: utils.MakeRelativePath(filepath.Join(uploadSession.Dir, uploadSession.Filename)), }, Timestamp: utils.TimeToTS(now), SpaceOwner: n.SpaceOwnerOrManager(ctx), @@ -205,15 +212,15 @@ func Postprocessing(lu *lookup.Lookup, propagator Propagator, cache cache.StatCa log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload info") continue // NOTE: since we can't get the upload, we can't restart postprocessing } - uploadMetadata, err := ReadMetadata(ctx, lu, info.ID) + uploadSession, err := tus.ReadSession(ctx, lu.InternalRoot(), info.ID) if err != nil { log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload metadata") continue // NOTE: since we can't get the upload, we can't delete the blob } - n, err := ReadNode(ctx, lu, uploadMetadata) + n, err := ReadNode(ctx, lu, uploadSession) if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("metadata", uploadMetadata).Msg("could not read revision node on restart postprocessing") + log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("metadata", uploadSession).Msg("could not read revision node on restart postprocessing") continue } @@ -229,7 +236,7 @@ func Postprocessing(lu *lookup.Lookup, propagator Propagator, cache cache.StatCa SpaceOwner: n.SpaceOwnerOrManager(ctx), ExecutingUser: &user.User{Id: &user.UserId{OpaqueId: "postprocessing-restart"}}, // send nil instead? 
ResourceID: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}, - Filename: uploadMetadata.Filename, + Filename: uploadSession.Filename, Filesize: uint64(info.Size), }); err != nil { log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish BytesReceived event") @@ -339,7 +346,7 @@ func Postprocessing(lu *lookup.Lookup, propagator Propagator, cache cache.StatCa log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload info") continue } - uploadMetadata, err := ReadMetadata(ctx, lu, info.ID) + uploadSession, err := tus.ReadSession(ctx, lu.InternalRoot(), info.ID) if err != nil { log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload metadata") continue // NOTE: since we can't get the upload, we can't delete the blob @@ -347,9 +354,9 @@ func Postprocessing(lu *lookup.Lookup, propagator Propagator, cache cache.StatCa // scan data should be set on the node revision not the node ... then when postprocessing finishes we need to copy the state to the node - n, err = ReadNode(ctx, lu, uploadMetadata) + n, err = ReadNode(ctx, lu, uploadSession) if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("metadata", uploadMetadata).Msg("could not read revision node on default event") + log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("metadata", uploadSession).Msg("could not read revision node on default event") continue } } diff --git a/pkg/storage/utils/decomposedfs/upload/upload.go b/pkg/storage/utils/decomposedfs/upload/upload.go index cded28e5e8..295568823b 100644 --- a/pkg/storage/utils/decomposedfs/upload/upload.go +++ b/pkg/storage/utils/decomposedfs/upload/upload.go @@ -225,7 +225,7 @@ func CreateRevisionNode(ctx context.Context, lu *lookup.Lookup, revisionNode *no return f, nil } -func SetNodeToUpload(ctx context.Context, lu *lookup.Lookup, n *node.Node, uploadMetadata tus.Session) (int64, error) { +func SetNodeToUpload(ctx context.Context, lu *lookup.Lookup, n *node.Node, rm RevisionMetadata) (int64, error) { nodePath := n.InternalPath() // lock existing node metadata @@ -241,19 +241,10 @@ func SetNodeToUpload(ctx context.Context, lu *lookup.Lookup, n *node.Node, uploa return 0, err } - sizeDiff := uploadMetadata.BlobSize - n.Blobsize + sizeDiff := rm.BlobSize - n.Blobsize - n.BlobID = uploadMetadata.BlobID - n.Blobsize = uploadMetadata.BlobSize - - rm := RevisionMetadata{ - MTime: uploadMetadata.MTime, - BlobID: uploadMetadata.BlobID, - BlobSize: uploadMetadata.BlobSize, - ChecksumSHA1: uploadMetadata.ChecksumSHA1, - ChecksumMD5: uploadMetadata.ChecksumMD5, - ChecksumADLER32: uploadMetadata.ChecksumADLER32, - } + n.BlobID = rm.BlobID + n.Blobsize = rm.BlobSize if rm.MTime == "" { rm.MTime = time.Now().UTC().Format(time.RFC3339Nano) @@ -365,29 +356,33 @@ func Finalize(ctx context.Context, blobstore tree.Blobstore, revision string, in _, span := tracer.Start(ctx, "Finalize") defer span.End() - rn := n.RevisionNode(ctx, revision) - rn.BlobID = blobID - var err error - if mover, ok := blobstore.(tree.BlobstoreMover); ok { - err = mover.MoveBlob(rn, "", info.Storage["Bucket"], info.Storage["Key"]) - switch err { - case nil: - return nil - case tree.ErrBlobstoreCannotMove: - // fallback below - default: - return err + /* + rn := n.RevisionNode(ctx, revision) + rn.BlobID = blobID + var err error + if mover, ok := blobstore.(tree.BlobstoreMover); ok { + err = mover.MoveBlob(rn, "", info.Storage["Bucket"], info.Storage["Key"]) + switch err { + case nil: + return nil + case tree.ErrBlobstoreCannotMove: + // 
fallback below + default: + return err + } } - } - // upload the data to the blobstore - _, subspan := tracer.Start(ctx, "WriteBlob") - err = blobstore.Upload(rn, info.Storage["Path"]) // FIXME where do we read from - subspan.End() - if err != nil { - return errors.Wrap(err, "failed to upload file to blobstore") - } + // upload the data to the blobstore + _, subspan := tracer.Start(ctx, "WriteBlob") + err = blobstore.Upload(rn, info.Storage["Path"]) // FIXME where do we read from + subspan.End() + if err != nil { + return errors.Wrap(err, "failed to upload file to blobstore") + } + */ + // TODO delete info? no ... the upload session continues, but flag it as done? no we have postprocessing for that + // what if a client terminates an upload while postprocessing is running? // FIXME use a reader return nil } diff --git a/pkg/storage/utils/tus/datastore.go b/pkg/storage/utils/tus/datastore.go index 84bd30c939..efabb74646 100644 --- a/pkg/storage/utils/tus/datastore.go +++ b/pkg/storage/utils/tus/datastore.go @@ -20,19 +20,26 @@ package tus import ( "context" + "fmt" + "strings" tusd "github.com/tus/tusd/pkg/handler" ) // DataStore is an interface that extends the tusd.DataStore interface. type DataStore interface { - NewUpload(ctx context.Context, session Session) (upload tusd.Upload, err error) - GetUpload(ctx context.Context, id string) (upload tusd.Upload, err error) - - // CleanupMetadata cleans up an upload by its ID. - CleanupMetadata(ctx context.Context, id string) error + tusd.DataStore + NewUploadWithSession(ctx context.Context, session Session) (upload tusd.Upload, err error) } func BuildUploadId(spaceID, blobID string) string { return spaceID + ":" + blobID } + +func SplitUploadId(uploadID string) (spaceID string, blobID string, err error) { + parts := strings.SplitN(uploadID, ":", 2) + if len(parts) != 2 { + return "", "", fmt.Errorf("invalid uploadid") + } + return parts[0], parts[1], nil +} diff --git a/pkg/storage/utils/tus/filestore.go b/pkg/storage/utils/tus/filestore.go index 6dbaa2037a..1b43ca4e40 100644 --- a/pkg/storage/utils/tus/filestore.go +++ b/pkg/storage/utils/tus/filestore.go @@ -20,38 +20,249 @@ package tus import ( "context" + "fmt" + "io" "os" "path/filepath" + "strings" - tusfilestore "github.com/tus/tusd/pkg/filestore" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" "github.com/tus/tusd/pkg/handler" ) -// FileStore is a wrapper around tusfilestore.FileStore that provides additional functionality. +var defaultFilePerm = os.FileMode(0664) + +// See the handler.DataStore interface for documentation about the different +// methods. type FileStore struct { - tusfilestore.FileStore + // Relative or absolute path to store files in. FileStore does not check + // whether the path exists, use os.MkdirAll in this case on your own. + Path string } -// NewFileStore creates a new instance of FileStore with the specified path. +// New creates a new file based storage backend. The directory specified will +// be used as the only storage entry. This method does not check +// whether the path exists, use os.MkdirAll to ensure. +// In addition, a locking mechanism is provided. func NewFileStore(path string) FileStore { - return FileStore{tusfilestore.New(path)} + return FileStore{path} +} + +// UseIn sets this store as the core data store in the passed composer and adds +// all possible extension to it. 
+func (store FileStore) UseIn(composer *handler.StoreComposer) { + composer.UseCore(store) + composer.UseTerminater(store) + composer.UseConcater(store) + composer.UseLengthDeferrer(store) +} + +func (store FileStore) NewUpload(ctx context.Context, info handler.FileInfo) (handler.Upload, error) { + return nil, fmt.Errorf("fileStore: must call NewUploadSession") } +func (store FileStore) NewUploadWithSession(ctx context.Context, session Session) (handler.Upload, error) { + + if session.ID == "" { + return nil, fmt.Errorf("s3store: upload id must be set") + } + + binPath := store.binPath(session.ID) + if err := os.MkdirAll(filepath.Dir(binPath), 0755); err != nil { + return nil, err + } + + session.Storage = map[string]string{ + "Type": "filestore", + "Path": binPath, + } + + // Create binary file with no content + file, err := os.OpenFile(binPath, os.O_CREATE|os.O_WRONLY, defaultFilePerm) + if err != nil { + if os.IsNotExist(err) { + err = fmt.Errorf("upload directory does not exist: %s", store.Path) + } + return nil, err + } + err = file.Close() + if err != nil { + return nil, err + } -// NewUpload creates a new upload for the given file info in the file store. -func (store FileStore) NewUpload(ctx context.Context, session Session) (handler.Upload, error) { - err := os.MkdirAll(filepath.Dir(filepath.Join(store.Path, session.ID)), 0755) + upload := &fileUpload{session.ID, &store, &session} + + err = session.Persist(ctx) if err != nil { return nil, err } - return store.FileStore.NewUpload(ctx, session.ToFileInfo()) + + return upload, nil +} + +func (store FileStore) GetUpload(ctx context.Context, id string) (handler.Upload, error) { + return &fileUpload{ + id: id, + store: &store, + }, nil + /* + info := handler.FileInfo{} + data, err := ioutil.ReadFile(store.infoPath(id)) + if err != nil { + if os.IsNotExist(err) { + // Interpret os.ErrNotExist as 404 Not Found + err = handler.ErrNotFound + } + return nil, err + } + if err := json.Unmarshal(data, &info); err != nil { + return nil, err + } + + binPath := store.binPath(id) + infoPath := store.infoPath(id) + stat, err := os.Stat(binPath) + if err != nil { + if os.IsNotExist(err) { + // Interpret os.ErrNotExist as 404 Not Found + err = handler.ErrNotFound + } + return nil, err + } + + info.Offset = stat.Size() + + return &fileUpload{ + info: info, + binPath: binPath, + infoPath: infoPath, + }, nil + */ +} + +func (store FileStore) AsTerminatableUpload(upload handler.Upload) handler.TerminatableUpload { + return upload.(*fileUpload) +} + +func (store FileStore) AsLengthDeclarableUpload(upload handler.Upload) handler.LengthDeclarableUpload { + return upload.(*fileUpload) +} + +func (store FileStore) AsConcatableUpload(upload handler.Upload) handler.ConcatableUpload { + return upload.(*fileUpload) +} + +// binPath returns the path to the file storing the binary data. +func (store FileStore) binPath(uploadID string) string { + // uploadID is of the format : + parts := strings.SplitN(uploadID, ":", 2) + return filepath.Clean(filepath.Join(store.Path, "spaces", lookup.Pathify(parts[0], 1, 2), "blobs", lookup.Pathify(parts[1], 4, 2))) +} + +type fileUpload struct { + id string + store *FileStore + + // session stores the upload's current Session struct. It may be nil if it hasn't + // been fetched yet from S3. Never read or write to it directly but instead use + // the GetInfo and writeInfo functions. 
+ session *Session +} + +func (upload *fileUpload) GetInfo(ctx context.Context) (handler.FileInfo, error) { + if upload.session != nil { + return upload.session.ToFileInfo(), nil + } + + session, err := upload.GetSession(ctx) + if err != nil { + return handler.FileInfo{}, err + } + return session.ToFileInfo(), nil +} + +func (upload *fileUpload) GetSession(ctx context.Context) (Session, error) { + session, err := upload.fetchSession(ctx) + if err != nil { + return session, err + } + + upload.session = &session + + return session, nil +} +func (upload *fileUpload) fetchSession(ctx context.Context) (Session, error) { + id := upload.id + store := upload.store + + // Get file info stored in separate object + session, err := ReadSession(ctx, store.Path, id) + if err != nil { + return Session{}, err + } + return session, nil +} + +func (upload *fileUpload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) { + file, err := os.OpenFile(upload.store.binPath(upload.id), os.O_WRONLY|os.O_APPEND, defaultFilePerm) + if err != nil { + return 0, err + } + defer file.Close() + + n, err := io.Copy(file, src) + + upload.session.Offset += n + return n, err +} + +func (upload *fileUpload) GetReader(ctx context.Context) (io.Reader, error) { + return os.Open(upload.store.binPath(upload.id)) +} + +func (upload *fileUpload) Terminate(ctx context.Context) error { + if err := os.Remove(upload.store.sessionPath(upload.id)); err != nil { + return err + } + if err := os.Remove(upload.store.binPath(upload.id)); err != nil { + return err + } + return nil +} + +func (upload *fileUpload) ConcatUploads(ctx context.Context, uploads []handler.Upload) (err error) { + file, err := os.OpenFile(upload.store.binPath(upload.id), os.O_WRONLY|os.O_APPEND, defaultFilePerm) + if err != nil { + return err + } + defer file.Close() + + for _, partialUpload := range uploads { + fileUpload := partialUpload.(*fileUpload) + + src, err := os.Open(upload.store.binPath(fileUpload.id)) + if err != nil { + return err + } + + if _, err := io.Copy(file, src); err != nil { + return err + } + } + + return +} + +func (upload *fileUpload) DeclareLength(ctx context.Context, length int64) error { + upload.session.Size = length + upload.session.SizeIsDeferred = false + return upload.session.Persist(ctx) } -// CleanupMetadata removes the metadata associated with the given ID. -func (store FileStore) CleanupMetadata(_ context.Context, id string) error { - return os.Remove(store.infoPath(id)) +func (upload *fileUpload) FinishUpload(ctx context.Context) error { + return nil } // infoPath returns the path to the .info file storing the file's info. -func (store FileStore) infoPath(id string) string { - return filepath.Join(store.Path, id+".info") +func (store FileStore) sessionPath(id string) string { + return filepath.Join(store.Path, id+".mpk") } diff --git a/pkg/storage/utils/tus/s3store.go b/pkg/storage/utils/tus/s3store.go index 1ff8d1200b..245892b886 100644 --- a/pkg/storage/utils/tus/s3store.go +++ b/pkg/storage/utils/tus/s3store.go @@ -72,8 +72,6 @@ package tus import ( "bytes" "context" - "crypto/rand" - "encoding/hex" "errors" "fmt" "io" @@ -205,24 +203,25 @@ type s3Upload struct { id string store *S3Store - // info stores the upload's current Session struct. It may be nil if it hasn't + // session stores the upload's current Session struct. It may be nil if it hasn't // been fetched yet from S3. Never read or write to it directly but instead use // the GetInfo and writeInfo functions. 
session *Session } -func (store S3Store) NewUpload(ctx context.Context, session Session) (handler.Upload, error) { +func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (handler.Upload, error) { + return nil, fmt.Errorf("s3store: must call NewUploadSession") +} + +// NewUploadWithSession creates a new tus s3 upload backed by an upload session +func (store S3Store) NewUploadWithSession(ctx context.Context, session Session) (handler.Upload, error) { // an upload larger than MaxObjectSize must throw an error if session.Size > store.MaxObjectSize { return nil, fmt.Errorf("s3store: upload size of %v bytes exceeds MaxObjectSize of %v bytes", session.Size, store.MaxObjectSize) } - var uploadId string if session.ID == "" { return nil, fmt.Errorf("s3store: upload id must be set") - } else { - // certain tests set info.ID in advance - uploadId = session.ID } // Convert meta data into a map of pointers for AWS Go SDK, sigh. @@ -237,23 +236,22 @@ func (store S3Store) NewUpload(ctx context.Context, session Session) (handler.Up // Create the actual multipart upload res, err := store.Service.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{ Bucket: aws.String(store.Bucket), - Key: store.keyWithPrefix(uploadId), + Key: store.keyWithPrefix(session.ID), Metadata: metadata, }) if err != nil { return nil, fmt.Errorf("s3store: unable to create multipart upload:\n%s", err) } - id := uploadId + "+" + *res.UploadId - session.ID = id + session.ID = session.ID + "+" + *res.UploadId session.Storage = map[string]string{ "Type": "s3store", "Bucket": store.Bucket, - "Key": *store.keyWithPrefix(uploadId), + "Key": *store.keyWithPrefix(session.ID), } - upload := &s3Upload{id, &store, nil} + upload := &s3Upload{session.ID, &store, &session} err = session.Persist(ctx) if err != nil { return nil, fmt.Errorf("s3store: unable to create info file:\n%s", err) @@ -266,16 +264,6 @@ func (store S3Store) GetUpload(ctx context.Context, id string) (handler.Upload, return &s3Upload{id, &store, nil}, nil } -// CleanupMetadata removes the metadata associated with the given ID. -func (store S3Store) CleanupMetadata(ctx context.Context, id string) error { - uploadID, _ := splitIds(id) - _, err := store.Service.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{ - Bucket: aws.String(store.Bucket), - Key: store.metadataKeyWithPrefix(uploadID + ".info"), - }) - return err -} - func (store S3Store) AsTerminatableUpload(upload handler.Upload) handler.TerminatableUpload { return upload.(*s3Upload) } @@ -464,7 +452,7 @@ func (upload *s3Upload) GetSession(ctx context.Context) (Session, error) { return session, nil } -func (upload s3Upload) fetchSession(ctx context.Context) (Session, error) { +func (upload *s3Upload) fetchSession(ctx context.Context) (Session, error) { id := upload.id store := upload.store uploadId, _ := splitIds(id) @@ -606,9 +594,6 @@ func (upload s3Upload) Terminate(ctx context.Context) error { { Key: store.metadataKeyWithPrefix(uploadId + ".part"), }, - { - Key: store.metadataKeyWithPrefix(uploadId + ".info"), - }, }, Quiet: aws.Bool(true), }, @@ -1069,22 +1054,6 @@ func (store S3Store) metadataKeyWithPrefix(key string) *string { return aws.String(prefix + key) } -// uid returns a unique id. These ids consist of 128 bits from a -// cryptographically strong pseudo-random generator and are like uuids, but -// without the dashes and significant bits. 
-//
-// See: http://en.wikipedia.org/wiki/UUID#Random_UUID_probability_of_duplicates
-func uid() string {
-	id := make([]byte, 16)
-	_, err := io.ReadFull(rand.Reader, id)
-	if err != nil {
-		// This is probably an appropriate way to handle errors from our source
-		// for random bits.
-		panic(err)
-	}
-	return hex.EncodeToString(id)
-}
-
 func newMultiError(errs []error) error {
 	message := "Multiple errors occurred:\n"
 	for _, err := range errs {
diff --git a/pkg/storage/utils/tus/session.go b/pkg/storage/utils/tus/session.go
index 101d9dbb9a..95ebbcfd3e 100644
--- a/pkg/storage/utils/tus/session.go
+++ b/pkg/storage/utils/tus/session.go
@@ -77,25 +77,26 @@ func NewSession(ctx context.Context, root string) Session {
 }
 
 func ReadSession(ctx context.Context, root, id string) (Session, error) {
-	uploadPath := uploadPath(root, id)
+	uploadPath := sessionPath(root, id)
 	msgBytes, err := os.ReadFile(uploadPath)
 	if err != nil {
 		return Session{}, err
 	}
 
-	metadata := Session{}
+	session := Session{}
 	if len(msgBytes) > 0 {
-		err = msgpack.Unmarshal(msgBytes, &metadata)
+		err = msgpack.Unmarshal(msgBytes, &session)
 		if err != nil {
 			return Session{}, err
 		}
 	}
-	return metadata, nil
+	session.root = root
+	return session, nil
 }
 
 func (m Session) Persist(ctx context.Context) error {
-	uploadPath := uploadPath(m.root, m.ID)
+	uploadPath := sessionPath(m.root, m.ID)
 	// create folder structure (if needed)
 	if err := os.MkdirAll(filepath.Dir(uploadPath), 0700); err != nil {
 		return err
 	}
@@ -168,6 +169,10 @@ func (m Session) GetExpires() time.Time {
 	return m.Expires
 }
 
-func uploadPath(root, id string) string {
+func (m Session) CleanupMetadata(ctx context.Context) error {
+	return os.Remove(sessionPath(m.root, m.ID))
+}
+
+func sessionPath(root, id string) string {
 	return filepath.Join(root, "uploads", id+".mpk")
 }
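
The new helpers in pkg/storage/utils/tus/datastore.go encode the public upload ID as the space ID and blob ID joined by a colon; SplitUploadId is the inverse. A minimal round-trip sketch of the two helpers as added above; the concrete IDs are illustrative only:

package main

import (
	"fmt"

	"github.com/cs3org/reva/v2/pkg/storage/utils/tus"
)

func main() {
	// BuildUploadId joins space ID and blob ID with a colon.
	id := tus.BuildUploadId("space-1", "blob-42") // "space-1:blob-42"

	// SplitUploadId is the inverse and returns an error for IDs without a colon.
	spaceID, blobID, err := tus.SplitUploadId(id)
	if err != nil {
		panic(err)
	}
	fmt.Println(spaceID, blobID) // space-1 blob-42
}

Note that the s3 store additionally appends "+<multipartUploadID>" to the session ID in NewUploadWithSession, so IDs handed out by that store carry a suffix that SplitUploadId does not strip.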
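
Sessions are now created with NewSession(ctx, root) so the storage root travels with them: Persist writes them as msgpack to <root>/uploads/<id>.mpk, ReadSession re-attaches the root, and CleanupMetadata removes the file again. A hedged sketch of that lifecycle using only the Session API from this patch; the root path, IDs and filename are illustrative, and in the real flow NewUploadWithSession currently calls Persist itself (see the TODO in InitiateUpload):

package main

import (
	"context"
	"log"

	"github.com/cs3org/reva/v2/pkg/storage/utils/tus"
)

func sessionLifecycle(ctx context.Context, root string) error {
	// InitiateUpload builds the session via NewSession and then fills in
	// the remaining fields one by one.
	session := tus.NewSession(ctx, root)
	session.ID = tus.BuildUploadId("space-1", "blob-42")
	session.Filename = "report.pdf"

	// Persist writes <root>/uploads/<id>.mpk, creating the folder if needed.
	if err := session.Persist(ctx); err != nil {
		return err
	}

	// Later callers (PreFinishResponseCallback, Postprocessing, ...) read the
	// session back by ID; ReadSession re-attaches root so cleanup still works.
	session, err := tus.ReadSession(ctx, root, session.ID)
	if err != nil {
		return err
	}

	// After finalize/postprocessing the session file is removed on the
	// session itself rather than through the datastore.
	return session.CleanupMetadata(ctx)
}

func main() {
	if err := sessionLifecycle(context.Background(), "/tmp/reva-root"); err != nil {
		log.Fatal(err)
	}
}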
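
InitiateUpload no longer persists the session itself; it hands the session to the datastore, which may rewrite the ID (the s3 store appends the multipart upload ID). A condensed sketch of that call sequence, assuming ds is the tus.DataStore wired up in ocis.go or s3ng.go and the session has been filled as in InitiateUpload; this is not the full InitiateUpload implementation:

package decomposedfs // illustrative placement

import (
	"context"

	"github.com/cs3org/reva/v2/pkg/storage/utils/tus"
)

func initiate(ctx context.Context, ds tus.DataStore, session tus.Session) (map[string]string, error) {
	// NewUploadWithSession persists the session and may change its ID.
	up, err := ds.NewUploadWithSession(ctx, session)
	if err != nil {
		return nil, err
	}

	// The ID returned to the client is read from the created upload,
	// not from the original session.
	info, err := up.GetInfo(ctx)
	if err != nil {
		return nil, err
	}
	return map[string]string{"simple": info.ID, "tus": info.ID}, nil
}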
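
With NewFileStore(o.Root) the file store now shares the storage root instead of a dedicated uploads directory: session metadata lives under <root>/uploads, while the binary data follows the decomposedfs space/blob layout via lookup.Pathify in binPath. A small sketch that mirrors sessionPath and binPath from this patch to show the resulting paths; the root and IDs are illustrative and the exact directory nesting comes from lookup.Pathify:

package main

import (
	"fmt"
	"path/filepath"
	"strings"

	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
)

func main() {
	root := "/var/lib/ocis/storage/users"      // illustrative o.Root
	uploadID := "space-a1b2c3d4:blob-e5f6a7b8" // illustrative spaceID:blobID

	// session metadata, as written by Session.Persist / read by ReadSession
	fmt.Println(filepath.Join(root, "uploads", uploadID+".mpk"))

	// binary data, mirroring FileStore.binPath
	parts := strings.SplitN(uploadID, ":", 2)
	fmt.Println(filepath.Clean(filepath.Join(root, "spaces",
		lookup.Pathify(parts[0], 1, 2), "blobs", lookup.Pathify(parts[1], 4, 2))))
}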
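
GetDataStore now returns a plain tusd.DataStore and FileStore gained UseIn, so the store can be composed into a tusd handler in the usual way. A hedged sketch of such wiring; the handler config below is illustrative and not part of this patch, and since NewUpload is deliberately disabled, uploads must still be created through NewUploadWithSession before the handler can serve them:

package dataprovider // illustrative placement

import (
	tusdhandler "github.com/tus/tusd/pkg/handler"

	"github.com/cs3org/reva/v2/pkg/storage/utils/tus"
)

func newTusHandler(store tus.FileStore) (*tusdhandler.UnroutedHandler, error) {
	composer := tusdhandler.NewStoreComposer()
	store.UseIn(composer) // registers core, terminater, concater, length deferrer

	return tusdhandler.NewUnroutedHandler(tusdhandler.Config{
		BasePath:      "/data",
		StoreComposer: composer,
	})
}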