Skip to content

Commit

Permalink
CBG-4513 remove query based resync code
Browse files Browse the repository at this point in the history
  • Loading branch information
torcolvin committed Feb 5, 2025
1 parent 65c5ae4 commit 7701a84
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 1,340 deletions.
144 changes: 0 additions & 144 deletions db/background_mgr_resync.go

This file was deleted.

122 changes: 1 addition & 121 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ type UnsupportedOptions struct {
GuestReadOnly bool `json:"guest_read_only,omitempty"` // Config option to restrict GUEST document access to read-only
ForceAPIForbiddenErrors bool `json:"force_api_forbidden_errors,omitempty"` // Config option to force the REST API to return forbidden errors
ConnectedClient bool `json:"connected_client,omitempty"` // Enables BLIP connected-client APIs
UseQueryBasedResyncManager bool `json:"use_query_resync_manager,omitempty"` // Config option to use Query based resync manager to perform Resync op
DCPReadBuffer int `json:"dcp_read_buffer,omitempty"` // Enables user to set their own DCP read buffer
KVBufferSize int `json:"kv_buffer,omitempty"` // Enables user to set their own KV pool buffer
BlipSendDocsWithChannelRemoval bool `json:"blip_send_docs_with_channel_removal,omitempty"` // Enables sending docs with channel removals using channel filters
Expand Down Expand Up @@ -564,11 +563,7 @@ func NewDatabaseContext(ctx context.Context, dbName string, bucket base.Bucket,
return nil, err
}

if dbContext.UseQueryBasedResyncManager() {
dbContext.ResyncManager = NewResyncManager(metadataStore, metaKeys)
} else {
dbContext.ResyncManager = NewResyncManagerDCP(metadataStore, dbContext.UseXattrs(), metaKeys)
}
dbContext.ResyncManager = NewResyncManagerDCP(metadataStore, dbContext.UseXattrs(), metaKeys)

return dbContext, nil
}
Expand Down Expand Up @@ -1619,113 +1614,6 @@ func (db *DatabaseContext) GetMetadataPurgeInterval(ctx context.Context) time.Du
return DefaultPurgeInterval
}

// updateAllDocChannelsCallbackFunc reports resync progress. It receives
// pointers to the running counts of documents processed and documents whose
// channel assignments changed. Used by UpdateAllDocChannels, which re-runs
// the sync function over all current documents (typically after the
// JavaScript sync function changes).
type updateAllDocChannelsCallbackFunc func(docsProcessed, docsChanged *int)

