Skip to content

Commit

Permalink
REP-5550 Add resumability for recheck generations (#85)
Browse files Browse the repository at this point in the history
Previously migration-verifier did not persist its generation, so if it crashed in a generation after generation 0 it would still restart at generation 0.

This could cause any enqueued rechecks not to happen until the restarted verifier reached the prior verifier run’s generation. If the user called `writesOff` before then, though, the rechecks would never happen. Thus, migration-verifier could neglect important rechecks.

This fixes that by persisting migration-verifier’s generation at the start of each generation. Thus, any tasks that need to be rerun, or enqueued rechecks that need to be processed into checks for the next generation, will be replayed in the event of a crash.
  • Loading branch information
FGasper authored Jan 24, 2025
1 parent 508d185 commit 51e1ef0
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 4 deletions.
43 changes: 39 additions & 4 deletions internal/verifier/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
}
verifier.running = true
verifier.globalFilter = filter
verifier.initializeChangeStreamReaders()
verifier.mux.Unlock()

defer func() {
verifier.mux.Lock()
verifier.running = false
Expand All @@ -194,9 +193,31 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
verifier.logger.Info().Msg("Dropping old verifier metadata")
err = verifier.verificationDatabase().Drop(ctx)
if err != nil {
verifier.mux.Unlock()
return err
}
} else {
genOpt, err := verifier.readGeneration(ctx)
if err != nil {
verifier.mux.Unlock()
return err
}

if gen, has := genOpt.Get(); has {
verifier.generation = gen
verifier.logger.Info().
Int("generation", verifier.generation).
Msg("Resuming in-progress verification.")
} else {
verifier.logger.Info().Msg("Starting new verification.")
}
}

// Now that we’ve initialized verifier.generation we can
// start the change stream readers.
verifier.initializeChangeStreamReaders()
verifier.mux.Unlock()

