Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Prevent leader checker from generating excessive duplicate leader tasks #39000

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions internal/querycoordv2/checkers/leader_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,16 @@ func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica *met
continue
}

// when segment's version in leader view doesn't match segment's version in dist
// which means leader view store wrong segment location in leader view, then we should update segment location and segment's version
// The routing table on the delegator points to the nodes where segments are loaded. There are two scenarios that require updating the routing table on the delegator:
// 1. Missing Segment Routing - The routing table lacks the route for a specific segment.
// 2. Outdated Segment Routing - A segment has multiple copies loaded, but the routing table points to a node that does not host the most recently loaded copy.
// This ensures the routing table remains accurate and up-to-date, reflecting the latest segment distribution.
version, ok := leaderView.Segments[s.GetID()]
if !ok || version.GetVersion() != s.Version {
if !ok || version.GetNodeID() != s.Node {
log.RatedDebug(10, "leader checker append a segment to set",
zap.Int64("segmentID", s.GetID()),
zap.Int64("nodeID", s.Node))

action := task.NewLeaderAction(leaderView.ID, s.Node, task.ActionTypeGrow, s.GetInsertChannel(), s.GetID(), time.Now().UnixNano())
t := task.NewLeaderSegmentTask(
ctx,
Expand Down
20 changes: 13 additions & 7 deletions internal/querycoordv2/checkers/leader_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,22 +138,28 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() {
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)

// test segment's version in leader view doesn't match segment's version in dist
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel"))
view = utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
// Verify that the segment routing table in the leader view does not point to the most recent segment replica.
// the leader view points to the segment on querynode-2, with version 1
// the distribution shows that the segment is on querynode-1, with latest version 2
node1, node2 := int64(1), int64(2)
version1, version2 := int64(1), int64(2)
observer.dist.SegmentDistManager.Update(node1)
observer.dist.SegmentDistManager.Update(node2, utils.CreateTestSegment(1, 1, 1, node2, version2, "test-insert-channel"))
view = utils.CreateTestLeaderView(node2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(ctx, 1, meta.CurrentTarget)
view.Segments[1] = &querypb.SegmentDist{
NodeID: 0,
Version: time.Now().UnixMilli() - 1,
NodeID: node1,
Version: version1,
}
observer.dist.LeaderViewManager.Update(2, view)
observer.dist.LeaderViewManager.Update(node2, view)

tasks = suite.checker.Check(context.TODO())
suite.Len(tasks, 1)
suite.Equal(tasks[0].Source(), utils.LeaderChecker)
suite.Len(tasks[0].Actions(), 1)
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow)
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
suite.Equal(tasks[0].Actions()[0].Node(), node2)
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).GetLeaderID(), node2)
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)

Expand Down
Loading