Skip to content

Commit

Permalink
feat: make addFollower idempotent to avoid FollowerAlreadyPresent
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao committed Jan 25, 2025
1 parent c5ad34d commit 91e05ce
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 8 deletions.
2 changes: 0 additions & 2 deletions common/error_codes.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ const (
CodeInvalidSessionTimeout codes.Code = 109
CodeNamespaceNotFound codes.Code = 110
CodeNotificationsNotEnabled codes.Code = 111
CodeFollowerAlreadyPresent codes.Code = 112
CodeFollowerAlreadyFenced codes.Code = 113
)

Expand All @@ -49,6 +48,5 @@ var (
ErrorInvalidSessionTimeout = status.Error(CodeInvalidSessionTimeout, "oxia: invalid session timeout")
ErrorNamespaceNotFound = status.Error(CodeNamespaceNotFound, "oxia: namespace not found")
ErrorNotificationsNotEnabled = status.Error(CodeNotificationsNotEnabled, "oxia: notifications not enabled on namespace")
ErrorFollowerAlreadyPresent = status.Error(CodeFollowerAlreadyPresent, "oxia: follower is already present")
ErrorFollowerAlreadyFenced = status.Error(CodeFollowerAlreadyFenced, "oxia: follower is already fenced")
)
2 changes: 1 addition & 1 deletion coordinator/impl/shard_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ func (s *shardController) internalNewTermAndAddFollower(ctx context.Context, nod
if err = s.addFollower(*s.shardMetadata.Leader, node.Internal, &proto.EntryId{
Term: fr.Term,
Offset: fr.Offset,
}); err != nil && status.Code(err) != common.CodeFollowerAlreadyPresent {
}); err != nil {
res <- err
return
}
Expand Down
2 changes: 1 addition & 1 deletion server/leader_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func (lc *leaderController) AddFollower(req *proto.AddFollowerRequest) (*proto.A
}

if _, followerAlreadyPresent := lc.followers[req.FollowerName]; followerAlreadyPresent {
return nil, errors.Wrapf(common.ErrorFollowerAlreadyPresent, "follower: %s", req.FollowerName)
return &proto.AddFollowerResponse{}, nil
}

if len(lc.followers) == int(lc.replicationFactor)-1 {
Expand Down
8 changes: 4 additions & 4 deletions server/leader_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,17 +589,17 @@ func TestLeaderController_AddFollowerRepeated(t *testing.T) {
FollowerName: "f1",
FollowerHeadEntryId: InvalidEntryId,
})
assert.Nil(t, afRes)
assert.Error(t, err)
assert.Nil(t, err)
assert.Equal(t, &proto.AddFollowerResponse{}, afRes)

_, err = lc.AddFollower(&proto.AddFollowerRequest{
Shard: shard,
Term: 5,
FollowerName: "f1",
FollowerHeadEntryId: InvalidEntryId,
})
assert.Error(t, err)
assert.Equal(t, common.CodeFollowerAlreadyPresent, status.Code(err))
assert.Nil(t, err)
assert.Equal(t, &proto.AddFollowerResponse{}, afRes)

assert.NoError(t, lc.Close())
assert.NoError(t, kvFactory.Close())
Expand Down

0 comments on commit 91e05ce

Please sign in to comment.