From d97bcf76cefe50f409ad3740a62c8c998f24cae3 Mon Sep 17 00:00:00 2001 From: js6pak Date: Thu, 16 Dec 2021 15:43:57 +0100 Subject: [PATCH 1/8] Add fragmentation support --- Hazel.UnitTests/FragmentationTests.cs | 65 +++++ Hazel.UnitTests/UdpConnectionTestHarness.cs | 2 +- Hazel/Dtls/DtlsConnectionListener.cs | 2 +- Hazel/Dtls/DtlsUnityConnection.cs | 4 +- .../ThreadLimitedUdpConnectionListener.cs | 18 +- .../ThreadLimitedUdpServerConnection.cs | 5 +- Hazel/NetworkConnection.cs | 3 +- Hazel/Udp/SendOptionInternal.cs | 5 + Hazel/Udp/UdpClientConnection.cs | 17 +- Hazel/Udp/UdpConnection.Fragmented.cs | 223 ++++++++++++++++++ Hazel/Udp/UdpConnection.Reliable.cs | 27 ++- Hazel/Udp/UdpConnection.cs | 63 ++--- Hazel/Udp/UdpConnectionListener.cs | 10 +- Hazel/Udp/UdpServerConnection.cs | 5 +- Hazel/Udp/UnityUdpClientConnection.cs | 17 +- 15 files changed, 406 insertions(+), 60 deletions(-) create mode 100644 Hazel.UnitTests/FragmentationTests.cs create mode 100644 Hazel/Udp/UdpConnection.Fragmented.cs diff --git a/Hazel.UnitTests/FragmentationTests.cs b/Hazel.UnitTests/FragmentationTests.cs new file mode 100644 index 0000000..acca3e5 --- /dev/null +++ b/Hazel.UnitTests/FragmentationTests.cs @@ -0,0 +1,65 @@ +using System; +using System.Linq; +using System.Net; +using System.Threading; +using Hazel.Udp; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace Hazel.UnitTests +{ + [TestClass] + public class FragmentationTests + { + private readonly byte[] _testData = Enumerable.Range(0, 10000).Select(x => (byte)x).ToArray(); + + [TestMethod] + public void ReliableSendTest() + { + using (var listener = new UdpConnectionListener(new IPEndPoint(IPAddress.Any, 4296))) + using (var connection = new UdpClientConnection(new IPEndPoint(IPAddress.Loopback, 4296))) + { + var manualResetEvent = new ManualResetEventSlim(false); + MessageReader messageReader = null; + + listener.NewConnection += e => + { + e.Connection.DataReceived += data => + { + messageReader = data.Message; + manualResetEvent.Set(); + }; + }; + + listener.Start(); + connection.Connect(); + + connection.SendBytes(_testData, SendOption.Reliable); + + manualResetEvent.Wait(TimeSpan.FromSeconds(5)); + + Assert.IsNotNull(messageReader); + + var received = new byte[messageReader.Length - messageReader.Offset]; + Array.Copy(messageReader.Buffer, messageReader.Offset + messageReader.Position, received, 0, messageReader.Length - messageReader.Offset); + + CollectionAssert.AreEqual(_testData, received); + } + } + + [TestMethod] + public void UnreliableSendTest() + { + using (var listener = new UdpConnectionListener(new IPEndPoint(IPAddress.Any, 4296))) + using (var connection = new UdpClientConnection(new IPEndPoint(IPAddress.Loopback, 4296))) + { + listener.Start(); + connection.Connect(); + + Assert.AreEqual("Unreliable messages can't be bigger than MTU", Assert.ThrowsException(() => + { + connection.SendBytes(_testData); + }).Message); + } + } + } +} diff --git a/Hazel.UnitTests/UdpConnectionTestHarness.cs b/Hazel.UnitTests/UdpConnectionTestHarness.cs index 8414e0c..36ea165 100644 --- a/Hazel.UnitTests/UdpConnectionTestHarness.cs +++ b/Hazel.UnitTests/UdpConnectionTestHarness.cs @@ -38,7 +38,7 @@ protected override bool SendDisconnect(MessageWriter writer) return true; } - protected override void WriteBytesToConnection(byte[] bytes, int length) + protected override void WriteBytesToConnection(byte[] bytes, int length, Action onTooBig = null) { this.BytesSent.Add(MessageReader.Get(bytes)); } diff --git a/Hazel/Dtls/DtlsConnectionListener.cs b/Hazel/Dtls/DtlsConnectionListener.cs index ac84b67..38c4959 100644 --- a/Hazel/Dtls/DtlsConnectionListener.cs +++ b/Hazel/Dtls/DtlsConnectionListener.cs @@ -1258,7 +1258,7 @@ private void SendHelloVerifyRequest(IPEndPoint peerAddress, ulong recordSequence /// /// Handle a requrest to send a datagram to the network /// - protected override void QueueRawData(ByteSpan span, IPEndPoint remoteEndPoint) + protected override void QueueRawData(ByteSpan span, IPEndPoint remoteEndPoint, Action onTooBig = null) { PeerData peer; if (!this.existingPeers.TryGetValue(remoteEndPoint, out peer)) diff --git a/Hazel/Dtls/DtlsUnityConnection.cs b/Hazel/Dtls/DtlsUnityConnection.cs index 8bafac1..fd1ff64 100644 --- a/Hazel/Dtls/DtlsUnityConnection.cs +++ b/Hazel/Dtls/DtlsUnityConnection.cs @@ -325,13 +325,13 @@ private ByteSpan WriteBytesToConnectionInternal(byte[] bytes, int length) } /// - protected override void WriteBytesToConnection(byte[] bytes, int length) + protected override void WriteBytesToConnection(byte[] bytes, int length, Action onTooBig = null) { ByteSpan wireData = this.WriteBytesToConnectionInternal(bytes, length); if (wireData.Length > 0) { Debug.Assert(wireData.Offset == 0, "Got a non-zero write data offset"); - base.WriteBytesToConnection(wireData.GetUnderlyingArray(), wireData.Length); + base.WriteBytesToConnection(wireData.GetUnderlyingArray(), wireData.Length, onTooBig); } } diff --git a/Hazel/FewerThreads/ThreadLimitedUdpConnectionListener.cs b/Hazel/FewerThreads/ThreadLimitedUdpConnectionListener.cs index 92a7a81..882a865 100644 --- a/Hazel/FewerThreads/ThreadLimitedUdpConnectionListener.cs +++ b/Hazel/FewerThreads/ThreadLimitedUdpConnectionListener.cs @@ -17,6 +17,7 @@ private struct SendMessageInfo { public ByteSpan Span; public IPEndPoint Recipient; + public Action OnTooBig; } private struct ReceiveMessageInfo @@ -250,7 +251,14 @@ private void SendLoop() { if (this.socket.Poll(Timeout.Infinite, SelectMode.SelectWrite)) { - this.socket.SendTo(msg.Span.GetUnderlyingArray(), msg.Span.Offset, msg.Span.Length, SocketFlags.None, msg.Recipient); + try + { + this.socket.SendTo(msg.Span.GetUnderlyingArray(), msg.Span.Offset, msg.Span.Length, SocketFlags.None, msg.Recipient); + } + catch (SocketException e) when (msg.OnTooBig != null && e.SocketErrorCode == SocketError.MessageSize) + { + msg.OnTooBig(); + } } else { @@ -335,14 +343,14 @@ protected virtual void ReadCallback(MessageReader message, IPEndPoint remoteEndP connection.HandleReceive(message, bytesReceived); } - internal void SendDataRaw(byte[] response, IPEndPoint remoteEndPoint) + internal void SendDataRaw(byte[] response, IPEndPoint remoteEndPoint, Action onTooBig = null) { - QueueRawData(response, remoteEndPoint); + QueueRawData(response, remoteEndPoint, onTooBig); } - protected virtual void QueueRawData(ByteSpan span, IPEndPoint remoteEndPoint) + protected virtual void QueueRawData(ByteSpan span, IPEndPoint remoteEndPoint, Action onTooBig = null) { - this.sendQueue.TryAdd(new SendMessageInfo() { Span = span, Recipient = remoteEndPoint }); + this.sendQueue.TryAdd(new SendMessageInfo() { Span = span, Recipient = remoteEndPoint, OnTooBig = onTooBig }); } /// diff --git a/Hazel/FewerThreads/ThreadLimitedUdpServerConnection.cs b/Hazel/FewerThreads/ThreadLimitedUdpServerConnection.cs index 28b21d6..3aa20be 100644 --- a/Hazel/FewerThreads/ThreadLimitedUdpServerConnection.cs +++ b/Hazel/FewerThreads/ThreadLimitedUdpServerConnection.cs @@ -38,14 +38,15 @@ internal ThreadLimitedUdpServerConnection(ThreadLimitedUdpConnectionListener lis State = ConnectionState.Connected; this.InitializeKeepAliveTimer(); + this.StartMtuDiscovery(); } /// - protected override void WriteBytesToConnection(byte[] bytes, int length) + protected override void WriteBytesToConnection(byte[] bytes, int length, Action onTooBig = null) { if (bytes.Length != length) throw new ArgumentException("I made an assumption here. I hope you see this error."); - Listener.SendDataRaw(bytes, EndPoint); + Listener.SendDataRaw(bytes, EndPoint, onTooBig); } /// diff --git a/Hazel/NetworkConnection.cs b/Hazel/NetworkConnection.cs index 9799509..e4bd91c 100644 --- a/Hazel/NetworkConnection.cs +++ b/Hazel/NetworkConnection.cs @@ -14,7 +14,8 @@ public enum HazelInternalErrors ReceivedZeroBytes, PingsWithoutResponse, ReliablePacketWithoutResponse, - ConnectionDisconnected + ConnectionDisconnected, + MtuTooLow } /// diff --git a/Hazel/Udp/SendOptionInternal.cs b/Hazel/Udp/SendOptionInternal.cs index 74786d8..6a0cc0a 100644 --- a/Hazel/Udp/SendOptionInternal.cs +++ b/Hazel/Udp/SendOptionInternal.cs @@ -35,5 +35,10 @@ public enum UdpSendOption : byte /// Message that is part of a larger, fragmented message. /// Fragment = 11, + + /// + /// Message that is used to discover MTU. + /// + MtuTest = 13, } } diff --git a/Hazel/Udp/UdpClientConnection.cs b/Hazel/Udp/UdpClientConnection.cs index 73eec41..bf6f92a 100644 --- a/Hazel/Udp/UdpClientConnection.cs +++ b/Hazel/Udp/UdpClientConnection.cs @@ -61,21 +61,21 @@ private void ManageReliablePacketsInternal(object state) } /// - protected override void WriteBytesToConnection(byte[] bytes, int length) + protected override void WriteBytesToConnection(byte[] bytes, int length, Action onTooBig = null) { #if DEBUG if (TestLagMs > 0) { - ThreadPool.QueueUserWorkItem(a => { Thread.Sleep(this.TestLagMs); WriteBytesToConnectionReal(bytes, length); }); + ThreadPool.QueueUserWorkItem(a => { Thread.Sleep(this.TestLagMs); WriteBytesToConnectionReal(bytes, length, onTooBig); }); } else #endif { - WriteBytesToConnectionReal(bytes, length); + WriteBytesToConnectionReal(bytes, length, onTooBig); } } - private void WriteBytesToConnectionReal(byte[] bytes, int length) + private void WriteBytesToConnectionReal(byte[] bytes, int length, Action onTooBig) { #if DEBUG DataSentRaw?.Invoke(bytes, length); @@ -97,6 +97,10 @@ private void WriteBytesToConnectionReal(byte[] bytes, int length) { // Already disposed and disconnected... } + catch (SocketException e) when (onTooBig != null && e.SocketErrorCode == SocketError.MessageSize) + { + onTooBig(); + } catch (SocketException ex) { DisconnectInternal(HazelInternalErrors.SocketExceptionSend, "Could not send data as a SocketException occurred: " + ex.Message); @@ -114,6 +118,10 @@ private void HandleSendTo(IAsyncResult result) { // Already disposed and disconnected... } + catch (SocketException e) when (e.SocketErrorCode == SocketError.MessageSize) + { + throw; + } catch (SocketException ex) { DisconnectInternal(HazelInternalErrors.SocketExceptionSend, "Could not send data as a SocketException occurred: " + ex.Message); @@ -177,6 +185,7 @@ public override void ConnectAsync(byte[] bytes = null) { this.State = ConnectionState.Connected; this.InitializeKeepAliveTimer(); + this.StartMtuDiscovery(); }); } diff --git a/Hazel/Udp/UdpConnection.Fragmented.cs b/Hazel/Udp/UdpConnection.Fragmented.cs new file mode 100644 index 0000000..272acc4 --- /dev/null +++ b/Hazel/Udp/UdpConnection.Fragmented.cs @@ -0,0 +1,223 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; + +namespace Hazel.Udp +{ + public partial class UdpConnection + { + /// + /// Maximum possible UDP header size - 60-byte IP header + 8-byte UDP header + /// + public const ushort MaxUdpHeaderSize = 68; + + /// + /// Popular MTU values used for quick MTU discovery + /// + public static ushort[] PossibleMtu { get; } = + { + 576 - MaxUdpHeaderSize, // RFC 1191 + 1024, + 1460 - MaxUdpHeaderSize, // Google Cloud + 1492 - MaxUdpHeaderSize, // RFC 1042 + 1500 - MaxUdpHeaderSize, // RFC 1191 + }; + + private int _mtu = PossibleMtu[0]; + + /// + /// MTU of this connection + /// + public int Mtu => ForcedMtu ?? _mtu; + + /// + /// Forced MTU, overrides the MTU + /// + public int? ForcedMtu { get; set; } = null; + + /// + /// Called when the MTU changes. + /// + public event Action MtuChanged; + + private byte _mtuIndex; + + private readonly Dictionary _fragmentedMessagesReceived = new Dictionary(); + private volatile int _lastFragmentedId; + + protected void StartMtuDiscovery() + { + MtuTest(_mtuIndex); + } + + private void MtuTest(byte index) + { + var mtu = PossibleMtu[index]; + var failed = false; + + var buffer = new byte[mtu]; + buffer[0] = (byte)UdpSendOption.MtuTest; + var id = AttachReliableID(buffer, 1, () => + { + if (failed) return; + MtuOk(index); + }); + buffer[mtu - 2] = (byte)mtu; + buffer[mtu - 1] = (byte)(mtu >> 8); + + WriteBytesToConnection(buffer, buffer.Length, () => + { + failed = true; + AcknowledgeMessageId(id); + + if (index == 0) + { + DisconnectInternal(HazelInternalErrors.MtuTooLow, "Connection MTU is lower than the minimum"); + } + }); + } + + private void MtuOk(byte index) + { + _mtuIndex = index; + _mtu = PossibleMtu[index]; + MtuChanged?.Invoke(); + + if (_mtuIndex < PossibleMtu.Length - 1) + { + MtuTest((byte)(index + 1)); + } + } + + private void MtuTestMessageReceive(MessageReader message) + { + message.Position = message.Length - 2; + var mtu = message.ReadUInt16(); + + if (mtu != message.Length) + { + return; + } + + ProcessReliableReceive(message.Buffer, 1, out _); + } + + private const byte FragmentHeaderSize = sizeof(byte) + sizeof(ushort) + sizeof(ushort) + sizeof(ushort); + + protected void FragmentedSend(byte[] data, Action ackCallback, bool includeHeader) + { + var id = (ushort)Interlocked.Increment(ref _lastFragmentedId); + var fragmentSize = Mtu; + var fragmentDataSize = fragmentSize - FragmentHeaderSize; + var fragmentsCount = (int)Math.Ceiling(data.Length / (double)fragmentDataSize); + + if (fragmentsCount >= ushort.MaxValue) + { + throw new HazelException("Too many fragments"); + } + + for (ushort i = 0; i < fragmentsCount; i++) + { + var dataLength = Math.Min(fragmentDataSize, data.Length - fragmentDataSize * i); + var buffer = new byte[dataLength + FragmentHeaderSize]; + + buffer[0] = (byte)UdpSendOption.Fragment; + + AttachReliableID(buffer, 1); + + buffer[3] = (byte)fragmentsCount; + buffer[4] = (byte)(fragmentsCount >> 8); + + buffer[5] = (byte)id; + buffer[6] = (byte)(id >> 8); + + Buffer.BlockCopy(data, fragmentDataSize * i, buffer, FragmentHeaderSize, dataLength); + + WriteBytesToConnection(buffer, buffer.Length); + } + } + + protected void FragmentMessageReceive(MessageReader messageReader) + { + if (ProcessReliableReceive(messageReader.Buffer, 1, out var id)) + { + messageReader.Position += 3; + + var fragmentsCount = messageReader.ReadUInt16(); + var fragmentedMessageId = messageReader.ReadUInt16(); + + lock (_fragmentedMessagesReceived) + { + if (!_fragmentedMessagesReceived.TryGetValue(fragmentedMessageId, out var fragmentedMessage)) + { + _fragmentedMessagesReceived.Add(fragmentedMessageId, fragmentedMessage = new FragmentedMessage(fragmentsCount)); + } + + var buffer = new byte[messageReader.Length - messageReader.Position]; + Buffer.BlockCopy(messageReader.Buffer, messageReader.Position, buffer, 0, messageReader.Length - messageReader.Position); + + fragmentedMessage.Fragments.Add(new FragmentedMessage.Fragment(id, buffer)); + + if (fragmentedMessage.Fragments.Count == fragmentsCount) + { + var reconstructed = fragmentedMessage.Reconstruct(); + InvokeDataReceived(MessageReader.Get(reconstructed), SendOption.Reliable); + + _fragmentedMessagesReceived.Remove(id); + } + } + } + } + + protected class FragmentedMessage + { + /// + /// The total number of fragments expected. + /// + public int FragmentsCount { get; } + + /// + /// The fragments received so far. + /// + public HashSet Fragments { get; } = new HashSet(); + + public byte[] Reconstruct() + { + if (Fragments.Count != FragmentsCount) + { + throw new HazelException("Can't reconstruct a FragmentedMessage until all fragments are received"); + } + + var buffer = new byte[Fragments.Sum(x => x.Data.Length)]; + + var offset = 0; + foreach (var fragment in Fragments.OrderBy(fragment => fragment.Id)) + { + var data = fragment.Data; + Buffer.BlockCopy(data, 0, buffer, offset, data.Length); + offset += data.Length; + } + + return buffer; + } + + public FragmentedMessage(int fragmentsCount) + { + FragmentsCount = fragmentsCount; + } + + public readonly struct Fragment + { + public int Id { get; } + public byte[] Data { get; } + + public Fragment(int id, byte[] data) + { + Id = id; + Data = data; + } + } + } + } +} diff --git a/Hazel/Udp/UdpConnection.Reliable.cs b/Hazel/Udp/UdpConnection.Reliable.cs index 83feabd..2b02f73 100644 --- a/Hazel/Udp/UdpConnection.Reliable.cs +++ b/Hazel/Udp/UdpConnection.Reliable.cs @@ -221,7 +221,7 @@ internal int ManageReliablePackets() /// The buffer to attach to. /// The offset to attach at. /// The callback to make once the packet has been acknowledged. - protected void AttachReliableID(byte[] buffer, int offset, Action ackCallback = null) + protected ushort AttachReliableID(byte[] buffer, int offset, Action ackCallback = null) { ushort id = (ushort)Interlocked.Increment(ref lastIDAllocated); @@ -241,6 +241,8 @@ protected void AttachReliableID(byte[] buffer, int offset, Action ackCallback = { throw new Exception("That shouldn't be possible"); } + + return id; } public static int ClampToInt(float value, int min, int max) @@ -256,15 +258,24 @@ public static int ClampToInt(float value, int min, int max) /// /// The byte array to write to. /// The callback to make once the packet has been acknowledged. - private void ReliableSend(byte sendOption, byte[] data, Action ackCallback = null) + private void ReliableSend(byte sendOption, byte[] data, Action ackCallback = null, bool includeHeader = true) { - //Inform keepalive not to send for a while - ResetKeepAliveTimer(); + var length = includeHeader ? data.Length + 3 : data.Length; + if (length >= Mtu) + { + FragmentedSend(data, ackCallback, includeHeader); + return; + } - byte[] bytes = new byte[data.Length + 3]; + var bytes = new byte[length]; - //Add message type - bytes[0] = sendOption; + if (includeHeader) + { + bytes[0] = sendOption; + } + + //Inform keepalive not to send for a while + ResetKeepAliveTimer(); //Add reliable ID AttachReliableID(bytes, 1, ackCallback); @@ -419,7 +430,7 @@ private void AcknowledgementMessageReceive(byte[] bytes, int bytesReceived) Statistics.LogReliableReceive(0, bytesReceived); } - private void AcknowledgeMessageId(ushort id) + protected void AcknowledgeMessageId(ushort id) { // Dispose of timer and remove from dictionary if (reliableDataPacketsSent.TryRemove(id, out Packet packet)) diff --git a/Hazel/Udp/UdpConnection.cs b/Hazel/Udp/UdpConnection.cs index cc5d088..3426c98 100644 --- a/Hazel/Udp/UdpConnection.cs +++ b/Hazel/Udp/UdpConnection.cs @@ -33,9 +33,12 @@ internal static Socket CreateSocket(IPMode ipMode) try { - socket.DontFragment = false; + socket.DontFragment = true; + } + catch + { + // ignored } - catch { } try { @@ -51,7 +54,7 @@ internal static Socket CreateSocket(IPMode ipMode) /// Writes the given bytes to the connection. /// /// The bytes to write. - protected abstract void WriteBytesToConnection(byte[] bytes, int length); + protected abstract void WriteBytesToConnection(byte[] bytes, int length, Action onTooBig = null); /// public override void Send(MessageWriter msg) @@ -59,22 +62,17 @@ public override void Send(MessageWriter msg) if (this._state != ConnectionState.Connected) throw new InvalidOperationException("Could not send data as this Connection is not connected. Did you disconnect?"); - byte[] buffer = new byte[msg.Length]; + var buffer = new byte[msg.Length]; Buffer.BlockCopy(msg.Buffer, 0, buffer, 0, msg.Length); switch (msg.SendOption) { case SendOption.Reliable: - ResetKeepAliveTimer(); - - AttachReliableID(buffer, 1); - WriteBytesToConnection(buffer, buffer.Length); - Statistics.LogReliableSend(buffer.Length - 3, buffer.Length); + ReliableSend((byte)msg.SendOption, buffer, includeHeader: false); break; default: - WriteBytesToConnection(buffer, buffer.Length); - Statistics.LogUnreliableSend(buffer.Length - 1, buffer.Length); + UnreliableSend((byte)msg.SendOption, buffer, false); break; } } @@ -108,6 +106,7 @@ protected void HandleSend(byte[] data, byte sendOption, Action ackCallback = nul case (byte)UdpSendOption.Ping: case (byte)SendOption.Reliable: case (byte)UdpSendOption.Hello: + case (byte)UdpSendOption.MtuTest: ReliableSend(sendOption, data, ackCallback); break; @@ -162,6 +161,16 @@ protected internal virtual void HandleReceive(MessageReader message, int bytesRe Statistics.LogUnreliableReceive(bytesReceived - 1, bytesReceived); break; + case (byte)UdpSendOption.MtuTest: + MtuTestMessageReceive(message); + message.Recycle(); + break; + + case (byte)UdpSendOption.Fragment: + FragmentMessageReceive(message); + message.Recycle(); + break; + // Treat everything else as garbage default: message.Recycle(); @@ -177,32 +186,28 @@ protected internal virtual void HandleReceive(MessageReader message, int bytesRe /// /// The SendOption to attach. /// The data. - void UnreliableSend(byte sendOption, byte[] data) + void UnreliableSend(byte sendOption, byte[] data, bool includeHeader = true) { - this.UnreliableSend(sendOption, data, 0, data.Length); - } + var length = includeHeader ? data.Length + 1 : data.Length; + if (length >= Mtu) + { + throw new HazelException("Unreliable messages can't be bigger than MTU"); + } - /// - /// Sends bytes using the unreliable UDP protocol. - /// - /// The data. - /// The SendOption to attach. - /// - /// - void UnreliableSend(byte sendOption, byte[] data, int offset, int length) - { - byte[] bytes = new byte[length + 1]; + var bytes = new byte[length]; - //Add message type - bytes[0] = sendOption; + if (includeHeader) + { + bytes[0] = sendOption; + } //Copy data into new array - Buffer.BlockCopy(data, offset, bytes, bytes.Length - length, length); + Buffer.BlockCopy(data, 0, bytes, length - data.Length, data.Length); //Write to connection - WriteBytesToConnection(bytes, bytes.Length); + WriteBytesToConnection(bytes, length); - Statistics.LogUnreliableSend(length, bytes.Length); + Statistics.LogUnreliableSend(data.Length, length); } /// diff --git a/Hazel/Udp/UdpConnectionListener.cs b/Hazel/Udp/UdpConnectionListener.cs index d3ff423..227640b 100644 --- a/Hazel/Udp/UdpConnectionListener.cs +++ b/Hazel/Udp/UdpConnectionListener.cs @@ -239,7 +239,7 @@ void ReadCallback(IAsyncResult result) /// /// The bytes to send. /// The endpoint to send to. - internal void SendData(byte[] bytes, int length, EndPoint endPoint) + internal void SendData(byte[] bytes, int length, EndPoint endPoint, Action onTooBig = null) { if (length > bytes.Length) return; @@ -264,6 +264,10 @@ internal void SendData(byte[] bytes, int length, EndPoint endPoint) SendCallback, null); } + catch (SocketException e) when (onTooBig != null && e.SocketErrorCode == SocketError.MessageSize) + { + onTooBig(); + } catch (SocketException e) { this.Logger?.Invoke("Could not send data as a SocketException occurred: " + e); @@ -281,6 +285,10 @@ private void SendCallback(IAsyncResult result) { socket.EndSendTo(result); } + catch (SocketException e) when (e.SocketErrorCode == SocketError.MessageSize) + { + throw; + } catch { } } diff --git a/Hazel/Udp/UdpServerConnection.cs b/Hazel/Udp/UdpServerConnection.cs index f348adb..7544a62 100644 --- a/Hazel/Udp/UdpServerConnection.cs +++ b/Hazel/Udp/UdpServerConnection.cs @@ -33,12 +33,13 @@ internal UdpServerConnection(UdpConnectionListener listener, IPEndPoint endPoint State = ConnectionState.Connected; this.InitializeKeepAliveTimer(); + this.StartMtuDiscovery(); } /// - protected override void WriteBytesToConnection(byte[] bytes, int length) + protected override void WriteBytesToConnection(byte[] bytes, int length, Action onTooBig = null) { - Listener.SendData(bytes, length, EndPoint); + Listener.SendData(bytes, length, EndPoint, onTooBig); } /// diff --git a/Hazel/Udp/UnityUdpClientConnection.cs b/Hazel/Udp/UnityUdpClientConnection.cs index dd192bc..17118f0 100644 --- a/Hazel/Udp/UnityUdpClientConnection.cs +++ b/Hazel/Udp/UnityUdpClientConnection.cs @@ -63,21 +63,21 @@ protected virtual void ResendPacketsIfNeeded() /// - protected override void WriteBytesToConnection(byte[] bytes, int length) + protected override void WriteBytesToConnection(byte[] bytes, int length, Action onTooBig = null) { #if DEBUG if (TestLagMs > 0) { - ThreadPool.QueueUserWorkItem(a => { Thread.Sleep(this.TestLagMs); WriteBytesToConnectionReal(bytes, length); }); + ThreadPool.QueueUserWorkItem(a => { Thread.Sleep(this.TestLagMs); WriteBytesToConnectionReal(bytes, length, onTooBig); }); } else #endif { - WriteBytesToConnectionReal(bytes, length); + WriteBytesToConnectionReal(bytes, length, onTooBig); } } - private void WriteBytesToConnectionReal(byte[] bytes, int length) + private void WriteBytesToConnectionReal(byte[] bytes, int length, Action onTooBig) { try { @@ -96,6 +96,10 @@ private void WriteBytesToConnectionReal(byte[] bytes, int length) { // Already disposed and disconnected... } + catch (SocketException e) when (onTooBig != null && e.SocketErrorCode == SocketError.MessageSize) + { + onTooBig(); + } catch (SocketException ex) { DisconnectInternal(HazelInternalErrors.SocketExceptionSend, "Could not send data as a SocketException occurred: " + ex.Message); @@ -122,6 +126,10 @@ protected virtual void WriteBytesToConnectionSync(byte[] bytes, int length) { // Already disposed and disconnected... } + catch (SocketException e) when (e.SocketErrorCode == SocketError.MessageSize) + { + throw; + } catch (SocketException ex) { DisconnectInternal(HazelInternalErrors.SocketExceptionSend, "Could not send data as a SocketException occurred: " + ex.Message); @@ -202,6 +210,7 @@ public override void ConnectAsync(byte[] bytes = null) SendHello(bytes, () => { this.State = ConnectionState.Connected; + this.StartMtuDiscovery(); }); } From 04c1323b27d86762f950b4d3f0971c267558f62c Mon Sep 17 00:00:00 2001 From: js6pak Date: Thu, 16 Dec 2021 17:06:26 +0100 Subject: [PATCH 2/8] Implement ackCallback --- Hazel/Udp/UdpConnection.Fragmented.cs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/Hazel/Udp/UdpConnection.Fragmented.cs b/Hazel/Udp/UdpConnection.Fragmented.cs index 272acc4..6c98514 100644 --- a/Hazel/Udp/UdpConnection.Fragmented.cs +++ b/Hazel/Udp/UdpConnection.Fragmented.cs @@ -117,6 +117,8 @@ protected void FragmentedSend(byte[] data, Action ackCallback, bool includeHeade throw new HazelException("Too many fragments"); } + var acksReceived = 0; + for (ushort i = 0; i < fragmentsCount; i++) { var dataLength = Math.Min(fragmentDataSize, data.Length - fragmentDataSize * i); @@ -124,7 +126,15 @@ protected void FragmentedSend(byte[] data, Action ackCallback, bool includeHeade buffer[0] = (byte)UdpSendOption.Fragment; - AttachReliableID(buffer, 1); + AttachReliableID(buffer, 1, () => + { + acksReceived++; + + if (acksReceived >= fragmentsCount) + { + ackCallback?.Invoke(); + } + }); buffer[3] = (byte)fragmentsCount; buffer[4] = (byte)(fragmentsCount >> 8); From bc21a5e69b576c609d2ec2538232ec7236ca8f27 Mon Sep 17 00:00:00 2001 From: js6pak Date: Thu, 16 Dec 2021 17:14:16 +0100 Subject: [PATCH 3/8] Implement includeHeader --- Hazel.UnitTests/FragmentationTests.cs | 15 +++++++++------ Hazel/Udp/UdpConnection.Fragmented.cs | 18 +++++++++++++----- Hazel/Udp/UdpConnection.Reliable.cs | 2 +- 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/Hazel.UnitTests/FragmentationTests.cs b/Hazel.UnitTests/FragmentationTests.cs index acca3e5..c3058b1 100644 --- a/Hazel.UnitTests/FragmentationTests.cs +++ b/Hazel.UnitTests/FragmentationTests.cs @@ -19,13 +19,13 @@ public void ReliableSendTest() using (var connection = new UdpClientConnection(new IPEndPoint(IPAddress.Loopback, 4296))) { var manualResetEvent = new ManualResetEventSlim(false); - MessageReader messageReader = null; + DataReceivedEventArgs? data = null; listener.NewConnection += e => { - e.Connection.DataReceived += data => + e.Connection.DataReceived += de => { - messageReader = data.Message; + data = de; manualResetEvent.Set(); }; }; @@ -37,10 +37,13 @@ public void ReliableSendTest() manualResetEvent.Wait(TimeSpan.FromSeconds(5)); - Assert.IsNotNull(messageReader); + Assert.IsNotNull(data); - var received = new byte[messageReader.Length - messageReader.Offset]; - Array.Copy(messageReader.Buffer, messageReader.Offset + messageReader.Position, received, 0, messageReader.Length - messageReader.Offset); + Assert.AreEqual(SendOption.Reliable, data.Value.SendOption); + + var messageReader = data.Value.Message; + var received = new byte[messageReader.Buffer.Length - messageReader.Offset - messageReader.Position]; + Array.Copy(messageReader.Buffer, messageReader.Offset + messageReader.Position, received, 0, received.Length); CollectionAssert.AreEqual(_testData, received); } diff --git a/Hazel/Udp/UdpConnection.Fragmented.cs b/Hazel/Udp/UdpConnection.Fragmented.cs index 6c98514..9767d8c 100644 --- a/Hazel/Udp/UdpConnection.Fragmented.cs +++ b/Hazel/Udp/UdpConnection.Fragmented.cs @@ -105,12 +105,14 @@ private void MtuTestMessageReceive(MessageReader message) private const byte FragmentHeaderSize = sizeof(byte) + sizeof(ushort) + sizeof(ushort) + sizeof(ushort); - protected void FragmentedSend(byte[] data, Action ackCallback, bool includeHeader) + protected void FragmentedSend(byte sendOption, byte[] data, Action ackCallback, bool includeHeader) { + var length = includeHeader ? data.Length + 1 : data.Length; + var id = (ushort)Interlocked.Increment(ref _lastFragmentedId); var fragmentSize = Mtu; var fragmentDataSize = fragmentSize - FragmentHeaderSize; - var fragmentsCount = (int)Math.Ceiling(data.Length / (double)fragmentDataSize); + var fragmentsCount = (int)Math.Ceiling(length / (double)fragmentDataSize); if (fragmentsCount >= ushort.MaxValue) { @@ -121,7 +123,7 @@ protected void FragmentedSend(byte[] data, Action ackCallback, bool includeHeade for (ushort i = 0; i < fragmentsCount; i++) { - var dataLength = Math.Min(fragmentDataSize, data.Length - fragmentDataSize * i); + var dataLength = Math.Min(fragmentDataSize, length - fragmentDataSize * i); var buffer = new byte[dataLength + FragmentHeaderSize]; buffer[0] = (byte)UdpSendOption.Fragment; @@ -142,7 +144,13 @@ protected void FragmentedSend(byte[] data, Action ackCallback, bool includeHeade buffer[5] = (byte)id; buffer[6] = (byte)(id >> 8); - Buffer.BlockCopy(data, fragmentDataSize * i, buffer, FragmentHeaderSize, dataLength); + var includingHeader = i == 0 && includeHeader; + if (includingHeader) + { + buffer[7] = sendOption; + } + + Buffer.BlockCopy(data, fragmentDataSize * i - (includingHeader ? 0 : 1), buffer, FragmentHeaderSize + (includingHeader ? 1 : 0), dataLength - (includingHeader ? 1 : 0)); WriteBytesToConnection(buffer, buffer.Length); } @@ -172,7 +180,7 @@ protected void FragmentMessageReceive(MessageReader messageReader) if (fragmentedMessage.Fragments.Count == fragmentsCount) { var reconstructed = fragmentedMessage.Reconstruct(); - InvokeDataReceived(MessageReader.Get(reconstructed), SendOption.Reliable); + InvokeDataReceived(SendOption.Reliable, MessageReader.Get(reconstructed), 1, reconstructed.Length); _fragmentedMessagesReceived.Remove(id); } diff --git a/Hazel/Udp/UdpConnection.Reliable.cs b/Hazel/Udp/UdpConnection.Reliable.cs index 2b02f73..07c457f 100644 --- a/Hazel/Udp/UdpConnection.Reliable.cs +++ b/Hazel/Udp/UdpConnection.Reliable.cs @@ -263,7 +263,7 @@ private void ReliableSend(byte sendOption, byte[] data, Action ackCallback = nul var length = includeHeader ? data.Length + 3 : data.Length; if (length >= Mtu) { - FragmentedSend(data, ackCallback, includeHeader); + FragmentedSend(sendOption, data, ackCallback, includeHeader); return; } From 45284e73c478c6e46fe1e1cf63ef955ae4fc6835 Mon Sep 17 00:00:00 2001 From: js6pak Date: Fri, 17 Dec 2021 00:07:40 +0100 Subject: [PATCH 4/8] Update README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 5de4c0c..a5e4668 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ The aim of this fork is to create a simple interface for ultra-fast connection-b ## Features - UDP and Reliable UDP. +- Fragmented packets with MTU discovery - Encrypted packets using DTLS - UDP Broadcast for local-multiplayer. - Completely thread safe. @@ -37,7 +38,6 @@ To build Hazel open [solution file](Hazel.sln) using your favourite C# IDE (I us * You *should not* recycle messages after NewConnection events. * You *should not* recycle messages after Disconnect events. * You *should* recycle messages after DataReceived events. - * Hazel doesn't support fragmented packets. It used to, but I wasn't sure of it so I removed it and have never needed it since. Just stay under 1kb packets. ## Tips for using Hazel with Unity From ebd26fc154e7c5d96baf714b018e5ebbd4b3392c Mon Sep 17 00:00:00 2001 From: js6pak Date: Fri, 17 Dec 2021 15:30:06 +0100 Subject: [PATCH 5/8] Use a single byte for fragment count/id and use array instead HashSet --- Hazel/Udp/UdpConnection.Fragmented.cs | 92 +++++++++++++++++---------- 1 file changed, 57 insertions(+), 35 deletions(-) diff --git a/Hazel/Udp/UdpConnection.Fragmented.cs b/Hazel/Udp/UdpConnection.Fragmented.cs index 9767d8c..24eb6e7 100644 --- a/Hazel/Udp/UdpConnection.Fragmented.cs +++ b/Hazel/Udp/UdpConnection.Fragmented.cs @@ -103,7 +103,7 @@ private void MtuTestMessageReceive(MessageReader message) ProcessReliableReceive(message.Buffer, 1, out _); } - private const byte FragmentHeaderSize = sizeof(byte) + sizeof(ushort) + sizeof(ushort) + sizeof(ushort); + private const byte FragmentHeaderSize = sizeof(byte) + sizeof(ushort) + sizeof(ushort) + sizeof(byte) + sizeof(byte); protected void FragmentedSend(byte sendOption, byte[] data, Action ackCallback, bool includeHeader) { @@ -114,14 +114,14 @@ protected void FragmentedSend(byte sendOption, byte[] data, Action ackCallback, var fragmentDataSize = fragmentSize - FragmentHeaderSize; var fragmentsCount = (int)Math.Ceiling(length / (double)fragmentDataSize); - if (fragmentsCount >= ushort.MaxValue) + if (fragmentsCount >= byte.MaxValue) { throw new HazelException("Too many fragments"); } var acksReceived = 0; - for (ushort i = 0; i < fragmentsCount; i++) + for (byte i = 0; i < fragmentsCount; i++) { var dataLength = Math.Min(fragmentDataSize, length - fragmentDataSize * i); var buffer = new byte[dataLength + FragmentHeaderSize]; @@ -138,11 +138,11 @@ protected void FragmentedSend(byte sendOption, byte[] data, Action ackCallback, } }); - buffer[3] = (byte)fragmentsCount; - buffer[4] = (byte)(fragmentsCount >> 8); + buffer[3] = (byte)id; + buffer[4] = (byte)(id >> 8); - buffer[5] = (byte)id; - buffer[6] = (byte)(id >> 8); + buffer[5] = (byte)fragmentsCount; + buffer[6] = i; var includingHeader = i == 0 && includeHeader; if (includingHeader) @@ -158,12 +158,18 @@ protected void FragmentedSend(byte sendOption, byte[] data, Action ackCallback, protected void FragmentMessageReceive(MessageReader messageReader) { - if (ProcessReliableReceive(messageReader.Buffer, 1, out var id)) + if (ProcessReliableReceive(messageReader.Buffer, 1, out _)) { messageReader.Position += 3; - var fragmentsCount = messageReader.ReadUInt16(); var fragmentedMessageId = messageReader.ReadUInt16(); + var fragmentsCount = messageReader.ReadByte(); + var fragmentId = messageReader.ReadByte(); + + if (fragmentsCount <= 0 || fragmentId >= fragmentsCount) + { + return; + } lock (_fragmentedMessagesReceived) { @@ -172,17 +178,22 @@ protected void FragmentMessageReceive(MessageReader messageReader) _fragmentedMessagesReceived.Add(fragmentedMessageId, fragmentedMessage = new FragmentedMessage(fragmentsCount)); } + if (fragmentedMessage.Fragments[fragmentId] != null) + { + return; + } + var buffer = new byte[messageReader.Length - messageReader.Position]; Buffer.BlockCopy(messageReader.Buffer, messageReader.Position, buffer, 0, messageReader.Length - messageReader.Position); - fragmentedMessage.Fragments.Add(new FragmentedMessage.Fragment(id, buffer)); + fragmentedMessage.AddFragment(fragmentId, buffer); - if (fragmentedMessage.Fragments.Count == fragmentsCount) + if (fragmentedMessage.IsFinished) { var reconstructed = fragmentedMessage.Reconstruct(); - InvokeDataReceived(SendOption.Reliable, MessageReader.Get(reconstructed), 1, reconstructed.Length); + InvokeDataReceived((SendOption)reconstructed[0], MessageReader.Get(reconstructed), 1, reconstructed.Length); - _fragmentedMessagesReceived.Remove(id); + _fragmentedMessagesReceived.Remove(fragmentedMessageId); } } } @@ -195,47 +206,58 @@ protected class FragmentedMessage /// public int FragmentsCount { get; } + /// + /// The number of fragments received. + /// + public int FragmentsReceived { get; private set; } + + /// + /// The total size of all fragments. + /// + public int Size { get; private set; } + /// /// The fragments received so far. /// - public HashSet Fragments { get; } = new HashSet(); + public byte[][] Fragments { get; } + + /// + /// Whether all fragments were received. + /// + public bool IsFinished => FragmentsReceived == FragmentsCount; + + public FragmentedMessage(int fragmentsCount) + { + FragmentsCount = fragmentsCount; + Fragments = new byte[fragmentsCount][]; + } + + public void AddFragment(byte id, byte[] fragment) + { + Fragments[id] = fragment; + Size += fragment.Length; + FragmentsReceived++; + } public byte[] Reconstruct() { - if (Fragments.Count != FragmentsCount) + if (!IsFinished) { throw new HazelException("Can't reconstruct a FragmentedMessage until all fragments are received"); } - var buffer = new byte[Fragments.Sum(x => x.Data.Length)]; + var buffer = new byte[Size]; var offset = 0; - foreach (var fragment in Fragments.OrderBy(fragment => fragment.Id)) + for (var i = 0; i < FragmentsCount; i++) { - var data = fragment.Data; + var data = Fragments[i]; Buffer.BlockCopy(data, 0, buffer, offset, data.Length); offset += data.Length; } return buffer; } - - public FragmentedMessage(int fragmentsCount) - { - FragmentsCount = fragmentsCount; - } - - public readonly struct Fragment - { - public int Id { get; } - public byte[] Data { get; } - - public Fragment(int id, byte[] data) - { - Id = id; - Data = data; - } - } } } } From 088a541eec056c4a8294bbbf4ebe2103816803c2 Mon Sep 17 00:00:00 2001 From: js6pak Date: Fri, 17 Dec 2021 16:18:52 +0100 Subject: [PATCH 6/8] Fix fragmenting MessageWriters --- Hazel.UnitTests/FragmentationTests.cs | 15 +++++++++-- Hazel/MessageWriter.cs | 36 ++++++++++++++++++++++----- Hazel/Udp/UdpConnection.Fragmented.cs | 6 ++--- Hazel/Udp/UdpConnection.Reliable.cs | 19 ++++++-------- Hazel/Udp/UdpConnection.cs | 18 +++++++++++--- 5 files changed, 68 insertions(+), 26 deletions(-) diff --git a/Hazel.UnitTests/FragmentationTests.cs b/Hazel.UnitTests/FragmentationTests.cs index c3058b1..44f1281 100644 --- a/Hazel.UnitTests/FragmentationTests.cs +++ b/Hazel.UnitTests/FragmentationTests.cs @@ -13,7 +13,9 @@ public class FragmentationTests private readonly byte[] _testData = Enumerable.Range(0, 10000).Select(x => (byte)x).ToArray(); [TestMethod] - public void ReliableSendTest() + [DataRow(false, DisplayName = "SendBytes")] + [DataRow(true, DisplayName = "MessageWriter")] + public void ReliableSendTest(bool useMessageWriter) { using (var listener = new UdpConnectionListener(new IPEndPoint(IPAddress.Any, 4296))) using (var connection = new UdpClientConnection(new IPEndPoint(IPAddress.Loopback, 4296))) @@ -33,7 +35,16 @@ public void ReliableSendTest() listener.Start(); connection.Connect(); - connection.SendBytes(_testData, SendOption.Reliable); + if (useMessageWriter) + { + var messageWriter = MessageWriter.Get(SendOption.Reliable); + messageWriter.Write(_testData); + connection.Send(messageWriter); + } + else + { + connection.SendBytes(_testData, SendOption.Reliable); + } manualResetEvent.Wait(TimeSpan.FromSeconds(5)); diff --git a/Hazel/MessageWriter.cs b/Hazel/MessageWriter.cs index 7d4e050..d128d08 100644 --- a/Hazel/MessageWriter.cs +++ b/Hazel/MessageWriter.cs @@ -33,10 +33,12 @@ public MessageWriter(int bufferSize) public byte[] ToByteArray(bool includeHeader) { + var length = GetLength(includeHeader); + if (includeHeader) { - byte[] output = new byte[this.Length]; - System.Buffer.BlockCopy(this.Buffer, 0, output, 0, this.Length); + byte[] output = new byte[length]; + System.Buffer.BlockCopy(this.Buffer, 0, output, 0, length); return output; } else @@ -45,14 +47,14 @@ public byte[] ToByteArray(bool includeHeader) { case SendOption.Reliable: { - byte[] output = new byte[this.Length - 3]; - System.Buffer.BlockCopy(this.Buffer, 3, output, 0, this.Length - 3); + byte[] output = new byte[length]; + System.Buffer.BlockCopy(this.Buffer, 3, output, 0, length); return output; } case SendOption.None: { - byte[] output = new byte[this.Length - 1]; - System.Buffer.BlockCopy(this.Buffer, 1, output, 0, this.Length - 1); + byte[] output = new byte[length]; + System.Buffer.BlockCopy(this.Buffer, 1, output, 0, length); return output; } } @@ -61,6 +63,28 @@ public byte[] ToByteArray(bool includeHeader) throw new NotImplementedException(); } + public int GetLength(bool includeHeader) + { + if (includeHeader) + { + return Length; + } + + switch (this.SendOption) + { + case SendOption.Reliable: + { + return Length - 3; + } + case SendOption.None: + { + return Length - 1; + } + } + + throw new NotImplementedException(); + } + /// /// The option specifying how the message should be sent. public static MessageWriter Get(SendOption sendOption = SendOption.None) diff --git a/Hazel/Udp/UdpConnection.Fragmented.cs b/Hazel/Udp/UdpConnection.Fragmented.cs index 24eb6e7..c2c43b4 100644 --- a/Hazel/Udp/UdpConnection.Fragmented.cs +++ b/Hazel/Udp/UdpConnection.Fragmented.cs @@ -105,9 +105,9 @@ private void MtuTestMessageReceive(MessageReader message) private const byte FragmentHeaderSize = sizeof(byte) + sizeof(ushort) + sizeof(ushort) + sizeof(byte) + sizeof(byte); - protected void FragmentedSend(byte sendOption, byte[] data, Action ackCallback, bool includeHeader) + protected void FragmentedSend(byte sendOption, byte[] data, Action ackCallback = null) { - var length = includeHeader ? data.Length + 1 : data.Length; + var length = data.Length + 1; var id = (ushort)Interlocked.Increment(ref _lastFragmentedId); var fragmentSize = Mtu; @@ -144,7 +144,7 @@ protected void FragmentedSend(byte sendOption, byte[] data, Action ackCallback, buffer[5] = (byte)fragmentsCount; buffer[6] = i; - var includingHeader = i == 0 && includeHeader; + var includingHeader = i == 0; if (includingHeader) { buffer[7] = sendOption; diff --git a/Hazel/Udp/UdpConnection.Reliable.cs b/Hazel/Udp/UdpConnection.Reliable.cs index 07c457f..94831df 100644 --- a/Hazel/Udp/UdpConnection.Reliable.cs +++ b/Hazel/Udp/UdpConnection.Reliable.cs @@ -258,25 +258,22 @@ public static int ClampToInt(float value, int min, int max) /// /// The byte array to write to. /// The callback to make once the packet has been acknowledged. - private void ReliableSend(byte sendOption, byte[] data, Action ackCallback = null, bool includeHeader = true) + private void ReliableSend(byte sendOption, byte[] data, Action ackCallback = null) { - var length = includeHeader ? data.Length + 3 : data.Length; - if (length >= Mtu) + if (data.Length >= Mtu) { - FragmentedSend(sendOption, data, ackCallback, includeHeader); + FragmentedSend(sendOption, data, ackCallback); return; } - var bytes = new byte[length]; - - if (includeHeader) - { - bytes[0] = sendOption; - } - //Inform keepalive not to send for a while ResetKeepAliveTimer(); + byte[] bytes = new byte[data.Length + 3]; + + //Add message type + bytes[0] = sendOption; + //Add reliable ID AttachReliableID(bytes, 1, ackCallback); diff --git a/Hazel/Udp/UdpConnection.cs b/Hazel/Udp/UdpConnection.cs index 3426c98..7ef44aa 100644 --- a/Hazel/Udp/UdpConnection.cs +++ b/Hazel/Udp/UdpConnection.cs @@ -61,18 +61,28 @@ public override void Send(MessageWriter msg) { if (this._state != ConnectionState.Connected) throw new InvalidOperationException("Could not send data as this Connection is not connected. Did you disconnect?"); + + if (msg.GetLength(false) >= Mtu) + { + FragmentedSend((byte)msg.SendOption, msg.ToByteArray(false)); + return; + } - var buffer = new byte[msg.Length]; - Buffer.BlockCopy(msg.Buffer, 0, buffer, 0, msg.Length); + byte[] buffer = msg.ToByteArray(true); switch (msg.SendOption) { case SendOption.Reliable: - ReliableSend((byte)msg.SendOption, buffer, includeHeader: false); + ResetKeepAliveTimer(); + + AttachReliableID(buffer, 1); + WriteBytesToConnection(buffer, buffer.Length); + Statistics.LogReliableSend(buffer.Length - 3, buffer.Length); break; default: - UnreliableSend((byte)msg.SendOption, buffer, false); + WriteBytesToConnection(buffer, buffer.Length); + Statistics.LogUnreliableSend(buffer.Length - 1, buffer.Length); break; } } From 31a66a0f22d9ae81cac35db9584e5b660b019770 Mon Sep 17 00:00:00 2001 From: js6pak Date: Fri, 17 Dec 2021 16:27:16 +0100 Subject: [PATCH 7/8] Use pooled MessageReaders for reconstructed fragments --- Hazel.UnitTests/FragmentationTests.cs | 3 ++- Hazel/Udp/UdpConnection.Fragmented.cs | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/Hazel.UnitTests/FragmentationTests.cs b/Hazel.UnitTests/FragmentationTests.cs index 44f1281..39fd014 100644 --- a/Hazel.UnitTests/FragmentationTests.cs +++ b/Hazel.UnitTests/FragmentationTests.cs @@ -53,8 +53,9 @@ public void ReliableSendTest(bool useMessageWriter) Assert.AreEqual(SendOption.Reliable, data.Value.SendOption); var messageReader = data.Value.Message; - var received = new byte[messageReader.Buffer.Length - messageReader.Offset - messageReader.Position]; + var received = new byte[messageReader.Length]; Array.Copy(messageReader.Buffer, messageReader.Offset + messageReader.Position, received, 0, received.Length); + messageReader.Recycle(); CollectionAssert.AreEqual(_testData, received); } diff --git a/Hazel/Udp/UdpConnection.Fragmented.cs b/Hazel/Udp/UdpConnection.Fragmented.cs index c2c43b4..b83ec6e 100644 --- a/Hazel/Udp/UdpConnection.Fragmented.cs +++ b/Hazel/Udp/UdpConnection.Fragmented.cs @@ -191,7 +191,7 @@ protected void FragmentMessageReceive(MessageReader messageReader) if (fragmentedMessage.IsFinished) { var reconstructed = fragmentedMessage.Reconstruct(); - InvokeDataReceived((SendOption)reconstructed[0], MessageReader.Get(reconstructed), 1, reconstructed.Length); + InvokeDataReceived((SendOption)reconstructed.ReadByte(), reconstructed, 1, fragmentedMessage.Size); _fragmentedMessagesReceived.Remove(fragmentedMessageId); } @@ -239,20 +239,20 @@ public void AddFragment(byte id, byte[] fragment) FragmentsReceived++; } - public byte[] Reconstruct() + public MessageReader Reconstruct() { if (!IsFinished) { throw new HazelException("Can't reconstruct a FragmentedMessage until all fragments are received"); } - var buffer = new byte[Size]; + var buffer = MessageReader.GetSized(Size); var offset = 0; for (var i = 0; i < FragmentsCount; i++) { var data = Fragments[i]; - Buffer.BlockCopy(data, 0, buffer, offset, data.Length); + Buffer.BlockCopy(data, 0, buffer.Buffer, offset, data.Length); offset += data.Length; } From 857547d42af867a65a4c3689c139a7bc3aa219de Mon Sep 17 00:00:00 2001 From: js6pak Date: Fri, 17 Dec 2021 16:37:40 +0100 Subject: [PATCH 8/8] Improve locking --- Hazel/Udp/UdpConnection.Fragmented.cs | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/Hazel/Udp/UdpConnection.Fragmented.cs b/Hazel/Udp/UdpConnection.Fragmented.cs index b83ec6e..482769d 100644 --- a/Hazel/Udp/UdpConnection.Fragmented.cs +++ b/Hazel/Udp/UdpConnection.Fragmented.cs @@ -1,6 +1,5 @@ using System; -using System.Collections.Generic; -using System.Linq; +using System.Collections.Concurrent; using System.Threading; namespace Hazel.Udp @@ -43,7 +42,7 @@ public partial class UdpConnection private byte _mtuIndex; - private readonly Dictionary _fragmentedMessagesReceived = new Dictionary(); + private readonly ConcurrentDictionary _fragmentedMessagesReceived = new ConcurrentDictionary(); private volatile int _lastFragmentedId; protected void StartMtuDiscovery() @@ -171,13 +170,23 @@ protected void FragmentMessageReceive(MessageReader messageReader) return; } - lock (_fragmentedMessagesReceived) + + if (!_fragmentedMessagesReceived.TryGetValue(fragmentedMessageId, out var fragmentedMessage)) { - if (!_fragmentedMessagesReceived.TryGetValue(fragmentedMessageId, out var fragmentedMessage)) + lock (_fragmentedMessagesReceived) { - _fragmentedMessagesReceived.Add(fragmentedMessageId, fragmentedMessage = new FragmentedMessage(fragmentsCount)); + if (!_fragmentedMessagesReceived.TryGetValue(fragmentedMessageId, out fragmentedMessage)) + { + if (!_fragmentedMessagesReceived.TryAdd(fragmentedMessageId, fragmentedMessage = new FragmentedMessage(fragmentsCount))) + { + throw new HazelException("Failed to add fragmented message"); + } + } } + } + lock (fragmentedMessage) + { if (fragmentedMessage.Fragments[fragmentId] != null) { return; @@ -193,7 +202,7 @@ protected void FragmentMessageReceive(MessageReader messageReader) var reconstructed = fragmentedMessage.Reconstruct(); InvokeDataReceived((SendOption)reconstructed.ReadByte(), reconstructed, 1, fragmentedMessage.Size); - _fragmentedMessagesReceived.Remove(fragmentedMessageId); + _fragmentedMessagesReceived.TryRemove(fragmentedMessageId, out _); } } }