crosscluster: introduce ClusterUri and ConfigUri
This change introduces two types to replace StreamAddress. ConfigUri is
the URI supplied by the user to the job; it may refer to an external
connection. ClusterUri is the URI used to dial nodes in the destination
cluster.

This split was made to support the LDR/PCR-over-load-balancer changes.
The routing mode is defined by a URI parameter, and the external
connection needs to be resolved before the routing mode can be
determined. The new types let us add the routing mode as a ClusterUri
method.

There is one non-cosmetic change in this PR: the PCR job now tries the
config URI as the first URI to connect to, even if it has checkpointed
URIs for individual nodes (see stream_ingestion_job.go). This matches
the behavior of LDR and is needed to properly handle updates to external
URIs.

Release Note: none
Epic: CRDB-40896
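
For illustration, a minimal sketch of how the two types are intended to fit
together, pieced together from the call sites in the diff below. The function
dialSource and its parameters are hypothetical names, and the signatures of
ParseConfigUri, AsClusterUri, ParseClusterUri, and GetFirstActiveClient are
approximated from their usage here, not authoritative:

// Sketch only (not part of this commit): approximates the streamclient API as
// it is used in the diff below. dialSource, userUri, and checkpointedUris are
// hypothetical names; the exact signatures live in pkg/crosscluster/streamclient.
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
	"github.com/cockroachdb/cockroach/pkg/sql"
)

func dialSource(
	ctx context.Context,
	userUri string, // URI supplied by the user; may name an external connection
	checkpointedUris []string, // per-node URIs persisted in job progress
	db *sql.InternalDB,
) (streamclient.Client, error) {
	// ConfigUri wraps whatever the user supplied to the job.
	configUri, err := streamclient.ParseConfigUri(userUri)
	if err != nil {
		return nil, err
	}
	// Resolving an external connection yields a ClusterUri, the URI actually
	// used to dial nodes; the routing-mode accessor can hang off this type.
	clusterUri, err := configUri.AsClusterUri(ctx, db)
	if err != nil {
		return nil, err
	}
	// Try the resolved config URI first, then any checkpointed node URIs, so
	// that updates to the external connection take effect on resume.
	uris := []streamclient.ClusterUri{clusterUri}
	for _, raw := range checkpointedUris {
		parsed, err := streamclient.ParseClusterUri(raw)
		if err != nil {
			return nil, err
		}
		uris = append(uris, parsed)
	}
	return streamclient.GetFirstActiveClient(ctx, uris, db)
}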
jeffswenson committed Jan 9, 2025
1 parent 32b2ea2 commit d939b57
Showing 46 changed files with 610 additions and 456 deletions.
15 changes: 7 additions & 8 deletions pkg/ccl/cmdccl/clusterrepl/main.go
@@ -9,7 +9,6 @@ import (
"context"
"flag"
"fmt"
"net/url"
"os"
"os/signal"
"time"
@@ -72,13 +71,13 @@ func main() {
fatalf("tenant name required")
}

streamAddr, err := url.Parse(*uri)
uri, err := streamclient.ParseClusterUri(*uri)
if err != nil {
fatalf("parse: %s", err)
}

ctx := cancelOnShutdown(context.Background())
if err := streamPartition(ctx, streamAddr); err != nil {
if err := streamPartition(ctx, uri); err != nil {
if errors.Is(err, context.Canceled) {
exit.WithCode(exit.Interrupted())
} else {
@@ -87,9 +86,9 @@
}
}

func streamPartition(ctx context.Context, streamAddr *url.URL) error {
func streamPartition(ctx context.Context, uri streamclient.ClusterUri) error {
fmt.Println("creating producer stream")
client, err := streamclient.NewPartitionedStreamClient(ctx, streamAddr)
client, err := streamclient.NewPartitionedStreamClient(ctx, uri)
if err != nil {
return err
}
@@ -132,7 +131,7 @@ func streamPartition(ctx context.Context, streamAddr *url.URL) error {

fmt.Printf("streaming %s (%s) as of %s\n", *tenant, tenantSpan, sps.InitialScanTimestamp)
if *noParse {
return rawStream(ctx, streamAddr, replicationProducerSpec.StreamID, spsBytes)
return rawStream(ctx, uri, replicationProducerSpec.StreamID, spsBytes)
}

sub, err := client.Subscribe(ctx, replicationProducerSpec.StreamID, 1, 1,
@@ -151,11 +150,11 @@ func streamPartition(ctx context.Context, streamAddr *url.URL) error {

func rawStream(
ctx context.Context,
uri *url.URL,
uri streamclient.ClusterUri,
streamID streampb.StreamID,
spec streamclient.SubscriptionToken,
) error {
config, err := pgx.ParseConfig(uri.String())
config, err := pgx.ParseConfig(uri.Serialize())
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -1552,7 +1552,7 @@ func getPhase(rd *replicationDriver, dstJobID jobspb.JobID) c2cPhase {
streamIngestProgress := getJobProgress(rd.t, rd.setup.dst.sysSQL, dstJobID).GetStreamIngest()

if streamIngestProgress.ReplicatedTime.IsEmpty() {
if len(streamIngestProgress.StreamAddresses) == 0 {
if len(streamIngestProgress.PartitionConnUris) == 0 {
return phaseNotReady
}
// Only return phaseInitialScan once all available stream addresses from the
1 change: 0 additions & 1 deletion pkg/crosscluster/BUILD.bazel
@@ -3,7 +3,6 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "crosscluster",
srcs = [
"addresses.go",
"crosscluster_type_resolver.go",
"errors.go",
"event.go",
33 changes: 0 additions & 33 deletions pkg/crosscluster/addresses.go

This file was deleted.

13 changes: 8 additions & 5 deletions pkg/crosscluster/logical/create_logical_replication_stmt.go
@@ -160,19 +160,22 @@ func createLogicalReplicationStreamPlanHook(
// txn during statement execution.
p.InternalSQLTxn().Descriptors().ReleaseAll(ctx)

streamAddress := crosscluster.StreamAddress(from)
streamURL, err := streamAddress.URL()
configUri, err := streamclient.ParseConfigUri(from)
if err != nil {
return err
}

clusterUri, err := configUri.AsClusterUri(ctx, p.ExecCfg().InternalDB)
if err != nil {
return err
}
streamAddress = crosscluster.StreamAddress(streamURL.String())

cleanedURI, err := cloud.SanitizeExternalStorageURI(from, nil)
if err != nil {
return err
}

client, err := streamclient.NewStreamClient(ctx, streamAddress, p.ExecCfg().InternalDB, streamclient.WithLogical())
client, err := streamclient.NewStreamClient(ctx, clusterUri, p.ExecCfg().InternalDB, streamclient.WithLogical())
if err != nil {
return err
}
@@ -259,7 +262,7 @@ func createLogicalReplicationStreamPlanHook(
SourceClusterID: spec.SourceClusterID,
ReplicationStartTime: replicationStartTime,
ReplicationPairs: repPairs,
SourceClusterConnStr: string(streamAddress),
SourceClusterConnUri: configUri.Serialize(),
TableNames: srcTableNames,
DefaultConflictResolution: defaultConflictResolution,
Discard: discard,
13 changes: 6 additions & 7 deletions pkg/crosscluster/logical/logical_replication_dist.go
@@ -9,7 +9,6 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/crosscluster"
"github.com/cockroachdb/cockroach/pkg/crosscluster/physical"
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -23,7 +22,7 @@

func constructLogicalReplicationWriterSpecs(
ctx context.Context,
streamAddress crosscluster.StreamAddress,
partitionUri streamclient.ClusterUri,
topology streamclient.Topology,
destSQLInstances []sql.InstanceLocality,
initialScanTimestamp hlc.Timestamp,
@@ -43,7 +42,7 @@
PreviousReplicatedTimestamp: previousReplicatedTimestamp,
InitialScanTimestamp: initialScanTimestamp,
Checkpoint: checkpoint, // TODO: Only forward relevant checkpoint info
StreamAddress: string(streamAddress),
PartitionConnUri: partitionUri.Serialize(),
TableMetadataByDestID: tableMetadataByDestID,
Discard: discard,
Mode: mode,
@@ -69,7 +68,7 @@
spec.PartitionSpec = execinfrapb.StreamIngestionPartitionSpec{
PartitionID: partition.ID,
SubscriptionToken: string(partition.SubscriptionToken),
Address: string(partition.SrcAddr),
PartitionConnUri: partition.ConnUri.Serialize(),
Spans: partition.Spans,
SrcInstanceID: base.SQLInstanceID(partition.SrcInstanceID),
DestInstanceID: destID,
@@ -85,7 +84,7 @@

func constructOfflineInitialScanSpecs(
ctx context.Context,
streamAddress crosscluster.StreamAddress,
clusterUri streamclient.ClusterUri,
topology streamclient.Topology,
destSQLInstances []sql.InstanceLocality,
initialScanTimestamp hlc.Timestamp,
@@ -101,7 +100,7 @@
JobID: int64(jobID),
InitialScanTimestamp: initialScanTimestamp,
Checkpoint: checkpoint, // TODO: Only forward relevant checkpoint info
StreamAddress: string(streamAddress),
StreamAddress: clusterUri.Serialize(),
Rekey: rekey,
MetricsLabel: metricsLabel,
}
@@ -127,7 +126,7 @@
spec.PartitionSpec = execinfrapb.StreamIngestionPartitionSpec{
PartitionID: partition.ID,
SubscriptionToken: string(partition.SubscriptionToken),
Address: string(partition.SrcAddr),
PartitionConnUri: partition.ConnUri.Serialize(),
Spans: partition.Spans,
SrcInstanceID: base.SQLInstanceID(partition.SrcInstanceID),
DestInstanceID: destID,
66 changes: 55 additions & 11 deletions pkg/crosscluster/logical/logical_replication_job.go
@@ -108,6 +108,31 @@ func (r *logicalReplicationResumer) updateRunningStatus(
}
}

func (r logicalReplicationResumer) getClusterUris(
ctx context.Context, job *jobs.Job, db *sql.InternalDB,
) ([]streamclient.ClusterUri, error) {
var (
progress = job.Progress().Details.(*jobspb.Progress_LogicalReplication).LogicalReplication
payload = job.Details().(jobspb.LogicalReplicationDetails)
)

clusterUri, err := streamclient.LookupClusterUri(ctx, payload.SourceClusterConnUri, db)
if err != nil {
return nil, err
}

uris := []streamclient.ClusterUri{clusterUri}
for _, uri := range progress.PartitionConnUris {
parsed, err := streamclient.ParseClusterUri(uri)
if err != nil {
return nil, err
}
uris = append(uris, parsed)
}

return uris, nil
}

func (r *logicalReplicationResumer) ingest(
ctx context.Context, jobExecCtx sql.JobExecContext,
) error {
@@ -129,8 +154,13 @@ func (r *logicalReplicationResumer) ingest(
return err
}

uris, err := r.getClusterUris(ctx, r.job, execCfg.InternalDB)
if err != nil {
return err
}

client, err := streamclient.GetFirstActiveClient(ctx,
append([]string{payload.SourceClusterConnStr}, progress.StreamAddresses...),
uris,
execCfg.InternalDB,
streamclient.WithStreamID(streampb.StreamID(streamID)),
streamclient.WithLogical(),
@@ -158,7 +188,7 @@
}
if err := r.job.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
ldrProg := md.Progress.Details.(*jobspb.Progress_LogicalReplication).LogicalReplication
ldrProg.StreamAddresses = planInfo.streamAddress
ldrProg.PartitionConnUris = planInfo.partitionPgUrls
ju.UpdateProgress(md.Progress)
return nil
}); err != nil {
@@ -331,7 +361,7 @@ type logicalReplicationPlanner struct {

type logicalReplicationPlanInfo struct {
sourceSpans []roachpb.Span
streamAddress []string
partitionPgUrls []string
destTableBySrcID map[descpb.ID]dstTableMetadata
writeProcessorCount int
}
@@ -398,7 +428,7 @@ func (p *logicalReplicationPlanner) generatePlanImpl(
}

info.sourceSpans = plan.SourceSpans
info.streamAddress = plan.Topology.StreamAddresses()
info.partitionPgUrls = plan.Topology.SerializedClusterUris()

var defaultFnOID oid.Oid
if defaultFnID := payload.DefaultConflictResolution.FunctionId; defaultFnID != 0 {
@@ -465,8 +495,13 @@ func (p *logicalReplicationPlanner) generatePlanImpl(
return nil, nil, info, err
}

sourceUri, err := streamclient.LookupClusterUri(ctx, payload.SourceClusterConnUri, execCfg.InternalDB)
if err != nil {
return nil, nil, info, err
}

specs, err := constructLogicalReplicationWriterSpecs(ctx,
crosscluster.StreamAddress(payload.SourceClusterConnStr),
sourceUri,
plan.Topology,
destNodeLocalities,
payload.ReplicationStartTime,
@@ -529,8 +564,8 @@ func (p *logicalReplicationPlanner) planOfflineInitialScan(
progress = p.job.Progress().Details.(*jobspb.Progress_LogicalReplication).LogicalReplication
payload = p.job.Payload().Details.(*jobspb.Payload_LogicalReplicationDetails).LogicalReplicationDetails
info = logicalReplicationPlanInfo{
sourceSpans: plan.SourceSpans,
streamAddress: plan.Topology.StreamAddresses(),
sourceSpans: plan.SourceSpans,
partitionPgUrls: plan.Topology.SerializedClusterUris(),
}
)

@@ -568,8 +603,13 @@ func (p *logicalReplicationPlanner) planOfflineInitialScan(
return nil, nil, info, err
}

uri, err := streamclient.ParseClusterUri(payload.SourceClusterConnUri)
if err != nil {
return nil, nil, info, err
}

specs, err := constructOfflineInitialScanSpecs(ctx,
crosscluster.StreamAddress(payload.SourceClusterConnStr),
uri,
plan.Topology,
destNodeLocalities,
payload.ReplicationStartTime,
@@ -821,16 +861,20 @@ func (r *logicalReplicationResumer) completeProducerJob(
ctx context.Context, internalDB *sql.InternalDB,
) {
var (
progress = r.job.Progress().Details.(*jobspb.Progress_LogicalReplication).LogicalReplication
payload = r.job.Details().(jobspb.LogicalReplicationDetails)
payload = r.job.Details().(jobspb.LogicalReplicationDetails)
)

streamID := streampb.StreamID(payload.StreamID)
log.Infof(ctx, "attempting to update producer job %d", streamID)
if err := timeutil.RunWithTimeout(ctx, "complete producer job", 30*time.Second,
func(ctx context.Context) error {
uris, err := r.getClusterUris(ctx, r.job, internalDB)
if err != nil {
return err
}

client, err := streamclient.GetFirstActiveClient(ctx,
append([]string{payload.SourceClusterConnStr}, progress.StreamAddresses...),
uris,
internalDB,
streamclient.WithStreamID(streamID),
streamclient.WithLogical(),
2 changes: 1 addition & 1 deletion pkg/crosscluster/logical/logical_replication_job_test.go
@@ -1145,7 +1145,7 @@ func TestLogicalJobResiliency(t *testing.T) {
WaitUntilReplicatedTime(t, now, dbA, jobAID)

progress := jobutils.GetJobProgress(t, dbA, jobAID)
addresses := progress.Details.(*jobspb.Progress_LogicalReplication).LogicalReplication.StreamAddresses
addresses := progress.Details.(*jobspb.Progress_LogicalReplication).LogicalReplication.PartitionConnUris

require.Greaterf(t, len(addresses), 1, "Less than 2 addresses were persisted in system.job_info")
}
15 changes: 7 additions & 8 deletions pkg/crosscluster/logical/logical_replication_writer_processor.go
@@ -302,25 +302,24 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) {
// Start the subscription for our partition.
partitionSpec := lrw.spec.PartitionSpec
token := streamclient.SubscriptionToken(partitionSpec.SubscriptionToken)
addr := partitionSpec.Address
redactedAddr, redactedErr := streamclient.RedactSourceURI(addr)
if redactedErr != nil {
log.Warning(lrw.Ctx(), "could not redact stream address")
uri, err := streamclient.ParseClusterUri(partitionSpec.PartitionConnUri)
if err != nil {
lrw.MoveToDrainingAndLogError(errors.Wrap(err, "parsing partition uri"))
}
streamClient, err := streamclient.NewStreamClient(ctx, crosscluster.StreamAddress(addr), db,
streamClient, err := streamclient.NewStreamClient(ctx, uri, db,
streamclient.WithStreamID(streampb.StreamID(lrw.spec.StreamID)),
streamclient.WithCompression(true),
streamclient.WithLogical(),
)
if err != nil {
lrw.MoveToDrainingAndLogError(errors.Wrapf(err, "creating client for partition spec %q from %q", token, redactedAddr))
lrw.MoveToDrainingAndLogError(errors.Wrapf(err, "creating client for partition spec %q from %q", token, uri.Redacted()))
return
}
lrw.streamPartitionClient = streamClient

if streamingKnobs, ok := lrw.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok {
if streamingKnobs != nil && streamingKnobs.BeforeClientSubscribe != nil {
streamingKnobs.BeforeClientSubscribe(addr, string(token), lrw.frontier, lrw.spec.Discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes)
streamingKnobs.BeforeClientSubscribe(uri.Serialize(), string(token), lrw.frontier, lrw.spec.Discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes)
}
}
sub, err := streamClient.Subscribe(ctx,
@@ -335,7 +334,7 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) {
streamclient.WithBatchSize(batchSizeSetting.Get(&lrw.FlowCtx.Cfg.Settings.SV)),
)
if err != nil {
lrw.MoveToDrainingAndLogError(errors.Wrapf(err, "subscribing to partition from %s", redactedAddr))
lrw.MoveToDrainingAndLogError(errors.Wrapf(err, "subscribing to partition from %s", uri.Redacted()))
return
}
