Skip to content

Commit

Permalink
object/ceph: Speedup list for ceph (#4218)
Browse files Browse the repository at this point in the history
  • Loading branch information
davies authored Nov 30, 2023
1 parent e89ecd3 commit 6d54ed4
Showing 1 changed file with 64 additions and 12 deletions.
76 changes: 64 additions & 12 deletions pkg/object/ceph.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,24 +230,76 @@ func (c *ceph) ListAll(prefix, marker string, followLink bool) (<-chan Object, e
}
// the keys are not ordered, sort them first
sort.Strings(keys)
c.release(ctx)

var objs = make(chan Object, 1000)
// TODO: parallel
var concurrent = 20
ms := make([]sync.Mutex, concurrent)
conds := make([]*sync.Cond, concurrent)
ready := make([]bool, concurrent)
results := make([]Object, concurrent)
errs := make([]error, concurrent)
for j := 0; j < concurrent; j++ {
conds[j] = sync.NewCond(&ms[j])
if j < len(keys) {
go func(j int) {
ctx, err := c.newContext()
if err != nil {
logger.Errorf("new context: %s", err)
errs[j] = err
return
}
defer ctx.Destroy()
for i := j; i < len(keys); i += concurrent {
key := keys[i]
st, err := ctx.Stat(key)
if err != nil {
if errors.Is(err, rados.ErrNotFound) {
logger.Debugf("Skip non-existent key: %s", key)
results[j] = nil
} else {
logger.Errorf("Stat key %s: %s", key, err)
errs[j] = err
}
} else {
results[j] = &obj{key, int64(st.Size), st.ModTime, strings.HasSuffix(key, "/"), ""}
}

ms[j].Lock()
ready[j] = true
conds[j].Signal()
if errs[j] != nil {
ms[j].Unlock()
break
}
for ready[j] {
conds[j].Wait()
}
ms[j].Unlock()
}
}(j)
}
}
go func() {
defer close(objs)
defer ctx.Destroy()
for _, key := range keys {
st, err := ctx.Stat(key)
if err != nil {
if errors.Is(err, rados.ErrNotFound) {
logger.Debugf("Skip non-existent key: %s", key)
continue
}
for i := range keys {
j := i % concurrent
ms[j].Lock()
for !ready[j] {
conds[j].Wait()
}
if errs[j] != nil {
objs <- nil
logger.Errorf("Stat key %s: %s", key, err)
return
ms[j].Unlock()
// some goroutines will be leaked, but it's ok
// since we won't call ListAll() many times in a process
break
} else if results[j] != nil {
objs <- results[j]
}
objs <- &obj{key, int64(st.Size), st.ModTime, strings.HasSuffix(key, "/"), ""}
ready[j] = false
conds[j].Signal()
ms[j].Unlock()
}
}()
return objs, nil
Expand Down

0 comments on commit 6d54ed4

Please sign in to comment.