Skip to content

Commit

Permalink
Bring custom object store commands in parity with custom raw string c…
Browse files Browse the repository at this point in the history
…ommands. (microsoft#499)

This change brings in support for CustomObjectFunctions which provides high level of control when authoring custom object commands similar to that of custom raw string commands that already exists.

Benchmark (.net 8)

main branch
|Method	        |Mean	|Error	|StdDev	| Allocated |
|----------------|----------|-----------|----------|------------|
|ZAddRem	|190.6 us	| 1.58 us	 | 1.48 us	| 23 KB       |
|MyDictSetGet	|270.6 us	| 1.04 us	 | 0.81 us	| 30 KB       |

This PR
|Method	|Mean	|Error	|StdDev	|Allocated|
|----------------|----------|-----------|----------|------------|
|ZAddRem	| 192.7 us	| 0.86 us	 | 0.76 us	| 23 KB |
|MyDictSetGet	| 275.3 us	| 1.63 us	| 1.53 us	| 30 KB |
  • Loading branch information
yrajas authored Jul 25, 2024
1 parent 35d99dd commit dd31566
Show file tree
Hide file tree
Showing 31 changed files with 681 additions and 212 deletions.
5 changes: 4 additions & 1 deletion benchmark/BDN.benchmark/BDN.benchmark.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
<ItemGroup>
<Compile Include="..\..\playground\Embedded.perftest\EmbeddedRespServer.cs" Link="EmbeddedRespServer.cs" />
<Compile Include="..\..\playground\Embedded.perftest\DummyNetworkSender.cs" Link="DummyNetworkSender.cs" />
</ItemGroup>
<Compile Include="..\..\main\GarnetServer\Extensions\MyDictObject.cs" Link="MyDictObject.cs" />
<Compile Include="..\..\main\GarnetServer\Extensions\MyDictSet.cs" Link="MyDictSet.cs" />
<Compile Include="..\..\main\GarnetServer\Extensions\MyDictGet.cs" Link="MyDictGet.cs" />
</ItemGroup>

</Project>
25 changes: 25 additions & 0 deletions benchmark/BDN.benchmark/Resp/RespParseStress.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Runtime.CompilerServices;
using BenchmarkDotNet.Attributes;
using Embedded.perftest;
using Garnet;
using Garnet.server;
using Garnet.server.Auth.Settings;

Expand Down Expand Up @@ -50,6 +51,10 @@ public unsafe class RespParseStress
byte[] hSetDelRequestBuffer;
byte* hSetDelRequestBufferPointer;

static ReadOnlySpan<byte> MYDICTSETGET => "*4\r\n$9\r\nMYDICTSET\r\n$2\r\nck\r\n$1\r\nf\r\n$1\r\nv\r\n*3\r\n$9\r\nMYDICTGET\r\n$2\r\nck\r\n$1\r\nf\r\n"u8;
byte[] myDictSetGetRequestBuffer;
byte* myDictSetGetRequestBufferPointer;

[GlobalSetup]
public void GlobalSetup()
{
Expand All @@ -59,6 +64,12 @@ public void GlobalSetup()
AuthSettings = authSettings,
};
server = new EmbeddedRespServer(opt);

var factory = new MyDictFactory();
server.Register.NewType(factory);
server.Register.NewCommand("MYDICTSET", 2, CommandType.ReadModifyWrite, factory, new MyDictSet());
server.Register.NewCommand("MYDICTGET", 1, CommandType.Read, factory, new MyDictGet());

session = server.GetRespSession();

pingRequestBuffer = GC.AllocateArray<byte>(INLINE_PING.Length * batchSize, pinned: true);
Expand Down Expand Up @@ -112,6 +123,14 @@ public void GlobalSetup()

// Pre-populate hash with a single element to avoid repeatedly emptying it during the benchmark
SlowConsumeMessage("*3\r\n$4\r\nHSET\r\n$1\r\nf\r\n$1\r\nb\r\n$1\r\nb\r\n"u8);

myDictSetGetRequestBuffer = GC.AllocateArray<byte>(MYDICTSETGET.Length * batchSize, pinned: true);
myDictSetGetRequestBufferPointer = (byte*)Unsafe.AsPointer(ref myDictSetGetRequestBuffer[0]);
for (int i = 0; i < batchSize; i++)
MYDICTSETGET.CopyTo(new Span<byte>(myDictSetGetRequestBuffer).Slice(i * MYDICTSETGET.Length));

// Pre-populate custom object
SlowConsumeMessage("*4\r\n$9\r\nMYDICTSET\r\n$2\r\nck\r\n$1\r\nf\r\n$1\r\nv\r\n"u8);
}

