Skip to content

Commit

Permalink
feat(Bigtable): add attempts headers on retry and exception logging (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
bshaffer authored Jun 21, 2024
1 parent f59ff28 commit 48fd79b
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 11 deletions.
2 changes: 2 additions & 0 deletions Bigtable/src/BigtableClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use Google\Auth\FetchAuthTokenInterface;
use Google\Cloud\Bigtable\V2\BigtableClient as GapicClient;
use Google\Cloud\Core\ArrayTrait;
use Psr\Log\LoggerInterface;

/**
* Google Cloud Bigtable is Google's NoSQL Big Data database service.
Expand Down Expand Up @@ -144,6 +145,7 @@ public function __construct(array $config = [])
* @type string $appProfileId This value specifies routing for
* replication. **Defaults to** the "default" application profile.
* @type array $headers Headers to be passed with each request.
* @type LoggerInterface $logger
* }
* @return Table
*/
Expand Down
3 changes: 2 additions & 1 deletion Bigtable/src/ChunkFormatter.php
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ function ($ex) {
}
return false;
},
$this->pluck('retries', $this->options, false)
$this->pluck('retries', $this->options, false),
$this->pluck('logger', $this->options, false)
);
}

Expand Down
21 changes: 20 additions & 1 deletion Bigtable/src/ResumableStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use Google\ApiCore\ApiException;
use Google\Cloud\Core\ExponentialBackoff;
use Google\Rpc\Code;
use Psr\Log\LoggerInterface;

/**
* User stream which handles failure from upstream, retries if necessary and
Expand Down Expand Up @@ -58,6 +59,11 @@ class ResumableStream implements \IteratorAggregate
*/
private $retryFunction;

/**
* @var LoggerInterface
*/
private $logger;

