diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 2fd68e6122d3..6b607ce37966 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -309,6 +309,7 @@ go_test( "//pkg/sql/randgen", "//pkg/sql/rowenc", "//pkg/sql/rowenc/keyside", + "//pkg/sql/sem/catid", "//pkg/sql/sem/eval", "//pkg/sql/sem/tree", "//pkg/sql/sem/volatility", diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index d2e7510ef5dc..095314d3cd8d 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -1821,40 +1822,66 @@ func (cf *changeFrontier) manageProtectedTimestamps( return false, err } - if rec.Target != nil { - // Only update the PTS timestamp if it is lagging behind the high - // watermark. This is to prevent a rush of updates to the PTS if the - // changefeed restarts, which can cause contention and second order effects - // on system tables. - if !rec.Timestamp.AddDuration(ptsUpdateLag).Less(highWater) { + // If this changefeed was created in 22.1 or earlier, it may be using a + // deprecated pts record in which the target field is nil. If so, we + // "migrate" it to use the new style of pts records and delete the old one. + if rec.Target == nil { + if preserveDeprecatedPts := cf.knobs.PreserveDeprecatedPts != nil && cf.knobs.PreserveDeprecatedPts(); preserveDeprecatedPts { return false, nil } - - log.VEventf(ctx, 2, "updating protected timestamp %v at %v", progress.ProtectedTimestampRecord, highWater) - return true, pts.UpdateTimestamp(ctx, progress.ProtectedTimestampRecord, highWater) + if err := cf.remakePTSRecord(ctx, pts, progress, highWater); err != nil { + return false, err + } + return true, nil } - // If this changefeed was created in 22.1 or earlier, it may be using a deprecated pts record in which - // the target field is nil. If so, we "migrate" it to use the new style of pts records and delete the old one. - preserveDeprecatedPts := cf.knobs.PreserveDeprecatedPts != nil && cf.knobs.PreserveDeprecatedPts() - if !preserveDeprecatedPts { - prevRecordId := progress.ProtectedTimestampRecord - ptr := createProtectedTimestampRecord( - ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, - ) - if err := pts.Protect(ctx, ptr); err != nil { - return false, err + // If we've identified more tables that need to be protected since this + // changefeed was created, it will be missing here. If so, we "migrate" it + // to include all the appropriate targets. + if targets := AllTargets(cf.spec.Feed); !makeTargetToProtect(targets).Equal(rec.Target) { + if preservePTSTargets := cf.knobs.PreservePTSTargets != nil && cf.knobs.PreservePTSTargets(); preservePTSTargets { + return false, nil } - progress.ProtectedTimestampRecord = ptr.ID.GetUUID() - if err := pts.Release(ctx, prevRecordId); err != nil { + if err := cf.remakePTSRecord(ctx, pts, progress, highWater); err != nil { return false, err } + return true, nil + } - log.Eventf(ctx, "created new pts record %v to replace old pts record %v at %v", - progress.ProtectedTimestampRecord, prevRecordId, highWater) + // Only update the PTS timestamp if it is lagging behind the high + // watermark. This is to prevent a rush of updates to the PTS if the + // changefeed restarts, which can cause contention and second order effects + // on system tables. + if !rec.Timestamp.AddDuration(ptsUpdateLag).Less(highWater) { + return false, nil } - return true, nil + log.VEventf(ctx, 2, "updating protected timestamp %v at %v", progress.ProtectedTimestampRecord, highWater) + return true, pts.UpdateTimestamp(ctx, progress.ProtectedTimestampRecord, highWater) +} + +func (cf *changeFrontier) remakePTSRecord( + ctx context.Context, + pts protectedts.Storage, + progress *jobspb.ChangefeedProgress, + resolved hlc.Timestamp, +) error { + prevRecordId := progress.ProtectedTimestampRecord + ptr := createProtectedTimestampRecord( + ctx, cf.FlowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), resolved, + ) + if err := pts.Protect(ctx, ptr); err != nil { + return err + } + progress.ProtectedTimestampRecord = ptr.ID.GetUUID() + if err := pts.Release(ctx, prevRecordId); err != nil { + return err + } + + log.Eventf(ctx, "created new pts record %v to replace old pts record %v at %v", + progress.ProtectedTimestampRecord, prevRecordId, resolved) + + return nil } func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error { diff --git a/pkg/ccl/changefeedccl/protected_timestamps_test.go b/pkg/ccl/changefeedccl/protected_timestamps_test.go index 9ac09309cac2..8d854322e6fc 100644 --- a/pkg/ccl/changefeedccl/protected_timestamps_test.go +++ b/pkg/ccl/changefeedccl/protected_timestamps_test.go @@ -8,6 +8,7 @@ package changefeedccl import ( "context" "fmt" + "slices" "sync/atomic" "testing" "time" @@ -34,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/distsql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/sem/catid" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -42,6 +44,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -562,6 +565,121 @@ func TestPTSRecordProtectsTargetsAndSystemTables(t *testing.T) { } +// TestChangefeedUpdateProtectedTimestampTargets tests that changefeeds will +// remake their PTS records if they detect that they lack required targets. +func TestChangefeedMigratesProtectedTimestampTargets(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) { + ctx := context.Background() + + dontMigrate := atomic.Bool{} + dontMigrate.Store(true) + knobs := s.TestingKnobs. + DistSQL.(*execinfra.TestingKnobs). + Changefeed.(*TestingKnobs) + knobs.PreservePTSTargets = func() bool { + return dontMigrate.Load() + } + + ptsInterval := 50 * time.Millisecond + changefeedbase.ProtectTimestampInterval.Override( + context.Background(), &s.Server.ClusterSettings().SV, ptsInterval) + changefeedbase.ProtectTimestampLag.Override( + context.Background(), &s.Server.ClusterSettings().SV, ptsInterval) + + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sysDB := sqlutils.MakeSQLRunner(s.SystemServer.SQLConn(t)) + + sysDB.Exec(t, "SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms'") + sysDB.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'") // speeds up the test + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) + + foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '20ms'`) + defer closeFeed(t, foo) + + registry := s.Server.JobRegistry().(*jobs.Registry) + execCfg := s.Server.ExecutorConfig().(sql.ExecutorConfig) + ptp := s.Server.DistSQLServer().(*distsql.ServerImpl).ServerConfig.ProtectedTimestampProvider + fooDesc := desctestutils.TestingGetPublicTableDescriptor(s.SystemServer.DB(), s.Codec, "d", "foo") + fooID := fooDesc.GetID() + + jobFeed := foo.(cdctest.EnterpriseTestFeed) + + // removes table 3 from the target of the PTS record. + removeOnePTSTarget := func(recordID uuid.UUID) error { + return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + s := `select target from system.protected_ts_records where id = $1` + datums, err := txn.QueryRowEx(ctx, "pts-test", txn.KV(), sessiondata.NodeUserSessionDataOverride, s, recordID) + require.NoError(t, err) + j := tree.MustBeDBytes(datums[0]) + + target := &ptpb.Target{} + require.NoError(t, protoutil.Unmarshal([]byte(j), target)) + + // remove '3' (system.descriptor) to simulate a missing system table + ids := target.GetSchemaObjects().IDs + idx := slices.Index(ids, catid.DescID(3)) + target.GetSchemaObjects().IDs = slices.Delete(ids, idx, idx+1) + + bs, err := protoutil.Marshal(target) + require.NoError(t, err) + + _, err = txn.ExecEx(ctx, "pts-test", txn.KV(), sessiondata.NodeUserSessionDataOverride, + "UPDATE system.protected_ts_records SET target = $1 WHERE id = $2", bs, recordID, + ) + require.NoError(t, err) + return nil + }) + } + + // Wipe out the targets from the changefeed PTS record, simulating an old-style PTS record. + oldRecordID := getPTSRecordID(ctx, t, registry, jobFeed) + require.NoError(t, removeOnePTSTarget(oldRecordID)) + + // Sanity check: make sure that it worked + oldRecord, err := readPTSRecord(ctx, t, execCfg, ptp, oldRecordID) + require.NoError(t, err) + targetIDs := oldRecord.Target.GetSchemaObjects().IDs + require.Contains(t, targetIDs, fooID) + require.NotSubset(t, targetIDs, systemTablesToProtect) + + // Flip the knob so the changefeed migrates the record + dontMigrate.Store(false) + + getNewPTSRecord := func() *ptpb.Record { + var recID uuid.UUID + var record *ptpb.Record + testutils.SucceedsSoon(t, func() error { + recID = getPTSRecordID(ctx, t, registry, jobFeed) + if recID.Equal(oldRecordID) { + return errors.New("waiting for new PTS record") + } + return nil + }) + record, err := readPTSRecord(ctx, t, execCfg, ptp, recID) + require.NoError(t, err) + return record + } + + // Read the new PTS record. + newRec := getNewPTSRecord() + require.NotNil(t, newRec.Target) + + // Assert the new PTS record has the right targets. + targetIDs = newRec.Target.GetSchemaObjects().IDs + require.Contains(t, targetIDs, fooID) + require.Subset(t, targetIDs, systemTablesToProtect) + + // Ensure the old pts record was deleted. + _, err = readPTSRecord(ctx, t, execCfg, ptp, oldRecordID) + require.ErrorContains(t, err, "does not exist") + } + + cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks) +} + // TestChangefeedUpdateProtectedTimestamp tests that changefeeds using the // old style PTS records will migrate themselves to use the new style PTS // records. @@ -605,41 +723,7 @@ func TestChangefeedMigratesProtectedTimestamps(t *testing.T) { descID := descpb.ID(keys.DescriptorTableID) jobFeed := foo.(cdctest.EnterpriseTestFeed) - loadProgressErr := func() (jobspb.Progress, error) { - job, err := registry.LoadJob(ctx, jobFeed.JobID()) - if err != nil { - return jobspb.Progress{}, err - } - return job.Progress(), nil - } - getPTSRecordID := func() uuid.UUID { - var recordID uuid.UUID - testutils.SucceedsSoon(t, func() error { - progress, err := loadProgressErr() - if err != nil { - return err - } - uid := progress.GetChangefeed().ProtectedTimestampRecord - if uid == uuid.Nil { - return errors.Newf("no pts record") - } - recordID = uid - return nil - }) - return recordID - } - - readPTSRecord := func(recID uuid.UUID) (rec *ptpb.Record, err error) { - err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - rec, err = ptp.WithTxn(txn).GetRecord(ctx, recID) - if err != nil { - return err - } - return nil - }) - return - } removePTSTarget := func(recordID uuid.UUID) error { return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { if _, err := txn.ExecEx(ctx, "pts-test", txn.KV(), sessiondata.NodeUserSessionDataOverride, @@ -654,9 +738,9 @@ func TestChangefeedMigratesProtectedTimestamps(t *testing.T) { } // Wipe out the targets from the changefeed PTS record, simulating an old-style PTS record. - oldRecordID := getPTSRecordID() + oldRecordID := getPTSRecordID(ctx, t, registry, jobFeed) require.NoError(t, removePTSTarget(oldRecordID)) - rec, err := readPTSRecord(oldRecordID) + rec, err := readPTSRecord(ctx, t, execCfg, ptp, oldRecordID) require.NoError(t, err) require.NotNil(t, rec) require.Nil(t, rec.Target) @@ -668,14 +752,14 @@ func TestChangefeedMigratesProtectedTimestamps(t *testing.T) { var recID uuid.UUID var record *ptpb.Record testutils.SucceedsSoon(t, func() error { - recID = getPTSRecordID() + recID = getPTSRecordID(ctx, t, registry, jobFeed) if recID.Equal(oldRecordID) { return errors.New("waiting for new PTS record") } return nil }) - record, err = readPTSRecord(recID) + record, err = readPTSRecord(ctx, t, execCfg, ptp, recID) if err != nil { t.Fatal(err) } @@ -692,7 +776,7 @@ func TestChangefeedMigratesProtectedTimestamps(t *testing.T) { require.Contains(t, targetIDs, descID) // Ensure the old pts record was deleted. - _, err = readPTSRecord(oldRecordID) + _, err = readPTSRecord(ctx, t, execCfg, ptp, oldRecordID) require.ErrorContains(t, err, "does not exist") } @@ -768,3 +852,49 @@ func fetchUsersAndPasswords( } return users, nil } + +func getPTSRecordID( + ctx context.Context, t *testing.T, registry *jobs.Registry, jobFeed cdctest.EnterpriseTestFeed, +) uuid.UUID { + var recordID uuid.UUID + testutils.SucceedsSoon(t, func() error { + progress, err := loadProgressErr(ctx, registry, jobFeed) + if err != nil { + return err + } + uid := progress.GetChangefeed().ProtectedTimestampRecord + if uid == uuid.Nil { + return errors.Newf("no pts record") + } + recordID = uid + return nil + }) + return recordID +} + +func readPTSRecord( + ctx context.Context, + t *testing.T, + execCfg sql.ExecutorConfig, + ptp protectedts.Provider, + recID uuid.UUID, +) (rec *ptpb.Record, err error) { + err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + rec, err = ptp.WithTxn(txn).GetRecord(ctx, recID) + if err != nil { + return err + } + return nil + }) + return +} + +func loadProgressErr( + ctx context.Context, registry *jobs.Registry, jobFeed cdctest.EnterpriseTestFeed, +) (jobspb.Progress, error) { + job, err := registry.LoadJob(ctx, jobFeed.JobID()) + if err != nil { + return jobspb.Progress{}, err + } + return job.Progress(), nil +} diff --git a/pkg/ccl/changefeedccl/testing_knobs.go b/pkg/ccl/changefeedccl/testing_knobs.go index f371195408f9..d7825975fd75 100644 --- a/pkg/ccl/changefeedccl/testing_knobs.go +++ b/pkg/ccl/changefeedccl/testing_knobs.go @@ -84,6 +84,10 @@ type TestingKnobs struct { // its PTS record from the deprecated style to the new style. PreserveDeprecatedPts func() bool + // PreservePTSTargets is used to prevent a changefeed from upgrading + // its PTS record to include all required targets. + PreservePTSTargets func() bool + // PulsarClientSkipCreation skips creating the sink client when // dialing. PulsarClientSkipCreation bool