Skip to content

Commit

Permalink
filter metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
butonic committed Sep 4, 2023
1 parent 4f8a9fa commit 0289878
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 145 deletions.
9 changes: 9 additions & 0 deletions internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,15 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate
}, nil
}

// FIXME: This is a hack to transport more metadata to the storage.FS InitiateUpload implementation
// we should use a request object that can carry
// * if-match
// * if-unmodified-since
// * uploadLength from the tus Upload-Length header
// * checksum from the tus Upload-Checksum header
// * mtime from the X-OC-Mtime header
// * expires from the s.conf.UploadExpiration ... should that not be part of the driver?
// * providerID
metadata := map[string]string{}
ifMatch := req.GetIfMatch()
if ifMatch != "" {
Expand Down
4 changes: 4 additions & 0 deletions internal/http/services/owncloud/ocdav/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func (s *svc) handleSpacesTusPost(w http.ResponseWriter, r *http.Request, spaceI

sublog := appctx.GetLogger(ctx).With().Str("spaceid", spaceID).Str("path", r.URL.Path).Logger()

// use filename to build a storage space reference
// but what if upload happens directly to the resourceid .. and filename is empty?
// currently there is always a validator that requires the filename is not empty ...
// hm -> bug: clients currently cannot POST to an existing source with a resource id only
ref, err := spacelookup.MakeStorageSpaceReference(spaceID, path.Join(r.URL.Path, meta["filename"]))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
Expand Down
44 changes: 44 additions & 0 deletions pkg/rhttp/datatx/manager/tus/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package tus

import (
"net/http"
"strings"

tusd "github.com/tus/tusd/pkg/handler"
)

// FilterResponseWriter wraps an http.ResponseWriter so that the
// Upload-Metadata response header can be filtered down to client-facing
// entries before the status line is written (see WriteHeader).
// Header and Write delegate directly to the wrapped writer.
type FilterResponseWriter struct {
	w      http.ResponseWriter // underlying writer all calls are forwarded to
	header http.Header         // NOTE(review): initialized by NewFilterResponseWriter but never read or written by any method — looks like a leftover; confirm before removing
}

// Metadata key prefixes used to namespace tus upload metadata.
// WriteHeader forwards only TusPrefix entries to clients; everything
// else — notably CS3Prefix bookkeeping written by the storage layer —
// is filtered out of responses.
const (
	// TusPrefix marks metadata entries that may be exposed to tus clients.
	TusPrefix = "tus."
	// CS3Prefix marks internal CS3 metadata (executant, space root, node id, ...).
	CS3Prefix = "cs3."
)

// NewFilterResponseWriter wraps w in a FilterResponseWriter so that the
// Upload-Metadata response header is filtered when the status code is
// eventually written.
func NewFilterResponseWriter(w http.ResponseWriter) *FilterResponseWriter {
	fw := FilterResponseWriter{
		w:      w,
		header: make(http.Header),
	}
	return &fw
}

// Header returns the header map of the wrapped ResponseWriter, so any
// headers handlers set land directly on the underlying writer.
func (fw *FilterResponseWriter) Header() http.Header {
	h := fw.w.Header()
	return h
}

// Write forwards the response body bytes unmodified to the wrapped
// ResponseWriter and reports its result.
func (fw *FilterResponseWriter) Write(p []byte) (int, error) {
	n, err := fw.w.Write(p)
	return n, err
}

// WriteHeader filters the Upload-Metadata response header before the
// status line goes out: entries namespaced with TusPrefix are kept with
// the prefix stripped, everything else (e.g. internal CS3Prefix
// bookkeeping) is dropped. It then delegates to the wrapped writer.
//
// Note: the filter operates on the wrapped writer's header map directly
// (handlers write there via Header()); the struct's own header field is
// not involved.
func (fw *FilterResponseWriter) WriteHeader(statusCode int) {
	metadata := tusd.ParseMetadataHeader(fw.w.Header().Get("Upload-Metadata"))
	tusMetadata := map[string]string{}
	for k, v := range metadata {
		if strings.HasPrefix(k, TusPrefix) {
			tusMetadata[strings.TrimPrefix(k, TusPrefix)] = v
		}
	}

	if len(tusMetadata) > 0 {
		fw.w.Header().Set("Upload-Metadata", tusd.SerializeMetadataHeader(tusMetadata))
	} else {
		// Previously an empty Upload-Metadata header was emitted when no
		// tus entries survived the filter; delete it instead so clients
		// never see a present-but-empty header.
		fw.w.Header().Del("Upload-Metadata")
	}
	fw.w.WriteHeader(statusCode)
}
45 changes: 33 additions & 12 deletions pkg/rhttp/datatx/manager/tus/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,23 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
ev := <-handler.CompleteUploads
info := ev.Upload
spaceOwner := &userv1beta1.UserId{
OpaqueId: info.MetaData["_SpaceOwnerOrManager"],
OpaqueId: info.MetaData[CS3Prefix+"SpaceOwnerOrManager"],
}
executant := &userv1beta1.UserId{
Type: userv1beta1.UserType(userv1beta1.UserType_value[info.MetaData["_ExecutantType"]]),
Idp: info.MetaData["_ExecutantIdp"],
OpaqueId: info.MetaData["_ExecutantId"],
Type: userv1beta1.UserType(userv1beta1.UserType_value[info.MetaData[CS3Prefix+"ExecutantType"]]),
Idp: info.MetaData[CS3Prefix+"ExecutantIdp"],
OpaqueId: info.MetaData[CS3Prefix+"ExecutantId"],
}
ref := &provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: info.MetaData["providerID"],
SpaceId: info.MetaData["_SpaceRoot"],
OpaqueId: info.MetaData["_SpaceRoot"], // TODO shouldn't this be the node id?
StorageId: info.MetaData[CS3Prefix+"providerID"],
SpaceId: info.MetaData[CS3Prefix+"SpaceRoot"],
OpaqueId: info.MetaData[CS3Prefix+"SpaceRoot"],
},
Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])),
// FIXME this seems wrong, path is not really relative to space root
// actually it is: InitiateUpload calls fs.lu.Path to get the path relative to the root...
// hm is that robust? what if the file is moved? shouldn't we store the parent id, then?
Path: utils.MakeRelativePath(filepath.Join(info.MetaData[CS3Prefix+"dir"], info.MetaData[CS3Prefix+"filename"])),
}
datatx.InvalidateCache(executant, ref, m.statCache)
if m.publisher != nil {
Expand All @@ -162,6 +165,9 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
}()

h := handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// filter metadata headers
w = NewFilterResponseWriter(w)

method := r.Method
// https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override
if r.Header.Get("X-HTTP-Method-Override") != "" {
Expand Down Expand Up @@ -231,14 +237,29 @@ func setHeaders(datastore tusd.DataStore, w http.ResponseWriter, r *http.Request
appctx.GetLogger(ctx).Error().Err(err).Msg("could not get upload info for upload")
return
}
expires := info.MetaData["expires"]
expires := info.MetaData[CS3Prefix+"expires"]
// fallback for outdated storageproviders that implement a tus datastore
if expires == "" {
expires = info.MetaData["expires"]
}
if expires != "" {
w.Header().Set(net.HeaderTusUploadExpires, expires)
}
resourceid := provider.ResourceId{
StorageId: info.MetaData["providerID"],
SpaceId: info.MetaData["_SpaceRoot"],
OpaqueId: info.MetaData["_NodeId"],
StorageId: info.MetaData[CS3Prefix+"providerID"],
SpaceId: info.MetaData[CS3Prefix+"SpaceRoot"],
OpaqueId: info.MetaData[CS3Prefix+"NodeId"],
}
// fallback for outdated storageproviders that implement a tus datastore
if resourceid.StorageId == "" {
resourceid.StorageId = info.MetaData["providerID"]
}
if resourceid.SpaceId == "" {
resourceid.SpaceId = info.MetaData["SpaceRoot"]
}
if resourceid.OpaqueId == "" {
resourceid.OpaqueId = info.MetaData["NodeId"]
}

w.Header().Set(net.HeaderOCFileID, storagespace.FormatResourceID(resourceid))
}
28 changes: 16 additions & 12 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/logger"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/tus"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/utils/download"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/cache"
Expand Down Expand Up @@ -284,7 +285,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
keepUpload bool
)

