From 92ca78bdb29c2c7c5d5bf790d5de92354dc6a859 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Wed, 20 Dec 2023 12:07:38 -0800
Subject: [PATCH 1/2] logictest: skip TestLogic_lookup_join_local under race

Release note: None
---
 pkg/cmd/generate-logictest/templates.go         | 3 +++
 pkg/sql/logictest/tests/local/generated_test.go | 3 +++
 2 files changed, 6 insertions(+)

diff --git a/pkg/cmd/generate-logictest/templates.go b/pkg/cmd/generate-logictest/templates.go
index 4ecd2ce01128..65e28ee3424b 100644
--- a/pkg/cmd/generate-logictest/templates.go
+++ b/pkg/cmd/generate-logictest/templates.go
@@ -52,6 +52,9 @@ const templateText = `
 {{- if .LogicTest -}}
 func runLogicTest(t *testing.T, file string) {
 	{{ if .Is3NodeTenant }}skip.UnderRace(t, "large engflow executor is overloaded by 3node-tenant config")
+	{{ else if eq .TestRuleName "local"}}if file == "lookup_join_local" {
+		skip.UnderRace(t, "this file is too slow under race")
+	}
 	{{ end }}skip.UnderDeadlock(t, "times out and/or hangs")
 	logictest.RunLogicTest(t, logictest.TestServerArgs{}, configIdx, filepath.Join(logicTestDir, file))
 }
diff --git a/pkg/sql/logictest/tests/local/generated_test.go b/pkg/sql/logictest/tests/local/generated_test.go
index d43a14765fe9..323340bcb6ea 100644
--- a/pkg/sql/logictest/tests/local/generated_test.go
+++ b/pkg/sql/logictest/tests/local/generated_test.go
@@ -60,6 +60,9 @@ func TestMain(m *testing.M) {
 }
 
 func runLogicTest(t *testing.T, file string) {
+	if file == "lookup_join_local" {
+		skip.UnderRace(t, "this file is too slow under race")
+	}
 	skip.UnderDeadlock(t, "times out and/or hangs")
 	logictest.RunLogicTest(t, logictest.TestServerArgs{}, configIdx, filepath.Join(logicTestDir, file))
 }

From 421b9ebd2f284928400e4b3220a4488229485913 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Wed, 20 Dec 2023 16:56:27 -0800
Subject: [PATCH 2/2] stats: improve TestCreateStatsControlJob

This commit improves `TestCreateStatsControlJob` in the following
manner:
- it fixes a possible race where another job could be created after the
  CREATE STATISTICS query had been issued, in which case the `RunJob`
  helper would end up returning the job ID of that other job. We now
  specify the job type when looking up the job ID, which avoids this
  race and fixes an issue we've seen once.
- it now uses a single server since I don't see a reason why this test
  would need a 3-node cluster.
- it moves the `jobutils.RunJob` helper into the stats test code since
  this test is now its only caller.
- it disables auto stats to lower the load on the cluster since they
  could only interfere with the meat of the test.
- it unskips the test under race and deadlock since it should now be
  safe to run under those configs (time will show).
Release note: None
---
 pkg/sql/stats/create_stats_job_test.go      | 80 ++++++++++++++-------
 pkg/testutils/jobutils/jobs_verification.go | 42 -----------
 2 files changed, 55 insertions(+), 67 deletions(-)

diff --git a/pkg/sql/stats/create_stats_job_test.go b/pkg/sql/stats/create_stats_job_test.go
index 302d493b9fde..64ac3e6d5394 100644
--- a/pkg/sql/stats/create_stats_job_test.go
+++ b/pkg/sql/stats/create_stats_job_test.go
@@ -31,7 +31,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
-	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
 	"github.com/cockroachdb/cockroach/pkg/util/encoding"
@@ -43,17 +42,11 @@ import (
 )
 
 // TestCreateStatsControlJob tests that PAUSE JOB, RESUME JOB, and CANCEL JOB
-// work as intended on create statistics jobs.
+// work as intended on CREATE STATISTICS jobs.
 func TestCreateStatsControlJob(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
-	skip.UnderDeadlock(t, "possible OOM")
-	skip.UnderRace(t, "possible OOM")
-
-	// Test with 3 nodes and rowexec.SamplerProgressInterval=100 to ensure
-	// that progress metadata is sent correctly after every 100 input rows.
-	const nodes = 3
 	defer func(oldSamplerInterval int, oldSampleAgggregatorInterval time.Duration) {
 		rowexec.SamplerProgressInterval = oldSamplerInterval
 		rowexec.SampleAggregatorProgressInterval = oldSampleAgggregatorInterval
@@ -62,19 +55,19 @@ func TestCreateStatsControlJob(t *testing.T) {
 	rowexec.SamplerProgressInterval = 100
 	rowexec.SampleAggregatorProgressInterval = time.Millisecond
 	var allowRequest chan struct{}
-
-	var serverArgs base.TestServerArgs
 	filter, setTableID := createStatsRequestFilter(&allowRequest)
-	params := base.TestClusterArgs{ServerArgs: serverArgs}
-	params.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
-	params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
+	var serverArgs base.TestServerArgs
+	serverArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
+	serverArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
 		TestingRequestFilter: filter,
 	}
 
 	ctx := context.Background()
-	tc := testcluster.StartTestCluster(t, nodes, params)
-	defer tc.Stopper().Stop(ctx)
-	sqlDB := sqlutils.MakeSQLRunner(tc.ApplicationLayer(0).SQLConn(t))
+	srv, db, _ := serverutils.StartServer(t, serverArgs)
+	defer srv.Stopper().Stop(ctx)
+	sqlDB := sqlutils.MakeSQLRunner(db)
+	// Disable auto stats so that they don't interfere with the test.
+	sqlDB.Exec(t, "SET CLUSTER SETTING sql.stats.automatic_collection.enabled = false;")
 	sqlDB.Exec(t, `CREATE DATABASE d`)
 	sqlDB.Exec(t, `CREATE TABLE d.t (x INT PRIMARY KEY)`)
 	var tID descpb.ID
@@ -86,9 +79,7 @@ func TestCreateStatsControlJob(t *testing.T) {
 
 	query := `CREATE STATISTICS s1 FROM d.t`
 	setTableID(tID)
-	if _, err := jobutils.RunJob(
-		t, sqlDB, &allowRequest, []string{"cancel"}, query,
-	); err == nil {
+	if _, err := runCreateStatsJob(ctx, t, sqlDB, &allowRequest, "CANCEL", query); err == nil {
 		t.Fatal("expected an error")
 	}
@@ -102,9 +93,7 @@ func TestCreateStatsControlJob(t *testing.T) {
 
 	// Test that CREATE STATISTICS can be paused and resumed.
 	query := `CREATE STATISTICS s2 FROM d.t`
-	jobID, err := jobutils.RunJob(
-		t, sqlDB, &allowRequest, []string{"PAUSE"}, query,
-	)
+	jobID, err := runCreateStatsJob(ctx, t, sqlDB, &allowRequest, "PAUSE", query)
 	if !testutils.IsError(err, "pause") && !testutils.IsError(err, "liveness") {
 		t.Fatalf("unexpected: %v", err)
 	}
@@ -129,6 +118,47 @@ func TestCreateStatsControlJob(t *testing.T) {
 	})
 }
 
+// runCreateStatsJob runs the provided CREATE STATISTICS job control statement,
+// initializing, notifying and closing the chan at the passed pointer (see below
+// for why) and returning the jobID and error result. PAUSE JOB and CANCEL JOB
+// are racy in that it's hard to guarantee that the job is still running when
+// executing a PAUSE or CANCEL -- or that the job has even started running. To
+// synchronize, we can install a store response filter which does a blocking
+// receive for one of the responses used by our job (for example, Export for a
+// BACKUP). Later, when we want to guarantee the job is in progress, we do
+// exactly one blocking send. When this send completes, we know the job has
+// started, as we've seen one expected response. We also know the job has not
+// finished, because we're blocking all future responses until we close the
+// channel, and our operation is large enough that it will generate more than
+// one of the expected response.
+func runCreateStatsJob(
+	ctx context.Context,
+	t *testing.T,
+	db *sqlutils.SQLRunner,
+	allowProgressIota *chan struct{},
+	op string,
+	query string,
+	args ...interface{},
+) (jobspb.JobID, error) {
+	*allowProgressIota = make(chan struct{})
+	errCh := make(chan error)
+	go func() {
+		_, err := db.DB.ExecContext(ctx, query, args...)
+		errCh <- err
+	}()
+	select {
+	case *allowProgressIota <- struct{}{}:
+	case err := <-errCh:
+		return 0, errors.Wrapf(err, "query returned before expected: %s", query)
+	}
+	var jobID jobspb.JobID
+	db.QueryRow(t, `SELECT id FROM system.jobs WHERE job_type = 'CREATE STATS' ORDER BY created DESC LIMIT 1`).Scan(&jobID)
+	db.Exec(t, fmt.Sprintf("%s JOB %d", op, jobID))
+	*allowProgressIota <- struct{}{}
+	close(*allowProgressIota)
+	return jobID, <-errCh
+}
+
 func TestCreateStatisticsCanBeCancelled(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -538,9 +568,9 @@ func TestCreateStatsAsOfTime(t *testing.T) {
 	})
 }
 
-// Create a blocking request filter for the actions related
-// to CREATE STATISTICS, i.e. Scanning a user table. See discussion
-// on jobutils.RunJob for where this might be useful.
+// Create a blocking request filter for the actions related to CREATE
+// STATISTICS, i.e. Scanning a user table. See discussion on runCreateStatsJob
+// for where this might be useful.
 //
 // Note that it only supports system tenants as well as the secondary tenant
 // with serverutils.TestTenantID() tenant ID.
diff --git a/pkg/testutils/jobutils/jobs_verification.go b/pkg/testutils/jobutils/jobs_verification.go
index 0af1654356a2..405cec5e49ca 100644
--- a/pkg/testutils/jobutils/jobs_verification.go
+++ b/pkg/testutils/jobutils/jobs_verification.go
@@ -13,7 +13,6 @@ package jobutils
 import (
 	"context"
 	gosql "database/sql"
-	"fmt"
 	"sort"
 	"testing"
 	"time"
@@ -105,47 +104,6 @@ func waitForJobToHaveStatus(
 	}, 2*time.Minute)
 }
 
-// RunJob runs the provided job control statement, initializing, notifying and
-// closing the chan at the passed pointer (see below for why) and returning the
-// jobID and error result. PAUSE JOB and CANCEL JOB are racy in that it's hard
-// to guarantee that the job is still running when executing a PAUSE or
-// CANCEL--or that the job has even started running. To synchronize, we can
-// install a store response filter which does a blocking receive for one of the
-// responses used by our job (for example, Export for a BACKUP). Later, when we
-// want to guarantee the job is in progress, we do exactly one blocking send.
-// When this send completes, we know the job has started, as we've seen one
-// expected response. We also know the job has not finished, because we're
-// blocking all future responses until we close the channel, and our operation
-// is large enough that it will generate more than one of the expected response.
-func RunJob(
-	t *testing.T,
-	db *sqlutils.SQLRunner,
-	allowProgressIota *chan struct{},
-	ops []string,
-	query string,
-	args ...interface{},
-) (jobspb.JobID, error) {
-	*allowProgressIota = make(chan struct{})
-	errCh := make(chan error)
-	go func() {
-		_, err := db.DB.ExecContext(context.TODO(), query, args...)
-		errCh <- err
-	}()
-	select {
-	case *allowProgressIota <- struct{}{}:
-	case err := <-errCh:
-		return 0, errors.Wrapf(err, "query returned before expected: %s", query)
-	}
-	var jobID jobspb.JobID
-	db.QueryRow(t, `SELECT id FROM system.jobs ORDER BY created DESC LIMIT 1`).Scan(&jobID)
-	for _, op := range ops {
-		db.Exec(t, fmt.Sprintf("%s JOB %d", op, jobID))
-		*allowProgressIota <- struct{}{}
-	}
-	close(*allowProgressIota)
-	return jobID, <-errCh
-}
-
 // BulkOpResponseFilter creates a blocking response filter for the responses
 // related to bulk IO/backup/restore/import: Export, Import and AddSSTable. See
 // discussion on RunJob for where this might be useful.
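
Reviewer note (not part of the patches): for readers unfamiliar with the blocking-channel trick described in the runCreateStatsJob comment above, the following is a minimal, standalone Go sketch of the same synchronization pattern. Everything in it (runBlockedOp, allowProgress, the fake op) is illustrative only and does not exist in the CockroachDB tree; in the real test, the role of op is played by a query whose requests are held up by a store request filter.

```go
package main

import (
	"errors"
	"fmt"
)

// runBlockedOp starts op in a goroutine. op blocks on a receive from
// allowProgress every time it makes progress, playing the role that the
// store request filter plays in the real test.
func runBlockedOp(op func(allowProgress chan struct{}) error) (chan struct{}, chan error) {
	allowProgress := make(chan struct{})
	errCh := make(chan error, 1)
	go func() { errCh <- op(allowProgress) }()
	return allowProgress, errCh
}

func main() {
	// A stand-in for a long-running job that generates several "requests",
	// each of which is held up until the test allows it.
	op := func(allowProgress chan struct{}) error {
		for i := 0; i < 3; i++ {
			<-allowProgress // one blocking receive per request
		}
		return errors.New("job was canceled") // pretend the control statement took effect
	}

	allowProgress, errCh := runBlockedOp(op)

	// Exactly one blocking send: once it completes, the op has started (it
	// consumed the send) and has not finished (its later requests still block).
	select {
	case allowProgress <- struct{}{}:
	case err := <-errCh:
		fmt.Println("op returned before expected:", err)
		return
	}

	// At this point the real test issues PAUSE JOB or CANCEL JOB.
	fmt.Println("op is provably in progress; issue the control statement here")

	// Unblock the remaining requests and collect the result; receives from a
	// closed channel return immediately, so the op can finish.
	close(allowProgress)
	fmt.Println("op finished with:", <-errCh)
}
```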