116886: logictest: skip TestLogic_lookup_join_local under race r=yuzefovich a=yuzefovich

Fixes: cockroachdb#116860.

Release note: None

116909: stats: improve TestCreateStatsControlJob r=yuzefovich a=yuzefovich

This commit improves `TestCreateStatsControlJob` in the following manner:
- it fixes a possible race where another job, created after the CREATE STATISTICS query was issued, could cause the `RunJob` helper to return the job ID of that other job. We now specify the job type in the lookup to avoid this race, which fixes an issue we've seen once (a sketch follows this list).
- it now uses a single server since I don't see a reason why this test needs a 3-node cluster.
- it moves the `jobutils.RunJob` helper into the stats test code, which is now the only caller of that method.
- it disables auto stats to reduce the load on the cluster, since they could only interfere with the meat of the test.
- it unskips the test under race and deadlock since it should now be safe to run under those configs (time will tell).
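
As a rough illustration of the first point, the fix restricts the job lookup to the CREATE STATS job type. Below is a minimal sketch; the helper name and package clause are invented for illustration, and the actual change lives in the runCreateStatsJob helper further down in this diff.

package stats_test

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
)

// latestCreateStatsJobID is a hypothetical helper illustrating the fix: the
// lookup is restricted to CREATE STATS jobs, so an unrelated job created
// concurrently (which a bare ORDER BY created DESC LIMIT 1 could match) is
// never picked up by mistake.
func latestCreateStatsJobID(t *testing.T, db *sqlutils.SQLRunner) jobspb.JobID {
	var jobID jobspb.JobID
	db.QueryRow(
		t, `SELECT id FROM system.jobs WHERE job_type = 'CREATE STATS' ORDER BY created DESC LIMIT 1`,
	).Scan(&jobID)
	return jobID
}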

Fixes: cockroachdb#115072.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
craig[bot] and yuzefovich committed Dec 21, 2023
3 parents 3d75a08 + 92ca78b + 421b9eb commit a918371
Showing 4 changed files with 61 additions and 67 deletions.
3 changes: 3 additions & 0 deletions pkg/cmd/generate-logictest/templates.go
@@ -52,6 +52,9 @@ const templateText = `
{{- if .LogicTest -}}
func runLogicTest(t *testing.T, file string) {
{{ if .Is3NodeTenant }}skip.UnderRace(t, "large engflow executor is overloaded by 3node-tenant config")
{{ else if eq .TestRuleName "local"}}if file == "lookup_join_local" {
skip.UnderRace(t, "this file is too slow under race")
}
{{ end }}skip.UnderDeadlock(t, "times out and/or hangs")
logictest.RunLogicTest(t, logictest.TestServerArgs{}, configIdx, filepath.Join(logicTestDir, file))
}
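
For reference, after this template change the generated runLogicTest for the local test rule should come out roughly as follows. This is a sketch of the rendered template fragment, not the actual generated file under pkg/sql/logictest/tests/local.

func runLogicTest(t *testing.T, file string) {
	if file == "lookup_join_local" {
		skip.UnderRace(t, "this file is too slow under race")
	}
	skip.UnderDeadlock(t, "times out and/or hangs")
	logictest.RunLogicTest(t, logictest.TestServerArgs{}, configIdx, filepath.Join(logicTestDir, file))
}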
3 changes: 3 additions & 0 deletions pkg/sql/logictest/tests/local/generated_test.go

Some generated files are not rendered by default.

80 changes: 55 additions & 25 deletions pkg/sql/stats/create_stats_job_test.go
@@ -31,7 +31,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
@@ -43,17 +42,11 @@ import (
)

// TestCreateStatsControlJob tests that PAUSE JOB, RESUME JOB, and CANCEL JOB
// work as intended on create statistics jobs.
// work as intended on CREATE STATISTICS jobs.
func TestCreateStatsControlJob(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderDeadlock(t, "possible OOM")
skip.UnderRace(t, "possible OOM")

// Test with 3 nodes and rowexec.SamplerProgressInterval=100 to ensure
// that progress metadata is sent correctly after every 100 input rows.
const nodes = 3
defer func(oldSamplerInterval int, oldSampleAgggregatorInterval time.Duration) {
rowexec.SamplerProgressInterval = oldSamplerInterval
rowexec.SampleAggregatorProgressInterval = oldSampleAgggregatorInterval
Expand All @@ -62,19 +55,19 @@ func TestCreateStatsControlJob(t *testing.T) {
rowexec.SampleAggregatorProgressInterval = time.Millisecond

var allowRequest chan struct{}

var serverArgs base.TestServerArgs
filter, setTableID := createStatsRequestFilter(&allowRequest)
params := base.TestClusterArgs{ServerArgs: serverArgs}
params.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
var serverArgs base.TestServerArgs
serverArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
serverArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingRequestFilter: filter,
}

ctx := context.Background()
tc := testcluster.StartTestCluster(t, nodes, params)
defer tc.Stopper().Stop(ctx)
sqlDB := sqlutils.MakeSQLRunner(tc.ApplicationLayer(0).SQLConn(t))
srv, db, _ := serverutils.StartServer(t, serverArgs)
defer srv.Stopper().Stop(ctx)
sqlDB := sqlutils.MakeSQLRunner(db)
// Disable auto stats so that they don't interfere with the test.
sqlDB.Exec(t, "SET CLUSTER SETTING sql.stats.automatic_collection.enabled = false;")
sqlDB.Exec(t, `CREATE DATABASE d`)
sqlDB.Exec(t, `CREATE TABLE d.t (x INT PRIMARY KEY)`)
var tID descpb.ID
@@ -86,9 +79,7 @@ func TestCreateStatsControlJob(t *testing.T) {
query := `CREATE STATISTICS s1 FROM d.t`

setTableID(tID)
if _, err := jobutils.RunJob(
t, sqlDB, &allowRequest, []string{"cancel"}, query,
); err == nil {
if _, err := runCreateStatsJob(ctx, t, sqlDB, &allowRequest, "CANCEL", query); err == nil {
t.Fatal("expected an error")
}

@@ -102,9 +93,7 @@ func TestCreateStatsControlJob(t *testing.T) {
// Test that CREATE STATISTICS can be paused and resumed.
query := `CREATE STATISTICS s2 FROM d.t`

jobID, err := jobutils.RunJob(
t, sqlDB, &allowRequest, []string{"PAUSE"}, query,
)
jobID, err := runCreateStatsJob(ctx, t, sqlDB, &allowRequest, "PAUSE", query)
if !testutils.IsError(err, "pause") && !testutils.IsError(err, "liveness") {
t.Fatalf("unexpected: %v", err)
}
@@ -129,6 +118,47 @@ func TestCreateStatsControlJob(t *testing.T) {
})
}

// runCreateStatsJob runs the provided CREATE STATISTICS job control statement,
// initializing, notifying and closing the chan at the passed pointer (see below
// for why) and returning the jobID and error result. PAUSE JOB and CANCEL JOB
// are racy in that it's hard to guarantee that the job is still running when
// executing a PAUSE or CANCEL -- or that the job has even started running. To
// synchronize, we can install a store response filter which does a blocking
// receive for one of the responses used by our job (for example, Export for a
// BACKUP). Later, when we want to guarantee the job is in progress, we do
// exactly one blocking send. When this send completes, we know the job has
// started, as we've seen one expected response. We also know the job has not
// finished, because we're blocking all future responses until we close the
// channel, and our operation is large enough that it will generate more than
// one of the expected response.
func runCreateStatsJob(
ctx context.Context,
t *testing.T,
db *sqlutils.SQLRunner,
allowProgressIota *chan struct{},
op string,
query string,
args ...interface{},
) (jobspb.JobID, error) {
*allowProgressIota = make(chan struct{})
errCh := make(chan error)
go func() {
_, err := db.DB.ExecContext(ctx, query, args...)
errCh <- err
}()
select {
case *allowProgressIota <- struct{}{}:
case err := <-errCh:
return 0, errors.Wrapf(err, "query returned before expected: %s", query)
}
var jobID jobspb.JobID
db.QueryRow(t, `SELECT id FROM system.jobs WHERE job_type = 'CREATE STATS' ORDER BY created DESC LIMIT 1`).Scan(&jobID)
db.Exec(t, fmt.Sprintf("%s JOB %d", op, jobID))
*allowProgressIota <- struct{}{}
close(*allowProgressIota)
return jobID, <-errCh
}
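
The handshake described in the comment above boils down to the following dependency-free sketch. The names are illustrative only; the real counterpart in this test is createStatsRequestFilter, whose body is not part of this diff. The "filter" side blocks on every intercepted request, while the test side does exactly one blocking send to learn that the job is in progress, then closes the channel to release all remaining requests.

package main

import "fmt"

func main() {
	allow := make(chan struct{})
	done := make(chan struct{})

	// Filter side: every intercepted request blocks until the test allows it.
	go func() {
		for i := 0; i < 3; i++ {
			<-allow // unblocked by a send or, once allow is closed, immediately
			fmt.Println("request", i, "proceeds")
		}
		close(done)
	}()

	allow <- struct{}{} // exactly one blocking send: the job has started but has not finished
	// A PAUSE JOB or CANCEL JOB statement would be issued at this point.
	close(allow) // release all remaining requests
	<-done
}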

func TestCreateStatisticsCanBeCancelled(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -538,9 +568,9 @@ func TestCreateStatsAsOfTime(t *testing.T) {
})
}

// Create a blocking request filter for the actions related
// to CREATE STATISTICS, i.e. Scanning a user table. See discussion
// on jobutils.RunJob for where this might be useful.
// Create a blocking request filter for the actions related to CREATE
// STATISTICS, i.e. Scanning a user table. See discussion on runCreateStatsJob
// for where this might be useful.
//
// Note that it only supports system tenants as well as the secondary tenant
// with serverutils.TestTenantID() tenant ID.
42 changes: 0 additions & 42 deletions pkg/testutils/jobutils/jobs_verification.go
@@ -13,7 +13,6 @@ package jobutils
import (
"context"
gosql "database/sql"
"fmt"
"sort"
"testing"
"time"
@@ -105,47 +104,6 @@ func waitForJobToHaveStatus(
}, 2*time.Minute)
}

// RunJob runs the provided job control statement, initializing, notifying and
// closing the chan at the passed pointer (see below for why) and returning the
// jobID and error result. PAUSE JOB and CANCEL JOB are racy in that it's hard
// to guarantee that the job is still running when executing a PAUSE or
// CANCEL--or that the job has even started running. To synchronize, we can
// install a store response filter which does a blocking receive for one of the
// responses used by our job (for example, Export for a BACKUP). Later, when we
// want to guarantee the job is in progress, we do exactly one blocking send.
// When this send completes, we know the job has started, as we've seen one
// expected response. We also know the job has not finished, because we're
// blocking all future responses until we close the channel, and our operation
// is large enough that it will generate more than one of the expected response.
func RunJob(
t *testing.T,
db *sqlutils.SQLRunner,
allowProgressIota *chan struct{},
ops []string,
query string,
args ...interface{},
) (jobspb.JobID, error) {
*allowProgressIota = make(chan struct{})
errCh := make(chan error)
go func() {
_, err := db.DB.ExecContext(context.TODO(), query, args...)
errCh <- err
}()
select {
case *allowProgressIota <- struct{}{}:
case err := <-errCh:
return 0, errors.Wrapf(err, "query returned before expected: %s", query)
}
var jobID jobspb.JobID
db.QueryRow(t, `SELECT id FROM system.jobs ORDER BY created DESC LIMIT 1`).Scan(&jobID)
for _, op := range ops {
db.Exec(t, fmt.Sprintf("%s JOB %d", op, jobID))
*allowProgressIota <- struct{}{}
}
close(*allowProgressIota)
return jobID, <-errCh
}

// BulkOpResponseFilter creates a blocking response filter for the responses
// related to bulk IO/backup/restore/import: Export, Import and AddSSTable. See
// discussion on RunJob for where this might be useful.