Skip to content

Commit

Permalink
make peer client public
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasteles committed Mar 21, 2024
1 parent e05e5ec commit ccaf352
Show file tree
Hide file tree
Showing 24 changed files with 191 additions and 147 deletions.
6 changes: 3 additions & 3 deletions benchmarks/Backdash.Benchmarks.Ping/PingMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
using Backdash.Network.Client;
namespace Backdash.Benchmarks.Ping;
sealed class PingMessageHandler(
IUdpClient<PingMessage> sender,
IPeerClient<PingMessage> sender,
Memory<byte>? buffer = null
) : IUdpObserver<PingMessage>
) : IPeerObserver<PingMessage>
{
public static long TotalProcessed => _processedCount;
static long _processedCount;
public async ValueTask OnUdpMessage(
public async ValueTask OnPeerMessage(
PingMessage message,
SocketAddress from,
int bytesReceived,
Expand Down
14 changes: 7 additions & 7 deletions benchmarks/Backdash.Benchmarks.Ping/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
await using Measurer measurer = new(snapshotInterval);
measurer.Start();

IPEndPoint peer2Endpoint = new(IPAddress.Loopback, peer2.Port);
IPEndPoint peer2Endpoint = new(IPAddress.Loopback, 9001);
_ = peer1.SendTo(peer2Endpoint.Serialize(), PingMessage.Ping, sendBuffer1).AsTask();

Console.WriteLine("Press enter to stop.");
Expand All @@ -44,17 +44,17 @@
Console.Clear();
Console.WriteLine(measurer.Summary(printSnapshots));

IUdpClient<PingMessage> CreateClient(int port, Memory<byte>? buffer = null)
IPeerClient<PingMessage> CreateClient(int port, Memory<byte>? buffer = null)
{
UdpObserverGroup<PingMessage> observers = new();
UdpClient<PingMessage> udp = new(
PeerObserverGroup<PingMessage> observers = new();
PeerClient<PingMessage> peer = new(
new UdpSocket(port),
BinarySerializerFactory.ForEnum<PingMessage>(),
observers,
logger,
bufferSize
);
observers.Add(new PingMessageHandler(udp, buffer));
jobs.Register(udp);
return udp;
observers.Add(new PingMessageHandler(peer, buffer));
jobs.Register(peer);
return peer;
}
2 changes: 1 addition & 1 deletion benchmarks/Backdash.Benchmarks/Cases/UdpClientBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void OnProcessed(long count)

pingerHandler.OnProcessed += OnProcessed;

IPEndPoint pongerEndpoint = new(IPAddress.Loopback, ponger.Port);
IPEndPoint pongerEndpoint = new(IPAddress.Loopback, 9001);
var pongerAddress = pongerEndpoint.Serialize();

async Task StartSending()
Expand Down
6 changes: 3 additions & 3 deletions benchmarks/Backdash.Benchmarks/Network/Factory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ namespace Backdash.Benchmarks.Network;

static class Factory
{
public static UdpClient<PingMessage> CreateUdpClient(
public static PeerClient<PingMessage> CreateUdpClient(
int port,
out UdpObserverGroup<PingMessage> observers
out PeerObserverGroup<PingMessage> observers
)
{
observers = new();
UdpClient<PingMessage> client = new(
PeerClient<PingMessage> client = new(
new UdpSocket(port),
BinarySerializerFactory.ForEnum<PingMessage>(),
observers,
Expand Down
6 changes: 3 additions & 3 deletions benchmarks/Backdash.Benchmarks/Network/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ public enum PingMessage : long
}
sealed class PingMessageHandler(
string name,
IUdpClient<PingMessage> sender,
IPeerClient<PingMessage> sender,
Memory<byte> sendBuffer
) : IUdpObserver<PingMessage>
) : IPeerObserver<PingMessage>
{
long processedCount;
long badMessages;
public long ProcessedCount => processedCount;
public long BadMessages => badMessages;
public event Action<long> OnProcessed = delegate { };
public async ValueTask OnUdpMessage(
public async ValueTask OnPeerMessage(
PingMessage message,
SocketAddress from,
int bytesReceived,
Expand Down
1 change: 0 additions & 1 deletion samples/SpaceWar.Lobby/AppSettings.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System.Reflection;
using System.Text.Json;

namespace SpaceWar;
Expand Down
8 changes: 6 additions & 2 deletions src/Backdash/Backends/BackendServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
using Backdash.Sync.Input;
using Backdash.Sync.State;
using Backdash.Sync.State.Stores;

namespace Backdash.Backends;

sealed class BackendServices<TInput, TGameState>
where TInput : struct
where TGameState : IEquatable<TGameState>, new()
Expand All @@ -15,11 +17,12 @@ sealed class BackendServices<TInput, TGameState>
public Logger Logger { get; }
public IClock Clock { get; }
public IBackgroundJobManager JobManager { get; }
public IUdpClientFactory UdpClientFactory { get; }
public IProtocolClientFactory ProtocolClientFactory { get; }
public IStateStore<TGameState> StateStore { get; }
public IInputGenerator<TInput>? InputGenerator { get; }
public IRandomNumberGenerator Random { get; }
public IDelayStrategy DelayStrategy { get; }

public BackendServices(RollbackOptions options, SessionServices<TInput, TGameState>? services)
{
ChecksumProvider = services?.ChecksumProvider ?? ChecksumProviderFactory.Create<TGameState>();
Expand All @@ -35,9 +38,10 @@ public BackendServices(RollbackOptions options, SessionServices<TInput, TGameSta
Logger = new Logger(options.Log, logWriter);
Clock = new Clock();
JobManager = new BackgroundJobManager(Logger);
UdpClientFactory = new UdpClientFactory();
ProtocolClientFactory = new ProtocolClientFactory(options, Logger);
}
}

static class BackendServices
{
public static BackendServices<TInput, TGameState> Create<TInput, TGameState>(
Expand Down
22 changes: 8 additions & 14 deletions src/Backdash/Backends/Peer2PeerBackend.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ sealed class Peer2PeerBackend<TInput, TGameState> : IRollbackSession<TInput, TGa
readonly IBinarySerializer<CombinedInputs<TInput>> inputGroupSerializer;
readonly Logger logger;
readonly IStateStore<TGameState> stateStore;
readonly IUdpClient<ProtocolMessage> udp;
readonly UdpObserverGroup<ProtocolMessage> udpObservers;
readonly IProtocolClient udp;
readonly PeerObserverGroup<ProtocolMessage> peerObservers;
readonly Synchronizer<TInput, TGameState> synchronizer;
readonly ConnectionsState localConnections;
readonly IBackgroundJobManager backgroundJobManager;
Expand Down Expand Up @@ -66,7 +66,7 @@ BackendServices<TInput, TGameState> services
localConnections = new(Max.NumberOfPlayers);
spectators = [];
endpoints = [];
udpObservers = new();
peerObservers = new();
callbacks = new EmptySessionHandler<TGameState>(logger);
synchronizer = new(
this.options,
Expand All @@ -79,15 +79,9 @@ BackendServices<TInput, TGameState> services
{
Callbacks = callbacks,
};
var selectedEndianness = Platform.GetEndianness(this.options.NetworkEndianness);
udp = services.UdpClientFactory.CreateClient(
port,
selectedEndianness,
this.options.Protocol.UdpPacketBufferSize,
this.options.UseIPv6,
udpObservers,
logger
);

udp = services.ProtocolClientFactory.CreateProtocolClient(port, peerObservers);

peerConnectionFactory = new(
this,
services.Clock,
Expand Down Expand Up @@ -267,7 +261,7 @@ ResultCode AddRemotePlayer(RemotePlayer player)
inputSerializer,
peerInputEventQueue
);
udpObservers.Add(protocol.GetUdpObserver());
peerObservers.Add(protocol.GetUdpObserver());
endpoints.Add(protocol);
Array.Resize(ref syncInputBuffer, syncInputBuffer.Length + 1);
synchronizer.AddQueue(player.Handle);
Expand Down Expand Up @@ -304,7 +298,7 @@ ResultCode AddSpectator(Spectator spectator)
new(spectatorHandle, spectator.EndPoint, localConnections),
inputGroupSerializer, peerCombinedInputsEventPublisher
);
udpObservers.Add(protocol.GetUdpObserver());
peerObservers.Add(protocol.GetUdpObserver());
spectators.Add(protocol);
logger.Write(LogLevel.Information, $"Adding {spectator.Handle} at {spectator.EndPoint}");
protocol.Synchronize();
Expand Down
17 changes: 5 additions & 12 deletions src/Backdash/Backends/SpectatorBackend.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ sealed class SpectatorBackend<TInput, TGameState> :
where TGameState : IEquatable<TGameState>, new()
{
readonly Logger logger;
readonly IUdpClient<ProtocolMessage> udp;
readonly IProtocolClient udp;
readonly IPEndPoint hostEndpoint;
readonly RollbackOptions options;
readonly IBackgroundJobManager backgroundJobManager;
Expand Down Expand Up @@ -56,18 +56,11 @@ public SpectatorBackend(int port,
.Select(x => new PlayerHandle(PlayerType.Remote, x + 1, x)).ToArray();
IBinarySerializer<CombinedInputs<TInput>> inputGroupSerializer =
new CombinedInputsSerializer<TInput>(services.InputSerializer);
UdpObserverGroup<ProtocolMessage> udpObservers = new();
PeerObserverGroup<ProtocolMessage> peerObservers = new();
callbacks = new EmptySessionHandler<TGameState>(logger);
inputs = new GameInput<CombinedInputs<TInput>>[options.SpectatorInputBufferLength];
var selectedEndianness = Platform.GetEndianness(options.NetworkEndianness);
udp = services.UdpClientFactory.CreateClient(
port,
selectedEndianness,
options.Protocol.UdpPacketBufferSize,
options.UseIPv6,
udpObservers,
logger
);

udp = services.ProtocolClientFactory.CreateProtocolClient(port, peerObservers);
backgroundJobManager.Register(udp);

PeerConnectionFactory peerConnectionFactory = new(
Expand All @@ -79,7 +72,7 @@ public SpectatorBackend(int port,
new(new PlayerHandle(PlayerType.Remote, 0), hostEndpoint, localConnections);

host = peerConnectionFactory.Create(protocolState, inputGroupSerializer, this);
udpObservers.Add(host.GetUdpObserver());
peerObservers.Add(host.GetUdpObserver());
host.Synchronize();
isSynchronizing = true;
}
Expand Down
14 changes: 7 additions & 7 deletions src/Backdash/Core/BackgroundJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ interface IBackgroundJob
{
string JobName { get; }

Task Start(CancellationToken ct);
Task Start(CancellationToken cancellationToken);
}

interface IBackgroundJobManager : IDisposable
{
Task Start(CancellationToken ct);
void Register(IBackgroundJob job, CancellationToken ct = default);
Task Start(CancellationToken cancellationToken);
void Register(IBackgroundJob job, CancellationToken cancellationToken = default);
void Stop(TimeSpan timeout = default);
void ThrowIfError();
}
Expand All @@ -27,11 +27,11 @@ sealed class BackgroundJobManager(Logger logger) : IBackgroundJobManager

readonly List<Exception> exceptions = [];

public async Task Start(CancellationToken ct)
public async Task Start(CancellationToken cancellationToken)
{
if (isRunning) return;
if (jobs.Count is 0) throw new NetcodeException("No jobs registered");
ct.Register(() => Stop(TimeSpan.Zero));
cancellationToken.Register(() => Stop(TimeSpan.Zero));
logger.Write(LogLevel.Debug, "Starting background tasks");
foreach (var job in jobs) AddJobTask(new(job.Job, job.StoppingToken));
isRunning = true;
Expand Down Expand Up @@ -84,9 +84,9 @@ public void ThrowIfError()
throw new AggregateException(exceptions);
}

public void Register(IBackgroundJob job, CancellationToken ct = default)
public void Register(IBackgroundJob job, CancellationToken cancellationToken = default)
{
JobEntry entry = new(job, ct);
JobEntry entry = new(job, cancellationToken);
if (!jobs.Add(entry))
return;
if (!isRunning) return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,43 @@

namespace Backdash.Network.Client;

interface IUdpClient<in T> : IBackgroundJob, IDisposable where T : struct
/// <summary>
/// Client for peer communication
/// </summary>
public interface IPeerClient<in T> : IDisposable where T : struct
{
public int Port { get; }
/// <summary>
/// Send Message to peer
/// </summary>
ValueTask<int> SendTo(SocketAddress peerAddress, T payload, CancellationToken ct = default);

/// <summary>
/// Send Message to peer
/// </summary>
ValueTask<int> SendTo(SocketAddress peerAddress, T payload, Memory<byte> buffer, CancellationToken ct = default);

/// <summary>
/// Start receiving messages
/// </summary>
Task StartReceiving(CancellationToken cancellationToken);
}

sealed class UdpClient<T> : IUdpClient<T> where T : struct
interface IPeerJobClient<in T> : IBackgroundJob, IPeerClient<T> where T : struct;

sealed class PeerClient<T> : IPeerJobClient<T> where T : struct
{
readonly UdpSocket socket;
readonly IUdpObserver<T> observer;
readonly IPeerObserver<T> observer;
readonly IBinarySerializer<T> serializer;
readonly Logger logger;
readonly int maxPacketSize;
CancellationTokenSource? cancellation;
public string JobName { get; }
public int Port => socket.Port;

public UdpClient(
public PeerClient(
UdpSocket socket,
IBinarySerializer<T> serializer,
IUdpObserver<T> observer,
IPeerObserver<T> observer,
Logger logger,
int maxPacketSize = Max.UdpPacketSize
)
Expand All @@ -37,6 +52,7 @@ public UdpClient(
ArgumentNullException.ThrowIfNull(observer);
ArgumentNullException.ThrowIfNull(serializer);
ArgumentNullException.ThrowIfNull(logger);

this.socket = socket;
this.observer = observer;
this.serializer = serializer;
Expand All @@ -46,11 +62,14 @@ public UdpClient(
JobName = $"{nameof(UdpClient)} ({socket.Port})";
}

public async Task Start(CancellationToken ct)
public Task Start(CancellationToken cancellationToken) =>
StartReceiving(cancellationToken);

public async Task StartReceiving(CancellationToken cancellationToken)
{
if (cancellation is not null) return;
cancellation = new();
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct, cancellation.Token);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cancellation.Token);
var token = cts.Token;
await ReceiveLoop(token).ConfigureAwait(false);
}
Expand Down Expand Up @@ -96,7 +115,7 @@ async Task ReceiveLoop(CancellationToken ct)
try
{
serializer.Deserialize(buffer[..receivedSize].Span, ref msg);
await observer.OnUdpMessage(msg, address, receivedSize, ct).ConfigureAwait(false);
await observer.OnPeerMessage(msg, address, receivedSize, ct).ConfigureAwait(false);
}
catch (NetcodeDeserializationException ex)
{
Expand Down
27 changes: 27 additions & 0 deletions src/Backdash/Network/Client/PeerClientFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using Backdash.Core;
using Backdash.Serialization;

namespace Backdash.Network.Client;

/// <summary>
/// Create new instances of <see cref="IPeerClient{T}"/>
/// </summary>
public static class PeerClientFactory
{
internal static IPeerClient<T> Create<T>(UdpSocket socket,
IBinarySerializer<T> serializer,
IPeerObserver<T> observer, Logger logger,
int maxPacketSize = Max.UdpPacketSize
) where T : struct =>
new PeerClient<T>(socket, serializer, observer, logger, maxPacketSize);

/// <summary>
/// Creates new <see cref="IPeerClient{T}"/>
/// </summary>
public static IPeerClient<T> Create<T>(UdpSocket socket,
IBinarySerializer<T> serializer,
IPeerObserver<T> observer,
int maxPacketSize = Max.UdpPacketSize
) where T : struct =>
Create(socket, serializer, observer, Logger.CreateConsoleLogger(LogLevel.None), maxPacketSize);
}
Loading

0 comments on commit ccaf352

Please sign in to comment.