Skip to content

Commit

Permalink
gateway: add one more hierarchy to upload tmp/multiupload dir to redu…
Browse files Browse the repository at this point in the history
…ce txn conflicts

Signed-off-by: Changxin Miao <[email protected]>
  • Loading branch information
polyrabbit committed Jan 12, 2025
1 parent 5714660 commit 91c784d
Showing 1 changed file with 68 additions and 36 deletions.
104 changes: 68 additions & 36 deletions pkg/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,11 @@ func (n *jfsObjects) tpath(p ...string) string {
}

func (n *jfsObjects) upath(bucket, uploadID string) string {
return n.tpath(bucket, "uploads", uploadID)
return n.tpath(bucket, "uploads", uploadID[:2], uploadID)
}

func (n *jfsObjects) ppath(bucket, uploadID, part string) string {
return n.tpath(bucket, "uploads", uploadID, part)
return n.tpath(bucket, "uploads", uploadID[:2], uploadID, part)
}

func (n *jfsObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
Expand Down Expand Up @@ -536,7 +536,8 @@ func (n *jfsObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu
if minio.IsStringEqual(src, dst) {
return n.GetObjectInfo(ctx, srcBucket, srcObject, minio.ObjectOptions{})
}
tmp := n.tpath(dstBucket, "tmp", minio.MustGetUUID())
uuid := minio.MustGetUUID()
tmp := n.tpath(dstBucket, "tmp", uuid[:2], uuid)
f, eno := n.fs.Create(mctx, tmp, 0666, n.gConf.Umask)
if eno == syscall.ENOENT {
_ = n.mkdirAll(ctx, path.Dir(tmp))
Expand Down Expand Up @@ -725,7 +726,8 @@ func (n *jfsObjects) mkdirAll(ctx context.Context, p string) error {
}

func (n *jfsObjects) putObject(ctx context.Context, bucket, object string, r *minio.PutObjReader, opts minio.ObjectOptions, applyObjTaggingFunc func(tmpName string)) (err error) {
tmpname := n.tpath(bucket, "tmp", minio.MustGetUUID())
uuid := minio.MustGetUUID()
tmpname := n.tpath(bucket, "tmp", uuid[:2], uuid)
f, eno := n.fs.Create(mctx, tmpname, 0666, n.gConf.Umask)
if eno == syscall.ENOENT {
_ = n.mkdirAll(ctx, path.Dir(tmpname))
Expand Down Expand Up @@ -880,7 +882,7 @@ func (n *jfsObjects) ListMultipartUploads(ctx context.Context, bucket string, pr
return // no found
}
defer f.Close(mctx)
entries, eno := f.ReaddirPlus(mctx, 0)
parents, eno := f.ReaddirPlus(mctx, 0)
if eno != 0 {
err = jfsToObjectErr(ctx, eno, bucket)
return
Expand All @@ -891,22 +893,38 @@ func (n *jfsObjects) ListMultipartUploads(ctx context.Context, bucket string, pr
lmi.MaxUploads = maxUploads
lmi.Delimiter = delimiter
commPrefixSet := make(map[string]struct{})
for _, e := range entries {
uploadID := string(e.Name)
// todo: parallel
object_, eno := n.fs.GetXattr(mctx, n.upath(bucket, uploadID), uploadKeyName)
for _, p := range parents {
f, eno := n.fs.Open(mctx, n.tpath(bucket, "uploads", string(p.Name)), 0)
if eno != 0 {
logger.Warnf("get object xattr error %s: %s, ignore this item", n.upath(bucket, uploadID), eno)
continue
return
}
object := string(object_)
if strings.HasPrefix(object, prefix) {
if keyMarker != "" && object+uploadID > keyMarker+uploadIDMarker || keyMarker == "" {
lmi.Uploads = append(lmi.Uploads, minio.MultipartInfo{
Object: object,
UploadID: uploadID,
Initiated: time.Unix(e.Attr.Atime, int64(e.Attr.Atimensec)),
})
defer f.Close(mctx)
entries, eno := f.ReaddirPlus(mctx, 0)
if eno != 0 {
err = jfsToObjectErr(ctx, eno, bucket)
return
}

for _, e := range entries {
if len(e.Name) != 36 {
continue // not an uuid
}
uploadID := string(e.Name)
// todo: parallel
object_, eno := n.fs.GetXattr(mctx, n.upath(bucket, uploadID), uploadKeyName)
if eno != 0 {
logger.Warnf("get object xattr error %s: %s, ignore this item", n.upath(bucket, uploadID), eno)
continue
}
object := string(object_)
if strings.HasPrefix(object, prefix) {
if keyMarker != "" && object+uploadID > keyMarker+uploadIDMarker || keyMarker == "" {
lmi.Uploads = append(lmi.Uploads, minio.MultipartInfo{
Object: object,
UploadID: uploadID,
Initiated: time.Unix(e.Attr.Atime, int64(e.Attr.Atimensec)),
})
}
}
}
}
Expand Down Expand Up @@ -1148,7 +1166,7 @@ func (n *jfsObjects) AbortMultipartUpload(ctx context.Context, bucket, object, u
}

func (n *jfsObjects) cleanup() {
for t := range time.Tick(24 * time.Hour) {
for range time.Tick(24 * time.Hour) {
// default bucket tmp dirs
tmpDirs := []string{".sys/tmp/", ".sys/uploads/"}
if n.gConf.MultiBucket {
Expand All @@ -1163,27 +1181,41 @@ func (n *jfsObjects) cleanup() {
}
}
for _, dir := range tmpDirs {
f, errno := n.fs.Open(mctx, dir, 0)
if errno != 0 {
n.cleanupDir(dir)
}
}
}

func (n *jfsObjects) cleanupDir(dir string) bool {
mctx := n.mctx

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / sdktest

n.mctx undefined (type *jfsObjects has no field or method mctx)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / unittests (test.fdb)

n.mctx undefined (type *jfsObjects has no field or method mctx)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / lint

n.mctx undefined (type *jfsObjects has no field or method mctx) (typecheck)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / lint

n.mctx undefined (type *jfsObjects has no field or method mctx)) (typecheck)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / sync (sync)

n.mctx undefined (type *jfsObjects has no field or method mctx)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / integrationtests (tikv)

n.mctx undefined (type *jfsObjects has no field or method mctx)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / sync (sync_minio)

n.mctx undefined (type *jfsObjects has no field or method mctx)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / integrationtests (redis)

n.mctx undefined (type *jfsObjects has no field or method mctx)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / sync (sync_fsrand)

n.mctx undefined (type *jfsObjects has no field or method mctx)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / pjdfstest (badger)

n.mctx undefined (type *jfsObjects has no field or method mctx)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / sync (sync_cluster)

n.mctx undefined (type *jfsObjects has no field or method mctx)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / pjdfstest (redis)

n.mctx undefined (type *jfsObjects has no field or method mctx)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / sync (sync_exclude)

n.mctx undefined (type *jfsObjects has no field or method mctx)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / unittests (test.cmd)

n.mctx undefined (type *jfsObjects has no field or method mctx)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / integrationtests (mysql)

n.mctx undefined (type *jfsObjects has no field or method mctx)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / unittests (test.meta.core)

n.mctx undefined (type *jfsObjects has no field or method mctx)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / unittests (test.pkg)

n.mctx undefined (type *jfsObjects has no field or method mctx)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / pjdfstest (sqlite3)

n.mctx undefined (type *jfsObjects has no field or method mctx)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / xattr (redis)

n.mctx undefined (type *jfsObjects has no field or method mctx)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / unittests (test.meta.non-core)

n.mctx undefined (type *jfsObjects has no field or method mctx)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / pjdfstest (redis)

n.mctx undefined (type *jfsObjects has no field or method mctx)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / build (1.21)

n.mctx undefined (type *jfsObjects has no field or method mctx)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / build (1.20)

n.mctx undefined (type *jfsObjects has no field or method mctx)

Check failure on line 1190 in pkg/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / build (1.22)

n.mctx undefined (type *jfsObjects has no field or method mctx)
f, errno := n.fs.Open(mctx, dir, 0)
if errno != 0 {
return false
}
defer f.Close(mctx)
entries, _ := f.ReaddirPlus(mctx, 0)
now := time.Now()
deleted := 0
for _, entry := range entries {
dirPath := n.path(dir, string(entry.Name))
if entry.Attr.Typ == meta.TypeDirectory && len(entry.Name) == 2 {
if !n.cleanupDir(dirPath) {
continue
}
entries, _ := f.ReaddirPlus(mctx, 0)
for _, entry := range entries {
if _, err := uuid.Parse(string(entry.Name)); err != nil {
continue
}
if t.Sub(time.Unix(entry.Attr.Mtime, 0)) > 7*24*time.Hour {
p := n.path(dir, string(entry.Name))
if errno := n.fs.Rmr(mctx, p); errno != 0 {
logger.Errorf("failed to delete expired temporary files path: %s,", p)
} else {
logger.Infof("delete expired temporary files path: %s, mtime: %s", p, time.Unix(entry.Attr.Mtime, 0).Format(time.RFC3339))
}
}
} else if _, err := uuid.Parse(string(entry.Name)); err != nil {
logger.Warnf("unexpected file path: %s", dirPath)
continue
}
if now.Sub(time.Unix(entry.Attr.Mtime, 0)) > 7*24*time.Hour {
if errno = n.fs.Rmr(mctx, dirPath); errno != 0 {
logger.Errorf("failed to delete expired temporary files path: %s, err: %s", dirPath, errno)
} else {
deleted += 1
logger.Infof("delete expired temporary files path: %s, mtime: %s", dirPath, time.Unix(entry.Attr.Mtime, 0).Format(time.RFC3339))
}
_ = f.Close(mctx)
}
}
return deleted == len(entries)
}

type jfsFLock struct {
Expand Down

0 comments on commit 91c784d

Please sign in to comment.