// UpdateAllDocChannels re-runs the sync function on every current document in
// the collection, paging through a resync query in batches of
// queryPaginationLimit up to the high sequence captured when the walk started.
// Progress is reported through callback, and the count of documents whose
// channels changed is returned.
//
// If regenerateSequences is true, all principals' sequences are updated after
// the walk. If terminator fires mid-walk, the walk stops early and returns nil
// error; the system may then be in an inconsistent state.
func (db *DatabaseCollectionWithUser) UpdateAllDocChannels(ctx context.Context, regenerateSequences bool, callback updateAllDocChannelsCallbackFunc, terminator *base.SafeTerminator) (int, error) {
base.InfofCtx(ctx, base.KeyAll, "Recomputing document channels...")
base.InfofCtx(ctx, base.KeyAll, "Re-running sync function on all documents...")

queryLimit := db.queryPaginationLimit()
startSeq := uint64(0)
// Snapshot the current high sequence; the query below is bounded by it, so
// documents written after this point are not visited.
endSeq, err := db.sequences().getSequence()
if err != nil {
return 0, err
}

docsChanged := 0
docsProcessed := 0

// In the event of an early exit we would like to ensure these values are up to date which they wouldn't be if they
// were unable to reach the end of the batch iteration.
defer callback(&docsProcessed, &docsChanged)

// Sequences allocated but left unused during resync are released in one batch at the end.
var unusedSequences []uint64
highSeq := uint64(0)
for {
// Fetch the next page of (seq, docID) rows in [startSeq, endSeq].
results, err := db.QueryResync(ctx, queryLimit, startSeq, endSeq)
if err != nil {
return 0, err
}

queryRowCount := 0

var importRow QueryIdRow
for {
var found bool
if db.useViews() {
// View backend: rows arrive as channelsViewRow and are adapted to QueryIdRow.
var viewRow channelsViewRow
found = results.Next(ctx, &viewRow)
if !found {
break
}
importRow = QueryIdRow{
Seq: uint64(viewRow.Key[1].(float64)),
Id: viewRow.ID,
}
} else {
found = results.Next(ctx, &importRow)
if !found {
break
}
}
// Non-blocking cancellation check between rows: stop cleanly (nil error)
// when the terminator has fired.
select {
case <-terminator.Done():
base.InfofCtx(ctx, base.KeyAll, "Resync was stopped before the operation could be completed. System "+
"may be in an inconsistent state. Docs changed: %d Docs Processed: %d", docsChanged, docsProcessed)
closeErr := results.Close()
if closeErr != nil {
return 0, closeErr
}
return docsChanged, nil
default:
}

docid := importRow.Id
key := realDocID(docid)
queryRowCount++
docsProcessed++
_, unusedSequences, err = db.resyncDocument(ctx, docid, key, regenerateSequences, unusedSequences)
if err == nil {
docsChanged++
} else if err != base.ErrUpdateCancel {
// base.ErrUpdateCancel is skipped silently; any other per-doc error is
// logged and the walk continues.
base.WarnfCtx(ctx, "Error updating doc %q: %v", base.UD(docid), err)
}
highSeq = importRow.Seq
}

// Report progress after each completed batch.
callback(&docsProcessed, &docsChanged)

// Close query results
closeErr := results.Close()
if closeErr != nil {
return 0, closeErr
}

// A short page, or having reached the snapshot sequence, means the walk is done.
if queryRowCount < queryLimit || highSeq >= endSeq {
break
}
startSeq = highSeq + 1
}

db.releaseSequences(ctx, unusedSequences)

if regenerateSequences {
if err := db.updateAllPrincipalsSequences(ctx); err != nil {
return docsChanged, err
}
}

base.InfofCtx(ctx, base.KeyAll, "Finished re-running sync function; %d/%d docs changed", docsChanged, docsProcessed)

// Channel assignments changed, so cached principal channel access must be invalidated.
if docsChanged > 0 {
db.invalidateAllPrincipals(ctx, endSeq)
}
return docsChanged, nil
}

