From 40573abe05b5c3eeab9560c13fa21e86a2765a13 Mon Sep 17 00:00:00 2001
From: Igor Berntein <igorbernstein@google.com>
Date: Tue, 9 Jan 2024 11:01:09 -0500
Subject: [PATCH] feat: count row merging errors as internal errors

Currently they dont have a status associated and thus get counted as UNKOWN

Change-Id: Ida3470a0609f2e2ad51534eb3141db394af1dcdc
---
 .../data/v2/stub/readrows/StateMachine.java   | 19 +++-
 .../data/v2/functional/ReadRowsTest.java      | 99 +++++++++++++++++++
 2 files changed, 116 insertions(+), 2 deletions(-)
 create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/functional/ReadRowsTest.java

diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/StateMachine.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/StateMachine.java
index 6791679829..32b35f0637 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/StateMachine.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/StateMachine.java
@@ -15,6 +15,9 @@
  */
 package com.google.cloud.bigtable.data.v2.stub.readrows;
 
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.InternalException;
 import com.google.bigtable.v2.ReadRowsResponse.CellChunk;
 import com.google.cloud.bigtable.data.v2.internal.ByteStringComparator;
 import com.google.cloud.bigtable.data.v2.models.RowAdapter.RowBuilder;
@@ -22,6 +25,8 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.EvictingQueue;
 import com.google.protobuf.ByteString;
+import io.grpc.Status;
+
 import java.util.List;
 
 /**
@@ -252,6 +257,16 @@ State handleChunk(CellChunk chunk) {
       new State() {
         @Override
         State handleLastScannedRow(ByteString rowKey) {
+          if (lastCompleteRowKey != null) {
+            int cmp = ByteStringComparator.INSTANCE.compare(lastCompleteRowKey, rowKey);
+            String direction = "increasing";
+            if (reversed) {
+              cmp *= -1;
+              direction = "decreasing";
+            }
+
+            validate(cmp < 0, "AWAITING_NEW_ROW: key must be strictly " + direction);
+          }
           completeRow = adapter.createScanMarkerRow(rowKey);
           lastCompleteRowKey = rowKey;
           return AWAITING_ROW_CONSUME;
@@ -468,9 +483,9 @@ private void validate(boolean condition, String message) {
     }
   }
 
-  static class InvalidInputException extends RuntimeException {
+  static class InvalidInputException extends InternalException {
     InvalidInputException(String message) {
-      super(message);
+      super(message, null, GrpcStatusCode.of(Status.Code.INTERNAL), false);
     }
   }
 }
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/functional/ReadRowsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/functional/ReadRowsTest.java
new file mode 100644
index 0000000000..bb7a1ca300
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/functional/ReadRowsTest.java
@@ -0,0 +1,99 @@
+package com.google.cloud.bigtable.data.v2.functional;
+
+import com.google.api.gax.rpc.InternalException;
+import com.google.bigtable.v2.BigtableGrpc;
+import com.google.bigtable.v2.ReadRowsRequest;
+import com.google.bigtable.v2.ReadRowsResponse;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.FakeServiceBuilder;
+import com.google.cloud.bigtable.data.v2.models.Query;
+import com.google.cloud.bigtable.data.v2.models.Row;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.StringValue;
+import io.grpc.Server;
+import io.grpc.stub.StreamObserver;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@RunWith(JUnit4.class)
+public class ReadRowsTest {
+    private FakeService service;
+    private Server server;
+
+    @Before
+    public void setUp() throws Exception {
+        service = new FakeService();
+        server = FakeServiceBuilder.create(service)
+                .start();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        server.shutdown();
+    }
+
+    @Test
+    public void rowMergingErrorsUseInternalStatus() throws Exception {
+        BigtableDataSettings settings = BigtableDataSettings.newBuilderForEmulator(server.getPort())
+                .setProjectId("fake-project")
+                .setInstanceId("fake-instance")
+                .build();
+
+        service.readRowsResponses.add(
+                ReadRowsResponse.newBuilder()
+                        .addChunks(
+                                ReadRowsResponse.CellChunk.newBuilder()
+                                        .setRowKey(ByteString.copyFromUtf8("z"))
+                                        .setFamilyName(StringValue.newBuilder().setValue("f"))
+                                        .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q")).build())
+                                        .setTimestampMicros(1000)
+                                        .setValue(ByteString.copyFromUtf8("v"))
+                                        .setCommitRow(true)
+                        )
+                        .addChunks(
+                                ReadRowsResponse.CellChunk.newBuilder()
+                                        .setRowKey(ByteString.copyFromUtf8("a"))
+                                        .setFamilyName(StringValue.newBuilder().setValue("f"))
+                                        .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q")).build())
+                                        .setTimestampMicros(1000)
+                                        .setValue(ByteString.copyFromUtf8("v"))
+                                        .setCommitRow(true)
+                        )
+                        .build()
+        );
+
+        try (BigtableDataClient client = BigtableDataClient.create(settings)) {
+            Assert.assertThrows(
+                    InternalException.class,
+                    () -> {
+                        for (Row ignored : client.readRows(Query.create("fake-table"))) {
+
+                        }
+                    }
+            );
+        }
+    }
+
+
+    static class FakeService extends BigtableGrpc.BigtableImplBase {
+        private List<ReadRowsResponse> readRowsResponses = Collections.synchronizedList(new ArrayList<>());
+
+        @Override
+        public void readRows(ReadRowsRequest request, StreamObserver<ReadRowsResponse> responseObserver) {
+            for (ReadRowsResponse r : readRowsResponses) {
+                responseObserver.onNext(r);
+            }
+            responseObserver.onCompleted();
+        }
+    }
+}