From dd13f9ef7858fe8a8bd881d5a7d26dec108ffbf5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Istv=C3=A1n=20So=C3=B3s?= Date: Mon, 13 Jan 2025 13:09:29 +0100 Subject: [PATCH] Retry storage list operations using retried pagination. (#8431) --- app/lib/package/api_export/exported_api.dart | 3 +- app/lib/package/tarball_storage.dart | 29 ++++++----- .../download_counts/sync_download_counts.dart | 3 +- app/lib/shared/storage.dart | 48 +++++++++++++++---- app/lib/task/backend.dart | 10 ++-- .../lib/retry_enforcer_storage.dart | 1 + 6 files changed, 63 insertions(+), 31 deletions(-) diff --git a/app/lib/package/api_export/exported_api.dart b/app/lib/package/api_export/exported_api.dart index a071f3dfc..c46b7e79f 100644 --- a/app/lib/package/api_export/exported_api.dart +++ b/app/lib/package/api_export/exported_api.dart @@ -149,7 +149,8 @@ final class ExportedApi { Future _gcOldPrefixes() async { // List all top-level prefixes, and delete the ones we don't need final topLevelprefixes = await _pool.withResource( - () async => await _bucket.list(prefix: '', delimiter: '/').toList(), + () async => + await _bucket.listAllItemsWithRetry(prefix: '', delimiter: '/'), ); await Future.wait(topLevelprefixes.map((entry) async { if (entry.isObject) { diff --git a/app/lib/package/tarball_storage.dart b/app/lib/package/tarball_storage.dart index 0fe99fb3f..7f94a00fe 100644 --- a/app/lib/package/tarball_storage.dart +++ b/app/lib/package/tarball_storage.dart @@ -67,12 +67,10 @@ class TarballStorage { String package, ) async { final prefix = _tarballObjectNamePackagePrefix(package); - final items = await _canonicalBucket - .list( - prefix: prefix, - delimiter: '', - ) - .toList(); + final items = await _canonicalBucket.listAllItemsWithRetry( + prefix: prefix, + delimiter: '', + ); return Map.fromEntries(items.whereType().map((item) { final version = item.name.without(prefix: prefix, suffix: '.tar.gz'); return MapEntry( @@ -255,24 +253,25 @@ class TarballStorage { final filterForNamePrefix = package == null ? 'packages/' : _tarballObjectNamePackagePrefix(package); - await for (final entry in _publicBucket.list(prefix: filterForNamePrefix)) { + await _publicBucket.listWithRetry(prefix: filterForNamePrefix, + (entry) async { // Skip non-objects. if (!entry.isObject) { - continue; + return; } // Skip objects that were matched in the previous step. if (objectNamesInPublicBucket.contains(entry.name)) { - continue; + return; } if (deleteObjects.contains(entry.name)) { - continue; + return; } final publicInfo = await _publicBucket.tryInfo(entry.name); if (publicInfo == null) { _logger.warning( 'Failed to get info for public bucket object "${entry.name}".'); - continue; + return; } await updateContentDispositionToAttachment(publicInfo, _publicBucket); @@ -280,7 +279,7 @@ class TarballStorage { // Skip recently updated objects. if (publicInfo.age < ageCheckThreshold) { // Ignore recent files. - continue; + return; } final canonicalInfo = await _canonicalBucket.tryInfo(entry.name); @@ -289,11 +288,11 @@ class TarballStorage { // but it wasn't matched through the [PackageVersion] query above. if (canonicalInfo.age < ageCheckThreshold) { // Ignore recent files. - continue; + return; } _logger.severe( 'Object without matching PackageVersion in canonical and public buckets: "${entry.name}".'); - continue; + return; } else { // The object in the public bucket has no matching file in the canonical bucket. // We can assume it is stale and can delete it. @@ -305,7 +304,7 @@ class TarballStorage { deleteObjects.add(entry.name); } } - } + }); for (final objectName in deleteObjects) { _logger.shout('Deleting object from public bucket: "$objectName".'); diff --git a/app/lib/service/download_counts/sync_download_counts.dart b/app/lib/service/download_counts/sync_download_counts.dart index 26f3b37d4..32b5666b0 100644 --- a/app/lib/service/download_counts/sync_download_counts.dart +++ b/app/lib/service/download_counts/sync_download_counts.dart @@ -64,7 +64,8 @@ Future> processDownloadCounts(DateTime date) async { final failedFiles = {}; - final bucketEntries = await bucket.list(prefix: fileNamePrefix).toList(); + final bucketEntries = + await bucket.listAllItemsWithRetry(prefix: fileNamePrefix); if (bucketEntries.isEmpty) { _logger.info('Failed to read any files with prefix "$fileNamePrefix"./n'); diff --git a/app/lib/shared/storage.dart b/app/lib/shared/storage.dart index 65304564c..7e01ac4da 100644 --- a/app/lib/shared/storage.dart +++ b/app/lib/shared/storage.dart @@ -188,6 +188,36 @@ extension BucketExt on Bucket { return await _retry(() async => fn(read(objectName))); } + /// List objects in the bucket with default retry with pagination. + Future listWithRetry( + FutureOr Function(BucketEntry input) fn, { + String? prefix, + String? delimiter, + }) async { + var p = await pageWithRetry(prefix: prefix, delimiter: delimiter); + for (;;) { + for (final item in p.items) { + await fn(item); + } + if (p.isLast) break; + p = await p.nextWithRetry(); + } + } + + /// Lists all entries with default retry pagination, returns them as List. + Future> listAllItemsWithRetry({ + String? prefix, + String? delimiter, + }) async { + final entries = []; + await listWithRetry( + prefix: prefix, + delimiter: delimiter, + entries.add, + ); + return entries; + } + /// The HTTP URL of a publicly accessable GCS object. String objectUrl(String objectName) { return '${activeConfiguration.storageBaseUrl}/$bucketName/$objectName'; @@ -324,8 +354,9 @@ Future deleteBucketFolderRecursively( page = await retry( () async { return page == null - ? await bucket.page(prefix: folder, delimiter: '', pageSize: 100) - : await page.next(pageSize: 100); + ? await bucket.pageWithRetry( + prefix: folder, delimiter: '', pageSize: 100) + : await page.nextWithRetry(pageSize: 100); }, delayFactor: Duration(seconds: 10), maxAttempts: 3, @@ -430,8 +461,7 @@ class VersionedJsonStorage { } // fallback to earlier runtimes final currentPath = _objectName(); - final list = await _bucket - .list(prefix: _prefix) + final list = (await _bucket.listAllItemsWithRetry(prefix: _prefix)) .map((entry) => entry.name) .where((name) => name.endsWith(_extension)) .where((name) => name.compareTo(currentPath) <= 0) @@ -456,19 +486,19 @@ class VersionedJsonStorage { Future deleteOldData({Duration? minAgeThreshold}) async { var found = 0; var deleted = 0; - await for (final entry in _bucket.list(prefix: _prefix)) { + await _bucket.listWithRetry(prefix: _prefix, (entry) async { if (entry.isDirectory) { - continue; + return; } final name = p.basename(entry.name); if (!name.endsWith(_extension)) { - continue; + return; } final version = name.substring(0, name.length - _extension.length); final matchesPattern = version.length == 10 && versions.runtimeVersionPattern.hasMatch(version); if (!matchesPattern) { - continue; + return; } found++; if (versions.shouldGCVersion(version)) { @@ -479,7 +509,7 @@ class VersionedJsonStorage { await deleteFromBucket(_bucket, entry.name); } } - } + }); return DeleteCounts(found, deleted); } diff --git a/app/lib/task/backend.dart b/app/lib/task/backend.dart index 501c5f873..710fcff66 100644 --- a/app/lib/task/backend.dart +++ b/app/lib/task/backend.dart @@ -516,10 +516,10 @@ class TaskBackend { // Objects in the bucket are stored under the following pattern: // `///...` // Thus, we list with `/` as delimiter and get a list of runtimeVersions - await for (final d in _bucket.list(prefix: '', delimiter: '/')) { + await _bucket.listWithRetry(prefix: '', delimiter: '/', (d) async { if (!d.isDirectory) { _log.warning('bucket should not contain any top-level object'); - continue; + return; } // Remove trailing slash from object prefix, to get a runtimeVersion @@ -529,7 +529,7 @@ class TaskBackend { // Check if the runtimeVersion should be GC'ed if (shouldGCVersion(rtVersion)) { // List all objects under the `/` - await for (final obj in _bucket.list(prefix: d.name, delimiter: '')) { + await _bucket.listWithRetry(prefix: d.name, delimiter: '', (obj) async { // Limit concurrency final r = await pool.request(); @@ -545,9 +545,9 @@ class TaskBackend { r.release(); // always release to avoid deadlock } }); - } + }); } - } + }); // Close the pool, and wait for all pending deletion request to complete. await pool.close(); diff --git a/pkg/fake_gcloud/lib/retry_enforcer_storage.dart b/pkg/fake_gcloud/lib/retry_enforcer_storage.dart index 85cc89f16..24eeb46fa 100644 --- a/pkg/fake_gcloud/lib/retry_enforcer_storage.dart +++ b/pkg/fake_gcloud/lib/retry_enforcer_storage.dart @@ -162,6 +162,7 @@ class _RetryEnforcerBucket implements Bucket { @override Stream list({String? prefix, String? delimiter}) { // TODO: verify retry wrapper here + _verifyRetryOnStack(); return _bucket.list( prefix: prefix, delimiter: delimiter,