Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated retry handling in storage access. #8499

Merged
merged 1 commit into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions app/lib/package/api_export/exported_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ final class ExportedApi {
// Only delete the item if it's older than _minGarbageAge
// This avoids any races where we delete files we've just created
// TODO: Conditionally deletion API from package:gcloud would be better!
await _bucket.tryDelete(item.name);
await _bucket.tryDeleteWithRetry(item.name);
}
});

Expand All @@ -137,7 +137,7 @@ final class ExportedApi {
item.updated.isBefore(gcFilesBefore)) {
// Only delete the item if it's older than _minGarbageAge
// This avoids any races where we delete files we've just created
await _bucket.tryDelete(item.name);
await _bucket.tryDeleteWithRetry(item.name);
}
});
}
Expand Down Expand Up @@ -184,7 +184,7 @@ final class ExportedApi {
await _listBucket(
prefix: entry.name,
delimiter: '',
(entry) async => await _bucket.tryDelete(entry.name),
(entry) async => await _bucket.tryDeleteWithRetry(entry.name),
);
}
}));
Expand Down Expand Up @@ -336,7 +336,7 @@ final class ExportedPackage {
item.updated.isBefore(clock.agoBy(_minGarbageAge))) {
// Only delete if the item if it's older than _minGarbageAge
// This avoids any races where we delete files we've just created
await _owner._bucket.tryDelete(item.name);
await _owner._bucket.tryDeleteWithRetry(item.name);
}
});

