Skip to content

Commit

Permalink
Put the replicator into an error and stopped state when a reconnect l…
Browse files Browse the repository at this point in the history
…oop times out
  • Loading branch information
bbrks committed Feb 5, 2025
1 parent 65c5ae4 commit 4ca372d
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 0 deletions.
5 changes: 5 additions & 0 deletions db/active_replicator_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ func (a *activeReplicatorCommon) reconnectLoop() {
if err != nil {
a.replicationStats.NumReconnectsAborted.Add(1)
base.WarnfCtx(ctx, "couldn't reconnect replicator: %v", err)
a.lock.Lock()
defer a.lock.Unlock()
a.setState(ReplicationStateError)
a._publishStatus()
a._stop()
}
}

Expand Down
69 changes: 69 additions & 0 deletions rest/replicatortest/replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2471,6 +2471,75 @@ func TestReconnectReplicator(t *testing.T) {

}

// TestReplicatorReconnectTimeout starts a continuous push-and-pull replicator against a
// passive endpoint that is expected to answer with a 401, using a 10ms TotalReconnectTimeout.
// It asserts that the replicator ends up in the error state with the 401 message reported,
// that the push replicator records one aborted reconnect, and that starting the same
// replicator a second time repeats the cycle (a second aborted reconnect and at least one
// additional connect attempt).
func TestReplicatorReconnectTimeout(t *testing.T) {
	const (
		wantErrMsg   = "unexpected status code 401 from target database"
		pollDeadline = 10 * time.Second
		pollInterval = 100 * time.Millisecond
	)

	base.RequireNumTestBuckets(t, 2)
	base.SetUpTestLogging(t, base.LevelInfo, base.KeyReplicate, base.KeyHTTP, base.KeyHTTPResp)

	// Passive side: expose the rest tester's public handler over a real HTTP listener.
	passive := rest.NewRestTester(t, nil)
	defer passive.Close()
	passiveSrv := httptest.NewServer(passive.TestPublicHandler())
	defer passiveSrv.Close()

	// Target URL carries basic-auth credentials; the assertions below expect the
	// passive side to reject them with a 401.
	target, err := url.Parse(passiveSrv.URL + "/db")
	require.NoError(t, err)
	target.User = url.UserPassword("alice", "pass")

	// Active side.
	active := rest.NewRestTester(t, nil)
	defer active.Close()

	replID, err := base.GenerateRandomID()
	require.NoError(t, err)
	dbStats, err := base.SyncGatewayStats.NewDBStats(t.Name(), false, false, false, nil, nil)
	require.NoError(t, err)
	replStats, err := dbStats.DBReplicatorStats(t.Name())
	require.NoError(t, err)

	cfg := &db.ActiveReplicatorConfig{
		ID:          replID,
		Direction:   db.ActiveReplicatorTypePushAndPull,
		RemoteDBURL: target,
		ActiveDB: &db.Database{
			DatabaseContext: active.GetDatabase(),
		},
		Continuous: true,
		// Very short overall reconnect budget so the test does not wait long for the timeout.
		TotalReconnectTimeout: time.Millisecond * 10,
		ReplicationStatsMap:   replStats,
		CollectionsEnabled:    !active.GetDatabase().OnlyDefaultCollection(),
	}

	ar, err := db.NewActiveReplicator(active.Context(), cfg)
	require.NoError(t, err)

	// Stats are re-read on every call because they change while the replicator runs.
	pushConnectAttempts := func() int64 {
		return ar.Push.GetStats().NumConnectAttempts.Value()
	}
	pushReconnectsAborted := func() int64 {
		return ar.Push.GetStats().NumReconnectsAborted.Value()
	}

	// awaitErrorState polls the replicator until it reports the error state; when
	// checkMsg is set, the reported error message must also equal wantErrMsg.
	awaitErrorState := func(checkMsg bool) {
		require.EventuallyWithT(t, func(c *assert.CollectT) {
			state, msg := ar.State(active.Context())
			assert.Equal(c, db.ReplicationStateError, state)
			if checkMsg {
				assert.Equal(c, wantErrMsg, msg)
			}
		}, pollDeadline, pollInterval)
	}

	// Nothing has tried to connect before the first Start.
	require.Equal(t, int64(0), pushConnectAttempts())

	// First start: the initial connect fails with a 401 and the replicator must end up in error.
	require.ErrorContains(t, ar.Start(active.Context()), wantErrMsg)
	awaitErrorState(false)

	// The published replication status must carry the same state and message.
	status, err := active.GetDatabase().SGReplicateMgr.GetReplicationStatus(active.Context(), replID, db.DefaultReplicationStatusOptions())
	require.NoError(t, err)
	require.Equal(t, db.ReplicationStateError, status.Status)
	require.Equal(t, wantErrMsg, status.ErrorMessage)
	require.Equal(t, int64(1), pushReconnectsAborted())
	attemptsAfterFirstRun := pushConnectAttempts()
	require.GreaterOrEqual(t, attemptsAfterFirstRun, int64(1))

	// Second start: the replicator must attempt to connect again and then fall back
	// into the error state with the same message.
	require.ErrorContains(t, ar.Start(active.Context()), wantErrMsg)
	awaitErrorState(true)
	require.Equal(t, int64(2), pushReconnectsAborted())
	require.GreaterOrEqual(t, pushConnectAttempts(), attemptsAfterFirstRun+1)
}

// TestTotalSyncTimeStat:
// - starts a replicator to simulate a long lived websocket connection on a sync gateway
// - wait for this replication connection to be picked up on stats (NumReplicationsActive)
Expand Down

0 comments on commit 4ca372d

Please sign in to comment.