Skip to content

Commit

Permalink
Continue storage refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
loic-sharma committed Nov 4, 2018
1 parent 9d01d8b commit 3c40d3e
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 122 deletions.
31 changes: 10 additions & 21 deletions src/BaGet.Azure/BlobPackageStorageService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using BaGet.Core.Extensions;
using BaGet.Core.Entities;
using BaGet.Core.Services;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using NuGet.Packaging;
using NuGet.Versioning;

namespace BaGet.Azure.Configuration
Expand All @@ -24,36 +23,26 @@ public BlobPackageStorageService(CloudBlobContainer container)
_container = container ?? throw new ArgumentNullException(nameof(container));
}

public async Task SavePackageStreamAsync(
PackageArchiveReader package,
public async Task SavePackageContentAsync(
Package package,
Stream packageStream,
Stream nuspecStream,
Stream readmeStream,
CancellationToken cancellationToken)
{
var identity = await package.GetIdentityAsync(cancellationToken);
var lowercasedId = identity.Id.ToLowerInvariant();
var lowercasedNormalizedVersion = identity.Version.ToNormalizedString().ToLowerInvariant();
var lowercasedId = package.Id.ToLowerInvariant();
var lowercasedNormalizedVersion = package.VersionString.ToLowerInvariant();

var packageBlob = GetPackageBlob(lowercasedId, lowercasedNormalizedVersion);
var nuspecBlob = GetNuspecBlob(lowercasedId, lowercasedNormalizedVersion);
var readmeBlob = GetReadmeBlob(lowercasedId, lowercasedNormalizedVersion);

// Save the package's nupkg
packageStream.Seek(0, SeekOrigin.Begin);
await UploadBlobAsync(packageBlob, packageStream, PackageContentType);
await UploadBlobAsync(nuspecBlob, nuspecStream, TextContentType);

// Save the package's nuspec
using (var nuspecStream = await package.GetNuspecAsync(cancellationToken))
if (readmeStream != null)
{
await UploadBlobAsync(nuspecBlob, nuspecStream, TextContentType);
}

// Save the package's reamde
if (package.HasReadme())
{
using (var readmeStream = package.GetReadme())
{
await UploadBlobAsync(readmeBlob, readmeStream, TextContentType);
}
await UploadBlobAsync(readmeBlob, readmeStream, TextContentType);
}
}

Expand Down
11 changes: 7 additions & 4 deletions src/BaGet.Core/Extensions/PackageArchiveReaderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NuGet.Packaging;

namespace BaGet.Core.Extensions
Expand All @@ -21,8 +23,9 @@ public static class PackageArchiveReaderExtensions
public static bool HasReadme(this PackageArchiveReader package)
=> package.GetFiles().Any(ReadmeFileNames.Contains);

// TODO: This should be async and accept a CancellationToken.
public static Stream GetReadme(this PackageArchiveReader package)
public async static Task<Stream> GetReadmeAsync(
this PackageArchiveReader package,
CancellationToken cancellationToken)
{
var packageFiles = package.GetFiles();

Expand All @@ -32,11 +35,11 @@ public static Stream GetReadme(this PackageArchiveReader package)

if (readmePath != null)
{
return package.GetStream(readmePath);
return await package.GetStreamAsync(readmePath, cancellationToken);
}
}

return Stream.Null;
throw new InvalidOperationException("Package does not have a readme!");
}
}
}
73 changes: 43 additions & 30 deletions src/BaGet.Core/Services/FilePackageStorageService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using BaGet.Core.Extensions;
using NuGet.Packaging;
using BaGet.Core.Entities;
using NuGet.Versioning;

namespace BaGet.Core.Services
Expand All @@ -20,43 +19,40 @@ public FilePackageStorageService(string storePath)
_storePath = storePath ?? throw new ArgumentNullException(nameof(storePath));
}

