Skip to content

Commit

Permalink
[Client SDK] Add catalog resource abstractions (loic-sharma#323)
Browse files Browse the repository at this point in the history
  • Loading branch information
loic-sharma authored Sep 8, 2019
1 parent eea6a82 commit 2a9ce61
Show file tree
Hide file tree
Showing 8 changed files with 383 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/BaGet.Protocol/BaGet.Protocol.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="$(MicrosoftExtensionsPackageVersion)" />
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
<PackageReference Include="NuGet.Protocol" Version="$(NuGetPackageVersion)" />
<PackageReference Include="System.Text.Encodings.Web" Version="4.5.0" />
Expand Down
199 changes: 199 additions & 0 deletions src/BaGet.Protocol/Catalog/CatalogProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace BaGet.Protocol.Catalog
{
/// <summary>
/// Processes catalog leafs in chronological order.
/// See: https://docs.microsoft.com/en-us/nuget/api/catalog-resource
/// Based off: https://github.com/NuGet/NuGet.Services.Metadata/blob/3a468fe534a03dcced897eb5992209fdd3c4b6c9/src/NuGet.Protocol.Catalog/CatalogProcessor.cs
/// </summary>
public class CatalogProcessor
{
private readonly ICatalogLeafProcessor _leafProcessor;
private readonly ICatalogResource _client;
private readonly ICursor _cursor;
private readonly CatalogProcessorOptions _options;
private readonly ILogger<CatalogProcessor> _logger;

/// <summary>
/// Create a processor to discover and download catalog leafs. Leafs are processed
/// by the <see cref="ICatalogLeafProcessor"/>.
/// </summary>
/// <param name="cursor">Cursor to track succesfully processed leafs. Leafs before the cursor are skipped.</param>
/// <param name="client">The client to interact with the catalog resource.</param>
/// <param name="leafProcessor">The leaf processor.</param>
/// <param name="options">The options to configure catalog processing.</param>
/// <param name="logger">The logger used for telemetry.</param>
public CatalogProcessor(
ICursor cursor,
ICatalogResource client,
ICatalogLeafProcessor leafProcessor,
CatalogProcessorOptions options,
ILogger<CatalogProcessor> logger)
{
_leafProcessor = leafProcessor ?? throw new ArgumentNullException(nameof(leafProcessor));
_client = client ?? throw new ArgumentNullException(nameof(client));
_cursor = cursor ?? throw new ArgumentNullException(nameof(cursor));
_options = options ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

/// <summary>
/// Discovers and downloads all of the catalog leafs after the current cursor value and before the maximum
/// commit timestamp found in the settings. Each catalog leaf is passed to the catalog leaf processor in
/// chronological order. After a commit is completed, its commit timestamp is written to the cursor, i.e. when
/// transitioning from commit timestamp A to B, A is written to the cursor so that it never is processed again.
/// </summary>
/// <param name="cancellationToken">A token to cancel the task.</param>
/// <returns>True if all of the catalog leaves found were processed successfully.</returns>
public async Task<bool> ProcessAsync(CancellationToken cancellationToken = default)
{
var minCommitTimestamp = await GetMinCommitTimestamp(cancellationToken);
_logger.LogInformation(
"Using time bounds {min:O} (exclusive) to {max:O} (inclusive).",
minCommitTimestamp,
_options.MaxCommitTimestamp);

return await ProcessIndexAsync(minCommitTimestamp, cancellationToken);
}

private async Task<bool> ProcessIndexAsync(DateTimeOffset minCommitTimestamp, CancellationToken cancellationToken)
{
var index = await _client.GetIndexAsync(cancellationToken);

var pageItems = index.GetPagesInBounds(
minCommitTimestamp,
_options.MaxCommitTimestamp);
_logger.LogInformation(
"{pages} pages were in the time bounds, out of {totalPages}.",
pageItems.Count,
index.Items.Count);

var success = true;
for (var i = 0; i < pageItems.Count; i++)
{
success = await ProcessPageAsync(minCommitTimestamp, pageItems[i], cancellationToken);
if (!success)
{
_logger.LogWarning(
"{unprocessedPages} out of {pages} pages were left incomplete due to a processing failure.",
pageItems.Count - i,
pageItems.Count);
break;
}
}

return success;
}

private async Task<bool> ProcessPageAsync(
DateTimeOffset minCommitTimestamp,
CatalogPageItem pageItem,
CancellationToken cancellationToken)
{
var page = await _client.GetPageAsync(pageItem.Url, cancellationToken);

var leafItems = page.GetLeavesInBounds(
minCommitTimestamp,
_options.MaxCommitTimestamp,
_options.ExcludeRedundantLeaves);
_logger.LogInformation(
"On page {page}, {leaves} out of {totalLeaves} were in the time bounds.",
pageItem.Url,
leafItems.Count,
page.Items.Count);

DateTimeOffset? newCursor = null;
var success = true;
for (var i = 0; i < leafItems.Count; i++)
{
var leafItem = leafItems[i];

if (newCursor.HasValue && newCursor.Value != leafItem.CommitTimestamp)
{
await _cursor.SetAsync(newCursor.Value, cancellationToken);
}

newCursor = leafItem.CommitTimestamp;

success = await ProcessLeafAsync(leafItem, cancellationToken);
if (!success)
{
_logger.LogWarning(
"{unprocessedLeaves} out of {leaves} leaves were left incomplete due to a processing failure.",
leafItems.Count - i,
leafItems.Count);
break;
}
}

if (newCursor.HasValue && success)
{
await _cursor.SetAsync(newCursor.Value);
}

return success;
}

private async Task<bool> ProcessLeafAsync(CatalogLeafItem leafItem, CancellationToken cancellationToken)
{
bool success;
try
{
switch (leafItem.Type)
{
case CatalogLeafType.PackageDelete:
var packageDelete = await _client.GetPackageDeleteLeafAsync(leafItem.Url);
success = await _leafProcessor.ProcessPackageDeleteAsync(packageDelete, cancellationToken);
break;
case CatalogLeafType.PackageDetails:
var packageDetails = await _client.GetPackageDetailsLeafAsync(leafItem.Url);
success = await _leafProcessor.ProcessPackageDetailsAsync(packageDetails, cancellationToken);
break;
default:
throw new NotSupportedException($"The catalog leaf type '{leafItem.Type}' is not supported.");
}
}
catch (Exception exception)
{
_logger.LogError(
0,
exception,
"An exception was thrown while processing leaf {leafUrl}.",
leafItem.Url);
success = false;
}

if (!success)
{
_logger.LogWarning(
"Failed to process leaf {leafUrl} ({packageId} {packageVersion}, {leafType}).",
leafItem.Url,
leafItem.PackageId,
leafItem.PackageVersion,
leafItem.Type);
}

return success;
}

private async Task<DateTimeOffset> GetMinCommitTimestamp(CancellationToken cancellationToken)
{
var minCommitTimestamp = await _cursor.GetAsync(cancellationToken);

minCommitTimestamp = minCommitTimestamp
?? _options.DefaultMinCommitTimestamp
?? _options.MinCommitTimestamp;

if (minCommitTimestamp.Value < _options.MinCommitTimestamp)
{
minCommitTimestamp = _options.MinCommitTimestamp;
}

return minCommitTimestamp.Value;
}
}
}
32 changes: 32 additions & 0 deletions src/BaGet.Protocol/Catalog/CatalogProcessorOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System;

namespace BaGet.Protocol.Catalog
{
/// <summary>
/// The options to configure <see cref="CatalogProcessor"/>.
/// Based off: https://github.com/NuGet/NuGet.Services.Metadata/blob/3a468fe534a03dcced897eb5992209fdd3c4b6c9/src/NuGet.Protocol.Catalog/CatalogProcessorSettings.cs
/// </summary>
public class CatalogProcessorOptions
{
/// <summary>
/// The minimum commit timestamp to use when no cursor value has been saved.
/// </summary>
public DateTimeOffset? DefaultMinCommitTimestamp { get; set; }

/// <summary>
/// The absolute minimum (exclusive) commit timestamp to process in the catalog.
/// </summary>
public DateTimeOffset MinCommitTimestamp { get; set; }

/// <summary>
/// The absolute maximum (inclusive) commit timestamp to process in the catalog.
/// </summary>
public DateTimeOffset MaxCommitTimestamp { get; set; }

/// <summary>
/// If multiple catalog leaves are found in a page concerning the same package ID and version, only the latest
/// is processed.
/// </summary>
public bool ExcludeRedundantLeaves { get; set; }
}
}
58 changes: 58 additions & 0 deletions src/BaGet.Protocol/Catalog/FileCursor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace BaGet.Protocol.Catalog
{
/// <summary>
/// A cursor implementation which stores the cursor in local file. The cursor value is written to the file as
/// a JSON object.
/// Based off: https://github.com/NuGet/NuGet.Services.Metadata/blob/3a468fe534a03dcced897eb5992209fdd3c4b6c9/src/NuGet.Protocol.Catalog/FileCursor.cs
/// </summary>
public class FileCursor : ICursor
{
private static readonly JsonSerializerSettings Settings = HttpClientExtensions.JsonSettings;

private readonly string _path;
private readonly ILogger<FileCursor> _logger;

public FileCursor(string path, ILogger<FileCursor> logger)
{
_path = path ?? throw new ArgumentNullException(nameof(path));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public Task<DateTimeOffset?> GetAsync(CancellationToken cancellationToken)
{
try
{
var jsonString = File.ReadAllText(_path);
var data = JsonConvert.DeserializeObject<Data>(jsonString, Settings);
_logger.LogDebug("Read cursor value {cursor:O} from {path}.", data.Value, _path);
return Task.FromResult<DateTimeOffset?>(data.Value);
}
catch (Exception e) when (e is FileNotFoundException || e is JsonException)
{
return Task.FromResult<DateTimeOffset?>(null);
}
}

public Task SetAsync(DateTimeOffset value, CancellationToken cancellationToken)
{
var data = new Data { Value = value };
var jsonString = JsonConvert.SerializeObject(data);
File.WriteAllText(_path, jsonString);
_logger.LogDebug("Wrote cursor value {cursor:O} to {path}.", data.Value, _path);
return Task.CompletedTask;
}

private class Data
{
[JsonProperty("value")]
public DateTimeOffset Value { get; set; }
}
}
}
37 changes: 37 additions & 0 deletions src/BaGet.Protocol/Catalog/ICatalogLeafProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using System.Threading;
using System.Threading.Tasks;

namespace BaGet.Protocol.Catalog
{
/// <summary>
/// An interface which allows custom processing of catalog leaves. This interface should be implemented when the
/// catalog leaf documents need to be downloaded and processed in chronological order.
/// Based off: https://github.com/NuGet/NuGet.Services.Metadata/blob/master/src/NuGet.Protocol.Catalog/ICatalogLeafProcessor.cs
/// </summary>
public interface ICatalogLeafProcessor
{
/// <summary>
/// Process a catalog leaf containing package details. This method should return false or throw an exception
/// if the catalog leaf cannot be processed. In this case, the <see cref="CatalogProcessor" /> will stop
/// processing items. Note that the same package ID/version combination can be passed to this multiple times,
/// for example due to an edit in the package metadata or due to a transient error and retry on the part of the
/// <see cref="CatalogProcessor" />.
/// </summary>
/// <param name="leaf">The leaf document.</param>
/// <param name="cancellationToken">A token to cancel the task.</param>
/// <returns>True, if the leaf was successfully processed. False, otherwise.</returns>
Task<bool> ProcessPackageDetailsAsync(PackageDetailsCatalogLeaf leaf, CancellationToken cancellationToken = default);

/// <summary>
/// Process a catalog leaf containing a package delete. This method should return false or throw an exception
/// if the catalog leaf cannot be processed. In this case, the <see cref="CatalogProcessor" /> will stop
/// processing items. Note that the same package ID/version combination can be passed to this multiple times,
/// for example due to a package being deleted again due to a transient error and retry on the part of the
/// <see cref="CatalogProcessor" />.
/// </summary>
/// <param name="leaf">The leaf document.</param>
/// <param name="cancellationToken">A token to cancel the task.</param>
/// <returns>True, if the leaf was successfully processed. False, otherwise.</returns>
Task<bool> ProcessPackageDeleteAsync(PackageDeleteCatalogLeaf leaf, CancellationToken cancellationToken = default);
}
}
30 changes: 30 additions & 0 deletions src/BaGet.Protocol/Catalog/ICursor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace BaGet.Protocol.Catalog
{
/// <summary>
/// The NuGet Catalog resource is an append-only data structure indexed by time.
/// The <see cref="ICursor"/> tracks up to what point in the catalog has been successfully
/// processed. The value is a catalog commit timestamp.
/// See: https://docs.microsoft.com/en-us/nuget/api/catalog-resource#cursor
/// Based off: https://github.com/NuGet/NuGet.Services.Metadata/blob/3a468fe534a03dcced897eb5992209fdd3c4b6c9/src/NuGet.Protocol.Catalog/ICursor.cs
/// </summary>
public interface ICursor
{
/// <summary>
/// Get the value of the cursor.
/// </summary>
/// <param name="cancellationToken">A token to cancel the task.</param>
/// <returns>The cursor value. Null if the cursor has no value yet.</returns>
Task<DateTimeOffset?> GetAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Set the value of the cursor.
/// </summary>
/// <param name="value">The new cursor value.</param>
/// <param name="cancellationToken">A token to cancel the task.</param>
Task SetAsync(DateTimeOffset value, CancellationToken cancellationToken = default);
}
}
24 changes: 24 additions & 0 deletions src/BaGet.Protocol/Catalog/NullCursor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace BaGet.Protocol.Catalog
{
/// <summary>
/// A cursor that does not persist any state. Use this with a <see cref="CatalogProcessor"/>
/// to process all leafs each time <see cref="CatalogProcessor.ProcessAsync(CancellationToken)"/>
/// is called.
/// </summary>
public class NullCursor : ICursor
{
public Task<DateTimeOffset?> GetAsync(CancellationToken cancellationToken = default)
{
return null;
}

public Task SetAsync(DateTimeOffset value, CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
}
}
Loading

0 comments on commit 2a9ce61

Please sign in to comment.