diff --git a/Directory.Packages.props b/Directory.Packages.props
index 0719a40ed8..60462d5b1b 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -9,6 +9,7 @@
+
diff --git a/benchmark/BDN.benchmark/BDN.benchmark.csproj b/benchmark/BDN.benchmark/BDN.benchmark.csproj
index 71c1961eee..99fc55eafe 100644
--- a/benchmark/BDN.benchmark/BDN.benchmark.csproj
+++ b/benchmark/BDN.benchmark/BDN.benchmark.csproj
@@ -29,4 +29,8 @@
+
+
+
+
diff --git a/benchmark/BDN.benchmark/Cluster/ClusterContext.cs b/benchmark/BDN.benchmark/Cluster/ClusterContext.cs
index 631a7dda90..ffd1089489 100644
--- a/benchmark/BDN.benchmark/Cluster/ClusterContext.cs
+++ b/benchmark/BDN.benchmark/Cluster/ClusterContext.cs
@@ -10,9 +10,9 @@
namespace BDN.benchmark.Cluster
{
- unsafe class ClusterContext
+ class ClusterContext
{
- EmbeddedRespServer server;
+ GarnetEmbeddedApplication server;
RespServerSession session;
readonly BenchUtils benchUtils = new();
readonly int port = 7000;
@@ -22,9 +22,11 @@ unsafe class ClusterContext
public Request[] singleMGetMSet;
public Request singleCTXNSET;
- public void Dispose()
+ public async Task Dispose()
{
session.Dispose();
+ await server.StopAsync();
+ server.Dispose();
server.Dispose();
}
@@ -39,12 +41,17 @@ public void SetupSingleInstance(bool disableSlotVerification = false)
};
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
opt.CheckpointDir = "/tmp";
- server = new EmbeddedRespServer(opt);
+
+ var builder = GarnetEmbeddedApplication.CreateHostBuilder([], opt);
+
+ server = builder.Build();
+
session = server.GetRespSession();
+
_ = server.Register.NewTransactionProc(CustomTxnSet.CommandName, () => new CustomTxnSet(), new RespCommandsInfo { Arity = CustomTxnSet.Arity });
}
- public void AddSlotRange(List<(int, int)> slotRanges)
+ public unsafe void AddSlotRange(List<(int, int)> slotRanges)
{
foreach (var slotRange in slotRanges)
{
@@ -56,7 +63,7 @@ public void AddSlotRange(List<(int, int)> slotRanges)
}
}
- public void CreateGetSet(int keySize = 8, int valueSize = 32, int batchSize = 100)
+ public unsafe void CreateGetSet(int keySize = 8, int valueSize = 32, int batchSize = 100)
{
var pairs = new (byte[], byte[])[batchSize];
for (var i = 0; i < batchSize; i++)
@@ -93,7 +100,7 @@ public void CreateGetSet(int keySize = 8, int valueSize = 32, int batchSize = 10
singleGetSet = [getReq, setReq];
}
- public void CreateMGetMSet(int keySize = 8, int valueSize = 32, int batchSize = 100)
+ public unsafe void CreateMGetMSet(int keySize = 8, int valueSize = 32, int batchSize = 100)
{
var pairs = new (byte[], byte[])[batchSize];
for (var i = 0; i < batchSize; i++)
@@ -134,7 +141,7 @@ public void CreateMGetMSet(int keySize = 8, int valueSize = 32, int batchSize =
singleMGetMSet = [mGetReq, mSetReq];
}
- public void CreateCTXNSET(int keySize = 8, int batchSize = 100)
+ public unsafe void CreateCTXNSET(int keySize = 8, int batchSize = 100)
{
var keys = new byte[8][];
for (var i = 0; i < 8; i++)
@@ -163,7 +170,7 @@ public void CreateCTXNSET(int keySize = 8, int batchSize = 100)
singleCTXNSET = ctxnsetReq;
}
- public void Consume(byte* ptr, int length)
+ public unsafe void Consume(byte* ptr, int length)
=> session.TryConsumeMessages(ptr, length);
}
diff --git a/benchmark/BDN.benchmark/Embedded/EmbeddedRespServer.cs b/benchmark/BDN.benchmark/Embedded/EmbeddedRespServer.cs
deleted file mode 100644
index bae520f373..0000000000
--- a/benchmark/BDN.benchmark/Embedded/EmbeddedRespServer.cs
+++ /dev/null
@@ -1,59 +0,0 @@
-// Copyright (c) Microsoft Corporation.
-// Licensed under the MIT license.
-
-using Garnet;
-using Garnet.common;
-using Garnet.server;
-using Microsoft.Extensions.Logging;
-using Tsavorite.core;
-
-namespace Embedded.server
-{
- ///
- /// Implements an embedded Garnet RESP server
- ///
- internal sealed class EmbeddedRespServer : GarnetServer
- {
- readonly GarnetServerEmbedded garnetServerEmbedded;
- readonly SubscribeBroker> subscribeBroker;
-
- ///
- /// Creates an EmbeddedRespServer instance
- ///
- /// Server options to configure the base GarnetServer instance
- /// Logger factory to configure the base GarnetServer instance
- /// Server network
- public EmbeddedRespServer(GarnetServerOptions opts, ILoggerFactory loggerFactory = null, GarnetServerEmbedded server = null) : base(opts, loggerFactory, server)
- {
- this.garnetServerEmbedded = server;
- this.subscribeBroker = opts.DisablePubSub ? null :
- new SubscribeBroker>(
- new SpanByteKeySerializer(),
- null,
- opts.PubSubPageSizeBytes(),
- opts.SubscriberRefreshFrequencyMs,
- true);
- }
-
- ///
- /// Dispose server
- ///
- public new void Dispose() => base.Dispose();
-
- public StoreWrapper StoreWrapper => storeWrapper;
-
- ///
- /// Return a direct RESP session to this server
- ///
- /// A new RESP server session
- internal RespServerSession GetRespSession()
- {
- return new RespServerSession(0, new EmbeddedNetworkSender(), storeWrapper, subscribeBroker: subscribeBroker, null, true);
- }
-
- internal EmbeddedNetworkHandler GetNetworkHandler()
- {
- return garnetServerEmbedded.CreateNetworkHandler();
- }
- }
-}
\ No newline at end of file
diff --git a/benchmark/BDN.benchmark/Embedded/GarnetEmbeddedApplication.cs b/benchmark/BDN.benchmark/Embedded/GarnetEmbeddedApplication.cs
new file mode 100644
index 0000000000..dbecf59b5a
--- /dev/null
+++ b/benchmark/BDN.benchmark/Embedded/GarnetEmbeddedApplication.cs
@@ -0,0 +1,77 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Garnet;
+using Garnet.common;
+using Garnet.server;
+using Microsoft.Extensions.DependencyInjection;
+using Tsavorite.core;
+
+namespace Embedded.server;
+
+internal sealed class GarnetEmbeddedApplication
+{
+ public RegisterApi Register => app.Register;
+
+ readonly StoreWrapper store;
+ readonly SubscribeBroker> subscriberBroker;
+ readonly GarnetApplication app;
+
+ public GarnetEmbeddedApplication(GarnetApplication app)
+ {
+ this.app = app;
+ this.store = app.Services.GetRequiredService();
+ this.subscriberBroker =
+ app.Services.GetRequiredService>>();
+ }
+
+ public static GarnetEmbeddedApplicationBuilder CreateHostBuilder(string[] args, GarnetServerOptions options)
+ {
+ return new(new GarnetApplicationOptions
+ {
+ Args = args,
+ }, options);
+ }
+
+ public async Task StopAsync(CancellationToken cancellationToken = default)
+ {
+ await app.StopAsync(cancellationToken);
+ }
+
+ ///
+ /// Dispose server
+ ///
+ public void Dispose() => app.Dispose();
+
+ public StoreWrapper StoreWrapper => store;
+
+ ///
+ /// Return a direct RESP session to this server
+ ///
+ /// A new RESP server session
+ internal RespServerSession GetRespSession()
+ {
+ return new RespServerSession(0,
+ new EmbeddedNetworkSender(),
+ store,
+ subscribeBroker: this.subscriberBroker,
+ null,
+ true);
+ }
+
+ internal EmbeddedNetworkHandler GetNetworkHandler()
+ {
+ var server = app.Services
+ .GetServices()
+ .OfType()
+ .FirstOrDefault();
+
+ Console.WriteLine(server);
+
+ return server?.CreateNetworkHandler();
+ }
+}
\ No newline at end of file
diff --git a/benchmark/BDN.benchmark/Embedded/GarnetEmbeddedApplicationBuilder.cs b/benchmark/BDN.benchmark/Embedded/GarnetEmbeddedApplicationBuilder.cs
new file mode 100644
index 0000000000..527809b17a
--- /dev/null
+++ b/benchmark/BDN.benchmark/Embedded/GarnetEmbeddedApplicationBuilder.cs
@@ -0,0 +1,40 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using System.Linq;
+using Garnet;
+using Garnet.server;
+using Microsoft.Extensions.DependencyInjection;
+
+namespace Embedded.server;
+
+internal class GarnetEmbeddedApplicationBuilder : GarnetApplicationBuilder
+{
+ internal GarnetEmbeddedApplicationBuilder(GarnetApplicationOptions options, GarnetServerOptions garnetServerOptions)
+ : base(options, garnetServerOptions)
+ {
+ }
+
+ public new GarnetEmbeddedApplication Build()
+ {
+ {
+ var serviceDescriptor = base.Services
+ .FirstOrDefault(descriptor => descriptor.ServiceType == typeof(StoreWrapper));
+
+ base.Services.Remove(serviceDescriptor);
+ }
+
+ {
+ var serviceDescriptor = base.Services
+ .FirstOrDefault(descriptor => descriptor.ServiceType == typeof(IGarnetServer));
+
+ base.Services.Remove(serviceDescriptor);
+
+ base.Services.AddSingleton();
+ }
+
+ var app = base.Build();
+
+ return new GarnetEmbeddedApplication(app);
+ }
+}
\ No newline at end of file
diff --git a/benchmark/BDN.benchmark/Embedded/GarnetServerEmbedded.cs b/benchmark/BDN.benchmark/Embedded/GarnetServerEmbedded.cs
index 63fe7d90ec..99687f0109 100644
--- a/benchmark/BDN.benchmark/Embedded/GarnetServerEmbedded.cs
+++ b/benchmark/BDN.benchmark/Embedded/GarnetServerEmbedded.cs
@@ -9,93 +9,92 @@
using Garnet.server;
using Microsoft.Extensions.Logging;
-namespace Embedded.server
+namespace Embedded.server;
+
+internal class GarnetServerEmbedded : GarnetServerBase, IServerHook
{
- internal class GarnetServerEmbedded : GarnetServerBase, IServerHook
+ public GarnetServerEmbedded() : base("0.0.0.0", 0, 1 << 10)
{
- public GarnetServerEmbedded() : base("0.0.0.0", 0, 1 << 10)
- {
- }
+ }
- public EmbeddedNetworkHandler CreateNetworkHandler(SslClientAuthenticationOptions tlsOptions = null, string remoteEndpointName = null)
- {
- var networkSender = new EmbeddedNetworkSender();
- var networkSettings = new NetworkBufferSettings();
- var networkPool = networkSettings.CreateBufferPool();
- EmbeddedNetworkHandler handler = null;
+ public EmbeddedNetworkHandler CreateNetworkHandler(SslClientAuthenticationOptions tlsOptions = null, string remoteEndpointName = null)
+ {
+ var networkSender = new EmbeddedNetworkSender();
+ var networkSettings = new NetworkBufferSettings();
+ var networkPool = networkSettings.CreateBufferPool();
+ EmbeddedNetworkHandler handler = null;
- if (activeHandlerCount >= 0)
+ if (activeHandlerCount >= 0)
+ {
+ var currentActiveHandlerCount = Interlocked.Increment(ref activeHandlerCount);
+ if (currentActiveHandlerCount > 0)
{
- var currentActiveHandlerCount = Interlocked.Increment(ref activeHandlerCount);
- if (currentActiveHandlerCount > 0)
+ try
{
- try
- {
- handler = new EmbeddedNetworkHandler(this, networkSender, networkSettings, networkPool, tlsOptions != null);
- if (!activeHandlers.TryAdd(handler, default))
- throw new Exception("Unable to add handler to dictionary");
+ handler = new EmbeddedNetworkHandler(this, networkSender, networkSettings, networkPool, tlsOptions != null);
+ if (!activeHandlers.TryAdd(handler, default))
+ throw new Exception("Unable to add handler to dictionary");
- handler.Start(tlsOptions, remoteEndpointName);
- incr_conn_recv();
- return handler;
- }
- catch (Exception ex)
- {
- logger?.LogError(ex, "Error starting network handler");
- Interlocked.Decrement(ref activeHandlerCount);
- handler?.Dispose();
- }
+ handler.Start(tlsOptions, remoteEndpointName);
+ incr_conn_recv();
+ return handler;
}
- else
+ catch (Exception ex)
{
+ logger?.LogError(ex, "Error starting network handler");
Interlocked.Decrement(ref activeHandlerCount);
+ handler?.Dispose();
}
}
- return handler;
- }
-
- public void DisposeMessageConsumer(INetworkHandler session)
- {
- if (activeHandlers.TryRemove(session, out _))
+ else
{
Interlocked.Decrement(ref activeHandlerCount);
- incr_conn_disp();
- try
- {
- session.Session?.Dispose();
- }
- catch (Exception ex)
- {
- logger?.LogError(ex, "Error disposing RespServerSession");
- }
}
}
+ return handler;
+ }
- public override void Start()
+ public void DisposeMessageConsumer(INetworkHandler session)
+ {
+ if (activeHandlers.TryRemove(session, out _))
{
+ Interlocked.Decrement(ref activeHandlerCount);
+ incr_conn_disp();
+ try
+ {
+ session.Session?.Dispose();
+ }
+ catch (Exception ex)
+ {
+ logger?.LogError(ex, "Error disposing RespServerSession");
+ }
}
+ }
- public bool TryCreateMessageConsumer(Span bytes, INetworkSender networkSender, out IMessageConsumer session)
- {
- session = null;
-
- // We need at least 4 bytes to determine session
- if (bytes.Length < 4)
- return false;
+ public override void Start()
+ {
+ }
- WireFormat protocol = WireFormat.ASCII;
+ public bool TryCreateMessageConsumer(Span bytes, INetworkSender networkSender, out IMessageConsumer session)
+ {
+ session = null;
- if (!GetSessionProviders().TryGetValue(protocol, out var provider))
- {
- var input = System.Text.Encoding.ASCII.GetString(bytes);
- logger?.LogError("Cannot identify wire protocol {bytes}", input);
- throw new Exception($"Unsupported incoming wire format {protocol} {input}");
- }
+ // We need at least 4 bytes to determine session
+ if (bytes.Length < 4)
+ return false;
- if (!AddSession(protocol, ref provider, networkSender, out session))
- throw new Exception($"Unable to add session");
+ WireFormat protocol = WireFormat.ASCII;
- return true;
+ if (!GetSessionProviders().TryGetValue(protocol, out var provider))
+ {
+ var input = System.Text.Encoding.ASCII.GetString(bytes);
+ logger?.LogError("Cannot identify wire protocol {bytes}", input);
+ throw new Exception($"Unsupported incoming wire format {protocol} {input}");
}
+
+ if (!AddSession(protocol, ref provider, networkSender, out session))
+ throw new Exception($"Unable to add session");
+
+ return true;
}
}
\ No newline at end of file
diff --git a/benchmark/BDN.benchmark/Lua/LuaRunnerOperations.cs b/benchmark/BDN.benchmark/Lua/LuaRunnerOperations.cs
index a100249efe..e4431fe236 100644
--- a/benchmark/BDN.benchmark/Lua/LuaRunnerOperations.cs
+++ b/benchmark/BDN.benchmark/Lua/LuaRunnerOperations.cs
@@ -3,6 +3,7 @@
using BenchmarkDotNet.Attributes;
using Embedded.server;
+using Garnet;
using Garnet.server;
namespace BDN.benchmark.Lua
@@ -11,7 +12,7 @@ namespace BDN.benchmark.Lua
/// Benchmark for non-script running operations in LuaRunner
///
[MemoryDiagnoser]
- public unsafe class LuaRunnerOperations
+ public class LuaRunnerOperations
{
private const string SmallScript = "return nil";
@@ -147,7 +148,7 @@ public IEnumerable LuaParamsProvider()
new(LuaMemoryManagementMode.Managed, true),
];
- private EmbeddedRespServer server;
+ private GarnetEmbeddedApplication server;
private RespServerSession session;
private LuaRunner paramsRunner;
@@ -162,7 +163,14 @@ public void GlobalSetup()
{
opts = Params.CreateOptions();
- server = new EmbeddedRespServer(new GarnetServerOptions() { EnableLua = true, QuietMode = true, LuaOptions = opts });
+ var builder = GarnetEmbeddedApplication.CreateHostBuilder([], new GarnetServerOptions
+ {
+ EnableLua = true,
+ QuietMode = true,
+ LuaOptions = opts
+ });
+
+ server = builder.Build();
session = server.GetRespSession();
diff --git a/benchmark/BDN.benchmark/Lua/LuaScriptCacheOperations.cs b/benchmark/BDN.benchmark/Lua/LuaScriptCacheOperations.cs
index 430a92ca59..efda07487a 100644
--- a/benchmark/BDN.benchmark/Lua/LuaScriptCacheOperations.cs
+++ b/benchmark/BDN.benchmark/Lua/LuaScriptCacheOperations.cs
@@ -30,7 +30,7 @@ public IEnumerable LuaParamsProvider()
new(LuaMemoryManagementMode.Managed, true),
];
- private EmbeddedRespServer server;
+ private GarnetEmbeddedApplication server;
private StoreWrapper storeWrapper;
private SessionScriptCache sessionScriptCache;
private RespServerSession session;
@@ -44,7 +44,11 @@ public void GlobalSetup()
{
var options = Params.CreateOptions();
- server = new EmbeddedRespServer(new GarnetServerOptions() { EnableLua = true, QuietMode = true, LuaOptions = options });
+ var builder = GarnetEmbeddedApplication.CreateHostBuilder([],
+ new GarnetServerOptions() { EnableLua = true, QuietMode = true, LuaOptions = options });
+
+ server = builder.Build();
+
storeWrapper = server.StoreWrapper;
sessionScriptCache = new SessionScriptCache(storeWrapper, new GarnetNoAuthAuthenticator());
session = server.GetRespSession();
diff --git a/benchmark/BDN.benchmark/Network/NetworkBase.cs b/benchmark/BDN.benchmark/Network/NetworkBase.cs
index c76820a350..d922555d82 100644
--- a/benchmark/BDN.benchmark/Network/NetworkBase.cs
+++ b/benchmark/BDN.benchmark/Network/NetworkBase.cs
@@ -32,7 +32,7 @@ public IEnumerable NetworkParamsProvider()
/// in order to stress the network layer.
///
const int batchSize = 1;
- EmbeddedRespServer server;
+ GarnetEmbeddedApplication server;
EmbeddedNetworkHandler networkHandler;
///
@@ -47,7 +47,9 @@ public virtual void GlobalSetup()
DisablePubSub = true,
};
- server = new EmbeddedRespServer(opts, null, new GarnetServerEmbedded());
+ var builder = GarnetEmbeddedApplication.CreateHostBuilder([], opts);
+
+ server = builder.Build();
networkHandler = server.GetNetworkHandler();
// Send a PING message to warm up the session
diff --git a/benchmark/BDN.benchmark/Operations/OperationsBase.cs b/benchmark/BDN.benchmark/Operations/OperationsBase.cs
index 16be016871..4f79511e75 100644
--- a/benchmark/BDN.benchmark/Operations/OperationsBase.cs
+++ b/benchmark/BDN.benchmark/Operations/OperationsBase.cs
@@ -41,7 +41,7 @@ public IEnumerable OperationParamsProvider()
/// 100 us = 1 Mops/sec
///
internal const int batchSize = 100;
- internal EmbeddedRespServer server;
+ internal GarnetEmbeddedApplication server;
internal RespServerSession session;
internal RespServerSession subscribeSession;
@@ -79,7 +79,9 @@ public virtual void GlobalSetup()
opts.AuthSettings = new AclAuthenticationPasswordSettings(aclFile);
}
- server = new EmbeddedRespServer(opts, null, new GarnetServerEmbedded());
+ var builder = GarnetEmbeddedApplication.CreateHostBuilder([], opts);
+
+ server = builder.Build();
session = server.GetRespSession();
}
finally
diff --git a/benchmark/BDN.benchmark/Operations/PubSubOperations.cs b/benchmark/BDN.benchmark/Operations/PubSubOperations.cs
index a2414d8405..4b8d9f2e2c 100644
--- a/benchmark/BDN.benchmark/Operations/PubSubOperations.cs
+++ b/benchmark/BDN.benchmark/Operations/PubSubOperations.cs
@@ -27,7 +27,8 @@ public override void GlobalSetup()
DisablePubSub = false,
};
- server = new EmbeddedRespServer(opts, null, new GarnetServerEmbedded());
+ var builder = GarnetEmbeddedApplication.CreateHostBuilder([], opts);
+ server = builder.Build();
session = server.GetRespSession();
subscribeSession = server.GetRespSession();
diff --git a/benchmark/BDN.benchmark/Operations/ScriptOperations.cs b/benchmark/BDN.benchmark/Operations/ScriptOperations.cs
index b8298912eb..d04c59eaa0 100644
--- a/benchmark/BDN.benchmark/Operations/ScriptOperations.cs
+++ b/benchmark/BDN.benchmark/Operations/ScriptOperations.cs
@@ -187,7 +187,7 @@ public static IEnumerable LuaParamsProvider()
/// 100 us = 1 Mops/sec
///
internal const int batchSize = 100;
- internal EmbeddedRespServer server;
+ internal GarnetEmbeddedApplication server;
internal RespServerSession session;
///
@@ -203,7 +203,8 @@ public void GlobalSetup()
LuaOptions = Params.CreateOptions(),
};
- server = new EmbeddedRespServer(opts);
+ var builder = GarnetEmbeddedApplication.CreateHostBuilder([], opts);
+ server = builder.Build();
session = server.GetRespSession();
diff --git a/hosting/Windows/Garnet.worker/Program.cs b/hosting/Windows/Garnet.worker/Program.cs
index 8418da8671..854d7c4ab4 100644
--- a/hosting/Windows/Garnet.worker/Program.cs
+++ b/hosting/Windows/Garnet.worker/Program.cs
@@ -5,19 +5,12 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
-class Program
-{
- static void Main(string[] args)
- {
- var builder = Host.CreateApplicationBuilder(args);
- builder.Services.AddHostedService(_ => new Worker(args));
+var builder = GarnetApplication.CreateHostBuilder(args);
- builder.Services.AddWindowsService(options =>
- {
- options.ServiceName = "Microsoft Garnet Server";
- });
+builder.Services.AddWindowsService(options =>
+{
+ options.ServiceName = "Microsoft Garnet Server";
+});
- var host = builder.Build();
- host.Run();
- }
-}
\ No newline at end of file
+var app = builder.Build();
+app.Run();
\ No newline at end of file
diff --git a/hosting/Windows/Garnet.worker/Worker.cs b/hosting/Windows/Garnet.worker/Worker.cs
deleted file mode 100644
index d69adb7e3c..0000000000
--- a/hosting/Windows/Garnet.worker/Worker.cs
+++ /dev/null
@@ -1,60 +0,0 @@
-// Copyright (c) Microsoft Corporation.
-// Licensed under the MIT license.
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using Microsoft.Extensions.Hosting;
-
-namespace Garnet
-{
- public class Worker : BackgroundService
- {
- private bool _isDisposed = false;
- private readonly string[] args;
-
- private GarnetServer server;
-
- public Worker(string[] args)
- {
- this.args = args;
- }
-
- protected override async Task ExecuteAsync(CancellationToken stoppingToken)
- {
- try
- {
- server = new GarnetServer(args);
-
- // Start the server
- server.Start();
-
- await Task.Delay(Timeout.Infinite, stoppingToken).ConfigureAwait(false);
- }
- catch (Exception ex)
- {
- Console.WriteLine($"Unable to initialize server due to exception: {ex.Message}");
- }
- }
-
- ///
- /// Triggered when the application host is performing a graceful shutdown.
- ///
- /// Indicates that the shutdown process should no longer be graceful.
- public override async Task StopAsync(CancellationToken cancellationToken)
- {
- Dispose();
- await base.StopAsync(cancellationToken).ConfigureAwait(false);
- }
-
- public override void Dispose()
- {
- if (_isDisposed)
- {
- return;
- }
- server?.Dispose();
- _isDisposed = true;
- }
- }
-}
\ No newline at end of file
diff --git a/libs/host/Factories/GarnetProviderFactory.cs b/libs/host/Factories/GarnetProviderFactory.cs
new file mode 100644
index 0000000000..e1b9162fef
--- /dev/null
+++ b/libs/host/Factories/GarnetProviderFactory.cs
@@ -0,0 +1,36 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using Garnet.common;
+using Garnet.server;
+using Microsoft.Extensions.Options;
+using Tsavorite.core;
+
+namespace Garnet;
+
+internal class GarnetProviderFactory
+{
+ readonly GarnetServerOptions options;
+ readonly SubscribeBroker> subscribeBroker;
+ readonly StoreWrapper storeWrapper;
+
+ public GarnetProviderFactory(
+ IOptions options,
+ SubscribeBroker> subscribeBroker,
+ StoreWrapper storeWrapper)
+ {
+ this.options = options.Value;
+ this.subscribeBroker = subscribeBroker;
+ this.storeWrapper = storeWrapper;
+ }
+
+ public GarnetProvider Create()
+ {
+ if (options.DisablePubSub)
+ {
+ return new GarnetProvider(storeWrapper, null);
+ }
+
+ return new GarnetProvider(storeWrapper, subscribeBroker);
+ }
+}
\ No newline at end of file
diff --git a/libs/host/Factories/StoreFactory.cs b/libs/host/Factories/StoreFactory.cs
new file mode 100644
index 0000000000..9802c886ba
--- /dev/null
+++ b/libs/host/Factories/StoreFactory.cs
@@ -0,0 +1,105 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using Garnet.server;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using Tsavorite.core;
+
+namespace Garnet;
+
+using ObjectStoreAllocator =
+ GenericAllocator>>;
+using ObjectStoreFunctions =
+ StoreFunctions>;
+
+internal class StoreFactory
+{
+ private readonly IClusterFactory clusterFactory;
+ private readonly GarnetServerOptions opts;
+ private readonly ILoggerFactory loggerFactory;
+ private readonly CustomCommandManager customCommandManager;
+
+ public StoreFactory(
+ IClusterFactory clusterFactory,
+ IOptions options,
+ ILoggerFactory loggerFactory,
+ CustomCommandManager customCommandManager)
+ {
+ this.clusterFactory = options.Value.EnableCluster ? clusterFactory : null;
+ this.opts = options.Value;
+ this.loggerFactory = loggerFactory;
+ this.customCommandManager = customCommandManager;
+ }
+
+ public MainStoreWrapper CreateMainStore()
+ {
+ var kvSettings = opts.GetSettings(loggerFactory, out var logFactory);
+
+ var checkpointDir = opts.CheckpointDir ?? opts.LogDir;
+
+ // Run checkpoint on its own thread to control p99
+ kvSettings.ThrottleCheckpointFlushDelayMs = opts.CheckpointThrottleFlushDelayMs;
+ kvSettings.CheckpointVersionSwitchBarrier = opts.EnableCluster;
+
+ var checkpointFactory = opts.DeviceFactoryCreator();
+ if (opts.EnableCluster)
+ {
+ kvSettings.CheckpointManager = clusterFactory.CreateCheckpointManager(checkpointFactory,
+ new DefaultCheckpointNamingScheme(checkpointDir + "/Store/checkpoints"), isMainStore: true);
+ }
+ else
+ {
+ kvSettings.CheckpointManager = new DeviceLogCommitCheckpointManager(checkpointFactory,
+ new DefaultCheckpointNamingScheme(checkpointDir + "/Store/checkpoints"), removeOutdated: true);
+ }
+
+ return new(new(kvSettings
+ , StoreFunctions.Create()
+ , (allocatorSettings, storeFunctions) => new(allocatorSettings, storeFunctions)), kvSettings);
+ }
+
+ public ObjectStoreWrapper CreateObjectStore()
+ {
+ TsavoriteKV objectStore = null;
+ KVSettings objKvSettings = null;
+ CacheSizeTracker objectStoreSizeTracker = null;
+
+ if (!opts.DisableObjects)
+ {
+ var checkpointDir = opts.CheckpointDir ?? opts.LogDir;
+
+ objKvSettings = opts.GetObjectStoreSettings(loggerFactory?.CreateLogger("TsavoriteKV [obj]"),
+ out var objHeapMemorySize, out var objReadCacheHeapMemorySize);
+
+ // Run checkpoint on its own thread to control p99
+ objKvSettings.ThrottleCheckpointFlushDelayMs = opts.CheckpointThrottleFlushDelayMs;
+ objKvSettings.CheckpointVersionSwitchBarrier = opts.EnableCluster;
+
+ if (opts.EnableCluster)
+ objKvSettings.CheckpointManager = clusterFactory.CreateCheckpointManager(
+ opts.DeviceFactoryCreator(),
+ new DefaultCheckpointNamingScheme(checkpointDir + "/ObjectStore/checkpoints"),
+ isMainStore: false);
+ else
+ objKvSettings.CheckpointManager = new DeviceLogCommitCheckpointManager(
+ opts.DeviceFactoryCreator(),
+ new DefaultCheckpointNamingScheme(checkpointDir + "/ObjectStore/checkpoints"),
+ removeOutdated: true);
+
+ objectStore = new(objKvSettings
+ , StoreFunctions.Create(new ByteArrayKeyComparer(),
+ () => new ByteArrayBinaryObjectSerializer(),
+ () => new GarnetObjectSerializer(customCommandManager))
+ , (allocatorSettings, storeFunctions) => new(allocatorSettings, storeFunctions));
+
+ if (objHeapMemorySize > 0 || objReadCacheHeapMemorySize > 0)
+ objectStoreSizeTracker = new CacheSizeTracker(objectStore, objKvSettings, objHeapMemorySize,
+ objReadCacheHeapMemorySize,
+ loggerFactory);
+ }
+
+ return new(objectStore, objKvSettings, objectStoreSizeTracker);
+ }
+}
\ No newline at end of file
diff --git a/libs/host/Factories/StoreWrapperFactory.cs b/libs/host/Factories/StoreWrapperFactory.cs
new file mode 100644
index 0000000000..1627618682
--- /dev/null
+++ b/libs/host/Factories/StoreWrapperFactory.cs
@@ -0,0 +1,114 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using System;
+using System.Linq;
+using System.Text;
+using Garnet.server;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+
+namespace Garnet;
+
+internal class StoreWrapperFactory
+{
+ ///
+ /// Resp protocol version
+ ///
+ readonly string redisProtocolVersion = "7.2.5";
+
+ readonly ILoggerFactory loggerFactory;
+ readonly ILogger logger;
+ readonly IGarnetServer garnetServer;
+ readonly GarnetServerOptions options;
+ readonly CustomCommandManager customCommandManager;
+ readonly IClusterFactory clusterFactory;
+ readonly MainStoreWrapper mainStoreWrapper;
+ readonly ObjectStoreWrapper objectStoreWrapper;
+ readonly AppendOnlyFileWrapper appendOnlyFileWrapper;
+
+ public StoreWrapperFactory(
+ ILoggerFactory loggerFactory,
+ ILogger logger,
+ IGarnetServer garnetServer,
+ IOptions options,
+ CustomCommandManager customCommandManager,
+ IClusterFactory clusterFactory,
+ MainStoreWrapper mainStoreWrapper,
+ ObjectStoreWrapper objectStoreWrapper,
+ AppendOnlyFileWrapper appendOnlyFileWrapper)
+ {
+ this.loggerFactory = loggerFactory;
+ this.logger = logger;
+ this.garnetServer = garnetServer;
+ this.options = options.Value;
+ this.customCommandManager = customCommandManager;
+ this.clusterFactory = this.options.EnableCluster ? clusterFactory : null;
+ this.mainStoreWrapper = mainStoreWrapper;
+ this.objectStoreWrapper = objectStoreWrapper;
+ this.appendOnlyFileWrapper = appendOnlyFileWrapper;
+ }
+
+ public StoreWrapper Create(string version)
+ {
+ var store = mainStoreWrapper.store;
+ var objectStore = objectStoreWrapper.objectStore;
+ var appendOnlyFile = appendOnlyFileWrapper.appendOnlyFile;
+
+ var objectStoreSizeTracker = objectStoreWrapper.objectStoreSizeTracker;
+
+ var configMemoryLimit = (store.IndexSize * 64) + store.Log.MaxMemorySizeBytes +
+ (store.ReadCache?.MaxMemorySizeBytes ?? 0) +
+ (appendOnlyFile?.MaxMemorySizeBytes ?? 0);
+ if (objectStore != null)
+ {
+ configMemoryLimit += objectStore.IndexSize * 64 + objectStore.Log.MaxMemorySizeBytes +
+ (objectStore.ReadCache?.MaxMemorySizeBytes ?? 0) +
+ (objectStoreSizeTracker?.TargetSize ?? 0) +
+ (objectStoreSizeTracker?.ReadCacheTargetSize ?? 0);
+ }
+
+ logger.LogInformation("Total configured memory limit: {configMemoryLimit}", configMemoryLimit);
+
+ LoadModules();
+
+ return new StoreWrapper(
+ version,
+ redisProtocolVersion,
+ garnetServer,
+ store,
+ objectStore,
+ objectStoreSizeTracker,
+ customCommandManager,
+ appendOnlyFile,
+ options,
+ clusterFactory: clusterFactory,
+ loggerFactory: loggerFactory);
+ }
+
+ private void LoadModules()
+ {
+ if (options.LoadModuleCS == null)
+ return;
+
+ foreach (var moduleCS in options.LoadModuleCS)
+ {
+ var moduleCSData = moduleCS.Split(' ', StringSplitOptions.RemoveEmptyEntries);
+ if (moduleCSData.Length < 1)
+ continue;
+
+ var modulePath = moduleCSData[0];
+ var moduleArgs = moduleCSData.Length > 1 ? moduleCSData.Skip(1).ToArray() : [];
+ if (ModuleUtils.LoadAssemblies([modulePath], null, true, out var loadedAssemblies, out var errorMsg))
+ {
+ ModuleRegistrar.Instance.LoadModule(customCommandManager, loadedAssemblies.ToList()[0], moduleArgs,
+ logger, out errorMsg);
+ }
+ else
+ {
+ logger?.LogError("Module {0} failed to load with error {1}", modulePath,
+ Encoding.UTF8.GetString(errorMsg));
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/libs/host/Garnet.host.csproj b/libs/host/Garnet.host.csproj
index 1c1259fec3..afeca6f519 100644
--- a/libs/host/Garnet.host.csproj
+++ b/libs/host/Garnet.host.csproj
@@ -20,6 +20,8 @@
+
+
diff --git a/libs/host/GarnetApplicationBuilder.cs b/libs/host/GarnetApplicationBuilder.cs
new file mode 100644
index 0000000000..cf5a54ed83
--- /dev/null
+++ b/libs/host/GarnetApplicationBuilder.cs
@@ -0,0 +1,179 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using Garnet.cluster;
+using Garnet.common;
+using Garnet.server;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Diagnostics.Metrics;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using Tsavorite.core;
+
+namespace Garnet;
+
+public class GarnetApplicationBuilder : IHostApplicationBuilder
+{
+ readonly HostApplicationBuilder hostApplicationBuilder;
+
+ public GarnetApplicationBuilder(GarnetApplicationOptions options, GarnetServerOptions garnetServerOptions)
+ {
+ var configuration = new ConfigurationManager();
+
+ configuration.AddEnvironmentVariables(prefix: "GARNET_");
+
+ hostApplicationBuilder = new HostApplicationBuilder(new HostApplicationBuilderSettings
+ {
+ Args = options.Args,
+ ApplicationName = options.ApplicationName,
+ EnvironmentName = options.EnvironmentName,
+ Configuration = configuration
+ });
+
+ hostApplicationBuilder.Services.AddOptions();
+
+ var garnetServerOptionsWrapped = Microsoft.Extensions.Options.Options.Create(garnetServerOptions);
+ hostApplicationBuilder.Services.AddSingleton(garnetServerOptionsWrapped);
+
+ hostApplicationBuilder.Logging.ClearProviders();
+ hostApplicationBuilder.Logging.AddSimpleConsole(simpleConsoleFormatterOptions =>
+ {
+ simpleConsoleFormatterOptions.SingleLine = true;
+ simpleConsoleFormatterOptions.TimestampFormat = "hh::mm::ss ";
+ });
+ hostApplicationBuilder.Logging.SetMinimumLevel(garnetServerOptions.LogLevel);
+
+ hostApplicationBuilder.Services.AddTransient();
+ hostApplicationBuilder.Services.AddTransient();
+ hostApplicationBuilder.Services.AddTransient();
+ hostApplicationBuilder.Services.AddTransient();
+
+ hostApplicationBuilder.Services.AddSingleton();
+
+ hostApplicationBuilder.Services.AddSingleton(sp =>
+ {
+ var loggerFactory = sp.GetService();
+
+ var options = sp.GetRequiredService>();
+ var opts = options.Value;
+
+ var appendOnlyFileWrapper = new AppendOnlyFileWrapper(null, null);
+
+ if (opts.EnableAOF)
+ {
+ if (opts.MainMemoryReplication && opts.CommitFrequencyMs != -1)
+ throw new Exception(
+ "Need to set CommitFrequencyMs to -1 (manual commits) with MainMemoryReplication");
+
+ opts.GetAofSettings(out var aofSettings);
+
+ var aofDevice = aofSettings.LogDevice;
+ var appendOnlyFile = new TsavoriteLog(aofSettings,
+ logger: loggerFactory?.CreateLogger("TsavoriteLog [aof]"));
+
+ appendOnlyFileWrapper = new AppendOnlyFileWrapper(aofDevice, appendOnlyFile);
+
+ if (opts.CommitFrequencyMs < 0 && opts.WaitForCommit)
+ throw new Exception("Cannot use CommitWait with manual commits");
+
+ return appendOnlyFileWrapper;
+ }
+
+ if (opts.CommitFrequencyMs != 0 || opts.WaitForCommit)
+ throw new Exception("Cannot use CommitFrequencyMs or CommitWait without EnableAOF");
+
+ return appendOnlyFileWrapper;
+ });
+
+ hostApplicationBuilder.Services.AddSingleton(sp =>
+ {
+ var storeFactory = sp.GetRequiredService();
+
+ return storeFactory.CreateMainStore();
+ });
+
+ hostApplicationBuilder.Services.AddSingleton(sp =>
+ {
+ var storeFactory = sp.GetRequiredService();
+
+ return storeFactory.CreateObjectStore();
+ });
+
+ hostApplicationBuilder.Services.AddSingleton();
+
+ hostApplicationBuilder.Services.AddSingleton(sp =>
+ {
+ var options = sp.GetRequiredService>();
+ var opts = options.Value;
+
+ return new SubscribeBroker>(
+ new SpanByteKeySerializer(), null, opts.PubSubPageSizeBytes(), opts.SubscriberRefreshFrequencyMs, true);
+ });
+
+ hostApplicationBuilder.Services.AddSingleton(sp =>
+ {
+ var storeWrapperFactory = sp.GetRequiredService();
+
+ var version = GetVersion();
+
+ return storeWrapperFactory.Create(version);
+ });
+
+ hostApplicationBuilder.Services.AddSingleton(sp =>
+ {
+ var garnetProviderFactory = sp.GetRequiredService();
+
+ return garnetProviderFactory.Create();
+ });
+
+ hostApplicationBuilder.Services.AddSingleton();
+ hostApplicationBuilder.Services.AddSingleton();
+ hostApplicationBuilder.Services.AddSingleton();
+
+ hostApplicationBuilder.Services.AddSingleton();
+
+ hostApplicationBuilder.Services.AddHostedService();
+ }
+
+ public GarnetApplication Build()
+ {
+ var host = hostApplicationBuilder.Build();
+ return new GarnetApplication(host);
+ }
+
+ public void ConfigureContainer(IServiceProviderFactory factory,
+ Action configure = null)
+ where TContainerBuilder : notnull
+ {
+ hostApplicationBuilder.ConfigureContainer(factory, configure);
+ }
+
+ public IDictionary