Merge cockroachdb#135217 cockroachdb#135566 cockroachdb#136039 cockroachdb#136321 cockroachdb#136499

135217: roachtest: add bandwidth limit to tests r=herkolategan,kvoli a=andrewbaptist

This change adds the ability to set a disk bandwidth limit for all the
perturbation/* tests. The default full tests still run with no limit set,
mimicking the out-of-the-box configuration, but the metamorphic tests
choose a bandwidth limit at random, and the limit can also be set for
manual testing via an environment variable.
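
A condensed, self-contained sketch of how the new knob fits together, based on the
perturbation/framework.go changes in the diff below (the env-variable override and the
rest of the test plumbing are omitted):

package main

import (
	"fmt"
	"math/rand"
)

// "0" means no limit and matches the out-of-the-box configuration; the
// metamorphic path picks one of these options at random.
var diskBandwidthLimitOptions = []string{"0", "350MiB"}

type variations struct {
	diskBandwidthLimit string
	clusterSettings    map[string]string
}

func (v variations) randomize(rng *rand.Rand) variations {
	v.diskBandwidthLimit = diskBandwidthLimitOptions[rng.Intn(len(diskBandwidthLimitOptions))]
	return v
}

func (v variations) finishSetup() variations {
	// The chosen limit is applied as a cluster setting on the test cluster.
	v.clusterSettings["kvadmission.store.provisioned_bandwidth"] = v.diskBandwidthLimit
	return v
}

func main() {
	v := variations{diskBandwidthLimit: "0", clusterSettings: map[string]string{}}
	v = v.randomize(rand.New(rand.NewSource(1))).finishSetup()
	fmt.Println(v.clusterSettings)
}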

Epic: none

Release note: None

135566: roachtest: remove unnecessary parameter to writePerfArtifacts r=kvoli a=andrewbaptist

This change updates the perturbation/* tests to remove an unnecessary argument to writePerfArtifacts, since the variations receiver already has a cluster internally.
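
To illustrate the idea with hypothetical, heavily simplified types (the real signatures
live in perturbation/framework.go and appear in the diff below): because a variations
value already carries the cluster, helpers no longer need it as a separate parameter.

package main

import "fmt"

// Hypothetical stand-ins for illustration only.
type cluster struct{ name string }

type variations struct {
	cluster // embedded: a variations value can be used wherever the cluster is needed
}

// Before (sketch): writePerfArtifacts(ctx, t, c cluster, ...)
// After (sketch):  the receiver itself is handed to the exporter helpers.
func (v variations) writePerfArtifacts(stats map[string]float64) error {
	fmt.Printf("exporting %d stats via cluster %q\n", len(stats), v.name)
	return nil
}

func main() {
	v := variations{cluster{name: "perturbation-cluster"}}
	_ = v.writePerfArtifacts(map[string]float64{"write": 1.0})
}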

Epic: none

Release note: None

136039: roachtest: run a read from standby workload on c2c/kv0 r=fqazi a=msbutler

This patch teaches the c2c roachtest driver to create a standby reader tenant
and run a workload on it. These tools are then used to spin up a kv100 workload
on the reader tenant in the c2c/kv0 roachtest.
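
A rough sketch of what the driver ends up issuing (the tenant names and node placeholders
below are illustrative; the statement and the workload flags are taken from the diff below):

package main

import "fmt"

func main() {
	// Hypothetical names for illustration.
	src, dst := "src-tenant", "dst-tenant"
	srcPGURL := "postgres://src-host:26257"

	// With a reader workload configured, the replication stream is created
	// with a read-only standby virtual cluster attached.
	stmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'", dst, src, srcPGURL)
	stmt += " WITH READ VIRTUAL CLUSTER"

	// The reader tenant gets a "-readonly" suffix, and kv runs with 100%
	// reads plus prepare-read-only so no write statements are prepared.
	readerTenant := fmt.Sprintf("%s-readonly", dst)
	workload := fmt.Sprintf(
		"./cockroach workload run kv --tolerate-errors --read-percent=100 --prepare-read-only=true '{pgurl:1-4:%s}'",
		readerTenant)

	fmt.Println(stmt)
	fmt.Println(workload)
}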

Epic: none

Release note: none

136321: rac2,replica_rac2: add ForceFlushIndexChangedLocked to Processor, RangeController r=kvoli a=sumeerbhola

This is used to set the highest index up to which all send-queues in pull mode must be force-flushed.
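
Only the method name and its purpose come from this PR; the signature and types in the
sketch below are guesses, shown purely to make the intent concrete:

package main

import "fmt"

// Hypothetical, heavily simplified stand-in for the real RangeController.
type rangeController struct {
	// Highest entry index up to which all send-queues in pull mode must be
	// force-flushed.
	forceFlushIndex uint64
}

// ForceFlushIndexChangedLocked records a new force-flush watermark. The
// *Locked suffix follows the convention that the caller holds the relevant
// replica mutex.
func (rc *rangeController) ForceFlushIndexChangedLocked(index uint64) {
	if index > rc.forceFlushIndex {
		rc.forceFlushIndex = index
	}
}

func main() {
	rc := &rangeController{}
	rc.ForceFlushIndexChangedLocked(42)
	fmt.Println(rc.forceFlushIndex)
}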

Informs cockroachdb#135601

Epic: CRDB-37515

Release note: None

136499: kvserver: skip expiration based leases under deadlock r=iskettaneh a=iskettaneh

Expiration-based leases overload the test cluster when run under deadlock builds. We used to skip them metamorphically under deadlock; however, the recent push to explicitly run some tests under all lease types meant expiration-based leases were no longer being skipped under deadlock. This commit changes that.
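
Concretely, the affected tests follow the pattern in the dist_sender_server_test.go hunk
further down (condensed here):

testutils.RunValues(t, "lease-type", roachpb.TestingAllLeaseTypes(), func(t *testing.T, leaseType roachpb.LeaseType) {
	if leaseType == roachpb.LeaseExpiration {
		// Expiration-based leases are too heavyweight for race/deadlock builds.
		skip.UnderRace(t, "too slow")
		skip.UnderDeadlock(t, "too slow")
	}
	// ... the rest of the test body runs for the remaining lease types.
})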

References: cockroachdb#133763

Release note: None

Co-authored-by: Andrew Baptist <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: sumeerbhola <[email protected]>
Co-authored-by: Ibrahim Kettaneh <[email protected]>
5 people committed Dec 2, 2024
6 parents fd2fec8 + aa48de0 + e263d23 + 49b489d + 56547ac + 117586a commit 7a8707d
Showing 21 changed files with 1,223 additions and 178 deletions.
73 changes: 57 additions & 16 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -271,6 +271,8 @@ func (ikv replicateImportKV) runDriver(
type replicateKV struct {
readPercent int

tolerateErrors bool

// This field is merely used to debug the c2c framework for finite workloads.
debugRunDuration time.Duration

@@ -309,6 +311,11 @@ type replicateKV struct {
// antiRegion is the region we do not expect any kv data to reside in if
// partitionKVDatabaseInRegion is set.
antiRegion string

// readOnly sets the prepare-read-only flag in the kv workload, which elides
// preparing write statements. This is necessary to get the workload running
// properly on a read-only standby tenant.
readOnly bool
}

func (kv replicateKV) sourceInitCmd(tenantName string, nodes option.NodeListOption) string {
@@ -319,18 +326,21 @@ func (kv replicateKV) sourceInitCmd(tenantName string, nodes option.NodeListOpti
MaybeFlag(kv.initRows > 0, "max-block-bytes", kv.maxBlockBytes).
MaybeFlag(kv.initWithSplitAndScatter, "splits", 100).
MaybeOption(kv.initWithSplitAndScatter, "scatter").
Arg("{pgurl%s:%s}", nodes, tenantName)
Arg("{pgurl%s:%s}", nodes, tenantName).
WithEqualsSyntax()
return cmd.String()
}

func (kv replicateKV) sourceRunCmd(tenantName string, nodes option.NodeListOption) string {
cmd := roachtestutil.NewCommand(`./cockroach workload run kv`).
Option("tolerate-errors").
Flag("max-block-bytes", kv.maxBlockBytes).
MaybeOption(kv.tolerateErrors, "tolerate-errors").
MaybeFlag(kv.maxBlockBytes > 0, "max-block-bytes", kv.maxBlockBytes).
Flag("read-percent", kv.readPercent).
MaybeFlag(kv.debugRunDuration > 0, "duration", kv.debugRunDuration).
MaybeFlag(kv.maxQPS > 0, "max-rate", kv.maxQPS).
Arg("{pgurl%s:%s}", nodes, tenantName)
MaybeFlag(kv.readOnly, "prepare-read-only", true).
Arg("{pgurl%s:%s}", nodes, tenantName).
WithEqualsSyntax()
return cmd.String()
}

@@ -433,6 +443,9 @@ type replicationSpec struct {
// multiregion specifies multiregion cluster specs
multiregion multiRegionSpecs

// withReaderWorkload creates a reader tenant that runs the given workload.
withReaderWorkload streamingWorkload

// overrideTenantTTL specifies the TTL that will be applied by the system tenant on
// both the source and destination tenant range.
overrideTenantTTL time.Duration
@@ -696,6 +709,9 @@ func (rd *replicationDriver) preStreamingWorkload(ctx context.Context) {
func (rd *replicationDriver) startReplicationStream(ctx context.Context) int {
streamReplStmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'",
rd.setup.dst.name, rd.setup.src.name, rd.setup.src.pgURL.String())
if rd.rs.withReaderWorkload != nil {
streamReplStmt += " WITH READ VIRTUAL CLUSTER"
}
rd.setup.dst.sysSQL.Exec(rd.t, streamReplStmt)
rd.replicationStartHook(ctx, rd)
return getIngestionJobID(rd.t, rd.setup.dst.sysSQL, rd.setup.dst.name)
@@ -878,6 +894,26 @@ func (rd *replicationDriver) backupAfterFingerprintMismatch(
return nil
}

func (rd *replicationDriver) maybeRunReaderTenantWorkload(
ctx context.Context, workloadMonitor cluster.Monitor,
) {
if rd.rs.withReaderWorkload != nil {
rd.t.Status("running reader tenant workload")
readerTenantName := fmt.Sprintf("%s-readonly", rd.setup.dst.name)
workloadMonitor.Go(func(ctx context.Context) error {
err := rd.c.RunE(ctx, option.WithNodes(rd.setup.workloadNode), rd.rs.withReaderWorkload.sourceRunCmd(readerTenantName, rd.setup.dst.gatewayNodes))
// The workload should only return an error if the roachtest driver cancels the
// ctx after the rd.additionalDuration has elapsed after the initial scan completes.
if err != nil && ctx.Err() == nil {
// Implies the workload context was not cancelled and the workload cmd returned a
// different error.
return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`)
}
return nil
})
}
}

// checkParticipatingNodes asserts that multiple nodes in the source and dest cluster are
// participating in the replication stream.
//
@@ -991,6 +1027,8 @@ func (rd *replicationDriver) main(ctx context.Context) {
rd.t.Status(fmt.Sprintf(`initial scan complete. run workload and repl. stream for another %s minutes`,
rd.rs.additionalDuration))

rd.maybeRunReaderTenantWorkload(ctx, workloadMonitor)

select {
case <-workloadDoneCh:
rd.t.L().Printf("workload finished on its own")
@@ -1094,7 +1132,7 @@ func runAcceptanceClusterReplication(ctx context.Context, t test.Test, c cluster
dstNodes: 1,
// The timeout field ensures the c2c roachtest driver behaves properly.
timeout: 10 * time.Minute,
workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute, maxBlockBytes: 1, initWithSplitAndScatter: true},
workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute, maxBlockBytes: 1, initWithSplitAndScatter: true, tolerateErrors: true},
additionalDuration: 0 * time.Minute,
cutover: 30 * time.Second,
skipNodeDistributionCheck: true,
@@ -1145,6 +1183,7 @@ func registerClusterToCluster(r registry.Registry) {
timeout: 1 * time.Hour,
additionalDuration: 10 * time.Minute,
cutover: 5 * time.Minute,
withReaderWorkload: replicateKV{readPercent: 100, readOnly: true, tolerateErrors: true},
sometimesTestFingerprintMismatchCode: true,
clouds: registry.OnlyGCE,
suites: registry.Suites(registry.Nightly),
@@ -1160,7 +1199,7 @@ func registerClusterToCluster(r registry.Registry) {
// gives us max write BW of 800MB/s.
pdSize: 1667,
// Write ~50GB total (~12.5GB per node).
workload: replicateKV{readPercent: 0, initRows: 50000000, maxBlockBytes: 2048},
workload: replicateKV{readPercent: 0, initRows: 50000000, maxBlockBytes: 2048, tolerateErrors: true},
timeout: 1 * time.Hour,
additionalDuration: 5 * time.Minute,
cutover: 0,
@@ -1178,7 +1217,7 @@ func registerClusterToCluster(r registry.Registry) {
// Write ~7TB data to disk via Import -- takes a little over 1 hour.
workload: replicateImportKV{
replicateSplits: true,
replicateKV: replicateKV{readPercent: 0, initRows: 5000000000, maxBlockBytes: 1024}},
replicateKV: replicateKV{readPercent: 0, initRows: 5000000000, maxBlockBytes: 1024, tolerateErrors: true}},
timeout: 3 * time.Hour,
// While replicating a bulk op, expect the max latency to be the runtime
// of the bulk op.
@@ -1237,10 +1276,11 @@ func registerClusterToCluster(r registry.Registry) {

workload: replicateKV{
// Write a ~2TB initial scan.
initRows: 350000000,
readPercent: 50,
maxBlockBytes: 4096,
maxQPS: 2000,
initRows: 350000000,
readPercent: 50,
maxBlockBytes: 4096,
maxQPS: 2000,
tolerateErrors: true,
},
maxAcceptedLatency: time.Minute * 5,
timeout: 12 * time.Hour,
@@ -1266,6 +1306,7 @@ func registerClusterToCluster(r registry.Registry) {
initWithSplitAndScatter: true,
partitionKVDatabaseInRegion: "us-west1",
antiRegion: "us-central1",
tolerateErrors: true,
},
timeout: 1 * time.Hour,
additionalDuration: 10 * time.Minute,
@@ -1286,11 +1327,11 @@ func registerClusterToCluster(r registry.Registry) {
cpus: 4,
pdSize: 10,
workload: replicateKV{
readPercent: 0,
debugRunDuration: 1 * time.Minute,
readPercent: 50,
debugRunDuration: 10 * time.Minute,
initWithSplitAndScatter: true,
maxBlockBytes: 1024},
timeout: 5 * time.Minute,
timeout: 30 * time.Minute,
additionalDuration: 0 * time.Minute,
cutover: 30 * time.Second,
skipNodeDistributionCheck: true,
@@ -1587,7 +1628,7 @@ func registerClusterReplicationResilience(r registry.Registry) {
srcNodes: 4,
dstNodes: 4,
cpus: 8,
workload: replicateKV{readPercent: 0, initRows: 5000000, maxBlockBytes: 1024, initWithSplitAndScatter: true},
workload: replicateKV{readPercent: 0, initRows: 5000000, maxBlockBytes: 1024, initWithSplitAndScatter: true, tolerateErrors: true},
timeout: 20 * time.Minute,
additionalDuration: 6 * time.Minute,
cutover: 3 * time.Minute,
@@ -1703,7 +1744,7 @@ func registerClusterReplicationDisconnect(r registry.Registry) {
srcNodes: 3,
dstNodes: 3,
cpus: 4,
workload: replicateKV{readPercent: 0, initRows: 1000000, maxBlockBytes: 1024, initWithSplitAndScatter: true},
workload: replicateKV{readPercent: 0, initRows: 1000000, maxBlockBytes: 1024, initWithSplitAndScatter: true, tolerateErrors: true},
timeout: 20 * time.Minute,
additionalDuration: 10 * time.Minute,
cutover: 2 * time.Minute,
7 changes: 6 additions & 1 deletion pkg/cmd/roachtest/tests/logical_data_replication.go
@@ -239,6 +239,7 @@ func TestLDRBasic(
debugRunDuration: duration,
maxBlockBytes: maxBlockBytes,
initRows: initRows,
tolerateErrors: true,
initWithSplitAndScatter: !c.IsLocal()},
dbName: "kv",
tableNames: []string{"kv"},
@@ -273,7 +274,9 @@ func TestLDRSchemaChange(
debugRunDuration: duration,
maxBlockBytes: 1024,
initRows: 1000,
initWithSplitAndScatter: true},
initWithSplitAndScatter: true,
tolerateErrors: true,
},
dbName: "kv",
tableNames: []string{"kv"},
}
@@ -413,6 +416,7 @@ func TestLDROnNodeShutdown(
debugRunDuration: duration,
maxBlockBytes: 1024,
initRows: 1000,
tolerateErrors: true,
initWithSplitAndScatter: true},
dbName: "kv",
tableNames: []string{"kv"},
@@ -530,6 +534,7 @@ func TestLDROnNetworkPartition(
debugRunDuration: duration,
maxBlockBytes: 1024,
initRows: 1000,
tolerateErrors: true,
initWithSplitAndScatter: true},
dbName: "kv",
tableNames: []string{"kv"},
26 changes: 15 additions & 11 deletions pkg/cmd/roachtest/tests/perturbation/framework.go
@@ -92,6 +92,7 @@ type variations struct {
acceptableChange float64
cloud registry.CloudSet
acMode admissionControlMode
diskBandwidthLimit string
profileOptions []roachtestutil.ProfileOptionFunc
specOptions []spec.Option
clusterSettings map[string]string
@@ -108,6 +109,7 @@ var numDisks = []int{1, 2}
var memOptions = []spec.MemPerCPU{spec.Low, spec.Standard, spec.High}
var cloudSets = []registry.CloudSet{registry.OnlyAWS, registry.OnlyGCE, registry.OnlyAzure}
var admissionControlOptions = []admissionControlMode{elasticOnlyBoth, fullNormalElasticRepl, fullBoth}
var diskBandwidthLimitOptions = []string{"0", "350MiB"}

var leases = []registry.LeaseType{
registry.EpochLeases,
@@ -187,10 +189,12 @@ func (a admissionControlMode) getSettings() map[string]string {
func (v variations) String() string {
return fmt.Sprintf("seed: %d, fillDuration: %s, maxBlockBytes: %d, perturbationDuration: %s, "+
"validationDuration: %s, ratioOfMax: %f, splits: %d, numNodes: %d, numWorkloadNodes: %d, "+
"vcpu: %d, disks: %d, memory: %s, leaseType: %s, cloud: %v, acMode: %s, perturbation: %+v",
"vcpu: %d, disks: %d, memory: %s, leaseType: %s, cloud: %v, acMode: %s, diskBandwidthLimit %s, "+
"perturbation: %+v",
v.seed, v.fillDuration, v.maxBlockBytes,
v.perturbationDuration, v.validationDuration, v.ratioOfMax, v.splits, v.numNodes, v.numWorkloadNodes,
v.vcpu, v.disks, v.mem, v.leaseType, v.cloud, v.acMode, v.perturbation)
v.vcpu, v.disks, v.mem, v.leaseType, v.cloud, v.acMode, v.diskBandwidthLimit,
v.perturbation)
}

// Normally a single worker can handle 20-40 nodes. If we find this is
Expand All @@ -213,6 +217,7 @@ func (v variations) randomize(rng *rand.Rand) variations {
v.cloud = registry.OnlyGCE
v.mem = memOptions[rng.Intn(len(memOptions))]
v.acMode = admissionControlOptions[rng.Intn(len(admissionControlOptions))]
v.diskBandwidthLimit = diskBandwidthLimitOptions[rng.Intn(len(diskBandwidthLimitOptions))]
return v
}

@@ -233,6 +238,7 @@ func setup(p perturbation, acceptableChange float64) variations {
v.ratioOfMax = 0.5
v.cloud = registry.OnlyGCE
v.mem = spec.Standard
v.diskBandwidthLimit = "0"
v.perturbation = p
v.profileOptions = []roachtestutil.ProfileOptionFunc{
roachtestutil.ProfDbName("target"),
@@ -295,6 +301,7 @@ func (v variations) finishSetup() variations {
}
// Enable raft tracing. Remove this once raft tracing is the default.
v.clusterSettings["kv.raft.max_concurrent_traces"] = "10"
v.clusterSettings["kvadmission.store.provisioned_bandwidth"] = v.diskBandwidthLimit
return v
}

@@ -326,6 +333,8 @@ func (v *variations) applyEnvOverride(key string, val string) (err error) {
if v.mem == -1 {
err = errors.Errorf("unknown memory setting: %s", val)
}
case "diskBandwidthLimit":
v.diskBandwidthLimit = val
case "leaseType":
for _, l := range leases {
if l.String() == val {
@@ -634,9 +643,7 @@ func (v variations) runTest(ctx context.Context, t test.Test, c cluster.Cluster)

t.Status("T5: validating results")
require.NoError(t, roachtestutil.DownloadProfiles(ctx, c, t.L(), t.ArtifactsDir()))

require.NoError(t, v.writePerfArtifacts(ctx, t, c, baselineStats, perturbationStats,
afterStats))
require.NoError(t, v.writePerfArtifacts(ctx, t, baselineStats, perturbationStats, afterStats))

t.L().Printf("validating stats during the perturbation")
failures := isAcceptableChange(t.L(), baselineStats, perturbationStats, v.acceptableChange)
@@ -840,13 +847,10 @@ func sortedStringKeys(m map[string]trackedStat) []string {
// can be picked up by roachperf. Currently it only writes the write stats since
// there would be too many lines on the graph otherwise.
func (v variations) writePerfArtifacts(
ctx context.Context,
t test.Test,
c cluster.Cluster,
baseline, perturbation, recovery map[string]trackedStat,
ctx context.Context, t test.Test, baseline, perturbation, recovery map[string]trackedStat,
) error {

exporter := roachtestutil.CreateWorkloadHistogramExporter(t, c)
exporter := roachtestutil.CreateWorkloadHistogramExporter(t, v)

reg := histogram.NewRegistryWithExporter(
time.Second,
@@ -857,7 +861,7 @@ func (v variations) writePerfArtifacts(
bytesBuf := bytes.NewBuffer([]byte{})
writer := io.Writer(bytesBuf)
exporter.Init(&writer)
defer roachtestutil.CloseExporter(ctx, exporter, t, c, bytesBuf, v.Node(1), "")
defer roachtestutil.CloseExporter(ctx, exporter, t, v, bytesBuf, v.Node(1), "")

reg.GetHandle().Get("baseline").Record(baseline["write"].score)
reg.GetHandle().Get("perturbation").Record(perturbation["write"].score)
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -4699,7 +4699,7 @@ func TestPartialPartition(t *testing.T) {
}
for _, test := range testCases {
t.Run(fmt.Sprintf("%t-%d", test.useProxy, test.numServers), func(t *testing.T) {
testutils.RunValues(t, "lease-type", roachpb.LeaseTypes(), func(t *testing.T, leaseType roachpb.LeaseType) {
testutils.RunValues(t, "lease-type", roachpb.TestingAllLeaseTypes(), func(t *testing.T, leaseType roachpb.LeaseType) {
if leaseType == roachpb.LeaseEpoch {
// With epoch leases this test doesn't work reliably. It passes
// in cases where it should fail and fails in cases where it
@@ -4817,7 +4817,7 @@ func TestProxyTracing(t *testing.T) {
defer log.Scope(t).Close(t)
ctx := context.Background()

testutils.RunValues(t, "lease-type", roachpb.LeaseTypes(), func(t *testing.T, leaseType roachpb.LeaseType) {
testutils.RunValues(t, "lease-type", roachpb.TestingAllLeaseTypes(), func(t *testing.T, leaseType roachpb.LeaseType) {
if leaseType == roachpb.LeaseExpiration {
skip.UnderRace(t, "too slow")
skip.UnderDeadlock(t, "too slow")
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_lease_test.go
@@ -85,7 +85,7 @@ func TestLeaseTransferForwardsStartTime(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunValues(t, "lease-type", roachpb.LeaseTypes(), func(t *testing.T, leaseType roachpb.LeaseType) {
testutils.RunValues(t, "lease-type", roachpb.TestingAllLeaseTypes(), func(t *testing.T, leaseType roachpb.LeaseType) {
testutils.RunTrueAndFalse(t, "served-future-reads", func(t *testing.T, servedFutureReads bool) {
ctx := context.Background()
db := storage.NewDefaultInMemForTesting()
@@ -197,7 +197,7 @@ func TestLeaseRequestTypeSwitchForwardsExpiration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunValues(t, "lease-type", roachpb.LeaseTypes(), func(t *testing.T, leaseType roachpb.LeaseType) {
testutils.RunValues(t, "lease-type", roachpb.TestingAllLeaseTypes(), func(t *testing.T, leaseType roachpb.LeaseType) {
testutils.RunTrueAndFalse(t, "revoke", func(t *testing.T, revoke bool) {
ctx := context.Background()
db := storage.NewDefaultInMemForTesting()
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_lease_test.go
@@ -1682,7 +1682,7 @@ func TestLeaseRequestBumpsEpoch(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunValues(t, "lease-type", roachpb.LeaseTypes(), func(t *testing.T, leaseType roachpb.LeaseType) {
testutils.RunValues(t, "lease-type", roachpb.TestingAllLeaseTypes(), func(t *testing.T, leaseType roachpb.LeaseType) {
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
kvserver.OverrideDefaultLeaseType(ctx, &st.SV, roachpb.LeaseEpoch)
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_merge_test.go
@@ -932,7 +932,7 @@ func TestStoreRangeMergeTimestampCacheCausality(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunValues(t, "leaseType", roachpb.LeaseTypes(), func(t *testing.T, leaseType roachpb.LeaseType) {
testutils.RunValues(t, "leaseType", roachpb.TestingAllLeaseTypes(), func(t *testing.T, leaseType roachpb.LeaseType) {
ctx := context.Background()
var readTS hlc.Timestamp
rhsKey := scratchKey("c")