diff --git a/Biwen.QuickApi.DemoWeb/Schedules/DemoTask.cs b/Biwen.QuickApi.DemoWeb/Schedules/DemoTask.cs index 18871ef5..12817bfc 100644 --- a/Biwen.QuickApi.DemoWeb/Schedules/DemoTask.cs +++ b/Biwen.QuickApi.DemoWeb/Schedules/DemoTask.cs @@ -1,18 +1,31 @@ -using Biwen.QuickApi.Scheduling; +using Biwen.QuickApi.Infrastructure.Locking; +using Biwen.QuickApi.Scheduling; using Biwen.QuickApi.Scheduling.Stores; - namespace Biwen.QuickApi.DemoWeb.Schedules { /// /// Demo ScheduleTask,用于Store演示 /// /// - public class DemoTask(ILogger logger) : IScheduleTask + public class DemoTask(ILogger logger, ILocalLock @lock) : IScheduleTask { - public Task ExecuteAsync() + public async Task ExecuteAsync() { - logger.LogInformation("Demo Schedule Done!"); - return Task.CompletedTask; + var timeout = TimeSpan.FromMilliseconds(30_000); + + //使用ILocalLock进行锁定,防止重复执行 + var (locker, _) = await @lock.TryAcquireLockAsync("ONLY_ONE_ScheduleTask_OF_DemoTask", timeout, timeout); + + if (locker is null) + { + return;//当前锁定,则不执行! + } + + using (locker) + { + await Task.Delay(31_000); + logger.LogInformation("Demo memory store Schedule Done!"); + } } } @@ -24,7 +37,7 @@ public class DemoConfigTask(ILogger logger) : IScheduleTask { public Task ExecuteAsync() { - logger.LogInformation("Demo Config Schedule Done!"); + logger.LogInformation("Demo Config store Schedule Done!"); return Task.CompletedTask; } } @@ -39,6 +52,7 @@ public Task> GetAllAsync() [ new ScheduleTaskMetadata(typeof(DemoTask),Constants.CronEveryNMinutes(2)) { + IsStartOnInit =true, Description="测试的Schedule" }, ]; diff --git a/Biwen.QuickApi.DemoWeb/Schedules/SubscribeEvents.cs b/Biwen.QuickApi.DemoWeb/Schedules/SubscribeEvents.cs index 917c0dc8..2737dd4b 100644 --- a/Biwen.QuickApi.DemoWeb/Schedules/SubscribeEvents.cs +++ b/Biwen.QuickApi.DemoWeb/Schedules/SubscribeEvents.cs @@ -1,11 +1,8 @@ using Biwen.QuickApi.Events; +using Biwen.QuickApi.Scheduling.Events; namespace Biwen.QuickApi.DemoWeb.Schedules { - using Biwen.QuickApi.Scheduling.Events; - using System.Threading; - using System.Threading.Tasks; - [EventSubscriber(IsAsync = true)] public class DemoTaskSuccessedEvent(ILogger logger) : IEventSubscriber { diff --git a/Biwen.QuickApi/Infrastructure/Locking/ILock.cs b/Biwen.QuickApi/Infrastructure/Locking/ILock.cs new file mode 100644 index 00000000..29bcfb15 --- /dev/null +++ b/Biwen.QuickApi/Infrastructure/Locking/ILock.cs @@ -0,0 +1,29 @@ +namespace Biwen.QuickApi.Infrastructure.Locking +{ + /// + /// ILocalLock + /// + public interface ILocalLock : ILock + { + } + + public interface ILock + { + /// + /// Waits indefinitely until acquiring a named lock with a given expiration for the current tenant. + /// After 'expiration' the lock is auto released, a null value is equivalent to 'TimeSpan.MaxValue'. + /// + Task AcquireLockAsync(string key, TimeSpan? expiration = null); + + /// + /// Tries to acquire a named lock in a given timeout with a given expiration for the current tenant. + /// After 'expiration' the lock is auto released, a null value is equivalent to 'TimeSpan.MaxValue'. + /// + Task<(ILocker locker, bool locked)> TryAcquireLockAsync(string key, TimeSpan timeout, TimeSpan? expiration = null); + + /// + /// Whether a named lock is already acquired. + /// + Task IsLockAcquiredAsync(string key); + } +} diff --git a/Biwen.QuickApi/Infrastructure/Locking/ILocker.cs b/Biwen.QuickApi/Infrastructure/Locking/ILocker.cs new file mode 100644 index 00000000..d28f3abe --- /dev/null +++ b/Biwen.QuickApi/Infrastructure/Locking/ILocker.cs @@ -0,0 +1,6 @@ +namespace Biwen.QuickApi.Infrastructure.Locking +{ + public interface ILocker : IDisposable, IAsyncDisposable + { + } +} diff --git a/Biwen.QuickApi/Infrastructure/Locking/LocalLock.cs b/Biwen.QuickApi/Infrastructure/Locking/LocalLock.cs new file mode 100644 index 00000000..1b4fb166 --- /dev/null +++ b/Biwen.QuickApi/Infrastructure/Locking/LocalLock.cs @@ -0,0 +1,182 @@ +//tks to:https://github.com/OrchardCMS/OrchardCore/tree/main/src/OrchardCore/OrchardCore/Locking + +namespace Biwen.QuickApi.Infrastructure.Locking +{ + //var timeout = TimeSpan.FromMilliseconds(20_000); + //(var locker, var locked) = await _distributedLock.TryAcquireLockAsync("SITEMAPS_UPDATE_LOCK", timeout, timeout); + //if (!locked) + //{ + // throw new TimeoutException($"Couldn't acquire a lock to update the sitemap within {timeout.Seconds} seconds."); + //} + + //using (locker) + //{ + // // Do the work + //} + + /// + /// LocalLock + /// + public sealed class LocalLock : ILocalLock, IDisposable + { + private readonly ILogger _logger; + + private readonly Dictionary _semaphores = []; + + public LocalLock(ILogger logger) + { + _logger = logger; + } + + /// + /// Waits indefinitely until acquiring a named lock with a given expiration for the current tenant. + /// After 'expiration' the lock is auto released, a null value is equivalent to 'TimeSpan.MaxValue'. + /// + public async Task AcquireLockAsync(string key, TimeSpan? expiration = null) + { + var semaphore = GetOrCreateSemaphore(key); + await semaphore.Value.WaitAsync(); + + return new Locker(this, semaphore, expiration); + } + + /// + /// Tries to acquire a named lock in a given timeout with a given expiration for the current tenant. + /// After 'expiration' the lock is auto released, a null value is equivalent to 'TimeSpan.MaxValue'. + /// + public async Task<(ILocker locker, bool locked)> TryAcquireLockAsync(string key, TimeSpan timeout, TimeSpan? expiration = null) + { + var semaphore = GetOrCreateSemaphore(key); + + if (await semaphore.Value.WaitAsync(timeout != TimeSpan.MaxValue ? timeout : Timeout.InfiniteTimeSpan)) + { + return (new Locker(this, semaphore, expiration), true); + } + + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug("Timeout elapsed before acquiring the named lock '{LockName}' after the given timeout of '{Timeout}'.", + key, timeout.ToString()); + } + + return (null!, false); + } + + public Task IsLockAcquiredAsync(string key) + { + lock (_semaphores) + { + if (_semaphores.TryGetValue(key, out var semaphore)) + { + return Task.FromResult(semaphore.Value.CurrentCount == 0); + } + + return Task.FromResult(false); + } + } + + private Semaphore GetOrCreateSemaphore(string key) + { + lock (_semaphores) + { + if (_semaphores.TryGetValue(key, out var semaphore)) + { + semaphore.RefCount++; + } + else + { + semaphore = new Semaphore(key, new SemaphoreSlim(1)); + _semaphores[key] = semaphore; + } + + return semaphore; + } + } + + private sealed class Semaphore + { + public Semaphore(string key, SemaphoreSlim value) + { + Key = key; + Value = value; + RefCount = 1; + } + + internal string Key { get; } + internal SemaphoreSlim Value { get; } + internal int RefCount { get; set; } + } + + private sealed class Locker : ILocker + { + private readonly LocalLock _localLock; + private readonly Semaphore _semaphore; + private readonly CancellationTokenSource _cts = null!; + private volatile int _released; + private bool _disposed; + + public Locker(LocalLock localLock, Semaphore semaphore, TimeSpan? expiration) + { + _localLock = localLock; + _semaphore = semaphore; + + if (expiration.HasValue && expiration.Value != TimeSpan.MaxValue) + { + _cts = new CancellationTokenSource(expiration.Value); + _cts.Token.Register(Release); + } + } + + private void Release() + { + if (Interlocked.Exchange(ref _released, 1) == 0) + { + lock (_localLock._semaphores) + { + if (_localLock._semaphores.TryGetValue(_semaphore.Key, out var semaphore)) + { + semaphore.RefCount--; + + if (semaphore.RefCount == 0) + { + _localLock._semaphores.Remove(_semaphore.Key); + } + } + } + + _semaphore.Value.Release(); + } + } + + public ValueTask DisposeAsync() + { + Dispose(); + return default; + } + + public void Dispose() + { + if (_disposed) + { + return; + } + + _disposed = true; + + _cts?.Dispose(); + + Release(); + } + } + + public void Dispose() + { + var semaphores = _semaphores.Values.ToArray(); + + foreach (var semaphore in semaphores) + { + semaphore.Value.Dispose(); + } + } + } +} diff --git a/Biwen.QuickApi/ServiceRegistration.cs b/Biwen.QuickApi/ServiceRegistration.cs index 1ace80b4..2aef1a64 100644 --- a/Biwen.QuickApi/ServiceRegistration.cs +++ b/Biwen.QuickApi/ServiceRegistration.cs @@ -11,6 +11,7 @@ namespace Biwen.QuickApi using Biwen.QuickApi.Abstractions; using Biwen.QuickApi.Events; using Biwen.QuickApi.Http; + using Biwen.QuickApi.Infrastructure.Locking; using Biwen.QuickApi.Scheduling; #if NET8_0_OR_GREATER using Microsoft.AspNetCore.Antiforgery; @@ -34,10 +35,12 @@ public static IServiceCollection AddBiwenQuickApis( this IServiceCollection services, Action? options = null) { - //JSON Options services.ConfigureHttpJsonOptions(x => { }); + //注册LocalLock + services.AddLocking(); + //注册验证器 services.AddFluentValidationAutoValidation(); services.AddHttpContextAccessor(); @@ -85,6 +88,18 @@ public static IServiceCollection AddBiwenQuickApis( return services; } + /// + /// 提供LocalLock支持 + /// + /// + /// + internal static IServiceCollection AddLocking(this IServiceCollection services) + { + services.AddSingleton(); + services.AddSingleton(sp => sp.GetRequiredService()); + return services; + } + /// /// 添加对Group的的扩展支持 ///