diff options
author | chai <215380520@qq.com> | 2023-10-12 22:09:49 +0800 |
---|---|---|
committer | chai <215380520@qq.com> | 2023-10-12 22:09:49 +0800 |
commit | 8d2a2cd5de40e2b94ef5007c32832ed9a063dc40 (patch) | |
tree | a63dfbe815855925c9fb8f2804bd6ccfeffbd2eb /Tools/Hazel-Networking/Hazel/Udp | |
parent | dd0c5d50e377d9be1e728463670908a6c9d2c14f (diff) |
+hazel-networking
Diffstat (limited to 'Tools/Hazel-Networking/Hazel/Udp')
10 files changed, 2403 insertions, 0 deletions
diff --git a/Tools/Hazel-Networking/Hazel/Udp/SendOptionInternal.cs b/Tools/Hazel-Networking/Hazel/Udp/SendOptionInternal.cs new file mode 100644 index 0000000..74786d8 --- /dev/null +++ b/Tools/Hazel-Networking/Hazel/Udp/SendOptionInternal.cs @@ -0,0 +1,39 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + + +namespace Hazel.Udp +{ + /// <summary> + /// Extra internal states for SendOption enumeration when using UDP. + /// </summary> + public enum UdpSendOption : byte + { + /// <summary> + /// Hello message for initiating communication. + /// </summary> + Hello = 8, + + /// <summary> + /// A single byte of continued existence + /// </summary> + Ping = 12, + + /// <summary> + /// Message for discontinuing communication. + /// </summary> + Disconnect = 9, + + /// <summary> + /// Message acknowledging the receipt of a message. + /// </summary> + Acknowledgement = 10, + + /// <summary> + /// Message that is part of a larger, fragmented message. + /// </summary> + Fragment = 11, + } +} diff --git a/Tools/Hazel-Networking/Hazel/Udp/UdpBroadcastListener.cs b/Tools/Hazel-Networking/Hazel/Udp/UdpBroadcastListener.cs new file mode 100644 index 0000000..13b8d0b --- /dev/null +++ b/Tools/Hazel-Networking/Hazel/Udp/UdpBroadcastListener.cs @@ -0,0 +1,157 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; + +namespace Hazel.Udp +{ + public class BroadcastPacket + { + public string Data; + public DateTime ReceiveTime; + public IPEndPoint Sender; + + public BroadcastPacket(string data, IPEndPoint sender) + { + this.Data = data; + this.Sender = sender; + this.ReceiveTime = DateTime.Now; + } + + public string GetAddress() + { + return this.Sender.Address.ToString(); + } + } + + public class UdpBroadcastListener : IDisposable + { + private Socket socket; + private EndPoint endpoint; + private Action<string> logger; + + private byte[] buffer = new byte[1024]; + + private List<BroadcastPacket> packets = new List<BroadcastPacket>(); + + public bool Running { get; private set; } + + /// + public UdpBroadcastListener(int port, Action<string> logger = null) + { + this.logger = logger; + this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); + this.socket.EnableBroadcast = true; + this.socket.MulticastLoopback = false; + this.endpoint = new IPEndPoint(IPAddress.Any, port); + this.socket.Bind(this.endpoint); + } + + /// + public void StartListen() + { + if (this.Running) return; + this.Running = true; + + try + { + EndPoint endpt = new IPEndPoint(IPAddress.Any, 0); + this.socket.BeginReceiveFrom(buffer, 0, buffer.Length, SocketFlags.None, ref endpt, this.HandleData, null); + } + catch (NullReferenceException) { } + catch (Exception e) + { + this.logger?.Invoke("BroadcastListener: " + e); + this.Dispose(); + } + } + + private void HandleData(IAsyncResult result) + { + this.Running = false; + + int numBytes; + EndPoint endpt = new IPEndPoint(IPAddress.Any, 0); + try + { + numBytes = this.socket.EndReceiveFrom(result, ref endpt); + } + catch (NullReferenceException) + { + // Already disposed + return; + } + catch (Exception e) + { + this.logger?.Invoke("BroadcastListener: " + e); + this.Dispose(); + return; + } + + if (numBytes < 3 + || buffer[0] != 4 || buffer[1] != 2) + { + this.StartListen(); + return; + } + + IPEndPoint ipEnd = (IPEndPoint)endpt; + string data = UTF8Encoding.UTF8.GetString(buffer, 2, numBytes - 2); + int dataHash = data.GetHashCode(); + + lock (packets) + { + bool found = false; + for (int i = 0; i < this.packets.Count; ++i) + { + var pkt = this.packets[i]; + if (pkt == null || pkt.Data == null) + { + this.packets.RemoveAt(i); + i--; + continue; + } + + if (pkt.Data.GetHashCode() == dataHash + && pkt.Sender.Equals(ipEnd)) + { + this.packets[i].ReceiveTime = DateTime.Now; + break; + } + } + + if (!found) + { + this.packets.Add(new BroadcastPacket(data, ipEnd)); + } + } + + this.StartListen(); + } + + /// + public BroadcastPacket[] GetPackets() + { + lock (this.packets) + { + var output = this.packets.ToArray(); + this.packets.Clear(); + return output; + } + } + + /// + public void Dispose() + { + if (this.socket != null) + { + try { this.socket.Shutdown(SocketShutdown.Both); } catch { } + try { this.socket.Close(); } catch { } + try { this.socket.Dispose(); } catch { } + this.socket = null; + } + } + } +}
\ No newline at end of file diff --git a/Tools/Hazel-Networking/Hazel/Udp/UdpBroadcaster.cs b/Tools/Hazel-Networking/Hazel/Udp/UdpBroadcaster.cs new file mode 100644 index 0000000..8877f86 --- /dev/null +++ b/Tools/Hazel-Networking/Hazel/Udp/UdpBroadcaster.cs @@ -0,0 +1,127 @@ +using Hazel.UPnP; +using System; +using System.Net; +using System.Net.Sockets; +using System.Text; + +namespace Hazel.Udp +{ + public class UdpBroadcaster : IDisposable + { + private SocketBroadcast[] socketBroadcasts; + private byte[] data; + private Action<string> logger; + + /// + public UdpBroadcaster(int port, Action<string> logger = null) + { + this.logger = logger; + + var addresses = NetUtility.GetAddressesFromNetworkInterfaces(AddressFamily.InterNetwork); + this.socketBroadcasts = new SocketBroadcast[addresses.Count > 0 ? addresses.Count : 1]; + + int count = 0; + foreach (var addressInformation in addresses) + { + Socket socket = CreateSocket(new IPEndPoint(addressInformation.Address, 0)); + IPAddress broadcast = NetUtility.GetBroadcastAddress(addressInformation); + + this.socketBroadcasts[count] = new SocketBroadcast(socket, new IPEndPoint(broadcast, port)); + count++; + } + if (count == 0) + { + Socket socket = CreateSocket(new IPEndPoint(IPAddress.Any, 0)); + + this.socketBroadcasts[0] = new SocketBroadcast(socket, new IPEndPoint(IPAddress.Broadcast, port)); + } + } + + private static Socket CreateSocket(IPEndPoint endPoint) + { + var socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); + socket.EnableBroadcast = true; + socket.MulticastLoopback = false; + socket.Bind(endPoint); + + return socket; + } + + /// + public void SetData(string data) + { + int len = UTF8Encoding.UTF8.GetByteCount(data); + this.data = new byte[len + 2]; + this.data[0] = 4; + this.data[1] = 2; + + UTF8Encoding.UTF8.GetBytes(data, 0, data.Length, this.data, 2); + } + + /// + public void Broadcast() + { + if (this.data == null) + { + return; + } + + foreach (SocketBroadcast socketBroadcast in this.socketBroadcasts) + { + try + { + Socket socket = socketBroadcast.Socket; + socket.BeginSendTo(data, 0, data.Length, SocketFlags.None, socketBroadcast.Broadcast, this.FinishSendTo, socket); + } + catch (Exception e) + { + this.logger?.Invoke("BroadcastListener: " + e); + } + } + } + + private void FinishSendTo(IAsyncResult evt) + { + try + { + Socket socket = (Socket)evt.AsyncState; + socket.EndSendTo(evt); + } + catch (Exception e) + { + this.logger?.Invoke("BroadcastListener: " + e); + } + } + + /// + public void Dispose() + { + if (this.socketBroadcasts != null) + { + foreach (SocketBroadcast socketBroadcast in this.socketBroadcasts) + { + Socket socket = socketBroadcast.Socket; + if (socket != null) + { + try { socket.Shutdown(SocketShutdown.Both); } catch { } + try { socket.Close(); } catch { } + try { socket.Dispose(); } catch { } + } + } + Array.Clear(this.socketBroadcasts, 0, this.socketBroadcasts.Length); + } + } + + private struct SocketBroadcast + { + public Socket Socket; + public IPEndPoint Broadcast; + + public SocketBroadcast(Socket socket, IPEndPoint broadcast) + { + Socket = socket; + Broadcast = broadcast; + } + } + } +}
\ No newline at end of file diff --git a/Tools/Hazel-Networking/Hazel/Udp/UdpClientConnection.cs b/Tools/Hazel-Networking/Hazel/Udp/UdpClientConnection.cs new file mode 100644 index 0000000..f6da329 --- /dev/null +++ b/Tools/Hazel-Networking/Hazel/Udp/UdpClientConnection.cs @@ -0,0 +1,364 @@ +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading; + + +namespace Hazel.Udp +{ + /// <summary> + /// Represents a client's connection to a server that uses the UDP protocol. + /// </summary> + /// <inheritdoc/> + public sealed class UdpClientConnection : UdpConnection + { + /// <summary> + /// The max size Hazel attempts to read from the network. + /// Defaults to 8096. + /// </summary> + /// <remarks> + /// 8096 is 5 times the standard modern MTU of 1500, so it's already too large imo. + /// If Hazel ever implements fragmented packets, then we might consider a larger value since combining 5 + /// packets into 1 reader would be realistic and would cause reallocations. That said, Hazel is not meant + /// for transferring large contiguous blocks of data, so... please don't? + /// </remarks> + public int ReceiveBufferSize = 8096; + + /// <summary> + /// The socket we're connected via. + /// </summary> + private Socket socket; + + /// <summary> + /// Reset event that is triggered when the connection is marked Connected. + /// </summary> + private ManualResetEvent connectWaitLock = new ManualResetEvent(false); + + private Timer reliablePacketTimer; + +#if DEBUG + public event Action<byte[], int> DataSentRaw; + public event Action<byte[], int> DataReceivedRaw; +#endif + + /// <summary> + /// Creates a new UdpClientConnection. + /// </summary> + /// <param name="remoteEndPoint">A <see cref="NetworkEndPoint"/> to connect to.</param> + public UdpClientConnection(ILogger logger, IPEndPoint remoteEndPoint, IPMode ipMode = IPMode.IPv4) + : base(logger) + { + this.EndPoint = remoteEndPoint; + this.IPMode = ipMode; + + this.socket = CreateSocket(ipMode); + + reliablePacketTimer = new Timer(ManageReliablePacketsInternal, null, 100, Timeout.Infinite); + this.InitializeKeepAliveTimer(); + } + + ~UdpClientConnection() + { + this.Dispose(false); + } + + private void ManageReliablePacketsInternal(object state) + { + base.ManageReliablePackets(); + try + { + reliablePacketTimer.Change(100, Timeout.Infinite); + } + catch { } + } + + /// <inheritdoc /> + protected override void WriteBytesToConnection(byte[] bytes, int length) + { +#if DEBUG + if (TestLagMs > 0) + { + ThreadPool.QueueUserWorkItem(a => { Thread.Sleep(this.TestLagMs); WriteBytesToConnectionReal(bytes, length); }); + } + else +#endif + { + WriteBytesToConnectionReal(bytes, length); + } + } + + private void WriteBytesToConnectionReal(byte[] bytes, int length) + { +#if DEBUG + DataSentRaw?.Invoke(bytes, length); +#endif + + try + { + this.Statistics.LogPacketSend(length); + socket.BeginSendTo( + bytes, + 0, + length, + SocketFlags.None, + EndPoint, + HandleSendTo, + null); + } + catch (NullReferenceException) { } + catch (ObjectDisposedException) + { + // Already disposed and disconnected... + } + catch (SocketException ex) + { + DisconnectInternal(HazelInternalErrors.SocketExceptionSend, "Could not send data as a SocketException occurred: " + ex.Message); + } + } + + private void HandleSendTo(IAsyncResult result) + { + try + { + socket.EndSendTo(result); + } + catch (NullReferenceException) { } + catch (ObjectDisposedException) + { + // Already disposed and disconnected... + } + catch (SocketException ex) + { + DisconnectInternal(HazelInternalErrors.SocketExceptionSend, "Could not send data as a SocketException occurred: " + ex.Message); + } + } + + /// <inheritdoc /> + public override void Connect(byte[] bytes = null, int timeout = 5000) + { + this.ConnectAsync(bytes); + + //Wait till hello packet is acknowledged and the state is set to Connected + bool timedOut = !WaitOnConnect(timeout); + + //If we timed out raise an exception + if (timedOut) + { + Dispose(); + throw new HazelException("Connection attempt timed out."); + } + } + + /// <inheritdoc /> + public override void ConnectAsync(byte[] bytes = null) + { + this.State = ConnectionState.Connecting; + + try + { + if (IPMode == IPMode.IPv4) + socket.Bind(new IPEndPoint(IPAddress.Any, 0)); + else + socket.Bind(new IPEndPoint(IPAddress.IPv6Any, 0)); + } + catch (SocketException e) + { + this.State = ConnectionState.NotConnected; + throw new HazelException("A SocketException occurred while binding to the port.", e); + } + + try + { + StartListeningForData(); + } + catch (ObjectDisposedException) + { + // If the socket's been disposed then we can just end there but make sure we're in NotConnected state. + // If we end up here I'm really lost... + this.State = ConnectionState.NotConnected; + return; + } + catch (SocketException e) + { + Dispose(); + throw new HazelException("A SocketException occurred while initiating a receive operation.", e); + } + + // Write bytes to the server to tell it hi (and to punch a hole in our NAT, if present) + // When acknowledged set the state to connected + SendHello(bytes, () => + { + this.State = ConnectionState.Connected; + this.InitializeKeepAliveTimer(); + }); + } + + /// <summary> + /// Instructs the listener to begin listening. + /// </summary> + void StartListeningForData() + { +#if DEBUG + if (this.TestLagMs > 0) + { + Thread.Sleep(this.TestLagMs); + } +#endif + + var msg = MessageReader.GetSized(this.ReceiveBufferSize); + try + { + socket.BeginReceive(msg.Buffer, 0, msg.Buffer.Length, SocketFlags.None, ReadCallback, msg); + } + catch + { + msg.Recycle(); + this.Dispose(); + } + } + + protected override void SetState(ConnectionState state) + { + try + { + // If the server disconnects you during the hello + // you can go straight from Connecting to NotConnected. + if (state == ConnectionState.Connected + || state == ConnectionState.NotConnected) + { + connectWaitLock.Set(); + } + else + { + connectWaitLock.Reset(); + } + } + catch (ObjectDisposedException) + { + } + } + + /// <summary> + /// Blocks until the Connection is connected. + /// </summary> + /// <param name="timeout">The number of milliseconds to wait before timing out.</param> + public bool WaitOnConnect(int timeout) + { + return connectWaitLock.WaitOne(timeout); + } + + /// <summary> + /// Called when data has been received by the socket. + /// </summary> + /// <param name="result">The asyncronous operation's result.</param> + void ReadCallback(IAsyncResult result) + { + var msg = (MessageReader)result.AsyncState; + + try + { + msg.Length = socket.EndReceive(result); + } + catch (SocketException e) + { + msg.Recycle(); + DisconnectInternal(HazelInternalErrors.SocketExceptionReceive, "Socket exception while reading data: " + e.Message); + return; + } + catch (Exception) + { + msg.Recycle(); + return; + } + + //Exit if no bytes read, we've failed. + if (msg.Length == 0) + { + msg.Recycle(); + DisconnectInternal(HazelInternalErrors.ReceivedZeroBytes, "Received 0 bytes"); + return; + } + + //Begin receiving again + try + { + StartListeningForData(); + } + catch (SocketException e) + { + DisconnectInternal(HazelInternalErrors.SocketExceptionReceive, "Socket exception during receive: " + e.Message); + } + catch (ObjectDisposedException) + { + //If the socket's been disposed then we can just end there. + return; + } + +#if DEBUG + if (this.TestDropRate > 0) + { + if ((this.testDropCount++ % this.TestDropRate) == 0) + { + return; + } + } + + DataReceivedRaw?.Invoke(msg.Buffer, msg.Length); +#endif + HandleReceive(msg, msg.Length); + } + + /// <summary> + /// Sends a disconnect message to the end point. + /// You may include optional disconnect data. The SendOption must be unreliable. + /// </summary> + protected override bool SendDisconnect(MessageWriter data = null) + { + lock (this) + { + if (this._state == ConnectionState.NotConnected) return false; + this.State = ConnectionState.NotConnected; // Use the property so we release the state lock + } + + var bytes = EmptyDisconnectBytes; + if (data != null && data.Length > 0) + { + if (data.SendOption != SendOption.None) throw new ArgumentException("Disconnect messages can only be unreliable."); + + bytes = data.ToByteArray(true); + bytes[0] = (byte)UdpSendOption.Disconnect; + } + + try + { + socket.SendTo( + bytes, + 0, + bytes.Length, + SocketFlags.None, + EndPoint); + } + catch { } + + return true; + } + + /// <inheritdoc /> + protected override void Dispose(bool disposing) + { + if (disposing) + { + SendDisconnect(); + } + + try { this.socket.Shutdown(SocketShutdown.Both); } catch { } + try { this.socket.Close(); } catch { } + try { this.socket.Dispose(); } catch { } + + this.reliablePacketTimer.Dispose(); + this.connectWaitLock.Dispose(); + + base.Dispose(disposing); + } + } +} diff --git a/Tools/Hazel-Networking/Hazel/Udp/UdpConnection.KeepAlive.cs b/Tools/Hazel-Networking/Hazel/Udp/UdpConnection.KeepAlive.cs new file mode 100644 index 0000000..75b4f1d --- /dev/null +++ b/Tools/Hazel-Networking/Hazel/Udp/UdpConnection.KeepAlive.cs @@ -0,0 +1,167 @@ +using System; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Threading; + + +namespace Hazel.Udp +{ + partial class UdpConnection + { + + /// <summary> + /// Class to hold packet data + /// </summary> + public class PingPacket : IRecyclable + { + private static readonly ObjectPool<PingPacket> PacketPool = new ObjectPool<PingPacket>(() => new PingPacket()); + + public readonly Stopwatch Stopwatch = new Stopwatch(); + + internal static PingPacket GetObject() + { + return PacketPool.GetObject(); + } + + public void Recycle() + { + Stopwatch.Stop(); + PacketPool.PutObject(this); + } + } + + internal ConcurrentDictionary<ushort, PingPacket> activePingPackets = new ConcurrentDictionary<ushort, PingPacket>(); + + /// <summary> + /// The interval from data being received or transmitted to a keepalive packet being sent in milliseconds. + /// </summary> + /// <remarks> + /// <para> + /// Keepalive packets serve to close connections when an endpoint abruptly disconnects and to ensure than any + /// NAT devices do not close their translation for our argument. By ensuring there is regular contact the + /// connection can detect and prevent these issues. + /// </para> + /// <para> + /// The default value is 10 seconds, set to System.Threading.Timeout.Infinite to disable keepalive packets. + /// </para> + /// </remarks> + public int KeepAliveInterval + { + get + { + return keepAliveInterval; + } + + set + { + keepAliveInterval = value; + ResetKeepAliveTimer(); + } + } + private int keepAliveInterval = 1500; + + public int MissingPingsUntilDisconnect { get; set; } = 6; + private volatile int pingsSinceAck = 0; + + /// <summary> + /// The timer creating keepalive pulses. + /// </summary> + private Timer keepAliveTimer; + + /// <summary> + /// Starts the keepalive timer. + /// </summary> + protected void InitializeKeepAliveTimer() + { + keepAliveTimer = new Timer( + HandleKeepAlive, + null, + keepAliveInterval, + keepAliveInterval + ); + } + + private void HandleKeepAlive(object state) + { + if (this.State != ConnectionState.Connected) return; + + if (this.pingsSinceAck >= this.MissingPingsUntilDisconnect) + { + this.DisposeKeepAliveTimer(); + this.DisconnectInternal(HazelInternalErrors.PingsWithoutResponse, $"Sent {this.pingsSinceAck} pings that remote has not responded to."); + return; + } + + try + { + this.pingsSinceAck++; + SendPing(); + } + catch + { + } + } + + // Pings are special, quasi-reliable packets. + // We send them to trigger responses that validate our connection is alive + // An unacked ping should never be the sole cause of a disconnect. + // Rather, the responses will reset our pingsSinceAck, enough unacked + // pings should cause a disconnect. + private void SendPing() + { + ushort id = (ushort)Interlocked.Increment(ref lastIDAllocated); + + byte[] bytes = new byte[3]; + bytes[0] = (byte)UdpSendOption.Ping; + bytes[1] = (byte)(id >> 8); + bytes[2] = (byte)id; + + PingPacket pkt; + if (!this.activePingPackets.TryGetValue(id, out pkt)) + { + pkt = PingPacket.GetObject(); + if (!this.activePingPackets.TryAdd(id, pkt)) + { + throw new Exception("This shouldn't be possible"); + } + } + + pkt.Stopwatch.Restart(); + + WriteBytesToConnection(bytes, bytes.Length); + + Statistics.LogReliableSend(0); + } + + /// <summary> + /// Resets the keepalive timer to zero. + /// </summary> + protected void ResetKeepAliveTimer() + { + try + { + keepAliveTimer?.Change(keepAliveInterval, keepAliveInterval); + } + catch { } + } + + /// <summary> + /// Disposes of the keep alive timer. + /// </summary> + private void DisposeKeepAliveTimer() + { + if (this.keepAliveTimer != null) + { + this.keepAliveTimer.Dispose(); + } + + foreach (var kvp in activePingPackets) + { + if (this.activePingPackets.TryRemove(kvp.Key, out var pkt)) + { + pkt.Recycle(); + } + } + } + } +}
\ No newline at end of file diff --git a/Tools/Hazel-Networking/Hazel/Udp/UdpConnection.Reliable.cs b/Tools/Hazel-Networking/Hazel/Udp/UdpConnection.Reliable.cs new file mode 100644 index 0000000..bed4738 --- /dev/null +++ b/Tools/Hazel-Networking/Hazel/Udp/UdpConnection.Reliable.cs @@ -0,0 +1,490 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; + +namespace Hazel.Udp +{ + partial class UdpConnection + { + private const int MinResendDelayMs = 50; + private const int MaxInitialResendDelayMs = 300; + private const int MaxAdditionalResendDelayMs = 1000; + + public readonly ObjectPool<Packet> PacketPool; + + /// <summary> + /// The starting timeout, in miliseconds, at which data will be resent. + /// </summary> + /// <remarks> + /// <para> + /// For reliable delivery data is resent at specified intervals unless an acknowledgement is received from the + /// receiving device. The ResendTimeout specifies the interval between the packets being resent, each time a packet + /// is resent the interval is increased for that packet until the duration exceeds the <see cref="DisconnectTimeoutMs"/> value. + /// </para> + /// <para> + /// Setting this to its default of 0 will mean the timeout is 2 times the value of the average ping, usually + /// resulting in a more dynamic resend that responds to endpoints on slower or faster connections. + /// </para> + /// </remarks> + public volatile int ResendTimeoutMs = 0; + + /// <summary> + /// Max number of times to resend. 0 == no limit + /// </summary> + public volatile int ResendLimit = 0; + + /// <summary> + /// A compounding multiplier to back off resend timeout. + /// Applied to ping before first timeout when ResendTimeout == 0. + /// </summary> + public volatile float ResendPingMultiplier = 2; + + /// <summary> + /// Holds the last ID allocated. + /// </summary> + private int lastIDAllocated = -1; + + /// <summary> + /// The packets of data that have been transmitted reliably and not acknowledged. + /// </summary> + internal ConcurrentDictionary<ushort, Packet> reliableDataPacketsSent = new ConcurrentDictionary<ushort, Packet>(); + + /// <summary> + /// Packet ids that have not been received, but are expected. + /// </summary> + private HashSet<ushort> reliableDataPacketsMissing = new HashSet<ushort>(); + + /// <summary> + /// The packet id that was received last. + /// </summary> + protected volatile ushort reliableReceiveLast = ushort.MaxValue; + + private object PingLock = new object(); + + /// <summary> + /// Returns the average ping to this endpoint. + /// </summary> + /// <remarks> + /// This returns the average ping for a one-way trip as calculated from the reliable packets that have been sent + /// and acknowledged by the endpoint. + /// </remarks> + private float _pingMs = 500; + + /// <summary> + /// The maximum times a message should be resent before marking the endpoint as disconnected. + /// </summary> + /// <remarks> + /// Reliable packets will be resent at an interval defined in <see cref="ResendTimeoutMs"/> for the number of times + /// specified here. Once a packet has been retransmitted this number of times and has not been acknowledged the + /// connection will be marked as disconnected and the <see cref="Connection.Disconnected">Disconnected</see> event + /// will be invoked. + /// </remarks> + public volatile int DisconnectTimeoutMs = 5000; + + /// <summary> + /// Class to hold packet data + /// </summary> + public class Packet : IRecyclable + { + public ushort Id; + private byte[] Data; + private readonly UdpConnection Connection; + private int Length; + + public int NextTimeoutMs; + public volatile bool Acknowledged; + + public Action AckCallback; + + public int Retransmissions; + public Stopwatch Stopwatch = new Stopwatch(); + + internal Packet(UdpConnection connection) + { + this.Connection = connection; + } + + internal void Set(ushort id, byte[] data, int length, int timeout, Action ackCallback) + { + this.Id = id; + this.Data = data; + this.Length = length; + + this.Acknowledged = false; + this.NextTimeoutMs = timeout; + this.AckCallback = ackCallback; + this.Retransmissions = 0; + + this.Stopwatch.Restart(); + } + + // Packets resent + public int Resend() + { + var connection = this.Connection; + if (!this.Acknowledged && connection != null) + { + long lifetimeMs = this.Stopwatch.ElapsedMilliseconds; + if (lifetimeMs >= connection.DisconnectTimeoutMs) + { + if (connection.reliableDataPacketsSent.TryRemove(this.Id, out Packet self)) + { + connection.DisconnectInternal(HazelInternalErrors.ReliablePacketWithoutResponse, $"Reliable packet {self.Id} (size={this.Length}) was not ack'd after {lifetimeMs}ms ({self.Retransmissions} resends)"); + + self.Recycle(); + } + + return 0; + } + + if (lifetimeMs >= this.NextTimeoutMs) + { + ++this.Retransmissions; + if (connection.ResendLimit != 0 + && this.Retransmissions > connection.ResendLimit) + { + if (connection.reliableDataPacketsSent.TryRemove(this.Id, out Packet self)) + { + connection.DisconnectInternal(HazelInternalErrors.ReliablePacketWithoutResponse, $"Reliable packet {self.Id} (size={this.Length}) was not ack'd after {self.Retransmissions} resends ({lifetimeMs}ms)"); + + self.Recycle(); + } + + return 0; + } + + this.NextTimeoutMs += (int)Math.Min(this.NextTimeoutMs * connection.ResendPingMultiplier, MaxAdditionalResendDelayMs); + try + { + connection.WriteBytesToConnection(this.Data, this.Length); + connection.Statistics.LogMessageResent(); + return 1; + } + catch (InvalidOperationException) + { + connection.DisconnectInternal(HazelInternalErrors.ConnectionDisconnected, "Could not resend data as connection is no longer connected"); + } + } + } + + return 0; + } + + /// <summary> + /// Returns this object back to the object pool from whence it came. + /// </summary> + public void Recycle() + { + this.Acknowledged = true; + + this.Connection.PacketPool.PutObject(this); + } + } + + internal int ManageReliablePackets() + { + int output = 0; + if (this.reliableDataPacketsSent.Count > 0) + { + foreach (var kvp in this.reliableDataPacketsSent) + { + Packet pkt = kvp.Value; + + try + { + output += pkt.Resend(); + } + catch { } + } + } + + return output; + } + + /// <summary> + /// Adds a 2 byte ID to the packet at offset and stores the packet reference for retransmission. + /// </summary> + /// <param name="buffer">The buffer to attach to.</param> + /// <param name="offset">The offset to attach at.</param> + /// <param name="ackCallback">The callback to make once the packet has been acknowledged.</param> + protected void AttachReliableID(byte[] buffer, int offset, Action ackCallback = null) + { + ushort id = (ushort)Interlocked.Increment(ref lastIDAllocated); + + buffer[offset] = (byte)(id >> 8); + buffer[offset + 1] = (byte)id; + + int resendDelayMs = this.ResendTimeoutMs; + if (resendDelayMs <= 0) + { + resendDelayMs = (_pingMs * this.ResendPingMultiplier).ClampToInt(MinResendDelayMs, MaxInitialResendDelayMs); + } + + Packet packet = this.PacketPool.GetObject(); + packet.Set( + id, + buffer, + buffer.Length, + resendDelayMs, + ackCallback); + + if (!reliableDataPacketsSent.TryAdd(id, packet)) + { + throw new Exception("That shouldn't be possible"); + } + } + + public static int ClampToInt(float value, int min, int max) + { + if (value < min) return min; + if (value > max) return max; + return (int)value; + } + + /// <summary> + /// Sends the bytes reliably and stores the send. + /// </summary> + /// <param name="sendOption"></param> + /// <param name="data">The byte array to write to.</param> + /// <param name="ackCallback">The callback to make once the packet has been acknowledged.</param> + private void ReliableSend(byte sendOption, byte[] data, Action ackCallback = null) + { + //Inform keepalive not to send for a while + ResetKeepAliveTimer(); + + byte[] bytes = new byte[data.Length + 3]; + + //Add message type + bytes[0] = sendOption; + + //Add reliable ID + AttachReliableID(bytes, 1, ackCallback); + + //Copy data into new array + Buffer.BlockCopy(data, 0, bytes, bytes.Length - data.Length, data.Length); + + //Write to connection + WriteBytesToConnection(bytes, bytes.Length); + + Statistics.LogReliableSend(data.Length); + } + + /// <summary> + /// Handles a reliable message being received and invokes the data event. + /// </summary> + /// <param name="message">The buffer received.</param> + private void ReliableMessageReceive(MessageReader message, int bytesReceived) + { + ushort id; + if (ProcessReliableReceive(message.Buffer, 1, out id)) + { + InvokeDataReceived(SendOption.Reliable, message, 3, bytesReceived); + } + else + { + message.Recycle(); + } + + Statistics.LogReliableReceive(message.Length - 3, message.Length); + } + + /// <summary> + /// Handles receives from reliable packets. + /// </summary> + /// <param name="bytes">The buffer containing the data.</param> + /// <param name="offset">The offset of the reliable header.</param> + /// <returns>Whether the packet was a new packet or not.</returns> + private bool ProcessReliableReceive(byte[] bytes, int offset, out ushort id) + { + byte b1 = bytes[offset]; + byte b2 = bytes[offset + 1]; + + //Get the ID form the packet + id = (ushort)((b1 << 8) + b2); + + /* + * It gets a little complicated here (note the fact I'm actually using a multiline comment for once...) + * + * In a simple world if our data is greater than the last reliable packet received (reliableReceiveLast) + * then it is guaranteed to be a new packet, if it's not we can see if we are missing that packet (lookup + * in reliableDataPacketsMissing). + * + * --------rrl############# (1) + * + * (where --- are packets received already and #### are packets that will be counted as new) + * + * Unfortunately if id becomes greater than 65535 it will loop back to zero so we will add a pointer that + * specifies any packets with an id behind it are also new (overwritePointer). + * + * ####op----------rrl##### (2) + * + * ------rll#########op---- (3) + * + * Anything behind than the reliableReceiveLast pointer (but greater than the overwritePointer is either a + * missing packet or something we've already received so when we change the pointers we need to make sure + * we keep note of what hasn't been received yet (reliableDataPacketsMissing). + * + * So... + */ + + bool result = true; + + lock (reliableDataPacketsMissing) + { + //Calculate overwritePointer + ushort overwritePointer = (ushort)(reliableReceiveLast - 32768); + + //Calculate if it is a new packet by examining if it is within the range + bool isNew; + if (overwritePointer < reliableReceiveLast) + isNew = id > reliableReceiveLast || id <= overwritePointer; //Figure (2) + else + isNew = id > reliableReceiveLast && id <= overwritePointer; //Figure (3) + + //If it's new or we've not received anything yet + if (isNew) + { + // Mark items between the most recent receive and the id received as missing + if (id > reliableReceiveLast) + { + for (ushort i = (ushort)(reliableReceiveLast + 1); i < id; i++) + { + reliableDataPacketsMissing.Add(i); + } + } + else + { + int cnt = (ushort.MaxValue - reliableReceiveLast) + id; + for (ushort i = 1; i <= cnt; ++i) + { + reliableDataPacketsMissing.Add((ushort)(i + reliableReceiveLast)); + } + } + + //Update the most recently received + reliableReceiveLast = id; + } + + //Else it could be a missing packet + else + { + //See if we're missing it, else this packet is a duplicate as so we return false + if (!reliableDataPacketsMissing.Remove(id)) + { + result = false; + } + } + } + + // Send an acknowledgement + SendAck(id); + + return result; + } + + /// <summary> + /// Handles acknowledgement packets to us. + /// </summary> + /// <param name="bytes">The buffer containing the data.</param> + private void AcknowledgementMessageReceive(byte[] bytes, int bytesReceived) + { + this.pingsSinceAck = 0; + + ushort id = (ushort)((bytes[1] << 8) + bytes[2]); + AcknowledgeMessageId(id); + + if (bytesReceived == 4) + { + byte recentPackets = bytes[3]; + for (int i = 1; i <= 8; ++i) + { + if ((recentPackets & 1) != 0) + { + AcknowledgeMessageId((ushort)(id - i)); + } + + recentPackets >>= 1; + } + } + + Statistics.LogAcknowledgementReceive(bytesReceived); + } + + private void AcknowledgeMessageId(ushort id) + { + // Dispose of timer and remove from dictionary + if (reliableDataPacketsSent.TryRemove(id, out Packet packet)) + { + this.Statistics.LogReliablePacketAcknowledged(); + float rt = packet.Stopwatch.ElapsedMilliseconds; + + packet.AckCallback?.Invoke(); + packet.Recycle(); + + lock (PingLock) + { + this._pingMs = this._pingMs * .7f + rt * .3f; + } + } + else if (this.activePingPackets.TryRemove(id, out PingPacket pingPkt)) + { + this.Statistics.LogReliablePacketAcknowledged(); + float rt = pingPkt.Stopwatch.ElapsedMilliseconds; + + pingPkt.Recycle(); + + lock (PingLock) + { + this._pingMs = this._pingMs * .7f + rt * .3f; + } + } + } + + /// <summary> + /// Sends an acknowledgement for a packet given its identification bytes. + /// </summary> + /// <param name="byte1">The first identification byte.</param> + /// <param name="byte2">The second identification byte.</param> + private void SendAck(ushort id) + { + byte recentPackets = 0; + lock (this.reliableDataPacketsMissing) + { + for (int i = 1; i <= 8; ++i) + { + if (!this.reliableDataPacketsMissing.Contains((ushort)(id - i))) + { + recentPackets |= (byte)(1 << (i - 1)); + } + } + } + + byte[] bytes = new byte[] + { + (byte)UdpSendOption.Acknowledgement, + (byte)(id >> 8), + (byte)(id >> 0), + recentPackets + }; + + try + { + WriteBytesToConnection(bytes, bytes.Length); + } + catch (InvalidOperationException) { } + } + + private void DisposeReliablePackets() + { + foreach (var kvp in reliableDataPacketsSent) + { + if (this.reliableDataPacketsSent.TryRemove(kvp.Key, out var pkt)) + { + pkt.Recycle(); + } + } + } + } +} diff --git a/Tools/Hazel-Networking/Hazel/Udp/UdpConnection.cs b/Tools/Hazel-Networking/Hazel/Udp/UdpConnection.cs new file mode 100644 index 0000000..e64576a --- /dev/null +++ b/Tools/Hazel-Networking/Hazel/Udp/UdpConnection.cs @@ -0,0 +1,259 @@ +using System; +using System.Net.Sockets; + +namespace Hazel.Udp +{ + /// <summary> + /// Represents a connection that uses the UDP protocol. + /// </summary> + /// <inheritdoc /> + public abstract partial class UdpConnection : NetworkConnection + { + public static readonly byte[] EmptyDisconnectBytes = new byte[] { (byte)UdpSendOption.Disconnect }; + + public override float AveragePingMs => this._pingMs; + protected readonly ILogger logger; + + + public UdpConnection(ILogger logger) : base() + { + this.logger = logger; + this.PacketPool = new ObjectPool<Packet>(() => new Packet(this)); + } + + internal static Socket CreateSocket(IPMode ipMode) + { + Socket socket; + if (ipMode == IPMode.IPv4) + { + socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); + } + else + { + if (!Socket.OSSupportsIPv6) + throw new InvalidOperationException("IPV6 not supported!"); + + socket = new Socket(AddressFamily.InterNetworkV6, SocketType.Dgram, ProtocolType.Udp); + socket.SetSocketOption(SocketOptionLevel.IPv6, SocketOptionName.IPv6Only, false); + } + + try + { + socket.DontFragment = false; + } + catch { } + + try + { + const int SIO_UDP_CONNRESET = -1744830452; + socket.IOControl(SIO_UDP_CONNRESET, new byte[1], null); + } + catch { } // Only necessary on Windows + + return socket; + } + + /// <summary> + /// Writes the given bytes to the connection. + /// </summary> + /// <param name="bytes">The bytes to write.</param> + protected abstract void WriteBytesToConnection(byte[] bytes, int length); + + /// <inheritdoc/> + public override SendErrors Send(MessageWriter msg) + { + if (this._state != ConnectionState.Connected) + { + return SendErrors.Disconnected; + } + + try + { + byte[] buffer = new byte[msg.Length]; + Buffer.BlockCopy(msg.Buffer, 0, buffer, 0, msg.Length); + + switch (msg.SendOption) + { + case SendOption.Reliable: + ResetKeepAliveTimer(); + + AttachReliableID(buffer, 1); + WriteBytesToConnection(buffer, buffer.Length); + Statistics.LogReliableSend(buffer.Length - 3); + break; + + default: + WriteBytesToConnection(buffer, buffer.Length); + Statistics.LogUnreliableSend(buffer.Length - 1); + break; + } + } + catch (Exception e) + { + this.logger?.WriteError("Unknown exception while sending: " + e); + return SendErrors.Unknown; + } + + return SendErrors.None; + } + + /// <summary> + /// Handles the reliable/fragmented sending from this connection. + /// </summary> + /// <param name="data">The data being sent.</param> + /// <param name="sendOption">The <see cref="SendOption"/> specified as its byte value.</param> + /// <param name="ackCallback">The callback to invoke when this packet is acknowledged.</param> + /// <returns>The bytes that should actually be sent.</returns> + protected virtual void HandleSend(byte[] data, byte sendOption, Action ackCallback = null) + { + switch (sendOption) + { + case (byte)UdpSendOption.Ping: + case (byte)SendOption.Reliable: + case (byte)UdpSendOption.Hello: + ReliableSend(sendOption, data, ackCallback); + break; + + //Treat all else as unreliable + default: + UnreliableSend(sendOption, data); + break; + } + } + + /// <summary> + /// Handles the receiving of data. + /// </summary> + /// <param name="message">The buffer containing the bytes received.</param> + protected internal virtual void HandleReceive(MessageReader message, int bytesReceived) + { + ushort id; + switch (message.Buffer[0]) + { + //Handle reliable receives + case (byte)SendOption.Reliable: + ReliableMessageReceive(message, bytesReceived); + break; + + //Handle acknowledgments + case (byte)UdpSendOption.Acknowledgement: + AcknowledgementMessageReceive(message.Buffer, bytesReceived); + message.Recycle(); + break; + + //We need to acknowledge hello and ping messages but dont want to invoke any events! + case (byte)UdpSendOption.Ping: + ProcessReliableReceive(message.Buffer, 1, out id); + Statistics.LogHelloReceive(bytesReceived); + message.Recycle(); + break; + case (byte)UdpSendOption.Hello: + ProcessReliableReceive(message.Buffer, 1, out id); + Statistics.LogHelloReceive(bytesReceived); + message.Recycle(); + break; + + case (byte)UdpSendOption.Disconnect: + message.Offset = 1; + message.Position = 0; + DisconnectRemote("The remote sent a disconnect request", message); + message.Recycle(); + break; + + case (byte)SendOption.None: + InvokeDataReceived(SendOption.None, message, 1, bytesReceived); + Statistics.LogUnreliableReceive(bytesReceived - 1, bytesReceived); + break; + + // Treat everything else as garbage + default: + message.Recycle(); + + // TODO: A new stat for unused data + Statistics.LogUnreliableReceive(bytesReceived - 1, bytesReceived); + break; + } + } + + /// <summary> + /// Sends bytes using the unreliable UDP protocol. + /// </summary> + /// <param name="sendOption">The SendOption to attach.</param> + /// <param name="data">The data.</param> + void UnreliableSend(byte sendOption, byte[] data) + { + this.UnreliableSend(sendOption, data, 0, data.Length); + } + + /// <summary> + /// Sends bytes using the unreliable UDP protocol. + /// </summary> + /// <param name="data">The data.</param> + /// <param name="sendOption">The SendOption to attach.</param> + /// <param name="offset"></param> + /// <param name="length"></param> + void UnreliableSend(byte sendOption, byte[] data, int offset, int length) + { + byte[] bytes = new byte[length + 1]; + + //Add message type + bytes[0] = sendOption; + + //Copy data into new array + Buffer.BlockCopy(data, offset, bytes, bytes.Length - length, length); + + //Write to connection + WriteBytesToConnection(bytes, bytes.Length); + + Statistics.LogUnreliableSend(length); + } + + /// <summary> + /// Helper method to invoke the data received event. + /// </summary> + /// <param name="sendOption">The send option the message was received with.</param> + /// <param name="buffer">The buffer received.</param> + /// <param name="dataOffset">The offset of data in the buffer.</param> + void InvokeDataReceived(SendOption sendOption, MessageReader buffer, int dataOffset, int bytesReceived) + { + buffer.Offset = dataOffset; + buffer.Length = bytesReceived - dataOffset; + buffer.Position = 0; + + InvokeDataReceived(buffer, sendOption); + } + + /// <summary> + /// Sends a hello packet to the remote endpoint. + /// </summary> + /// <param name="acknowledgeCallback">The callback to invoke when the hello packet is acknowledged.</param> + protected void SendHello(byte[] bytes, Action acknowledgeCallback) + { + //First byte of handshake is version indicator so add data after + byte[] actualBytes; + if (bytes == null) + { + actualBytes = new byte[1]; + } + else + { + actualBytes = new byte[bytes.Length + 1]; + Buffer.BlockCopy(bytes, 0, actualBytes, 1, bytes.Length); + } + + HandleSend(actualBytes, (byte)UdpSendOption.Hello, acknowledgeCallback); + } + + /// <inheritdoc/> + protected override void Dispose(bool disposing) + { + if (disposing) + { + DisposeKeepAliveTimer(); + DisposeReliablePackets(); + } + + base.Dispose(disposing); + } + } +} diff --git a/Tools/Hazel-Networking/Hazel/Udp/UdpConnectionListener.cs b/Tools/Hazel-Networking/Hazel/Udp/UdpConnectionListener.cs new file mode 100644 index 0000000..c017a0f --- /dev/null +++ b/Tools/Hazel-Networking/Hazel/Udp/UdpConnectionListener.cs @@ -0,0 +1,339 @@ +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Threading; + +namespace Hazel.Udp +{ + /// <summary> + /// Listens for new UDP connections and creates UdpConnections for them. + /// </summary> + /// <inheritdoc /> + public class UdpConnectionListener : NetworkConnectionListener + { + private const int SendReceiveBufferSize = 1024 * 1024; + private const int BufferSize = ushort.MaxValue; + + private Socket socket; + private ILogger Logger; + private Timer reliablePacketTimer; + + private ConcurrentDictionary<EndPoint, UdpServerConnection> allConnections = new ConcurrentDictionary<EndPoint, UdpServerConnection>(); + + public override double AveragePing => this.allConnections.Values.Sum(c => c.AveragePingMs) / this.allConnections.Count; + public override int ConnectionCount { get { return this.allConnections.Count; } } + public override int ReceiveQueueLength => throw new NotImplementedException(); + public override int SendQueueLength => throw new NotImplementedException(); + + /// <summary> + /// Creates a new UdpConnectionListener for the given <see cref="IPAddress"/>, port and <see cref="IPMode"/>. + /// </summary> + /// <param name="endPoint">The endpoint to listen on.</param> + public UdpConnectionListener(IPEndPoint endPoint, IPMode ipMode = IPMode.IPv4, ILogger logger = null) + { + this.Logger = logger; + this.EndPoint = endPoint; + this.IPMode = ipMode; + + this.socket = UdpConnection.CreateSocket(this.IPMode); + + socket.ReceiveBufferSize = SendReceiveBufferSize; + socket.SendBufferSize = SendReceiveBufferSize; + + reliablePacketTimer = new Timer(ManageReliablePackets, null, 100, Timeout.Infinite); + } + + ~UdpConnectionListener() + { + this.Dispose(false); + } + + private void ManageReliablePackets(object state) + { + foreach (var kvp in this.allConnections) + { + var sock = kvp.Value; + sock.ManageReliablePackets(); + } + + try + { + this.reliablePacketTimer.Change(100, Timeout.Infinite); + } + catch { } + } + + /// <inheritdoc /> + public override void Start() + { + try + { + socket.Bind(EndPoint); + } + catch (SocketException e) + { + throw new HazelException("Could not start listening as a SocketException occurred", e); + } + + StartListeningForData(); + } + + /// <summary> + /// Instructs the listener to begin listening. + /// </summary> + private void StartListeningForData() + { + EndPoint remoteEP = EndPoint; + + MessageReader message = null; + try + { + message = MessageReader.GetSized(this.ReceiveBufferSize); + socket.BeginReceiveFrom(message.Buffer, 0, message.Buffer.Length, SocketFlags.None, ref remoteEP, ReadCallback, message); + } + catch (SocketException sx) + { + message?.Recycle(); + + this.Logger?.WriteError("Socket Ex in StartListening: " + sx.Message); + + Thread.Sleep(10); + StartListeningForData(); + return; + } + catch (Exception ex) + { + message.Recycle(); + this.Logger?.WriteError("Stopped due to: " + ex.Message); + return; + } + } + + void ReadCallback(IAsyncResult result) + { + var message = (MessageReader)result.AsyncState; + int bytesReceived; + EndPoint remoteEndPoint = new IPEndPoint(this.EndPoint.Address, this.EndPoint.Port); + + //End the receive operation + try + { + bytesReceived = socket.EndReceiveFrom(result, ref remoteEndPoint); + + message.Offset = 0; + message.Length = bytesReceived; + } + catch (ObjectDisposedException) + { + message.Recycle(); + return; + } + catch (SocketException sx) + { + message.Recycle(); + if (sx.SocketErrorCode == SocketError.NotConnected) + { + this.InvokeInternalError(HazelInternalErrors.ConnectionDisconnected); + return; + } + + // Client no longer reachable, pretend it didn't happen + // TODO should this not inform the connection this client is lost??? + + // This thread suggests the IP is not passed out from WinSoc so maybe not possible + // http://stackoverflow.com/questions/2576926/python-socket-error-on-udp-data-receive-10054 + this.Logger?.WriteError($"Socket Ex {sx.SocketErrorCode} in ReadCallback: {sx.Message}"); + + Thread.Sleep(10); + StartListeningForData(); + return; + } + catch (Exception ex) + { + // Idk, maybe a null ref after dispose? + message.Recycle(); + this.Logger?.WriteError("Stopped due to: " + ex.Message); + return; + } + + // I'm a little concerned about a infinite loop here, but it seems like it's possible + // to get 0 bytes read on UDP without the socket being shut down. + if (bytesReceived == 0) + { + message.Recycle(); + this.Logger?.WriteInfo("Received 0 bytes"); + Thread.Sleep(10); + StartListeningForData(); + return; + } + + //Begin receiving again + StartListeningForData(); + + bool aware = true; + bool isHello = message.Buffer[0] == (byte)UdpSendOption.Hello; + + // If we're aware of this connection use the one already + // If this is a new client then connect with them! + UdpServerConnection connection; + if (!this.allConnections.TryGetValue(remoteEndPoint, out connection)) + { + lock (this.allConnections) + { + if (!this.allConnections.TryGetValue(remoteEndPoint, out connection)) + { + // Check for malformed connection attempts + if (!isHello) + { + message.Recycle(); + return; + } + + if (AcceptConnection != null) + { + if (!AcceptConnection((IPEndPoint)remoteEndPoint, message.Buffer, out var response)) + { + message.Recycle(); + if (response != null) + { + SendData(response, response.Length, remoteEndPoint); + } + + return; + } + } + + aware = false; + connection = new UdpServerConnection(this, (IPEndPoint)remoteEndPoint, this.IPMode, this.Logger); + if (!this.allConnections.TryAdd(remoteEndPoint, connection)) + { + throw new HazelException("Failed to add a connection. This should never happen."); + } + } + } + } + + // If it's a new connection invoke the NewConnection event. + // This needs to happen before handling the message because in localhost scenarios, the ACK and + // subsequent messages can happen before the NewConnection event sets up OnDataRecieved handlers + if (!aware) + { + // Skip header and hello byte; + message.Offset = 4; + message.Length = bytesReceived - 4; + message.Position = 0; + InvokeNewConnection(message, connection); + } + + // Inform the connection of the buffer (new connections need to send an ack back to client) + connection.HandleReceive(message, bytesReceived); + } + +#if DEBUG + public int TestDropRate = -1; + private int dropCounter = 0; +#endif + + /// <summary> + /// Sends data from the listener socket. + /// </summary> + /// <param name="bytes">The bytes to send.</param> + /// <param name="endPoint">The endpoint to send to.</param> + internal void SendData(byte[] bytes, int length, EndPoint endPoint) + { + if (length > bytes.Length) return; + +#if DEBUG + if (TestDropRate > 0) + { + if (Interlocked.Increment(ref dropCounter) % TestDropRate == 0) + { + return; + } + } +#endif + + try + { + socket.BeginSendTo( + bytes, + 0, + length, + SocketFlags.None, + endPoint, + SendCallback, + null); + + this.Statistics.AddBytesSent(length); + } + catch (SocketException e) + { + this.Logger?.WriteError("Could not send data as a SocketException occurred: " + e); + } + catch (ObjectDisposedException) + { + //Keep alive timer probably ran, ignore + return; + } + } + + private void SendCallback(IAsyncResult result) + { + try + { + socket.EndSendTo(result); + } + catch { } + } + + /// <summary> + /// Sends data from the listener socket. + /// </summary> + /// <param name="bytes">The bytes to send.</param> + /// <param name="endPoint">The endpoint to send to.</param> + internal void SendDataSync(byte[] bytes, int length, EndPoint endPoint) + { + try + { + socket.SendTo( + bytes, + 0, + length, + SocketFlags.None, + endPoint + ); + + this.Statistics.AddBytesSent(length); + } + catch { } + } + + /// <summary> + /// Removes a virtual connection from the list. + /// </summary> + /// <param name="endPoint">The endpoint of the virtual connection.</param> + internal void RemoveConnectionTo(EndPoint endPoint) + { + this.allConnections.TryRemove(endPoint, out var conn); + } + + /// <inheritdoc /> + protected override void Dispose(bool disposing) + { + foreach (var kvp in this.allConnections) + { + kvp.Value.Dispose(); + } + + try { this.socket.Shutdown(SocketShutdown.Both); } catch { } + try { this.socket.Close(); } catch { } + try { this.socket.Dispose(); } catch { } + + this.reliablePacketTimer.Dispose(); + + base.Dispose(disposing); + } + } +} diff --git a/Tools/Hazel-Networking/Hazel/Udp/UdpServerConnection.cs b/Tools/Hazel-Networking/Hazel/Udp/UdpServerConnection.cs new file mode 100644 index 0000000..ff5b29d --- /dev/null +++ b/Tools/Hazel-Networking/Hazel/Udp/UdpServerConnection.cs @@ -0,0 +1,108 @@ +using System; +using System.Net; + +namespace Hazel.Udp +{ + /// <summary> + /// Represents a servers's connection to a client that uses the UDP protocol. + /// </summary> + /// <inheritdoc/> + internal sealed class UdpServerConnection : UdpConnection + { + /// <summary> + /// The connection listener that we use the socket of. + /// </summary> + /// <remarks> + /// Udp server connections utilize the same socket in the listener for sends/receives, this is the listener that + /// created this connection and is hence the listener this conenction sends and receives via. + /// </remarks> + public UdpConnectionListener Listener { get; private set; } + + /// <summary> + /// Creates a UdpConnection for the virtual connection to the endpoint. + /// </summary> + /// <param name="listener">The listener that created this connection.</param> + /// <param name="endPoint">The endpoint that we are connected to.</param> + /// <param name="IPMode">The IPMode we are connected using.</param> + internal UdpServerConnection(UdpConnectionListener listener, IPEndPoint endPoint, IPMode IPMode, ILogger logger) + : base(logger) + { + this.Listener = listener; + this.EndPoint = endPoint; + this.IPMode = IPMode; + + State = ConnectionState.Connected; + this.InitializeKeepAliveTimer(); + } + + /// <inheritdoc /> + protected override void WriteBytesToConnection(byte[] bytes, int length) + { + this.Statistics.LogPacketSend(length); + Listener.SendData(bytes, length, EndPoint); + } + + /// <inheritdoc /> + /// <remarks> + /// This will always throw a HazelException. + /// </remarks> + public override void Connect(byte[] bytes = null, int timeout = 5000) + { + throw new InvalidOperationException("Cannot manually connect a UdpServerConnection, did you mean to use UdpClientConnection?"); + } + + /// <inheritdoc /> + /// <remarks> + /// This will always throw a HazelException. + /// </remarks> + public override void ConnectAsync(byte[] bytes = null) + { + throw new InvalidOperationException("Cannot manually connect a UdpServerConnection, did you mean to use UdpClientConnection?"); + } + + /// <summary> + /// Sends a disconnect message to the end point. + /// </summary> + protected override bool SendDisconnect(MessageWriter data = null) + { + lock (this) + { + if (this._state != ConnectionState.Connected) + { + return false; + } + + this._state = ConnectionState.NotConnected; + } + + var bytes = EmptyDisconnectBytes; + if (data != null && data.Length > 0) + { + if (data.SendOption != SendOption.None) throw new ArgumentException("Disconnect messages can only be unreliable."); + + bytes = data.ToByteArray(true); + bytes[0] = (byte)UdpSendOption.Disconnect; + } + + try + { + Listener.SendDataSync(bytes, bytes.Length, EndPoint); + } + catch { } + + return true; + } + + protected override void Dispose(bool disposing) + { + Listener.RemoveConnectionTo(EndPoint); + + if (disposing) + { + SendDisconnect(); + } + + base.Dispose(disposing); + } + } +} diff --git a/Tools/Hazel-Networking/Hazel/Udp/UnityUdpClientConnection.cs b/Tools/Hazel-Networking/Hazel/Udp/UnityUdpClientConnection.cs new file mode 100644 index 0000000..8e6063d --- /dev/null +++ b/Tools/Hazel-Networking/Hazel/Udp/UnityUdpClientConnection.cs @@ -0,0 +1,353 @@ +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading; + + +namespace Hazel.Udp +{ + /// <summary> + /// Unity doesn't always get along with thread pools well, so this interface will hopefully suit that case better. + /// </summary> + /// <inheritdoc/> + public class UnityUdpClientConnection : UdpConnection + { + /// <summary> + /// The max size Hazel attempts to read from the network. + /// Defaults to 8096. + /// </summary> + /// <remarks> + /// 8096 is 5 times the standard modern MTU of 1500, so it's already too large imo. + /// If Hazel ever implements fragmented packets, then we might consider a larger value since combining 5 + /// packets into 1 reader would be realistic and would cause reallocations. That said, Hazel is not meant + /// for transferring large contiguous blocks of data, so... please don't? + /// </remarks> + public int ReceiveBufferSize = 8096; + + private Socket socket; + + public UnityUdpClientConnection(ILogger logger, IPEndPoint remoteEndPoint, IPMode ipMode = IPMode.IPv4) + : base(logger) + { + this.EndPoint = remoteEndPoint; + this.IPMode = ipMode; + + this.socket = CreateSocket(ipMode); + this.socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ExclusiveAddressUse, true); + } + + ~UnityUdpClientConnection() + { + this.Dispose(false); + } + + public void FixedUpdate() + { + try + { + ResendPacketsIfNeeded(); + } + catch (Exception e) + { + this.logger.WriteError("FixedUpdate: " + e); + } + + try + { + ManageReliablePackets(); + } + catch (Exception e) + { + this.logger.WriteError("FixedUpdate: " + e); + } + } + + protected virtual void RestartConnection() + { + } + + protected virtual void ResendPacketsIfNeeded() + { + } + + /// <inheritdoc /> + protected override void WriteBytesToConnection(byte[] bytes, int length) + { +#if DEBUG + if (TestLagMs > 0) + { + ThreadPool.QueueUserWorkItem(a => { Thread.Sleep(this.TestLagMs); WriteBytesToConnectionReal(bytes, length); }); + } + else +#endif + { + WriteBytesToConnectionReal(bytes, length); + } + } + + private void WriteBytesToConnectionReal(byte[] bytes, int length) + { + try + { + this.Statistics.LogPacketSend(length); + socket.BeginSendTo( + bytes, + 0, + length, + SocketFlags.None, + EndPoint, + HandleSendTo, + null); + } + catch (NullReferenceException) { } + catch (ObjectDisposedException) + { + // Already disposed and disconnected... + } + catch (SocketException ex) + { + DisconnectInternal(HazelInternalErrors.SocketExceptionSend, "Could not send data as a SocketException occurred: " + ex.Message); + } + } + + /// <summary> + /// Synchronously writes the given bytes to the connection. + /// </summary> + /// <param name="bytes">The bytes to write.</param> + protected virtual void WriteBytesToConnectionSync(byte[] bytes, int length) + { + try + { + socket.SendTo( + bytes, + 0, + length, + SocketFlags.None, + EndPoint); + } + catch (NullReferenceException) { } + catch (ObjectDisposedException) + { + // Already disposed and disconnected... + } + catch (SocketException ex) + { + DisconnectInternal(HazelInternalErrors.SocketExceptionSend, "Could not send data as a SocketException occurred: " + ex.Message); + } + } + + private void HandleSendTo(IAsyncResult result) + { + try + { + socket.EndSendTo(result); + } + catch (NullReferenceException) { } + catch (ObjectDisposedException) + { + // Already disposed and disconnected... + } + catch (SocketException ex) + { + DisconnectInternal(HazelInternalErrors.SocketExceptionSend, "Could not send data as a SocketException occurred: " + ex.Message); + } + } + + public override void Connect(byte[] bytes = null, int timeout = 5000) + { + this.ConnectAsync(bytes); + for (int timer = 0; timer < timeout; timer += 100) + { + if (this.State != ConnectionState.Connecting) return; + Thread.Sleep(100); + + // I guess if we're gonna block in Unity, then let's assume no one will pump this for us. + this.FixedUpdate(); + } + } + + /// <inheritdoc /> + public override void ConnectAsync(byte[] bytes = null) + { + this.State = ConnectionState.Connecting; + + try + { + if (IPMode == IPMode.IPv4) + socket.Bind(new IPEndPoint(IPAddress.Any, 0)); + else + socket.Bind(new IPEndPoint(IPAddress.IPv6Any, 0)); + } + catch (SocketException e) + { + this.State = ConnectionState.NotConnected; + throw new HazelException("A SocketException occurred while binding to the port.", e); + } + + this.RestartConnection(); + + try + { + StartListeningForData(); + } + catch (ObjectDisposedException) + { + // If the socket's been disposed then we can just end there but make sure we're in NotConnected state. + // If we end up here I'm really lost... + this.State = ConnectionState.NotConnected; + return; + } + catch (SocketException e) + { + Dispose(); + throw new HazelException("A SocketException occurred while initiating a receive operation.", e); + } + + // Write bytes to the server to tell it hi (and to punch a hole in our NAT, if present) + // When acknowledged set the state to connected + SendHello(bytes, () => + { + this.InitializeKeepAliveTimer(); + this.State = ConnectionState.Connected; + }); + } + + /// <summary> + /// Instructs the listener to begin listening. + /// </summary> + void StartListeningForData() + { + var msg = MessageReader.GetSized(this.ReceiveBufferSize); + try + { + EndPoint ep = this.EndPoint; + socket.BeginReceiveFrom(msg.Buffer, 0, msg.Buffer.Length, SocketFlags.None, ref ep, ReadCallback, msg); + } + catch + { + msg.Recycle(); + this.Dispose(); + } + } + + /// <summary> + /// Called when data has been received by the socket. + /// </summary> + /// <param name="result">The asyncronous operation's result.</param> + void ReadCallback(IAsyncResult result) + { +#if DEBUG + if (this.TestLagMs > 0) + { + Thread.Sleep(this.TestLagMs); + } +#endif + + var msg = (MessageReader)result.AsyncState; + + try + { + EndPoint ep = this.EndPoint; + msg.Length = socket.EndReceiveFrom(result, ref ep); + } + catch (SocketException e) + { + msg.Recycle(); + DisconnectInternal(HazelInternalErrors.SocketExceptionReceive, "Socket exception while reading data: " + e.Message); + return; + } + catch (ObjectDisposedException) + { + // Weirdly, it seems that this method can be called twice on the same AsyncState when object is disposed... + // So this just keeps us from hitting Duplicate Add errors at the risk of if this is a platform + // specific bug, we leak a MessageReader while the socket is disposing. Not a bad trade off. + return; + } + catch (Exception) + { + msg.Recycle(); + return; + } + + //Exit if no bytes read, we've failed. + if (msg.Length == 0) + { + msg.Recycle(); + DisconnectInternal(HazelInternalErrors.ReceivedZeroBytes, "Received 0 bytes"); + return; + } + + //Begin receiving again + try + { + StartListeningForData(); + } + catch (SocketException e) + { + DisconnectInternal(HazelInternalErrors.SocketExceptionReceive, "Socket exception during receive: " + e.Message); + } + catch (ObjectDisposedException) + { + //If the socket's been disposed then we can just end there. + return; + } + +#if DEBUG + if (this.TestDropRate > 0) + { + if ((this.testDropCount++ % this.TestDropRate) == 0) + { + return; + } + } +#endif + + HandleReceive(msg, msg.Length); + } + + /// <summary> + /// Sends a disconnect message to the end point. + /// You may include optional disconnect data. The SendOption must be unreliable. + /// </summary> + protected override bool SendDisconnect(MessageWriter data = null) + { + lock (this) + { + if (this._state == ConnectionState.NotConnected) return false; + this._state = ConnectionState.NotConnected; + } + + var bytes = EmptyDisconnectBytes; + if (data != null && data.Length > 0) + { + if (data.SendOption != SendOption.None) throw new ArgumentException("Disconnect messages can only be unreliable."); + + bytes = data.ToByteArray(true); + bytes[0] = (byte)UdpSendOption.Disconnect; + } + + try + { + this.WriteBytesToConnectionSync(bytes, bytes.Length); + } + catch { } + + return true; + } + + /// <inheritdoc /> + protected override void Dispose(bool disposing) + { + if (disposing) + { + SendDisconnect(); + } + + try { this.socket.Shutdown(SocketShutdown.Both); } catch { } + try { this.socket.Close(); } catch { } + try { this.socket.Dispose(); } catch { } + + base.Dispose(disposing); + } + } +} |