Skip to content

Commit

Permalink
changes in change stream
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomarquezp committed Dec 5, 2024
1 parent e2b1621 commit dde9326
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 199 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ static Builder createUserMutation(
.setRowKey(rowKey)
.setType(MutationType.USER)
.setSourceClusterId(sourceClusterId)
.setCommitTimestampInstant(commitTimestamp)
.setCommitTime(commitTimestamp)
.setTieBreaker(tieBreaker);
}

Expand All @@ -97,7 +97,7 @@ static Builder createGcMutation(
.setRowKey(rowKey)
.setType(MutationType.GARBAGE_COLLECTION)
.setSourceClusterId("")
.setCommitTimestampInstant(commitTimestamp)
.setCommitTime(commitTimestamp)
.setTieBreaker(tieBreaker);
}

Expand All @@ -113,12 +113,12 @@ static Builder createGcMutation(
@Nonnull
public abstract String getSourceClusterId();

/** This method is obsolete. Use {@link #getCommitTimestampInstant()} instead. */
@ObsoleteApi("Use getCommitTimestampInstant() instead")
/** This method is obsolete. Use {@link #getCommitTime()} instead. */
@ObsoleteApi("Use getCommitTime() instead")
public abstract org.threeten.bp.Instant getCommitTimestamp();

/** Get the commit timestamp of the current mutation. */
public java.time.Instant getCommitTimestampInstant() {
public java.time.Instant getCommitTime() {
return toJavaTimeInstant(getCommitTimestamp());
}

Expand All @@ -132,12 +132,12 @@ public java.time.Instant getCommitTimestampInstant() {
@Nonnull
public abstract String getToken();

/** This method is obsolete. Use {@link #getCommitTimestampInstant()} instead. */
@ObsoleteApi("Use getEstimatedLowWatermarkInstant() instead")
/** This method is obsolete. Use {@link #getEstimatedLowWatermarkTime()} instead. */
@ObsoleteApi("Use getEstimatedLowWatermarkTime() instead")
public abstract org.threeten.bp.Instant getEstimatedLowWatermark();

/** Get the low watermark of the current mutation. */
public java.time.Instant getEstimatedLowWatermarkInstant() {
public java.time.Instant getEstimatedLowWatermarkTime() {
return toJavaTimeInstant(getEstimatedLowWatermark());
}

Expand All @@ -160,14 +160,12 @@ abstract static class Builder {

abstract Builder setSourceClusterId(@Nonnull String sourceClusterId);

Builder setCommitTimestampInstant(java.time.Instant commitTimestamp) {
Builder setCommitTime(java.time.Instant commitTimestamp) {
return setCommitTimestamp(toThreetenInstant(commitTimestamp));
}

/**
* This method is obsolete. Use {@link #setCommitTimestampInstant(java.time.Instant)} instead.
*/
@ObsoleteApi("Use setCommitTimestampInstant(java.time.Instant) instead")
/** This method is obsolete. Use {@link #setCommitTime(java.time.Instant)} instead. */
@ObsoleteApi("Use setCommitTime(java.time.Instant) instead")
abstract Builder setCommitTimestamp(org.threeten.bp.Instant commitTimestamp);

abstract Builder setTieBreaker(int tieBreaker);
Expand All @@ -176,14 +174,11 @@ Builder setCommitTimestampInstant(java.time.Instant commitTimestamp) {

abstract Builder setToken(@Nonnull String token);

Builder setEstimatedLowWatermarkInstant(java.time.Instant estimatedLowWatermark) {
Builder setLowWatermarkTime(java.time.Instant estimatedLowWatermark) {
return setEstimatedLowWatermark(toThreetenInstant(estimatedLowWatermark));
}

/**
* This method is obsolete. Use {@link #setEstimatedLowWatermarkInstant(java.time.Instant)}
* instead.
*/
/** This method is obsolete. Use {@link #setLowWatermarkTime(java.time.Instant)} instead. */
@ObsoleteApi("Use setEstimatedLowWatermarkInstant(java.time.Instant) instead")
abstract Builder setEstimatedLowWatermark(org.threeten.bp.Instant estimatedLowWatermark);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
*/
package com.google.cloud.bigtable.data.v2.models;

import static com.google.api.gax.util.TimeConversionUtils.toThreetenInstant;

import com.google.api.core.InternalApi;
import com.google.api.core.ObsoleteApi;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -110,47 +107,24 @@ interface ChangeStreamRecordBuilder<ChangeStreamRecordT> {
*/
ChangeStreamRecordT onCloseStream(ReadChangeStreamResponse.CloseStream closeStream);

/**
* This method is obsolete. Use {@link #startUserMutationInstant(ByteString, String,
* java.time.Instant, int)} instead.
*/
@ObsoleteApi("Use startUserMutationInstant(ByteString, String, java.time.Instant, int) instead")
void startUserMutation(
@Nonnull ByteString rowKey,
@Nonnull String sourceClusterId,
org.threeten.bp.Instant commitTimestamp,
int tieBreaker);

/**
* Called to start a new user initiated ChangeStreamMutation. This will be called at most once.
* If called, the current change stream record must not include any close stream message or
* heartbeat.
*/
default void startUserMutationInstant(
void startUserMutation(
@Nonnull ByteString rowKey,
@Nonnull String sourceClusterId,
java.time.Instant commitTimestamp,
int tieBreaker) {
startUserMutation(rowKey, sourceClusterId, toThreetenInstant(commitTimestamp), tieBreaker);
}

/**
* This method is obsolete. Use {@link #startGcMutationInstant(ByteString, java.time.Instant,
* int)} instead.
*/
@ObsoleteApi("Use startGcMutationInstant(ByteString, java.time.Instant, int) instead")
void startGcMutation(
@Nonnull ByteString rowKey, org.threeten.bp.Instant commitTimestamp, int tieBreaker);
int tieBreaker);

/**
* Called to start a new Garbage Collection ChangeStreamMutation. This will be called at most
* once. If called, the current change stream record must not include any close stream message
* or heartbeat.
*/
default void startGcMutationInstant(
@Nonnull ByteString rowKey, java.time.Instant commitTimestamp, int tieBreaker) {
startGcMutation(rowKey, toThreetenInstant(commitTimestamp), tieBreaker);
}
void startGcMutation(
@Nonnull ByteString rowKey, java.time.Instant commitTimestamp, int tieBreaker);

/** Called to add a DeleteFamily mod. */
void deleteFamily(@Nonnull String familyName);
Expand Down Expand Up @@ -201,19 +175,9 @@ void mergeToCell(
/** Called once per cell to signal the end of the value (unless reset). */
void finishCell();

/**
* This method is obsolete. Use {@link #finishChangeStreamMutationInstant(String,
* java.time.Instant)} instead.
*/
@ObsoleteApi("Use finishChangeStreamMutationInstant(String, java.time.Instant) instead")
ChangeStreamRecordT finishChangeStreamMutation(
@Nonnull String token, org.threeten.bp.Instant estimatedLowWatermark);

/** Called once per stream record to signal that all mods have been processed (unless reset). */
default ChangeStreamRecordT finishChangeStreamMutationInstant(
@Nonnull String token, java.time.Instant estimatedLowWatermark) {
return finishChangeStreamMutation(token, toThreetenInstant(estimatedLowWatermark));
}
ChangeStreamRecordT finishChangeStreamMutation(
@Nonnull String token, java.time.Instant estimatedLowWatermark);

/** Called when the current in progress change stream record should be dropped */
void reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
*/
package com.google.cloud.bigtable.data.v2.models;

import static com.google.api.gax.util.TimeConversionUtils.toJavaTimeInstant;

import com.google.api.core.InternalApi;
import com.google.api.core.ObsoleteApi;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import java.time.Instant;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -102,10 +101,10 @@ public ChangeStreamRecord onCloseStream(ReadChangeStreamResponse.CloseStream clo

/** {@inheritDoc} */
@Override
public void startUserMutationInstant(
public void startUserMutation(
@Nonnull ByteString rowKey,
@Nonnull String sourceClusterId,
java.time.Instant commitTimestamp,
Instant commitTimestamp,
int tieBreaker) {
this.changeStreamMutationBuilder =
ChangeStreamMutation.createUserMutation(
Expand All @@ -114,27 +113,7 @@ public void startUserMutationInstant(

/** {@inheritDoc} */
@Override
@ObsoleteApi("Use startUserMutationInstant(ByteString, String, java.time.Instant, int) instead")
public void startUserMutation(
@Nonnull ByteString rowKey,
@Nonnull String sourceClusterId,
org.threeten.bp.Instant commitTimestamp,
int tieBreaker) {
startUserMutationInstant(
rowKey, sourceClusterId, toJavaTimeInstant(commitTimestamp), tieBreaker);
}

/** {@inheritDoc} */
@Override
@ObsoleteApi("Use startUserMutationInstant(ByteString, java.time.Instant, int) instead")
public void startGcMutation(
@Nonnull ByteString rowKey, org.threeten.bp.Instant commitTimestamp, int tieBreaker) {
startGcMutationInstant(rowKey, toJavaTimeInstant(commitTimestamp), tieBreaker);
}

/** {@inheritDoc} */
@Override
public void startGcMutationInstant(
ByteString rowKey, java.time.Instant commitTimestamp, int tieBreaker) {
this.changeStreamMutationBuilder =
ChangeStreamMutation.createGcMutation(rowKey, commitTimestamp, tieBreaker);
Expand Down Expand Up @@ -197,18 +176,10 @@ public void finishCell() {

/** {@inheritDoc} */
@Override
@ObsoleteApi("Use finishChangeStreamMutationInstant(String, java.time.Instant) instead")
public ChangeStreamRecord finishChangeStreamMutation(
@Nonnull String token, org.threeten.bp.Instant estimatedLowWatermark) {
return finishChangeStreamMutationInstant(token, toJavaTimeInstant(estimatedLowWatermark));
}

/** {@inheritDoc} */
@Override
public ChangeStreamRecord finishChangeStreamMutationInstant(
String token, java.time.Instant estimatedLowWatermark) {
this.changeStreamMutationBuilder.setToken(token);
this.changeStreamMutationBuilder.setEstimatedLowWatermarkInstant(estimatedLowWatermark);
this.changeStreamMutationBuilder.setLowWatermarkTime(estimatedLowWatermark);
return this.changeStreamMutationBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,14 @@ public ReadChangeStreamQuery streamPartition(ByteStringRange range) {
return streamPartition(rangeBuilder.build());
}

/** This method is obsolete. Use {@link #startTimeInstant(java.time.Instant)} instead. */
@ObsoleteApi("Use startTimeInstant(java.time.Instant) instead")
/** This method is obsolete. Use {@link #startTime(java.time.Instant)} instead. */
@ObsoleteApi("Use startTime(java.time.Instant) instead")
public ReadChangeStreamQuery startTime(org.threeten.bp.Instant value) {
return startTimeInstant(toJavaTimeInstant(value));
return startTime(toJavaTimeInstant(value));
}

/** Sets the startTime to read the change stream. */
public ReadChangeStreamQuery startTimeInstant(java.time.Instant value) {
public ReadChangeStreamQuery startTime(java.time.Instant value) {
Preconditions.checkState(
!builder.hasContinuationTokens(),
"startTime and continuationTokens can't be specified together");
Expand All @@ -165,14 +165,14 @@ public ReadChangeStreamQuery startTimeInstant(java.time.Instant value) {
return this;
}

/** This method is obsolete. Use {@link #endTimeInstant(java.time.Instant)} instead. */
@ObsoleteApi("Use endTimeInstant(java.time.Instant) instead")
/** This method is obsolete. Use {@link #endTime(java.time.Instant)} instead. */
@ObsoleteApi("Use endTime(java.time.Instant) instead")
public ReadChangeStreamQuery endTime(org.threeten.bp.Instant value) {
return endTimeInstant(toJavaTimeInstant(value));
return endTime(toJavaTimeInstant(value));
}

/** Sets the endTime to read the change stream. */
public ReadChangeStreamQuery endTimeInstant(java.time.Instant value) {
public ReadChangeStreamQuery endTime(java.time.Instant value) {
builder.setEndTime(
Timestamp.newBuilder()
.setSeconds(value.getEpochSecond())
Expand All @@ -196,16 +196,14 @@ public ReadChangeStreamQuery continuationTokens(
return this;
}

/**
* This method is obsolete. Use {@link #heartbeatDurationJavaTime(java.time.Duration)} instead.
*/
@ObsoleteApi("Use heartbeatDurationJavaTime(java.time.Duration) instead")
/** This method is obsolete. Use {@link #heartbeatDuration(java.time.Duration)} instead. */
@ObsoleteApi("Use heartbeatDuration(java.time.Duration) instead")
public ReadChangeStreamQuery heartbeatDuration(org.threeten.bp.Duration duration) {
return heartbeatDurationJavaTime(toJavaTimeDuration(duration));
return heartbeatDuration(toJavaTimeDuration(duration));
}

/** Sets the heartbeat duration for the change stream. */
public ReadChangeStreamQuery heartbeatDurationJavaTime(java.time.Duration duration) {
public ReadChangeStreamQuery heartbeatDuration(java.time.Duration duration) {
builder.setHeartbeatDuration(
Duration.newBuilder()
.setSeconds(duration.getSeconds())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ State handleDataChange(ReadChangeStreamResponse.DataChange dataChange) {
validate(
dataChange.getSourceClusterId().isEmpty(),
"AWAITING_NEW_STREAM_RECORD: GC mutation shouldn't have source cluster id.");
builder.startGcMutationInstant(
builder.startGcMutation(
dataChange.getRowKey(),
java.time.Instant.ofEpochSecond(
dataChange.getCommitTimestamp().getSeconds(),
Expand All @@ -341,7 +341,7 @@ State handleDataChange(ReadChangeStreamResponse.DataChange dataChange) {
validate(
!dataChange.getSourceClusterId().isEmpty(),
"AWAITING_NEW_STREAM_RECORD: User initiated data change missing source cluster id.");
builder.startUserMutationInstant(
builder.startUserMutation(
dataChange.getRowKey(),
dataChange.getSourceClusterId(),
java.time.Instant.ofEpochSecond(
Expand Down Expand Up @@ -575,7 +575,7 @@ private State checkAndFinishMutationIfNeeded(ReadChangeStreamResponse.DataChange
validate(!dataChange.getToken().isEmpty(), "Last data change missing token");
validate(dataChange.hasEstimatedLowWatermark(), "Last data change missing lowWatermark");
completeChangeStreamRecord =
builder.finishChangeStreamMutationInstant(
builder.finishChangeStreamMutation(
dataChange.getToken(),
java.time.Instant.ofEpochSecond(
dataChange.getEstimatedLowWatermark().getSeconds(),
Expand Down
Loading

0 comments on commit dde9326

Please sign in to comment.