[GlobalCleanup]
Expand Down Expand Up @@ -169,6 +188,12 @@ public void HSetDel()
_ = session.TryConsumeMessages(hSetDelRequestBufferPointer, hSetDelRequestBuffer.Length);
}

[Benchmark]
public void MyDictSetGet()
{
_ = session.TryConsumeMessages(myDictSetGetRequestBufferPointer, myDictSetGetRequestBuffer.Length);
}

private void SlowConsumeMessage(ReadOnlySpan<byte> message)
{
var buffer = GC.AllocateArray<byte>(message.Length, pinned: true);
Expand Down
12 changes: 12 additions & 0 deletions libs/common/NumUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ public static long BytesToLong(int length, byte* source)
return fNeg ? -(result) : result;
}

/// <summary>
/// Convert sequence of ASCII bytes into long number
/// </summary>
/// <param name="source">Source bytes</param>
/// <param name="result">Long value extracted from sequence</param>
/// <returns>True if sequence contains only numeric digits, otherwise false</returns>
public static bool TryBytesToLong(ReadOnlySpan<byte> source, out long result)
{
fixed (byte* ptr = source)
return TryBytesToLong(source.Length, ptr, out result);
}

/// <summary>
/// Convert sequence of ASCII bytes into long number
/// </summary>
Expand Down
28 changes: 28 additions & 0 deletions libs/server/Custom/CustomCommandManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,34 @@ internal int Register(string name, int numParams, CommandType commandType, int o
return (objectTypeId, subCommand);
}

internal (int objectTypeId, int subCommand) Register(string name, int numParams, CommandType commandType, CustomObjectFactory factory, CustomObjectFunctions customObjectFunctions, RespCommandsInfo commandInfo)
{
var objectTypeId = -1;
for (var i = 0; i < ObjectTypeId; i++)
{
if (objectCommandMap[i].factory == factory) { objectTypeId = i; break; }
}

if (objectTypeId == -1)
{
objectTypeId = Interlocked.Increment(ref ObjectTypeId) - 1;
if (objectTypeId >= MaxRegistrations)
throw new Exception("Out of registration space");
objectCommandMap[objectTypeId] = new CustomObjectCommandWrapper((byte)objectTypeId, factory);
}

var wrapper = objectCommandMap[objectTypeId];

int subCommand = Interlocked.Increment(ref wrapper.CommandId) - 1;
if (subCommand >= byte.MaxValue)
throw new Exception("Out of registration space");
wrapper.commandMap[subCommand] = new CustomObjectCommand(name, (byte)objectTypeId, (byte)subCommand, 1, numParams, commandType, wrapper.factory, customObjectFunctions);

if (commandInfo != null) customCommandsInfo.Add(name, commandInfo);

return (objectTypeId, subCommand);
}

internal bool Match(ReadOnlySpan<byte> command, out CustomCommand cmd)
{
for (int i = 0; i < CommandId; i++)
Expand Down
24 changes: 23 additions & 1 deletion libs/server/Custom/CustomCommandRegistration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ internal class RegisterCmdArgs : RegisterArgsBase
public CommandType CommandType { get; set; }

public long ExpirationTicks { get; set; }

public string ObjectCommandName { get; set; }

public CustomObjectFunctions ObjectCommand { get; set; }
}

/// <summary>
Expand All @@ -60,7 +64,7 @@ public static IRegisterCustomCommandProvider GetRegisterCustomCommandProvider(ob

if (instance is CustomObjectFactory cof && args is RegisterCmdArgs cofa)
{
return new RegisterCustomObjectFactoryProvider(cof, cofa);
return new RegisterCustomObjectCommandProvider(cof, cofa);
}

if (instance is CustomTransactionProcedure ctp && args is RegisterTxnArgs ctpa)
Expand Down Expand Up @@ -201,6 +205,24 @@ public override void Register(CustomCommandManager customCommandManager)
}
}

/// <summary>
/// CustomObjectFactory registration provider
/// </summary>
internal sealed class RegisterCustomObjectCommandProvider : RegisterCustomCmdProvider<CustomObjectFunctions>
{
private CustomObjectFactory factory;

public RegisterCustomObjectCommandProvider(CustomObjectFactory instance, RegisterCmdArgs args) : base(args.ObjectCommand, args)
{
this.factory = instance;
}

public override void Register(CustomCommandManager customCommandManager)
{
customCommandManager.Register(RegisterArgs.Name, RegisterArgs.NumParams, RegisterArgs.CommandType, factory, RegisterArgs.ObjectCommand, RegisterArgs.CommandInfo);
}
}