public async Task SavePackageStreamAsync(
PackageArchiveReader package,
public async Task SavePackageContentAsync(
Package package,
Stream packageStream,
Stream nuspecStream,
Stream readmeStream,
CancellationToken cancellationToken)
{
var identity = await package.GetIdentityAsync(cancellationToken);
var lowercasedId = identity.Id.ToLowerInvariant();
var lowercasedNormalizedVersion = identity.Version.ToNormalizedString().ToLowerInvariant();

var packagePath = PackagePath(lowercasedId, lowercasedNormalizedVersion);
var nuspecPath = NuspecPath(lowercasedId, lowercasedNormalizedVersion);
var readmePath = ReadmePath(lowercasedId, lowercasedNormalizedVersion);
var lowercasedId = package.Id.ToLowerInvariant();
var lowercasedNormalizedVersion = package.VersionString.ToLowerInvariant();

EnsurePathExists(lowercasedId, lowercasedNormalizedVersion);

// TODO: Uploads should be idempotent. This should fail if and only if the blob
// already exists but has different content.
using (var fileStream = File.Open(packagePath, FileMode.CreateNew))
{
packageStream.Seek(0, SeekOrigin.Begin);

await packageStream.CopyToAsync(fileStream, DefaultCopyBufferSize, cancellationToken);
}
await SaveFileStreamAsync(
lowercasedId,
lowercasedNormalizedVersion,
PackagePath,
packageStream,
cancellationToken);

using (var nuspec = await package.GetNuspecAsync(cancellationToken))
using (var fileStream = File.Open(nuspecPath, FileMode.CreateNew))
{
await nuspec.CopyToAsync(fileStream, DefaultCopyBufferSize, cancellationToken);
}
await SaveFileStreamAsync(
lowercasedId,
lowercasedNormalizedVersion,
NuspecPath,
nuspecStream,
cancellationToken);

if (package.HasReadme())
if (readmeStream != null)
{
using (var readme = package.GetReadme())
using (var fileStream = File.Open(readmePath, FileMode.CreateNew))
{
await readme.CopyToAsync(fileStream, DefaultCopyBufferSize, cancellationToken);
}
await SaveFileStreamAsync(
lowercasedId,
lowercasedNormalizedVersion,
ReadmePath,
readmeStream,
cancellationToken);
}
}

Expand Down Expand Up @@ -97,6 +93,23 @@ public Task DeleteAsync(string id, NuGetVersion version)
return Task.CompletedTask;
}

private async Task SaveFileStreamAsync(
string lowercasedId,
string lowercasedNormalizedVersion,
Func<string, string, string> pathFunc,
Stream content,
CancellationToken cancellationToken)
{
var path = pathFunc(lowercasedId, lowercasedNormalizedVersion);

// TODO: Uploads should be idempotent. This should fail if and only if the blob
// already exists but has different content.
using (var fileStream = File.Open(path, FileMode.CreateNew))
{
await content.CopyToAsync(fileStream, DefaultCopyBufferSize, cancellationToken);
}
}

