Skip to content

Commit

Permalink
Merge branch 'main' into already-present
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Jan 25, 2025
2 parents 91e05ce + 10a6992 commit 149e1ab
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 17 deletions.
2 changes: 0 additions & 2 deletions common/error_codes.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ const (
CodeInvalidSessionTimeout codes.Code = 109
CodeNamespaceNotFound codes.Code = 110
CodeNotificationsNotEnabled codes.Code = 111
CodeFollowerAlreadyFenced codes.Code = 113
)

var (
Expand All @@ -48,5 +47,4 @@ var (
ErrorInvalidSessionTimeout = status.Error(CodeInvalidSessionTimeout, "oxia: invalid session timeout")
ErrorNamespaceNotFound = status.Error(CodeNamespaceNotFound, "oxia: namespace not found")
ErrorNotificationsNotEnabled = status.Error(CodeNotificationsNotEnabled, "oxia: notifications not enabled on namespace")
ErrorFollowerAlreadyFenced = status.Error(CodeFollowerAlreadyFenced, "oxia: follower is already fenced")
)
2 changes: 1 addition & 1 deletion coordinator/impl/shard_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func (s *shardController) newTermAndAddFollower(ctx context.Context, node model.

func (s *shardController) internalNewTermAndAddFollower(ctx context.Context, node model.ServerAddress, res chan error) {
fr, err := s.newTerm(ctx, node)
if err != nil && status.Code(err) != common.CodeFollowerAlreadyFenced {
if err != nil {
res <- err
return
}
Expand Down
10 changes: 0 additions & 10 deletions server/follower_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,16 +267,6 @@ func (fc *followerController) NewTerm(req *proto.NewTermRequest) (*proto.NewTerm
slog.Int64("new-term", req.Term),
)
return nil, common.ErrorInvalidTerm
} else if req.Term == fc.term && fc.status != proto.ServingStatus_FENCED {
// It's OK to receive a duplicate Fence request, for the same term, as long as we haven't moved
// out of the Fenced state for that term
fc.log.Warn(
"Failed to fence with same term in invalid state",
slog.Int64("follower-term", fc.term),
slog.Int64("new-term", req.Term),
slog.Any("status", fc.status),
)
return nil, common.ErrorFollowerAlreadyFenced
}

if fc.db == nil {
Expand Down
20 changes: 16 additions & 4 deletions server/follower_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,7 @@ func TestFollower_DuplicateNewTermInFollowerState(t *testing.T) {

stream := newMockServerReplicateStream()
go func() {
// cancelled due to fc.Close() below
assert.ErrorIs(t, fc.Replicate(stream), context.Canceled)
assert.NoError(t, fc.Replicate(stream))
}()

stream.AddRequest(createAddRequest(t, 1, 0, map[string]string{"a": "0", "b": "1"}, 10))
Expand All @@ -384,8 +383,21 @@ func TestFollower_DuplicateNewTermInFollowerState(t *testing.T) {
}, 10*time.Second, 10*time.Millisecond)

r, err := fc.NewTerm(&proto.NewTermRequest{Term: 1})
assert.Nil(t, r)
assert.Equal(t, common.CodeFollowerAlreadyFenced, status.Code(err))
assert.NoError(t, err)
assert.NotNil(t, r)
assert.EqualValues(t, r1.Offset, r.HeadEntryId.Offset)
assert.EqualValues(t, 1, r.HeadEntryId.Term)

stream.AddRequest(createAddRequest(t, 1, 1, map[string]string{"a": "1", "b": "2"}, 11))

// Wait for acks
r2 := stream.GetResponse()

assert.EqualValues(t, 1, r2.Offset)

assert.Eventually(t, func() bool {
return fc.CommitOffset() == 1
}, 10*time.Second, 10*time.Millisecond)

assert.NoError(t, fc.Close())
assert.NoError(t, kvFactory.Close())
Expand Down

0 comments on commit 149e1ab

Please sign in to comment.