Skip to content

Commit

Permalink
添加本地锁功能并优化相关代码
Browse files Browse the repository at this point in the history
在`DemoTask.cs`中,对`DemoTask`类的构造函数进行了修改,添加了一个新的参数`ILocalLock @lock`。同时,重写了`ExecuteAsync`方法,添加了使用`ILocalLock`进行锁定的逻辑,以防止任务重复执行。
  • Loading branch information
vipwan committed May 15, 2024
1 parent c899622 commit e6714f6
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 12 deletions.
28 changes: 21 additions & 7 deletions Biwen.QuickApi.DemoWeb/Schedules/DemoTask.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,31 @@
using Biwen.QuickApi.Scheduling;
using Biwen.QuickApi.Infrastructure.Locking;
using Biwen.QuickApi.Scheduling;
using Biwen.QuickApi.Scheduling.Stores;

namespace Biwen.QuickApi.DemoWeb.Schedules
{
/// <summary>
/// Demo ScheduleTask,用于Store演示
/// </summary>
/// <param name="logger"></param>
public class DemoTask(ILogger<DemoTask> logger) : IScheduleTask
public class DemoTask(ILogger<DemoTask> logger, ILocalLock @lock) : IScheduleTask
{
public Task ExecuteAsync()
public async Task ExecuteAsync()
{
logger.LogInformation("Demo Schedule Done!");
return Task.CompletedTask;
var timeout = TimeSpan.FromMilliseconds(30_000);

//使用ILocalLock进行锁定,防止重复执行
var (locker, _) = await @lock.TryAcquireLockAsync("ONLY_ONE_ScheduleTask_OF_DemoTask", timeout, timeout);

if (locker is null)
{
return;//当前锁定,则不执行!
}

using (locker)
{
await Task.Delay(31_000);
logger.LogInformation("Demo memory store Schedule Done!");
}
}
}

Expand All @@ -24,7 +37,7 @@ public class DemoConfigTask(ILogger<DemoConfigTask> logger) : IScheduleTask
{
public Task ExecuteAsync()
{
logger.LogInformation("Demo Config Schedule Done!");
logger.LogInformation("Demo Config store Schedule Done!");
return Task.CompletedTask;
}
}
Expand All @@ -39,6 +52,7 @@ public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync()
[
new ScheduleTaskMetadata(typeof(DemoTask),Constants.CronEveryNMinutes(2))
{
IsStartOnInit =true,
Description="测试的Schedule"
},
];
Expand Down
5 changes: 1 addition & 4 deletions Biwen.QuickApi.DemoWeb/Schedules/SubscribeEvents.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
using Biwen.QuickApi.Events;
using Biwen.QuickApi.Scheduling.Events;

namespace Biwen.QuickApi.DemoWeb.Schedules
{
using Biwen.QuickApi.Scheduling.Events;
using System.Threading;
using System.Threading.Tasks;

[EventSubscriber(IsAsync = true)]
public class DemoTaskSuccessedEvent(ILogger<DemoTaskSuccessedEvent> logger) : IEventSubscriber<TaskSuccessedEvent>
{
Expand Down
29 changes: 29 additions & 0 deletions Biwen.QuickApi/Infrastructure/Locking/ILock.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
namespace Biwen.QuickApi.Infrastructure.Locking
{
/// <summary>
/// ILocalLock
/// </summary>
public interface ILocalLock : ILock
{
}

public interface ILock
{
/// <summary>
/// Waits indefinitely until acquiring a named lock with a given expiration for the current tenant.
/// After 'expiration' the lock is auto released, a null value is equivalent to 'TimeSpan.MaxValue'.
/// </summary>
Task<ILocker> AcquireLockAsync(string key, TimeSpan? expiration = null);

/// <summary>
/// Tries to acquire a named lock in a given timeout with a given expiration for the current tenant.
/// After 'expiration' the lock is auto released, a null value is equivalent to 'TimeSpan.MaxValue'.
/// </summary>
Task<(ILocker locker, bool locked)> TryAcquireLockAsync(string key, TimeSpan timeout, TimeSpan? expiration = null);

/// <summary>
/// Whether a named lock is already acquired.
/// </summary>
Task<bool> IsLockAcquiredAsync(string key);
}
}
6 changes: 6 additions & 0 deletions Biwen.QuickApi/Infrastructure/Locking/ILocker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Biwen.QuickApi.Infrastructure.Locking
{
public interface ILocker : IDisposable, IAsyncDisposable
{
}
}
182 changes: 182 additions & 0 deletions Biwen.QuickApi/Infrastructure/Locking/LocalLock.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
//tks to:https://github.com/OrchardCMS/OrchardCore/tree/main/src/OrchardCore/OrchardCore/Locking

namespace Biwen.QuickApi.Infrastructure.Locking
{
//var timeout = TimeSpan.FromMilliseconds(20_000);
//(var locker, var locked) = await _distributedLock.TryAcquireLockAsync("SITEMAPS_UPDATE_LOCK", timeout, timeout);
//if (!locked)
//{
// throw new TimeoutException($"Couldn't acquire a lock to update the sitemap within {timeout.Seconds} seconds.");
//}

//using (locker)
//{
// // Do the work
//}

/// <summary>
/// LocalLock
/// </summary>
public sealed class LocalLock : ILocalLock, IDisposable
{
private readonly ILogger _logger;

private readonly Dictionary<string, Semaphore> _semaphores = [];

public LocalLock(ILogger<LocalLock> logger)
{
_logger = logger;
}

/// <summary>
/// Waits indefinitely until acquiring a named lock with a given expiration for the current tenant.
/// After 'expiration' the lock is auto released, a null value is equivalent to 'TimeSpan.MaxValue'.
/// </summary>
public async Task<ILocker> AcquireLockAsync(string key, TimeSpan? expiration = null)
{
var semaphore = GetOrCreateSemaphore(key);
await semaphore.Value.WaitAsync();

return new Locker(this, semaphore, expiration);
}

/// <summary>
/// Tries to acquire a named lock in a given timeout with a given expiration for the current tenant.
/// After 'expiration' the lock is auto released, a null value is equivalent to 'TimeSpan.MaxValue'.
/// </summary>
public async Task<(ILocker locker, bool locked)> TryAcquireLockAsync(string key, TimeSpan timeout, TimeSpan? expiration = null)
{
var semaphore = GetOrCreateSemaphore(key);

if (await semaphore.Value.WaitAsync(timeout != TimeSpan.MaxValue ? timeout : Timeout.InfiniteTimeSpan))
{
return (new Locker(this, semaphore, expiration), true);
}

if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Timeout elapsed before acquiring the named lock '{LockName}' after the given timeout of '{Timeout}'.",
key, timeout.ToString());
}

return (null!, false);
}

public Task<bool> IsLockAcquiredAsync(string key)
{
lock (_semaphores)
{
if (_semaphores.TryGetValue(key, out var semaphore))
{
return Task.FromResult(semaphore.Value.CurrentCount == 0);
}

return Task.FromResult(false);
}
}

private Semaphore GetOrCreateSemaphore(string key)
{
lock (_semaphores)
{
if (_semaphores.TryGetValue(key, out var semaphore))
{
semaphore.RefCount++;
}
else
{
semaphore = new Semaphore(key, new SemaphoreSlim(1));
_semaphores[key] = semaphore;
}

return semaphore;
}
}

private sealed class Semaphore
{
public Semaphore(string key, SemaphoreSlim value)
{
Key = key;
Value = value;
RefCount = 1;
}

internal string Key { get; }
internal SemaphoreSlim Value { get; }
internal int RefCount { get; set; }
}

private sealed class Locker : ILocker
{
private readonly LocalLock _localLock;
private readonly Semaphore _semaphore;
private readonly CancellationTokenSource _cts = null!;
private volatile int _released;
private bool _disposed;

public Locker(LocalLock localLock, Semaphore semaphore, TimeSpan? expiration)
{
_localLock = localLock;
_semaphore = semaphore;

if (expiration.HasValue && expiration.Value != TimeSpan.MaxValue)
{
_cts = new CancellationTokenSource(expiration.Value);
_cts.Token.Register(Release);
}
}

private void Release()
{
if (Interlocked.Exchange(ref _released, 1) == 0)
{
lock (_localLock._semaphores)
{
if (_localLock._semaphores.TryGetValue(_semaphore.Key, out var semaphore))
{
semaphore.RefCount--;

if (semaphore.RefCount == 0)
{
_localLock._semaphores.Remove(_semaphore.Key);
}
}
}

_semaphore.Value.Release();
}
}

public ValueTask DisposeAsync()
{
Dispose();
return default;
}

public void Dispose()
{
if (_disposed)
{
return;
}

_disposed = true;

_cts?.Dispose();

Release();
}
}

public void Dispose()
{
var semaphores = _semaphores.Values.ToArray();

foreach (var semaphore in semaphores)
{
semaphore.Value.Dispose();
}
}
}
}
17 changes: 16 additions & 1 deletion Biwen.QuickApi/ServiceRegistration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Biwen.QuickApi
using Biwen.QuickApi.Abstractions;
using Biwen.QuickApi.Events;
using Biwen.QuickApi.Http;
using Biwen.QuickApi.Infrastructure.Locking;
using Biwen.QuickApi.Scheduling;
#if NET8_0_OR_GREATER
using Microsoft.AspNetCore.Antiforgery;
Expand All @@ -34,10 +35,12 @@ public static IServiceCollection AddBiwenQuickApis(
this IServiceCollection services,
Action<BiwenQuickApiOptions>? options = null)
{

//JSON Options
services.ConfigureHttpJsonOptions(x => { });

//注册LocalLock
services.AddLocking();

//注册验证器
services.AddFluentValidationAutoValidation();
services.AddHttpContextAccessor();
Expand Down Expand Up @@ -85,6 +88,18 @@ public static IServiceCollection AddBiwenQuickApis(
return services;
}

/// <summary>
/// 提供LocalLock支持
/// </summary>
/// <param name="services"></param>
/// <returns></returns>
internal static IServiceCollection AddLocking(this IServiceCollection services)
{
services.AddSingleton<LocalLock>();
services.AddSingleton<ILocalLock>(sp => sp.GetRequiredService<LocalLock>());
return services;
}

/// <summary>
/// 添加对Group的的扩展支持
/// </summary>
Expand Down

0 comments on commit e6714f6

Please sign in to comment.