/**
* Constructs a resumable stream.
*
Expand All @@ -72,12 +78,14 @@ public function __construct(
callable $apiFunction,
callable $argumentFunction,
callable $retryFunction,
$retries = self::DEFAULT_MAX_RETRIES
$retries = self::DEFAULT_MAX_RETRIES,
LoggerInterface $logger = null
) {
$this->retries = $retries ?: self::DEFAULT_MAX_RETRIES;
$this->apiFunction = $apiFunction;
$this->argumentFunction = $argumentFunction;
$this->retryFunction = $retryFunction;
$this->logger = $logger;
}

/**
Expand All @@ -95,12 +103,23 @@ public function readAll()
$ex = null;
$args = $argumentFunction();
if (!isset($args[1]['requestCompleted']) || $args[1]['requestCompleted'] !== true) {
if ($tries > 0) {
// Send in "bigtable-attempt" header on retry
$optionalArgs = array_pop($args);
$headers = $optionalArgs['headers'] ?? [];
$headers['bigtable-attempt'] = [(string) $tries];
$optionalArgs['headers'] = $headers;
$args[] = $optionalArgs;
}
$stream = $this->createExponentialBackoff()->execute($this->apiFunction, $args);
try {
foreach ($stream->readAll() as $item) {
yield $item;
}
} catch (\Exception $ex) {
if ($this->logger) {
$this->logger->error($ex->getMessage());
}
}
}
$tries++;
Expand Down
5 changes: 4 additions & 1 deletion Bigtable/src/Table.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
use Google\Cloud\Bigtable\V2\RowSet;
use Google\Cloud\Core\ArrayTrait;
use Google\Rpc\Code;
use Psr\Log\LoggerInterface;

/**
* A table instance can be used to read rows and to perform insert, update, and
Expand Down Expand Up @@ -83,6 +84,7 @@ class Table
* This settings only applies to {@see \Google\Cloud\Bigtable\Table::mutateRows()},
* {@see \Google\Cloud\Bigtable\Table::upsert()} and
* {@see \Google\Cloud\Bigtable\Table::readRows()}.
* @type LoggerInterface $logger
* }
*/
public function __construct(
Expand Down Expand Up @@ -519,7 +521,8 @@ private function mutateRowsWithEntries(array $entries, array $options = [])
[$this->gapicClient, 'mutateRows'],
$argumentFunction,
$retryFunction,
$this->pluck('retries', $options, false)
$this->pluck('retries', $options, false),
$this->pluck('logger', $options, false)
);
$message = 'partial failure';
try {
Expand Down
159 changes: 151 additions & 8 deletions Bigtable/tests/Unit/SmartRetriesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
use PHPUnit\Framework\TestCase;
use Prophecy\PhpUnit\ProphecyTrait;
use Prophecy\Argument;
use Psr\Log\LoggerInterface;

/**
* @group bigtable
Expand Down Expand Up @@ -91,12 +92,22 @@ public function testReadRowsShouldRetryDefaultTimes()
$this->expectExceptionMessage('DEADLINE_EXCEEDED');

$expectedArgs = $this->options;
$attempt = 0;
$this->serverStream->readAll()
->shouldBeCalledTimes(4)
->willThrow(
$this->retryingApiException
);
$this->bigtableClient->readRows(self::TABLE_NAME, $expectedArgs)
$this->bigtableClient->readRows(
self::TABLE_NAME,
Argument::that(function ($optionalArgs) use ($expectedArgs, &$attempt) {
$attemptHeader = $optionalArgs['headers']['bigtable-attempt'][0] ?? null;
unset($optionalArgs['headers']['bigtable-attempt']);

return $optionalArgs === $expectedArgs
&& ($attemptHeader === (string) $attempt++ || $attempt === 1);
})
)
->shouldBeCalledTimes(4)
->willReturn(
$this->serverStream->reveal()
Expand All @@ -112,12 +123,22 @@ public function testReadRowsShouldRetryForProvidedAttempts()
$this->expectExceptionMessage('DEADLINE_EXCEEDED');

$expectedArgs = $this->options;
$attempt = 0;
$this->serverStream->readAll()
->shouldBeCalledTimes(6)
->willThrow(
$this->retryingApiException
);
$this->bigtableClient->readRows(self::TABLE_NAME, $expectedArgs)
$this->bigtableClient->readRows(
self::TABLE_NAME,
Argument::that(function ($optionalArgs) use ($expectedArgs, &$attempt) {
$attemptHeader = $optionalArgs['headers']['bigtable-attempt'][0] ?? null;
unset($optionalArgs['headers']['bigtable-attempt']);

return $optionalArgs === $expectedArgs
&& ($attemptHeader === (string) $attempt++ || $attempt === 1);
})
)
->shouldBeCalledTimes(6)
->willReturn(
$this->serverStream->reveal()
Expand Down Expand Up @@ -147,6 +168,7 @@ public function testReadRowsPartialSuccess()
->willReturn(
$this->serverStream->reveal()
);
$expectedArgs['headers']['bigtable-attempt'] = ['1'];
$secondCallArgument = [
'rows' => (new RowSet)->setRowRanges([(new RowRange)->setStartKeyOpen('rk2')])
] + $expectedArgs;
Expand All @@ -169,6 +191,39 @@ public function testReadRowsPartialSuccess()
}
}

public function testReadRowsContainsAttemptHeader()
{
$attempt = 0;
$expectedArgs = $this->options;
$retryingApiException = $this->retryingApiException;
$this->bigtableClient->readRows(
self::TABLE_NAME,
Argument::that(function ($callOptions) use (&$attempt) {
$attemptHeader = $callOptions['headers']['bigtable-attempt'][0] ?? null;
($attempt === 0)
? $this->assertNull($attemptHeader)
: $this->assertSame('1', $attemptHeader);

return true;
})
)->shouldBeCalledTimes(2)
->willReturn(
$this->serverStream->reveal()
);

$this->serverStream->readAll()
->will(function () use (&$attempt, $retryingApiException) {
// throw a retriable exception on the first call
if (0 === $attempt++) {
throw $retryingApiException;
}
return [];
});

$iterator = $this->table->readRows();
$iterator->getIterator()->current();
}

public function testReadRowsWithRowsLimit()
{
$args = ['rowsLimit' => 5];
Expand All @@ -189,6 +244,7 @@ public function testReadRowsWithRowsLimit()
->willReturn(
$this->serverStream->reveal()
);
$expectedArgs['headers']['bigtable-attempt'] = ['1'];
$secondCallArgument = [
'rows' => (new RowSet)->setRowRanges([(new RowRange)->setStartKeyOpen('rk2')]),
'rowsLimit' => 3
Expand Down Expand Up @@ -519,7 +575,18 @@ public function testMutateRowsShouldRetryDefaultNumberOfTimes()
);
$mutations = $this->generateMutations(1, 5);
$entries = $this->generateEntries(1, 5);
$this->bigtableClient->mutateRows(self::TABLE_NAME, $entries, $this->options)
$attempt = 0;
$this->bigtableClient->mutateRows(
self::TABLE_NAME,
$entries,
Argument::that(function ($optionalArgs) use (&$attempt) {
$attemptHeader = $optionalArgs['headers']['bigtable-attempt'][0] ?? null;
unset($optionalArgs['headers']['bigtable-attempt']);

return $optionalArgs === $this->options
&& ($attemptHeader === (string) $attempt++ || $attempt === 1);
})
)
->shouldBeCalledTimes(4)
->willReturn(
$this->serverStream->reveal()
Expand All @@ -539,7 +606,19 @@ public function testMutateRowsRespectRetriesAttempt()
);
$mutations = $this->generateMutations(1, 5);
$entries = $this->generateEntries(1, 5);
$this->bigtableClient->mutateRows(self::TABLE_NAME, $entries, ['retries' => 5] + $this->options)
$expectedArgs = ['retries' => 5] + $this->options;
$attempt = 0;
$this->bigtableClient->mutateRows(
self::TABLE_NAME,
$entries,
Argument::that(function ($optionalArgs) use ($expectedArgs, &$attempt) {
$attemptHeader = $optionalArgs['headers']['bigtable-attempt'][0] ?? null;
unset($optionalArgs['headers']['bigtable-attempt']);

return $optionalArgs === $expectedArgs
&& ($attemptHeader === (string) $attempt++ || $attempt === 1);
})
)
->shouldBeCalledTimes(6)
->willReturn(
$this->serverStream->reveal()
Expand All @@ -549,6 +628,7 @@ public function testMutateRowsRespectRetriesAttempt()

public function testMutateRowsOnlyRetriesFailedEntries()
{
$expectedArgs = $this->options;
$this->serverStream->readAll()
->shouldBeCalledTimes(2)
->willReturn(
Expand All @@ -560,13 +640,14 @@ public function testMutateRowsOnlyRetriesFailedEntries()
)
);
$entries = $this->generateEntries(0, 5);
$this->bigtableClient->mutateRows(self::TABLE_NAME, $entries, $this->options)
$this->bigtableClient->mutateRows(self::TABLE_NAME, $entries, $expectedArgs)
->shouldBeCalledTimes(1)
->willReturn(
$this->serverStream->reveal()
);
$expectedArgs['headers']['bigtable-attempt'] = ['1'];
$entries = $this->generateEntries(2, 3);
$this->bigtableClient->mutateRows(self::TABLE_NAME, $entries, $this->options)
$this->bigtableClient->mutateRows(self::TABLE_NAME, $entries, $expectedArgs)
->shouldBeCalled()
->willReturn(
$this->serverStream->reveal()
Expand All @@ -577,6 +658,7 @@ public function testMutateRowsOnlyRetriesFailedEntries()

public function testMutateRowsExceptionShouldAddEntryToPendingMutations()
{
$expectedArgs = $this->options;
$this->serverStream->readAll()
->shouldBeCalledTimes(2)
->willReturn(
Expand All @@ -589,13 +671,14 @@ public function testMutateRowsExceptionShouldAddEntryToPendingMutations()
)
);
$entries = $this->generateEntries(0, 5);
$this->bigtableClient->mutateRows(self::TABLE_NAME, $entries, $this->options)
$this->bigtableClient->mutateRows(self::TABLE_NAME, $entries, $expectedArgs)
->shouldBeCalledTimes(1)
->willReturn(
$this->serverStream->reveal()
);
$entries = array_merge($this->generateEntries(1, 2), $this->generateEntries(4, 5));
$this->bigtableClient->mutateRows(self::TABLE_NAME, $entries, $this->options)
$expectedArgs['headers']['bigtable-attempt'] = ['1'];
$this->bigtableClient->mutateRows(self::TABLE_NAME, $entries, $expectedArgs)
->shouldBeCalled()
->willReturn(
$this->serverStream->reveal()
Expand Down Expand Up @@ -667,6 +750,66 @@ public function testMutateRowsShouldNotRetryIfAnyMutationIsNotRetryable()
}
}

public function testReadRowsShouldLogRetryableExeception()
{
$attempt = 0;
$expectedArgs = $this->options;
$retryingApiException = $this->retryingApiException;
$logger = $this->prophesize(LoggerInterface::class);
$logger->error('DEADLINE_EXCEEDED')
->shouldBeCalledOnce();
$this->bigtableClient->readRows(
self::TABLE_NAME,
Argument::that(function ($callOptions) use (&$attempt) {
$attemptHeader = $callOptions['headers']['bigtable-attempt'][0] ?? null;
($attempt === 0)
? $this->assertNull($attemptHeader)
: $this->assertSame('1', $attemptHeader);

return true;
})
)->shouldBeCalledTimes(2)
->willReturn(
$this->serverStream->reveal()
);

$this->serverStream->readAll()
->will(function () use (&$attempt, $retryingApiException) {
// throw a retriable exception on the first call
if (0 === $attempt++) {
throw $retryingApiException;
}
return [];
});

$iterator = $this->table->readRows(['logger' => $logger->reveal()]);
$iterator->getIterator()->current();
}

public function testReadRowsShouldLogNonRetryableExeception()
{
$this->expectException(ApiException::class);
$this->expectExceptionMessage('UNAUTHENTICATED');

$logger = $this->prophesize(LoggerInterface::class);
$logger->error('UNAUTHENTICATED')
->shouldBeCalledOnce();

$this->bigtableClient->readRows(
self::TABLE_NAME,
$this->options
)->shouldBeCalledOnce()
->willReturn(
$this->serverStream->reveal()
);

$this->serverStream->readAll()
->willThrow($this->nonRetryingApiException);

$iterator = $this->table->readRows(['logger' => $logger->reveal()]);
$iterator->getIterator()->current();
}

private function generateRowsResponse($from, $to)
{
$rows = [];
Expand Down
4 changes: 4 additions & 0 deletions Core/src/InsecureCredentialsWrapper.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,8 @@ public function getAuthorizationHeaderCallback($audience = null)
{
return null;
}

public function checkUniverseDomain()
{
}
}

0 comments on commit 48fd79b

Please sign in to comment.