Skip to content

Commit

Permalink
Merge pull request #77 from Klotzi111/main
Browse files Browse the repository at this point in the history
Various improvements 2
  • Loading branch information
psu-de authored Aug 22, 2024
2 parents baffc9d + 8e188b1 commit bd6deda
Show file tree
Hide file tree
Showing 73 changed files with 3,785 additions and 390 deletions.
159 changes: 90 additions & 69 deletions Components/MineSharp.Protocol/MinecraftClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -367,18 +367,18 @@ protected override void Unregister()
/// <summary>
/// Waits until a packet of the specified type is received and matches the given condition.
/// </summary>
/// <typeparam name="T">The type of the packet.</typeparam>
/// <typeparam name="TPacket">The type of the packet.</typeparam>
/// <param name="condition">A function that evaluates the packet and returns true if the condition is met.</param>
/// <param name="cancellationToken">A token to cancel the wait for the matching packet.</param>
/// <returns>A task that completes once a packet matching the condition is received.</returns>
public Task WaitForPacketWhere<T>(Func<T, Task<bool>> condition, CancellationToken cancellationToken = default)
where T : IPacket
public Task<TPacket> WaitForPacketWhere<TPacket>(Func<TPacket, Task<bool>> condition, CancellationToken cancellationToken = default)
where TPacket : IPacket
{
// linked token is required to cancel the task when the client is disconnected
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, CancellationToken);
var token = cts.Token;
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
async Task PacketHandler(T packet)
var tcs = new TaskCompletionSource<TPacket>(TaskCreationOptions.RunContinuationsAsynchronously);
async Task PacketHandler(TPacket packet)
{
try
{
Expand All @@ -388,7 +388,7 @@ async Task PacketHandler(T packet)
}
if (await condition(packet).WaitAsync(token))
{
tcs.TrySetResult();
tcs.TrySetResult(packet);
}
}
catch (OperationCanceledException e)
Expand All @@ -400,31 +400,33 @@ async Task PacketHandler(T packet)
tcs.TrySetException(e);
}
}
var packetRegistration = On<T>(PacketHandler);
var packetRegistration = On<TPacket>(PacketHandler);
if (packetRegistration == null)
{
// TODO: Can this occur?
cts.Dispose();
throw new InvalidOperationException("Could not register packet handler");
}
return tcs.Task.ContinueWith(_ =>
// this registration is required because otherwise the task will only get cancelled when the next packet of that ype is received
var cancellationRegistration = token.Register(() =>
{
// cancelling the tcs will later dispose the other stuff
tcs.TrySetCanceled(token);
});
tcs.Task.ContinueWith(_ =>
{
cancellationRegistration.Dispose();
packetRegistration.Dispose();
cts.Dispose();
}, TaskContinuationOptions.ExecuteSynchronously);
return tcs.Task;
}