Expand Down Expand Up @@ -380,7 +380,7 @@ final class ExportedPackage {
if (info.updated.isBefore(clock.agoBy(_minGarbageAge))) {
// Only delete if the item if it's older than _minGarbageAge
// This avoids any races where we delete files we've just created
await _owner._bucket.tryDelete(item.name);
await _owner._bucket.tryDeleteWithRetry(item.name);
}
}
// Ignore cases where tryInfo fails, assuming the object has been
Expand All @@ -399,7 +399,7 @@ final class ExportedPackage {
await _owner._listBucket(
prefix: prefix + '/api/archives/$_package-',
delimiter: '',
(item) async => await _owner._bucket.tryDelete(item.name),
(item) async => await _owner._bucket.tryDeleteWithRetry(item.name),
);
}),
]);
Expand Down Expand Up @@ -442,7 +442,7 @@ sealed class ExportedObject {
Future<void> delete() async {
await Future.wait(_owner._prefixes.map((prefix) async {
await _owner._pool.withResource(() async {
await _owner._bucket.tryDelete(prefix + _objectName);
await _owner._bucket.tryDeleteWithRetry(prefix + _objectName);
});
}));
}
Expand Down
4 changes: 2 additions & 2 deletions app/lib/package/tarball_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ class TarballStorage {
Future<void> deleteArchiveFromAllBuckets(
String package, String version) async {
final objectName = tarballObjectName(package, version);
await deleteFromBucket(_canonicalBucket, objectName);
await deleteFromBucket(_publicBucket, objectName);
await _canonicalBucket.deleteWithRetry(objectName);
await _publicBucket.deleteWithRetry(objectName);
}

/// Deletes the package archive file from the canonical bucket.
Expand Down
83 changes: 27 additions & 56 deletions app/lib/shared/storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,12 @@ import 'package:retry/retry.dart';

import 'configuration.dart';
import 'utils.dart'
show
contentType,
jsonUtf8Encoder,
retryAsync,
ByteArrayEqualsExt,
DeleteCounts;
show contentType, jsonUtf8Encoder, ByteArrayEqualsExt, DeleteCounts;
import 'versions.dart' as versions;

final _gzip = GZipCodec();
final _logger = Logger('shared.storage');

const _retryStatusCodes = <int>{502, 503, 504};

/// Additional methods on the storage service.
extension StorageExt on Storage {
/// Verifies bucket existence and access.
Expand Down Expand Up @@ -115,18 +108,19 @@ extension BucketExt on Bucket {
}

/// Deletes [name] if it exists, ignores 404 otherwise.
Future<void> tryDelete(String name) async {
return await retry(
Future<bool> tryDeleteWithRetry(String name) async {
return await _retry(
() async {
try {
return await delete(name);
await delete(name);
return true;
} on DetailedApiRequestError catch (e) {
if (e.status == 404) return null;
if (e.status == 404) {
return false;
}
rethrow;
}
},
maxAttempts: 3,
retryIf: _retryIf,
);
}

Expand Down Expand Up @@ -158,7 +152,7 @@ extension BucketExt on Bucket {
if (maxSize != null && length != null && maxSize < length) {
throw MaximumSizeExceeded(maxSize);
}
return retry(
return _retry(
() async {
final timeout = Duration(seconds: 30);
final deadline = clock.now().add(timeout);
Expand All @@ -175,8 +169,6 @@ extension BucketExt on Bucket {
}
return builder.toBytes();
},
maxAttempts: 3,
retryIf: _retryIf,
);
}

Expand Down Expand Up @@ -270,8 +262,17 @@ extension PageExt<T> on Page<T> {
}
}

Future<R> _retry<R>(Future<R> Function() fn) async {
return await retry(fn, maxAttempts: 3, retryIf: _retryIf);
Future<R> _retry<R>(
Future<R> Function() fn, {
FutureOr<void> Function(Exception)? onRetry,
}) async {
return await retry(
fn,
maxAttempts: 3,
delayFactor: Duration(seconds: 2),
retryIf: _retryIf,
onRetry: onRetry,
);
}

bool _retryIf(Exception e) {
Expand All @@ -295,32 +296,6 @@ bool _retryIf(Exception e) {
String bucketUri(Bucket bucket, String path) =>
'gs://${bucket.bucketName}/$path';

/// Deletes a single object from the [bucket].
///
/// Returns `true` if the object was deleted by this operation, `false` if it
/// didn't exist at the time of the operation.
Future<bool> deleteFromBucket(Bucket bucket, String objectName) async {
Future<bool> delete() async {
try {
await bucket.delete(objectName);
return true;
} on DetailedApiRequestError catch (e) {
if (e.status != 404) {
rethrow;
}
return false;
}
}

return await retry(
delete,
delayFactor: Duration(seconds: 10),
maxAttempts: 3,
retryIf: (e) =>
e is DetailedApiRequestError && _retryStatusCodes.contains(e.status),
);
}

Future<void> updateContentDispositionToAttachment(
ObjectInfo info, Bucket bucket) async {
if (info.metadata.contentDisposition != 'attachment') {
Expand Down Expand Up @@ -351,23 +326,19 @@ Future<int> deleteBucketFolderRecursively(
var count = 0;
Page<BucketEntry>? page;
while (page == null || !page.isLast) {
page = await retry(
page = await _retry(
() async {
return page == null
? await bucket.pageWithRetry(
prefix: folder, delimiter: '', pageSize: 100)
: await page.nextWithRetry(pageSize: 100);
},
delayFactor: Duration(seconds: 10),
maxAttempts: 3,
retryIf: (e) =>
e is DetailedApiRequestError && _retryStatusCodes.contains(e.status),
);
final futures = <Future>[];
final pool = Pool(concurrency ?? 1);
for (final entry in page!.items) {
final f = pool.withResource(() async {
final deleted = await deleteFromBucket(bucket, entry.name);
final deleted = await bucket.tryDeleteWithRetry(entry.name);
if (deleted) count++;
});
futures.add(f);
Expand All @@ -382,7 +353,7 @@ Future<int> deleteBucketFolderRecursively(
Future uploadWithRetry(Bucket bucket, String objectName, int length,
Stream<List<int>> Function() openStream,
{ObjectMetadata? metadata}) async {
await retryAsync(
await _retry(
() async {
final sink = bucket.write(objectName,
length: length,
Expand All @@ -391,9 +362,9 @@ Future uploadWithRetry(Bucket bucket, String objectName, int length,
await sink.addStream(openStream());
await sink.close();
},
description: 'Upload to $objectName',
shouldRetryOnError: _retryIf,
sleep: Duration(seconds: 10),
onRetry: (e) {
_logger.info('Upload to $objectName failed.', e, StackTrace.current);
},
);
}

Expand Down Expand Up @@ -506,7 +477,7 @@ class VersionedJsonStorage {
final age = clock.now().difference(info.updated);
if (minAgeThreshold == null || age > minAgeThreshold) {
deleted++;
await deleteFromBucket(_bucket, entry.name);
await _bucket.tryDeleteWithRetry(entry.name);
}
}
});
Expand Down
26 changes: 0 additions & 26 deletions app/lib/shared/utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import 'dart:typed_data';

import 'package:appengine/appengine.dart';
import 'package:intl/intl.dart';
import 'package:logging/logging.dart';
// ignore: implementation_imports
import 'package:mime/src/default_extension_map.dart' as mime;
import 'package:path/path.dart' as p;
Expand All @@ -28,7 +27,6 @@ final Duration twoYears = const Duration(days: 2 * 365);
/// Appengine.
const _cloudTraceContextHeader = 'X-Cloud-Trace-Context';

final Logger _logger = Logger('pub.utils');
final _random = Random.secure();

final DateFormat shortDateFormat = DateFormat.yMMMd();
Expand Down Expand Up @@ -171,30 +169,6 @@ List<T> boundedList<T>(List<T> list, {int? offset, int? limit}) {
return iterable.toList();
}

/// Executes [body] and returns with the same result.
/// When it throws an exception, it will be re-run until [maxAttempt] is reached.
Future<R> retryAsync<R>(
Future<R> Function() body, {
int maxAttempt = 3,
bool Function(Exception)? shouldRetryOnError,
String description = 'Async operation',
Duration sleep = const Duration(seconds: 1),
}) async {
for (int i = 1;; i++) {
try {
return await body();
} on Exception catch (e, st) {
_logger.info('$description failed (attempt: $i of $maxAttempt).', e, st);
if (i < maxAttempt &&
(shouldRetryOnError == null || shouldRetryOnError(e))) {
await Future.delayed(sleep);
continue;
}
rethrow;
}
}
}

/// Returns a UUID in v4 format as a `String`.
///
/// If [bytes] is provided, it must be length 16 and have values between `0` and
Expand Down
Loading