func (c *DatabaseCollection) updateAllPrincipalsSequences(ctx context.Context) error {
users, roles, err := c.allPrincipalIDs(ctx)
if err != nil {
Expand Down Expand Up @@ -2028,14 +1916,6 @@ func (context *DatabaseContext) UseMou() bool {
return context.EnableMou
}

// UseQueryBasedResyncManager reports whether the query-based resync manager
// should be used for the Resync operation (an unsupported option; defaults to
// false when no unsupported options are configured).
func (context *DatabaseContext) UseQueryBasedResyncManager() bool {
	opts := context.Options.UnsupportedOptions
	return opts != nil && opts.UseQueryBasedResyncManager
}

// DeltaSyncEnabled reports whether delta sync is enabled for this database.
func (context *DatabaseContext) DeltaSyncEnabled() bool {
	enabled := context.Options.DeltaSyncOptions.Enabled
	return enabled
}
Expand Down
5 changes: 0 additions & 5 deletions db/database_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,6 @@ func (c *DatabaseCollection) oldRevExpirySeconds() uint32 {
return c.dbCtx.Options.OldRevExpirySeconds
}

// queryPaginationLimit returns the page size used to bound large queries.
// This is controlled at the database level.
func (c *DatabaseCollection) queryPaginationLimit() int {
	limit := c.dbCtx.Options.QueryPaginationLimit
	return limit
}

// ReloadUser the User object, in case its persistent properties have been changed. This code does not lock and is not safe to call from concurrent goroutines.
func (c *DatabaseCollectionWithUser) ReloadUser(ctx context.Context) error {
if c.user == nil {
Expand Down
36 changes: 0 additions & 36 deletions db/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2630,42 +2630,6 @@ func TestDeleteWithNoTombstoneCreationSupport(t *testing.T) {
assert.Equal(t, float64(2), xattr["sequence"])
}

// TestResyncUpdateAllDocChannels writes 10 documents (10 sync function
// invocations), takes the database offline, runs UpdateAllDocChannels, and
// verifies the sync function ran once more per document (20 invocations total).
func TestResyncUpdateAllDocChannels(t *testing.T) {
	const docCount = 10
	syncFn := `
function(doc) {
channel("x")
}`

	db, ctx := SetupTestDBWithOptions(t, DatabaseContextOptions{QueryPaginationLimit: 5000})
	collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)

	_, err := collection.UpdateSyncFun(ctx, syncFn)
	require.NoError(t, err)

	defer db.Close(ctx)

	// Seed the collection; each Put triggers one sync function invocation.
	for i := 0; i < docCount; i++ {
		body := map[string]interface{}{"val": i}
		_, _, putErr := collection.Put(ctx, fmt.Sprintf("doc%d", i), body)
		require.NoError(t, putErr)
	}

	err = db.TakeDbOffline(base.NewNonCancelCtx(), "")
	assert.NoError(t, err)

	// Resync requires the database to have fully transitioned offline.
	waitAndAssertCondition(t, func() bool {
		return atomic.LoadUint32(&db.State) == DBOffline
	})

	_, err = collection.UpdateAllDocChannels(ctx, false, func(docsProcessed, docsChanged *int) {}, base.NewSafeTerminator())
	assert.NoError(t, err)

	// docCount initial writes + docCount resync passes.
	syncFnCount := int(db.DbStats.Database().SyncFunctionCount.Value())
	assert.Equal(t, 2*docCount, syncFnCount)
}

func TestTombstoneCompactionStopWithManager(t *testing.T) {

if !base.TestUseXattrs() {
Expand Down
21 changes: 7 additions & 14 deletions rest/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,11 +762,12 @@ func TestAllDocsAccessControl(t *testing.T) {
assert.Equal(t, "doc2", allDocsResult.Rows[1].ID)
}
func TestChannelAccessChanges(t *testing.T) {

base.TestRequiresDCPResync(t)
base.SetUpTestLogging(t, base.LevelDebug, base.KeyCache, base.KeyChanges, base.KeyCRUD)
rtConfig := RestTesterConfig{SyncFn: `function(doc) {access(doc.owner, doc._id);channel(doc.channel)}`}
rtConfig := RestTesterConfig{SyncFn: `function(doc) {access(doc.owner, doc._id);channel(doc.channel)}`, PersistentConfig: true}
rt := NewRestTester(t, &rtConfig)
defer rt.Close()
RequireStatus(t, rt.CreateDatabase("db", rt.NewDbConfig()), http.StatusCreated)
dataStore := rt.GetSingleDataStore()
c := dataStore.CollectionName()
s := dataStore.ScopeName()
Expand Down Expand Up @@ -904,18 +905,10 @@ func TestChannelAccessChanges(t *testing.T) {
// below, which could lead to a data race if the cache processor is paused while it's processing a change
rt.MustWaitForDoc("epsilon", t)

// Finally, throw a wrench in the works by changing the sync fn. Note that normally this wouldn't
// be changed while the database is in use (only when it's re-opened) but for testing purposes
// we do it now because we can't close and re-open an ephemeral Walrus database.
collectionWithUser, ctx := rt.GetSingleTestDatabaseCollectionWithUser()

changed, err := collectionWithUser.UpdateSyncFun(ctx, `function(doc) {access("alice", "beta");channel("beta");}`)
assert.NoError(t, err)
assert.True(t, changed)
changeCount, err := collectionWithUser.UpdateAllDocChannels(ctx, false, func(docsProcessed, docsChanged *int) {}, base.NewSafeTerminator())
assert.NoError(t, err)
assert.Equal(t, 9, changeCount)

// Finally, throw a wrench in the works by changing the sync fn.
RequireStatus(t, rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/_config/sync", `function(doc) {access("alice", "beta");channel("beta");}`), http.StatusOK)
resyncStatus := rt.RunResync()
require.Equal(t, int64(9), resyncStatus.DocsChanged)
expectedIDs := []string{"beta", "delta", "gamma", "a1", "b1", "d1", "g1", "alpha", "epsilon"}
changes, err = rt.WaitForChanges(len(expectedIDs), "/{{.keyspace}}/_changes", "alice", false)
assert.NoError(t, err, "Unexpected error")
Expand Down
Loading

0 comments on commit 7701a84

Please sign in to comment.