Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: add integration test to ensure that reverse scan resumption produces correct results #2029

Merged
merged 8 commits into from
Dec 11, 2023
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.27.0')
implementation platform('com.google.cloud:libraries-bom:26.28.0')

implementation 'com.google.cloud:google-cloud-bigtable'
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.TruthJUnit.assume;

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
Expand All @@ -38,8 +42,17 @@
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -308,6 +321,92 @@ public void reversed() {
.inOrder();
}

@Test
public void reversedWithForcedResumption() throws IOException, InterruptedException {
assume()
.withMessage("reverse scans are not supported in the emulator")
.that(testEnvRule.env())
.isNotInstanceOf(EmulatorEnv.class);

BigtableDataClient client = testEnvRule.env().getDataClient();
String tableId = testEnvRule.env().getTableId();
String familyId = testEnvRule.env().getFamilyId();
String uniqueKey = prefix + "-rev-queries2";

// Add enough rows that ensures resumption logic is forced
Random random;
List<Row> expectedResults;
try (Batcher<RowMutationEntry, Void> batcher = client.newBulkMutationBatcher(tableId)) {

byte[] valueBytes = new byte[1024];
random = new Random();

expectedResults = new ArrayList<>();

for (int i = 0; i < 2 * 1024; i++) {
ByteString key = ByteString.copyFromUtf8(String.format("%s-%05d", uniqueKey, i));
ByteString qualifier = ByteString.copyFromUtf8("q");
long timestamp = System.currentTimeMillis() * 1000;
random.nextBytes(valueBytes);
ByteString value = ByteString.copyFrom(valueBytes);

batcher.add(RowMutationEntry.create(key).setCell(familyId, qualifier, timestamp, value));
expectedResults.add(
Row.create(
key,
ImmutableList.of(
RowCell.create(familyId, qualifier, timestamp, ImmutableList.of(), value))));
}
}
Collections.reverse(expectedResults);

BigtableDataSettings.Builder settingsBuilder =
testEnvRule.env().getDataClientSettings().toBuilder();

settingsBuilder.stubSettings().readRowsSettings().retrySettings().setMaxAttempts(100);

InstantiatingGrpcChannelProvider.Builder transport =
((InstantiatingGrpcChannelProvider)
settingsBuilder.stubSettings().getTransportChannelProvider())
.toBuilder();
ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> oldConfigurator =
transport.getChannelConfigurator();

// Randomly camp the deadline to force a timeout to force a retry
transport.setChannelConfigurator(
(ManagedChannelBuilder c) -> {
if (oldConfigurator != null) {
c = oldConfigurator.apply(c);
}
return c.intercept(
new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
if (method.getBareMethodName().equals("ReadRows")) {
callOptions =
callOptions.withDeadlineAfter(random.nextInt(200), TimeUnit.MILLISECONDS);
}

return next.newCall(method, callOptions);
}
});
});
settingsBuilder.stubSettings().setTransportChannelProvider(transport.build());

try (BigtableDataClient patchedClient = BigtableDataClient.create(settingsBuilder.build())) {
for (int i = 0; i < 10; i++) {
List<Row> actualResults = new ArrayList<>();
for (Row row :
patchedClient.readRows(Query.create(tableId).prefix(uniqueKey).reversed(true))) {
actualResults.add(row);
Thread.sleep(1);
}
assertThat(actualResults).containsExactlyElementsIn(expectedResults).inOrder();
}
}
}

@Test
public void readSingleNonexistentAsyncCallback() throws Exception {
ApiFuture<Row> future =
Expand Down
Loading