…achdb#136182

135564: build/teamcity: add changes to enable openmetrics in nightly roachtests r=nameisbhaskar a=sambhav-jain-16

There was a regression in cockroachdb#135239.
The change was reverted, and this PR aims to fix the regressions.

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-41852

Release note: None

135596: security: bugfix, ensure cert expiry metrics reflect reloaded certs r=angles-n-daemons a=angles-n-daemons

security: bugfix, ensure cert expiry metrics reflect reloaded certs

The PR cockroachdb#130110 added certificate TTL metrics alongside our existing expiration metrics. Prior to that change, the certificate metrics values were updated on each metrics load. Afterwards, new metrics objects were created for each load of certificates.

This caused a bug: the new expiration values never appeared in any of the system exhaust (metrics scrape or tsdb), because the registered metrics objects were still the ones created on startup.

This change instead lets the metrics close over the whole CertificateManager object, so that they only need to be created once, and the metrics registered at startup always reflect the currently loaded certificates.

Release note (bug fix): security.certificate.* metrics will now be updated if a node loads new certificates while running.

Epic: none
Fixes: cockroachdb#135093
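
A minimal sketch of the idea behind the certificate metrics fix, not the actual CockroachDB code. The `certManager` and `funcGauge` types and the metric name are hypothetical stand-ins. The gauge is created once and reads through the manager on every call, so a reload is visible through the object that was registered at startup.

```go
package main

import (
	"fmt"
	"sync"
)

// certManager is a hypothetical stand-in for CertificateManager. It holds the
// expiration (Unix seconds) of the currently loaded certificate.
type certManager struct {
	mu         sync.RWMutex
	expiration int64
}

// reload simulates loading a new certificate while the node is running.
func (m *certManager) reload(expiration int64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.expiration = expiration
}

// currentExpiration returns the expiration of whatever certificate is loaded now.
func (m *certManager) currentExpiration() int64 {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.expiration
}

// funcGauge is a hypothetical gauge whose value is computed on every read.
type funcGauge struct {
	name string
	fn   func() int64
}

// Value evaluates the gauge's function at read time.
func (g *funcGauge) Value() int64 { return g.fn() }

// newExpirationGauge builds the gauge once. It captures the manager itself,
// not a snapshot of the expiration value. The metric name is illustrative only.
func newExpirationGauge(m *certManager) *funcGauge {
	return &funcGauge{
		name: "security.certificate.expiration.example",
		fn:   m.currentExpiration,
	}
}

func main() {
	m := &certManager{expiration: 1000}
	g := newExpirationGauge(m) // created and registered once at startup
	fmt.Println(g.name, g.Value())

	m.reload(2000) // certificates rotated while running
	fmt.Println(g.name, g.Value())
}
```

Because the gauge never copies the value, no metric object has to be re-created or re-registered after a reload.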

136122: crosscluster/physical: update tokens when altering topology r=dt a=dt

Release note: none.
Epic: none.

This regressed in cockroachdb#135637, which was assigning all consumer sub-partitions the whole partition due to sharing the original token.
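
The regression above can be illustrated with a simplified sketch. The `partition` type, the string spans, and the comma-joined token format are assumptions for illustration only; the real change marshals a `streampb.SourcePartition` into the subscription token. The point is that each chunk gets a token derived from its own spans rather than inheriting the parent's token.

```go
package main

import (
	"fmt"
	"strings"
)

// partition is a simplified, hypothetical stand-in for a topology partition.
type partition struct {
	ID    int
	Spans []string
	Token string
}

// splitPartition breaks p into chunks of at most chunk spans. Each chunk gets
// a token describing only its own spans, so a consumer subscribing with that
// token receives just that chunk instead of the whole original partition.
func splitPartition(p partition, chunk int) []partition {
	if chunk <= 0 || len(p.Spans) <= chunk {
		// Too small to split: keep the partition and its original token.
		return []partition{p}
	}
	var out []partition
	spans := p.Spans
	for len(spans) > 0 {
		n := chunk
		if n > len(spans) {
			n = len(spans)
		}
		c := p
		c.Spans = spans[:n]
		// Illustrative token format only: the chunk's spans joined by commas.
		c.Token = strings.Join(c.Spans, ",")
		out = append(out, c)
		spans = spans[n:]
	}
	return out
}

func main() {
	p := partition{ID: 1, Spans: []string{"a", "b", "c", "d", "e"}, Token: "orig"}
	for _, c := range splitPartition(p, 2) {
		fmt.Printf("partition %d spans=%v token=%q\n", c.ID, c.Spans, c.Token)
	}
}
```

Without the per-chunk token assignment, all three chunks would carry `"orig"` and each consumer would subscribe to every span of the parent partition.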

136182: kvserver: use leader leases in various flow control tests r=kvoli a=arulajmani

See individual commits for details. 

Co-authored-by: Sambhav Jain
Co-authored-by: Brian Dillmann
Co-authored-by: David Taylor
Co-authored-by: Arul Ajmani
5 people committed Nov 26, 2024
5 parents 7342c63 + dcbff22 + 0e19000 + 9835356 + 4d9dbf3 commit b6bfe7b
Showing 15 changed files with 317 additions and 150 deletions.
@@ -13,5 +13,5 @@ dir="$(dirname $(dirname $(dirname $(dirname "${0}"))))"
source "$dir/teamcity-support.sh" # For $root
source "$dir/teamcity-bazel-support.sh" # For run_bazel

BAZEL_SUPPORT_EXTRA_DOCKER_ARGS="-e LITERAL_ARTIFACTS_DIR=$root/artifacts -e AWS_ACCESS_KEY_ID -e AWS_ACCESS_KEY_ID_ASSUME_ROLE -e AWS_KMS_KEY_ARN_A -e AWS_KMS_KEY_ARN_B -e AWS_KMS_REGION_A -e AWS_KMS_REGION_B -e AWS_ROLE_ARN -e AWS_SECRET_ACCESS_KEY -e AWS_SECRET_ACCESS_KEY_ASSUME_ROLE -e BUILD_VCS_NUMBER -e CLOUD -e COCKROACH_DEV_LICENSE -e TESTS -e COUNT -e GITHUB_API_TOKEN -e GITHUB_ORG -e GITHUB_REPO -e GOOGLE_EPHEMERAL_CREDENTIALS -e SLACK_TOKEN -e TC_BUILDTYPE_ID -e TC_BUILD_BRANCH -e TC_BUILD_ID -e TC_SERVER_URL -e SELECT_PROBABILITY -e COCKROACH_RANDOM_SEED -e ROACHTEST_ASSERTIONS_ENABLED_SEED -e ROACHTEST_FORCE_RUN_INVALID_RELEASE_BRANCH -e ARM_PROBABILITY -e USE_SPOT -e SELECTIVE_TESTS -e SFUSER -e SFPASSWORD -e SIDE_EYE_API_TOKEN -e COCKROACH_EA_PROBABILITY" \
BAZEL_SUPPORT_EXTRA_DOCKER_ARGS="-e LITERAL_ARTIFACTS_DIR=$root/artifacts -e AWS_ACCESS_KEY_ID -e AWS_ACCESS_KEY_ID_ASSUME_ROLE -e AWS_KMS_KEY_ARN_A -e AWS_KMS_KEY_ARN_B -e AWS_KMS_REGION_A -e AWS_KMS_REGION_B -e AWS_ROLE_ARN -e AWS_SECRET_ACCESS_KEY -e AWS_SECRET_ACCESS_KEY_ASSUME_ROLE -e BUILD_VCS_NUMBER -e CLOUD -e COCKROACH_DEV_LICENSE -e TESTS -e COUNT -e GITHUB_API_TOKEN -e GITHUB_ORG -e GITHUB_REPO -e GOOGLE_EPHEMERAL_CREDENTIALS -e SLACK_TOKEN -e TC_BUILDTYPE_ID -e TC_BUILD_BRANCH -e TC_BUILD_ID -e TC_SERVER_URL -e SELECT_PROBABILITY -e COCKROACH_RANDOM_SEED -e ROACHTEST_ASSERTIONS_ENABLED_SEED -e ROACHTEST_FORCE_RUN_INVALID_RELEASE_BRANCH -e ARM_PROBABILITY -e USE_SPOT -e SELECTIVE_TESTS -e SFUSER -e SFPASSWORD -e SIDE_EYE_API_TOKEN -e COCKROACH_EA_PROBABILITY -e EXPORT_OPENMETRICS -e ROACHPERF_OPENMETRICS_CREDENTIALS" \
run_bazel build/teamcity/cockroach/nightlies/roachtest_nightly_impl.sh
@@ -13,5 +13,5 @@ dir="$(dirname $(dirname $(dirname $(dirname "${0}"))))"
source "$dir/teamcity-support.sh" # For $root
source "$dir/teamcity-bazel-support.sh" # For run_bazel

BAZEL_SUPPORT_EXTRA_DOCKER_ARGS="-e LITERAL_ARTIFACTS_DIR=$root/artifacts -e AZURE_CLIENT_ID -e AZURE_CLIENT_SECRET -e AZURE_SUBSCRIPTION_ID -e AZURE_TENANT_ID -e BUILD_VCS_NUMBER -e CLOUD -e COCKROACH_DEV_LICENSE -e TESTS -e COUNT -e GITHUB_API_TOKEN -e GITHUB_ORG -e GITHUB_REPO -e GOOGLE_EPHEMERAL_CREDENTIALS -e SLACK_TOKEN -e TC_BUILDTYPE_ID -e TC_BUILD_BRANCH -e TC_BUILD_ID -e TC_SERVER_URL -e SELECT_PROBABILITY -e COCKROACH_RANDOM_SEED -e ROACHTEST_ASSERTIONS_ENABLED_SEED -e ROACHTEST_FORCE_RUN_INVALID_RELEASE_BRANCH -e CLEAR_CLUSTER_CACHE -e USE_SPOT -e SELECTIVE_TESTS -e SFUSER -e SFPASSWORD -e SIDE_EYE_API_TOKEN -e COCKROACH_EA_PROBABILITY" \
BAZEL_SUPPORT_EXTRA_DOCKER_ARGS="-e LITERAL_ARTIFACTS_DIR=$root/artifacts -e AZURE_CLIENT_ID -e AZURE_CLIENT_SECRET -e AZURE_SUBSCRIPTION_ID -e AZURE_TENANT_ID -e BUILD_VCS_NUMBER -e CLOUD -e COCKROACH_DEV_LICENSE -e TESTS -e COUNT -e GITHUB_API_TOKEN -e GITHUB_ORG -e GITHUB_REPO -e GOOGLE_EPHEMERAL_CREDENTIALS -e SLACK_TOKEN -e TC_BUILDTYPE_ID -e TC_BUILD_BRANCH -e TC_BUILD_ID -e TC_SERVER_URL -e SELECT_PROBABILITY -e COCKROACH_RANDOM_SEED -e ROACHTEST_ASSERTIONS_ENABLED_SEED -e ROACHTEST_FORCE_RUN_INVALID_RELEASE_BRANCH -e CLEAR_CLUSTER_CACHE -e USE_SPOT -e SELECTIVE_TESTS -e SFUSER -e SFPASSWORD -e SIDE_EYE_API_TOKEN -e COCKROACH_EA_PROBABILITY -e EXPORT_OPENMETRICS -e ROACHPERF_OPENMETRICS_CREDENTIALS" \
run_bazel build/teamcity/cockroach/nightlies/roachtest_nightly_impl.sh
@@ -13,5 +13,5 @@ dir="$(dirname $(dirname $(dirname $(dirname "${0}"))))"
source "$dir/teamcity-support.sh" # For $root
source "$dir/teamcity-bazel-support.sh" # For run_bazel

BAZEL_SUPPORT_EXTRA_DOCKER_ARGS="-e LITERAL_ARTIFACTS_DIR=$root/artifacts -e BUILD_VCS_NUMBER -e CLOUD -e COCKROACH_DEV_LICENSE -e TESTS -e COUNT -e GITHUB_API_TOKEN -e GITHUB_ORG -e GITHUB_REPO -e GOOGLE_EPHEMERAL_CREDENTIALS -e GOOGLE_KMS_KEY_A -e GOOGLE_KMS_KEY_B -e GOOGLE_CREDENTIALS_ASSUME_ROLE -e GOOGLE_SERVICE_ACCOUNT -e SLACK_TOKEN -e TC_BUILDTYPE_ID -e TC_BUILD_BRANCH -e TC_BUILD_ID -e TC_SERVER_URL -e SELECT_PROBABILITY -e COCKROACH_RANDOM_SEED -e ROACHTEST_ASSERTIONS_ENABLED_SEED -e ROACHTEST_FORCE_RUN_INVALID_RELEASE_BRANCH -e GRAFANA_SERVICE_ACCOUNT_JSON -e GRAFANA_SERVICE_ACCOUNT_AUDIENCE -e ARM_PROBABILITY -e USE_SPOT -e SELECTIVE_TESTS -e SFUSER -e SFPASSWORD -e SIDE_EYE_API_TOKEN -e COCKROACH_EA_PROBABILITY" \
BAZEL_SUPPORT_EXTRA_DOCKER_ARGS="-e LITERAL_ARTIFACTS_DIR=$root/artifacts -e BUILD_VCS_NUMBER -e CLOUD -e COCKROACH_DEV_LICENSE -e TESTS -e COUNT -e GITHUB_API_TOKEN -e GITHUB_ORG -e GITHUB_REPO -e GOOGLE_EPHEMERAL_CREDENTIALS -e GOOGLE_KMS_KEY_A -e GOOGLE_KMS_KEY_B -e GOOGLE_CREDENTIALS_ASSUME_ROLE -e GOOGLE_SERVICE_ACCOUNT -e SLACK_TOKEN -e TC_BUILDTYPE_ID -e TC_BUILD_BRANCH -e TC_BUILD_ID -e TC_SERVER_URL -e SELECT_PROBABILITY -e COCKROACH_RANDOM_SEED -e ROACHTEST_ASSERTIONS_ENABLED_SEED -e ROACHTEST_FORCE_RUN_INVALID_RELEASE_BRANCH -e GRAFANA_SERVICE_ACCOUNT_JSON -e GRAFANA_SERVICE_ACCOUNT_AUDIENCE -e ARM_PROBABILITY -e USE_SPOT -e SELECTIVE_TESTS -e SFUSER -e SFPASSWORD -e SIDE_EYE_API_TOKEN -e COCKROACH_EA_PROBABILITY -e EXPORT_OPENMETRICS -e ROACHPERF_OPENMETRICS_CREDENTIALS" \
run_bazel build/teamcity/cockroach/nightlies/roachtest_nightly_impl.sh
@@ -84,4 +84,5 @@ build/teamcity-roachtest-invoke.sh \
--suite nightly \
--selective-tests="${SELECTIVE_TESTS:-true}" \
--side-eye-token="${SIDE_EYE_API_TOKEN}" \
--export-openmetrics="${EXPORT_OPENMETRICS:-false}" \
"${TESTS}"
2 changes: 1 addition & 1 deletion build/teamcity/cockroach/nightlies/roachtest_weekly.sh
@@ -13,5 +13,5 @@ dir="$(dirname $(dirname $(dirname $(dirname "${0}"))))"
source "$dir/teamcity-support.sh" # For $root
source "$dir/teamcity-bazel-support.sh" # For run_bazel

BAZEL_SUPPORT_EXTRA_DOCKER_ARGS="-e LITERAL_ARTIFACTS_DIR=$root/artifacts -e BUILD_VCS_NUMBER -e CLOUD -e COCKROACH_DEV_LICENSE -e COUNT -e GITHUB_API_TOKEN -e GITHUB_ORG -e GITHUB_REPO -e GOOGLE_EPHEMERAL_CREDENTIALS -e SLACK_TOKEN -e TC_BUILDTYPE_ID -e TC_BUILD_BRANCH -e TC_BUILD_ID -e TC_SERVER_URL -e GRAFANA_SERVICE_ACCOUNT_JSON -e GRAFANA_SERVICE_ACCOUNT_AUDIENCE -e ARM_PROBABILITY -e USE_SPOT -e SIDE_EYE_API_TOKEN" \
BAZEL_SUPPORT_EXTRA_DOCKER_ARGS="-e LITERAL_ARTIFACTS_DIR=$root/artifacts -e BUILD_VCS_NUMBER -e CLOUD -e COCKROACH_DEV_LICENSE -e COUNT -e GITHUB_API_TOKEN -e GITHUB_ORG -e GITHUB_REPO -e GOOGLE_EPHEMERAL_CREDENTIALS -e SLACK_TOKEN -e TC_BUILDTYPE_ID -e TC_BUILD_BRANCH -e TC_BUILD_ID -e TC_SERVER_URL -e GRAFANA_SERVICE_ACCOUNT_JSON -e GRAFANA_SERVICE_ACCOUNT_AUDIENCE -e ARM_PROBABILITY -e USE_SPOT -e SIDE_EYE_API_TOKEN -e EXPORT_OPENMETRICS -e ROACHPERF_OPENMETRICS_CREDENTIALS" \
run_bazel build/teamcity/cockroach/nightlies/roachtest_weekly_impl.sh
2 changes: 1 addition & 1 deletion build/teamcity/cockroach/nightlies/roachtest_weekly_aws.sh
@@ -13,5 +13,5 @@ dir="$(dirname $(dirname $(dirname $(dirname "${0}"))))"
source "$dir/teamcity-support.sh" # For $root
source "$dir/teamcity-bazel-support.sh" # For run_bazel

BAZEL_SUPPORT_EXTRA_DOCKER_ARGS="-e LITERAL_ARTIFACTS_DIR=$root/artifacts -e AWS_ACCESS_KEY_ID -e AWS_ACCESS_KEY_ID_ASSUME_ROLE -e AWS_KMS_KEY_ARN_A -e AWS_KMS_KEY_ARN_B -e AWS_KMS_REGION_A -e AWS_KMS_REGION_B -e AWS_ROLE_ARN -e AWS_SECRET_ACCESS_KEY -e AWS_SECRET_ACCESS_KEY_ASSUME_ROLE -e BUILD_VCS_NUMBER -e CLOUD -e COCKROACH_DEV_LICENSE -e TESTS -e COUNT -e GITHUB_API_TOKEN -e GITHUB_ORG -e GITHUB_REPO -e GOOGLE_EPHEMERAL_CREDENTIALS -e SLACK_TOKEN -e TC_BUILDTYPE_ID -e TC_BUILD_BRANCH -e TC_BUILD_ID -e TC_SERVER_URL -e ARM_PROBABILITY -e USE_SPOT -e SIDE_EYE_API_TOKEN" \
BAZEL_SUPPORT_EXTRA_DOCKER_ARGS="-e LITERAL_ARTIFACTS_DIR=$root/artifacts -e AWS_ACCESS_KEY_ID -e AWS_ACCESS_KEY_ID_ASSUME_ROLE -e AWS_KMS_KEY_ARN_A -e AWS_KMS_KEY_ARN_B -e AWS_KMS_REGION_A -e AWS_KMS_REGION_B -e AWS_ROLE_ARN -e AWS_SECRET_ACCESS_KEY -e AWS_SECRET_ACCESS_KEY_ASSUME_ROLE -e BUILD_VCS_NUMBER -e CLOUD -e COCKROACH_DEV_LICENSE -e TESTS -e COUNT -e GITHUB_API_TOKEN -e GITHUB_ORG -e GITHUB_REPO -e GOOGLE_EPHEMERAL_CREDENTIALS -e SLACK_TOKEN -e TC_BUILDTYPE_ID -e TC_BUILD_BRANCH -e TC_BUILD_ID -e TC_SERVER_URL -e ARM_PROBABILITY -e USE_SPOT -e SIDE_EYE_API_TOKEN -e EXPORT_OPENMETRICS -e ROACHPERF_OPENMETRICS_CREDENTIALS" \
run_bazel build/teamcity/cockroach/nightlies/roachtest_weekly_impl.sh
3 changes: 2 additions & 1 deletion build/teamcity/cockroach/nightlies/roachtest_weekly_impl.sh
@@ -47,4 +47,5 @@ timeout -s INT $((7800*60)) build/teamcity-roachtest-invoke.sh \
--metamorphic-arm64-probability="${ARM_PROBABILITY:-0.5}" \
--use-spot="${USE_SPOT:-auto}" \
--slack-token="${SLACK_TOKEN}" \
--side-eye-token="${SIDE_EYE_API_TOKEN}"
--side-eye-token="${SIDE_EYE_API_TOKEN}" \
--export-openmetrics="${EXPORT_OPENMETRICS:-false}"
98 changes: 59 additions & 39 deletions build/teamcity/util/roachtest_util.sh
@@ -23,50 +23,70 @@ source $root/build/teamcity/util/roachtest_arch_util.sh
# date at the time of the start of the run (which identifies the version of the
# code run best).
stats_dir="$(date +"%Y%m%d")-${TC_BUILD_ID}"
stats_file_name="stats.json"

# Provide a default value for EXPORT_OPENMETRICS if it is not set
EXPORT_OPENMETRICS="${EXPORT_OPENMETRICS:-false}"

if [[ "${EXPORT_OPENMETRICS}" == "true" ]]; then
stats_file_name="stats.om"
fi

# Set up a function we'll invoke at the end.
function upload_stats {
if tc_release_branch; then
bucket="${ROACHTEST_BUCKET:-cockroach-nightly-${CLOUD}}"
if [[ "${CLOUD}" == "gce" ]]; then
# GCE, having been there first, gets an exemption.
bucket="cockroach-nightly"
fi
bucket="${ROACHTEST_BUCKET:-cockroach-nightly-${CLOUD}}"
if [[ "${EXPORT_OPENMETRICS}" == "true" ]]; then

branch=$(tc_build_branch)
remote_artifacts_dir="artifacts-${branch}"
if [[ "${branch}" == "master" ]]; then
# The master branch is special, as roachperf hard-codes
# the location.
remote_artifacts_dir="artifacts"
fi
# TODO: FIPS_ENABLED is deprecated, use roachtest --metamorphic-fips-probability, instead.
# In FIPS-mode, keep artifacts separate by using the 'fips' suffix.
if [[ ${FIPS_ENABLED:-0} == 1 ]]; then
remote_artifacts_dir="${remote_artifacts_dir}-fips"
fi

# The stats.json files need some path translation:
# ${artifacts}/path/to/test/stats.json
# to
# gs://${bucket}/artifacts/${stats_dir}/path/to/test/stats.json
#
# `find` below will expand "{}" as ./path/to/test/stats.json. We need
# to bend over backwards to remove the `./` prefix or gsutil will have
# a `.` folder in ${stats_dir}, which we don't want.
(cd "${artifacts}" && \
while IFS= read -r f; do
if [[ -n "${f}" ]]; then
artifacts_dir="${remote_artifacts_dir}"
# If 'cpu_arch=xxx' is encoded in the path, use it as suffix to separate artifacts by cpu_arch.
if [[ "${f}" == *"/cpu_arch=arm64/"* ]]; then
artifacts_dir="${artifacts_dir}-arm64"
elif [[ "${f}" == *"/cpu_arch=fips/"* ]]; then
artifacts_dir="${artifacts_dir}-fips"
fi
gsutil cp "${f}" "gs://${bucket}/${artifacts_dir}/${stats_dir}/${f}"
# TODO(sambhav-jain-16): Change the bucket after new buckets are created
bucket="${ROACHTEST_BUCKET:-cockroach-testeng-metrics/omloader/incoming/${CLOUD}}"
fi

if [[ "${CLOUD}" == "gce" && "${EXPORT_OPENMETRICS}" == "false" ]]; then
# GCE, having been there first, gets an exemption.
bucket="cockroach-nightly"
fi

branch=$(tc_build_branch)
remote_artifacts_dir="artifacts-${branch}"
if [[ "${branch}" == "master" ]]; then
# The master branch is special, as roachperf hard-codes
# the location.
remote_artifacts_dir="artifacts"
fi
# TODO: FIPS_ENABLED is deprecated, use roachtest --metamorphic-fips-probability, instead.
# In FIPS-mode, keep artifacts separate by using the 'fips' suffix.
if [[ ${FIPS_ENABLED:-0} == 1 ]]; then
remote_artifacts_dir="${remote_artifacts_dir}-fips"
fi

# If using openmetrics, activate new service account for uploading to openmetrics bucket
if [[ "${EXPORT_OPENMETRICS}" == "true" && "$ROACHPERF_OPENMETRICS_CREDENTIALS" ]]; then
echo "$ROACHPERF_OPENMETRICS_CREDENTIALS" > roachperf.json
gcloud auth activate-service-account --key-file=roachperf.json
fi

# The ${stats_file_name} files need some path translation:
# ${artifacts}/path/to/test/${stats_file_name}
# to
# gs://${bucket}/artifacts/${stats_dir}/path/to/test/${stats_file_name}
#
# `find` below will expand "{}" as ./path/to/test/${stats_file_name}. We need
# to bend over backwards to remove the `./` prefix or gsutil will have
# a `.` folder in ${stats_dir}, which we don't want.
(cd "${artifacts}" && \
while IFS= read -r f; do
if [[ -n "${f}" ]]; then
artifacts_dir="${remote_artifacts_dir}"
# If 'cpu_arch=xxx' is encoded in the path, use it as suffix to separate artifacts by cpu_arch.
if [[ "${f}" == *"/cpu_arch=arm64/"* ]]; then
artifacts_dir="${artifacts_dir}-arm64"
elif [[ "${f}" == *"/cpu_arch=fips/"* ]]; then
artifacts_dir="${artifacts_dir}-fips"
fi
done <<< "$(find . -name stats.json | sed 's/^\.\///')")
gsutil cp "${f}" "gs://${bucket}/${artifacts_dir}/${stats_dir}/${f}"
fi
done <<< "$(find . -name ${stats_file_name} | sed 's/^\.\///')")
fi
}

@@ -92,7 +112,7 @@ function upload_all {
upload_binaries
}

# Upload any stats.json we can find, and some binaries, no matter what happens.
# Upload any ${stats_file_name} we can find, and some binaries, no matter what happens.
trap upload_all EXIT

# Set up the parameters for the roachtest invocation.
@@ -226,8 +226,13 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {

_, ingestionJobID := replicationtestutils.GetStreamJobIds(t, ctx, sqlDB, "30")

// Start the ingestion stream and wait for at least one AddSSTable to ensure the job is running.
allowResponse <- struct{}{}
// Start the ingestion stream and wait (for up to a minute) for at least one
// AddSSTable to ensure the job is running.
select {
case allowResponse <- struct{}{}:
case <-time.After(time.Minute):
t.Fatal("timed out waiting for stream ingestion to send an sst")
}
close(allowResponse)

// Wait for the job to signal that it is ready to be cutover, after it has
33 changes: 28 additions & 5 deletions pkg/ccl/crosscluster/physical/stream_ingestion_dist.go
@@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
@@ -491,10 +492,12 @@ func (p *replicationFlowPlanner) getSrcTenantID() (roachpb.TenantID, error) {
return p.srcTenantID, nil
}

func repartitionTopology(in streamclient.Topology, targetPartCount int) streamclient.Topology {
func repartitionTopology(
in streamclient.Topology, targetPartCount int,
) (streamclient.Topology, error) {
growth := targetPartCount / len(in.Partitions)
if growth <= 1 {
return in
return in, nil
}

// Copy the topology and allocate a new partition slice.
@@ -504,15 +507,32 @@ func repartitionTopology(in streamclient.Topology, targetPartCount int) streamcl
// output each containing some fraction of its spans.
for _, p := range in.Partitions {
chunk := len(p.Spans)/growth + 1

// If this partition has too few spans to split, just add it as is. This is
// not strictly required; we could just let the below logic make one chunk
// as an instance of the general case and get the same result. However if we
// skip it, we preserve the original "token" for small partitions, which has
// no effect in production code but avoids clobbering special "randomgen"
// tokens used in a handful of (small partition) unit tests.
if len(p.Spans) <= chunk {
out.Partitions = append(out.Partitions, p)
continue
}

// Add chunks of spans to the output partitions until all are added.
for len(p.Spans) > 0 {
c := p
c.Spans = p.Spans[:min(chunk, len(p.Spans))]
tok, err := protoutil.Marshal(&streampb.SourcePartition{Spans: c.Spans})
if err != nil {
return out, err
}
c.SubscriptionToken = tok
out.Partitions = append(out.Partitions, c)
p.Spans = p.Spans[len(c.Spans):]
}
}

return out
return out, nil
}

func (p *replicationFlowPlanner) constructPlanGenerator(
@@ -542,7 +562,10 @@ func (p *replicationFlowPlanner) constructPlanGenerator(

// If we have fewer partitions than we have nodes, try to repartition the
// topology to have more partitions.
topology = repartitionTopology(topology, len(sqlInstanceIDs)*8)
topology, err = repartitionTopology(topology, len(sqlInstanceIDs)*8)
if err != nil {
return nil, nil, err
}

if !p.createdInitialPlan() {
p.initialTopology = topology
4 changes: 3 additions & 1 deletion pkg/ccl/crosscluster/physical/stream_ingestion_dist_test.go
@@ -540,7 +540,9 @@ func TestRepartition(t *testing.T) {
{p(1, 1, 0), p(2, 1, 0), p(3, 1, 0)},
{p(1, 43, 0), p(2, 44, 0), p(3, 38, 0)},
} {
got := repartitionTopology(streamclient.Topology{Partitions: input}, parts)
got, err := repartitionTopology(streamclient.Topology{Partitions: input}, parts)

require.NoError(t, err)

var expectedSpans, gotSpans roachpb.Spans
for _, part := range input {