diff --git a/app/lib/package/api_export/exported_api.dart b/app/lib/package/api_export/exported_api.dart index c46b7e79f..f88ad7dcd 100644 --- a/app/lib/package/api_export/exported_api.dart +++ b/app/lib/package/api_export/exported_api.dart @@ -121,7 +121,7 @@ final class ExportedApi { // Only delete the item if it's older than _minGarbageAge // This avoids any races where we delete files we've just created // TODO: Conditionally deletion API from package:gcloud would be better! - await _bucket.tryDelete(item.name); + await _bucket.tryDeleteWithRetry(item.name); } }); @@ -137,7 +137,7 @@ final class ExportedApi { item.updated.isBefore(gcFilesBefore)) { // Only delete the item if it's older than _minGarbageAge // This avoids any races where we delete files we've just created - await _bucket.tryDelete(item.name); + await _bucket.tryDeleteWithRetry(item.name); } }); } @@ -184,7 +184,7 @@ final class ExportedApi { await _listBucket( prefix: entry.name, delimiter: '', - (entry) async => await _bucket.tryDelete(entry.name), + (entry) async => await _bucket.tryDeleteWithRetry(entry.name), ); } })); @@ -336,7 +336,7 @@ final class ExportedPackage { item.updated.isBefore(clock.agoBy(_minGarbageAge))) { // Only delete if the item if it's older than _minGarbageAge // This avoids any races where we delete files we've just created - await _owner._bucket.tryDelete(item.name); + await _owner._bucket.tryDeleteWithRetry(item.name); } }); @@ -380,7 +380,7 @@ final class ExportedPackage { if (info.updated.isBefore(clock.agoBy(_minGarbageAge))) { // Only delete if the item if it's older than _minGarbageAge // This avoids any races where we delete files we've just created - await _owner._bucket.tryDelete(item.name); + await _owner._bucket.tryDeleteWithRetry(item.name); } } // Ignore cases where tryInfo fails, assuming the object has been @@ -399,7 +399,7 @@ final class ExportedPackage { await _owner._listBucket( prefix: prefix + '/api/archives/$_package-', delimiter: '', - (item) async => await _owner._bucket.tryDelete(item.name), + (item) async => await _owner._bucket.tryDeleteWithRetry(item.name), ); }), ]); @@ -442,7 +442,7 @@ sealed class ExportedObject { Future delete() async { await Future.wait(_owner._prefixes.map((prefix) async { await _owner._pool.withResource(() async { - await _owner._bucket.tryDelete(prefix + _objectName); + await _owner._bucket.tryDeleteWithRetry(prefix + _objectName); }); })); } diff --git a/app/lib/package/tarball_storage.dart b/app/lib/package/tarball_storage.dart index 7f94a00fe..b8e437178 100644 --- a/app/lib/package/tarball_storage.dart +++ b/app/lib/package/tarball_storage.dart @@ -176,8 +176,8 @@ class TarballStorage { Future deleteArchiveFromAllBuckets( String package, String version) async { final objectName = tarballObjectName(package, version); - await deleteFromBucket(_canonicalBucket, objectName); - await deleteFromBucket(_publicBucket, objectName); + await _canonicalBucket.deleteWithRetry(objectName); + await _publicBucket.deleteWithRetry(objectName); } /// Deletes the package archive file from the canonical bucket. diff --git a/app/lib/shared/storage.dart b/app/lib/shared/storage.dart index 7e01ac4da..8ceb20e22 100644 --- a/app/lib/shared/storage.dart +++ b/app/lib/shared/storage.dart @@ -21,19 +21,12 @@ import 'package:retry/retry.dart'; import 'configuration.dart'; import 'utils.dart' - show - contentType, - jsonUtf8Encoder, - retryAsync, - ByteArrayEqualsExt, - DeleteCounts; + show contentType, jsonUtf8Encoder, ByteArrayEqualsExt, DeleteCounts; import 'versions.dart' as versions; final _gzip = GZipCodec(); final _logger = Logger('shared.storage'); -const _retryStatusCodes = {502, 503, 504}; - /// Additional methods on the storage service. extension StorageExt on Storage { /// Verifies bucket existence and access. @@ -115,18 +108,19 @@ extension BucketExt on Bucket { } /// Deletes [name] if it exists, ignores 404 otherwise. - Future tryDelete(String name) async { - return await retry( + Future tryDeleteWithRetry(String name) async { + return await _retry( () async { try { - return await delete(name); + await delete(name); + return true; } on DetailedApiRequestError catch (e) { - if (e.status == 404) return null; + if (e.status == 404) { + return false; + } rethrow; } }, - maxAttempts: 3, - retryIf: _retryIf, ); } @@ -158,7 +152,7 @@ extension BucketExt on Bucket { if (maxSize != null && length != null && maxSize < length) { throw MaximumSizeExceeded(maxSize); } - return retry( + return _retry( () async { final timeout = Duration(seconds: 30); final deadline = clock.now().add(timeout); @@ -175,8 +169,6 @@ extension BucketExt on Bucket { } return builder.toBytes(); }, - maxAttempts: 3, - retryIf: _retryIf, ); } @@ -270,8 +262,17 @@ extension PageExt on Page { } } -Future _retry(Future Function() fn) async { - return await retry(fn, maxAttempts: 3, retryIf: _retryIf); +Future _retry( + Future Function() fn, { + FutureOr Function(Exception)? onRetry, +}) async { + return await retry( + fn, + maxAttempts: 3, + delayFactor: Duration(seconds: 2), + retryIf: _retryIf, + onRetry: onRetry, + ); } bool _retryIf(Exception e) { @@ -295,32 +296,6 @@ bool _retryIf(Exception e) { String bucketUri(Bucket bucket, String path) => 'gs://${bucket.bucketName}/$path'; -/// Deletes a single object from the [bucket]. -/// -/// Returns `true` if the object was deleted by this operation, `false` if it -/// didn't exist at the time of the operation. -Future deleteFromBucket(Bucket bucket, String objectName) async { - Future delete() async { - try { - await bucket.delete(objectName); - return true; - } on DetailedApiRequestError catch (e) { - if (e.status != 404) { - rethrow; - } - return false; - } - } - - return await retry( - delete, - delayFactor: Duration(seconds: 10), - maxAttempts: 3, - retryIf: (e) => - e is DetailedApiRequestError && _retryStatusCodes.contains(e.status), - ); -} - Future updateContentDispositionToAttachment( ObjectInfo info, Bucket bucket) async { if (info.metadata.contentDisposition != 'attachment') { @@ -351,23 +326,19 @@ Future deleteBucketFolderRecursively( var count = 0; Page? page; while (page == null || !page.isLast) { - page = await retry( + page = await _retry( () async { return page == null ? await bucket.pageWithRetry( prefix: folder, delimiter: '', pageSize: 100) : await page.nextWithRetry(pageSize: 100); }, - delayFactor: Duration(seconds: 10), - maxAttempts: 3, - retryIf: (e) => - e is DetailedApiRequestError && _retryStatusCodes.contains(e.status), ); final futures = []; final pool = Pool(concurrency ?? 1); for (final entry in page!.items) { final f = pool.withResource(() async { - final deleted = await deleteFromBucket(bucket, entry.name); + final deleted = await bucket.tryDeleteWithRetry(entry.name); if (deleted) count++; }); futures.add(f); @@ -382,7 +353,7 @@ Future deleteBucketFolderRecursively( Future uploadWithRetry(Bucket bucket, String objectName, int length, Stream> Function() openStream, {ObjectMetadata? metadata}) async { - await retryAsync( + await _retry( () async { final sink = bucket.write(objectName, length: length, @@ -391,9 +362,9 @@ Future uploadWithRetry(Bucket bucket, String objectName, int length, await sink.addStream(openStream()); await sink.close(); }, - description: 'Upload to $objectName', - shouldRetryOnError: _retryIf, - sleep: Duration(seconds: 10), + onRetry: (e) { + _logger.info('Upload to $objectName failed.', e, StackTrace.current); + }, ); } @@ -506,7 +477,7 @@ class VersionedJsonStorage { final age = clock.now().difference(info.updated); if (minAgeThreshold == null || age > minAgeThreshold) { deleted++; - await deleteFromBucket(_bucket, entry.name); + await _bucket.tryDeleteWithRetry(entry.name); } } }); diff --git a/app/lib/shared/utils.dart b/app/lib/shared/utils.dart index 12ef7d7fd..6990c243f 100644 --- a/app/lib/shared/utils.dart +++ b/app/lib/shared/utils.dart @@ -11,7 +11,6 @@ import 'dart:typed_data'; import 'package:appengine/appengine.dart'; import 'package:intl/intl.dart'; -import 'package:logging/logging.dart'; // ignore: implementation_imports import 'package:mime/src/default_extension_map.dart' as mime; import 'package:path/path.dart' as p; @@ -28,7 +27,6 @@ final Duration twoYears = const Duration(days: 2 * 365); /// Appengine. const _cloudTraceContextHeader = 'X-Cloud-Trace-Context'; -final Logger _logger = Logger('pub.utils'); final _random = Random.secure(); final DateFormat shortDateFormat = DateFormat.yMMMd(); @@ -171,30 +169,6 @@ List boundedList(List list, {int? offset, int? limit}) { return iterable.toList(); } -/// Executes [body] and returns with the same result. -/// When it throws an exception, it will be re-run until [maxAttempt] is reached. -Future retryAsync( - Future Function() body, { - int maxAttempt = 3, - bool Function(Exception)? shouldRetryOnError, - String description = 'Async operation', - Duration sleep = const Duration(seconds: 1), -}) async { - for (int i = 1;; i++) { - try { - return await body(); - } on Exception catch (e, st) { - _logger.info('$description failed (attempt: $i of $maxAttempt).', e, st); - if (i < maxAttempt && - (shouldRetryOnError == null || shouldRetryOnError(e))) { - await Future.delayed(sleep); - continue; - } - rethrow; - } - } -} - /// Returns a UUID in v4 format as a `String`. /// /// If [bytes] is provided, it must be length 16 and have values between `0` and