err = retry.New().WithCallback(
func(ctx context.Context, _ *retry.FuncInfo) error {
err = verifier.AddMetaIndexes(ctx)
Expand Down Expand Up @@ -258,12 +279,26 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
verifier.logger.Debug().Msgf("Initial verification phase: %+v", verificationStatus)
}

err = verifier.CreateInitialTasks(ctx)
err = verifier.CreateInitialTasksIfNeeded(ctx)
if err != nil {
return err
}
// Now enter the multi-generational steady check state
for {
verifier.mux.Lock()
err = retry.New().WithCallback(
func(ctx context.Context, _ *retry.FuncInfo) error {
return verifier.persistGenerationWhileLocked(ctx)
},
"persisting generation (%d)",
verifier.generation,
).Run(ctx, verifier.logger)
if err != nil {
verifier.mux.Unlock()
return errors.Wrapf(err, "failed to persist generation (%d)", verifier.generation)
}
verifier.mux.Unlock()

verifier.generationStartTime = time.Now()
verifier.eventRecorder.Reset()

Expand Down Expand Up @@ -382,7 +417,7 @@ func (verifier *Verifier) setupAllNamespaceList(ctx context.Context) error {
return nil
}

func (verifier *Verifier) CreateInitialTasks(ctx context.Context) error {
func (verifier *Verifier) CreateInitialTasksIfNeeded(ctx context.Context) error {
// If we don't know the src namespaces, we're definitely not the primary task.
if !verifier.verifyAll {
if len(verifier.srcNamespaces) == 0 {
Expand Down
4 changes: 4 additions & 0 deletions internal/verifier/check_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ type CheckRunner struct {
doNextGenerationChan chan struct{}
}

// RunVerifierCheck starts a Check and returns a CheckRunner to track this
// Verifier.
//
// The next method to call is AwaitGenerationEnd.
func RunVerifierCheck(ctx context.Context, t *testing.T, verifier *Verifier) *CheckRunner {
verifierDoneChan := make(chan error)

Expand Down
63 changes: 63 additions & 0 deletions internal/verifier/generation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package verifier

import (
"context"
"fmt"

"github.com/10gen/migration-verifier/option"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

const (
generationCollName = "generation"
generationFieldName = "generation"
)

func (v *Verifier) persistGenerationWhileLocked(ctx context.Context) error {
generation, _ := v.getGenerationWhileLocked()

db := v.verificationDatabase()

result, err := db.Collection(generationCollName).ReplaceOne(
ctx,
bson.D{},
bson.D{{generationFieldName, generation}},
options.Replace().SetUpsert(true),
)

if err == nil && (result.ModifiedCount+result.UpsertedCount != 1) {
panic(fmt.Sprintf("persist of generation (%d) should affect exactly 1 doc! (%+v)", generation, result))
}

return err
}

func (v *Verifier) readGeneration(ctx context.Context) (option.Option[int], error) {
db := v.verificationDatabase()

result := db.Collection(generationCollName).FindOne(
ctx,
bson.D{},
)

parsed := struct {
Generation int `bson:"generation"`
}{}

err := result.Decode(&parsed)

if err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
err = nil
} else {
err = errors.Wrap(err, "failed to read persisted generation")
}

return option.None[int](), err
}

return option.Some(parsed.Generation), nil
}
104 changes: 104 additions & 0 deletions internal/verifier/recheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/10gen/migration-verifier/internal/testutil"
"github.com/10gen/migration-verifier/internal/types"
"github.com/10gen/migration-verifier/mslices"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
)
Expand Down Expand Up @@ -87,6 +88,109 @@ func (suite *IntegrationTestSuite) fetchRecheckDocs(ctx context.Context, verifie
return results
}

func (suite *IntegrationTestSuite) TestRecheckResumability() {
ctx := suite.Context()

verifier := suite.BuildVerifier()
verifier.SetVerifyAll(true)

runner := RunVerifierCheck(ctx, suite.T(), verifier)
suite.Require().NoError(runner.AwaitGenerationEnd())

suite.Require().NoError(runner.StartNextGeneration())
suite.Require().NoError(runner.AwaitGenerationEnd())

suite.Require().NoError(runner.StartNextGeneration())
suite.Require().NoError(runner.AwaitGenerationEnd())

suite.Require().EqualValues(2, verifier.generation)

verifier2 := suite.BuildVerifier()
verifier2.SetVerifyAll(true)

runner2 := RunVerifierCheck(ctx, suite.T(), verifier2)
suite.Require().NoError(runner2.AwaitGenerationEnd())

suite.Require().EqualValues(verifier.generation, verifier2.generation)
}

func (suite *IntegrationTestSuite) TestRecheckResumability_Mismatch() {
ctx := suite.Context()

srcColl := suite.srcMongoClient.
Database(suite.DBNameForTest()).
Collection("stuff")

ns := srcColl.Database().Name() + "." + srcColl.Name()

dstColl := suite.dstMongoClient.
Database(srcColl.Database().Name()).
Collection(srcColl.Name())

for _, coll := range mslices.Of(srcColl, dstColl) {
suite.Require().NoError(
coll.Database().CreateCollection(ctx, coll.Name()),
)
}

verifier := suite.BuildVerifier()
verifier.SetSrcNamespaces(mslices.Of(ns))
verifier.SetDstNamespaces(mslices.Of(ns))
verifier.SetNamespaceMap()

runner := RunVerifierCheck(ctx, suite.T(), verifier)
suite.Require().NoError(runner.AwaitGenerationEnd())

for range 10 {
suite.Require().NoError(runner.StartNextGeneration())
suite.Require().NoError(runner.AwaitGenerationEnd())
}

_, err := srcColl.InsertOne(ctx, bson.D{{"_id", "on src"}})
suite.Require().NoError(err)

_, err = dstColl.InsertOne(ctx, bson.D{{"_id", "on dst"}})
suite.Require().NoError(err)

suite.T().Logf("Running verifier until it shows mismatch (generation=%d) ...", verifier.generation)
for {
verificationStatus, err := verifier.GetVerificationStatus(ctx)
suite.Require().NoError(err)

recheckDocs := suite.fetchVerifierRechecks(ctx, verifier)

if verificationStatus.FailedTasks != 0 && len(recheckDocs) == 2 {
break
}

suite.Require().NoError(runner.StartNextGeneration())
suite.Require().NoError(runner.AwaitGenerationEnd())
}

suite.T().Logf("Starting a 2nd verifier and confirming that it sees the mismatches.")

verifier2 := suite.BuildVerifier()
verifier2.SetSrcNamespaces(mslices.Of(ns))
verifier2.SetDstNamespaces(mslices.Of(ns))
verifier2.SetNamespaceMap()

runner2 := RunVerifierCheck(ctx, suite.T(), verifier2)
suite.Require().NoError(runner2.AwaitGenerationEnd())

suite.Require().EqualValues(verifier.generation, verifier2.generation)
verificationStatus, err := verifier.GetVerificationStatus(ctx)
suite.Require().NoError(err)

suite.Require().EqualValues(
1,
verificationStatus.FailedTasks,
"restarted verifier should immediately see mismatches",
)

recheckDocs := suite.fetchVerifierRechecks(ctx, verifier2)
suite.Require().Len(recheckDocs, 2, "expect # of rechecks: %+v", recheckDocs)
}

func (suite *IntegrationTestSuite) TestLargeIDInsertions() {
verifier := suite.BuildVerifier()
ctx := suite.Context()
Expand Down

0 comments on commit 51e1ef0

Please sign in to comment.