private Stream GetFileStream(string id, NuGetVersion version, Func<string, string, string> pathFunc)
{
var versionString = version.ToNormalizedString().ToLowerInvariant();
Expand Down
10 changes: 7 additions & 3 deletions src/BaGet.Core/Services/IPackageStorageService.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using NuGet.Packaging;
using BaGet.Core.Entities;
using NuGet.Versioning;

namespace BaGet.Core.Services
Expand All @@ -18,11 +18,15 @@ public interface IPackageStorageService
/// </summary>
/// <param name="package">The package's metadata.</param>
/// <param name="packageStream">The package's nupkg stream.</param>
/// <param name="nuspecStream">The package's nuspec stream.</param>
/// <param name="readmeStream">The package's readme stream, or null if none.</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task SavePackageStreamAsync(
PackageArchiveReader package,
Task SavePackageContentAsync(
Package package,
Stream packageStream,
Stream nuspecStream,
Stream readmeStream,
CancellationToken cancellationToken);

/// <summary>
Expand Down
136 changes: 72 additions & 64 deletions src/BaGet.Core/Services/IndexingService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,102 +32,110 @@ public IndexingService(
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public async Task<IndexingResult> IndexAsync(Stream stream, CancellationToken cancellationToken)
public async Task<IndexingResult> IndexAsync(Stream packageStream, CancellationToken cancellationToken)
{
// Try to save the package stream to storage.
// Try to extract all the necessary information from the package.
Package package;
Stream nuspecStream;
Stream readmeStream;

try
{
using (var packageReader = new PackageArchiveReader(stream))
using (var packageReader = new PackageArchiveReader(packageStream, leaveStreamOpen: true))
{
var packageId = packageReader.NuspecReader.GetId();
var packageVersion = packageReader.NuspecReader.GetVersion();
package = GetPackageMetadata(packageReader);
nuspecStream = await packageReader.GetNuspecAsync(cancellationToken);

if (await _packages.ExistsAsync(packageId, packageVersion))
if (package.HasReadme)
{
return IndexingResult.PackageAlreadyExists;
readmeStream = await packageReader.GetReadmeAsync(cancellationToken);
}

try
{
_logger.LogInformation(
"Persisting package {Id} {Version} content to storage...",
packageId,
packageVersion.ToNormalizedString());

await _storage.SavePackageStreamAsync(packageReader, stream, cancellationToken);
}
catch (Exception e)
{
// This may happen due to concurrent pushes.
// TODO: Make IStorageService.SaveAsync return a result enum so this can be properly handled.
_logger.LogError(e, "Failed to save package {PackageId} {PackageVersion} to storage", packageId, packageVersion);

throw;
}

try
{
package = GetPackageMetadata(packageReader);
}
catch (Exception e)
else
{
_logger.LogError(
e,
"Failed to extract metadata for package {PackageId} {PackageVersion}",
packageId,
packageVersion);

throw;
readmeStream = null;
}
}
}
catch (Exception e)
{
_logger.LogError(e, "Uploaded package is invalid or the package already existed in storage");
_logger.LogError(e, "Uploaded package is invalid");

return IndexingResult.InvalidPackage;
}

// The package stream has been stored. Persist the package's metadata to the database.
// The package is well-formed. Ensure this is a new package.
if (await _packages.ExistsAsync(package.Id, package.Version))
{
return IndexingResult.PackageAlreadyExists;
}

// TODO: Add more package validations
_logger.LogInformation(
"Persisting package {Id} {Version} metadata to database...",
"Validated package {PackageId} {PackageVersion}, persisting content to storage...",
package.Id,
package.VersionString);

var result = await _packages.AddAsync(package);
try
{
packageStream.Position = 0;

await _storage.SavePackageContentAsync(
package,
packageStream,
nuspecStream,
readmeStream,
cancellationToken);
}
catch (Exception e)
{
// This may happen due to concurrent pushes.
// TODO: Make IPackageStorageService.SavePackageContentAsync return a result enum so this
// can be properly handled.
_logger.LogError(
e,
"Failed to persist package {PackageId} {PackageVersion} content to storage",
package.Id,
package.VersionString);

throw;
}

switch (result)
_logger.LogInformation(
"Persisted package {Id} {Version} content to storage, saving metadata to database...",
package.Id,
package.VersionString);

var result = await _packages.AddAsync(package);
if (result == PackageAddResult.PackageAlreadyExists)
{
case PackageAddResult.Success:
_logger.LogInformation(
"Successfully persisted package {Id} {Version} metadata to database. Indexing in search...",
package.Id,
package.VersionString);
_logger.LogWarning(
"Package {Id} {Version} metadata already exists in database",
package.Id,
package.VersionString);

await _search.IndexAsync(package);
return IndexingResult.PackageAlreadyExists;
}

_logger.LogInformation(
"Successfully indexed package {Id} {Version} in search",
package.Id,
package.VersionString);
if (result != PackageAddResult.Success)
{
_logger.LogError($"Unknown {nameof(PackageAddResult)} value: {{PackageAddResult}}", result);

return IndexingResult.Success;
throw new InvalidOperationException($"Unknown {nameof(PackageAddResult)} value: {result}");
}

case PackageAddResult.PackageAlreadyExists:
_logger.LogWarning(
"Package {Id} {Version} metadata already exists in database",
package.Id,
package.VersionString);
_logger.LogInformation(
"Successfully persisted package {Id} {Version} metadata to database. Indexing in search...",
package.Id,
package.VersionString);

return IndexingResult.PackageAlreadyExists;
await _search.IndexAsync(package);

default:
_logger.LogError($"Unknown {nameof(PackageAddResult)} value: {{PackageAddResult}}", result);
_logger.LogInformation(
"Successfully indexed package {Id} {Version} in search",
package.Id,
package.VersionString);

throw new InvalidOperationException($"Unknown {nameof(PackageAddResult)} value: {result}");
}
return IndexingResult.Success;
}

private Package GetPackageMetadata(PackageArchiveReader packageReader)
Expand Down

0 comments on commit 3c40d3e

Please sign in to comment.