From dfad9d314d7048f5a6045d2514fb022116eb11be Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Tue, 17 Dec 2024 18:19:01 +0100 Subject: [PATCH] Retry streaming storage reads with callback method. --- app/lib/package/backend.dart | 9 +++++-- app/lib/package/tarball_storage.dart | 24 ++++++++++--------- app/lib/service/download_counts/backend.dart | 11 +++++---- app/lib/shared/storage.dart | 22 ++++++++++++----- .../lib/retry_enforcer_storage.dart | 2 +- 5 files changed, 44 insertions(+), 24 deletions(-) diff --git a/app/lib/package/backend.dart b/app/lib/package/backend.dart index f328a4aba..92fc67a18 100644 --- a/app/lib/package/backend.dart +++ b/app/lib/package/backend.dart @@ -902,8 +902,8 @@ class PackageBackend { throw PackageRejectedException.archiveTooLarge( UploadSignerService.maxUploadSize); } - await _saveTarballToFS( - _incomingBucket.read(tmpObjectName(guid)), filename); + await _incomingBucket.readWithRetry( + tmpObjectName(guid), (input) => _saveTarballToFS(input, filename)); _logger.info('Examining tarball content ($guid).'); final sw = Stopwatch()..start(); final file = File(filename); @@ -1756,6 +1756,11 @@ class InviteStatus { Future _saveTarballToFS(Stream> data, String filename) async { final sw = Stopwatch()..start(); final targetFile = File(filename); + + // cleanup the leftover if previous attempt failed + if (await targetFile.exists()) { + await targetFile.delete(); + } try { int receivedBytes = 0; final stream = data.transform>( diff --git a/app/lib/package/tarball_storage.dart b/app/lib/package/tarball_storage.dart index e7750b2a9..0fe99fb3f 100644 --- a/app/lib/package/tarball_storage.dart +++ b/app/lib/package/tarball_storage.dart @@ -120,18 +120,20 @@ class TarballStorage { final raf = await file.open(); var remainingLength = info.length; try { - await for (final chunk in _canonicalBucket.read(objectName)) { - if (chunk.isEmpty) continue; - remainingLength -= chunk.length; - if (remainingLength < 0) { - return ContentMatchStatus.different; + await _canonicalBucket.readWithRetry(objectName, (input) async { + await for (final chunk in input) { + if (chunk.isEmpty) continue; + remainingLength -= chunk.length; + if (remainingLength < 0) { + return ContentMatchStatus.different; + } + // TODO: consider rewriting to fixed-length chunk comparison + final fileChunk = await raf.read(chunk.length); + if (!fileChunk.byteToByteEquals(chunk)) { + return ContentMatchStatus.different; + } } - // TODO: consider rewriting to fixed-length chunk comparison - final fileChunk = await raf.read(chunk.length); - if (!fileChunk.byteToByteEquals(chunk)) { - return ContentMatchStatus.different; - } - } + }); } finally { await raf.close(); } diff --git a/app/lib/service/download_counts/backend.dart b/app/lib/service/download_counts/backend.dart index e891e0891..0e73c037c 100644 --- a/app/lib/service/download_counts/backend.dart +++ b/app/lib/service/download_counts/backend.dart @@ -50,10 +50,13 @@ class DownloadCountsBackend { } final data = (await storageService .bucket(activeConfiguration.reportsBucketName!) - .read(downloadCounts30DaysTotalsFileName) - .transform(utf8.decoder) - .transform(json.decoder) - .single as Map) + .readWithRetry( + downloadCounts30DaysTotalsFileName, + (input) async => await input + .transform(utf8.decoder) + .transform(json.decoder) + .single as Map, + )) .cast(); _lastData = (data: data, etag: info.etag); return data; diff --git a/app/lib/shared/storage.dart b/app/lib/shared/storage.dart index 412af1553..65304564c 100644 --- a/app/lib/shared/storage.dart +++ b/app/lib/shared/storage.dart @@ -180,6 +180,14 @@ extension BucketExt on Bucket { ); } + /// Read object content as byte stream using the callback function to receive data chunks. + /// + /// When network error occurs, the entire stream is restarted and [fn] is called again. + Future readWithRetry( + String objectName, Future Function(Stream> input) fn) async { + return await _retry(() async => fn(read(objectName))); + } + /// The HTTP URL of a publicly accessable GCS object. String objectUrl(String objectName) { return '${activeConfiguration.storageBaseUrl}/$bucketName/$objectName'; @@ -399,12 +407,14 @@ class VersionedJsonStorage { } final objectName = _objectName(version); _logger.info('Loading snapshot: $objectName'); - final map = await _bucket - .read(objectName) - .transform(_gzip.decoder) - .transform(utf8.decoder) - .transform(json.decoder) - .single; + final map = await _bucket.readWithRetry( + objectName, + (input) => input + .transform(_gzip.decoder) + .transform(utf8.decoder) + .transform(json.decoder) + .single, + ); return map as Map; } diff --git a/pkg/fake_gcloud/lib/retry_enforcer_storage.dart b/pkg/fake_gcloud/lib/retry_enforcer_storage.dart index 3b71e51c1..7a8fa72bf 100644 --- a/pkg/fake_gcloud/lib/retry_enforcer_storage.dart +++ b/pkg/fake_gcloud/lib/retry_enforcer_storage.dart @@ -164,7 +164,7 @@ class _RetryEnforcerBucket implements Bucket { @override Stream> read(String objectName, {int? offset, int? length}) { - // TODO: verify retry wrapper here + _verifyRetryOnStack(); return _bucket.read(objectName, offset: offset, length: length); }