Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Bigtable): add attempts headers on retry and exception logging #7428

Merged
merged 2 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Bigtable/src/BigtableClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use Google\Auth\FetchAuthTokenInterface;
use Google\Cloud\Bigtable\V2\BigtableClient as GapicClient;
use Google\Cloud\Core\ArrayTrait;
use Psr\Log\LoggerInterface;

/**
* Google Cloud Bigtable is Google's NoSQL Big Data database service.
Expand Down Expand Up @@ -144,6 +145,7 @@ public function __construct(array $config = [])
* @type string $appProfileId This value specifies routing for
* replication. **Defaults to** the "default" application profile.
* @type array $headers Headers to be passed with each request.
* @type LoggerInterface $logger
* }
* @return Table
*/
Expand Down
3 changes: 2 additions & 1 deletion Bigtable/src/ChunkFormatter.php
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ function ($ex) {
}
return false;
},
$this->pluck('retries', $this->options, false)
$this->pluck('retries', $this->options, false),
$this->pluck('logger', $this->options, false)
);
}

Expand Down
21 changes: 20 additions & 1 deletion Bigtable/src/ResumableStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use Google\ApiCore\ApiException;
use Google\Cloud\Core\ExponentialBackoff;
use Google\Rpc\Code;
use Psr\Log\LoggerInterface;

/**
* User stream which handles failure from upstream, retries if necessary and
Expand Down Expand Up @@ -58,6 +59,11 @@ class ResumableStream implements \IteratorAggregate
*/
private $retryFunction;

/**
* @var LoggerInterface
*/
private $logger;

