Skip to content

Commit

Permalink
Add option to adjust ReadChangeStream API timeout to allow longer tim…
Browse files Browse the repository at this point in the history
…eout for longer checkpoint durations (#33001)

* Add option to adjust ReadChangeStream API timeout to allow longer timeout for longer checkpoint durations

Change-Id: Ia61388bed5997b7138b15a014d6370d49cbf5277

* Update comment to clarify timeout padding

Change-Id: If34866637045314dfeef1807122cc34b97cc5ddc

* Move default change stream timeout into the accessor to avoid confusion about where default is set

Change-Id: Ia76ff2e3ba4f6a91299e907ec404b720e900c76f

* fix format issue

Change-Id: I89ba1d48742d5204168249d632accf86d68e140c
  • Loading branch information
tonytanger authored Jan 14, 2025
1 parent bb2e0ad commit 44ad80a
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2145,6 +2145,8 @@ static ReadChangeStream create() {

abstract @Nullable Duration getBacklogReplicationAdjustment();

/**
 * Returns the timeout to apply to ReadChangeStream requests, or {@code null} when the caller has
 * not overridden it.
 */
abstract @Nullable Duration getReadChangeStreamTimeout();

abstract @Nullable Boolean getValidateConfig();

abstract ReadChangeStream.Builder toBuilder();
Expand Down Expand Up @@ -2351,6 +2353,22 @@ public ReadChangeStream withBacklogReplicationAdjustment(Duration adjustment) {
return toBuilder().setBacklogReplicationAdjustment(adjustment).build();
}

/**
 * Returns a new {@link BigtableIO.ReadChangeStream} that uses the given timeout for
 * ReadChangeStream requests.
 *
 * <p>Use this to override the default 15 second timeout when the checkpoint duration is longer
 * than 15 seconds. Setting the timeout slightly longer than the periodic checkpoint duration
 * (to add some padding) ensures that ReadChangeStream keeps streaming until the next checkpoint
 * is initiated.
 *
 * <p>Optional: defaults to 15 seconds.
 *
 * <p>Does not modify this object.
 *
 * @param timeout the timeout to apply to ReadChangeStream requests
 * @return a copy of this transform configured with {@code timeout}
 */
public ReadChangeStream withReadChangeStreamTimeout(Duration timeout) {
  ReadChangeStream.Builder withTimeout = toBuilder().setReadChangeStreamTimeout(timeout);
  return withTimeout.build();
}

/**
* Disables validation that the table being read and the metadata table exists, and that the app
* profile used is single cluster and single row transaction enabled. Set this option if the
Expand Down Expand Up @@ -2466,6 +2484,7 @@ public PCollection<KV<ByteString, ChangeStreamMutation>> expand(PBegin input) {
DaoFactory daoFactory =
new DaoFactory(
bigtableConfig, metadataTableConfig, getTableId(), metadataTableId, changeStreamName);
daoFactory.setReadChangeStreamTimeout(getReadChangeStreamTimeout());

// Validate the configuration is correct before creating the pipeline, if required.
try {
Expand Down Expand Up @@ -2542,6 +2561,8 @@ abstract ReadChangeStream.Builder setExistingPipelineOptions(

abstract ReadChangeStream.Builder setBacklogReplicationAdjustment(Duration adjustment);

/** Sets the timeout to apply to ReadChangeStream requests. */
abstract ReadChangeStream.Builder setReadChangeStreamTimeout(Duration timeout);

abstract ReadChangeStream.Builder setValidateConfig(boolean validateConfig);

abstract ReadChangeStream build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class BigtableChangeStreamAccessor implements AutoCloseable {
// Create one bigtable data/admin client per bigtable config (project/instance/table/app profile)
private static final ConcurrentHashMap<BigtableConfig, BigtableChangeStreamAccessor>
bigtableAccessors = new ConcurrentHashMap<>();
private static Duration readChangeStreamTimeout = Duration.ofSeconds(15);

private final BigtableDataClient dataClient;
private final BigtableTableAdminClient tableAdminClient;
Expand Down Expand Up @@ -83,6 +84,10 @@ public synchronized void close() {
bigtableAccessors.remove(bigtableConfig);
}

/**
 * Overrides the timeout used for the ReadChangeStream retry settings (initial RPC timeout, max
 * RPC timeout and total timeout) when an accessor is created.
 *
 * <p>The value is held in a static field, so it is shared by every accessor created in this
 * process after the call rather than being scoped to a single {@link BigtableConfig}.
 *
 * <p>NOTE(review): accessors are cached per config, so an accessor that was already created
 * presumably keeps the timeout it was built with; call this before the first lookup. Confirm.
 *
 * @param timeout the timeout to use for subsequently created accessors
 */
public static void setReadChangeStreamTimeout(Duration timeout) {
  BigtableChangeStreamAccessor.readChangeStreamTimeout = timeout;
}

/**
* Create a BigtableChangeStreamAccessor if it doesn't exist and store it in the cache for faster
* access. If it does exist, just return it.
Expand Down Expand Up @@ -204,9 +209,9 @@ private static BigtableChangeStreamAccessor createAccessor(@NonNull BigtableConf
.readChangeStreamSettings()
.setRetrySettings(
readChangeStreamRetrySettings
.setInitialRpcTimeout(Duration.ofSeconds(15))
.setTotalTimeout(Duration.ofSeconds(15))
.setMaxRpcTimeout(Duration.ofSeconds(15))
.setInitialRpcTimeout(readChangeStreamTimeout)
.setTotalTimeout(readChangeStreamTimeout)
.setMaxRpcTimeout(readChangeStreamTimeout)
.setMaxAttempts(10)
.build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@
import java.io.Serializable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableConfig;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

// Allows transient fields to be initialized later
@SuppressWarnings("initialization.fields.uninitialized")
@Internal
public class DaoFactory implements Serializable, AutoCloseable {
private static final long serialVersionUID = 3732208768248394205L;
private static final long serialVersionUID = -3423959768580600281L;

private transient ChangeStreamDao changeStreamDao;
private transient MetadataTableAdminDao metadataTableAdminDao;
Expand All @@ -45,6 +47,8 @@ public class DaoFactory implements Serializable, AutoCloseable {
private final String metadataTableId;
private final String changeStreamName;

private @Nullable Duration readChangeStreamTimeout;

public DaoFactory(
BigtableConfig changeStreamConfig,
BigtableConfig metadataTableConfig,
Expand All @@ -71,6 +75,10 @@ public void close() {
}
}

/**
 * Sets the timeout to use for ReadChangeStream requests.
 *
 * <p>The value is only stored here. When it is non-null, {@link #getChangeStreamDao()} converts
 * it to milliseconds and passes it to {@code BigtableChangeStreamAccessor} before obtaining the
 * data client. A {@code null} value leaves the accessor's timeout untouched.
 *
 * @param readChangeStreamTimeout the timeout override, or {@code null} for no override
 */
public void setReadChangeStreamTimeout(@Nullable Duration readChangeStreamTimeout) {
this.readChangeStreamTimeout = readChangeStreamTimeout;
}

public String getChangeStreamName() {
return changeStreamName;
}
Expand Down Expand Up @@ -106,6 +114,10 @@ public synchronized ChangeStreamDao getChangeStreamDao() throws IOException {
checkArgumentNotNull(changeStreamConfig.getProjectId());
checkArgumentNotNull(changeStreamConfig.getInstanceId());
checkArgumentNotNull(changeStreamConfig.getAppProfileId());
if (readChangeStreamTimeout != null) {
BigtableChangeStreamAccessor.setReadChangeStreamTimeout(
org.threeten.bp.Duration.ofMillis(readChangeStreamTimeout.getMillis()));
}
BigtableDataClient dataClient =
BigtableChangeStreamAccessor.getOrCreate(changeStreamConfig).getDataClient();
changeStreamDao = new ChangeStreamDao(dataClient, this.tableId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2053,6 +2053,7 @@ public void testReadChangeStreamBuildsCorrectly() {
.withMetadataTableAppProfileId("metadata-app-profile")
.withStartTime(startTime)
.withBacklogReplicationAdjustment(Duration.standardMinutes(1))
.withReadChangeStreamTimeout(Duration.standardMinutes(1))
.withCreateOrUpdateMetadataTable(false)
.withExistingPipelineOptions(BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS);
assertEquals("project", readChangeStream.getBigtableConfig().getProjectId().get());
Expand All @@ -2071,6 +2072,7 @@ public void testReadChangeStreamBuildsCorrectly() {
assertEquals("change-stream-name", readChangeStream.getChangeStreamName());
assertEquals(startTime, readChangeStream.getStartTime());
assertEquals(Duration.standardMinutes(1), readChangeStream.getBacklogReplicationAdjustment());
assertEquals(Duration.standardMinutes(1), readChangeStream.getReadChangeStreamTimeout());
assertEquals(false, readChangeStream.getCreateOrUpdateMetadataTable());
assertEquals(
BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS,
Expand Down

0 comments on commit 44ad80a

Please sign in to comment.