make filestore upload session compatible
Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
butonic committed Dec 7, 2023
1 parent be393b3 commit f8264b5
Showing 10 changed files with 409 additions and 194 deletions.
3 changes: 1 addition & 2 deletions pkg/storage/fs/ocis/ocis.go
@@ -20,7 +20,6 @@ package ocis

import (
"path"
"path/filepath"

"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage"
@@ -48,7 +47,7 @@ func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) {
return nil, err
}

tusDataStore := tus.NewFileStore(filepath.Join(o.Root, "uploads"))
tusDataStore := tus.NewFileStore(o.Root)

return decomposedfs.NewDefault(m, bs, tusDataStore, stream)
}
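Note: the tus filestore is now rooted at the storage root itself instead of a dedicated "uploads" subdirectory, which lines it up with where the decomposedfs persists its upload sessions (see tus.NewSession(ctx, fs.o.Root) and tus.ReadSession(ctx, fs.lu.InternalRoot(), ...) in upload.go below). A minimal wiring sketch using only the constructors visible in this diff; o, bs, m and stream are set up earlier in New():

    // before: tusDataStore := tus.NewFileStore(filepath.Join(o.Root, "uploads"))
    tusDataStore := tus.NewFileStore(o.Root) // upload state now lives directly under the storage root
    return decomposedfs.NewDefault(m, bs, tusDataStore, stream)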
3 changes: 3 additions & 0 deletions pkg/storage/fs/s3ng/option.go
@@ -58,6 +58,9 @@ type Options struct {

// ForcePathStyle for the s3 blobstore
S3ForcePathStyle bool `mapstructure:"s3.force_path_style"`

// Root for the upload sessions
Root string `mapstructure:"root"`
}

// S3ConfigComplete return true if all required s3 fields are set
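Because the s3ng driver keeps its blobs in S3 while the new Root option is documented as the root for the upload sessions, the driver configuration now has to provide a local directory of its own; it is handed to tus.NewS3Store as an extra first argument in s3ng.go below. A hypothetical configuration sketch: only "root" and "s3.force_path_style" are taken from this diff, the remaining keys and values are illustrative assumptions.

    conf := map[string]interface{}{
        "root":                "/var/lib/reva/storage/users", // local directory for upload session state
        "s3.force_path_style": true,
        // remaining s3.* settings (bucket, endpoint, credentials, upload prefixes) as before
    }
    fs, err := s3ng.New(conf, stream) // New(m map[string]interface{}, stream events.Stream), as shown below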
2 changes: 1 addition & 1 deletion pkg/storage/fs/s3ng/s3ng.go
@@ -61,7 +61,7 @@ func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) {
WithS3ForcePathStyle(o.S3ForcePathStyle).
WithDisableSSL(o.S3DisableSSL)

tusDataStore := tus.NewS3Store(o.S3Bucket, s3.New(session.Must(session.NewSession()), s3Config))
tusDataStore := tus.NewS3Store(o.Root, o.S3Bucket, s3.New(session.Must(session.NewSession()), s3Config))
tusDataStore.ObjectPrefix = o.S3UploadObjectPrefix
tusDataStore.MetadataObjectPrefix = o.S3UploadMetadataPrefix
tusDataStore.TemporaryDirectory = o.S3UploadTemporaryDirectory
155 changes: 87 additions & 68 deletions pkg/storage/utils/decomposedfs/upload.go
@@ -90,24 +90,23 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere
usr := ctxpkg.ContextMustGetUser(ctx)

newBlobID := uuid.New().String()
uploadSession := tus.Session{
ID: tus.BuildUploadId(n.SpaceID, newBlobID),
MetaData: tusd.MetaData{},

BlobID: newBlobID,
Filename: n.Name,
SpaceRoot: n.SpaceRoot.ID,
SpaceOwnerOrManager: n.SpaceOwnerOrManager(ctx).GetOpaqueId(),
ProviderID: headers["providerID"],
MTime: time.Now().UTC().Format(time.RFC3339Nano),
NodeID: n.ID,
NodeParentID: n.ParentID,
ExecutantIdp: usr.Id.Idp,
ExecutantID: usr.Id.OpaqueId,
ExecutantType: utils.UserTypeToString(usr.Id.Type),
ExecutantUserName: usr.Username,
LogLevel: sublog.GetLevel().String(),
}
uploadSession := tus.NewSession(ctx, fs.o.Root)
uploadSession.ID = tus.BuildUploadId(n.SpaceID, newBlobID)
uploadSession.MetaData = tusd.MetaData{}

uploadSession.BlobID = newBlobID
uploadSession.Filename = n.Name
uploadSession.SpaceRoot = n.SpaceRoot.ID
uploadSession.SpaceOwnerOrManager = n.SpaceOwnerOrManager(ctx).GetOpaqueId()
uploadSession.ProviderID = headers["providerID"]
uploadSession.MTime = time.Now().UTC().Format(time.RFC3339Nano)
uploadSession.NodeID = n.ID
uploadSession.NodeParentID = n.ParentID
uploadSession.ExecutantIdp = usr.Id.Idp
uploadSession.ExecutantID = usr.Id.OpaqueId
uploadSession.ExecutantType = utils.UserTypeToString(usr.Id.Type)
uploadSession.ExecutantUserName = usr.Username
uploadSession.LogLevel = sublog.GetLevel().String()

// checksum is sent as tus Upload-Checksum header and should not magically become a metadata property
if checksum, ok := headers["checksum"]; ok {
@@ -226,22 +225,27 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere
return nil, errtypes.Aborted(fmt.Sprintf("parent %s already has a child %s", n.ID, n.Name))
}

// keep track of upload
err = uploadSession.Persist(ctx)
// NewUploadWithSession will also call Persist, TODO it should not
// NewUploadWithSession may change the id? eg for s3
nu, err := fs.tusDataStore.NewUploadWithSession(ctx, uploadSession)
if err != nil {
return nil, err
}
info, err := nu.GetInfo(ctx)
if err != nil {
return nil, err
}

sublog.Debug().Interface("info", uploadSession).Msg("Decomposedfs: initiated upload")
sublog.Debug().Interface("info", info).Msg("Decomposedfs: initiated upload")

return map[string]string{
"simple": uploadSession.ID,
"tus": uploadSession.ID,
"simple": info.ID,
"tus": info.ID,
}, nil
}

// GetDataStore returns the initialized Datastore
func (fs *Decomposedfs) GetDataStore() tus.DataStore {
func (fs *Decomposedfs) GetDataStore() tusd.DataStore {
return fs.tusDataStore
}
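For orientation: InitiateUpload now builds a session object and hands it to the datastore, which persists it and may rewrite the upload ID (the TODOs above note both points, e.g. for the s3 store), so the ID returned to the client is taken from GetInfo rather than from the session itself. A condensed sketch of the flow, using only calls that appear in this hunk; most session fields and the surrounding checks are elided:

    uploadSession := tus.NewSession(ctx, fs.o.Root)
    uploadSession.ID = tus.BuildUploadId(n.SpaceID, newBlobID)
    uploadSession.Filename = n.Name
    // ... executant, node, mtime and checksum fields as set above ...

    nu, err := fs.tusDataStore.NewUploadWithSession(ctx, uploadSession) // also persists the session
    if err != nil {
        return nil, err
    }
    info, err := nu.GetInfo(ctx) // info.ID may differ from uploadSession.ID
    if err != nil {
        return nil, err
    }
    return map[string]string{"simple": info.ID, "tus": info.ID}, nil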

@@ -258,21 +262,21 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error {
return err
}

uploadMetadata, err := upload.ReadMetadata(ctx, fs.lu, info.ID)
uploadSession, err := tus.ReadSession(ctx, fs.lu.InternalRoot(), info.ID)
if err != nil {
return err
}

// put lockID from upload back into context
if uploadMetadata.LockID != "" {
ctx = ctxpkg.ContextSetLockID(ctx, uploadMetadata.LockID)
if uploadSession.LockID != "" {
ctx = ctxpkg.ContextSetLockID(ctx, uploadSession.LockID)
}

// restore logger from file info
log, err := logger.FromConfig(&logger.LogConf{
Output: "stderr",
Mode: "json",
Level: uploadMetadata.LogLevel,
Level: uploadSession.LogLevel,
})
if err != nil {
return err
@@ -321,9 +325,9 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error {

// compare if they match the sent checksum
// TODO the tus checksum extension would do this on every chunk, but I currently don't see an easy way to pass in the requested checksum. for now we do it in FinishUpload which is also called for chunked uploads
if uploadMetadata.Checksum != "" {
if uploadSession.Checksum != "" {
var err error
parts := strings.SplitN(uploadMetadata.Checksum, " ", 2)
parts := strings.SplitN(uploadSession.Checksum, " ", 2)
if len(parts) != 2 {
return errtypes.BadRequest("invalid checksum format. must be '[algorithm] [checksum]'")
}
@@ -349,14 +353,14 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error {
}

// update checksums
uploadMetadata.ChecksumSHA1 = sha1h.Sum(nil)
uploadMetadata.ChecksumMD5 = md5h.Sum(nil)
uploadMetadata.ChecksumADLER32 = adler32h.Sum(nil)
uploadSession.ChecksumSHA1 = sha1h.Sum(nil)
uploadSession.ChecksumMD5 = md5h.Sum(nil)
uploadSession.ChecksumADLER32 = adler32h.Sum(nil)

log.Debug().Str("id", info.ID).Msg("upload.UpdateMetadata")
uploadMetadata, n, err := upload.UpdateMetadata(ctx, fs.lu, info.ID, info.Size, uploadMetadata)
uploadSession, n, err := upload.UpdateMetadata(ctx, fs.lu, info.ID, info.Size, uploadSession)
if err != nil {
upload.Cleanup(ctx, fs.lu, n, info.ID, uploadMetadata.MTime, true)
upload.Cleanup(ctx, fs.lu, n, info.ID, uploadSession.MTime, true)
if tup, ok := up.(tusd.TerminatableUpload); ok {
terr := tup.Terminate(ctx)
if terr != nil {
@@ -369,11 +373,11 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error {
if fs.stream != nil {
user := &userpb.User{
Id: &userpb.UserId{
Type: userpb.UserType(userpb.UserType_value[uploadMetadata.ExecutantType]),
Idp: uploadMetadata.ExecutantIdp,
OpaqueId: uploadMetadata.ExecutantID,
Type: userpb.UserType(userpb.UserType_value[uploadSession.ExecutantType]),
Idp: uploadSession.ExecutantIdp,
OpaqueId: uploadSession.ExecutantID,
},
Username: uploadMetadata.ExecutantUserName,
Username: uploadSession.ExecutantUserName,
}
s, err := fs.downloadURL(ctx, info.ID)
if err != nil {
Expand All @@ -387,7 +391,7 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error {
SpaceOwner: n.SpaceOwnerOrManager(ctx),
ExecutingUser: user,
ResourceID: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID},
Filename: uploadMetadata.Filename, // TODO what and when do we publish chunking v2 names? Currently, this uses the chunk name.
Filename: uploadSession.Filename, // TODO what and when do we publish chunking v2 names? Currently, this uses the chunk name.
Filesize: uint64(info.Size),
}); err != nil {
return err
@@ -430,23 +434,36 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error {
if !fs.o.AsyncFileUploads {
// handle postprocessing synchronously
log.Debug().Str("id", info.ID).Msg("upload.Finalize")
err = upload.Finalize(ctx, fs.blobstore, uploadMetadata.MTime, info, n, uploadMetadata.BlobID) // moving or copying the blob only reads the blobid, no need to change the revision nodes nodeid
err = upload.Finalize(ctx, fs.blobstore, uploadSession.MTime, info, n, uploadSession.BlobID) // moving or copying the blob only reads the blobid, no need to change the revision nodes nodeid

log.Debug().Str("id", info.ID).Msg("upload.Cleanup")
upload.Cleanup(ctx, fs.lu, n, info.ID, uploadMetadata.MTime, err != nil)
if tup, ok := up.(tusd.TerminatableUpload); ok {
log.Debug().Str("id", info.ID).Msg("tup.Terminate")
terr := tup.Terminate(ctx)
if terr != nil {
log.Error().Err(terr).Interface("info", info).Msg("failed to terminate upload")
}
upload.Cleanup(ctx, fs.lu, n, info.ID, uploadSession.MTime, err != nil)
cerr := uploadSession.CleanupMetadata(ctx)
if cerr != nil {
log.Error().Err(cerr).Interface("info", info).Msg("failed to cleanup upload session metadata")
}
/*
if tup, ok := up.(tusd.TerminatableUpload); ok {
log.Debug().Str("id", info.ID).Msg("tup.Terminate")
terr := tup.Terminate(ctx)
if terr != nil {
log.Error().Err(terr).Interface("info", info).Msg("failed to terminate upload")
}
}
*/
if err != nil {
log.Error().Err(err).Msg("failed to upload")
return err
}
log.Debug().Str("id", info.ID).Msg("upload.SetNodeToUpload")
sizeDiff, err = upload.SetNodeToUpload(ctx, fs.lu, n, uploadMetadata)
sizeDiff, err = upload.SetNodeToUpload(ctx, fs.lu, n, upload.RevisionMetadata{
MTime: uploadSession.MTime,
BlobID: uploadSession.BlobID,
BlobSize: info.Size,
ChecksumSHA1: uploadSession.ChecksumSHA1,
ChecksumMD5: uploadSession.ChecksumMD5,
ChecksumADLER32: uploadSession.ChecksumADLER32,
})
if err != nil {
log.Error().Err(err).Msg("failed update Node to revision")
return err
Expand Down Expand Up @@ -500,7 +517,9 @@ func checkHash(expected string, h hash.Hash) error {
// Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated?
func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, uff storage.UploadFinishedFunc) (provider.ResourceInfo, error) {
sublog := appctx.GetLogger(ctx).With().Str("path", req.Ref.Path).Int64("uploadLength", req.Length).Logger()
up, err := fs.tusDataStore.GetUpload(ctx, req.Ref.GetPath())
// TODO GetUpload, GetInfo and ReadSession are pretty redundant here ...
uploadID := strings.TrimLeft(req.Ref.GetPath(), "/")
up, err := fs.tusDataStore.GetUpload(ctx, uploadID)
if err != nil {
sublog.Debug().Err(err).Msg("Decomposedfs: error retrieving upload")
return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error retrieving upload")
@@ -512,24 +531,24 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u
return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error retrieving upload info")
}

uploadMetadata, err := upload.ReadMetadata(ctx, fs.lu, uploadInfo.ID)
uploadSession, err := tus.ReadSession(ctx, fs.lu.InternalRoot(), uploadInfo.ID)
if err != nil {
sublog.Debug().Err(err).Msg("Decomposedfs: error retrieving upload metadata")
return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error retrieving upload metadata")
}

if chunking.IsChunked(uploadMetadata.Chunk) { // check chunking v1, TODO, actually there is a 'OC-Chunked: 1' header, at least when the testsuite uses chunking v1
if chunking.IsChunked(uploadSession.Chunk) { // check chunking v1, TODO, actually there is a 'OC-Chunked: 1' header, at least when the testsuite uses chunking v1
var assembledFile, p string
p, assembledFile, err = fs.chunkHandler.WriteChunk(uploadMetadata.Chunk, req.Body)
p, assembledFile, err = fs.chunkHandler.WriteChunk(uploadSession.Chunk, req.Body)
if err != nil {
sublog.Debug().Err(err).Msg("Decomposedfs: could not write chunk")
return provider.ResourceInfo{}, err
}
if p == "" {
sublog.Debug().Err(err).Str("chunk", uploadMetadata.Chunk).Msg("Decomposedfs: wrote chunk")
sublog.Debug().Err(err).Str("chunk", uploadSession.Chunk).Msg("Decomposedfs: wrote chunk")
return provider.ResourceInfo{}, errtypes.PartialContent(req.Ref.String())
}
uploadMetadata.Filename = p
uploadSession.Filename = p
fd, err := os.Open(assembledFile)
if err != nil {
return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error opening assembled file")
@@ -543,14 +562,14 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u
}

// fake a new upload with the correct size
uploadMetadata.Size = chunkStat.Size()
uploadMetadata.BlobSize = chunkStat.Size()
err = upload.WriteMetadata(ctx, fs.lu, uploadMetadata.ID, uploadMetadata)
if err != nil {
return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error writing upload metadata for legacy chunking")
}

nup, err := fs.tusDataStore.NewUpload(ctx, uploadMetadata)
uploadSession.Size = chunkStat.Size()
uploadSession.BlobSize = chunkStat.Size()
//err = uploadSession.Persist(ctx)

[CI annotation from GitHub Actions / lint: check failure on line 567 in pkg/storage/utils/decomposedfs/upload.go: commentFormatting: put a space between `//` and comment text (gocritic)]
//if err != nil {
// return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error writing upload metadata for legacy chunking")
//}

nup, err := fs.tusDataStore.NewUploadWithSession(ctx, uploadSession)
if err != nil {
return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: could not create new tus upload for legacy chunking")
}
@@ -602,7 +621,7 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u
return provider.ResourceInfo{}, err
}

n, err := upload.ReadNode(ctx, fs.lu, uploadMetadata)
n, err := upload.ReadNode(ctx, fs.lu, uploadSession)
if err != nil {
sublog.Debug().Err(err).Msg("Decomposedfs: error reading node")
return provider.ResourceInfo{}, err
@@ -613,11 +632,11 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u
// but then the search has to walk the path. it might be more efficient if search called GetPath itself ... or we send the path as additional metadata in the event
uploadRef := &provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: uploadMetadata.ProviderID,
StorageId: uploadSession.ProviderID,
SpaceId: n.SpaceID,
OpaqueId: n.SpaceID,
},
Path: utils.MakeRelativePath(filepath.Join(uploadMetadata.Dir, uploadMetadata.Filename)),
Path: utils.MakeRelativePath(filepath.Join(uploadSession.Dir, uploadSession.Filename)),
}
executant, ok := ctxpkg.ContextGetUser(ctx)
if !ok {
@@ -639,7 +658,7 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u
ri := provider.ResourceInfo{
// fill with at least fileid, mtime and etag
Id: &provider.ResourceId{
StorageId: uploadMetadata.ProviderID,
StorageId: uploadSession.ProviderID,
SpaceId: n.SpaceID,
OpaqueId: n.ID,
},
@@ -761,7 +780,7 @@ func (fs *Decomposedfs) getUploadSession(ctx context.Context, path string) (stor
return nil, fmt.Errorf("invalid upload path")
}

metadata, err := upload.ReadMetadata(ctx, fs.lu, match[1])
metadata, err := tus.ReadSession(ctx, fs.lu.InternalRoot(), match[1])
if err != nil {
return nil, err
}
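Summing up the upload.go changes: the upload.ReadMetadata/WriteMetadata helpers give way to a session object that the tus package reads and writes itself, and the node revision is updated from an explicit upload.RevisionMetadata value rather than from the whole metadata blob. A rough sketch assembled from the hunks above; the calls and field names are taken from this diff, everything else (error handling, events, postprocessing) is elided:

    // read the session back by upload ID from the storage-internal root
    uploadSession, err := tus.ReadSession(ctx, fs.lu.InternalRoot(), info.ID)
    if err != nil {
        return err
    }

    // update the node revision from the revision-relevant session fields
    sizeDiff, err := upload.SetNodeToUpload(ctx, fs.lu, n, upload.RevisionMetadata{
        MTime:           uploadSession.MTime,
        BlobID:          uploadSession.BlobID,
        BlobSize:        info.Size,
        ChecksumSHA1:    uploadSession.ChecksumSHA1,
        ChecksumMD5:     uploadSession.ChecksumMD5,
        ChecksumADLER32: uploadSession.ChecksumADLER32,
    })
    // sizeDiff is consumed by code outside these hunks (not shown)

    // drop the session metadata once the upload has been finalized
    if cerr := uploadSession.CleanupMetadata(ctx); cerr != nil {
        log.Error().Err(cerr).Interface("info", info).Msg("failed to cleanup upload session metadata")
    }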
(Diffs for the remaining changed files were not loaded on this page.)