/// <summary>
/// Waits until a packet of the specified type is received and matches the given condition.
/// </summary>
/// <typeparam name="T">The type of the packet.</typeparam>
/// <param name="condition">A function that evaluates the packet and returns true if the condition is met.</param>
/// <param name="cancellationToken">A token to cancel the wait for the matching packet.</param>
/// <returns>A task that completes once a packet matching the condition is received.</returns>
public Task WaitForPacketWhere<T>(Func<T, bool> condition, CancellationToken cancellationToken = default)
where T : IPacket
/// <inheritdoc cref="WaitForPacketWhere{TPacket}(Func{TPacket, Task{bool}}, CancellationToken)"/>
public Task<TPacket> WaitForPacketWhere<TPacket>(Func<TPacket, bool> condition, CancellationToken cancellationToken = default)
where TPacket : IPacket
{
return WaitForPacketWhere<T>(packet => Task.FromResult(condition(packet)), cancellationToken);
return WaitForPacketWhere<TPacket>(packet => Task.FromResult(condition(packet)), cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -550,7 +552,7 @@ internal void HandleBundleDelimiter()

private async Task ProcessBundledPackets(ConcurrentQueue<(PacketType, PacketBuffer)> packets)
{
Logger.Debug($"Processing {packets.Count} bundled packets");
Logger.Trace($"Processing {packets.Count} bundled packets");
try
{
// wiki.vg: the client is guaranteed to process every packet in the bundle on the same tick
Expand Down Expand Up @@ -581,8 +583,9 @@ private async Task StreamLoop()
try
{
// run both tasks in parallel
var receiveTask = Task.Factory.StartNew(ReceivePackets, TaskCreationOptions.LongRunning);
var sendTask = Task.Factory.StartNew(SendPackets, TaskCreationOptions.LongRunning);
// because the task factory does not unwrap the tasks (like Task.Run) we need to do it manually
var receiveTask = Task.Factory.StartNew(ReceivePackets, TaskCreationOptions.LongRunning).Unwrap();
var sendTask = Task.Factory.StartNew(SendPackets, TaskCreationOptions.LongRunning).Unwrap();

// extract the exception from the task that finished first
await await Task.WhenAny(receiveTask, sendTask);
Expand All @@ -601,79 +604,97 @@ private async Task StreamLoop()

private async Task ReceivePackets()
{
while (true)
try
{
CancellationToken.ThrowIfCancellationRequested();
while (true)
{
CancellationToken.ThrowIfCancellationRequested();

var buffer = stream!.ReadPacket();
var buffer = stream!.ReadPacket();

var packetId = buffer.ReadVarInt();
var gameState = gameStatePacketHandler.GameState;
var packetType = Data.Protocol.GetPacketType(PacketFlow.Clientbound, gameState, packetId);
var packetId = buffer.ReadVarInt();
var gameState = gameStatePacketHandler.GameState;
var packetType = Data.Protocol.GetPacketType(PacketFlow.Clientbound, gameState, packetId);

Logger.Trace("Received packet {PacketType}. GameState = {GameState}, PacketId = {PacketId}", packetType, gameState, packetId);
Logger.Trace("Received packet {PacketType}. GameState = {GameState}, PacketId = {PacketId}", packetType, gameState, packetId);

// handle BundleDelimiter packet here, because there is a race condition where some
// packets may be read before HandleBundleDelimiter is invoked through a handler
if (packetType == PacketType.CB_Play_BundleDelimiter)
{
HandleBundleDelimiter();
continue;
}
// handle BundleDelimiter packet here, because there is a race condition where some
// packets may be read before HandleBundleDelimiter is invoked through a handler
if (packetType == PacketType.CB_Play_BundleDelimiter)
{
HandleBundleDelimiter();
continue;
}

if (gameState != GameState.Play)
{
await HandleIncomingPacket(packetType, buffer);
}
else
{
var bundledPackets = this.bundledPackets;
if (bundledPackets != null)
if (gameState != GameState.Play)
{
bundledPackets.Enqueue((packetType, buffer));
await HandleIncomingPacket(packetType, buffer);
}
else
{
// handle the packet in a new task to prevent blocking the stream loop
_ = Task.Run(() => HandleIncomingPacket(packetType, buffer));
var bundledPackets = this.bundledPackets;
if (bundledPackets != null)
{
bundledPackets.Enqueue((packetType, buffer));
}
else
{
// handle the packet in a new task to prevent blocking the stream loop
_ = Task.Run(() => HandleIncomingPacket(packetType, buffer));
}
}
}
}
catch (Exception e)
{
Logger.Debug(e, "ReceivePackets loop ended with exception.");
throw;
}
// can never exit without exception because infinite loop without break
}

private async Task SendPackets()
{
await foreach (var task in packetQueue.ReceiveAllAsync())
try
{
if (task.Token.IsCancellationRequested)
await foreach (var task in packetQueue.ReceiveAllAsync())
{
task.Task.TrySetCanceled();
continue;
}
if (task.Token.IsCancellationRequested)
{
task.Task.TrySetCanceled();
continue;
}

try
{
DispatchPacket(task.Packet);
task.Task.TrySetResult();
}
catch (OperationCanceledException e)
{
task.Task.TrySetCanceled(e.CancellationToken);
// we should stop. So we do by rethrowing the exception
throw;
}
catch (Exception e)
{
Logger.Error(e, "Encountered exception while dispatching packet {PacketType}", task.Packet.Type);
task.Task.TrySetException(e);
if (e is SocketException)
try
{
DispatchPacket(task.Packet);
task.Task.TrySetResult();
}
catch (OperationCanceledException e)
{
// break the loop to prevent further packets from being sent
// because the connection is probably dead
task.Task.TrySetCanceled(e.CancellationToken);
// we should stop. So we do by rethrowing the exception
throw;
}
catch (Exception e)
{
Logger.Error(e, "Encountered exception while dispatching packet {PacketType}", task.Packet.Type);
task.Task.TrySetException(e);
if (e is SocketException)
{
// break the loop to prevent further packets from being sent
// because the connection is probably dead
throw;
}
}
}
}
catch (Exception e)
{
Logger.Debug(e, "SendPackets loop ended with exception.");
throw;
}
// can never exit without exception because infinite loop without break (because we never complete the BufferBlock we only cancel it)
}

private void DispatchPacket(IPacket packet)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ namespace MineSharp.Protocol.Packets.Clientbound.Configuration;
/// <summary>
/// Plugin message packet
/// </summary>
/// <param name="ChannelName">The name of the channel the data was sent</param>
/// <param name="Channel">The name of the channel the data was sent</param>
/// <param name="Data">The message data</param>
public sealed record PluginMessagePacket(Identifier ChannelName, byte[] Data) : IPacket
public sealed record PluginMessagePacket(Identifier Channel, byte[] Data) : IPacket
{
/// <inheritdoc />
public PacketType Type => StaticType;
Expand All @@ -20,16 +20,16 @@ public sealed record PluginMessagePacket(Identifier ChannelName, byte[] Data) :
/// <inheritdoc />
public void Write(PacketBuffer buffer, MinecraftData version)
{
buffer.WriteIdentifier(ChannelName);
buffer.WriteIdentifier(Channel);
buffer.WriteBytes(Data);
}

/// <inheritdoc />
public static IPacket Read(PacketBuffer buffer, MinecraftData version)
{
var channelName = buffer.ReadIdentifier();
var channel = buffer.ReadIdentifier();
var data = buffer.RestBuffer();
return new PluginMessagePacket(channelName, data);
return new PluginMessagePacket(channel, data);
}
}

Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
using MineSharp.Core.Common;
using MineSharp.Core.Serialization;
using MineSharp.Core.Serialization;
using MineSharp.Data;
using MineSharp.Data.Protocol;
using static MineSharp.Protocol.Packets.Clientbound.Configuration.UpdateTagsPacket;
using MineSharp.Protocol.Packets.NetworkTypes;

namespace MineSharp.Protocol.Packets.Clientbound.Configuration;

Expand All @@ -20,88 +19,14 @@ public sealed record UpdateTagsPacket(Registry[] Registries) : IPacket
/// <inheritdoc />
public void Write(PacketBuffer buffer, MinecraftData version)
{
WriteRegistries(buffer, Registries);
buffer.WriteVarIntArray(Registries, (buffer, registry) => registry.Write(buffer));
}

/// <inheritdoc />
public static IPacket Read(PacketBuffer buffer, MinecraftData version)
{
var registries = ReadRegistries(buffer);
return new UpdateTagsPacket(registries);
}

private static void WriteRegistries(PacketBuffer buffer, Registry[] registries)
{
buffer.WriteVarInt(registries.Length);
foreach (var registry in registries)
{
buffer.WriteIdentifier(registry.Identifier);
WriteTags(buffer, registry.Tags);
}
}

private static Registry[] ReadRegistries(PacketBuffer buffer)
{
var registryCount = buffer.ReadVarInt();
var registries = new Registry[registryCount];
for (int i = 0; i < registryCount; i++)
{
var registry = buffer.ReadIdentifier();
var tags = ReadTags(buffer);
registries[i] = new Registry(registry, tags);
}
return registries;
}

private static void WriteTags(PacketBuffer buffer, Tag[] tags)
{
buffer.WriteVarInt(tags.Length);
foreach (var tag in tags)
{
buffer.WriteIdentifier(tag.Name);
buffer.WriteVarInt(tag.Entries.Length);
foreach (var entry in tag.Entries)
{
buffer.WriteVarInt(entry);
}
}
}

private static Tag[] ReadTags(PacketBuffer buffer)
{
var tagCount = buffer.ReadVarInt();
var tags = new Tag[tagCount];
for (int j = 0; j < tagCount; j++)
{
var tagName = buffer.ReadIdentifier();
var entries = ReadEntries(buffer);
tags[j] = new Tag(tagName, entries);
}
return tags;
}
var registries = buffer.ReadVarIntArray(Registry.Read);

private static int[] ReadEntries(PacketBuffer buffer)
{
var entryCount = buffer.ReadVarInt();
var entries = new int[entryCount];
for (int k = 0; k < entryCount; k++)
{
entries[k] = buffer.ReadVarInt();
}
return entries;
return new UpdateTagsPacket(registries);
}

/// <summary>
/// Represents a registry with its tags
/// </summary>
/// <param name="Identifier">The registry identifier</param>
/// <param name="Tags">Array of tags</param>
public sealed record Registry(Identifier Identifier, Tag[] Tags);

/// <summary>
/// Represents a tag with its entries
/// </summary>
/// <param name="Name">The tag name</param>
/// <param name="Entries">Array of entries</param>
public sealed record Tag(Identifier Name, int[] Entries);
}
Loading

0 comments on commit bd6deda

Please sign in to comment.