From 57b46ab08b623fc9fa1854484d4060aead0b7d22 Mon Sep 17 00:00:00 2001 From: Miles Frankel Date: Mon, 16 Dec 2024 14:52:40 -0500 Subject: [PATCH] changefeedccl: migrate pts records to include new tables Update the pts records of running feeds when the set of targets changes, such as when we add new system tables to protect. Fixes: #133578 Part of: #128806 Release note (general change): The pts records of running feeds are now updated when the set of targets changes, such as when system tables are added to the protected tables list. --- pkg/ccl/changefeedccl/BUILD.bazel | 1 + .../changefeedccl/changefeed_processors.go | 75 +++++-- .../protected_timestamps_test.go | 208 ++++++++++++++---- pkg/ccl/changefeedccl/testing_knobs.go | 4 + 4 files changed, 225 insertions(+), 63 deletions(-) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 2fd68e6122d3..6b607ce37966 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -309,6 +309,7 @@ go_test( "//pkg/sql/randgen", "//pkg/sql/rowenc", "//pkg/sql/rowenc/keyside", + "//pkg/sql/sem/catid", "//pkg/sql/sem/eval", "//pkg/sql/sem/tree", "//pkg/sql/sem/volatility", diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index d2e7510ef5dc..095314d3cd8d 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -1821,40 +1822,66 @@ func (cf *changeFrontier) manageProtectedTimestamps( return false, err } - if rec.Target != nil { - // Only update the PTS timestamp if it is lagging behind the high - // watermark. This is to prevent a rush of updates to the PTS if the - // changefeed restarts, which can cause contention and second order effects - // on system tables. - if !rec.Timestamp.AddDuration(ptsUpdateLag).Less(highWater) { + // If this changefeed was created in 22.1 or earlier, it may be using a + // deprecated pts record in which the target field is nil. If so, we + // "migrate" it to use the new style of pts records and delete the old one. + if rec.Target == nil { + if preserveDeprecatedPts := cf.knobs.PreserveDeprecatedPts != nil && cf.knobs.PreserveDeprecatedPts(); preserveDeprecatedPts { return false, nil } - - log.VEventf(ctx, 2, "updating protected timestamp %v at %v", progress.ProtectedTimestampRecord, highWater) - return true, pts.UpdateTimestamp(ctx, progress.ProtectedTimestampRecord, highWater) + if err := cf.remakePTSRecord(ctx, pts, progress, highWater); err != nil { + return false, err + } + return true, nil } - // If this changefeed was created in 22.1 or earlier, it may be using a deprecated pts record in which - // the target field is nil. If so, we "migrate" it to use the new style of pts records and delete the old one. - preserveDeprecatedPts := cf.knobs.PreserveDeprecatedPts != nil && cf.knobs.PreserveDeprecatedPts() - if !preserveDeprecatedPts { - prevRecordId := progress.ProtectedTimestampRecord - ptr := createProtectedTimestampRecord( - ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, - ) - if err := pts.Protect(ctx, ptr); err != nil { - return false, err + // If we've identified more tables that need to be protected since this + // changefeed was created, it will be missing here. If so, we "migrate" it + // to include all the appropriate targets. + if targets := AllTargets(cf.spec.Feed); !makeTargetToProtect(targets).Equal(rec.Target) { + if preservePTSTargets := cf.knobs.PreservePTSTargets != nil && cf.knobs.PreservePTSTargets(); preservePTSTargets { + return false, nil } - progress.ProtectedTimestampRecord = ptr.ID.GetUUID() - if err := pts.Release(ctx, prevRecordId); err != nil { + if err := cf.remakePTSRecord(ctx, pts, progress, highWater); err != nil { return false, err } + return true, nil + } - log.Eventf(ctx, "created new pts record %v to replace old pts record %v at %v", - progress.ProtectedTimestampRecord, prevRecordId, highWater) + // Only update the PTS timestamp if it is lagging behind the high + // watermark. This is to prevent a rush of updates to the PTS if the + // changefeed restarts, which can cause contention and second order effects + // on system tables. + if !rec.Timestamp.AddDuration(ptsUpdateLag).Less(highWater) { + return false, nil } - return true, nil + log.VEventf(ctx, 2, "updating protected timestamp %v at %v", progress.ProtectedTimestampRecord, highWater) + return true, pts.UpdateTimestamp(ctx, progress.ProtectedTimestampRecord, highWater) +} + +func (cf *changeFrontier) remakePTSRecord( + ctx context.Context, + pts protectedts.Storage, + progress *jobspb.ChangefeedProgress, + resolved hlc.Timestamp, +) error { + prevRecordId := progress.ProtectedTimestampRecord + ptr := createProtectedTimestampRecord( + ctx, cf.FlowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), resolved, + ) + if err := pts.Protect(ctx, ptr); err != nil { + return err + } + progress.ProtectedTimestampRecord = ptr.ID.GetUUID() + if err := pts.Release(ctx, prevRecordId); err != nil { + return err + } + + log.Eventf(ctx, "created new pts record %v to replace old pts record %v at %v", + progress.ProtectedTimestampRecord, prevRecordId, resolved) + + return nil } func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error { diff --git a/pkg/ccl/changefeedccl/protected_timestamps_test.go b/pkg/ccl/changefeedccl/protected_timestamps_test.go index 9ac09309cac2..8d854322e6fc 100644 --- a/pkg/ccl/changefeedccl/protected_timestamps_test.go +++ b/pkg/ccl/changefeedccl/protected_timestamps_test.go @@ -8,6 +8,7 @@ package changefeedccl import ( "context" "fmt" + "slices" "sync/atomic" "testing" "time" @@ -34,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/distsql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/sem/catid" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -42,6 +44,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -562,6 +565,121 @@ func TestPTSRecordProtectsTargetsAndSystemTables(t *testing.T) { } +// TestChangefeedUpdateProtectedTimestampTargets tests that changefeeds will +// remake their PTS records if they detect that they lack required targets. +func TestChangefeedMigratesProtectedTimestampTargets(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) { + ctx := context.Background() + + dontMigrate := atomic.Bool{} + dontMigrate.Store(true) + knobs := s.TestingKnobs. + DistSQL.(*execinfra.TestingKnobs). + Changefeed.(*TestingKnobs) + knobs.PreservePTSTargets = func() bool { + return dontMigrate.Load() + } + + ptsInterval := 50 * time.Millisecond + changefeedbase.ProtectTimestampInterval.Override( + context.Background(), &s.Server.ClusterSettings().SV, ptsInterval) + changefeedbase.ProtectTimestampLag.Override( + context.Background(), &s.Server.ClusterSettings().SV, ptsInterval) + + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sysDB := sqlutils.MakeSQLRunner(s.SystemServer.SQLConn(t)) + + sysDB.Exec(t, "SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms'") + sysDB.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'") // speeds up the test + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) + + foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '20ms'`) + defer closeFeed(t, foo) + + registry := s.Server.JobRegistry().(*jobs.Registry) + execCfg := s.Server.ExecutorConfig().(sql.ExecutorConfig) + ptp := s.Server.DistSQLServer().(*distsql.ServerImpl).ServerConfig.ProtectedTimestampProvider + fooDesc := desctestutils.TestingGetPublicTableDescriptor(s.SystemServer.DB(), s.Codec, "d", "foo") + fooID := fooDesc.GetID() + + jobFeed := foo.(cdctest.EnterpriseTestFeed) + + // removes table 3 from the target of the PTS record. + removeOnePTSTarget := func(recordID uuid.UUID) error { + return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + s := `select target from system.protected_ts_records where id = $1` + datums, err := txn.QueryRowEx(ctx, "pts-test", txn.KV(), sessiondata.NodeUserSessionDataOverride, s, recordID) + require.NoError(t, err) + j := tree.MustBeDBytes(datums[0]) + + target := &ptpb.Target{} + require.NoError(t, protoutil.Unmarshal([]byte(j), target)) + + // remove '3' (system.descriptor) to simulate a missing system table + ids := target.GetSchemaObjects().IDs + idx := slices.Index(ids, catid.DescID(3)) + target.GetSchemaObjects().IDs = slices.Delete(ids, idx, idx+1) + + bs, err := protoutil.Marshal(target) + require.NoError(t, err) + + _, err = txn.ExecEx(ctx, "pts-test", txn.KV(), sessiondata.NodeUserSessionDataOverride, + "UPDATE system.protected_ts_records SET target = $1 WHERE id = $2", bs, recordID, + ) + require.NoError(t, err) + return nil + }) + } + + // Wipe out the targets from the changefeed PTS record, simulating an old-style PTS record. + oldRecordID := getPTSRecordID(ctx, t, registry, jobFeed) + require.NoError(t, removeOnePTSTarget(oldRecordID)) + + // Sanity check: make sure that it worked + oldRecord, err := readPTSRecord(ctx, t, execCfg, ptp, oldRecordID) + require.NoError(t, err) + targetIDs := oldRecord.Target.GetSchemaObjects().IDs + require.Contains(t, targetIDs, fooID) + require.NotSubset(t, targetIDs, systemTablesToProtect) + + // Flip the knob so the changefeed migrates the record + dontMigrate.Store(false) + + getNewPTSRecord := func() *ptpb.Record { + var recID uuid.UUID + var record *ptpb.Record + testutils.SucceedsSoon(t, func() error { + recID = getPTSRecordID(ctx, t, registry, jobFeed) + if recID.Equal(oldRecordID) { + return errors.New("waiting for new PTS record") + } + return nil + }) + record, err := readPTSRecord(ctx, t, execCfg, ptp, recID) + require.NoError(t, err) + return record + } + + // Read the new PTS record. + newRec := getNewPTSRecord() + require.NotNil(t, newRec.Target) + + // Assert the new PTS record has the right targets. + targetIDs = newRec.Target.GetSchemaObjects().IDs + require.Contains(t, targetIDs, fooID) + require.Subset(t, targetIDs, systemTablesToProtect) + + // Ensure the old pts record was deleted. + _, err = readPTSRecord(ctx, t, execCfg, ptp, oldRecordID) + require.ErrorContains(t, err, "does not exist") + } + + cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks) +} + // TestChangefeedUpdateProtectedTimestamp tests that changefeeds using the // old style PTS records will migrate themselves to use the new style PTS // records. @@ -605,41 +723,7 @@ func TestChangefeedMigratesProtectedTimestamps(t *testing.T) { descID := descpb.ID(keys.DescriptorTableID) jobFeed := foo.(cdctest.EnterpriseTestFeed) - loadProgressErr := func() (jobspb.Progress, error) { - job, err := registry.LoadJob(ctx, jobFeed.JobID()) - if err != nil { - return jobspb.Progress{}, err - } - return job.Progress(), nil - } - getPTSRecordID := func() uuid.UUID { - var recordID uuid.UUID - testutils.SucceedsSoon(t, func() error { - progress, err := loadProgressErr() - if err != nil { - return err - } - uid := progress.GetChangefeed().ProtectedTimestampRecord - if uid == uuid.Nil { - return errors.Newf("no pts record") - } - recordID = uid - return nil - }) - return recordID - } - - readPTSRecord := func(recID uuid.UUID) (rec *ptpb.Record, err error) { - err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - rec, err = ptp.WithTxn(txn).GetRecord(ctx, recID) - if err != nil { - return err - } - return nil - }) - return - } removePTSTarget := func(recordID uuid.UUID) error { return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { if _, err := txn.ExecEx(ctx, "pts-test", txn.KV(), sessiondata.NodeUserSessionDataOverride, @@ -654,9 +738,9 @@ func TestChangefeedMigratesProtectedTimestamps(t *testing.T) { } // Wipe out the targets from the changefeed PTS record, simulating an old-style PTS record. - oldRecordID := getPTSRecordID() + oldRecordID := getPTSRecordID(ctx, t, registry, jobFeed) require.NoError(t, removePTSTarget(oldRecordID)) - rec, err := readPTSRecord(oldRecordID) + rec, err := readPTSRecord(ctx, t, execCfg, ptp, oldRecordID) require.NoError(t, err) require.NotNil(t, rec) require.Nil(t, rec.Target) @@ -668,14 +752,14 @@ func TestChangefeedMigratesProtectedTimestamps(t *testing.T) { var recID uuid.UUID var record *ptpb.Record testutils.SucceedsSoon(t, func() error { - recID = getPTSRecordID() + recID = getPTSRecordID(ctx, t, registry, jobFeed) if recID.Equal(oldRecordID) { return errors.New("waiting for new PTS record") } return nil }) - record, err = readPTSRecord(recID) + record, err = readPTSRecord(ctx, t, execCfg, ptp, recID) if err != nil { t.Fatal(err) } @@ -692,7 +776,7 @@ func TestChangefeedMigratesProtectedTimestamps(t *testing.T) { require.Contains(t, targetIDs, descID) // Ensure the old pts record was deleted. - _, err = readPTSRecord(oldRecordID) + _, err = readPTSRecord(ctx, t, execCfg, ptp, oldRecordID) require.ErrorContains(t, err, "does not exist") } @@ -768,3 +852,49 @@ func fetchUsersAndPasswords( } return users, nil } + +func getPTSRecordID( + ctx context.Context, t *testing.T, registry *jobs.Registry, jobFeed cdctest.EnterpriseTestFeed, +) uuid.UUID { + var recordID uuid.UUID + testutils.SucceedsSoon(t, func() error { + progress, err := loadProgressErr(ctx, registry, jobFeed) + if err != nil { + return err + } + uid := progress.GetChangefeed().ProtectedTimestampRecord + if uid == uuid.Nil { + return errors.Newf("no pts record") + } + recordID = uid + return nil + }) + return recordID +} + +func readPTSRecord( + ctx context.Context, + t *testing.T, + execCfg sql.ExecutorConfig, + ptp protectedts.Provider, + recID uuid.UUID, +) (rec *ptpb.Record, err error) { + err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + rec, err = ptp.WithTxn(txn).GetRecord(ctx, recID) + if err != nil { + return err + } + return nil + }) + return +} + +func loadProgressErr( + ctx context.Context, registry *jobs.Registry, jobFeed cdctest.EnterpriseTestFeed, +) (jobspb.Progress, error) { + job, err := registry.LoadJob(ctx, jobFeed.JobID()) + if err != nil { + return jobspb.Progress{}, err + } + return job.Progress(), nil +} diff --git a/pkg/ccl/changefeedccl/testing_knobs.go b/pkg/ccl/changefeedccl/testing_knobs.go index f371195408f9..d7825975fd75 100644 --- a/pkg/ccl/changefeedccl/testing_knobs.go +++ b/pkg/ccl/changefeedccl/testing_knobs.go @@ -84,6 +84,10 @@ type TestingKnobs struct { // its PTS record from the deprecated style to the new style. PreserveDeprecatedPts func() bool + // PreservePTSTargets is used to prevent a changefeed from upgrading + // its PTS record to include all required targets. + PreservePTSTargets func() bool + // PulsarClientSkipCreation skips creating the sink client when // dialing. PulsarClientSkipCreation bool