This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

do not allow indefinite write lock during deletes #1897

Closed
wants to merge 2 commits
24 changes: 19 additions & 5 deletions idx/memory/memory.go
@@ -1524,9 +1524,7 @@ func (m *UnpartitionedMemoryIdx) deleteTaggedByIdSet(orgId uint32, ids IdSet) []
 func (m *UnpartitionedMemoryIdx) Delete(orgId uint32, pattern string) ([]idx.Archive, error) {
 	var deletedDefs []idx.Archive
 	pre := time.Now()
-	bc := m.Lock()
 	defer func() {
-		bc.Unlock("Delete", nil)
 		if len(deletedDefs) == 0 {
 			return
 		}
@@ -1543,18 +1541,32 @@ func (m *UnpartitionedMemoryIdx) Delete(orgId uint32, pattern string) ([]idx.Arc
 			}
 		}
 	}()
+
+	m.RLockHigh()
 	tree, ok := m.tree[orgId]
 	if !ok {
+		m.RUnlockHigh()
 		return nil, nil
 	}
 	found, err := find(tree, pattern)
+	m.RUnlockHigh()
 	if err != nil {
 		return nil, err
 	}
 
-	for _, f := range found {
-		deleted := m.delete(orgId, f, true, true)
+	bc := BlockContext{}
+
+	// limit delete operations holding a write lock to 200ms of every second
+	tl := NewTimeLimiter(time.Second, time.Millisecond*200, time.Now())
+
+	for _, node := range found {
+		tl.Wait()
+		lockStart := time.Now()
+		bc = m.Lock()
@shanson7 (Collaborator) commented on Sep 3, 2020

Maybe I'm misunderstanding something here, but it seems like the write lock is being acquired and released in each iteration. Perhaps it should only be released when it has used up its time slice?

Repeatedly acquiring write locks is very expensive in the memory index.

Member

The intent here is to hold write locks for as short a period as possible. Additionally, the duration reported to the TimeLimiter for how long the lock has been held includes the time spent waiting for all existing RLocks to be released. In general, threads will be blocked for MAX(RLock duration) + Lock duration.

If all RLocks are held for a very short amount of time, then this current approach will work well. But as @shanson7 points out, if the RLocks are held for long periods, then trying to acquire lots of Locks, no matter how fast they are, is going to result in low throughput due to threads spending all their time being blocked.

I think we need both approaches here. We still need to rate limit how long we are blocking reads for, but we should also perform deletes in batches to reduce the number of locks needed. We don't want a single write lock to be held for the full 200ms, but holding it for 5ms or less would be fine. 5ms is a really long time, and most delete operations will complete in this time.
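
A minimal sketch of that combined shape, reusing the TimeLimiter, BlockContext, m.Lock and m.delete calls already present in this diff. It also folds in @shanson7's point by holding the write lock across a batch of nodes instead of unlocking after every one. The loop structure and the 5ms cap are illustrative only, not code from this PR:

```go
// Sketch only: batch deletes under one short-lived write lock, still rate limited
// to 200ms of blocking per second via the TimeLimiter.
tl := NewTimeLimiter(time.Second, time.Millisecond*200, time.Now())
const maxLockHold = 5 * time.Millisecond // illustrative cap per locked stretch

i := 0
for i < len(found) {
	tl.Wait()
	lockStart := time.Now() // includes time spent waiting to acquire the write lock
	bc := m.Lock()
	for {
		deleted := m.delete(orgId, found[i], true, true)
		deletedDefs = append(deletedDefs, deleted...)
		i++
		// always delete at least one node per lock so the loop keeps making
		// progress, then stop once this stretch has used up its ~5ms
		if i >= len(found) || time.Since(lockStart) >= maxLockHold {
			break
		}
	}
	bc.Unlock("Delete", nil)
	tl.Add(time.Since(lockStart))
}
```

Deleting at least one node per locked stretch keeps the loop moving even when acquiring the lock alone eats most of the 5ms budget.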

@robert-milan (Contributor, PR author) commented on Sep 3, 2020

I am still experimenting with deleting by leaf. I don't think this current approach solves the entire problem set. If someone tries to delete toplevel.* when they have a lot of child nodes it will still lock up the index since the current method would recursively call all the way down the tree.

Member

That is correct, deleting * would be even worse.

We definitely need the delete call to find all leaf nodes to be deleted while only holding read locks. Then delete these in batches with write locks.
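
A rough sketch of that two-phase idea, assuming the leaf expansion can happen under the existing read locks. expandToLeaves and batchSize are hypothetical stand-ins (today the recursive descent happens inside m.delete while the write lock is held); everything else reuses identifiers from this diff:

```go
// Phase 1: resolve the pattern and expand matches to all leaf descendants while
// holding only read locks. expandToLeaves is a hypothetical recursive walk.
m.RLockHigh()
tree := m.tree[orgId] // ok check omitted for brevity
found, err := find(tree, pattern)
if err != nil {
	m.RUnlockHigh()
	return nil, err
}
var leaves []*Node
for _, node := range found {
	leaves = append(leaves, expandToLeaves(node)...)
}
m.RUnlockHigh()

// Phase 2: delete the leaves in fixed-size batches, each under its own short
// write lock, still rate limited by the TimeLimiter. batchSize is illustrative.
const batchSize = 100
tl := NewTimeLimiter(time.Second, time.Millisecond*200, time.Now())

for start := 0; start < len(leaves); start += batchSize {
	end := start + batchSize
	if end > len(leaves) {
		end = len(leaves)
	}

	tl.Wait()
	lockStart := time.Now()
	bc := m.Lock()
	for _, leaf := range leaves[start:end] {
		deleted := m.delete(orgId, leaf, true, true)
		deletedDefs = append(deletedDefs, deleted...)
	}
	bc.Unlock("Delete", nil)
	tl.Add(time.Since(lockStart))
}
```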

+		deleted := m.delete(orgId, node, true, true)
 		deletedDefs = append(deletedDefs, deleted...)
+		bc.Unlock("Delete", nil)
+		tl.Add(time.Since(lockStart))
 	}
 
 	statMetricsActive.DecUint32(uint32(len(deletedDefs)))
@@ -1585,7 +1597,9 @@ func (m *UnpartitionedMemoryIdx) delete(orgId uint32, n *Node, deleteEmptyParent

 	// delete the metricDefs
 	for _, id := range n.Defs {
-		log.Debugf("memory-idx: deleting %s from index", id)
+		if log.IsLevelEnabled(log.DebugLevel) {
+			log.Debugf("memory-idx: deleting %s from index", id)
+		}
 		archivePointer, ok := m.defById[id]
 		if archivePointer == nil {
 			corruptIndex.Inc()