summaryrefslogtreecommitdiff
path: root/Impostor-dev/src/Impostor.Hazel/Udp
diff options
context:
space:
mode:
Diffstat (limited to 'Impostor-dev/src/Impostor.Hazel/Udp')
-rw-r--r--Impostor-dev/src/Impostor.Hazel/Udp/SendOptionInternal.cs33
-rw-r--r--Impostor-dev/src/Impostor.Hazel/Udp/UdpBroadcastListener.cs156
-rw-r--r--Impostor-dev/src/Impostor.Hazel/Udp/UdpBroadcaster.cs79
-rw-r--r--Impostor-dev/src/Impostor.Hazel/Udp/UdpClientConnection.cs225
-rw-r--r--Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.KeepAlive.cs167
-rw-r--r--Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.Reliable.cs491
-rw-r--r--Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.cs312
-rw-r--r--Impostor-dev/src/Impostor.Hazel/Udp/UdpConnectionListener.cs281
-rw-r--r--Impostor-dev/src/Impostor.Hazel/Udp/UdpConnectionRateLimit.cs75
-rw-r--r--Impostor-dev/src/Impostor.Hazel/Udp/UdpServerConnection.cs97
10 files changed, 1916 insertions, 0 deletions
diff --git a/Impostor-dev/src/Impostor.Hazel/Udp/SendOptionInternal.cs b/Impostor-dev/src/Impostor.Hazel/Udp/SendOptionInternal.cs
new file mode 100644
index 0000000..c0c4e21
--- /dev/null
+++ b/Impostor-dev/src/Impostor.Hazel/Udp/SendOptionInternal.cs
@@ -0,0 +1,33 @@
+namespace Impostor.Hazel.Udp
+{
+ /// <summary>
+ /// Extra internal states for SendOption enumeration when using UDP.
+ /// </summary>
+ public enum UdpSendOption : byte
+ {
+ /// <summary>
+ /// Hello message for initiating communication.
+ /// </summary>
+ Hello = 8,
+
+ /// <summary>
+ /// A single byte of continued existence
+ /// </summary>
+ Ping = 12,
+
+ /// <summary>
+ /// Message for discontinuing communication.
+ /// </summary>
+ Disconnect = 9,
+
+ /// <summary>
+ /// Message acknowledging the receipt of a message.
+ /// </summary>
+ Acknowledgement = 10,
+
+ /// <summary>
+ /// Message that is part of a larger, fragmented message.
+ /// </summary>
+ Fragment = 11,
+ }
+}
diff --git a/Impostor-dev/src/Impostor.Hazel/Udp/UdpBroadcastListener.cs b/Impostor-dev/src/Impostor.Hazel/Udp/UdpBroadcastListener.cs
new file mode 100644
index 0000000..ed7b68d
--- /dev/null
+++ b/Impostor-dev/src/Impostor.Hazel/Udp/UdpBroadcastListener.cs
@@ -0,0 +1,156 @@
+using System;
+using System.Collections.Generic;
+using System.Net;
+using System.Net.Sockets;
+using System.Text;
+
+namespace Impostor.Hazel.Udp
+{
+ public class BroadcastPacket
+ {
+ public string Data;
+ public DateTime ReceiveTime;
+ public IPEndPoint Sender;
+
+ public BroadcastPacket(string data, IPEndPoint sender)
+ {
+ this.Data = data;
+ this.Sender = sender;
+ this.ReceiveTime = DateTime.Now;
+ }
+
+ public string GetAddress()
+ {
+ return this.Sender.Address.ToString();
+ }
+ }
+
+ public class UdpBroadcastListener : IDisposable
+ {
+ private Socket socket;
+ private EndPoint endpoint;
+ private Action<string> logger;
+
+ private byte[] buffer = new byte[1024];
+
+ private List<BroadcastPacket> packets = new List<BroadcastPacket>();
+
+ public bool Running { get; private set; }
+
+ ///
+ public UdpBroadcastListener(int port, Action<string> logger = null)
+ {
+ this.logger = logger;
+ this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
+ this.socket.EnableBroadcast = true;
+ this.socket.MulticastLoopback = false;
+ this.endpoint = new IPEndPoint(IPAddress.Any, port);
+ this.socket.Bind(this.endpoint);
+ }
+
+ ///
+ public void StartListen()
+ {
+ if (this.Running) return;
+ this.Running = true;
+
+ try
+ {
+ EndPoint endpt = new IPEndPoint(IPAddress.Any, 0);
+ this.socket.BeginReceiveFrom(buffer, 0, buffer.Length, SocketFlags.None, ref endpt, this.HandleData, null);
+ }
+ catch (NullReferenceException) { }
+ catch (Exception e)
+ {
+ this.logger?.Invoke("BroadcastListener: " + e);
+ this.Dispose();
+ }
+ }
+
+ private void HandleData(IAsyncResult result)
+ {
+ this.Running = false;
+
+ int numBytes;
+ EndPoint endpt = new IPEndPoint(IPAddress.Any, 0);
+ try
+ {
+ numBytes = this.socket.EndReceiveFrom(result, ref endpt);
+ }
+ catch (NullReferenceException)
+ {
+ // Already disposed
+ return;
+ }
+ catch (Exception e)
+ {
+ this.logger?.Invoke("BroadcastListener: " + e);
+ this.Dispose();
+ return;
+ }
+
+ if (numBytes < 3
+ || buffer[0] != 4 || buffer[1] != 2)
+ {
+ this.StartListen();
+ return;
+ }
+
+ IPEndPoint ipEnd = (IPEndPoint)endpt;
+ string data = UTF8Encoding.UTF8.GetString(buffer, 2, numBytes - 2);
+ int dataHash = data.GetHashCode();
+
+ lock (packets)
+ {
+ bool found = false;
+ for (int i = 0; i < this.packets.Count; ++i)
+ {
+ var pkt = this.packets[i];
+ if (pkt == null || pkt.Data == null)
+ {
+ this.packets.RemoveAt(i);
+ i--;
+ continue;
+ }
+
+ if (pkt.Data.GetHashCode() == dataHash
+ && pkt.Sender.Equals(ipEnd))
+ {
+ this.packets[i].ReceiveTime = DateTime.Now;
+ break;
+ }
+ }
+
+ if (!found)
+ {
+ this.packets.Add(new BroadcastPacket(data, ipEnd));
+ }
+ }
+
+ this.StartListen();
+ }
+
+ ///
+ public BroadcastPacket[] GetPackets()
+ {
+ lock (this.packets)
+ {
+ var output = this.packets.ToArray();
+ this.packets.Clear();
+ return output;
+ }
+ }
+
+ ///
+ public void Dispose()
+ {
+ if (this.socket != null)
+ {
+ try { this.socket.Shutdown(SocketShutdown.Both); } catch { }
+ try { this.socket.Close(); } catch { }
+ try { this.socket.Dispose(); } catch { }
+ this.socket = null;
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/Impostor-dev/src/Impostor.Hazel/Udp/UdpBroadcaster.cs b/Impostor-dev/src/Impostor.Hazel/Udp/UdpBroadcaster.cs
new file mode 100644
index 0000000..5fa1cca
--- /dev/null
+++ b/Impostor-dev/src/Impostor.Hazel/Udp/UdpBroadcaster.cs
@@ -0,0 +1,79 @@
+using System;
+using System.Net;
+using System.Net.Sockets;
+using System.Text;
+
+namespace Impostor.Hazel.Udp
+{
+ ///
+ public class UdpBroadcaster : IDisposable
+ {
+ private Socket socket;
+ private byte[] data;
+ private EndPoint endpoint;
+ private Action<string> logger;
+
+ ///
+ public UdpBroadcaster(int port, Action<string> logger = null)
+ {
+ this.logger = logger;
+ this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
+ this.socket.EnableBroadcast = true;
+ this.socket.MulticastLoopback = false;
+ this.endpoint = new IPEndPoint(IPAddress.Broadcast, port);
+ }
+
+ ///
+ public void SetData(string data)
+ {
+ int len = UTF8Encoding.UTF8.GetByteCount(data);
+ this.data = new byte[len + 2];
+ this.data[0] = 4;
+ this.data[1] = 2;
+
+ UTF8Encoding.UTF8.GetBytes(data, 0, data.Length, this.data, 2);
+ }
+
+ ///
+ public void Broadcast()
+ {
+ if (this.data == null)
+ {
+ return;
+ }
+
+ try
+ {
+ this.socket.BeginSendTo(data, 0, data.Length, SocketFlags.None, this.endpoint, this.FinishSendTo, null);
+ }
+ catch (Exception e)
+ {
+ this.logger?.Invoke("BroadcastListener: " + e);
+ }
+ }
+
+ private void FinishSendTo(IAsyncResult evt)
+ {
+ try
+ {
+ this.socket.EndSendTo(evt);
+ }
+ catch (Exception e)
+ {
+ this.logger?.Invoke("BroadcastListener: " + e);
+ }
+ }
+
+ ///
+ public void Dispose()
+ {
+ if (this.socket != null)
+ {
+ try { this.socket.Shutdown(SocketShutdown.Both); } catch { }
+ try { this.socket.Close(); } catch { }
+ try { this.socket.Dispose(); } catch { }
+ this.socket = null;
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/Impostor-dev/src/Impostor.Hazel/Udp/UdpClientConnection.cs b/Impostor-dev/src/Impostor.Hazel/Udp/UdpClientConnection.cs
new file mode 100644
index 0000000..5125ebe
--- /dev/null
+++ b/Impostor-dev/src/Impostor.Hazel/Udp/UdpClientConnection.cs
@@ -0,0 +1,225 @@
+using System;
+using System.Buffers;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using Impostor.Api.Net.Messages;
+using Microsoft.Extensions.ObjectPool;
+using Serilog;
+
+namespace Impostor.Hazel.Udp
+{
+ /// <summary>
+ /// Represents a client's connection to a server that uses the UDP protocol.
+ /// </summary>
+ /// <inheritdoc/>
+ public sealed class UdpClientConnection : UdpConnection
+ {
+ private static readonly ILogger Logger = Log.ForContext<UdpClientConnection>();
+
+ /// <summary>
+ /// The socket we're connected via.
+ /// </summary>
+ private readonly UdpClient _socket;
+
+ private readonly Timer _reliablePacketTimer;
+ private readonly SemaphoreSlim _connectWaitLock;
+ private Task _listenTask;
+
+ /// <summary>
+ /// Creates a new UdpClientConnection.
+ /// </summary>
+ /// <param name="remoteEndPoint">A <see cref="NetworkEndPoint"/> to connect to.</param>
+ public UdpClientConnection(IPEndPoint remoteEndPoint, ObjectPool<MessageReader> readerPool, IPMode ipMode = IPMode.IPv4) : base(null, readerPool)
+ {
+ EndPoint = remoteEndPoint;
+ RemoteEndPoint = remoteEndPoint;
+ IPMode = ipMode;
+
+ _socket = new UdpClient
+ {
+ DontFragment = false
+ };
+
+ _reliablePacketTimer = new Timer(ManageReliablePacketsInternal, null, 100, Timeout.Infinite);
+ _connectWaitLock = new SemaphoreSlim(1, 1);
+ }
+
+ ~UdpClientConnection()
+ {
+ Dispose(false);
+ }
+
+ private async void ManageReliablePacketsInternal(object state)
+ {
+ await ManageReliablePackets();
+
+ try
+ {
+ _reliablePacketTimer.Change(100, Timeout.Infinite);
+ }
+ catch
+ {
+ // ignored
+ }
+ }
+
+ /// <inheritdoc />
+ protected override ValueTask WriteBytesToConnection(byte[] bytes, int length)
+ {
+ return WriteBytesToConnectionReal(bytes, length);
+ }
+
+ private async ValueTask WriteBytesToConnectionReal(byte[] bytes, int length)
+ {
+ try
+ {
+ await _socket.SendAsync(bytes, length);
+ }
+ catch (NullReferenceException) { }
+ catch (ObjectDisposedException)
+ {
+ // Already disposed and disconnected...
+ }
+ catch (SocketException ex)
+ {
+ await DisconnectInternal(HazelInternalErrors.SocketExceptionSend, "Could not send data as a SocketException occurred: " + ex.Message);
+ }
+ }
+
+ /// <inheritdoc />
+ public override async ValueTask ConnectAsync(byte[] bytes = null)
+ {
+ State = ConnectionState.Connecting;
+
+ try
+ {
+ _socket.Connect(RemoteEndPoint);
+ }
+ catch (SocketException e)
+ {
+ State = ConnectionState.NotConnected;
+ throw new HazelException("A SocketException occurred while binding to the port.", e);
+ }
+
+ try
+ {
+ _listenTask = Task.Factory.StartNew(ListenAsync, TaskCreationOptions.LongRunning);
+ }
+ catch (ObjectDisposedException)
+ {
+ // If the socket's been disposed then we can just end there but make sure we're in NotConnected state.
+ // If we end up here I'm really lost...
+ State = ConnectionState.NotConnected;
+ return;
+ }
+ catch (SocketException e)
+ {
+ Dispose();
+ throw new HazelException("A SocketException occurred while initiating a receive operation.", e);
+ }
+
+ // Write bytes to the server to tell it hi (and to punch a hole in our NAT, if present)
+ // When acknowledged set the state to connected
+ await SendHello(bytes, () =>
+ {
+ State = ConnectionState.Connected;
+ InitializeKeepAliveTimer();
+ });
+
+ await _connectWaitLock.WaitAsync(TimeSpan.FromSeconds(10));
+ }
+
+ private async Task ListenAsync()
+ {
+ // Start packet handler.
+ await StartAsync();
+
+ // Listen.
+ while (State != ConnectionState.NotConnected)
+ {
+ UdpReceiveResult data;
+
+ try
+ {
+ data = await _socket.ReceiveAsync();
+ }
+ catch (SocketException e)
+ {
+ await DisconnectInternal(HazelInternalErrors.SocketExceptionReceive, "Socket exception while reading data: " + e.Message);
+ return;
+ }
+ catch (Exception)
+ {
+ return;
+ }
+
+ if (data.Buffer.Length == 0)
+ {
+ await DisconnectInternal(HazelInternalErrors.ReceivedZeroBytes, "Received 0 bytes");
+ return;
+ }
+
+ // Write to client.
+ await Pipeline.Writer.WriteAsync(data.Buffer);
+ }
+ }
+
+ protected override void SetState(ConnectionState state)
+ {
+ if (state == ConnectionState.Connected)
+ {
+ _connectWaitLock.Release();
+ }
+ }
+
+ /// <summary>
+ /// Sends a disconnect message to the end point.
+ /// You may include optional disconnect data. The SendOption must be unreliable.
+ /// </summary>
+ protected override async ValueTask<bool> SendDisconnect(MessageWriter data = null)
+ {
+ lock (this)
+ {
+ if (_state == ConnectionState.NotConnected) return false;
+ _state = ConnectionState.NotConnected;
+ }
+
+ var bytes = EmptyDisconnectBytes;
+ if (data != null && data.Length > 0)
+ {
+ if (data.SendOption != MessageType.Unreliable)
+ {
+ throw new ArgumentException("Disconnect messages can only be unreliable.");
+ }
+
+ bytes = data.ToByteArray(true);
+ bytes[0] = (byte)UdpSendOption.Disconnect;
+ }
+
+ try
+ {
+ await _socket.SendAsync(bytes, bytes.Length, RemoteEndPoint);
+ }
+ catch { }
+
+ return true;
+ }
+
+ /// <inheritdoc />
+ protected override void Dispose(bool disposing)
+ {
+ State = ConnectionState.NotConnected;
+
+ try { _socket.Close(); } catch { }
+ try { _socket.Dispose(); } catch { }
+
+ _reliablePacketTimer.Dispose();
+ _connectWaitLock.Dispose();
+
+ base.Dispose(disposing);
+ }
+ }
+}
diff --git a/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.KeepAlive.cs b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.KeepAlive.cs
new file mode 100644
index 0000000..a73291b
--- /dev/null
+++ b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.KeepAlive.cs
@@ -0,0 +1,167 @@
+using System;
+using System.Collections.Concurrent;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Impostor.Hazel.Udp
+{
+ partial class UdpConnection
+ {
+
+ /// <summary>
+ /// Class to hold packet data
+ /// </summary>
+ public class PingPacket : IRecyclable
+ {
+ private static readonly ObjectPoolCustom<PingPacket> PacketPool = new ObjectPoolCustom<PingPacket>(() => new PingPacket());
+
+ public readonly Stopwatch Stopwatch = new Stopwatch();
+
+ internal static PingPacket GetObject()
+ {
+ return PacketPool.GetObject();
+ }
+
+ public void Recycle()
+ {
+ Stopwatch.Stop();
+ PacketPool.PutObject(this);
+ }
+ }
+
+ internal ConcurrentDictionary<ushort, PingPacket> activePingPackets = new ConcurrentDictionary<ushort, PingPacket>();
+
+ /// <summary>
+ /// The interval from data being received or transmitted to a keepalive packet being sent in milliseconds.
+ /// </summary>
+ /// <remarks>
+ /// <para>
+ /// Keepalive packets serve to close connections when an endpoint abruptly disconnects and to ensure than any
+ /// NAT devices do not close their translation for our argument. By ensuring there is regular contact the
+ /// connection can detect and prevent these issues.
+ /// </para>
+ /// <para>
+ /// The default value is 10 seconds, set to System.Threading.Timeout.Infinite to disable keepalive packets.
+ /// </para>
+ /// </remarks>
+ public int KeepAliveInterval
+ {
+ get
+ {
+ return keepAliveInterval;
+ }
+
+ set
+ {
+ keepAliveInterval = value;
+ ResetKeepAliveTimer();
+ }
+ }
+ private int keepAliveInterval = 1500;
+
+ public int MissingPingsUntilDisconnect { get; set; } = 6;
+ private volatile int pingsSinceAck = 0;
+
+ /// <summary>
+ /// The timer creating keepalive pulses.
+ /// </summary>
+ private Timer keepAliveTimer;
+
+ /// <summary>
+ /// Starts the keepalive timer.
+ /// </summary>
+ protected void InitializeKeepAliveTimer()
+ {
+ keepAliveTimer = new Timer(
+ HandleKeepAlive,
+ null,
+ keepAliveInterval,
+ keepAliveInterval
+ );
+ }
+
+ private async void HandleKeepAlive(object state)
+ {
+ if (this.State != ConnectionState.Connected) return;
+
+ if (this.pingsSinceAck >= this.MissingPingsUntilDisconnect)
+ {
+ this.DisposeKeepAliveTimer();
+ await this.DisconnectInternal(HazelInternalErrors.PingsWithoutResponse, $"Sent {this.pingsSinceAck} pings that remote has not responded to.");
+ return;
+ }
+
+ try
+ {
+ Interlocked.Increment(ref pingsSinceAck);
+ await SendPing();
+ }
+ catch
+ {
+ }
+ }
+
+ // Pings are special, quasi-reliable packets.
+ // We send them to trigger responses that validate our connection is alive
+ // An unacked ping should never be the sole cause of a disconnect.
+ // Rather, the responses will reset our pingsSinceAck, enough unacked
+ // pings should cause a disconnect.
+ private async ValueTask SendPing()
+ {
+ ushort id = (ushort)Interlocked.Increment(ref lastIDAllocated);
+
+ byte[] bytes = new byte[3];
+ bytes[0] = (byte)UdpSendOption.Ping;
+ bytes[1] = (byte)(id >> 8);
+ bytes[2] = (byte)id;
+
+ PingPacket pkt;
+ if (!this.activePingPackets.TryGetValue(id, out pkt))
+ {
+ pkt = PingPacket.GetObject();
+ if (!this.activePingPackets.TryAdd(id, pkt))
+ {
+ throw new Exception("This shouldn't be possible");
+ }
+ }
+
+ pkt.Stopwatch.Restart();
+
+ await WriteBytesToConnection(bytes, bytes.Length);
+
+ Statistics.LogReliableSend(0, bytes.Length);
+ }
+
+ /// <summary>
+ /// Resets the keepalive timer to zero.
+ /// </summary>
+ private void ResetKeepAliveTimer()
+ {
+ try
+ {
+ keepAliveTimer.Change(keepAliveInterval, keepAliveInterval);
+ }
+ catch { }
+ }
+
+ /// <summary>
+ /// Disposes of the keep alive timer.
+ /// </summary>
+ private void DisposeKeepAliveTimer()
+ {
+ if (this.keepAliveTimer != null)
+ {
+ this.keepAliveTimer.Dispose();
+ }
+
+ foreach (var kvp in activePingPackets)
+ {
+ if (this.activePingPackets.TryRemove(kvp.Key, out var pkt))
+ {
+ pkt.Recycle();
+ }
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.Reliable.cs b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.Reliable.cs
new file mode 100644
index 0000000..a7a4309
--- /dev/null
+++ b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.Reliable.cs
@@ -0,0 +1,491 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+using Impostor.Api.Net.Messages;
+
+namespace Impostor.Hazel.Udp
+{
+ partial class UdpConnection
+ {
+ /// <summary>
+ /// The starting timeout, in miliseconds, at which data will be resent.
+ /// </summary>
+ /// <remarks>
+ /// <para>
+ /// For reliable delivery data is resent at specified intervals unless an acknowledgement is received from the
+ /// receiving device. The ResendTimeout specifies the interval between the packets being resent, each time a packet
+ /// is resent the interval is increased for that packet until the duration exceeds the <see cref="DisconnectTimeout"/> value.
+ /// </para>
+ /// <para>
+ /// Setting this to its default of 0 will mean the timeout is 2 times the value of the average ping, usually
+ /// resulting in a more dynamic resend that responds to endpoints on slower or faster connections.
+ /// </para>
+ /// </remarks>
+ public volatile int ResendTimeout = 0;
+
+ /// <summary>
+ /// Max number of times to resend. 0 == no limit
+ /// </summary>
+ public volatile int ResendLimit = 0;
+
+ /// <summary>
+ /// A compounding multiplier to back off resend timeout.
+ /// Applied to ping before first timeout when ResendTimeout == 0.
+ /// </summary>
+ public volatile float ResendPingMultiplier = 2;
+
+ /// <summary>
+ /// Holds the last ID allocated.
+ /// </summary>
+ private int lastIDAllocated = 0;
+
+ /// <summary>
+ /// The packets of data that have been transmitted reliably and not acknowledged.
+ /// </summary>
+ internal ConcurrentDictionary<ushort, Packet> reliableDataPacketsSent = new ConcurrentDictionary<ushort, Packet>();
+
+ /// <summary>
+ /// Packet ids that have not been received, but are expected.
+ /// </summary>
+ private HashSet<ushort> reliableDataPacketsMissing = new HashSet<ushort>();
+
+ /// <summary>
+ /// The packet id that was received last.
+ /// </summary>
+ private volatile ushort reliableReceiveLast = ushort.MaxValue;
+
+ private object PingLock = new object();
+
+ /// <summary>
+ /// Returns the average ping to this endpoint.
+ /// </summary>
+ /// <remarks>
+ /// This returns the average ping for a one-way trip as calculated from the reliable packets that have been sent
+ /// and acknowledged by the endpoint.
+ /// </remarks>
+ public float AveragePingMs = 500;
+
+ /// <summary>
+ /// The maximum times a message should be resent before marking the endpoint as disconnected.
+ /// </summary>
+ /// <remarks>
+ /// Reliable packets will be resent at an interval defined in <see cref="ResendTimeout"/> for the number of times
+ /// specified here. Once a packet has been retransmitted this number of times and has not been acknowledged the
+ /// connection will be marked as disconnected and the <see cref="Connection.Disconnected">Disconnected</see> event
+ /// will be invoked.
+ /// </remarks>
+ public volatile int DisconnectTimeout = 5000;
+
+ /// <summary>
+ /// Class to hold packet data
+ /// </summary>
+ public class Packet : IRecyclable
+ {
+ /// <summary>
+ /// Object pool for this event.
+ /// </summary>
+ public static readonly ObjectPoolCustom<Packet> PacketPool = new ObjectPoolCustom<Packet>(() => new Packet());
+
+ /// <summary>
+ /// Returns an instance of this object from the pool.
+ /// </summary>
+ /// <returns></returns>
+ internal static Packet GetObject()
+ {
+ return PacketPool.GetObject();
+ }
+
+ public ushort Id;
+ private byte[] Data;
+ private UdpConnection Connection;
+ private int Length;
+
+ public int NextTimeout;
+ public volatile bool Acknowledged;
+
+ public Action AckCallback;
+
+ public int Retransmissions;
+ public Stopwatch Stopwatch = new Stopwatch();
+
+ Packet()
+ {
+ }
+
+ internal void Set(ushort id, UdpConnection connection, byte[] data, int length, int timeout, Action ackCallback)
+ {
+ this.Id = id;
+ this.Data = data;
+ this.Connection = connection;
+ this.Length = length;
+
+ this.Acknowledged = false;
+ this.NextTimeout = timeout;
+ this.AckCallback = ackCallback;
+ this.Retransmissions = 0;
+
+ this.Stopwatch.Restart();
+ }
+
+ // Packets resent
+ public async ValueTask<int> Resend()
+ {
+ var connection = this.Connection;
+ if (!this.Acknowledged && connection != null)
+ {
+ long lifetime = this.Stopwatch.ElapsedMilliseconds;
+ if (lifetime >= connection.DisconnectTimeout)
+ {
+ if (connection.reliableDataPacketsSent.TryRemove(this.Id, out Packet self))
+ {
+ await connection.DisconnectInternal(HazelInternalErrors.ReliablePacketWithoutResponse, $"Reliable packet {self.Id} (size={this.Length}) was not ack'd after {lifetime}ms ({self.Retransmissions} resends)");
+
+ self.Recycle();
+ }
+
+ return 0;
+ }
+
+ if (lifetime >= this.NextTimeout)
+ {
+ ++this.Retransmissions;
+ if (connection.ResendLimit != 0
+ && this.Retransmissions > connection.ResendLimit)
+ {
+ if (connection.reliableDataPacketsSent.TryRemove(this.Id, out Packet self))
+ {
+ await connection.DisconnectInternal(HazelInternalErrors.ReliablePacketWithoutResponse, $"Reliable packet {self.Id} (size={this.Length}) was not ack'd after {self.Retransmissions} resends ({lifetime}ms)");
+
+ self.Recycle();
+ }
+
+ return 0;
+ }
+
+ this.NextTimeout += (int)Math.Min(this.NextTimeout * connection.ResendPingMultiplier, 1000);
+ try
+ {
+ await connection.WriteBytesToConnection(this.Data, this.Length);
+ connection.Statistics.LogMessageResent();
+ return 1;
+ }
+ catch (InvalidOperationException)
+ {
+ await connection.DisconnectInternal(HazelInternalErrors.ConnectionDisconnected, "Could not resend data as connection is no longer connected");
+ }
+ }
+ }
+
+ return 0;
+ }
+
+ /// <summary>
+ /// Returns this object back to the object pool from whence it came.
+ /// </summary>
+ public void Recycle()
+ {
+ this.Acknowledged = true;
+ this.Connection = null;
+
+ PacketPool.PutObject(this);
+ }
+ }
+
+ internal async ValueTask<int> ManageReliablePackets()
+ {
+ int output = 0;
+ if (this.reliableDataPacketsSent.Count > 0)
+ {
+ foreach (var kvp in this.reliableDataPacketsSent)
+ {
+ Packet pkt = kvp.Value;
+
+ try
+ {
+ output += await pkt.Resend();
+ }
+ catch { }
+ }
+ }
+
+ return output;
+ }
+
+ /// <summary>
+ /// Adds a 2 byte ID to the packet at offset and stores the packet reference for retransmission.
+ /// </summary>
+ /// <param name="buffer">The buffer to attach to.</param>
+ /// <param name="offset">The offset to attach at.</param>
+ /// <param name="ackCallback">The callback to make once the packet has been acknowledged.</param>
+ protected void AttachReliableID(byte[] buffer, int offset, int sendLength, Action ackCallback = null)
+ {
+ ushort id = (ushort)Interlocked.Increment(ref lastIDAllocated);
+
+ buffer[offset] = (byte)(id >> 8);
+ buffer[offset + 1] = (byte)id;
+
+ Packet packet = Packet.GetObject();
+ packet.Set(
+ id,
+ this,
+ buffer,
+ sendLength,
+ ResendTimeout > 0 ? ResendTimeout : (int)Math.Min(AveragePingMs * this.ResendPingMultiplier, 300),
+ ackCallback);
+
+ if (!reliableDataPacketsSent.TryAdd(id, packet))
+ {
+ throw new Exception("That shouldn't be possible");
+ }
+ }
+
+ public static int ClampToInt(float value, int min, int max)
+ {
+ if (value < min) return min;
+ if (value > max) return max;
+ return (int)value;
+ }
+
+ /// <summary>
+ /// Sends the bytes reliably and stores the send.
+ /// </summary>
+ /// <param name="sendOption"></param>
+ /// <param name="data">The byte array to write to.</param>
+ /// <param name="ackCallback">The callback to make once the packet has been acknowledged.</param>
+ private async ValueTask ReliableSend(byte sendOption, byte[] data, Action ackCallback = null)
+ {
+ //Inform keepalive not to send for a while
+ ResetKeepAliveTimer();
+
+ byte[] bytes = new byte[data.Length + 3];
+
+ //Add message type
+ bytes[0] = sendOption;
+
+ //Add reliable ID
+ AttachReliableID(bytes, 1, bytes.Length, ackCallback);
+
+ //Copy data into new array
+ Buffer.BlockCopy(data, 0, bytes, bytes.Length - data.Length, data.Length);
+
+ //Write to connection
+ await WriteBytesToConnection(bytes, bytes.Length);
+
+ Statistics.LogReliableSend(data.Length, bytes.Length);
+ }
+
+ /// <summary>
+ /// Handles a reliable message being received and invokes the data event.
+ /// </summary>
+ /// <param name="message">The buffer received.</param>
+ private async ValueTask ReliableMessageReceive(MessageReader message)
+ {
+ if (await ProcessReliableReceive(message.Buffer, 1))
+ {
+ message.Offset += 3;
+ message.Length -= 3;
+ message.Position = 0;
+
+ await InvokeDataReceived(message, MessageType.Reliable);
+ }
+
+ Statistics.LogReliableReceive(message.Length - 3, message.Length);
+ }
+
+ /// <summary>
+ /// Handles receives from reliable packets.
+ /// </summary>
+ /// <param name="bytes">The buffer containing the data.</param>
+ /// <param name="offset">The offset of the reliable header.</param>
+ /// <returns>Whether the packet was a new packet or not.</returns>
+ private async ValueTask<bool> ProcessReliableReceive(ReadOnlyMemory<byte> bytes, int offset)
+ {
+ var b1 = bytes.Span[offset];
+ var b2 = bytes.Span[offset + 1];
+
+ //Get the ID form the packet
+ var id = (ushort)((b1 << 8) + b2);
+
+ //Send an acknowledgement
+ await SendAck(id);
+
+ /*
+ * It gets a little complicated here (note the fact I'm actually using a multiline comment for once...)
+ *
+ * In a simple world if our data is greater than the last reliable packet received (reliableReceiveLast)
+ * then it is guaranteed to be a new packet, if it's not we can see if we are missing that packet (lookup
+ * in reliableDataPacketsMissing).
+ *
+ * --------rrl############# (1)
+ *
+ * (where --- are packets received already and #### are packets that will be counted as new)
+ *
+ * Unfortunately if id becomes greater than 65535 it will loop back to zero so we will add a pointer that
+ * specifies any packets with an id behind it are also new (overwritePointer).
+ *
+ * ####op----------rrl##### (2)
+ *
+ * ------rll#########op---- (3)
+ *
+ * Anything behind than the reliableReceiveLast pointer (but greater than the overwritePointer is either a
+ * missing packet or something we've already received so when we change the pointers we need to make sure
+ * we keep note of what hasn't been received yet (reliableDataPacketsMissing).
+ *
+ * So...
+ */
+
+ lock (reliableDataPacketsMissing)
+ {
+ //Calculate overwritePointer
+ ushort overwritePointer = (ushort)(reliableReceiveLast - 32768);
+
+ //Calculate if it is a new packet by examining if it is within the range
+ bool isNew;
+ if (overwritePointer < reliableReceiveLast)
+ isNew = id > reliableReceiveLast || id <= overwritePointer; //Figure (2)
+ else
+ isNew = id > reliableReceiveLast && id <= overwritePointer; //Figure (3)
+
+ //If it's new or we've not received anything yet
+ if (isNew)
+ {
+ // Mark items between the most recent receive and the id received as missing
+ if (id > reliableReceiveLast)
+ {
+ for (ushort i = (ushort)(reliableReceiveLast + 1); i < id; i++)
+ {
+ reliableDataPacketsMissing.Add(i);
+ }
+ }
+ else
+ {
+ int cnt = (ushort.MaxValue - reliableReceiveLast) + id;
+ for (ushort i = 1; i < cnt; ++i)
+ {
+ reliableDataPacketsMissing.Add((ushort)(i + reliableReceiveLast));
+ }
+ }
+
+ //Update the most recently received
+ reliableReceiveLast = id;
+ }
+
+ //Else it could be a missing packet
+ else
+ {
+ //See if we're missing it, else this packet is a duplicate as so we return false
+ if (!reliableDataPacketsMissing.Remove(id))
+ {
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ /// <summary>
+ /// Handles acknowledgement packets to us.
+ /// </summary>
+ /// <param name="bytes">The buffer containing the data.</param>
+ private void AcknowledgementMessageReceive(ReadOnlySpan<byte> bytes)
+ {
+ this.pingsSinceAck = 0;
+
+ ushort id = (ushort)((bytes[1] << 8) + bytes[2]);
+ AcknowledgeMessageId(id);
+
+ if (bytes.Length == 4)
+ {
+ byte recentPackets = bytes[3];
+ for (int i = 1; i <= 8; ++i)
+ {
+ if ((recentPackets & 1) != 0)
+ {
+ AcknowledgeMessageId((ushort)(id - i));
+ }
+
+ recentPackets >>= 1;
+ }
+ }
+
+ Statistics.LogReliableReceive(0, bytes.Length);
+ }
+
+ private void AcknowledgeMessageId(ushort id)
+ {
+ // Dispose of timer and remove from dictionary
+ if (reliableDataPacketsSent.TryRemove(id, out Packet packet))
+ {
+ float rt = packet.Stopwatch.ElapsedMilliseconds;
+
+ packet.AckCallback?.Invoke();
+ packet.Recycle();
+
+ lock (PingLock)
+ {
+ this.AveragePingMs = Math.Max(50, this.AveragePingMs * .7f + rt * .3f);
+ }
+ }
+ else if (this.activePingPackets.TryRemove(id, out PingPacket pingPkt))
+ {
+ float rt = pingPkt.Stopwatch.ElapsedMilliseconds;
+
+ pingPkt.Recycle();
+
+ lock (PingLock)
+ {
+ this.AveragePingMs = Math.Max(50, this.AveragePingMs * .7f + rt * .3f);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Sends an acknowledgement for a packet given its identification bytes.
+ /// </summary>
+ /// <param name="byte1">The first identification byte.</param>
+ /// <param name="byte2">The second identification byte.</param>
+ private async ValueTask SendAck(ushort id)
+ {
+ byte recentPackets = 0;
+ lock (this.reliableDataPacketsMissing)
+ {
+ for (int i = 1; i <= 8; ++i)
+ {
+ if (!this.reliableDataPacketsMissing.Contains((ushort)(id - i)))
+ {
+ recentPackets |= (byte)(1 << (i - 1));
+ }
+ }
+ }
+
+ byte[] bytes = new byte[]
+ {
+ (byte)UdpSendOption.Acknowledgement,
+ (byte)(id >> 8),
+ (byte)(id >> 0),
+ recentPackets
+ };
+
+ try
+ {
+ await WriteBytesToConnection(bytes, bytes.Length);
+ }
+ catch (InvalidOperationException) { }
+ }
+
+ private void DisposeReliablePackets()
+ {
+ foreach (var kvp in reliableDataPacketsSent)
+ {
+ if (this.reliableDataPacketsSent.TryRemove(kvp.Key, out var pkt))
+ {
+ pkt.Recycle();
+ }
+ }
+ }
+ }
+}
diff --git a/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.cs b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.cs
new file mode 100644
index 0000000..5288d3c
--- /dev/null
+++ b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.cs
@@ -0,0 +1,312 @@
+using System;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using Impostor.Api.Net.Messages;
+using Microsoft.Extensions.ObjectPool;
+using Serilog;
+
+namespace Impostor.Hazel.Udp
+{
+ /// <summary>
+ /// Represents a connection that uses the UDP protocol.
+ /// </summary>
+ /// <inheritdoc />
+ public abstract partial class UdpConnection : NetworkConnection
+ {
+ protected static readonly byte[] EmptyDisconnectBytes = { (byte)UdpSendOption.Disconnect };
+
+ private static readonly ILogger Logger = Log.ForContext<UdpConnection>();
+ private readonly ConnectionListener _listener;
+ private readonly ObjectPool<MessageReader> _readerPool;
+ private readonly CancellationTokenSource _stoppingCts;
+
+ private bool _isDisposing;
+ private bool _isFirst = true;
+ private Task _executingTask;
+
+ protected UdpConnection(ConnectionListener listener, ObjectPool<MessageReader> readerPool)
+ {
+ _listener = listener;
+ _readerPool = readerPool;
+ _stoppingCts = new CancellationTokenSource();
+
+ Pipeline = Channel.CreateUnbounded<byte[]>(new UnboundedChannelOptions
+ {
+ SingleReader = true,
+ SingleWriter = true
+ });
+ }
+
+ internal Channel<byte[]> Pipeline { get; }
+
+ public Task StartAsync()
+ {
+ // Store the task we're executing
+ _executingTask = Task.Factory.StartNew(ReadAsync, TaskCreationOptions.LongRunning);
+
+ // If the task is completed then return it, this will bubble cancellation and failure to the caller
+ if (_executingTask.IsCompleted)
+ {
+ return _executingTask;
+ }
+
+ // Otherwise it's running
+ return Task.CompletedTask;
+ }
+
+ public void Stop()
+ {
+ // Stop called without start
+ if (_executingTask == null)
+ {
+ return;
+ }
+
+ // Signal cancellation to methods.
+ _stoppingCts.Cancel();
+
+ try
+ {
+ // Cancel reader.
+ Pipeline.Writer.Complete();
+ }
+ catch (ChannelClosedException)
+ {
+ // Already done.
+ }
+
+ // Remove references.
+ if (!_isDisposing)
+ {
+ Dispose(true);
+ }
+ }
+
+ private async Task ReadAsync()
+ {
+ var reader = new MessageReader(_readerPool);
+
+ while (!_stoppingCts.IsCancellationRequested)
+ {
+ var result = await Pipeline.Reader.ReadAsync(_stoppingCts.Token);
+
+ try
+ {
+ reader.Update(result);
+
+ await HandleReceive(reader);
+ }
+ catch (Exception e)
+ {
+ Logger.Error(e, "Exception during ReadAsync");
+ Dispose(true);
+ break;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Writes the given bytes to the connection.
+ /// </summary>
+ /// <param name="bytes">The bytes to write.</param>
+ /// <param name="length"></param>
+ protected abstract ValueTask WriteBytesToConnection(byte[] bytes, int length);
+
+ /// <inheritdoc/>
+ public override async ValueTask SendAsync(IMessageWriter msg)
+ {
+ if (this._state != ConnectionState.Connected)
+ throw new InvalidOperationException("Could not send data as this Connection is not connected. Did you disconnect?");
+
+ byte[] buffer = new byte[msg.Length];
+ Buffer.BlockCopy(msg.Buffer, 0, buffer, 0, msg.Length);
+
+ switch (msg.SendOption)
+ {
+ case MessageType.Reliable:
+ ResetKeepAliveTimer();
+
+ AttachReliableID(buffer, 1, buffer.Length);
+ await WriteBytesToConnection(buffer, buffer.Length);
+ Statistics.LogReliableSend(buffer.Length - 3, buffer.Length);
+ break;
+
+ default:
+ await WriteBytesToConnection(buffer, buffer.Length);
+ Statistics.LogUnreliableSend(buffer.Length - 1, buffer.Length);
+ break;
+ }
+ }
+
+ /// <inheritdoc/>
+ /// <remarks>
+ /// <include file="DocInclude/common.xml" path="docs/item[@name='Connection_SendBytes_General']/*" />
+ /// <para>
+ /// Udp connections can currently send messages using <see cref="SendOption.None"/> and
+ /// <see cref="SendOption.Reliable"/>. Fragmented messages are not currently supported and will default to
+ /// <see cref="SendOption.None"/> until implemented.
+ /// </para>
+ /// </remarks>
+ public override async ValueTask SendBytes(byte[] bytes, MessageType sendOption = MessageType.Unreliable)
+ {
+ //Add header information and send
+ await HandleSend(bytes, (byte)sendOption);
+ }
+
+ /// <summary>
+ /// Handles the reliable/fragmented sending from this connection.
+ /// </summary>
+ /// <param name="data">The data being sent.</param>
+ /// <param name="sendOption">The <see cref="SendOption"/> specified as its byte value.</param>
+ /// <param name="ackCallback">The callback to invoke when this packet is acknowledged.</param>
+ /// <returns>The bytes that should actually be sent.</returns>
+ protected async ValueTask HandleSend(byte[] data, byte sendOption, Action ackCallback = null)
+ {
+ switch (sendOption)
+ {
+ case (byte)UdpSendOption.Ping:
+ case (byte)MessageType.Reliable:
+ case (byte)UdpSendOption.Hello:
+ await ReliableSend(sendOption, data, ackCallback);
+ break;
+
+ //Treat all else as unreliable
+ default:
+ await UnreliableSend(sendOption, data);
+ break;
+ }
+ }
+
+ /// <summary>
+ /// Handles the receiving of data.
+ /// </summary>
+ /// <param name="message">The buffer containing the bytes received.</param>
+ protected async ValueTask HandleReceive(MessageReader message)
+ {
+ // Check if the first message received is the hello packet.
+ if (_isFirst)
+ {
+ _isFirst = false;
+
+ // Slice 4 bytes to get handshake data.
+ if (_listener != null)
+ {
+ using (var handshake = message.Copy(4))
+ {
+ await _listener.InvokeNewConnection(handshake, this);
+ }
+ }
+ }
+
+ switch (message.Buffer[0])
+ {
+ //Handle reliable receives
+ case (byte)MessageType.Reliable:
+ await ReliableMessageReceive(message);
+ break;
+
+ //Handle acknowledgments
+ case (byte)UdpSendOption.Acknowledgement:
+ AcknowledgementMessageReceive(message.Buffer);
+ break;
+
+ //We need to acknowledge hello and ping messages but dont want to invoke any events!
+ case (byte)UdpSendOption.Ping:
+ await ProcessReliableReceive(message.Buffer, 1);
+ Statistics.LogHelloReceive(message.Length);
+ break;
+ case (byte)UdpSendOption.Hello:
+ await ProcessReliableReceive(message.Buffer, 1);
+ Statistics.LogHelloReceive(message.Length);
+ break;
+
+ case (byte)UdpSendOption.Disconnect:
+ using (var reader = message.Copy(1))
+ {
+ await DisconnectRemote("The remote sent a disconnect request", reader);
+ }
+ break;
+
+ //Treat everything else as unreliable
+ default:
+ using (var reader = message.Copy(1))
+ {
+ await InvokeDataReceived(reader, MessageType.Unreliable);
+ }
+ Statistics.LogUnreliableReceive(message.Length - 1, message.Length);
+ break;
+ }
+ }
+
+ /// <summary>
+ /// Sends bytes using the unreliable UDP protocol.
+ /// </summary>
+ /// <param name="sendOption">The SendOption to attach.</param>
+ /// <param name="data">The data.</param>
+ ValueTask UnreliableSend(byte sendOption, byte[] data)
+ {
+ return UnreliableSend(sendOption, data, 0, data.Length);
+ }
+
+ /// <summary>
+ /// Sends bytes using the unreliable UDP protocol.
+ /// </summary>
+ /// <param name="data">The data.</param>
+ /// <param name="sendOption">The SendOption to attach.</param>
+ /// <param name="offset"></param>
+ /// <param name="length"></param>
+ async ValueTask UnreliableSend(byte sendOption, byte[] data, int offset, int length)
+ {
+ byte[] bytes = new byte[length + 1];
+
+ //Add message type
+ bytes[0] = sendOption;
+
+ //Copy data into new array
+ Buffer.BlockCopy(data, offset, bytes, bytes.Length - length, length);
+
+ //Write to connection
+ await WriteBytesToConnection(bytes, bytes.Length);
+
+ Statistics.LogUnreliableSend(length, bytes.Length);
+ }
+
+ /// <summary>
+ /// Sends a hello packet to the remote endpoint.
+ /// </summary>
+ /// <param name="bytes"></param>
+ /// <param name="acknowledgeCallback">The callback to invoke when the hello packet is acknowledged.</param>
+ protected ValueTask SendHello(byte[] bytes, Action acknowledgeCallback)
+ {
+ //First byte of handshake is version indicator so add data after
+ byte[] actualBytes;
+ if (bytes == null)
+ {
+ actualBytes = new byte[1];
+ }
+ else
+ {
+ actualBytes = new byte[bytes.Length + 1];
+ Buffer.BlockCopy(bytes, 0, actualBytes, 1, bytes.Length);
+ }
+
+ return HandleSend(actualBytes, (byte)UdpSendOption.Hello, acknowledgeCallback);
+ }
+
+ /// <inheritdoc/>
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ _isDisposing = true;
+
+ Stop();
+ DisposeKeepAliveTimer();
+ DisposeReliablePackets();
+ }
+
+ base.Dispose(disposing);
+ }
+ }
+}
diff --git a/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnectionListener.cs b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnectionListener.cs
new file mode 100644
index 0000000..573a00c
--- /dev/null
+++ b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnectionListener.cs
@@ -0,0 +1,281 @@
+using System;
+using System.Buffers;
+using System.Collections.Concurrent;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using Microsoft.Extensions.ObjectPool;
+using Serilog;
+
+namespace Impostor.Hazel.Udp
+{
+ /// <summary>
+ /// Listens for new UDP connections and creates UdpConnections for them.
+ /// </summary>
+ /// <inheritdoc />
+ public class UdpConnectionListener : NetworkConnectionListener
+ {
+ private static readonly ILogger Logger = Log.ForContext<UdpConnectionListener>();
+
+ /// <summary>
+ /// A callback for early connection rejection.
+ /// * Return false to reject connection.
+ /// * A null response is ok, we just won't send anything.
+ /// </summary>
+ public AcceptConnectionCheck AcceptConnection;
+ public delegate bool AcceptConnectionCheck(IPEndPoint endPoint, byte[] input, out byte[] response);
+
+ private readonly UdpClient _socket;
+ private readonly ObjectPool<MessageReader> _readerPool;
+ private readonly MemoryPool<byte> _pool;
+ private readonly Timer _reliablePacketTimer;
+ private readonly ConcurrentDictionary<EndPoint, UdpServerConnection> _allConnections;
+ private readonly CancellationTokenSource _stoppingCts;
+ private readonly UdpConnectionRateLimit _connectionRateLimit;
+ private Task _executingTask;
+
+ /// <summary>
+ /// Creates a new UdpConnectionListener for the given <see cref="IPAddress"/>, port and <see cref="IPMode"/>.
+ /// </summary>
+ /// <param name="endPoint">The endpoint to listen on.</param>
+ /// <param name="ipMode"></param>
+ public UdpConnectionListener(IPEndPoint endPoint, ObjectPool<MessageReader> readerPool, IPMode ipMode = IPMode.IPv4)
+ {
+ EndPoint = endPoint;
+ IPMode = ipMode;
+
+ _readerPool = readerPool;
+ _pool = MemoryPool<byte>.Shared;
+ _socket = new UdpClient(endPoint);
+
+ try
+ {
+ _socket.DontFragment = false;
+ }
+ catch (SocketException)
+ {
+ }
+
+ _reliablePacketTimer = new Timer(ManageReliablePackets, null, 100, Timeout.Infinite);
+
+ _allConnections = new ConcurrentDictionary<EndPoint, UdpServerConnection>();
+
+ _stoppingCts = new CancellationTokenSource();
+ _stoppingCts.Token.Register(() =>
+ {
+ _socket.Dispose();
+ });
+
+ _connectionRateLimit = new UdpConnectionRateLimit();
+ }
+
+ public int ConnectionCount => this._allConnections.Count;
+
+ private async void ManageReliablePackets(object state)
+ {
+ foreach (var kvp in _allConnections)
+ {
+ var sock = kvp.Value;
+ await sock.ManageReliablePackets();
+ }
+
+ try
+ {
+ this._reliablePacketTimer.Change(100, Timeout.Infinite);
+ }
+ catch { }
+ }
+
+ /// <inheritdoc />
+ public override Task StartAsync()
+ {
+ // Store the task we're executing
+ _executingTask = Task.Factory.StartNew(ListenAsync, TaskCreationOptions.LongRunning);
+
+ // If the task is completed then return it, this will bubble cancellation and failure to the caller
+ if (_executingTask.IsCompleted)
+ {
+ return _executingTask;
+ }
+
+ // Otherwise it's running
+ return Task.CompletedTask;
+ }
+
+ private async Task StopAsync()
+ {
+ // Stop called without start
+ if (_executingTask == null)
+ {
+ return;
+ }
+
+ try
+ {
+ // Signal cancellation to the executing method
+ _stoppingCts.Cancel();
+ }
+ finally
+ {
+ // Wait until the task completes or the timeout triggers
+ await Task.WhenAny(_executingTask, Task.Delay(TimeSpan.FromSeconds(5)));
+ }
+ }
+
+ /// <summary>
+ /// Instructs the listener to begin listening.
+ /// </summary>
+ private async Task ListenAsync()
+ {
+ try
+ {
+ while (!_stoppingCts.IsCancellationRequested)
+ {
+ UdpReceiveResult data;
+
+ try
+ {
+ data = await _socket.ReceiveAsync();
+
+ if (data.Buffer.Length == 0)
+ {
+ Logger.Fatal("Hazel read 0 bytes from UDP server socket.");
+ continue;
+ }
+ }
+ catch (SocketException)
+ {
+ // Client no longer reachable, pretend it didn't happen
+ continue;
+ }
+ catch (ObjectDisposedException)
+ {
+ // Socket was disposed, don't care.
+ return;
+ }
+
+ // Get client from active clients
+ if (!_allConnections.TryGetValue(data.RemoteEndPoint, out var client))
+ {
+ // Check for malformed connection attempts
+ if (data.Buffer[0] != (byte)UdpSendOption.Hello)
+ {
+ continue;
+ }
+
+ // Check rateLimit.
+ if (!_connectionRateLimit.IsAllowed(data.RemoteEndPoint.Address))
+ {
+ Logger.Warning("Ratelimited connection attempt from {0}.", data.RemoteEndPoint);
+ continue;
+ }
+
+ // Create new client
+ client = new UdpServerConnection(this, data.RemoteEndPoint, IPMode, _readerPool);
+
+ // Store the client
+ if (!_allConnections.TryAdd(data.RemoteEndPoint, client))
+ {
+ throw new HazelException("Failed to add a connection. This should never happen.");
+ }
+
+ // Activate the reader loop of the client
+ await client.StartAsync();
+ }
+
+ // Write to client.
+ await client.Pipeline.Writer.WriteAsync(data.Buffer);
+ }
+ }
+ catch (Exception e)
+ {
+ Logger.Error(e, "Listen loop error");
+ }
+ }
+
+#if DEBUG
+ public int TestDropRate = -1;
+ private int dropCounter = 0;
+#endif
+
+ /// <summary>
+ /// Sends data from the listener socket.
+ /// </summary>
+ /// <param name="bytes">The bytes to send.</param>
+ /// <param name="endPoint">The endpoint to send to.</param>
+ internal async ValueTask SendData(byte[] bytes, int length, IPEndPoint endPoint)
+ {
+ if (length > bytes.Length) return;
+
+#if DEBUG
+ if (TestDropRate > 0)
+ {
+ if (Interlocked.Increment(ref dropCounter) % TestDropRate == 0)
+ {
+ return;
+ }
+ }
+#endif
+
+ try
+ {
+ await _socket.SendAsync(bytes, length, endPoint);
+ }
+ catch (SocketException e)
+ {
+ Logger.Error(e, "Could not send data as a SocketException occurred");
+ }
+ catch (ObjectDisposedException)
+ {
+ //Keep alive timer probably ran, ignore
+ return;
+ }
+ }
+
+ /// <summary>
+ /// Sends data from the listener socket.
+ /// </summary>
+ /// <param name="bytes">The bytes to send.</param>
+ /// <param name="length"></param>
+ /// <param name="endPoint">The endpoint to send to.</param>
+ internal void SendDataSync(byte[] bytes, int length, IPEndPoint endPoint)
+ {
+ try
+ {
+ _socket.Send(bytes, length, endPoint);
+ }
+ catch (SocketException e)
+ {
+ Logger.Error(e, "Could not send data sync as a SocketException occurred");
+ }
+ }
+
+ /// <summary>
+ /// Removes a virtual connection from the list.
+ /// </summary>
+ /// <param name="endPoint">The endpoint of the virtual connection.</param>
+ internal void RemoveConnectionTo(EndPoint endPoint)
+ {
+ this._allConnections.TryRemove(endPoint, out var conn);
+ }
+
+ /// <inheritdoc />
+ public override async ValueTask DisposeAsync()
+ {
+ foreach (var kvp in _allConnections)
+ {
+ kvp.Value.Dispose();
+ }
+
+ await StopAsync();
+
+ await _reliablePacketTimer.DisposeAsync();
+
+ _connectionRateLimit.Dispose();
+
+ await base.DisposeAsync();
+ }
+ }
+}
diff --git a/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnectionRateLimit.cs b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnectionRateLimit.cs
new file mode 100644
index 0000000..64881d3
--- /dev/null
+++ b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnectionRateLimit.cs
@@ -0,0 +1,75 @@
+using System;
+using System.Collections.Concurrent;
+using System.Net;
+using System.Threading;
+using Serilog;
+
+namespace Impostor.Hazel.Udp
+{
+ public class UdpConnectionRateLimit : IDisposable
+ {
+ private static readonly ILogger Logger = Log.ForContext<UdpConnectionRateLimit>();
+
+ // Allow burst to 5 connections.
+ // Decrease by 1 every second.
+ private const int MaxConnections = 5;
+ private const int FalloffMs = 1000;
+
+ private readonly ConcurrentDictionary<IPAddress, int> _connectionCount;
+ private readonly Timer _timer;
+ private bool _isDisposed;
+
+ public UdpConnectionRateLimit()
+ {
+ _connectionCount = new ConcurrentDictionary<IPAddress, int>();
+ _timer = new Timer(UpdateRateLimit, null, FalloffMs, Timeout.Infinite);
+ }
+
+ private void UpdateRateLimit(object state)
+ {
+ try
+ {
+ foreach (var pair in _connectionCount)
+ {
+ var count = pair.Value - 1;
+ if (count > 0)
+ {
+ _connectionCount.TryUpdate(pair.Key, count, pair.Value);
+ }
+ else
+ {
+ _connectionCount.TryRemove(pair);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ Logger.Error(e, "Exception caught in UpdateRateLimit.");
+ }
+ finally
+ {
+ if (!_isDisposed)
+ {
+ _timer.Change(FalloffMs, Timeout.Infinite);
+ }
+ }
+ }
+
+ public bool IsAllowed(IPAddress key)
+ {
+ if (_connectionCount.TryGetValue(key, out var value) && value >= MaxConnections)
+ {
+ return false;
+ }
+
+ _connectionCount.AddOrUpdate(key, _ => 1, (_, i) => i + 1);
+ return true;
+ }
+
+ public void Dispose()
+ {
+ _isDisposed = true;
+ _timer.Dispose();
+ }
+ }
+} \ No newline at end of file
diff --git a/Impostor-dev/src/Impostor.Hazel/Udp/UdpServerConnection.cs b/Impostor-dev/src/Impostor.Hazel/Udp/UdpServerConnection.cs
new file mode 100644
index 0000000..22eed98
--- /dev/null
+++ b/Impostor-dev/src/Impostor.Hazel/Udp/UdpServerConnection.cs
@@ -0,0 +1,97 @@
+using System;
+using System.Net;
+using System.Threading.Tasks;
+using Impostor.Api.Net.Messages;
+using Microsoft.Extensions.ObjectPool;
+
+namespace Impostor.Hazel.Udp
+{
+ /// <summary>
+ /// Represents a servers's connection to a client that uses the UDP protocol.
+ /// </summary>
+ /// <inheritdoc/>
+ internal sealed class UdpServerConnection : UdpConnection
+ {
+ /// <summary>
+ /// The connection listener that we use the socket of.
+ /// </summary>
+ /// <remarks>
+ /// Udp server connections utilize the same socket in the listener for sends/receives, this is the listener that
+ /// created this connection and is hence the listener this conenction sends and receives via.
+ /// </remarks>
+ public UdpConnectionListener Listener { get; private set; }
+
+ /// <summary>
+ /// Creates a UdpConnection for the virtual connection to the endpoint.
+ /// </summary>
+ /// <param name="listener">The listener that created this connection.</param>
+ /// <param name="endPoint">The endpoint that we are connected to.</param>
+ /// <param name="IPMode">The IPMode we are connected using.</param>
+ internal UdpServerConnection(UdpConnectionListener listener, IPEndPoint endPoint, IPMode IPMode, ObjectPool<MessageReader> readerPool) : base(listener, readerPool)
+ {
+ this.Listener = listener;
+ this.RemoteEndPoint = endPoint;
+ this.EndPoint = endPoint;
+ this.IPMode = IPMode;
+
+ State = ConnectionState.Connected;
+ this.InitializeKeepAliveTimer();
+ }
+
+ /// <inheritdoc />
+ protected override async ValueTask WriteBytesToConnection(byte[] bytes, int length)
+ {
+ await Listener.SendData(bytes, length, RemoteEndPoint);
+ }
+
+ /// <inheritdoc />
+ /// <remarks>
+ /// This will always throw a HazelException.
+ /// </remarks>
+ public override ValueTask ConnectAsync(byte[] bytes = null)
+ {
+ throw new InvalidOperationException("Cannot manually connect a UdpServerConnection, did you mean to use UdpClientConnection?");
+ }
+
+ /// <summary>
+ /// Sends a disconnect message to the end point.
+ /// </summary>
+ protected override async ValueTask<bool> SendDisconnect(MessageWriter data = null)
+ {
+ lock (this)
+ {
+ if (this._state != ConnectionState.Connected) return false;
+ this._state = ConnectionState.NotConnected;
+ }
+
+ var bytes = EmptyDisconnectBytes;
+ if (data != null && data.Length > 0)
+ {
+ if (data.SendOption != MessageType.Unreliable) throw new ArgumentException("Disconnect messages can only be unreliable.");
+
+ bytes = data.ToByteArray(true);
+ bytes[0] = (byte)UdpSendOption.Disconnect;
+ }
+
+ try
+ {
+ await Listener.SendData(bytes, bytes.Length, RemoteEndPoint);
+ }
+ catch { }
+
+ return true;
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ Listener.RemoveConnectionTo(RemoteEndPoint);
+
+ if (disposing)
+ {
+ SendDisconnect();
+ }
+
+ base.Dispose(disposing);
+ }
+ }
+}