Skip to content

Commit

Permalink
Added rate-limit handling
Browse files Browse the repository at this point in the history
  • Loading branch information
rvanoord committed Jul 12, 2024
1 parent e67ea76 commit e280c3a
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 15 deletions.
37 changes: 35 additions & 2 deletions BccCode.Tripletex.Client.Tests/TripletexClientTests.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Diagnostics;
using System.Formats.Asn1;
using System.Security.AccessControl;
using Task = System.Threading.Tasks.Task;
Expand All @@ -7,7 +8,7 @@ namespace BccCode.Tripletex.Client.Tests
public class TripletexClientTests
{
TripletexClientOptions _options = default!;
public TripletexClientTests()
public TripletexClientTests()
{
_options = ConfigHelper.GetOptions();
}
Expand All @@ -19,6 +20,38 @@ public async System.Threading.Tasks.Task GetBudgetsTest()
var project = await client.GetProjectAsync(113087043, "*,projectActivities");
}

[Fact]
public async System.Threading.Tasks.Task RateLimitingConcurrencyTest()
{
var client = new TripletexClient(_options);
var start = new DateTime(DateTime.Today.Year, DateTime.Today.Month, 1);
var end = start.AddMonths(1);
var date = start;
var tasks = new List<Task<ListResponsePosting>>();
while (date < end)
{
for (int i = 0; i < 25; i++)
{
// Simulate some concurrency
if (i % 20 != 0)
{
Debug.WriteLine("Starting async request: " + date.ToString("yyyy-MM-dd") + " " + i);
tasks.Add(client.GetLedgerPostingsAsync(date.ToString("yyyy-MM-dd"), date.ToString("yyyy-MM-dd")));
}
else
{
Debug.WriteLine("Starting sync request: " + date.ToString("yyyy-MM-dd") + " " + i);
await client.GetLedgerPostingsAsync(date.ToString("yyyy-MM-dd"), date.ToString("yyyy-MM-dd"));
Debug.WriteLine("Completed sync request: " + date.ToString("yyyy-MM-dd") + " " + i);
}
}
date = date.AddDays(1);
}
Debug.WriteLine("Awaiting async requests");
await System.Threading.Tasks.Task.WhenAll(tasks);
Debug.WriteLine("Completed async requests");
}


// [Fact]
public async void Test1()
Expand Down Expand Up @@ -71,7 +104,7 @@ public async void Test1()

});

// client.AddOrderOrderlineAsync
// client.AddOrderOrderlineAsync
}
}
}
16 changes: 8 additions & 8 deletions BccCode.Tripletex.Client/TripletexClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ public TripletexClient(TripletexClientOptions options) : this()

internal class ClientFactory : IHttpClientFactory
{
public HttpClient CreateClient(string name) => new HttpClient();
public HttpClient CreateClient(string name) => new HttpClient(new TripletexRateLimitingHandler(new HttpClientHandler()));
}


public async Task<HttpClient> CreateHttpClientAsync(CancellationToken cancellation)
{
_sessionToken = await EnsureAccessTokenAsync(cancellation);
return _clientFactory.CreateClient();
return _clientFactory.CreateClient("TripletexClient");
}

partial void UpdateJsonSerializerSettings(JsonSerializerSettings settings)
Expand All @@ -59,7 +59,7 @@ partial void ProcessResponse(HttpClient client, HttpResponseMessage response)
}


