Skip to content

Commit

Permalink
fix: optimize segment reader (#29694)
Browse files Browse the repository at this point in the history
* fix: optimize segemant reader

* rename the method

* Add a test for refill request
  • Loading branch information
mutianf authored Dec 13, 2023
1 parent 276aa02 commit 8be85d2
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,16 @@ public boolean start() throws IOException {

@Override
public boolean advance() throws IOException {
if (future != null && future.isDone()) {
// Add rows from the future to the buffer and reset the future
// so we can do prefetching
consumeReadRowsFuture();
}
if (buffer.size() < refillSegmentWaterMark && future == null) {
future = fetchNextSegment();
}
if (buffer.isEmpty() && future != null) {
waitReadRowsFuture();
consumeReadRowsFuture();
}
currentRow = buffer.poll();
return currentRow != null;
Expand Down Expand Up @@ -407,7 +412,7 @@ public void onComplete() {
return future;
}

private void waitReadRowsFuture() throws IOException {
private void consumeReadRowsFuture() throws IOException {
try {
UpstreamResults r = future.get();
buffer.addAll(r.rows);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,69 @@ public void testReadFillBuffer() throws IOException {
Mockito.verify(mockCallMetric, Mockito.times(3)).call("ok");
}

/** This test verifies that a refill request is sent before we read all the rows in the buffer. */
@Test
public void testRefillOnLowWatermark() throws IOException {
int segmentSize = 30;
RowSet.Builder ranges = RowSet.newBuilder();
// generate 3 pages of rows
ranges.addRowRanges(
generateRowRange(
generateByteString(DEFAULT_PREFIX, 0),
generateByteString(DEFAULT_PREFIX, segmentSize * 3)));

List<List<Row>> expectedResults =
ImmutableList.of(
generateSegmentResult(DEFAULT_PREFIX, 0, segmentSize),
generateSegmentResult(DEFAULT_PREFIX, segmentSize, segmentSize),
generateSegmentResult(DEFAULT_PREFIX, segmentSize * 2, segmentSize),
ImmutableList.of());

// Set up Callable to be returned by stub.createReadRowsCallable()
ServerStreamingCallable<Query, Row> mockCallable = Mockito.mock(ServerStreamingCallable.class);
// Return multiple answers when mockCallable is called
doAnswer(new MultipleAnswer<Row>(expectedResults))
.when(mockCallable)
.call(any(Query.class), any(ResponseObserver.class), any(ApiCallContext.class));
when(mockStub.createReadRowsCallable(any(RowAdapter.class))).thenReturn(mockCallable);
ServerStreamingCallable<Query, Row> callable =
mockStub.createReadRowsCallable(new BigtableServiceImpl.BigtableRowProtoAdapter());
// Set up client to return callable
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);

RetrySettings retrySettings =
bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
BigtableService.Reader underTest =
new BigtableServiceImpl.BigtableSegmentReaderImpl(
mockBigtableDataClient,
bigtableDataSettings.getProjectId(),
bigtableDataSettings.getInstanceId(),
TABLE_ID,
ranges.build(),
RowFilter.getDefaultInstance(),
segmentSize,
DEFAULT_BYTE_SEGMENT_SIZE,
Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
Duration.millis(retrySettings.getTotalTimeout().toMillis()),
mockCallMetric);

Assert.assertTrue(underTest.start());

int refillWatermark = Math.max(1, (int) (segmentSize * 0.1));

Assert.assertTrue(refillWatermark > 1);

// Make sure refill happens on the second page. At this point, there
// should be 3 calls made on the callable.
for (int i = 0; i < segmentSize * 2 - refillWatermark + 1; i++) {
underTest.getCurrentRow();
underTest.advance();
}

verify(callable, times(3))
.call(queryCaptor.capture(), any(ResponseObserver.class), any(ApiCallContext.class));
}

/**
* This test checks that the buffer will stop filling up once the byte limit is reached. It will
* cancel the controller after reached the limit. This test completes one fill and contains one
Expand Down

0 comments on commit 8be85d2

Please sign in to comment.