Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry streaming storage reads with callback method. #8423

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions app/lib/package/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -902,8 +902,8 @@ class PackageBackend {
throw PackageRejectedException.archiveTooLarge(
UploadSignerService.maxUploadSize);
}
await _saveTarballToFS(
_incomingBucket.read(tmpObjectName(guid)), filename);
await _incomingBucket.readWithRetry(
tmpObjectName(guid), (input) => _saveTarballToFS(input, filename));
_logger.info('Examining tarball content ($guid).');
final sw = Stopwatch()..start();
final file = File(filename);
Expand Down Expand Up @@ -1756,6 +1756,11 @@ class InviteStatus {
Future _saveTarballToFS(Stream<List<int>> data, String filename) async {
final sw = Stopwatch()..start();
final targetFile = File(filename);

// cleanup the leftover if previous attempt failed
if (await targetFile.exists()) {
await targetFile.delete();
}
try {
int receivedBytes = 0;
final stream = data.transform<List<int>>(
Expand Down
24 changes: 13 additions & 11 deletions app/lib/package/tarball_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,20 @@ class TarballStorage {
final raf = await file.open();
var remainingLength = info.length;
try {
await for (final chunk in _canonicalBucket.read(objectName)) {
if (chunk.isEmpty) continue;
remainingLength -= chunk.length;
if (remainingLength < 0) {
return ContentMatchStatus.different;
await _canonicalBucket.readWithRetry(objectName, (input) async {
await for (final chunk in input) {
if (chunk.isEmpty) continue;
remainingLength -= chunk.length;
if (remainingLength < 0) {
return ContentMatchStatus.different;
}
// TODO: consider rewriting to fixed-length chunk comparison
final fileChunk = await raf.read(chunk.length);
if (!fileChunk.byteToByteEquals(chunk)) {
return ContentMatchStatus.different;
}
}
// TODO: consider rewriting to fixed-length chunk comparison
final fileChunk = await raf.read(chunk.length);
if (!fileChunk.byteToByteEquals(chunk)) {
return ContentMatchStatus.different;
}
}
});
} finally {
await raf.close();
}
Expand Down
11 changes: 7 additions & 4 deletions app/lib/service/download_counts/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,13 @@ class DownloadCountsBackend {
}
final data = (await storageService
.bucket(activeConfiguration.reportsBucketName!)
.read(downloadCounts30DaysTotalsFileName)
.transform(utf8.decoder)
.transform(json.decoder)
.single as Map<String, dynamic>)
.readWithRetry(
downloadCounts30DaysTotalsFileName,
(input) async => await input
.transform(utf8.decoder)
.transform(json.decoder)
.single as Map<String, dynamic>,
))
.cast<String, int>();
_lastData = (data: data, etag: info.etag);
return data;
Expand Down
22 changes: 16 additions & 6 deletions app/lib/shared/storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,14 @@ extension BucketExt on Bucket {
);
}

/// Read object content as byte stream using the callback function to receive data chunks.
///
/// When network error occurs, the entire stream is restarted and [fn] is called again.
Future<T> readWithRetry<T>(
String objectName, Future<T> Function(Stream<List<int>> input) fn) async {
return await _retry(() async => fn(read(objectName)));
}

/// The HTTP URL of a publicly accessable GCS object.
String objectUrl(String objectName) {
return '${activeConfiguration.storageBaseUrl}/$bucketName/$objectName';
Expand Down Expand Up @@ -399,12 +407,14 @@ class VersionedJsonStorage {
}
final objectName = _objectName(version);
_logger.info('Loading snapshot: $objectName');
final map = await _bucket
.read(objectName)
.transform(_gzip.decoder)
.transform(utf8.decoder)
.transform(json.decoder)
.single;
final map = await _bucket.readWithRetry(
objectName,
(input) => input
.transform(_gzip.decoder)
.transform(utf8.decoder)
.transform(json.decoder)
.single,
);
return map as Map<String, dynamic>;
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/fake_gcloud/lib/retry_enforcer_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class _RetryEnforcerBucket implements Bucket {

@override
Stream<List<int>> read(String objectName, {int? offset, int? length}) {
// TODO: verify retry wrapper here
_verifyRetryOnStack();
return _bucket.read(objectName, offset: offset, length: length);
}

Expand Down
Loading