/// <summary>
/// TransactionProcedureProvider registration provider
/// </summary>
Expand Down
129 changes: 129 additions & 0 deletions libs/server/Custom/CustomCommandUtils.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Buffers;
using System.Diagnostics;
using Garnet.common;
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

namespace Garnet.server
{
public static class CustomCommandUtils
{
/// <summary>
/// Shared memory pool used by functions
/// </summary>
private static MemoryPool<byte> MemoryPool => MemoryPool<byte>.Shared;

/// <summary>
/// Get first arg from input
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
public static ReadOnlySpan<byte> GetFirstArg(ReadOnlySpan<byte> input)
{
int offset = 0;
return GetNextArg(input, ref offset);
}

/// <summary>
/// Get argument from input, at specified offset (starting from 0)
/// </summary>
/// <param name="input">Input as ReadOnlySpan of byte</param>
/// <param name="offset">Current offset into input</param>
/// <returns>Argument as a span</returns>
public static unsafe ReadOnlySpan<byte> GetNextArg(ReadOnlySpan<byte> input, scoped ref int offset)
{
byte* result = null;
var len = 0;

fixed (byte* inputPtr = input)
{
var ptr = inputPtr + offset;
var end = inputPtr + input.Length;
if (ptr < end && RespReadUtils.ReadPtrWithLengthHeader(ref result, ref len, ref ptr, end))
{
offset = (int)(ptr - inputPtr);
return new ReadOnlySpan<byte>(result, len);
}
}
return default;
}

/// <summary>
/// Create output as bulk string, from given Span
/// </summary>
public static unsafe void WriteBulkString(ref (IMemoryOwner<byte>, int) output, Span<byte> bulkString)
{
// Get space for bulk string
var len = RespWriteUtils.GetBulkStringLength(bulkString.Length);
output.Item1 = MemoryPool.Rent(len);
output.Item2 = len;
fixed (byte* ptr = output.Item1.Memory.Span)
{
var curr = ptr;
// NOTE: Expected to always have enough space to write into pre-allocated buffer
var success = RespWriteUtils.WriteBulkString(bulkString, ref curr, ptr + len);
Debug.Assert(success, "Insufficient space in pre-allocated buffer");
}
}

/// <summary>
/// Create output as error message, from given string
/// </summary>
public static unsafe void WriteError(ref (IMemoryOwner<byte>, int) output, string errorMessage)
{
var bytes = System.Text.Encoding.ASCII.GetBytes(errorMessage);
// Get space for error
var len = 1 + bytes.Length + 2;
output.Item1 = MemoryPool.Rent(len);
fixed (byte* ptr = output.Item1.Memory.Span)
{
var curr = ptr;
// NOTE: Expected to always have enough space to write into pre-allocated buffer
var success = RespWriteUtils.WriteError(bytes, ref curr, ptr + len);
Debug.Assert(success, "Insufficient space in pre-allocated buffer");
}
output.Item2 = len;
}

/// <summary>
/// Create null output as bulk string
/// </summary>
public static unsafe void WriteNullBulkString(ref (IMemoryOwner<byte>, int) output)
{
// Get space for null bulk string "$-1\r\n"
var len = 5;
output.Item1 = MemoryPool.Rent(len);
output.Item2 = len;
fixed (byte* ptr = output.Item1.Memory.Span)
{
var curr = ptr;
// NOTE: Expected to always have enough space to write into pre-allocated buffer
var success = RespWriteUtils.WriteNull(ref curr, ptr + len);
Debug.Assert(success, "Insufficient space in pre-allocated buffer");
}
}

/// <summary>
/// Create output as simple string, from given string
/// </summary>
public static unsafe void WriteSimpleString(ref (IMemoryOwner<byte>, int) output, string simpleString)
{
var bytes = System.Text.Encoding.ASCII.GetBytes(simpleString);
// Get space for simple string
var len = 1 + bytes.Length + 2;
output.Item1 = MemoryPool.Rent(len);
fixed (byte* ptr = output.Item1.Memory.Span)
{
var curr = ptr;
// NOTE: Expected to always have enough space to write into pre-allocated buffer
var success = RespWriteUtils.WriteSimpleString(bytes, ref curr, ptr + len);
Debug.Assert(success, "Insufficient space in pre-allocated buffer");
}
output.Item2 = len;
}
}
}
Loading

0 comments on commit dd31566

Please sign in to comment.