private static ConcurrentDictionary<string, (string? token, DateTime expires)> _tokens = new ConcurrentDictionary<string, (string? token, DateTime expires)>();
private static ConcurrentDictionary<string, (string? token, DateTime expires)> _tokens = new ConcurrentDictionary<string, (string? token, DateTime expires)>();
private static SemaphoreSlim _tokenLock = new SemaphoreSlim(1);
protected virtual async Task<string> EnsureAccessTokenAsync(System.Threading.CancellationToken cancellationToken)
{
Expand All @@ -74,9 +74,9 @@ protected virtual async Task<string> EnsureAccessTokenAsync(System.Threading.Can
{
token = await GetSessionTokenAsync(cancellationToken).ConfigureAwait(false);
_tokens[key] = token;
}
}

}
}
finally
{
_tokenLock.Release();
Expand All @@ -88,8 +88,8 @@ protected virtual async Task<string> EnsureAccessTokenAsync(System.Threading.Can
protected async Task<(string? value, DateTime expires)> GetSessionTokenAsync(CancellationToken cancellation)
{
var expires = DateTimeOffset.Now.AddDays(1).Date;
var token = default(ResponseWrapperSessionToken);
var result = await _clientFactory.CreateClient().PutAsync($"{_options.ApiBasePath}/token/session/:create?consumerToken={_options.ConsumerToken}&employeeToken={_options.EmployeeToken}&expirationDate={expires.ToString("yyyy-MM-dd")}", new StringContent(""), cancellation);
var token = default(ResponseWrapperSessionToken);
var result = await _clientFactory.CreateClient("TripletexClient").PutAsync($"{_options.ApiBasePath}/token/session/:create?consumerToken={_options.ConsumerToken}&employeeToken={_options.EmployeeToken}&expirationDate={expires.ToString("yyyy-MM-dd")}", new StringContent(""), cancellation);
if (result.IsSuccessStatusCode)
{
var strResponse = await result.Content.ReadAsStringAsync();
Expand All @@ -102,6 +102,6 @@ protected virtual async Task<string> EnsureAccessTokenAsync(System.Threading.Can
return (token?.Value?.Token?.ToString(), expires);
}


}
}
10 changes: 5 additions & 5 deletions BccCode.Tripletex.Client/TripletexClientInstaller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ public static IServiceCollection AddTripletexClient(this IServiceCollection serv

public static IServiceCollection AddTripletexClient(this IServiceCollection services, TripletexClientOptions? options)
{
if (!services.Any(x => x.ServiceType == typeof(IHttpClientFactory)))
{
services.AddHttpClient();
}
services.AddHttpClient("TripletexClient")
.AddHttpMessageHandler<TripletexRateLimitingHandler>();

services.AddTransient<TripletexRateLimitingHandler>();

return services.AddSingleton<ITripletexClient>(x =>
{
Expand All @@ -46,7 +46,7 @@ public static IServiceCollection AddTripletexClient(this IServiceCollection serv
options = x.GetRequiredService<TripletexClientOptions>();
}
var clientFactory = x.GetRequiredService<IHttpClientFactory>();
var client = new TripletexClient(options, clientFactory);
var client = new TripletexClient(options, clientFactory);
return client;
});
}
Expand Down
118 changes: 118 additions & 0 deletions BccCode.Tripletex.Client/TripletexRateLimitingHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
using System;
using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

namespace BccCode.Tripletex.Client
{
public class TripletexRateLimitingHandler : DelegatingHandler
{
private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private static int _remainingRequests = 100;
private static int _requestLimit = 100;
private static DateTimeOffset _resetTime = DateTimeOffset.MinValue;

private const int MAX_RETRIES = 10;
private const int RETRY_DELAY_MILLISECONDS = 500;

private const int QUOTA_DELAY_MILLISECONDS = 50;

public TripletexRateLimitingHandler(HttpMessageHandler innerHandler = null!)
: base(innerHandler ?? new HttpClientHandler())
{
}

protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
await WaitForQuotaAsync(cancellationToken);

HttpResponseMessage response = default!;
for (int retry = 0; retry < MAX_RETRIES; retry++)
{
response = await base.SendAsync(request, cancellationToken);
await UpdateRateLimitStateAsync(response);

if (response.StatusCode != System.Net.HttpStatusCode.TooManyRequests)
{
return response;
}

var retryAfter = RETRY_DELAY_MILLISECONDS;
if (response.Headers.TryGetValues("Retry-After", out var values) && int.TryParse(values.FirstOrDefault(), out var retryAfterValue))
{
retryAfter = retryAfterValue * 1000;
}

Debug.WriteLine("Too many requests. Retrying after {0} milliseconds", retryAfter);
await System.Threading.Tasks.Task.Delay(retryAfter * retry, cancellationToken);
}

return response; // Return the last response after exhausting retries
}

private static async System.Threading.Tasks.Task WaitForQuotaAsync(CancellationToken cancellationToken)
{
while (true)
{
await _semaphore.WaitAsync(cancellationToken);
try
{
if (DateTimeOffset.Now >= _resetTime)
{
_remainingRequests = _requestLimit; // Reset to the new limit from headers
}

if (_remainingRequests > 0)
{
_remainingRequests--;
return;
}
}
finally
{
_semaphore.Release();
}

var now = DateTimeOffset.Now;
if (_resetTime > now)
{
var timeUntilReset = _resetTime - now;

Debug.WriteLine("Quota exceeded, waiting until {0}", _resetTime);
await System.Threading.Tasks.Task.Delay(timeUntilReset, cancellationToken);
}
}
}

private static async System.Threading.Tasks.Task UpdateRateLimitStateAsync(HttpResponseMessage response)
{
await _semaphore.WaitAsync();
try
{
if (response.Headers.TryGetValues("X-Rate-Limit-Limit", out var limitValues) &&
int.TryParse(limitValues.FirstOrDefault(), out var limit))
{
_requestLimit = limit;
}

if (response.Headers.TryGetValues("X-Rate-Limit-Remaining", out var remainingValues) &&
int.TryParse(remainingValues.FirstOrDefault(), out var remaining))
{
_remainingRequests = remaining;
}

if (response.Headers.TryGetValues("X-Rate-Limit-Reset", out var resetValues) &&
int.TryParse(resetValues.FirstOrDefault(), out var resetSeconds))
{
_resetTime = DateTimeOffset.Now.AddSeconds(resetSeconds);
}
}
finally
{
_semaphore.Release();
}
}
}
}

0 comments on commit e280c3a

Please sign in to comment.