/**
* Constructs a resumable stream.
*
Expand All @@ -72,12 +78,14 @@ public function __construct(
callable $apiFunction,
callable $argumentFunction,
callable $retryFunction,
$retries = self::DEFAULT_MAX_RETRIES
$retries = self::DEFAULT_MAX_RETRIES,
LoggerInterface $logger = null
) {
$this->retries = $retries ?: self::DEFAULT_MAX_RETRIES;
$this->apiFunction = $apiFunction;
$this->argumentFunction = $argumentFunction;
$this->retryFunction = $retryFunction;
$this->logger = $logger;
}

/**
Expand All @@ -95,12 +103,23 @@ public function readAll()
$ex = null;
$args = $argumentFunction();
if (!isset($args[1]['requestCompleted']) || $args[1]['requestCompleted'] !== true) {
if ($tries > 0) {
// Send in "bigtable-attempt" header on retry
$optionalArgs = array_pop($args);
$headers = $optionalArgs['headers'] ?? [];
$headers['bigtable-attempt'] = [(string) $tries];
$optionalArgs['headers'] = $headers;
$args[] = $optionalArgs;
}
$stream = $this->createExponentialBackoff()->execute($this->apiFunction, $args);
try {
foreach ($stream->readAll() as $item) {
yield $item;
}
} catch (\Exception $ex) {
if ($this->logger) {
$this->logger->error($ex->getMessage());
}
}
}
$tries++;
Expand Down
5 changes: 4 additions & 1 deletion Bigtable/src/Table.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
use Google\Cloud\Bigtable\V2\RowSet;
use Google\Cloud\Core\ArrayTrait;
use Google\Rpc\Code;
use Psr\Log\LoggerInterface;

/**
* A table instance can be used to read rows and to perform insert, update, and
Expand Down Expand Up @@ -83,6 +84,7 @@ class Table
* This settings only applies to {@see \Google\Cloud\Bigtable\Table::mutateRows()},
* {@see \Google\Cloud\Bigtable\Table::upsert()} and
* {@see \Google\Cloud\Bigtable\Table::readRows()}.
* @type LoggerInterface $logger
* }
*/
public function __construct(
Expand Down Expand Up @@ -519,7 +521,8 @@ private function mutateRowsWithEntries(array $entries, array $options = [])
[$this->gapicClient, 'mutateRows'],
$argumentFunction,
$retryFunction,
$this->pluck('retries', $options, false)
$this->pluck('retries', $options, false),
$this->pluck('logger', $options, false)
);
$message = 'partial failure';
try {
Expand Down
159 changes: 151 additions & 8 deletions Bigtable/tests/Unit/SmartRetriesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
use PHPUnit\Framework\TestCase;
use Prophecy\PhpUnit\ProphecyTrait;
use Prophecy\Argument;
use Psr\Log\LoggerInterface;

/**
* @group bigtable
Expand Down Expand Up @@ -91,12 +92,22 @@ public function testReadRowsShouldRetryDefaultTimes()
$this->expectExceptionMessage('DEADLINE_EXCEEDED');

$expectedArgs = $this->options;
$attempt = 0;
$this->serverStream->readAll()
->shouldBeCalledTimes(4)
->willThrow(
$this->retryingApiException
);
$this->bigtableClient->readRows(self::TABLE_NAME, $expectedArgs)
$this->bigtableClient->readRows(
self::TABLE_NAME,
Argument::that(function ($optionalArgs) use ($expectedArgs, &$attempt) {
$attemptHeader = $optionalArgs['headers']['bigtable-attempt'][0] ?? null;
unset($optionalArgs['headers']['bigtable-attempt']);

return $optionalArgs === $expectedArgs
&& ($attemptHeader === (string) $attempt++ || $attempt === 1);
})
)
->shouldBeCalledTimes(4)
->willReturn(
$this->serverStream->reveal()
Expand All @@ -112,12 +123,22 @@ public function testReadRowsShouldRetryForProvidedAttempts()
$this->expectExceptionMessage('DEADLINE_EXCEEDED');

$expectedArgs = $this->options;
$attempt = 0;
$this->serverStream->readAll()
->shouldBeCalledTimes(6)
->willThrow(
$this->retryingApiException
);
$this->bigtableClient->readRows(self::TABLE_NAME, $expectedArgs)
$this->bigtableClient->readRows(
self::TABLE_NAME,
Argument::that(function ($optionalArgs) use ($expectedArgs, &$attempt) {
$attemptHeader = $optionalArgs['headers']['bigtable-attempt'][0] ?? null;
unset($optionalArgs['headers']['bigtable-attempt']);

return $optionalArgs === $expectedArgs
&& ($attemptHeader === (string) $attempt++ || $attempt === 1);
})
)
->shouldBeCalledTimes(6)
->willReturn(
$this->serverStream->reveal()
Expand Down Expand Up @@ -147,6 +168,7 @@ public function testReadRowsPartialSuccess()
->willReturn(
$this->serverStream->reveal()
);
$expectedArgs['headers']['bigtable-attempt'] = ['1'];
$secondCallArgument = [
'rows' => (new RowSet)->setRowRanges([(new RowRange)->setStartKeyOpen('rk2')])
] + $expectedArgs;
Expand All @@ -169,6 +191,39 @@ public function testReadRowsPartialSuccess()
}
}

public function testReadRowsContainsAttemptHeader()
{
$attempt = 0;
$expectedArgs = $this->options;
$retryingApiException = $this->retryingApiException;
$this->bigtableClient->readRows(
self::TABLE_NAME,
Argument::that(function ($callOptions) use (&$attempt) {
$attemptHeader = $callOptions['headers']['bigtable-attempt'][0] ?? null;
($attempt === 0)
? $this->assertNull($attemptHeader)
: $this->assertSame('1', $attemptHeader);

return true;
})
)->shouldBeCalledTimes(2)
->willReturn(
$this->serverStream->reveal()
);

$this->serverStream->readAll()
->will(function () use (&$attempt, $retryingApiException) {
// throw a retriable exception on the first call
if (0 === $attempt++) {
throw $retryingApiException;
}
return [];
});

$iterator = $this->table->readRows();
$iterator->getIterator()->current();
}

public function testReadRowsWithRowsLimit()
{
$args = ['rowsLimit' => 5];
Expand All @@ -189,6 +244,7 @@ public function testReadRowsWithRowsLimit()
->willReturn(
$this->serverStream->reveal()
);
$expectedArgs['headers']['bigtable-attempt'] = ['1'];
$secondCallArgument = [
'rows' => (new RowSet)->setRowRanges([(new RowRange)->setStartKeyOpen('rk2')]),
'rowsLimit' => 3
Expand Down Expand Up @@ -519,7 +575,18 @@ public function testMutateRowsShouldRetryDefaultNumberOfTimes()
);
$mutations = $this->generateMutations(1, 5);
$entries = $this->generateEntries(1, 5);
$this->bigtableClient->mutateRows(self::TABLE_NAME, $entries, $this->options)
$attempt = 0;
$this->bigtableClient->mutateRows(
self::TABLE_NAME,
$entries,
Argument::that(function ($optionalArgs) use (&$attempt) {
$attemptHeader = $optionalArgs['headers']['bigtable-attempt'][0] ?? null;
unset($optionalArgs['headers']['bigtable-attempt']);

return $optionalArgs === $this->options
&& ($attemptHeader === (string) $attempt++ || $attempt === 1);
})
)
->shouldBeCalledTimes(4)
->willReturn(
$this->serverStream->reveal()
Expand All @@ -539,7 +606,19 @@ public function testMutateRowsRespectRetriesAttempt()
);
$mutations = $this->generateMutations(1, 5);
$entries = $this->generateEntries(1, 5);
$this->bigtableClient->mutateRows(self::TABLE_NAME, $entries, ['retries' => 5] + $this->options)
$expectedArgs = ['retries' => 5] + $this->options;
$attempt = 0;
$this->bigtableClient->mutateRows(
self::TABLE_NAME,
$entries,
Argument::that(function ($optionalArgs) use ($expectedArgs, &$attempt) {
$attemptHeader = $optionalArgs['headers']['bigtable-attempt'][0] ?? null;
unset($optionalArgs['headers']['bigtable-attempt']);

return $optionalArgs === $expectedArgs
&& ($attemptHeader === (string) $attempt++ || $attempt === 1);
})
)
->shouldBeCalledTimes(6)
->willReturn(
$this->serverStream->reveal()
Expand All @@ -549,6 +628,7 @@ public function testMutateRowsRespectRetriesAttempt()

public function testMutateRowsOnlyRetriesFailedEntries()
{
$expectedArgs = $this->options;
$this->serverStream->readAll()
->shouldBeCalledTimes(2)
->willReturn(
Expand All @@ -560,13 +640,14 @@ public function testMutateRowsOnlyRetriesFailedEntries()
)
);
$entries = $this->generateEntries(0, 5);
$this->bigtableClient->mutateRows(self::TABLE_NAME, $entries, $this->options)
$this->bigtableClient->mutateRows(self::TABLE_NAME, $entries, $expectedArgs)
->shouldBeCalledTimes(1)
->willReturn(
$this->serverStream->reveal()
);
$expectedArgs['headers']['bigtable-attempt'] = ['1'];
$entries = $this->generateEntries(2, 3);
$this->bigtableClient->mutateRows(self::TABLE_NAME, $entries, $this->options)
$this->bigtableClient->mutateRows(self::TABLE_NAME, $entries, $expectedArgs)
->shouldBeCalled()
->willReturn(
$this->serverStream->reveal()
Expand All @@ -577,6 +658,7 @@ public function testMutateRowsOnlyRetriesFailedEntries()

public function testMutateRowsExceptionShouldAddEntryToPendingMutations()
{
$expectedArgs = $this->options;
$this->serverStream->readAll()
->shouldBeCalledTimes(2)
->willReturn(
Expand All @@ -589,13 +671,14 @@ public function testMutateRowsExceptionShouldAddEntryToPendingMutations()
)
);
$entries = $this->generateEntries(0, 5);
$this->bigtableClient->mutateRows(self::TABLE_NAME, $entries, $this->options)
$this->bigtableClient->mutateRows(self::TABLE_NAME, $entries, $expectedArgs)
->shouldBeCalledTimes(1)
->willReturn(
$this->serverStream->reveal()
);
$entries = array_merge($this->generateEntries(1, 2), $this->generateEntries(4, 5));
$this->bigtableClient->mutateRows(self::TABLE_NAME, $entries, $this->options)
$expectedArgs['headers']['bigtable-attempt'] = ['1'];
$this->bigtableClient->mutateRows(self::TABLE_NAME, $entries, $expectedArgs)
->shouldBeCalled()
->willReturn(
$this->serverStream->reveal()
Expand Down Expand Up @@ -667,6 +750,66 @@ public function testMutateRowsShouldNotRetryIfAnyMutationIsNotRetryable()
}
}

public function testReadRowsShouldLogRetryableExeception()
{
$attempt = 0;
$expectedArgs = $this->options;
$retryingApiException = $this->retryingApiException;
$logger = $this->prophesize(LoggerInterface::class);
$logger->error('DEADLINE_EXCEEDED')
->shouldBeCalledOnce();
$this->bigtableClient->readRows(
self::TABLE_NAME,
Argument::that(function ($callOptions) use (&$attempt) {
$attemptHeader = $callOptions['headers']['bigtable-attempt'][0] ?? null;
($attempt === 0)
? $this->assertNull($attemptHeader)
: $this->assertSame('1', $attemptHeader);

return true;
})
)->shouldBeCalledTimes(2)
->willReturn(
$this->serverStream->reveal()
);

$this->serverStream->readAll()
->will(function () use (&$attempt, $retryingApiException) {
// throw a retriable exception on the first call
if (0 === $attempt++) {
throw $retryingApiException;
}
return [];
});

$iterator = $this->table->readRows(['logger' => $logger->reveal()]);
$iterator->getIterator()->current();
}

public function testReadRowsShouldLogNonRetryableExeception()
{
$this->expectException(ApiException::class);
$this->expectExceptionMessage('UNAUTHENTICATED');

$logger = $this->prophesize(LoggerInterface::class);
$logger->error('UNAUTHENTICATED')
->shouldBeCalledOnce();

$this->bigtableClient->readRows(
self::TABLE_NAME,
$this->options
)->shouldBeCalledOnce()
->willReturn(
$this->serverStream->reveal()
);

$this->serverStream->readAll()
->willThrow($this->nonRetryingApiException);

$iterator = $this->table->readRows(['logger' => $logger->reveal()]);
$iterator->getIterator()->current();
}

private function generateRowsResponse($from, $to)
{
$rows = [];
Expand Down
4 changes: 4 additions & 0 deletions Core/src/InsecureCredentialsWrapper.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,8 @@ public function getAuthorizationHeaderCallback($audience = null)
{
return null;
}

public function checkUniverseDomain()
{
}
}
Loading