n, err := node.ReadNode(ctx, fs.lu, info.MetaData["_SpaceRoot"], info.MetaData["_NodeId"], false, nil, true)
n, err := node.ReadNode(ctx, fs.lu, info.MetaData[tus.CS3Prefix+"SpaceRoot"], info.MetaData[tus.CS3Prefix+"NodeId"], false, nil, true)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read node")
continue
Expand Down Expand Up @@ -318,7 +319,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {

now := time.Now()
if failed {
sizeDiff, err := strconv.ParseInt(info.MetaData["_sizeDiff"], 10, 64)
sizeDiff, err := strconv.ParseInt(info.MetaData[tus.CS3Prefix+"sizeDiff"], 10, 64)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("info", info).Msg("could not parse sizediff")
continue
Expand Down Expand Up @@ -348,20 +349,23 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
Failed: failed,
ExecutingUser: &user.User{
Id: &user.UserId{
Type: user.UserType(user.UserType_value[info.MetaData["_ExecutantType"]]),
Idp: info.MetaData["_ExecutantIdp"],
OpaqueId: info.MetaData["_ExecutantId"],
Type: user.UserType(user.UserType_value[info.MetaData[tus.CS3Prefix+"ExecutantType"]]),
Idp: info.MetaData[tus.CS3Prefix+"ExecutantIdp"],
OpaqueId: info.MetaData[tus.CS3Prefix+"ExecutantId"],
},
Username: info.MetaData["_ExecutantUserName"],
Username: info.MetaData[tus.CS3Prefix+"ExecutantUserName"],
},
Filename: ev.Filename,
FileRef: &provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: info.MetaData["providerID"],
SpaceId: info.MetaData["_SpaceRoot"],
OpaqueId: info.MetaData["_SpaceRoot"],
StorageId: info.MetaData[tus.CS3Prefix+"providerID"],
SpaceId: info.MetaData[tus.CS3Prefix+"SpaceRoot"],
OpaqueId: info.MetaData[tus.CS3Prefix+"SpaceRoot"],
},
Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])),
// FIXME this seems wrong, path is not really relative to space root
// actually it is: InitiateUpload calls fs.lu.Path to get the path relative to the root...
// hm is that robust? what if the file is moved? shouldn't we store the parent id, then?
Path: utils.MakeRelativePath(filepath.Join(info.MetaData[tus.CS3Prefix+"dir"], info.MetaData[tus.CS3Prefix+"filename"])),
},
Timestamp: utils.TimeToTS(now),
SpaceOwner: n.SpaceOwnerOrManager(ctx),
Expand All @@ -381,7 +385,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
continue // NOTE: since we can't get the upload, we can't restart postprocessing
}

n, err := node.ReadNode(ctx, fs.lu, info.MetaData["_SpaceRoot"], info.MetaData["_NodeId"], false, nil, true)
n, err := node.ReadNode(ctx, fs.lu, info.MetaData[tus.CS3Prefix+"SpaceRoot"], info.MetaData[tus.CS3Prefix+"NodeId"], false, nil, true)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read node")
continue
Expand All @@ -398,7 +402,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
SpaceOwner: n.SpaceOwnerOrManager(ctx),
ExecutingUser: &user.User{Id: &user.UserId{OpaqueId: "postprocessing-restart"}}, // send nil instead?
ResourceID: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID},
Filename: info.MetaData["_NodeName"],
Filename: info.MetaData[tus.CS3Prefix+"NodeName"],
Filesize: uint64(info.Size),
}); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish BytesReceived event")
Expand Down
Loading

0 comments on commit 0289878

Please sign in to comment.