Skip to content

Commit

Permalink
test: add integration test to ensure that reverse scan resumption pro…
Browse files Browse the repository at this point in the history
…duces correct results

Change-Id: I909ec95bebe87219d66b387cd80e7095e75252a2
  • Loading branch information
igorbernstein2 committed Dec 11, 2023
1 parent 4b64482 commit 9ff12b4
Showing 1 changed file with 97 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.TruthJUnit.assume;

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
Expand All @@ -38,8 +42,17 @@
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -308,6 +321,90 @@ public void reversed() {
.inOrder();
}

@Test
public void reversedWithFocedResumption() throws IOException, InterruptedException {
assume()
.withMessage("reverse scans are not supported in the emulator")
.that(testEnvRule.env())
.isNotInstanceOf(EmulatorEnv.class);

BigtableDataClient client = testEnvRule.env().getDataClient();
String tableId = testEnvRule.env().getTableId();
String familyId = testEnvRule.env().getFamilyId();
String uniqueKey = prefix + "-rev-queries2";

// Add enough rows that ensures resumption logic is forced
Random random;
List<Row> expectedResults;
try (Batcher<RowMutationEntry, Void> batcher = client.newBulkMutationBatcher(tableId)) {

byte[] valueBytes = new byte[1024];
random = new Random();

expectedResults = new ArrayList<>();

for (int i = 0; i < 2 * 1024; i++) {
ByteString key = ByteString.copyFromUtf8(String.format("%s-%05d", uniqueKey, i));
ByteString qualifier = ByteString.copyFromUtf8("q");
long timestamp = System.currentTimeMillis() * 1000;
random.nextBytes(valueBytes);
ByteString value = ByteString.copyFrom(valueBytes);

batcher.add(RowMutationEntry.create(key).setCell(familyId, qualifier, timestamp, value));
expectedResults.add(
Row.create(
key,
ImmutableList.of(
RowCell.create(familyId, qualifier, timestamp, ImmutableList.of(), value))));
}
}
Collections.reverse(expectedResults);

BigtableDataSettings.Builder settingsBuilder =
testEnvRule.env().getDataClientSettings().toBuilder();

InstantiatingGrpcChannelProvider.Builder transport =
((InstantiatingGrpcChannelProvider)
settingsBuilder.stubSettings().getTransportChannelProvider())
.toBuilder();
ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> oldConfigurator =
transport.getChannelConfigurator();

// Randomly camp the deadline to force a timeout to force a retry
transport.setChannelConfigurator(
(ManagedChannelBuilder c) -> {
if (oldConfigurator != null) {
c = oldConfigurator.apply(c);
}
return c.intercept(
new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
if (method.getBareMethodName().equals("ReadRows")) {
callOptions =
callOptions.withDeadlineAfter(random.nextInt(200), TimeUnit.MILLISECONDS);
}

return next.newCall(method, callOptions);
}
});
});
settingsBuilder.stubSettings().setTransportChannelProvider(transport.build());

try (BigtableDataClient patchedClient = BigtableDataClient.create(settingsBuilder.build())) {
for (int i = 0; i < 10; i++) {
List<Row> actualResults = new ArrayList<>();
for (Row row :
patchedClient.readRows(Query.create(tableId).prefix(uniqueKey).reversed(true))) {
actualResults.add(row);
}
assertThat(actualResults).containsExactlyElementsIn(expectedResults).inOrder();
Thread.sleep(10);
}
}
}

@Test
public void readSingleNonexistentAsyncCallback() throws Exception {
ApiFuture<Row> future =
Expand Down

0 comments on commit 9ff12b4

Please sign in to comment.