From 7701a844cafc2fecde06078fe08f0b8366e5231c Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Tue, 4 Feb 2025 17:36:12 -0500 Subject: [PATCH] CBG-4513 remove query based resync code --- db/background_mgr_resync.go | 144 ------- db/database.go | 122 +----- db/database_collection.go | 5 - db/database_test.go | 36 -- rest/access_test.go | 21 +- rest/adminapitest/admin_api_test.go | 563 +-------------------------- rest/changestest/changes_api_test.go | 53 --- rest/sync_fn_test.go | 416 +------------------- rest/utilities_testing_resttester.go | 51 ++- 9 files changed, 71 insertions(+), 1340 deletions(-) delete mode 100644 db/background_mgr_resync.go diff --git a/db/background_mgr_resync.go b/db/background_mgr_resync.go deleted file mode 100644 index 36dc9720e5..0000000000 --- a/db/background_mgr_resync.go +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright 2012-Present Couchbase, Inc. -// -// Use of this software is governed by the Business Source License included -// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified -// in that file, in accordance with the Business Source License, use of this -// software will be governed by the Apache License, Version 2.0, included in -// the file licenses/APL2.txt. - -package db - -import ( - "context" - "sync" - "sync/atomic" - - "github.com/couchbase/sync_gateway/base" -) - -// ====================================================== -// Resync Implementation of Background Manager Process -// ====================================================== - -type ResyncManager struct { - DocsProcessed int - DocsChanged int - ResyncedCollections map[string][]string - lock sync.Mutex -} - -var _ BackgroundManagerProcessI = &ResyncManager{} - -func NewResyncManager(metadataStore base.DataStore, metaKeys *base.MetadataKeys) *BackgroundManager { - return &BackgroundManager{ - name: "resync", - Process: &ResyncManager{}, - clusterAwareOptions: &ClusterAwareBackgroundManagerOptions{ - metadataStore: metadataStore, - metaKeys: metaKeys, - processSuffix: "resync", - }, - terminator: base.NewSafeTerminator(), - } -} - -func (r *ResyncManager) Init(ctx context.Context, options map[string]interface{}, clusterStatus []byte) error { - return nil -} - -func (r *ResyncManager) Run(ctx context.Context, options map[string]interface{}, persistClusterStatusCallback updateStatusCallbackFunc, terminator *base.SafeTerminator) error { - database := options["database"].(*Database) - regenerateSequences := options["regenerateSequences"].(bool) - resyncCollections := options["collections"].(ResyncCollections) - - persistClusterStatus := func() { - err := persistClusterStatusCallback(ctx) - if err != nil { - base.WarnfCtx(ctx, "Failed to persist cluster status on-demand for resync operation: %v", err) - } - } - defer persistClusterStatus() - - defer atomic.CompareAndSwapUint32(&database.State, DBResyncing, DBOffline) - callback := func(docsProcessed, docsChanged *int) { - r.SetStats(*docsProcessed, *docsChanged) - persistClusterStatus() - } - - collectionIDs, hasAllCollections, collectionNames, err := getCollectionIdsAndNames(database, resyncCollections) - if err != nil { - return err - } - // add collection list to manager for use in status call - r.SetCollectionStatus(collectionNames) - if hasAllCollections { - base.InfofCtx(ctx, base.KeyAll, "running resync against all collections") - } else { - base.InfofCtx(ctx, base.KeyAll, "running resync against specified collections") - } - - for _, collectionID := range collectionIDs { - dbc := &DatabaseCollectionWithUser{ - DatabaseCollection: database.CollectionByID[collectionID], - } - ctx = dbc.AddCollectionContext(ctx) - _, err := dbc.UpdateAllDocChannels(ctx, regenerateSequences, callback, terminator) - if err != nil { - return err - } - if regenerateSequences { - err := base.SetSyncInfo(dbc.dataStore, dbc.dbCtx.Options.MetadataID) - if err != nil { - base.InfofCtx(ctx, base.KeyAll, "Failed to updateSyncInfo after resync: %v", err) - } - } - } - - return nil -} - -func (r *ResyncManager) ResetStatus() { - r.lock.Lock() - defer r.lock.Unlock() - - r.DocsProcessed = 0 - r.DocsChanged = 0 - r.ResyncedCollections = nil -} - -func (r *ResyncManager) SetStats(docsProcessed, docsChanged int) { - r.lock.Lock() - defer r.lock.Unlock() - - r.DocsProcessed = docsProcessed - r.DocsChanged = docsChanged -} - -func (r *ResyncManager) SetCollectionStatus(collectionNames map[string][]string) { - r.lock.Lock() - defer r.lock.Unlock() - - r.ResyncedCollections = collectionNames -} - -type ResyncManagerResponse struct { - BackgroundManagerStatus - DocsChanged int `json:"docs_changed"` - DocsProcessed int `json:"docs_processed"` - CollectionsProcessing map[string][]string `json:"collections_processing,omitempty"` -} - -func (r *ResyncManager) GetProcessStatus(backgroundManagerStatus BackgroundManagerStatus) ([]byte, []byte, error) { - r.lock.Lock() - defer r.lock.Unlock() - - retStatus := ResyncManagerResponse{ - BackgroundManagerStatus: backgroundManagerStatus, - DocsChanged: r.DocsChanged, - DocsProcessed: r.DocsProcessed, - CollectionsProcessing: r.ResyncedCollections, - } - - statusJSON, err := base.JSONMarshal(retStatus) - return statusJSON, nil, err -} diff --git a/db/database.go b/db/database.go index 020bf1c29c..a840da675c 100644 --- a/db/database.go +++ b/db/database.go @@ -242,7 +242,6 @@ type UnsupportedOptions struct { GuestReadOnly bool `json:"guest_read_only,omitempty"` // Config option to restrict GUEST document access to read-only ForceAPIForbiddenErrors bool `json:"force_api_forbidden_errors,omitempty"` // Config option to force the REST API to return forbidden errors ConnectedClient bool `json:"connected_client,omitempty"` // Enables BLIP connected-client APIs - UseQueryBasedResyncManager bool `json:"use_query_resync_manager,omitempty"` // Config option to use Query based resync manager to perform Resync op DCPReadBuffer int `json:"dcp_read_buffer,omitempty"` // Enables user to set their own DCP read buffer KVBufferSize int `json:"kv_buffer,omitempty"` // Enables user to set their own KV pool buffer BlipSendDocsWithChannelRemoval bool `json:"blip_send_docs_with_channel_removal,omitempty"` // Enables sending docs with channel removals using channel filters @@ -564,11 +563,7 @@ func NewDatabaseContext(ctx context.Context, dbName string, bucket base.Bucket, return nil, err } - if dbContext.UseQueryBasedResyncManager() { - dbContext.ResyncManager = NewResyncManager(metadataStore, metaKeys) - } else { - dbContext.ResyncManager = NewResyncManagerDCP(metadataStore, dbContext.UseXattrs(), metaKeys) - } + dbContext.ResyncManager = NewResyncManagerDCP(metadataStore, dbContext.UseXattrs(), metaKeys) return dbContext, nil } @@ -1619,113 +1614,6 @@ func (db *DatabaseContext) GetMetadataPurgeInterval(ctx context.Context) time.Du return DefaultPurgeInterval } -// Re-runs the sync function on every current document in the database (if doCurrentDocs==true) -// and/or imports docs in the bucket not known to the gateway (if doImportDocs==true). -// To be used when the JavaScript sync function changes. -type updateAllDocChannelsCallbackFunc func(docsProcessed, docsChanged *int) - -func (db *DatabaseCollectionWithUser) UpdateAllDocChannels(ctx context.Context, regenerateSequences bool, callback updateAllDocChannelsCallbackFunc, terminator *base.SafeTerminator) (int, error) { - base.InfofCtx(ctx, base.KeyAll, "Recomputing document channels...") - base.InfofCtx(ctx, base.KeyAll, "Re-running sync function on all documents...") - - queryLimit := db.queryPaginationLimit() - startSeq := uint64(0) - endSeq, err := db.sequences().getSequence() - if err != nil { - return 0, err - } - - docsChanged := 0 - docsProcessed := 0 - - // In the event of an early exit we would like to ensure these values are up to date which they wouldn't be if they - // were unable to reach the end of the batch iteration. - defer callback(&docsProcessed, &docsChanged) - - var unusedSequences []uint64 - highSeq := uint64(0) - for { - results, err := db.QueryResync(ctx, queryLimit, startSeq, endSeq) - if err != nil { - return 0, err - } - - queryRowCount := 0 - - var importRow QueryIdRow - for { - var found bool - if db.useViews() { - var viewRow channelsViewRow - found = results.Next(ctx, &viewRow) - if !found { - break - } - importRow = QueryIdRow{ - Seq: uint64(viewRow.Key[1].(float64)), - Id: viewRow.ID, - } - } else { - found = results.Next(ctx, &importRow) - if !found { - break - } - } - select { - case <-terminator.Done(): - base.InfofCtx(ctx, base.KeyAll, "Resync was stopped before the operation could be completed. System "+ - "may be in an inconsistent state. Docs changed: %d Docs Processed: %d", docsChanged, docsProcessed) - closeErr := results.Close() - if closeErr != nil { - return 0, closeErr - } - return docsChanged, nil - default: - } - - docid := importRow.Id - key := realDocID(docid) - queryRowCount++ - docsProcessed++ - _, unusedSequences, err = db.resyncDocument(ctx, docid, key, regenerateSequences, unusedSequences) - if err == nil { - docsChanged++ - } else if err != base.ErrUpdateCancel { - base.WarnfCtx(ctx, "Error updating doc %q: %v", base.UD(docid), err) - } - highSeq = importRow.Seq - } - - callback(&docsProcessed, &docsChanged) - - // Close query results - closeErr := results.Close() - if closeErr != nil { - return 0, closeErr - } - - if queryRowCount < queryLimit || highSeq >= endSeq { - break - } - startSeq = highSeq + 1 - } - - db.releaseSequences(ctx, unusedSequences) - - if regenerateSequences { - if err := db.updateAllPrincipalsSequences(ctx); err != nil { - return docsChanged, err - } - } - - base.InfofCtx(ctx, base.KeyAll, "Finished re-running sync function; %d/%d docs changed", docsChanged, docsProcessed) - - if docsChanged > 0 { - db.invalidateAllPrincipals(ctx, endSeq) - } - return docsChanged, nil -} - func (c *DatabaseCollection) updateAllPrincipalsSequences(ctx context.Context) error { users, roles, err := c.allPrincipalIDs(ctx) if err != nil { @@ -2028,14 +1916,6 @@ func (context *DatabaseContext) UseMou() bool { return context.EnableMou } -// UseQueryBasedResyncManager returns if query bases resync manager should be used for Resync operation -func (context *DatabaseContext) UseQueryBasedResyncManager() bool { - if context.Options.UnsupportedOptions != nil { - return context.Options.UnsupportedOptions.UseQueryBasedResyncManager - } - return false -} - func (context *DatabaseContext) DeltaSyncEnabled() bool { return context.Options.DeltaSyncOptions.Enabled } diff --git a/db/database_collection.go b/db/database_collection.go index 495865f8bb..063e812c18 100644 --- a/db/database_collection.go +++ b/db/database_collection.go @@ -191,11 +191,6 @@ func (c *DatabaseCollection) oldRevExpirySeconds() uint32 { return c.dbCtx.Options.OldRevExpirySeconds } -// queryPaginationLimit limits the size of large queries. This is is controlled at a database level. -func (c *DatabaseCollection) queryPaginationLimit() int { - return c.dbCtx.Options.QueryPaginationLimit -} - // ReloadUser the User object, in case its persistent properties have been changed. This code does not lock and is not safe to call from concurrent goroutines. func (c *DatabaseCollectionWithUser) ReloadUser(ctx context.Context) error { if c.user == nil { diff --git a/db/database_test.go b/db/database_test.go index a4ca4c1931..0bfef2e88b 100644 --- a/db/database_test.go +++ b/db/database_test.go @@ -2630,42 +2630,6 @@ func TestDeleteWithNoTombstoneCreationSupport(t *testing.T) { assert.Equal(t, float64(2), xattr["sequence"]) } -func TestResyncUpdateAllDocChannels(t *testing.T) { - syncFn := ` - function(doc) { - channel("x") - }` - - db, ctx := SetupTestDBWithOptions(t, DatabaseContextOptions{QueryPaginationLimit: 5000}) - collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) - - _, err := collection.UpdateSyncFun(ctx, syncFn) - require.NoError(t, err) - - defer db.Close(ctx) - - for i := 0; i < 10; i++ { - updateBody := make(map[string]interface{}) - updateBody["val"] = i - _, _, err := collection.Put(ctx, fmt.Sprintf("doc%d", i), updateBody) - require.NoError(t, err) - } - - err = db.TakeDbOffline(base.NewNonCancelCtx(), "") - assert.NoError(t, err) - - waitAndAssertCondition(t, func() bool { - state := atomic.LoadUint32(&db.State) - return state == DBOffline - }) - - _, err = collection.UpdateAllDocChannels(ctx, false, func(docsProcessed, docsChanged *int) {}, base.NewSafeTerminator()) - assert.NoError(t, err) - - syncFnCount := int(db.DbStats.Database().SyncFunctionCount.Value()) - assert.Equal(t, 20, syncFnCount) -} - func TestTombstoneCompactionStopWithManager(t *testing.T) { if !base.TestUseXattrs() { diff --git a/rest/access_test.go b/rest/access_test.go index 86b870fe93..5375a80486 100644 --- a/rest/access_test.go +++ b/rest/access_test.go @@ -762,11 +762,12 @@ func TestAllDocsAccessControl(t *testing.T) { assert.Equal(t, "doc2", allDocsResult.Rows[1].ID) } func TestChannelAccessChanges(t *testing.T) { - + base.TestRequiresDCPResync(t) base.SetUpTestLogging(t, base.LevelDebug, base.KeyCache, base.KeyChanges, base.KeyCRUD) - rtConfig := RestTesterConfig{SyncFn: `function(doc) {access(doc.owner, doc._id);channel(doc.channel)}`} + rtConfig := RestTesterConfig{SyncFn: `function(doc) {access(doc.owner, doc._id);channel(doc.channel)}`, PersistentConfig: true} rt := NewRestTester(t, &rtConfig) defer rt.Close() + RequireStatus(t, rt.CreateDatabase("db", rt.NewDbConfig()), http.StatusCreated) dataStore := rt.GetSingleDataStore() c := dataStore.CollectionName() s := dataStore.ScopeName() @@ -904,18 +905,10 @@ func TestChannelAccessChanges(t *testing.T) { // below, which could lead to a data race if the cache processor is paused while it's processing a change rt.MustWaitForDoc("epsilon", t) - // Finally, throw a wrench in the works by changing the sync fn. Note that normally this wouldn't - // be changed while the database is in use (only when it's re-opened) but for testing purposes - // we do it now because we can't close and re-open an ephemeral Walrus database. - collectionWithUser, ctx := rt.GetSingleTestDatabaseCollectionWithUser() - - changed, err := collectionWithUser.UpdateSyncFun(ctx, `function(doc) {access("alice", "beta");channel("beta");}`) - assert.NoError(t, err) - assert.True(t, changed) - changeCount, err := collectionWithUser.UpdateAllDocChannels(ctx, false, func(docsProcessed, docsChanged *int) {}, base.NewSafeTerminator()) - assert.NoError(t, err) - assert.Equal(t, 9, changeCount) - + // Finally, throw a wrench in the works by changing the sync fn. + RequireStatus(t, rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/_config/sync", `function(doc) {access("alice", "beta");channel("beta");}`), http.StatusOK) + resyncStatus := rt.RunResync() + require.Equal(t, int64(9), resyncStatus.DocsChanged) expectedIDs := []string{"beta", "delta", "gamma", "a1", "b1", "d1", "g1", "alpha", "epsilon"} changes, err = rt.WaitForChanges(len(expectedIDs), "/{{.keyspace}}/_changes", "alice", false) assert.NoError(t, err, "Unexpected error") diff --git a/rest/adminapitest/admin_api_test.go b/rest/adminapitest/admin_api_test.go index 16e94e9296..cc1d17174c 100644 --- a/rest/adminapitest/admin_api_test.go +++ b/rest/adminapitest/admin_api_test.go @@ -19,7 +19,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "testing" "time" @@ -596,50 +595,8 @@ func TestDBGetConfigCustomLogging(t *testing.T) { assert.Equal(t, body.Logging.Console.LogKeys, logKeys) } -func TestDBOfflinePostResyncUsingDCPStream(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("This test requires gocb buckets") - } - - rt := rest.NewRestTester(t, nil) - defer rt.Close() - - _, ok := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManagerDCP) - require.True(t, ok) - - log.Printf("Taking DB offline") - response := rt.SendAdminRequest("GET", "/db/", "") - var body db.Body - require.NoError(t, base.JSONUnmarshal(response.Body.Bytes(), &body)) - assert.True(t, body["state"].(string) == "Online") - - response = rt.SendAdminRequest("POST", "/db/_offline", "") - rest.RequireStatus(t, response, 200) - - response = rt.SendAdminRequest("GET", "/db/", "") - body = nil - require.NoError(t, base.JSONUnmarshal(response.Body.Bytes(), &body)) - assert.True(t, body["state"].(string) == "Offline") - - rest.RequireStatus(t, rt.SendAdminRequest("POST", "/db/_resync?action=start", ""), 200) - err := rt.WaitForCondition(func() bool { - response := rt.SendAdminRequest("GET", "/db/_resync", "") - var status db.ResyncManagerResponse - err := json.Unmarshal(response.BodyBytes(), &status) - assert.NoError(t, err) - - var val interface{} - _, err = rt.Bucket().DefaultDataStore().Get(rt.GetDatabase().ResyncManager.GetHeartbeatDocID(t), &val) - - return status.State == db.BackgroundProcessStateCompleted && base.IsDocNotFoundError(err) - }) - assert.NoError(t, err) -} - func TestDBOfflineSingleResyncUsingDCPStream(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("This test doesn't works with walrus") - } + base.TestRequiresDCPResync(t) syncFn := ` function(doc) { channel("x") @@ -647,54 +604,26 @@ func TestDBOfflineSingleResyncUsingDCPStream(t *testing.T) { rt := rest.NewRestTester(t, &rest.RestTesterConfig{SyncFn: syncFn}) defer rt.Close() - _, ok := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManagerDCP) - require.True(t, ok) - // create documents in DB to cause resync to take a few seconds for i := 0; i < 1000; i++ { rt.CreateTestDoc(fmt.Sprintf("doc%v", i)) } assert.Equal(t, int64(1000), rt.GetDatabase().DbStats.Database().SyncFunctionCount.Value()) - response := rt.SendAdminRequest("GET", "/db/", "") - var body db.Body - require.NoError(t, base.JSONUnmarshal(response.Body.Bytes(), &body)) - assert.True(t, body["state"].(string) == "Online") - - response = rt.SendAdminRequest("POST", "/db/_offline", "") - rest.RequireStatus(t, response, 200) - - response = rt.SendAdminRequest("GET", "/db/", "") - body = nil - require.NoError(t, base.JSONUnmarshal(response.Body.Bytes(), &body)) - assert.True(t, body["state"].(string) == "Offline") + rt.TakeDbOffline() - response = rt.SendAdminRequest("POST", "/db/_resync?action=start", "") + response := rt.SendAdminRequest("POST", "/db/_resync?action=start", "") rest.RequireStatus(t, response, http.StatusOK) // Send a second _resync request. This must return a 400 since the first one is blocked processing rest.RequireStatus(t, rt.SendAdminRequest("POST", "/db/_resync?action=start", ""), 503) - err := rt.WaitForConditionWithOptions(func() bool { - response := rt.SendAdminRequest("GET", "/db/_resync", "") - var status db.ResyncManagerResponse - err := json.Unmarshal(response.BodyBytes(), &status) - assert.NoError(t, err) - - var val interface{} - _, err = rt.Bucket().DefaultDataStore().Get(rt.GetDatabase().ResyncManager.GetHeartbeatDocID(t), &val) - - return status.State == db.BackgroundProcessStateCompleted && base.IsDocNotFoundError(err) - }, 200, 200) - assert.NoError(t, err) - + rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted) assert.Equal(t, int64(2000), rt.GetDatabase().DbStats.Database().SyncFunctionCount.Value()) } func TestDCPResyncCollectionsStatus(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("This test doesn't works with walrus") - } + base.TestRequiresDCPResync(t) base.TestRequiresCollections(t) testCases := []struct { @@ -725,9 +654,6 @@ func TestDCPResyncCollectionsStatus(t *testing.T) { defer rt.Close() scopeName := "sg_test_0" - _, ok := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManagerDCP) - require.True(t, ok) - // create documents in DB to cause resync to take a few seconds for i := 0; i < 1000; i++ { resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace1}}/"+fmt.Sprint(i), `{"value":1}`) @@ -753,178 +679,8 @@ func TestDCPResyncCollectionsStatus(t *testing.T) { } } -func TestQueryResyncCollectionsStatus(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("This test doesn't works with walrus") - } - base.TestRequiresCollections(t) - - testCases := []struct { - name string - collectionNames []string - expectedResult map[string][]string - specifyCollection bool - }{ - { - name: "collections_specified", - collectionNames: []string{"sg_test_0"}, - expectedResult: map[string][]string{ - "sg_test_0": {"sg_test_0"}, - }, - specifyCollection: true, - }, - { - name: "collections_not_specified", - expectedResult: map[string][]string{ - "sg_test_0": {"sg_test_0", "sg_test_1", "sg_test_2"}, - }, - collectionNames: nil, - }, - } - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - rt := rest.NewRestTesterMultipleCollections(t, &rest.RestTesterConfig{ - DatabaseConfig: &rest.DatabaseConfig{DbConfig: rest.DbConfig{ - Unsupported: &db.UnsupportedOptions{ - UseQueryBasedResyncManager: true, - }, - }, - }, - }, 3) - defer rt.Close() - scopeName := "sg_test_0" - - _, ok := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManager) - require.True(t, ok) - - rt.TakeDbOffline() - - if !testCase.specifyCollection { - resp := rt.SendAdminRequest("POST", "/db/_resync?action=start", "") - rest.RequireStatus(t, resp, http.StatusOK) - } else { - payload := `{"scopes": {"sg_test_0":["sg_test_0"]}}` - resp := rt.SendAdminRequest("POST", "/db/_resync?action=start", payload) - rest.RequireStatus(t, resp, http.StatusOK) - } - - statusResponse := rt.WaitForResyncStatus(db.BackgroundProcessStateCompleted) - - assert.ElementsMatch(t, statusResponse.CollectionsProcessing[scopeName], testCase.expectedResult[scopeName]) - }) - } -} - -func TestResyncQueryBased(t *testing.T) { - base.LongRunningTest(t) - - testCases := []struct { - name string - docsCreated int - expectedSyncFnRuns int - expectedQueryCount int - queryLimit int - }{ - { - name: "Docs 0, Limit Default", - docsCreated: 0, - expectedSyncFnRuns: 0, - expectedQueryCount: 1, - queryLimit: db.DefaultQueryPaginationLimit, - }, - { - name: "Docs 1000, Limit Default", - docsCreated: 1000, - expectedSyncFnRuns: 2000, - expectedQueryCount: 1, - queryLimit: db.DefaultQueryPaginationLimit, - }, - { - name: "Docs 1000, Limit 10", - docsCreated: 1000, - expectedSyncFnRuns: 2000, - expectedQueryCount: 101, - queryLimit: 10, - }, - } - - syncFn := ` - function(doc) { - channel("x") - }` - - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - rt := rest.NewRestTester(t, - &rest.RestTesterConfig{ - DatabaseConfig: &rest.DatabaseConfig{DbConfig: rest.DbConfig{ - QueryPaginationLimit: &testCase.queryLimit, - Unsupported: &db.UnsupportedOptions{ - UseQueryBasedResyncManager: true, - }, - }, - }, - SyncFn: syncFn, - }, - ) - defer rt.Close() - - _, ok := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManager) - require.True(t, ok) - - for i := 0; i < testCase.docsCreated; i++ { - rt.CreateTestDoc(fmt.Sprintf("doc%d", i)) - } - err := rt.WaitForCondition(func() bool { - return int(rt.GetDatabase().DbStats.Database().SyncFunctionCount.Value()) == testCase.docsCreated - }) - assert.NoError(t, err) - - response := rt.SendAdminRequest("POST", "/db/_resync?action=start", "") - rest.RequireStatus(t, response, http.StatusServiceUnavailable) - - response = rt.SendAdminRequest("POST", "/db/_offline", "") - rest.RequireStatus(t, response, http.StatusOK) - - rest.WaitAndAssertCondition(t, func() bool { - state := atomic.LoadUint32(&rt.GetDatabase().State) - return state == db.DBOffline - }) - - response = rt.SendAdminRequest("POST", "/db/_resync?action=start", "") - rest.RequireStatus(t, response, http.StatusOK) - - var resyncManagerStatus db.ResyncManagerResponseDCP - err = rt.WaitForCondition(func() bool { - response := rt.SendAdminRequest("GET", "/db/_resync", "") - err := json.Unmarshal(response.BodyBytes(), &resyncManagerStatus) - assert.NoError(t, err) - - var val interface{} - _, err = rt.Bucket().DefaultDataStore().Get(rt.GetDatabase().ResyncManager.GetHeartbeatDocID(t), &val) - - if resyncManagerStatus.State == db.BackgroundProcessStateCompleted && base.IsDocNotFoundError(err) { - return true - } else { - t.Logf("resyncManagerStatus.State != %v: %v - err:%v", db.BackgroundProcessStateCompleted, resyncManagerStatus.State, err) - return false - } - }) - assert.NoError(t, err) - - assert.Equal(t, testCase.expectedSyncFnRuns, int(rt.GetDatabase().DbStats.Database().SyncFunctionCount.Value())) - - assert.Equal(t, testCase.docsCreated, int(resyncManagerStatus.DocsProcessed)) - assert.Equal(t, 0, int(resyncManagerStatus.DocsChanged)) - }) - } - -} - func TestResyncUsingDCPStream(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("This test doesn't works with walrus") - } + base.TestRequiresDCPResync(t) base.LongRunningTest(t) testCases := []struct { @@ -952,9 +708,6 @@ func TestResyncUsingDCPStream(t *testing.T) { ) defer rt.Close() - _, ok := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManagerDCP) - require.True(t, ok) - for i := 0; i < testCase.docsCreated; i++ { rt.CreateTestDoc(fmt.Sprintf("doc%d", i)) } @@ -968,34 +721,12 @@ func TestResyncUsingDCPStream(t *testing.T) { response := rt.SendAdminRequest("POST", "/db/_resync?action=start", "") rest.RequireStatus(t, response, http.StatusServiceUnavailable) - response = rt.SendAdminRequest("POST", "/db/_offline", "") - rest.RequireStatus(t, response, http.StatusOK) - - rest.WaitAndAssertCondition(t, func() bool { - state := atomic.LoadUint32(&rt.GetDatabase().State) - return state == db.DBOffline - }) + rt.TakeDbOffline() response = rt.SendAdminRequest("POST", "/db/_resync?action=start", "") rest.RequireStatus(t, response, http.StatusOK) - var resyncManagerStatus db.ResyncManagerResponseDCP - err = rt.WaitForConditionWithOptions(func() bool { - response := rt.SendAdminRequest("GET", "/db/_resync", "") - err := json.Unmarshal(response.BodyBytes(), &resyncManagerStatus) - assert.NoError(t, err) - - var val interface{} - _, err = rt.Bucket().DefaultDataStore().Get(rt.GetDatabase().ResyncManager.GetHeartbeatDocID(t), &val) - - if resyncManagerStatus.State == db.BackgroundProcessStateCompleted && base.IsDocNotFoundError(err) { - return true - } else { - t.Logf("resyncManagerStatus.State != %v: %v - err:%v", db.BackgroundProcessStateCompleted, resyncManagerStatus.State, err) - return false - } - }, 200, 200) - assert.NoError(t, err) + resyncManagerStatus := rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted) assert.Equal(t, testCase.docsCreated, int(rt.GetDatabase().DbStats.Database().SyncFunctionCount.Value())) @@ -1006,9 +737,7 @@ func TestResyncUsingDCPStream(t *testing.T) { } func TestResyncUsingDCPStreamReset(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("This test doesn't works with walrus") - } + base.TestRequiresDCPResync(t) base.LongRunningTest(t) syncFn := ` @@ -1025,24 +754,15 @@ func TestResyncUsingDCPStreamReset(t *testing.T) { const numDocs = 1000 - _, ok := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManagerDCP) - require.True(t, ok) - // create some docs for i := 0; i < numDocs; i++ { rt.CreateTestDoc(fmt.Sprintf("doc%d", i)) } - // take db offline to run resync against it - response := rt.SendAdminRequest("POST", "/db/_offline", "") - rest.RequireStatus(t, response, http.StatusOK) - - // wait for db to be offline - err := rt.WaitForDBState(db.RunStateString[db.DBOffline]) - require.NoError(t, err) + rt.TakeDbOffline() // start a resync run - response = rt.SendAdminRequest("POST", "/db/_resync?action=start", "") + response := rt.SendAdminRequest("POST", "/db/_resync?action=start", "") rest.RequireStatus(t, response, http.StatusOK) resyncManagerStatus := rt.WaitForResyncDCPStatus(db.BackgroundProcessStateRunning) @@ -1069,9 +789,7 @@ func TestResyncUsingDCPStreamReset(t *testing.T) { } func TestResyncUsingDCPStreamForNamedCollection(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("DCP client doesn't work with walrus. Waiting on CBG-2661") - } + base.TestRequiresDCPResync(t) base.TestRequiresCollections(t) numCollections := 2 @@ -1092,9 +810,6 @@ func TestResyncUsingDCPStreamForNamedCollection(t *testing.T) { ) defer rt.Close() - _, ok := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManagerDCP) - require.True(t, ok) - // put a docs in both collections for i := 1; i <= 10; i++ { resp := rt.SendAdminRequest(http.MethodPut, fmt.Sprintf("/{{.keyspace1}}/1000%d", i), `{"type":"test_doc"}`) @@ -1130,116 +845,8 @@ func TestResyncUsingDCPStreamForNamedCollection(t *testing.T) { assert.LessOrEqual(t, 20, int(resyncManagerStatus.DocsProcessed)) } -func TestResyncErrorScenarios(t *testing.T) { - - if !base.UnitTestUrlIsWalrus() { - // Limitation of setting LeakyBucket on RestTester - t.Skip("This test only works with walrus") - } - - syncFn := ` - function(doc) { - channel("x") - }` - - leakyTestBucket := base.GetTestBucket(t).LeakyBucketClone(base.LeakyBucketConfig{}) - - rt := rest.NewRestTester(t, - &rest.RestTesterConfig{ - SyncFn: syncFn, - CustomTestBucket: leakyTestBucket, - DatabaseConfig: &rest.DatabaseConfig{DbConfig: rest.DbConfig{ - Unsupported: &db.UnsupportedOptions{UseQueryBasedResyncManager: true}, - }}, - }, - ) - defer rt.Close() - - _, ok := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManager) - require.True(t, ok) - - leakyDataStore, ok := base.AsLeakyDataStore(rt.TestBucket.GetSingleDataStore()) - require.Truef(t, ok, "Wanted *base.LeakyBucket but got %T", leakyTestBucket.Bucket) - - var ( - useCallback bool - callbackFired bool - ) - - if base.TestsDisableGSI() { - leakyDataStore.SetPostQueryCallback(func(ddoc, viewName string, params map[string]interface{}) { - if useCallback { - callbackFired = true - response := rt.SendAdminRequest("POST", "/db/_resync?action=start", "") - rest.RequireStatus(t, response, http.StatusServiceUnavailable) - useCallback = false - } - }) - } else { - leakyDataStore.SetPostN1QLQueryCallback(func() { - if useCallback { - callbackFired = true - response := rt.SendAdminRequest("POST", "/db/_resync?action=start", "") - rest.RequireStatus(t, response, http.StatusServiceUnavailable) - useCallback = false - } - }) - } - - for i := 0; i < 1000; i++ { - rt.CreateTestDoc(fmt.Sprintf("doc%d", i)) - } - - response := rt.SendAdminRequest("GET", "/db/_resync", "") - rest.RequireStatus(t, response, http.StatusOK) - - response = rt.SendAdminRequest("POST", "/db/_resync?action=start", "") - rest.RequireStatus(t, response, http.StatusServiceUnavailable) - - response = rt.SendAdminRequest("POST", "/db/_resync?action=stop", "") - rest.RequireStatus(t, response, http.StatusBadRequest) - - response = rt.SendAdminRequest("POST", "/db/_offline", "") - rest.RequireStatus(t, response, http.StatusOK) - - rest.WaitAndAssertCondition(t, func() bool { - state := atomic.LoadUint32(&rt.GetDatabase().State) - return state == db.DBOffline - }) - - useCallback = true - response = rt.SendAdminRequest("POST", "/db/_resync?action=start", "") - rest.RequireStatus(t, response, http.StatusOK) - - rest.WaitAndAssertBackgroundManagerState(t, db.BackgroundProcessStateCompleted, - func(t testing.TB) db.BackgroundProcessState { - return rt.GetDatabase().ResyncManager.GetRunState() - }) - rest.WaitAndAssertBackgroundManagerExpiredHeartbeat(t, rt.GetDatabase().ResyncManager) - - response = rt.SendAdminRequest("POST", "/db/_resync?action=stop", "") - rest.RequireStatus(t, response, http.StatusBadRequest) - - response = rt.SendAdminRequest("POST", "/db/_resync?action=invalid", "") - rest.RequireStatus(t, response, http.StatusBadRequest) - - // Test empty action, should default to start - response = rt.SendAdminRequest("POST", "/db/_resync", "") - rest.RequireStatus(t, response, http.StatusOK) - - rest.WaitAndAssertBackgroundManagerState(t, db.BackgroundProcessStateCompleted, - func(t testing.TB) db.BackgroundProcessState { - return rt.GetDatabase().ResyncManager.GetRunState() - }) - rest.WaitAndAssertBackgroundManagerExpiredHeartbeat(t, rt.GetDatabase().ResyncManager) - - assert.True(t, callbackFired, "expecting callback to be fired") -} - func TestResyncErrorScenariosUsingDCPStream(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("This test doesn't works with walrus") - } + base.TestRequiresDCPResync(t) syncFn := ` function(doc) { @@ -1256,9 +863,6 @@ func TestResyncErrorScenariosUsingDCPStream(t *testing.T) { ) defer rt.Close() - _, ok := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManagerDCP) - require.True(t, ok) - numOfDocs := 1000 for i := 0; i < numOfDocs; i++ { rt.CreateTestDoc(fmt.Sprintf("doc%d", i)) @@ -1276,13 +880,7 @@ func TestResyncErrorScenariosUsingDCPStream(t *testing.T) { response = rt.SendAdminRequest("POST", "/db/_resync?action=stop", "") rest.RequireStatus(t, response, http.StatusBadRequest) - response = rt.SendAdminRequest("POST", "/db/_offline", "") - rest.RequireStatus(t, response, http.StatusOK) - - rest.WaitAndAssertCondition(t, func() bool { - state := atomic.LoadUint32(&rt.GetDatabase().State) - return state == db.DBOffline - }) + rt.TakeDbOffline() response = rt.SendAdminRequest("POST", "/db/_resync?action=start", "") rest.RequireStatus(t, response, http.StatusOK) @@ -1291,11 +889,7 @@ func TestResyncErrorScenariosUsingDCPStream(t *testing.T) { response = rt.SendAdminRequest("POST", "/db/_resync?action=start", "") rest.RequireStatus(t, response, http.StatusServiceUnavailable) - rest.WaitAndAssertBackgroundManagerState(t, db.BackgroundProcessStateCompleted, - func(t testing.TB) db.BackgroundProcessState { - return rt.GetDatabase().ResyncManager.GetRunState() - }) - rest.WaitAndAssertBackgroundManagerExpiredHeartbeat(t, rt.GetDatabase().ResyncManager) + rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted) assert.Equal(t, numOfDocs, int(rt.GetDatabase().DbStats.Database().SyncFunctionCount.Value())) @@ -1309,101 +903,7 @@ func TestResyncErrorScenariosUsingDCPStream(t *testing.T) { response = rt.SendAdminRequest("POST", "/db/_resync", "") rest.RequireStatus(t, response, http.StatusOK) - rest.WaitAndAssertBackgroundManagerState(t, db.BackgroundProcessStateCompleted, - func(t testing.TB) db.BackgroundProcessState { - return rt.GetDatabase().ResyncManager.GetRunState() - }) - rest.WaitAndAssertBackgroundManagerExpiredHeartbeat(t, rt.GetDatabase().ResyncManager) -} - -func TestResyncStop(t *testing.T) { - - if !base.UnitTestUrlIsWalrus() { - // Limitation of setting LeakyBucket on RestTester - t.Skip("This test only works with walrus") - } - - syncFn := ` - function(doc) { - channel("x") - }` - - leakyTestBucket := base.GetTestBucket(t).LeakyBucketClone(base.LeakyBucketConfig{}) - - rt := rest.NewRestTester(t, - &rest.RestTesterConfig{ - SyncFn: syncFn, - DatabaseConfig: &rest.DatabaseConfig{DbConfig: rest.DbConfig{ - QueryPaginationLimit: base.IntPtr(10), - Unsupported: &db.UnsupportedOptions{UseQueryBasedResyncManager: true}, - }}, - CustomTestBucket: leakyTestBucket, - }, - ) - defer rt.Close() - - _, ok := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManager) - require.True(t, ok) - - leakyDataStore, ok := base.AsLeakyDataStore(rt.TestBucket.GetSingleDataStore()) - require.Truef(t, ok, "Wanted *base.LeakyBucket but got %T", leakyTestBucket.Bucket) - - var ( - useCallback bool - callbackFired bool - ) - - if base.TestsDisableGSI() { - leakyDataStore.SetPostQueryCallback(func(ddoc, viewName string, params map[string]interface{}) { - if useCallback { - callbackFired = true - response := rt.SendAdminRequest("POST", "/db/_resync?action=stop", "") - rest.RequireStatus(t, response, http.StatusOK) - useCallback = false - } - }) - } else { - leakyDataStore.SetPostN1QLQueryCallback(func() { - if useCallback { - callbackFired = true - response := rt.SendAdminRequest("POST", "/db/_resync?action=stop", "") - rest.RequireStatus(t, response, http.StatusOK) - useCallback = false - } - }) - } - - for i := 0; i < 1000; i++ { - rt.CreateTestDoc(fmt.Sprintf("doc%d", i)) - } - - err := rt.WaitForCondition(func() bool { - return int(rt.GetDatabase().DbStats.Database().SyncFunctionCount.Value()) == 1000 - }) - assert.NoError(t, err) - - response := rt.SendAdminRequest("POST", "/db/_offline", "") - rest.RequireStatus(t, response, http.StatusOK) - - rest.WaitAndAssertCondition(t, func() bool { - state := atomic.LoadUint32(&rt.GetDatabase().State) - return state == db.DBOffline - }) - - useCallback = true - response = rt.SendAdminRequest("POST", "/db/_resync?action=start", "") - rest.RequireStatus(t, response, http.StatusOK) - - rest.WaitAndAssertBackgroundManagerState(t, db.BackgroundProcessStateStopped, - func(t testing.TB) db.BackgroundProcessState { - return rt.GetDatabase().ResyncManager.GetRunState() - }) - rest.WaitAndAssertBackgroundManagerExpiredHeartbeat(t, rt.GetDatabase().ResyncManager) - - assert.True(t, callbackFired, "expecting callback to be fired") - - syncFnCount := int(rt.GetDatabase().DbStats.Database().SyncFunctionCount.Value()) - assert.True(t, syncFnCount < 2000, "Expected syncFnCount < 2000 but syncFnCount=%d", syncFnCount) + rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted) } // TestCorruptDbConfigHandling: @@ -1870,10 +1370,7 @@ func TestConfigPollingRemoveDatabase(t *testing.T) { } func TestResyncStopUsingDCPStream(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - // This test requires a gocb bucket - t.Skip("This test doesn't works with walrus") - } + base.TestRequiresDCPResync(t) syncFn := ` function(doc) { @@ -1890,11 +1387,6 @@ func TestResyncStopUsingDCPStream(t *testing.T) { ) defer rt.Close() - _, ok := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManagerDCP) - if !ok { - rt.GetDatabase().ResyncManager = db.NewResyncManagerDCP(rt.GetSingleDataStore(), base.TestUseXattrs(), rt.GetDatabase().MetadataKeys) - } - numOfDocs := 1000 for i := 0; i < numOfDocs; i++ { rt.CreateTestDoc(fmt.Sprintf("doc%d", i)) @@ -1905,29 +1397,16 @@ func TestResyncStopUsingDCPStream(t *testing.T) { }) assert.NoError(t, err) - response := rt.SendAdminRequest("POST", "/db/_offline", "") - rest.RequireStatus(t, response, http.StatusOK) - - rest.WaitAndAssertCondition(t, func() bool { - state := atomic.LoadUint32(&rt.GetDatabase().State) - return state == db.DBOffline - }) + rt.TakeDbOffline() - response = rt.SendAdminRequest("POST", "/db/_resync?action=start", "") + response := rt.SendAdminRequest("POST", "/db/_resync?action=start", "") rest.RequireStatus(t, response, http.StatusOK) - rest.WaitAndAssertBackgroundManagerState(t, db.BackgroundProcessStateRunning, - func(t testing.TB) db.BackgroundProcessState { - return rt.GetDatabase().ResyncManager.GetRunState() - }) + rt.WaitForResyncDCPStatus(db.BackgroundProcessStateRunning) response = rt.SendAdminRequest("POST", "/db/_resync?action=stop", "") rest.RequireStatus(t, response, http.StatusOK) - rest.WaitAndAssertBackgroundManagerState(t, db.BackgroundProcessStateStopped, - func(t testing.TB) db.BackgroundProcessState { - return rt.GetDatabase().ResyncManager.GetRunState() - }) - rest.WaitAndAssertBackgroundManagerExpiredHeartbeat(t, rt.GetDatabase().ResyncManager) + rt.WaitForResyncDCPStatus(db.BackgroundProcessStateStopped) syncFnCount := int(rt.GetDatabase().DbStats.Database().SyncFunctionCount.Value()) assert.True(t, syncFnCount < 2000, "Expected syncFnCount < 2000 but syncFnCount=%d", syncFnCount) diff --git a/rest/changestest/changes_api_test.go b/rest/changestest/changes_api_test.go index d5bfeb92e7..6118e18347 100644 --- a/rest/changestest/changes_api_test.go +++ b/rest/changestest/changes_api_test.go @@ -3440,59 +3440,6 @@ func TestCacheCompactDuringChangesWait(t *testing.T) { longpollWg.Wait() } -func TestResyncAllTombstones(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("Walrus does not support Xattrs") - } - - if !base.TestUseXattrs() { - t.Skip("If running with no xattrs compact acts as a no-op") - } - - const queryPaginationLimit = 5 - tests := []int{ - 0, - queryPaginationLimit - 1, - queryPaginationLimit, - queryPaginationLimit + 1, - (queryPaginationLimit * 2) - 1, - (queryPaginationLimit * 2), - (queryPaginationLimit * 2) + 1, - } - - for _, numTombstones := range tests { - t.Run(fmt.Sprintf("limit:%d-numTombstones:%d", queryPaginationLimit, numTombstones), func(t *testing.T) { - rt := rest.NewRestTester(t, &rest.RestTesterConfig{ - DatabaseConfig: &rest.DatabaseConfig{DbConfig: rest.DbConfig{ - QueryPaginationLimit: base.IntPtr(queryPaginationLimit), - Unsupported: &db.UnsupportedOptions{UseQueryBasedResyncManager: true}, - }}, - }) - defer rt.Close() - - zero := time.Duration(0) - rt.GetDatabase().Options.PurgeInterval = &zero - - for i := 0; i < numTombstones; i++ { - docID := fmt.Sprintf("doc%d", i) - docVersion := rt.PutDoc(docID, `{"foo":"bar"}`) - rt.DeleteDoc(docID, docVersion) - } - - resp := rt.SendAdminRequest(http.MethodPost, fmt.Sprintf("/%s/_offline", rt.GetDatabase().Name), "") - rest.RequireStatus(t, resp, http.StatusOK) - require.NoError(t, rt.WaitForDBState(db.RunStateString[db.DBOffline])) - - resp = rt.SendAdminRequest(http.MethodPost, fmt.Sprintf("/%s/_resync?action=start", rt.GetDatabase().Name), "") - rest.RequireStatus(t, resp, http.StatusOK) - - status := rt.WaitForResyncStatus(db.BackgroundProcessStateCompleted) - assert.Equal(t, numTombstones, status.DocsProcessed) - assert.Equal(t, 0, status.DocsChanged) - }) - } -} - func TestTombstoneCompaction(t *testing.T) { base.LongRunningTest(t) diff --git a/rest/sync_fn_test.go b/rest/sync_fn_test.go index 1b4a902130..11cd0ae6ec 100644 --- a/rest/sync_fn_test.go +++ b/rest/sync_fn_test.go @@ -11,11 +11,9 @@ package rest import ( "encoding/json" "fmt" - "log" "net/http" "strconv" "sync" - "sync/atomic" "testing" "time" @@ -406,203 +404,8 @@ func TestSyncFnTimeout(t *testing.T) { assert.NoError(t, timeoutErr) } -// Take DB offline and ensure can post _resync -func TestDBOfflinePostResync(t *testing.T) { - - rt := NewRestTester(t, nil) - defer rt.Close() - - _, isDCPResync := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManagerDCP) - if isDCPResync && base.UnitTestUrlIsWalrus() { - t.Skip("This test doesn't work with Walrus when ResyncManagerDCP is used") - } - - log.Printf("Taking DB offline") - response := rt.SendAdminRequest("GET", "/db/", "") - var body db.Body - require.NoError(t, base.JSONUnmarshal(response.Body.Bytes(), &body)) - assert.True(t, body["state"].(string) == "Online") - - response = rt.SendAdminRequest("POST", "/db/_offline", "") - RequireStatus(t, response, 200) - - response = rt.SendAdminRequest("GET", "/db/", "") - body = nil - require.NoError(t, base.JSONUnmarshal(response.Body.Bytes(), &body)) - assert.True(t, body["state"].(string) == "Offline") - - RequireStatus(t, rt.SendAdminRequest("POST", "/{{.db}}/_resync?action=start", ""), 200) - err := rt.WaitForCondition(func() bool { - response := rt.SendAdminRequest("GET", "/{{.db}}/_resync", "") - var status db.ResyncManagerResponse - err := json.Unmarshal(response.BodyBytes(), &status) - assert.NoError(t, err) - - var val interface{} - _, err = rt.MetadataStore().Get(rt.GetDatabase().ResyncManager.GetHeartbeatDocID(t), &val) - - return status.State == db.BackgroundProcessStateCompleted && base.IsDocNotFoundError(err) - }) - assert.NoError(t, err) -} - -// Take DB offline and ensure only one _resync can be in progress -func TestDBOfflineSingleResync(t *testing.T) { - - syncFn := ` - function(doc) { - channel("x") - }` - rt := NewRestTester(t, &RestTesterConfig{SyncFn: syncFn}) - defer rt.Close() - - _, isDCPResync := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManagerDCP) - if isDCPResync && base.UnitTestUrlIsWalrus() { - t.Skip("This test doesn't work with Walrus when ResyncManagerDCP is used") - } - - // create documents in DB to cause resync to take a few seconds - for i := 0; i < 1000; i++ { - rt.CreateTestDoc(fmt.Sprintf("doc%v", i)) - } - assert.Equal(t, int64(1000), rt.GetDatabase().DbStats.Database().SyncFunctionCount.Value()) - - log.Printf("Taking DB offline") - response := rt.SendAdminRequest("GET", "/db/", "") - var body db.Body - require.NoError(t, base.JSONUnmarshal(response.Body.Bytes(), &body)) - assert.True(t, body["state"].(string) == "Online") - - response = rt.SendAdminRequest("POST", "/db/_offline", "") - RequireStatus(t, response, 200) - - response = rt.SendAdminRequest("GET", "/db/", "") - body = nil - require.NoError(t, base.JSONUnmarshal(response.Body.Bytes(), &body)) - assert.True(t, body["state"].(string) == "Offline") - - response = rt.SendAdminRequest("POST", "/{{.db}}/_resync?action=start", "") - RequireStatus(t, response, http.StatusOK) - - // Send a second _resync request. This must return a 400 since the first one is blocked processing - RequireStatus(t, rt.SendAdminRequest("POST", "/{{.db}}/_resync?action=start", ""), 503) - - err := rt.WaitForCondition(func() bool { - response := rt.SendAdminRequest("GET", "/{{.db}}/_resync", "") - var status db.ResyncManagerResponse - err := json.Unmarshal(response.BodyBytes(), &status) - assert.NoError(t, err) - - var val interface{} - _, err = rt.MetadataStore().Get(rt.GetDatabase().ResyncManager.GetHeartbeatDocID(t), &val) - - return status.State == db.BackgroundProcessStateCompleted && base.IsDocNotFoundError(err) - }) - assert.NoError(t, err) - - assert.Equal(t, int64(2000), rt.GetDatabase().DbStats.Database().SyncFunctionCount.Value()) -} - -func TestResyncOverDCP(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("DCP requires gocb and CBS") - } - base.LongRunningTest(t) - - testCases := []struct { - name string - docsCreated int - expectedSyncFnRuns int - expectedQueryCount int - queryLimit int - }{ - { - name: "Docs 0, Limit Default", - docsCreated: 0, - expectedSyncFnRuns: 0, - expectedQueryCount: 1, - queryLimit: db.DefaultQueryPaginationLimit, - }, - { - name: "Docs 1000, Limit Default", - docsCreated: 1000, - expectedSyncFnRuns: 2000, - expectedQueryCount: 1, - queryLimit: db.DefaultQueryPaginationLimit, - }, - { - name: "Docs 1000, Limit 10", - docsCreated: 1000, - expectedSyncFnRuns: 2000, - expectedQueryCount: 101, - queryLimit: 10, - }, - } - - syncFn := ` - function(doc) { - channel("x") - }` - - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - rt := NewRestTester(t, - &RestTesterConfig{ - DatabaseConfig: &DatabaseConfig{DbConfig: DbConfig{ - QueryPaginationLimit: &testCase.queryLimit, - }}, - SyncFn: syncFn, - }, - ) - defer rt.Close() - - for i := 0; i < testCase.docsCreated; i++ { - rt.CreateTestDoc(fmt.Sprintf("doc%d", i)) - } - - response := rt.SendAdminRequest("POST", "/{{.db}}/_resync?action=start", "") - RequireStatus(t, response, http.StatusServiceUnavailable) - - response = rt.SendAdminRequest("POST", "/db/_offline", "") - RequireStatus(t, response, http.StatusOK) - - WaitAndAssertCondition(t, func() bool { - state := atomic.LoadUint32(&rt.GetDatabase().State) - return state == db.DBOffline - }) - - response = rt.SendAdminRequest("POST", "/{{.db}}/_resync?action=start", "") - RequireStatus(t, response, http.StatusOK) - - var resyncManagerStatus db.ResyncManagerResponse - err := rt.WaitForCondition(func() bool { - response := rt.SendAdminRequest("GET", "/{{.db}}/_resync", "") - err := json.Unmarshal(response.BodyBytes(), &resyncManagerStatus) - assert.NoError(t, err) - - var val interface{} - _, err = rt.MetadataStore().Get(rt.GetDatabase().ResyncManager.GetHeartbeatDocID(t), &val) - - if resyncManagerStatus.State == db.BackgroundProcessStateCompleted && base.IsDocNotFoundError(err) { - return true - } else { - t.Logf("resyncManagerStatus.State != %v: %v - err:%v", db.BackgroundProcessStateCompleted, resyncManagerStatus.State, err) - return false - } - }) - require.NoError(t, err) - - assert.GreaterOrEqual(t, int(rt.GetDatabase().DbStats.Database().SyncFunctionCount.Value()), testCase.expectedSyncFnRuns) - assert.GreaterOrEqual(t, resyncManagerStatus.DocsProcessed, testCase.docsCreated) - assert.Equal(t, 0, resyncManagerStatus.DocsChanged) - }) - } -} - func TestResyncErrorScenariosUsingDCPStream(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("This test doesn't work with walrus") - } + base.TestRequiresDCPResync(t) syncFn := ` function(doc) { @@ -616,11 +419,6 @@ func TestResyncErrorScenariosUsingDCPStream(t *testing.T) { ) defer rt.Close() - _, ok := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManagerDCP) - if !ok { - t.Skip("This test only works when ResyncManagerDCP is used") - } - for i := 0; i < 1000; i++ { rt.CreateTestDoc(fmt.Sprintf("doc%d", i)) } @@ -634,22 +432,12 @@ func TestResyncErrorScenariosUsingDCPStream(t *testing.T) { response = rt.SendAdminRequest("POST", "/db/_resync?action=stop", "") RequireStatus(t, response, http.StatusBadRequest) - response = rt.SendAdminRequest("POST", "/db/_offline", "") - RequireStatus(t, response, http.StatusOK) - - WaitAndAssertCondition(t, func() bool { - state := atomic.LoadUint32(&rt.GetDatabase().State) - return state == db.DBOffline - }) + rt.TakeDbOffline() response = rt.SendAdminRequest("POST", "/db/_resync?action=start", "") RequireStatus(t, response, http.StatusOK) - WaitAndAssertBackgroundManagerState(t, db.BackgroundProcessStateCompleted, - func(t testing.TB) db.BackgroundProcessState { - return rt.GetDatabase().ResyncManager.GetRunState() - }) - WaitAndAssertBackgroundManagerExpiredHeartbeat(t, rt.GetDatabase().ResyncManager) + rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted) response = rt.SendAdminRequest("POST", "/db/_resync?action=stop", "") RequireStatus(t, response, http.StatusBadRequest) @@ -661,108 +449,11 @@ func TestResyncErrorScenariosUsingDCPStream(t *testing.T) { response = rt.SendAdminRequest("POST", "/db/_resync", "") RequireStatus(t, response, http.StatusOK) - WaitAndAssertBackgroundManagerState(t, db.BackgroundProcessStateCompleted, - func(t testing.TB) db.BackgroundProcessState { - return rt.GetDatabase().ResyncManager.GetRunState() - }) - WaitAndAssertBackgroundManagerExpiredHeartbeat(t, rt.GetDatabase().ResyncManager) -} - -func TestResyncStop(t *testing.T) { - - if !base.UnitTestUrlIsWalrus() { - // Limitation of setting LeakyBucket on RestTester - t.Skip("This test only works with walrus") - } - - syncFn := ` - function(doc) { - channel("x") - }` - - leakyTestBucket := base.GetTestBucket(t).LeakyBucketClone(base.LeakyBucketConfig{}) - - rt := NewRestTester(t, - &RestTesterConfig{ - SyncFn: syncFn, - DatabaseConfig: &DatabaseConfig{DbConfig: DbConfig{ - QueryPaginationLimit: base.IntPtr(10), - }}, - CustomTestBucket: leakyTestBucket, - }, - ) - defer rt.Close() - - _, ok := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManager) - if !ok { - t.Skip("This test only works when ResyncManager is used") - } - - leakyDataStore, ok := base.AsLeakyDataStore(rt.Bucket().DefaultDataStore()) - require.Truef(t, ok, "Wanted *base.LeakyBucket but got %T", leakyTestBucket.Bucket) - - var ( - useCallback bool - callbackFired bool - ) - - if base.TestsDisableGSI() { - leakyDataStore.SetPostQueryCallback(func(ddoc, viewName string, params map[string]interface{}) { - if useCallback { - callbackFired = true - response := rt.SendAdminRequest("POST", "/{{.keyspace}}/_resync?action=stop", "") - RequireStatus(t, response, http.StatusOK) - useCallback = false - } - }) - } else { - leakyDataStore.SetPostN1QLQueryCallback(func() { - if useCallback { - callbackFired = true - response := rt.SendAdminRequest("POST", "/{{.keyspace}}/_resync?action=stop", "") - RequireStatus(t, response, http.StatusOK) - useCallback = false - } - }) - } - - for i := 0; i < 1000; i++ { - rt.CreateTestDoc(fmt.Sprintf("doc%d", i)) - } - - err := rt.WaitForCondition(func() bool { - return int(rt.GetDatabase().DbStats.Database().SyncFunctionCount.Value()) == 1000 - }) - assert.NoError(t, err) - - response := rt.SendAdminRequest("POST", "/db/_offline", "") - RequireStatus(t, response, http.StatusOK) - - WaitAndAssertCondition(t, func() bool { - state := atomic.LoadUint32(&rt.GetDatabase().State) - return state == db.DBOffline - }) - - useCallback = true - response = rt.SendAdminRequest("POST", "/{{.keyspace}}/_resync?action=start", "") - RequireStatus(t, response, http.StatusOK) - - WaitAndAssertBackgroundManagerState(t, db.BackgroundProcessStateStopped, - func(t testing.TB) db.BackgroundProcessState { - return rt.GetDatabase().ResyncManager.GetRunState() - }) - WaitAndAssertBackgroundManagerExpiredHeartbeat(t, rt.GetDatabase().ResyncManager) - - assert.True(t, callbackFired, "expecting callback to be fired") - - syncFnCount := int(rt.GetDatabase().DbStats.Database().SyncFunctionCount.Value()) - assert.True(t, syncFnCount < 2000, "Expected syncFnCount < 2000 but syncFnCount=%d", syncFnCount) + rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted) } func TestResyncStopUsingDCPStream(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("This test doesn't work with walrus") - } + base.TestRequiresDCPResync(t) syncFn := ` function(doc) { @@ -779,11 +470,6 @@ func TestResyncStopUsingDCPStream(t *testing.T) { ) defer rt.Close() - _, ok := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManagerDCP) - if !ok { - t.Skip("This test only works when ResyncManagerDCP is used") - } - for i := 0; i < 1000; i++ { rt.CreateTestDoc(fmt.Sprintf("doc%d", i)) } @@ -793,38 +479,23 @@ func TestResyncStopUsingDCPStream(t *testing.T) { }) assert.NoError(t, err) - response := rt.SendAdminRequest("POST", "/db/_offline", "") - RequireStatus(t, response, http.StatusOK) + rt.TakeDbOffline() - WaitAndAssertCondition(t, func() bool { - state := atomic.LoadUint32(&rt.GetDatabase().State) - return state == db.DBOffline - }) - - response = rt.SendAdminRequest("POST", "/db/_resync?action=start", "") + response := rt.SendAdminRequest("POST", "/db/_resync?action=start", "") RequireStatus(t, response, http.StatusOK) - WaitAndAssertBackgroundManagerState(t, db.BackgroundProcessStateRunning, - func(t testing.TB) db.BackgroundProcessState { - return rt.GetDatabase().ResyncManager.GetRunState() - }) - - time.Sleep(500 * time.Microsecond) + rt.WaitForResyncDCPStatus(db.BackgroundProcessStateRunning) response = rt.SendAdminRequest("POST", "/db/_resync?action=stop", "") RequireStatus(t, response, http.StatusOK) - WaitAndAssertBackgroundManagerState(t, db.BackgroundProcessStateStopped, - func(t testing.TB) db.BackgroundProcessState { - return rt.GetDatabase().ResyncManager.GetRunState() - }) - WaitAndAssertBackgroundManagerExpiredHeartbeat(t, rt.GetDatabase().ResyncManager) + rt.WaitForResyncDCPStatus(db.BackgroundProcessStateStopped) syncFnCount := int(rt.GetDatabase().DbStats.Database().SyncFunctionCount.Value()) assert.Less(t, syncFnCount, 2000, "Expected syncFnCount < 2000 but syncFnCount=%d", syncFnCount) } func TestResyncRegenerateSequences(t *testing.T) { - + base.TestRequiresDCPResync(t) base.LongRunningTest(t) syncFn := ` function(doc) { @@ -842,11 +513,6 @@ func TestResyncRegenerateSequences(t *testing.T) { ) defer rt.Close() - _, ok := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManagerDCP) - if ok && base.UnitTestUrlIsWalrus() { - t.Skip("This test doesn't works with Walrus when ResyncManagerDCP is used") - } - var response *TestResponse var docSeqArr []float64 var body db.Body @@ -911,11 +577,7 @@ func TestResyncRegenerateSequences(t *testing.T) { response = rt.SendAdminRequest("POST", "/db/_resync?action=start®enerate_sequences=true", "") RequireStatus(t, response, http.StatusOK) - WaitAndAssertBackgroundManagerState(t, db.BackgroundProcessStateCompleted, - func(t testing.TB) db.BackgroundProcessState { - return rt.GetDatabase().ResyncManager.GetRunState() - }) - WaitAndAssertBackgroundManagerExpiredHeartbeat(t, rt.GetDatabase().ResyncManager) + resyncStatus := rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted) _, err = rt.MetadataStore().Get(rt.GetDatabase().MetadataKeys.RoleKey("role1"), &body) assert.NoError(t, err) @@ -938,24 +600,11 @@ func TestResyncRegenerateSequences(t *testing.T) { assert.True(t, float64(doc.Sequence) > docSeqArr[i]) } - response = rt.SendAdminRequest("GET", "/db/_resync", "") - RequireStatus(t, response, http.StatusOK) - var resyncStatus db.ResyncManagerResponse - err = base.JSONUnmarshal(response.BodyBytes(), &resyncStatus) - assert.NoError(t, err) - assert.Equal(t, 12, resyncStatus.DocsChanged) - assert.Equal(t, 12, resyncStatus.DocsProcessed) - - response = rt.SendAdminRequest("POST", "/db/_online", "") - RequireStatus(t, response, http.StatusOK) + assert.Equal(t, int64(12), resyncStatus.DocsChanged) + assert.Equal(t, int64(12), resyncStatus.DocsProcessed) - err = rt.WaitForCondition(func() bool { - state := atomic.LoadUint32(&rt.GetDatabase().State) - return state == db.DBOnline - }) - assert.NoError(t, err) + rt.TakeDbOnline() - // Data is wiped from walrus when brought back online changesResp = rt.GetChanges("/{{.keyspace}}/_changes", "user1") assert.Len(t, changesResp.Results, 3) assert.True(t, changesRespContains(changesResp, "userdoc")) @@ -964,9 +613,7 @@ func TestResyncRegenerateSequences(t *testing.T) { // CBG-2150: Tests that resync status is cluster aware func TestResyncPersistence(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("This test only works against Couchbase Server") - } + base.TestRequiresDCPResync(t) tb := base.GetTestBucket(t) noCloseTB := tb.NoCloseClone() @@ -986,39 +633,16 @@ func TestResyncPersistence(t *testing.T) { rt1.CreateTestDoc("doc1") // Start resync - resp := rt1.SendAdminRequest("POST", "/db/_offline", "") - RequireStatus(t, resp, http.StatusOK) - - WaitAndAssertCondition(t, func() bool { - state := atomic.LoadUint32(&rt1.GetDatabase().State) - return state == db.DBOffline - }) + rt1.TakeDbOffline() - resp = rt1.SendAdminRequest("POST", "/{{.db}}/_resync?action=start", "") + resp := rt1.SendAdminRequest("POST", "/{{.db}}/_resync?action=start", "") RequireStatus(t, resp, http.StatusOK) // Wait for resync to complete - var resyncManagerStatus db.ResyncManagerResponse - err := rt1.WaitForCondition(func() bool { - resp = rt1.SendAdminRequest("GET", "/{{.db}}/_resync", "") - err := json.Unmarshal(resp.BodyBytes(), &resyncManagerStatus) - assert.NoError(t, err) + rt1Status := rt1.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted) - if resyncManagerStatus.State == db.BackgroundProcessStateCompleted { - return true - } else { - t.Logf("resyncManagerStatus.State != %v: %v", db.BackgroundProcessStateCompleted, resyncManagerStatus.State) - return false - } - }) - require.NoError(t, err) - - // Check statuses match - resp2 := rt2.SendAdminRequest("GET", "/{{.db}}/_resync", "") - RequireStatus(t, resp, http.StatusOK) - fmt.Printf("RT1 Resync Status: %s\n", resp.BodyBytes()) - fmt.Printf("RT2 Resync Status: %s\n", resp2.BodyBytes()) - assert.Equal(t, resp.BodyBytes(), resp2.BodyBytes()) + rt2Status := rt2.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted) + require.Equal(t, rt1Status, rt2Status) } func TestExpiryUpdateSyncFunction(t *testing.T) { diff --git a/rest/utilities_testing_resttester.go b/rest/utilities_testing_resttester.go index 9fba6df2fa..ed737d8aef 100644 --- a/rest/utilities_testing_resttester.go +++ b/rest/utilities_testing_resttester.go @@ -14,6 +14,7 @@ import ( "net/http" "net/http/httptest" "net/url" + "slices" "sync/atomic" "testing" "time" @@ -261,43 +262,28 @@ func (rt *RestTester) GetReplicationStatuses(queryString string) (statuses []db. return statuses } -func (rt *RestTester) WaitForResyncStatus(status db.BackgroundProcessState) db.ResyncManagerResponse { - var resyncStatus db.ResyncManagerResponse - successFunc := func() bool { - response := rt.SendAdminRequest("GET", "/{{.db}}/_resync", "") - err := json.Unmarshal(response.BodyBytes(), &resyncStatus) - require.NoError(rt.TB(), err) - - var val interface{} - _, err = rt.Bucket().DefaultDataStore().Get(rt.GetDatabase().ResyncManager.GetHeartbeatDocID(rt.TB()), &val) - - if status == db.BackgroundProcessStateCompleted { - return resyncStatus.State == status && base.IsDocNotFoundError(err) - } else { - return resyncStatus.State == status - } - } - require.NoError(rt.TB(), rt.WaitForCondition(successFunc), "Expected status: %s, actual status: %s", status, resyncStatus.State) - return resyncStatus +// RunResync takes database offline, runs resync and waits for it to complete and takes database online. Returns the completed resync status. +func (rt *RestTester) RunResync() db.ResyncManagerResponseDCP { + rt.TakeDbOffline() + resp := rt.SendAdminRequest("POST", "/{{.db}}/_resync", "") + RequireStatus(rt.TB(), resp, http.StatusOK) + return rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted) } +// WaitForResyncDCPStatus waits for the resync status to reach the expected status and returns the final status. func (rt *RestTester) WaitForResyncDCPStatus(status db.BackgroundProcessState) db.ResyncManagerResponseDCP { var resyncStatus db.ResyncManagerResponseDCP - successFunc := func() bool { + require.EventuallyWithT(rt.TB(), func(c *assert.CollectT) { response := rt.SendAdminRequest("GET", "/{{.db}}/_resync", "") err := json.Unmarshal(response.BodyBytes(), &resyncStatus) - require.NoError(rt.TB(), err) + assert.NoError(c, err) - var val interface{} - _, err = rt.Bucket().DefaultDataStore().Get(rt.GetDatabase().ResyncManager.GetHeartbeatDocID(rt.TB()), &val) - - if status == db.BackgroundProcessStateCompleted { - return resyncStatus.State == status && base.IsDocNotFoundError(err) - } else { - return resyncStatus.State == status + assert.Equal(c, status, resyncStatus.State) + if slices.Contains([]db.BackgroundProcessState{db.BackgroundProcessStateCompleted, db.BackgroundProcessStateStopped}, status) { + _, err = rt.Bucket().DefaultDataStore().Get(rt.GetDatabase().ResyncManager.GetHeartbeatDocID(rt.TB()), nil) + assert.True(c, base.IsDocNotFoundError(err), "expected heartbeat doc to be deleted, got: %v", err) } - } - require.NoError(rt.TB(), rt.WaitForCondition(successFunc), "Expected status: %s, actual status: %s", status, resyncStatus.State) + }, time.Second*10, time.Millisecond*10) return resyncStatus } @@ -405,6 +391,13 @@ func (rt *RestTester) TakeDbOffline() { require.Equal(rt.TB(), db.DBOffline, atomic.LoadUint32(&rt.GetDatabase().State)) } +// TakeDbOnline takes the database online and waits for online status. +func (rt *RestTester) TakeDbOnline() { + resp := rt.SendAdminRequest(http.MethodPost, "/{{.db}}/_online", "") + RequireStatus(rt.TB(), resp, http.StatusOK) + rt.WaitForDBOnline() +} + // RequireDbOnline asserts that the state of the database is online func (rt *RestTester) RequireDbOnline() { response := rt.SendAdminRequest("GET", "/{{.db}}/", "")