diff --git a/README.md b/README.md
index 8c7027f42f..d9ba07b218 100644
--- a/README.md
+++ b/README.md
@@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies:
 If you are using Gradle 5.x or later, add this to your dependencies:
 
 ```Groovy
-implementation platform('com.google.cloud:libraries-bom:26.27.0')
+implementation platform('com.google.cloud:libraries-bom:26.28.0')
 implementation 'com.google.cloud:google-cloud-bigtable'
 ```
 
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/ReadIT.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/ReadIT.java
index 7b58e14f7c..0fa7eb10bd 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/ReadIT.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/ReadIT.java
@@ -18,13 +18,17 @@
 import static com.google.common.truth.Truth.assertThat;
 import static com.google.common.truth.TruthJUnit.assume;
 
+import com.google.api.core.ApiFunction;
 import com.google.api.core.ApiFuture;
 import com.google.api.core.ApiFutureCallback;
 import com.google.api.core.ApiFutures;
 import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.batching.Batcher;
+import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
 import com.google.api.gax.rpc.ResponseObserver;
 import com.google.api.gax.rpc.StreamController;
 import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
 import com.google.cloud.bigtable.data.v2.models.BulkMutation;
 import com.google.cloud.bigtable.data.v2.models.Query;
 import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
@@ -38,8 +42,17 @@
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.protobuf.ByteString;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.MethodDescriptor;
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -308,6 +321,92 @@ public void reversed() {
         .inOrder();
   }
 
+  @Test
+  public void reversedWithForcedResumption() throws IOException, InterruptedException {
+    assume()
+        .withMessage("reverse scans are not supported in the emulator")
+        .that(testEnvRule.env())
+        .isNotInstanceOf(EmulatorEnv.class);
+
+    BigtableDataClient client = testEnvRule.env().getDataClient();
+    String tableId = testEnvRule.env().getTableId();
+    String familyId = testEnvRule.env().getFamilyId();
+    String uniqueKey = prefix + "-rev-queries2";
+
+    // Add enough rows to ensure that resumption logic is forced
+    Random random;
+    List<Row> expectedResults;
+    try (Batcher<RowMutationEntry, Void> batcher = client.newBulkMutationBatcher(tableId)) {
+
+      byte[] valueBytes = new byte[1024];
+      random = new Random();
+
+      expectedResults = new ArrayList<>();
+
+      for (int i = 0; i < 2 * 1024; i++) {
+        ByteString key = ByteString.copyFromUtf8(String.format("%s-%05d", uniqueKey, i));
+        ByteString qualifier = ByteString.copyFromUtf8("q");
+        long timestamp = System.currentTimeMillis() * 1000;
+        random.nextBytes(valueBytes);
+        ByteString value = ByteString.copyFrom(valueBytes);
+
+        batcher.add(RowMutationEntry.create(key).setCell(familyId, qualifier, timestamp, value));
+        expectedResults.add(
+            Row.create(
+                key,
+                ImmutableList.of(
+                    RowCell.create(familyId, qualifier, timestamp, ImmutableList.of(), value))));
+      }
+    }
+    Collections.reverse(expectedResults);
+
+    BigtableDataSettings.Builder settingsBuilder =
+        testEnvRule.env().getDataClientSettings().toBuilder();
+
+    settingsBuilder.stubSettings().readRowsSettings().retrySettings().setMaxAttempts(100);
+
+    InstantiatingGrpcChannelProvider.Builder transport =
+        ((InstantiatingGrpcChannelProvider)
+                settingsBuilder.stubSettings().getTransportChannelProvider())
+            .toBuilder();
+    ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> oldConfigurator =
+        transport.getChannelConfigurator();
+
+    // Randomly cap the deadline to force a timeout and trigger a retry
+    transport.setChannelConfigurator(
+        (ManagedChannelBuilder c) -> {
+          if (oldConfigurator != null) {
+            c = oldConfigurator.apply(c);
+          }
+          return c.intercept(
+              new ClientInterceptor() {
+                @Override
+                public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+                    MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
+                  if (method.getBareMethodName().equals("ReadRows")) {
+                    callOptions =
+                        callOptions.withDeadlineAfter(random.nextInt(200), TimeUnit.MILLISECONDS);
+                  }
+
+                  return next.newCall(method, callOptions);
+                }
+              });
+        });
+    settingsBuilder.stubSettings().setTransportChannelProvider(transport.build());
+
+    try (BigtableDataClient patchedClient = BigtableDataClient.create(settingsBuilder.build())) {
+      for (int i = 0; i < 10; i++) {
+        List<Row> actualResults = new ArrayList<>();
+        for (Row row :
+            patchedClient.readRows(Query.create(tableId).prefix(uniqueKey).reversed(true))) {
+          actualResults.add(row);
+          Thread.sleep(1);
+        }
+        assertThat(actualResults).containsExactlyElementsIn(expectedResults).inOrder();
+      }
+    }
+  }
+
   @Test
   public void readSingleNonexistentAsyncCallback() throws Exception {
     ApiFuture<Row> future =