Skip to content

Commit

Permalink
Retry storage list operations using retried pagination. (#8431)
Browse files Browse the repository at this point in the history
  • Loading branch information
isoos authored Jan 13, 2025
1 parent 886976e commit dd13f9e
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 31 deletions.
3 changes: 2 additions & 1 deletion app/lib/package/api_export/exported_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ final class ExportedApi {
Future<void> _gcOldPrefixes() async {
// List all top-level prefixes, and delete the ones we don't need
final topLevelprefixes = await _pool.withResource(
() async => await _bucket.list(prefix: '', delimiter: '/').toList(),
() async =>
await _bucket.listAllItemsWithRetry(prefix: '', delimiter: '/'),
);
await Future.wait(topLevelprefixes.map((entry) async {
if (entry.isObject) {
Expand Down
29 changes: 14 additions & 15 deletions app/lib/package/tarball_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,10 @@ class TarballStorage {
String package,
) async {
final prefix = _tarballObjectNamePackagePrefix(package);
final items = await _canonicalBucket
.list(
prefix: prefix,
delimiter: '',
)
.toList();
final items = await _canonicalBucket.listAllItemsWithRetry(
prefix: prefix,
delimiter: '',
);
return Map.fromEntries(items.whereType<BucketObjectEntry>().map((item) {
final version = item.name.without(prefix: prefix, suffix: '.tar.gz');
return MapEntry(
Expand Down Expand Up @@ -255,32 +253,33 @@ class TarballStorage {
final filterForNamePrefix = package == null
? 'packages/'
: _tarballObjectNamePackagePrefix(package);
await for (final entry in _publicBucket.list(prefix: filterForNamePrefix)) {
await _publicBucket.listWithRetry(prefix: filterForNamePrefix,
(entry) async {
// Skip non-objects.
if (!entry.isObject) {
continue;
return;
}
// Skip objects that were matched in the previous step.
if (objectNamesInPublicBucket.contains(entry.name)) {
continue;
return;
}
if (deleteObjects.contains(entry.name)) {
continue;
return;
}

final publicInfo = await _publicBucket.tryInfo(entry.name);
if (publicInfo == null) {
_logger.warning(
'Failed to get info for public bucket object "${entry.name}".');
continue;
return;
}

await updateContentDispositionToAttachment(publicInfo, _publicBucket);

// Skip recently updated objects.
if (publicInfo.age < ageCheckThreshold) {
// Ignore recent files.
continue;
return;
}

final canonicalInfo = await _canonicalBucket.tryInfo(entry.name);
Expand All @@ -289,11 +288,11 @@ class TarballStorage {
// but it wasn't matched through the [PackageVersion] query above.
if (canonicalInfo.age < ageCheckThreshold) {
// Ignore recent files.
continue;
return;
}
_logger.severe(
'Object without matching PackageVersion in canonical and public buckets: "${entry.name}".');
continue;
return;
} else {
// The object in the public bucket has no matching file in the canonical bucket.
// We can assume it is stale and can delete it.
Expand All @@ -305,7 +304,7 @@ class TarballStorage {
deleteObjects.add(entry.name);
}
}
}
});

for (final objectName in deleteObjects) {
_logger.shout('Deleting object from public bucket: "$objectName".');
Expand Down
3 changes: 2 additions & 1 deletion app/lib/service/download_counts/sync_download_counts.dart
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ Future<Set<String>> processDownloadCounts(DateTime date) async {

final failedFiles = <String>{};

final bucketEntries = await bucket.list(prefix: fileNamePrefix).toList();
final bucketEntries =
await bucket.listAllItemsWithRetry(prefix: fileNamePrefix);

if (bucketEntries.isEmpty) {
_logger.info('Failed to read any files with prefix "$fileNamePrefix"./n');
Expand Down
48 changes: 39 additions & 9 deletions app/lib/shared/storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,36 @@ extension BucketExt on Bucket {
return await _retry(() async => fn(read(objectName)));
}

/// Walks the bucket listing page by page and invokes [fn] for every entry.
///
/// The first page is fetched with `pageWithRetry` (filtered by the optional
/// [prefix] and [delimiter]) and each following page with `nextWithRetry`.
/// [fn] is awaited for one entry before the next entry is processed, and
/// the next page is only requested after the current page is fully handled.
Future<void> listWithRetry(
  FutureOr<void> Function(BucketEntry input) fn, {
  String? prefix,
  String? delimiter,
}) async {
  var currentPage = await pageWithRetry(prefix: prefix, delimiter: delimiter);
  while (true) {
    for (final entry in currentPage.items) {
      await fn(entry);
    }
    if (currentPage.isLast) {
      // Nothing left to fetch once the last page has been processed.
      return;
    }
    currentPage = await currentPage.nextWithRetry();
  }
}

/// Gathers every entry reported by [listWithRetry] into a single list.
///
/// The optional [prefix] and [delimiter] are forwarded unchanged. Entries
/// appear in the returned list in the order [listWithRetry] delivers them.
Future<List<BucketEntry>> listAllItemsWithRetry({
  String? prefix,
  String? delimiter,
}) async {
  final collected = <BucketEntry>[];
  await listWithRetry(collected.add, prefix: prefix, delimiter: delimiter);
  return collected;
}

/// The HTTP URL of a publicly accessible GCS object.
String objectUrl(String objectName) {
return '${activeConfiguration.storageBaseUrl}/$bucketName/$objectName';
Expand Down Expand Up @@ -324,8 +354,9 @@ Future<int> deleteBucketFolderRecursively(
page = await retry(
() async {
return page == null
? await bucket.page(prefix: folder, delimiter: '', pageSize: 100)
: await page.next(pageSize: 100);
? await bucket.pageWithRetry(
prefix: folder, delimiter: '', pageSize: 100)
: await page.nextWithRetry(pageSize: 100);
},
delayFactor: Duration(seconds: 10),
maxAttempts: 3,
Expand Down Expand Up @@ -430,8 +461,7 @@ class VersionedJsonStorage {
}
// fallback to earlier runtimes
final currentPath = _objectName();
final list = await _bucket
.list(prefix: _prefix)
final list = (await _bucket.listAllItemsWithRetry(prefix: _prefix))
.map((entry) => entry.name)
.where((name) => name.endsWith(_extension))
.where((name) => name.compareTo(currentPath) <= 0)
Expand All @@ -456,19 +486,19 @@ class VersionedJsonStorage {
Future<DeleteCounts> deleteOldData({Duration? minAgeThreshold}) async {
var found = 0;
var deleted = 0;
await for (final entry in _bucket.list(prefix: _prefix)) {
await _bucket.listWithRetry(prefix: _prefix, (entry) async {
if (entry.isDirectory) {
continue;
return;
}
final name = p.basename(entry.name);
if (!name.endsWith(_extension)) {
continue;
return;
}
final version = name.substring(0, name.length - _extension.length);
final matchesPattern = version.length == 10 &&
versions.runtimeVersionPattern.hasMatch(version);
if (!matchesPattern) {
continue;
return;
}
found++;
if (versions.shouldGCVersion(version)) {
Expand All @@ -479,7 +509,7 @@ class VersionedJsonStorage {
await deleteFromBucket(_bucket, entry.name);
}
}
}
});
return DeleteCounts(found, deleted);
}

Expand Down
10 changes: 5 additions & 5 deletions app/lib/task/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -516,10 +516,10 @@ class TaskBackend {
// Objects in the bucket are stored under the following pattern:
// `<runtimeVersion>/<package>/<version>/...`
// Thus, we list with `/` as delimiter and get a list of runtimeVersions
await for (final d in _bucket.list(prefix: '', delimiter: '/')) {
await _bucket.listWithRetry(prefix: '', delimiter: '/', (d) async {
if (!d.isDirectory) {
_log.warning('bucket should not contain any top-level object');
continue;
return;
}

// Remove trailing slash from object prefix, to get a runtimeVersion
Expand All @@ -529,7 +529,7 @@ class TaskBackend {
// Check if the runtimeVersion should be GC'ed
if (shouldGCVersion(rtVersion)) {
// List all objects under the `<rtVersion>/`
await for (final obj in _bucket.list(prefix: d.name, delimiter: '')) {
await _bucket.listWithRetry(prefix: d.name, delimiter: '', (obj) async {
// Limit concurrency
final r = await pool.request();

Expand All @@ -545,9 +545,9 @@ class TaskBackend {
r.release(); // always release to avoid deadlock
}
});
}
});
}
}
});

// Close the pool, and wait for all pending deletion request to complete.
await pool.close();
Expand Down
1 change: 1 addition & 0 deletions pkg/fake_gcloud/lib/retry_enforcer_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ class _RetryEnforcerBucket implements Bucket {
@override
Stream<BucketEntry> list({String? prefix, String? delimiter}) {
// TODO: verify retry wrapper here
_verifyRetryOnStack();
return _bucket.list(
prefix: prefix,
delimiter: delimiter,
Expand Down

0 comments on commit dd13f9e

Please sign in to comment.