Skip to content

Commit

Permalink
feat: count row merging errors as internal errors
Browse files Browse the repository at this point in the history
Currently they dont have a status associated and thus get counted as UNKOWN

Change-Id: Ida3470a0609f2e2ad51534eb3141db394af1dcdc
  • Loading branch information
igorbernstein2 committed Jan 9, 2024
1 parent 6b48606 commit 40573ab
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
*/
package com.google.cloud.bigtable.data.v2.stub.readrows;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.InternalException;
import com.google.bigtable.v2.ReadRowsResponse.CellChunk;
import com.google.cloud.bigtable.data.v2.internal.ByteStringComparator;
import com.google.cloud.bigtable.data.v2.models.RowAdapter.RowBuilder;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.EvictingQueue;
import com.google.protobuf.ByteString;
import io.grpc.Status;

import java.util.List;

/**
Expand Down Expand Up @@ -252,6 +257,16 @@ State handleChunk(CellChunk chunk) {
new State() {
@Override
State handleLastScannedRow(ByteString rowKey) {
if (lastCompleteRowKey != null) {
int cmp = ByteStringComparator.INSTANCE.compare(lastCompleteRowKey, rowKey);
String direction = "increasing";
if (reversed) {
cmp *= -1;
direction = "decreasing";
}

validate(cmp < 0, "AWAITING_NEW_ROW: key must be strictly " + direction);
}
completeRow = adapter.createScanMarkerRow(rowKey);
lastCompleteRowKey = rowKey;
return AWAITING_ROW_CONSUME;
Expand Down Expand Up @@ -468,9 +483,9 @@ private void validate(boolean condition, String message) {
}
}

static class InvalidInputException extends RuntimeException {
static class InvalidInputException extends InternalException {
InvalidInputException(String message) {
super(message);
super(message, null, GrpcStatusCode.of(Status.Code.INTERNAL), false);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package com.google.cloud.bigtable.data.v2.functional;

import com.google.api.gax.rpc.InternalException;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.ReadRowsResponse;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.FakeServiceBuilder;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.StringValue;
import io.grpc.Server;
import io.grpc.stub.StreamObserver;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@RunWith(JUnit4.class)
public class ReadRowsTest {
private FakeService service;
private Server server;

@Before
public void setUp() throws Exception {
service = new FakeService();
server = FakeServiceBuilder.create(service)
.start();
}

@After
public void tearDown() throws Exception {
server.shutdown();
}

@Test
public void rowMergingErrorsUseInternalStatus() throws Exception {
BigtableDataSettings settings = BigtableDataSettings.newBuilderForEmulator(server.getPort())
.setProjectId("fake-project")
.setInstanceId("fake-instance")
.build();

service.readRowsResponses.add(
ReadRowsResponse.newBuilder()
.addChunks(
ReadRowsResponse.CellChunk.newBuilder()
.setRowKey(ByteString.copyFromUtf8("z"))
.setFamilyName(StringValue.newBuilder().setValue("f"))
.setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q")).build())
.setTimestampMicros(1000)
.setValue(ByteString.copyFromUtf8("v"))
.setCommitRow(true)
)
.addChunks(
ReadRowsResponse.CellChunk.newBuilder()
.setRowKey(ByteString.copyFromUtf8("a"))
.setFamilyName(StringValue.newBuilder().setValue("f"))
.setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q")).build())
.setTimestampMicros(1000)
.setValue(ByteString.copyFromUtf8("v"))
.setCommitRow(true)
)
.build()
);

try (BigtableDataClient client = BigtableDataClient.create(settings)) {
Assert.assertThrows(
InternalException.class,
() -> {
for (Row ignored : client.readRows(Query.create("fake-table"))) {

}
}
);
}
}


static class FakeService extends BigtableGrpc.BigtableImplBase {
private List<ReadRowsResponse> readRowsResponses = Collections.synchronizedList(new ArrayList<>());

@Override
public void readRows(ReadRowsRequest request, StreamObserver<ReadRowsResponse> responseObserver) {
for (ReadRowsResponse r : readRowsResponses) {
responseObserver.onNext(r);
}
responseObserver.onCompleted();
}
}
}

0 comments on commit 40573ab

Please sign in to comment.