From 2a9ce61c829db378a1c5eb72089b04e497fc917d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lo=C3=AFc=20Sharma?=
Date: Sun, 8 Sep 2019 13:38:44 -0700
Subject: [PATCH] [Client SDK] Add catalog resource abstractions (#323)

---
Review notes (this section is ignored when the patch is applied):
- The BaGet.Protocol.csproj hunk lost its XML lines in the copy this patch was
  rebuilt from. Restore that hunk from commit 2a9ce61c before applying.
- Blob ids on the "index" lines are those of the original commit.
- HttpClientExtensions.Serializer is declared before JsonSettings. C# runs
  static field initializers in textual order, so JsonSerializer.Create looks
  like it receives null here. Confirm and reorder in a follow-up; the rest of
  the initializer is outside this hunk.

 src/BaGet.Protocol/BaGet.Protocol.csproj      |   1 +
 .../Catalog/CatalogProcessor.cs               | 218 ++++++++++++++++++
 .../Catalog/CatalogProcessorOptions.cs        |  32 +++
 src/BaGet.Protocol/Catalog/FileCursor.cs      |  80 +++++++
 .../Catalog/ICatalogLeafProcessor.cs          |  37 +++
 src/BaGet.Protocol/Catalog/ICursor.cs         |  30 +++
 src/BaGet.Protocol/Catalog/NullCursor.cs      |  35 +++
 .../Extensions/HttpClientExtensions.cs        |   4 +-
 8 files changed, 435 insertions(+), 2 deletions(-)
 create mode 100644 src/BaGet.Protocol/Catalog/CatalogProcessor.cs
 create mode 100644 src/BaGet.Protocol/Catalog/CatalogProcessorOptions.cs
 create mode 100644 src/BaGet.Protocol/Catalog/FileCursor.cs
 create mode 100644 src/BaGet.Protocol/Catalog/ICatalogLeafProcessor.cs
 create mode 100644 src/BaGet.Protocol/Catalog/ICursor.cs
 create mode 100644 src/BaGet.Protocol/Catalog/NullCursor.cs

diff --git a/src/BaGet.Protocol/BaGet.Protocol.csproj b/src/BaGet.Protocol/BaGet.Protocol.csproj
index ac3dc3af..7bdaaaa5 100644
--- a/src/BaGet.Protocol/BaGet.Protocol.csproj
+++ b/src/BaGet.Protocol/BaGet.Protocol.csproj
@@ -8,6 +8,7 @@
+
diff --git a/src/BaGet.Protocol/Catalog/CatalogProcessor.cs b/src/BaGet.Protocol/Catalog/CatalogProcessor.cs
new file mode 100644
index 00000000..c286779b
--- /dev/null
+++ b/src/BaGet.Protocol/Catalog/CatalogProcessor.cs
@@ -0,0 +1,218 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+
+namespace BaGet.Protocol.Catalog
+{
+    /// <summary>
+    /// Processes catalog leafs in chronological order.
+    /// See: https://docs.microsoft.com/en-us/nuget/api/catalog-resource
+    /// Based off: https://github.com/NuGet/NuGet.Services.Metadata/blob/3a468fe534a03dcced897eb5992209fdd3c4b6c9/src/NuGet.Protocol.Catalog/CatalogProcessor.cs
+    /// </summary>
+    public class CatalogProcessor
+    {
+        private readonly ICatalogLeafProcessor _leafProcessor;
+        private readonly ICatalogResource _client;
+        private readonly ICursor _cursor;
+        private readonly CatalogProcessorOptions _options;
+        private readonly ILogger _logger;
+
+        /// <summary>
+        /// Create a processor to discover and download catalog leafs. Leafs are processed
+        /// by the <see cref="ICatalogLeafProcessor"/>.
+        /// </summary>
+        /// <param name="cursor">Cursor to track successfully processed leafs. Leafs before the cursor are skipped.</param>
+        /// <param name="client">The client to interact with the catalog resource.</param>
+        /// <param name="leafProcessor">The leaf processor.</param>
+        /// <param name="options">The options to configure catalog processing.</param>
+        /// <param name="logger">The logger used for telemetry.</param>
+        /// <exception cref="ArgumentNullException">Thrown when any argument is null.</exception>
+        public CatalogProcessor(
+            ICursor cursor,
+            ICatalogResource client,
+            ICatalogLeafProcessor leafProcessor,
+            CatalogProcessorOptions options,
+            ILogger logger)
+        {
+            _leafProcessor = leafProcessor ?? throw new ArgumentNullException(nameof(leafProcessor));
+            _client = client ?? throw new ArgumentNullException(nameof(client));
+            _cursor = cursor ?? throw new ArgumentNullException(nameof(cursor));
+            _options = options ?? throw new ArgumentNullException(nameof(options));
+            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+        }
+
+        /// <summary>
+        /// Discovers and downloads all of the catalog leafs after the current cursor value and before the maximum
+        /// commit timestamp found in the settings. Each catalog leaf is passed to the catalog leaf processor in
+        /// chronological order. After a commit is completed, its commit timestamp is written to the cursor, i.e. when
+        /// transitioning from commit timestamp A to B, A is written to the cursor so that it never is processed again.
+        /// </summary>
+        /// <param name="cancellationToken">A token to cancel the task.</param>
+        /// <returns>True if all of the catalog leaves found were processed successfully.</returns>
+        public async Task<bool> ProcessAsync(CancellationToken cancellationToken = default)
+        {
+            var minCommitTimestamp = await GetMinCommitTimestamp(cancellationToken);
+            _logger.LogInformation(
+                "Using time bounds {min:O} (exclusive) to {max:O} (inclusive).",
+                minCommitTimestamp,
+                _options.MaxCommitTimestamp);
+
+            return await ProcessIndexAsync(minCommitTimestamp, cancellationToken);
+        }
+
+        /// <summary>
+        /// Downloads the catalog index and processes each page within the time bounds in order,
+        /// stopping at the first page that fails.
+        /// </summary>
+        private async Task<bool> ProcessIndexAsync(DateTimeOffset minCommitTimestamp, CancellationToken cancellationToken)
+        {
+            var index = await _client.GetIndexAsync(cancellationToken);
+
+            var pageItems = index.GetPagesInBounds(
+                minCommitTimestamp,
+                _options.MaxCommitTimestamp);
+            _logger.LogInformation(
+                "{pages} pages were in the time bounds, out of {totalPages}.",
+                pageItems.Count,
+                index.Items.Count);
+
+            var success = true;
+            for (var i = 0; i < pageItems.Count; i++)
+            {
+                success = await ProcessPageAsync(minCommitTimestamp, pageItems[i], cancellationToken);
+                if (!success)
+                {
+                    _logger.LogWarning(
+                        "{unprocessedPages} out of {pages} pages were left incomplete due to a processing failure.",
+                        pageItems.Count - i,
+                        pageItems.Count);
+                    break;
+                }
+            }
+
+            return success;
+        }
+
+        /// <summary>
+        /// Downloads one catalog page and processes its in-bounds leaves in order. A commit timestamp is
+        /// written to the cursor once the loop moves past it, or at the end if every leaf succeeded.
+        /// </summary>
+        private async Task<bool> ProcessPageAsync(
+            DateTimeOffset minCommitTimestamp,
+            CatalogPageItem pageItem,
+            CancellationToken cancellationToken)
+        {
+            var page = await _client.GetPageAsync(pageItem.Url, cancellationToken);
+
+            var leafItems = page.GetLeavesInBounds(
+                minCommitTimestamp,
+                _options.MaxCommitTimestamp,
+                _options.ExcludeRedundantLeaves);
+            _logger.LogInformation(
+                "On page {page}, {leaves} out of {totalLeaves} were in the time bounds.",
+                pageItem.Url,
+                leafItems.Count,
+                page.Items.Count);
+
+            DateTimeOffset? newCursor = null;
+            var success = true;
+            for (var i = 0; i < leafItems.Count; i++)
+            {
+                var leafItem = leafItems[i];
+
+                // Moving on to a new commit: the previous commit is complete, so persist it.
+                if (newCursor.HasValue && newCursor.Value != leafItem.CommitTimestamp)
+                {
+                    await _cursor.SetAsync(newCursor.Value, cancellationToken);
+                }
+
+                newCursor = leafItem.CommitTimestamp;
+
+                success = await ProcessLeafAsync(leafItem, cancellationToken);
+                if (!success)
+                {
+                    _logger.LogWarning(
+                        "{unprocessedLeaves} out of {leaves} leaves were left incomplete due to a processing failure.",
+                        leafItems.Count - i,
+                        leafItems.Count);
+                    break;
+                }
+            }
+
+            // Persist the last commit only when every leaf on the page succeeded.
+            if (newCursor.HasValue && success)
+            {
+                await _cursor.SetAsync(newCursor.Value, cancellationToken);
+            }
+
+            return success;
+        }
+
+        /// <summary>
+        /// Downloads a single leaf and hands it to the leaf processor. Exceptions are logged and
+        /// reported as a failure instead of being rethrown.
+        /// </summary>
+        private async Task<bool> ProcessLeafAsync(CatalogLeafItem leafItem, CancellationToken cancellationToken)
+        {
+            bool success;
+            try
+            {
+                switch (leafItem.Type)
+                {
+                    case CatalogLeafType.PackageDelete:
+                        var packageDelete = await _client.GetPackageDeleteLeafAsync(leafItem.Url);
+                        success = await _leafProcessor.ProcessPackageDeleteAsync(packageDelete, cancellationToken);
+                        break;
+                    case CatalogLeafType.PackageDetails:
+                        var packageDetails = await _client.GetPackageDetailsLeafAsync(leafItem.Url);
+                        success = await _leafProcessor.ProcessPackageDetailsAsync(packageDetails, cancellationToken);
+                        break;
+                    default:
+                        throw new NotSupportedException($"The catalog leaf type '{leafItem.Type}' is not supported.");
+                }
+            }
+            catch (Exception exception)
+            {
+                _logger.LogError(
+                    0,
+                    exception,
+                    "An exception was thrown while processing leaf {leafUrl}.",
+                    leafItem.Url);
+                success = false;
+            }
+
+            if (!success)
+            {
+                _logger.LogWarning(
+                    "Failed to process leaf {leafUrl} ({packageId} {packageVersion}, {leafType}).",
+                    leafItem.Url,
+                    leafItem.PackageId,
+                    leafItem.PackageVersion,
+                    leafItem.Type);
+            }
+
+            return success;
+        }
+
+        /// <summary>
+        /// Resolves the exclusive lower time bound: the saved cursor, else the configured default,
+        /// never earlier than the absolute minimum.
+        /// </summary>
+        private async Task<DateTimeOffset> GetMinCommitTimestamp(CancellationToken cancellationToken)
+        {
+            var minCommitTimestamp = await _cursor.GetAsync(cancellationToken);
+
+            minCommitTimestamp = minCommitTimestamp
+                ?? _options.DefaultMinCommitTimestamp
+                ?? _options.MinCommitTimestamp;
+
+            if (minCommitTimestamp.Value < _options.MinCommitTimestamp)
+            {
+                minCommitTimestamp = _options.MinCommitTimestamp;
+            }
+
+            return minCommitTimestamp.Value;
+        }
+    }
+}
diff --git a/src/BaGet.Protocol/Catalog/CatalogProcessorOptions.cs b/src/BaGet.Protocol/Catalog/CatalogProcessorOptions.cs
new file mode 100644
index 00000000..99cd32a3
--- /dev/null
+++ b/src/BaGet.Protocol/Catalog/CatalogProcessorOptions.cs
@@ -0,0 +1,32 @@
+using System;
+
+namespace BaGet.Protocol.Catalog
+{
+    /// <summary>
+    /// The options to configure <see cref="CatalogProcessor"/>.
+    /// Based off: https://github.com/NuGet/NuGet.Services.Metadata/blob/3a468fe534a03dcced897eb5992209fdd3c4b6c9/src/NuGet.Protocol.Catalog/CatalogProcessorSettings.cs
+    /// </summary>
+    public class CatalogProcessorOptions
+    {
+        /// <summary>
+        /// The minimum commit timestamp to use when no cursor value has been saved.
+        /// </summary>
+        public DateTimeOffset? DefaultMinCommitTimestamp { get; set; }
+
+        /// <summary>
+        /// The absolute minimum (exclusive) commit timestamp to process in the catalog.
+        /// </summary>
+        public DateTimeOffset MinCommitTimestamp { get; set; }
+
+        /// <summary>
+        /// The absolute maximum (inclusive) commit timestamp to process in the catalog. Defaults to no upper bound.
+        /// </summary>
+        public DateTimeOffset MaxCommitTimestamp { get; set; } = DateTimeOffset.MaxValue;
+
+        /// <summary>
+        /// If multiple catalog leaves are found in a page concerning the same package ID and version, only the latest
+        /// is processed.
+        /// </summary>
+        public bool ExcludeRedundantLeaves { get; set; }
+    }
+}
diff --git a/src/BaGet.Protocol/Catalog/FileCursor.cs b/src/BaGet.Protocol/Catalog/FileCursor.cs
new file mode 100644
index 00000000..b1b6a9dd
--- /dev/null
+++ b/src/BaGet.Protocol/Catalog/FileCursor.cs
@@ -0,0 +1,80 @@
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Newtonsoft.Json;
+
+namespace BaGet.Protocol.Catalog
+{
+    /// <summary>
+    /// A cursor implementation which stores the cursor in a local file. The cursor value is written to the file as
+    /// a JSON object.
+    /// Based off: https://github.com/NuGet/NuGet.Services.Metadata/blob/3a468fe534a03dcced897eb5992209fdd3c4b6c9/src/NuGet.Protocol.Catalog/FileCursor.cs
+    /// </summary>
+    public class FileCursor : ICursor
+    {
+        private static readonly JsonSerializerSettings Settings = HttpClientExtensions.JsonSettings;
+
+        private readonly string _path;
+        private readonly ILogger _logger;
+
+        /// <summary>
+        /// Create a cursor backed by the file at <paramref name="path"/>.
+        /// </summary>
+        /// <param name="path">The path of the file that stores the cursor value.</param>
+        /// <param name="logger">The logger used for telemetry.</param>
+        public FileCursor(string path, ILogger logger)
+        {
+            _path = path ?? throw new ArgumentNullException(nameof(path));
+            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+        }
+
+        /// <summary>
+        /// Read the cursor value from the file.
+        /// </summary>
+        /// <param name="cancellationToken">Unused; the file is read synchronously.</param>
+        /// <returns>The saved value. Null if the file is missing, empty, or not valid JSON.</returns>
+        public Task<DateTimeOffset?> GetAsync(CancellationToken cancellationToken)
+        {
+            try
+            {
+                var jsonString = File.ReadAllText(_path);
+                var data = JsonConvert.DeserializeObject<Data>(jsonString, Settings);
+
+                // An empty file or a literal "null" deserializes to null: treat it as "no cursor yet".
+                if (data == null)
+                {
+                    return Task.FromResult<DateTimeOffset?>(null);
+                }
+
+                _logger.LogDebug("Read cursor value {cursor:O} from {path}.", data.Value, _path);
+                return Task.FromResult<DateTimeOffset?>(data.Value);
+            }
+            catch (Exception e) when (e is FileNotFoundException || e is JsonException)
+            {
+                return Task.FromResult<DateTimeOffset?>(null);
+            }
+        }
+
+        /// <summary>
+        /// Overwrite the file with the new cursor value.
+        /// </summary>
+        /// <param name="value">The new cursor value.</param>
+        /// <param name="cancellationToken">Unused; the file is written synchronously.</param>
+        public Task SetAsync(DateTimeOffset value, CancellationToken cancellationToken)
+        {
+            var data = new Data { Value = value };
+            var jsonString = JsonConvert.SerializeObject(data);
+            File.WriteAllText(_path, jsonString);
+            _logger.LogDebug("Wrote cursor value {cursor:O} to {path}.", data.Value, _path);
+            return Task.CompletedTask;
+        }
+
+        private class Data
+        {
+            [JsonProperty("value")]
+            public DateTimeOffset Value { get; set; }
+        }
+    }
+}
diff --git a/src/BaGet.Protocol/Catalog/ICatalogLeafProcessor.cs b/src/BaGet.Protocol/Catalog/ICatalogLeafProcessor.cs
new file mode 100644
index 00000000..1b26f264
--- /dev/null
+++ b/src/BaGet.Protocol/Catalog/ICatalogLeafProcessor.cs
@@ -0,0 +1,37 @@
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace BaGet.Protocol.Catalog
+{
+    /// <summary>
+    /// An interface which allows custom processing of catalog leaves. This interface should be implemented when the
+    /// catalog leaf documents need to be downloaded and processed in chronological order.
+    /// Based off: https://github.com/NuGet/NuGet.Services.Metadata/blob/master/src/NuGet.Protocol.Catalog/ICatalogLeafProcessor.cs
+    /// </summary>
+    public interface ICatalogLeafProcessor
+    {
+        /// <summary>
+        /// Process a catalog leaf containing package details. This method should return false or throw an exception
+        /// if the catalog leaf cannot be processed. In this case, the <see cref="CatalogProcessor"/> will stop
+        /// processing items. Note that the same package ID/version combination can be passed to this multiple times,
+        /// for example due to an edit in the package metadata or due to a transient error and retry on the part of the
+        /// <see cref="ICatalogLeafProcessor"/>.
+        /// </summary>
+        /// <param name="leaf">The leaf document.</param>
+        /// <param name="cancellationToken">A token to cancel the task.</param>
+        /// <returns>True, if the leaf was successfully processed. False, otherwise.</returns>
+        Task<bool> ProcessPackageDetailsAsync(PackageDetailsCatalogLeaf leaf, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Process a catalog leaf containing a package delete. This method should return false or throw an exception
+        /// if the catalog leaf cannot be processed. In this case, the <see cref="CatalogProcessor"/> will stop
+        /// processing items. Note that the same package ID/version combination can be passed to this multiple times,
+        /// for example due to a package being deleted again due to a transient error and retry on the part of the
+        /// <see cref="ICatalogLeafProcessor"/>.
+        /// </summary>
+        /// <param name="leaf">The leaf document.</param>
+        /// <param name="cancellationToken">A token to cancel the task.</param>
+        /// <returns>True, if the leaf was successfully processed. False, otherwise.</returns>
+        Task<bool> ProcessPackageDeleteAsync(PackageDeleteCatalogLeaf leaf, CancellationToken cancellationToken = default);
+    }
+}
diff --git a/src/BaGet.Protocol/Catalog/ICursor.cs b/src/BaGet.Protocol/Catalog/ICursor.cs
new file mode 100644
index 00000000..a564f8c5
--- /dev/null
+++ b/src/BaGet.Protocol/Catalog/ICursor.cs
@@ -0,0 +1,30 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace BaGet.Protocol.Catalog
+{
+    /// <summary>
+    /// The NuGet Catalog resource is an append-only data structure indexed by time.
+    /// The <see cref="ICursor"/> tracks up to what point in the catalog has been successfully
+    /// processed. The value is a catalog commit timestamp.
+    /// See: https://docs.microsoft.com/en-us/nuget/api/catalog-resource#cursor
+    /// Based off: https://github.com/NuGet/NuGet.Services.Metadata/blob/3a468fe534a03dcced897eb5992209fdd3c4b6c9/src/NuGet.Protocol.Catalog/ICursor.cs
+    /// </summary>
+    public interface ICursor
+    {
+        /// <summary>
+        /// Get the value of the cursor.
+        /// </summary>
+        /// <param name="cancellationToken">A token to cancel the task.</param>
+        /// <returns>The cursor value. Null if the cursor has no value yet.</returns>
+        Task<DateTimeOffset?> GetAsync(CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Set the value of the cursor.
+        /// </summary>
+        /// <param name="value">The new cursor value.</param>
+        /// <param name="cancellationToken">A token to cancel the task.</param>
+        Task SetAsync(DateTimeOffset value, CancellationToken cancellationToken = default);
+    }
+}
diff --git a/src/BaGet.Protocol/Catalog/NullCursor.cs b/src/BaGet.Protocol/Catalog/NullCursor.cs
new file mode 100644
index 00000000..7fbc9428
--- /dev/null
+++ b/src/BaGet.Protocol/Catalog/NullCursor.cs
@@ -0,0 +1,35 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace BaGet.Protocol.Catalog
+{
+    /// <summary>
+    /// A cursor that does not persist any state. Use this with a <see cref="CatalogProcessor"/>
+    /// to process all leafs each time <see cref="CatalogProcessor.ProcessAsync(CancellationToken)"/>
+    /// is called.
+    /// </summary>
+    public class NullCursor : ICursor
+    {
+        /// <summary>
+        /// Always reports that no cursor value has been saved.
+        /// </summary>
+        /// <param name="cancellationToken">Unused.</param>
+        /// <returns>A completed task whose result is null.</returns>
+        public Task<DateTimeOffset?> GetAsync(CancellationToken cancellationToken = default)
+        {
+            // Return a completed task, not a null task: awaiting a null Task throws NullReferenceException.
+            return Task.FromResult<DateTimeOffset?>(null);
+        }
+
+        /// <summary>
+        /// Discards the value; nothing is persisted.
+        /// </summary>
+        /// <param name="value">The cursor value to discard.</param>
+        /// <param name="cancellationToken">Unused.</param>
+        public Task SetAsync(DateTimeOffset value, CancellationToken cancellationToken = default)
+        {
+            return Task.CompletedTask;
+        }
+    }
+}
diff --git a/src/BaGet.Protocol/Extensions/HttpClientExtensions.cs b/src/BaGet.Protocol/Extensions/HttpClientExtensions.cs
index 0a0c7d73..805456cc 100644
--- a/src/BaGet.Protocol/Extensions/HttpClientExtensions.cs
+++ b/src/BaGet.Protocol/Extensions/HttpClientExtensions.cs
@@ -9,9 +9,9 @@ namespace BaGet.Protocol
 {
     internal static class HttpClientExtensions
     {
-        private static readonly JsonSerializer Serializer = JsonSerializer.Create(Settings);
+        internal static readonly JsonSerializer Serializer = JsonSerializer.Create(JsonSettings);
 
-        private static readonly JsonSerializerSettings Settings = new JsonSerializerSettings
+        internal static readonly JsonSerializerSettings JsonSettings = new JsonSerializerSettings
         {
             DateTimeZoneHandling = DateTimeZoneHandling.Utc,
             DateParseHandling = DateParseHandling.DateTimeOffset,