diff options
Diffstat (limited to 'Impostor-dev/src/Impostor.Hazel/Udp')
10 files changed, 1916 insertions, 0 deletions
diff --git a/Impostor-dev/src/Impostor.Hazel/Udp/SendOptionInternal.cs b/Impostor-dev/src/Impostor.Hazel/Udp/SendOptionInternal.cs new file mode 100644 index 0000000..c0c4e21 --- /dev/null +++ b/Impostor-dev/src/Impostor.Hazel/Udp/SendOptionInternal.cs @@ -0,0 +1,33 @@ +namespace Impostor.Hazel.Udp +{ + /// <summary> + /// Extra internal states for SendOption enumeration when using UDP. + /// </summary> + public enum UdpSendOption : byte + { + /// <summary> + /// Hello message for initiating communication. + /// </summary> + Hello = 8, + + /// <summary> + /// A single byte of continued existence + /// </summary> + Ping = 12, + + /// <summary> + /// Message for discontinuing communication. + /// </summary> + Disconnect = 9, + + /// <summary> + /// Message acknowledging the receipt of a message. + /// </summary> + Acknowledgement = 10, + + /// <summary> + /// Message that is part of a larger, fragmented message. + /// </summary> + Fragment = 11, + } +} diff --git a/Impostor-dev/src/Impostor.Hazel/Udp/UdpBroadcastListener.cs b/Impostor-dev/src/Impostor.Hazel/Udp/UdpBroadcastListener.cs new file mode 100644 index 0000000..ed7b68d --- /dev/null +++ b/Impostor-dev/src/Impostor.Hazel/Udp/UdpBroadcastListener.cs @@ -0,0 +1,156 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Net.Sockets; +using System.Text; + +namespace Impostor.Hazel.Udp +{ + public class BroadcastPacket + { + public string Data; + public DateTime ReceiveTime; + public IPEndPoint Sender; + + public BroadcastPacket(string data, IPEndPoint sender) + { + this.Data = data; + this.Sender = sender; + this.ReceiveTime = DateTime.Now; + } + + public string GetAddress() + { + return this.Sender.Address.ToString(); + } + } + + public class UdpBroadcastListener : IDisposable + { + private Socket socket; + private EndPoint endpoint; + private Action<string> logger; + + private byte[] buffer = new byte[1024]; + + private List<BroadcastPacket> packets = new List<BroadcastPacket>(); + + public bool Running { get; private set; } + + /// + public UdpBroadcastListener(int port, Action<string> logger = null) + { + this.logger = logger; + this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); + this.socket.EnableBroadcast = true; + this.socket.MulticastLoopback = false; + this.endpoint = new IPEndPoint(IPAddress.Any, port); + this.socket.Bind(this.endpoint); + } + + /// + public void StartListen() + { + if (this.Running) return; + this.Running = true; + + try + { + EndPoint endpt = new IPEndPoint(IPAddress.Any, 0); + this.socket.BeginReceiveFrom(buffer, 0, buffer.Length, SocketFlags.None, ref endpt, this.HandleData, null); + } + catch (NullReferenceException) { } + catch (Exception e) + { + this.logger?.Invoke("BroadcastListener: " + e); + this.Dispose(); + } + } + + private void HandleData(IAsyncResult result) + { + this.Running = false; + + int numBytes; + EndPoint endpt = new IPEndPoint(IPAddress.Any, 0); + try + { + numBytes = this.socket.EndReceiveFrom(result, ref endpt); + } + catch (NullReferenceException) + { + // Already disposed + return; + } + catch (Exception e) + { + this.logger?.Invoke("BroadcastListener: " + e); + this.Dispose(); + return; + } + + if (numBytes < 3 + || buffer[0] != 4 || buffer[1] != 2) + { + this.StartListen(); + return; + } + + IPEndPoint ipEnd = (IPEndPoint)endpt; + string data = UTF8Encoding.UTF8.GetString(buffer, 2, numBytes - 2); + int dataHash = data.GetHashCode(); + + lock (packets) + { + bool found = false; + for (int i = 0; i < this.packets.Count; ++i) + { + var pkt = this.packets[i]; + if (pkt == null || pkt.Data == null) + { + this.packets.RemoveAt(i); + i--; + continue; + } + + if (pkt.Data.GetHashCode() == dataHash + && pkt.Sender.Equals(ipEnd)) + { + this.packets[i].ReceiveTime = DateTime.Now; + break; + } + } + + if (!found) + { + this.packets.Add(new BroadcastPacket(data, ipEnd)); + } + } + + this.StartListen(); + } + + /// + public BroadcastPacket[] GetPackets() + { + lock (this.packets) + { + var output = this.packets.ToArray(); + this.packets.Clear(); + return output; + } + } + + /// + public void Dispose() + { + if (this.socket != null) + { + try { this.socket.Shutdown(SocketShutdown.Both); } catch { } + try { this.socket.Close(); } catch { } + try { this.socket.Dispose(); } catch { } + this.socket = null; + } + } + } +}
\ No newline at end of file diff --git a/Impostor-dev/src/Impostor.Hazel/Udp/UdpBroadcaster.cs b/Impostor-dev/src/Impostor.Hazel/Udp/UdpBroadcaster.cs new file mode 100644 index 0000000..5fa1cca --- /dev/null +++ b/Impostor-dev/src/Impostor.Hazel/Udp/UdpBroadcaster.cs @@ -0,0 +1,79 @@ +using System; +using System.Net; +using System.Net.Sockets; +using System.Text; + +namespace Impostor.Hazel.Udp +{ + /// + public class UdpBroadcaster : IDisposable + { + private Socket socket; + private byte[] data; + private EndPoint endpoint; + private Action<string> logger; + + /// + public UdpBroadcaster(int port, Action<string> logger = null) + { + this.logger = logger; + this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); + this.socket.EnableBroadcast = true; + this.socket.MulticastLoopback = false; + this.endpoint = new IPEndPoint(IPAddress.Broadcast, port); + } + + /// + public void SetData(string data) + { + int len = UTF8Encoding.UTF8.GetByteCount(data); + this.data = new byte[len + 2]; + this.data[0] = 4; + this.data[1] = 2; + + UTF8Encoding.UTF8.GetBytes(data, 0, data.Length, this.data, 2); + } + + /// + public void Broadcast() + { + if (this.data == null) + { + return; + } + + try + { + this.socket.BeginSendTo(data, 0, data.Length, SocketFlags.None, this.endpoint, this.FinishSendTo, null); + } + catch (Exception e) + { + this.logger?.Invoke("BroadcastListener: " + e); + } + } + + private void FinishSendTo(IAsyncResult evt) + { + try + { + this.socket.EndSendTo(evt); + } + catch (Exception e) + { + this.logger?.Invoke("BroadcastListener: " + e); + } + } + + /// + public void Dispose() + { + if (this.socket != null) + { + try { this.socket.Shutdown(SocketShutdown.Both); } catch { } + try { this.socket.Close(); } catch { } + try { this.socket.Dispose(); } catch { } + this.socket = null; + } + } + } +}
\ No newline at end of file diff --git a/Impostor-dev/src/Impostor.Hazel/Udp/UdpClientConnection.cs b/Impostor-dev/src/Impostor.Hazel/Udp/UdpClientConnection.cs new file mode 100644 index 0000000..5125ebe --- /dev/null +++ b/Impostor-dev/src/Impostor.Hazel/Udp/UdpClientConnection.cs @@ -0,0 +1,225 @@ +using System; +using System.Buffers; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Impostor.Api.Net.Messages; +using Microsoft.Extensions.ObjectPool; +using Serilog; + +namespace Impostor.Hazel.Udp +{ + /// <summary> + /// Represents a client's connection to a server that uses the UDP protocol. + /// </summary> + /// <inheritdoc/> + public sealed class UdpClientConnection : UdpConnection + { + private static readonly ILogger Logger = Log.ForContext<UdpClientConnection>(); + + /// <summary> + /// The socket we're connected via. + /// </summary> + private readonly UdpClient _socket; + + private readonly Timer _reliablePacketTimer; + private readonly SemaphoreSlim _connectWaitLock; + private Task _listenTask; + + /// <summary> + /// Creates a new UdpClientConnection. + /// </summary> + /// <param name="remoteEndPoint">A <see cref="NetworkEndPoint"/> to connect to.</param> + public UdpClientConnection(IPEndPoint remoteEndPoint, ObjectPool<MessageReader> readerPool, IPMode ipMode = IPMode.IPv4) : base(null, readerPool) + { + EndPoint = remoteEndPoint; + RemoteEndPoint = remoteEndPoint; + IPMode = ipMode; + + _socket = new UdpClient + { + DontFragment = false + }; + + _reliablePacketTimer = new Timer(ManageReliablePacketsInternal, null, 100, Timeout.Infinite); + _connectWaitLock = new SemaphoreSlim(1, 1); + } + + ~UdpClientConnection() + { + Dispose(false); + } + + private async void ManageReliablePacketsInternal(object state) + { + await ManageReliablePackets(); + + try + { + _reliablePacketTimer.Change(100, Timeout.Infinite); + } + catch + { + // ignored + } + } + + /// <inheritdoc /> + protected override ValueTask WriteBytesToConnection(byte[] bytes, int length) + { + return WriteBytesToConnectionReal(bytes, length); + } + + private async ValueTask WriteBytesToConnectionReal(byte[] bytes, int length) + { + try + { + await _socket.SendAsync(bytes, length); + } + catch (NullReferenceException) { } + catch (ObjectDisposedException) + { + // Already disposed and disconnected... + } + catch (SocketException ex) + { + await DisconnectInternal(HazelInternalErrors.SocketExceptionSend, "Could not send data as a SocketException occurred: " + ex.Message); + } + } + + /// <inheritdoc /> + public override async ValueTask ConnectAsync(byte[] bytes = null) + { + State = ConnectionState.Connecting; + + try + { + _socket.Connect(RemoteEndPoint); + } + catch (SocketException e) + { + State = ConnectionState.NotConnected; + throw new HazelException("A SocketException occurred while binding to the port.", e); + } + + try + { + _listenTask = Task.Factory.StartNew(ListenAsync, TaskCreationOptions.LongRunning); + } + catch (ObjectDisposedException) + { + // If the socket's been disposed then we can just end there but make sure we're in NotConnected state. + // If we end up here I'm really lost... + State = ConnectionState.NotConnected; + return; + } + catch (SocketException e) + { + Dispose(); + throw new HazelException("A SocketException occurred while initiating a receive operation.", e); + } + + // Write bytes to the server to tell it hi (and to punch a hole in our NAT, if present) + // When acknowledged set the state to connected + await SendHello(bytes, () => + { + State = ConnectionState.Connected; + InitializeKeepAliveTimer(); + }); + + await _connectWaitLock.WaitAsync(TimeSpan.FromSeconds(10)); + } + + private async Task ListenAsync() + { + // Start packet handler. + await StartAsync(); + + // Listen. + while (State != ConnectionState.NotConnected) + { + UdpReceiveResult data; + + try + { + data = await _socket.ReceiveAsync(); + } + catch (SocketException e) + { + await DisconnectInternal(HazelInternalErrors.SocketExceptionReceive, "Socket exception while reading data: " + e.Message); + return; + } + catch (Exception) + { + return; + } + + if (data.Buffer.Length == 0) + { + await DisconnectInternal(HazelInternalErrors.ReceivedZeroBytes, "Received 0 bytes"); + return; + } + + // Write to client. + await Pipeline.Writer.WriteAsync(data.Buffer); + } + } + + protected override void SetState(ConnectionState state) + { + if (state == ConnectionState.Connected) + { + _connectWaitLock.Release(); + } + } + + /// <summary> + /// Sends a disconnect message to the end point. + /// You may include optional disconnect data. The SendOption must be unreliable. + /// </summary> + protected override async ValueTask<bool> SendDisconnect(MessageWriter data = null) + { + lock (this) + { + if (_state == ConnectionState.NotConnected) return false; + _state = ConnectionState.NotConnected; + } + + var bytes = EmptyDisconnectBytes; + if (data != null && data.Length > 0) + { + if (data.SendOption != MessageType.Unreliable) + { + throw new ArgumentException("Disconnect messages can only be unreliable."); + } + + bytes = data.ToByteArray(true); + bytes[0] = (byte)UdpSendOption.Disconnect; + } + + try + { + await _socket.SendAsync(bytes, bytes.Length, RemoteEndPoint); + } + catch { } + + return true; + } + + /// <inheritdoc /> + protected override void Dispose(bool disposing) + { + State = ConnectionState.NotConnected; + + try { _socket.Close(); } catch { } + try { _socket.Dispose(); } catch { } + + _reliablePacketTimer.Dispose(); + _connectWaitLock.Dispose(); + + base.Dispose(disposing); + } + } +} diff --git a/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.KeepAlive.cs b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.KeepAlive.cs new file mode 100644 index 0000000..a73291b --- /dev/null +++ b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.KeepAlive.cs @@ -0,0 +1,167 @@ +using System; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; + +namespace Impostor.Hazel.Udp +{ + partial class UdpConnection + { + + /// <summary> + /// Class to hold packet data + /// </summary> + public class PingPacket : IRecyclable + { + private static readonly ObjectPoolCustom<PingPacket> PacketPool = new ObjectPoolCustom<PingPacket>(() => new PingPacket()); + + public readonly Stopwatch Stopwatch = new Stopwatch(); + + internal static PingPacket GetObject() + { + return PacketPool.GetObject(); + } + + public void Recycle() + { + Stopwatch.Stop(); + PacketPool.PutObject(this); + } + } + + internal ConcurrentDictionary<ushort, PingPacket> activePingPackets = new ConcurrentDictionary<ushort, PingPacket>(); + + /// <summary> + /// The interval from data being received or transmitted to a keepalive packet being sent in milliseconds. + /// </summary> + /// <remarks> + /// <para> + /// Keepalive packets serve to close connections when an endpoint abruptly disconnects and to ensure than any + /// NAT devices do not close their translation for our argument. By ensuring there is regular contact the + /// connection can detect and prevent these issues. + /// </para> + /// <para> + /// The default value is 10 seconds, set to System.Threading.Timeout.Infinite to disable keepalive packets. + /// </para> + /// </remarks> + public int KeepAliveInterval + { + get + { + return keepAliveInterval; + } + + set + { + keepAliveInterval = value; + ResetKeepAliveTimer(); + } + } + private int keepAliveInterval = 1500; + + public int MissingPingsUntilDisconnect { get; set; } = 6; + private volatile int pingsSinceAck = 0; + + /// <summary> + /// The timer creating keepalive pulses. + /// </summary> + private Timer keepAliveTimer; + + /// <summary> + /// Starts the keepalive timer. + /// </summary> + protected void InitializeKeepAliveTimer() + { + keepAliveTimer = new Timer( + HandleKeepAlive, + null, + keepAliveInterval, + keepAliveInterval + ); + } + + private async void HandleKeepAlive(object state) + { + if (this.State != ConnectionState.Connected) return; + + if (this.pingsSinceAck >= this.MissingPingsUntilDisconnect) + { + this.DisposeKeepAliveTimer(); + await this.DisconnectInternal(HazelInternalErrors.PingsWithoutResponse, $"Sent {this.pingsSinceAck} pings that remote has not responded to."); + return; + } + + try + { + Interlocked.Increment(ref pingsSinceAck); + await SendPing(); + } + catch + { + } + } + + // Pings are special, quasi-reliable packets. + // We send them to trigger responses that validate our connection is alive + // An unacked ping should never be the sole cause of a disconnect. + // Rather, the responses will reset our pingsSinceAck, enough unacked + // pings should cause a disconnect. + private async ValueTask SendPing() + { + ushort id = (ushort)Interlocked.Increment(ref lastIDAllocated); + + byte[] bytes = new byte[3]; + bytes[0] = (byte)UdpSendOption.Ping; + bytes[1] = (byte)(id >> 8); + bytes[2] = (byte)id; + + PingPacket pkt; + if (!this.activePingPackets.TryGetValue(id, out pkt)) + { + pkt = PingPacket.GetObject(); + if (!this.activePingPackets.TryAdd(id, pkt)) + { + throw new Exception("This shouldn't be possible"); + } + } + + pkt.Stopwatch.Restart(); + + await WriteBytesToConnection(bytes, bytes.Length); + + Statistics.LogReliableSend(0, bytes.Length); + } + + /// <summary> + /// Resets the keepalive timer to zero. + /// </summary> + private void ResetKeepAliveTimer() + { + try + { + keepAliveTimer.Change(keepAliveInterval, keepAliveInterval); + } + catch { } + } + + /// <summary> + /// Disposes of the keep alive timer. + /// </summary> + private void DisposeKeepAliveTimer() + { + if (this.keepAliveTimer != null) + { + this.keepAliveTimer.Dispose(); + } + + foreach (var kvp in activePingPackets) + { + if (this.activePingPackets.TryRemove(kvp.Key, out var pkt)) + { + pkt.Recycle(); + } + } + } + } +}
\ No newline at end of file diff --git a/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.Reliable.cs b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.Reliable.cs new file mode 100644 index 0000000..a7a4309 --- /dev/null +++ b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.Reliable.cs @@ -0,0 +1,491 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using Impostor.Api.Net.Messages; + +namespace Impostor.Hazel.Udp +{ + partial class UdpConnection + { + /// <summary> + /// The starting timeout, in miliseconds, at which data will be resent. + /// </summary> + /// <remarks> + /// <para> + /// For reliable delivery data is resent at specified intervals unless an acknowledgement is received from the + /// receiving device. The ResendTimeout specifies the interval between the packets being resent, each time a packet + /// is resent the interval is increased for that packet until the duration exceeds the <see cref="DisconnectTimeout"/> value. + /// </para> + /// <para> + /// Setting this to its default of 0 will mean the timeout is 2 times the value of the average ping, usually + /// resulting in a more dynamic resend that responds to endpoints on slower or faster connections. + /// </para> + /// </remarks> + public volatile int ResendTimeout = 0; + + /// <summary> + /// Max number of times to resend. 0 == no limit + /// </summary> + public volatile int ResendLimit = 0; + + /// <summary> + /// A compounding multiplier to back off resend timeout. + /// Applied to ping before first timeout when ResendTimeout == 0. + /// </summary> + public volatile float ResendPingMultiplier = 2; + + /// <summary> + /// Holds the last ID allocated. + /// </summary> + private int lastIDAllocated = 0; + + /// <summary> + /// The packets of data that have been transmitted reliably and not acknowledged. + /// </summary> + internal ConcurrentDictionary<ushort, Packet> reliableDataPacketsSent = new ConcurrentDictionary<ushort, Packet>(); + + /// <summary> + /// Packet ids that have not been received, but are expected. + /// </summary> + private HashSet<ushort> reliableDataPacketsMissing = new HashSet<ushort>(); + + /// <summary> + /// The packet id that was received last. + /// </summary> + private volatile ushort reliableReceiveLast = ushort.MaxValue; + + private object PingLock = new object(); + + /// <summary> + /// Returns the average ping to this endpoint. + /// </summary> + /// <remarks> + /// This returns the average ping for a one-way trip as calculated from the reliable packets that have been sent + /// and acknowledged by the endpoint. + /// </remarks> + public float AveragePingMs = 500; + + /// <summary> + /// The maximum times a message should be resent before marking the endpoint as disconnected. + /// </summary> + /// <remarks> + /// Reliable packets will be resent at an interval defined in <see cref="ResendTimeout"/> for the number of times + /// specified here. Once a packet has been retransmitted this number of times and has not been acknowledged the + /// connection will be marked as disconnected and the <see cref="Connection.Disconnected">Disconnected</see> event + /// will be invoked. + /// </remarks> + public volatile int DisconnectTimeout = 5000; + + /// <summary> + /// Class to hold packet data + /// </summary> + public class Packet : IRecyclable + { + /// <summary> + /// Object pool for this event. + /// </summary> + public static readonly ObjectPoolCustom<Packet> PacketPool = new ObjectPoolCustom<Packet>(() => new Packet()); + + /// <summary> + /// Returns an instance of this object from the pool. + /// </summary> + /// <returns></returns> + internal static Packet GetObject() + { + return PacketPool.GetObject(); + } + + public ushort Id; + private byte[] Data; + private UdpConnection Connection; + private int Length; + + public int NextTimeout; + public volatile bool Acknowledged; + + public Action AckCallback; + + public int Retransmissions; + public Stopwatch Stopwatch = new Stopwatch(); + + Packet() + { + } + + internal void Set(ushort id, UdpConnection connection, byte[] data, int length, int timeout, Action ackCallback) + { + this.Id = id; + this.Data = data; + this.Connection = connection; + this.Length = length; + + this.Acknowledged = false; + this.NextTimeout = timeout; + this.AckCallback = ackCallback; + this.Retransmissions = 0; + + this.Stopwatch.Restart(); + } + + // Packets resent + public async ValueTask<int> Resend() + { + var connection = this.Connection; + if (!this.Acknowledged && connection != null) + { + long lifetime = this.Stopwatch.ElapsedMilliseconds; + if (lifetime >= connection.DisconnectTimeout) + { + if (connection.reliableDataPacketsSent.TryRemove(this.Id, out Packet self)) + { + await connection.DisconnectInternal(HazelInternalErrors.ReliablePacketWithoutResponse, $"Reliable packet {self.Id} (size={this.Length}) was not ack'd after {lifetime}ms ({self.Retransmissions} resends)"); + + self.Recycle(); + } + + return 0; + } + + if (lifetime >= this.NextTimeout) + { + ++this.Retransmissions; + if (connection.ResendLimit != 0 + && this.Retransmissions > connection.ResendLimit) + { + if (connection.reliableDataPacketsSent.TryRemove(this.Id, out Packet self)) + { + await connection.DisconnectInternal(HazelInternalErrors.ReliablePacketWithoutResponse, $"Reliable packet {self.Id} (size={this.Length}) was not ack'd after {self.Retransmissions} resends ({lifetime}ms)"); + + self.Recycle(); + } + + return 0; + } + + this.NextTimeout += (int)Math.Min(this.NextTimeout * connection.ResendPingMultiplier, 1000); + try + { + await connection.WriteBytesToConnection(this.Data, this.Length); + connection.Statistics.LogMessageResent(); + return 1; + } + catch (InvalidOperationException) + { + await connection.DisconnectInternal(HazelInternalErrors.ConnectionDisconnected, "Could not resend data as connection is no longer connected"); + } + } + } + + return 0; + } + + /// <summary> + /// Returns this object back to the object pool from whence it came. + /// </summary> + public void Recycle() + { + this.Acknowledged = true; + this.Connection = null; + + PacketPool.PutObject(this); + } + } + + internal async ValueTask<int> ManageReliablePackets() + { + int output = 0; + if (this.reliableDataPacketsSent.Count > 0) + { + foreach (var kvp in this.reliableDataPacketsSent) + { + Packet pkt = kvp.Value; + + try + { + output += await pkt.Resend(); + } + catch { } + } + } + + return output; + } + + /// <summary> + /// Adds a 2 byte ID to the packet at offset and stores the packet reference for retransmission. + /// </summary> + /// <param name="buffer">The buffer to attach to.</param> + /// <param name="offset">The offset to attach at.</param> + /// <param name="ackCallback">The callback to make once the packet has been acknowledged.</param> + protected void AttachReliableID(byte[] buffer, int offset, int sendLength, Action ackCallback = null) + { + ushort id = (ushort)Interlocked.Increment(ref lastIDAllocated); + + buffer[offset] = (byte)(id >> 8); + buffer[offset + 1] = (byte)id; + + Packet packet = Packet.GetObject(); + packet.Set( + id, + this, + buffer, + sendLength, + ResendTimeout > 0 ? ResendTimeout : (int)Math.Min(AveragePingMs * this.ResendPingMultiplier, 300), + ackCallback); + + if (!reliableDataPacketsSent.TryAdd(id, packet)) + { + throw new Exception("That shouldn't be possible"); + } + } + + public static int ClampToInt(float value, int min, int max) + { + if (value < min) return min; + if (value > max) return max; + return (int)value; + } + + /// <summary> + /// Sends the bytes reliably and stores the send. + /// </summary> + /// <param name="sendOption"></param> + /// <param name="data">The byte array to write to.</param> + /// <param name="ackCallback">The callback to make once the packet has been acknowledged.</param> + private async ValueTask ReliableSend(byte sendOption, byte[] data, Action ackCallback = null) + { + //Inform keepalive not to send for a while + ResetKeepAliveTimer(); + + byte[] bytes = new byte[data.Length + 3]; + + //Add message type + bytes[0] = sendOption; + + //Add reliable ID + AttachReliableID(bytes, 1, bytes.Length, ackCallback); + + //Copy data into new array + Buffer.BlockCopy(data, 0, bytes, bytes.Length - data.Length, data.Length); + + //Write to connection + await WriteBytesToConnection(bytes, bytes.Length); + + Statistics.LogReliableSend(data.Length, bytes.Length); + } + + /// <summary> + /// Handles a reliable message being received and invokes the data event. + /// </summary> + /// <param name="message">The buffer received.</param> + private async ValueTask ReliableMessageReceive(MessageReader message) + { + if (await ProcessReliableReceive(message.Buffer, 1)) + { + message.Offset += 3; + message.Length -= 3; + message.Position = 0; + + await InvokeDataReceived(message, MessageType.Reliable); + } + + Statistics.LogReliableReceive(message.Length - 3, message.Length); + } + + /// <summary> + /// Handles receives from reliable packets. + /// </summary> + /// <param name="bytes">The buffer containing the data.</param> + /// <param name="offset">The offset of the reliable header.</param> + /// <returns>Whether the packet was a new packet or not.</returns> + private async ValueTask<bool> ProcessReliableReceive(ReadOnlyMemory<byte> bytes, int offset) + { + var b1 = bytes.Span[offset]; + var b2 = bytes.Span[offset + 1]; + + //Get the ID form the packet + var id = (ushort)((b1 << 8) + b2); + + //Send an acknowledgement + await SendAck(id); + + /* + * It gets a little complicated here (note the fact I'm actually using a multiline comment for once...) + * + * In a simple world if our data is greater than the last reliable packet received (reliableReceiveLast) + * then it is guaranteed to be a new packet, if it's not we can see if we are missing that packet (lookup + * in reliableDataPacketsMissing). + * + * --------rrl############# (1) + * + * (where --- are packets received already and #### are packets that will be counted as new) + * + * Unfortunately if id becomes greater than 65535 it will loop back to zero so we will add a pointer that + * specifies any packets with an id behind it are also new (overwritePointer). + * + * ####op----------rrl##### (2) + * + * ------rll#########op---- (3) + * + * Anything behind than the reliableReceiveLast pointer (but greater than the overwritePointer is either a + * missing packet or something we've already received so when we change the pointers we need to make sure + * we keep note of what hasn't been received yet (reliableDataPacketsMissing). + * + * So... + */ + + lock (reliableDataPacketsMissing) + { + //Calculate overwritePointer + ushort overwritePointer = (ushort)(reliableReceiveLast - 32768); + + //Calculate if it is a new packet by examining if it is within the range + bool isNew; + if (overwritePointer < reliableReceiveLast) + isNew = id > reliableReceiveLast || id <= overwritePointer; //Figure (2) + else + isNew = id > reliableReceiveLast && id <= overwritePointer; //Figure (3) + + //If it's new or we've not received anything yet + if (isNew) + { + // Mark items between the most recent receive and the id received as missing + if (id > reliableReceiveLast) + { + for (ushort i = (ushort)(reliableReceiveLast + 1); i < id; i++) + { + reliableDataPacketsMissing.Add(i); + } + } + else + { + int cnt = (ushort.MaxValue - reliableReceiveLast) + id; + for (ushort i = 1; i < cnt; ++i) + { + reliableDataPacketsMissing.Add((ushort)(i + reliableReceiveLast)); + } + } + + //Update the most recently received + reliableReceiveLast = id; + } + + //Else it could be a missing packet + else + { + //See if we're missing it, else this packet is a duplicate as so we return false + if (!reliableDataPacketsMissing.Remove(id)) + { + return false; + } + } + } + + return true; + } + + /// <summary> + /// Handles acknowledgement packets to us. + /// </summary> + /// <param name="bytes">The buffer containing the data.</param> + private void AcknowledgementMessageReceive(ReadOnlySpan<byte> bytes) + { + this.pingsSinceAck = 0; + + ushort id = (ushort)((bytes[1] << 8) + bytes[2]); + AcknowledgeMessageId(id); + + if (bytes.Length == 4) + { + byte recentPackets = bytes[3]; + for (int i = 1; i <= 8; ++i) + { + if ((recentPackets & 1) != 0) + { + AcknowledgeMessageId((ushort)(id - i)); + } + + recentPackets >>= 1; + } + } + + Statistics.LogReliableReceive(0, bytes.Length); + } + + private void AcknowledgeMessageId(ushort id) + { + // Dispose of timer and remove from dictionary + if (reliableDataPacketsSent.TryRemove(id, out Packet packet)) + { + float rt = packet.Stopwatch.ElapsedMilliseconds; + + packet.AckCallback?.Invoke(); + packet.Recycle(); + + lock (PingLock) + { + this.AveragePingMs = Math.Max(50, this.AveragePingMs * .7f + rt * .3f); + } + } + else if (this.activePingPackets.TryRemove(id, out PingPacket pingPkt)) + { + float rt = pingPkt.Stopwatch.ElapsedMilliseconds; + + pingPkt.Recycle(); + + lock (PingLock) + { + this.AveragePingMs = Math.Max(50, this.AveragePingMs * .7f + rt * .3f); + } + } + } + + /// <summary> + /// Sends an acknowledgement for a packet given its identification bytes. + /// </summary> + /// <param name="byte1">The first identification byte.</param> + /// <param name="byte2">The second identification byte.</param> + private async ValueTask SendAck(ushort id) + { + byte recentPackets = 0; + lock (this.reliableDataPacketsMissing) + { + for (int i = 1; i <= 8; ++i) + { + if (!this.reliableDataPacketsMissing.Contains((ushort)(id - i))) + { + recentPackets |= (byte)(1 << (i - 1)); + } + } + } + + byte[] bytes = new byte[] + { + (byte)UdpSendOption.Acknowledgement, + (byte)(id >> 8), + (byte)(id >> 0), + recentPackets + }; + + try + { + await WriteBytesToConnection(bytes, bytes.Length); + } + catch (InvalidOperationException) { } + } + + private void DisposeReliablePackets() + { + foreach (var kvp in reliableDataPacketsSent) + { + if (this.reliableDataPacketsSent.TryRemove(kvp.Key, out var pkt)) + { + pkt.Recycle(); + } + } + } + } +} diff --git a/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.cs b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.cs new file mode 100644 index 0000000..5288d3c --- /dev/null +++ b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.cs @@ -0,0 +1,312 @@ +using System; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Impostor.Api.Net.Messages; +using Microsoft.Extensions.ObjectPool; +using Serilog; + +namespace Impostor.Hazel.Udp +{ + /// <summary> + /// Represents a connection that uses the UDP protocol. + /// </summary> + /// <inheritdoc /> + public abstract partial class UdpConnection : NetworkConnection + { + protected static readonly byte[] EmptyDisconnectBytes = { (byte)UdpSendOption.Disconnect }; + + private static readonly ILogger Logger = Log.ForContext<UdpConnection>(); + private readonly ConnectionListener _listener; + private readonly ObjectPool<MessageReader> _readerPool; + private readonly CancellationTokenSource _stoppingCts; + + private bool _isDisposing; + private bool _isFirst = true; + private Task _executingTask; + + protected UdpConnection(ConnectionListener listener, ObjectPool<MessageReader> readerPool) + { + _listener = listener; + _readerPool = readerPool; + _stoppingCts = new CancellationTokenSource(); + + Pipeline = Channel.CreateUnbounded<byte[]>(new UnboundedChannelOptions + { + SingleReader = true, + SingleWriter = true + }); + } + + internal Channel<byte[]> Pipeline { get; } + + public Task StartAsync() + { + // Store the task we're executing + _executingTask = Task.Factory.StartNew(ReadAsync, TaskCreationOptions.LongRunning); + + // If the task is completed then return it, this will bubble cancellation and failure to the caller + if (_executingTask.IsCompleted) + { + return _executingTask; + } + + // Otherwise it's running + return Task.CompletedTask; + } + + public void Stop() + { + // Stop called without start + if (_executingTask == null) + { + return; + } + + // Signal cancellation to methods. + _stoppingCts.Cancel(); + + try + { + // Cancel reader. + Pipeline.Writer.Complete(); + } + catch (ChannelClosedException) + { + // Already done. + } + + // Remove references. + if (!_isDisposing) + { + Dispose(true); + } + } + + private async Task ReadAsync() + { + var reader = new MessageReader(_readerPool); + + while (!_stoppingCts.IsCancellationRequested) + { + var result = await Pipeline.Reader.ReadAsync(_stoppingCts.Token); + + try + { + reader.Update(result); + + await HandleReceive(reader); + } + catch (Exception e) + { + Logger.Error(e, "Exception during ReadAsync"); + Dispose(true); + break; + } + } + } + + /// <summary> + /// Writes the given bytes to the connection. + /// </summary> + /// <param name="bytes">The bytes to write.</param> + /// <param name="length"></param> + protected abstract ValueTask WriteBytesToConnection(byte[] bytes, int length); + + /// <inheritdoc/> + public override async ValueTask SendAsync(IMessageWriter msg) + { + if (this._state != ConnectionState.Connected) + throw new InvalidOperationException("Could not send data as this Connection is not connected. Did you disconnect?"); + + byte[] buffer = new byte[msg.Length]; + Buffer.BlockCopy(msg.Buffer, 0, buffer, 0, msg.Length); + + switch (msg.SendOption) + { + case MessageType.Reliable: + ResetKeepAliveTimer(); + + AttachReliableID(buffer, 1, buffer.Length); + await WriteBytesToConnection(buffer, buffer.Length); + Statistics.LogReliableSend(buffer.Length - 3, buffer.Length); + break; + + default: + await WriteBytesToConnection(buffer, buffer.Length); + Statistics.LogUnreliableSend(buffer.Length - 1, buffer.Length); + break; + } + } + + /// <inheritdoc/> + /// <remarks> + /// <include file="DocInclude/common.xml" path="docs/item[@name='Connection_SendBytes_General']/*" /> + /// <para> + /// Udp connections can currently send messages using <see cref="SendOption.None"/> and + /// <see cref="SendOption.Reliable"/>. Fragmented messages are not currently supported and will default to + /// <see cref="SendOption.None"/> until implemented. + /// </para> + /// </remarks> + public override async ValueTask SendBytes(byte[] bytes, MessageType sendOption = MessageType.Unreliable) + { + //Add header information and send + await HandleSend(bytes, (byte)sendOption); + } + + /// <summary> + /// Handles the reliable/fragmented sending from this connection. + /// </summary> + /// <param name="data">The data being sent.</param> + /// <param name="sendOption">The <see cref="SendOption"/> specified as its byte value.</param> + /// <param name="ackCallback">The callback to invoke when this packet is acknowledged.</param> + /// <returns>The bytes that should actually be sent.</returns> + protected async ValueTask HandleSend(byte[] data, byte sendOption, Action ackCallback = null) + { + switch (sendOption) + { + case (byte)UdpSendOption.Ping: + case (byte)MessageType.Reliable: + case (byte)UdpSendOption.Hello: + await ReliableSend(sendOption, data, ackCallback); + break; + + //Treat all else as unreliable + default: + await UnreliableSend(sendOption, data); + break; + } + } + + /// <summary> + /// Handles the receiving of data. + /// </summary> + /// <param name="message">The buffer containing the bytes received.</param> + protected async ValueTask HandleReceive(MessageReader message) + { + // Check if the first message received is the hello packet. + if (_isFirst) + { + _isFirst = false; + + // Slice 4 bytes to get handshake data. + if (_listener != null) + { + using (var handshake = message.Copy(4)) + { + await _listener.InvokeNewConnection(handshake, this); + } + } + } + + switch (message.Buffer[0]) + { + //Handle reliable receives + case (byte)MessageType.Reliable: + await ReliableMessageReceive(message); + break; + + //Handle acknowledgments + case (byte)UdpSendOption.Acknowledgement: + AcknowledgementMessageReceive(message.Buffer); + break; + + //We need to acknowledge hello and ping messages but dont want to invoke any events! + case (byte)UdpSendOption.Ping: + await ProcessReliableReceive(message.Buffer, 1); + Statistics.LogHelloReceive(message.Length); + break; + case (byte)UdpSendOption.Hello: + await ProcessReliableReceive(message.Buffer, 1); + Statistics.LogHelloReceive(message.Length); + break; + + case (byte)UdpSendOption.Disconnect: + using (var reader = message.Copy(1)) + { + await DisconnectRemote("The remote sent a disconnect request", reader); + } + break; + + //Treat everything else as unreliable + default: + using (var reader = message.Copy(1)) + { + await InvokeDataReceived(reader, MessageType.Unreliable); + } + Statistics.LogUnreliableReceive(message.Length - 1, message.Length); + break; + } + } + + /// <summary> + /// Sends bytes using the unreliable UDP protocol. + /// </summary> + /// <param name="sendOption">The SendOption to attach.</param> + /// <param name="data">The data.</param> + ValueTask UnreliableSend(byte sendOption, byte[] data) + { + return UnreliableSend(sendOption, data, 0, data.Length); + } + + /// <summary> + /// Sends bytes using the unreliable UDP protocol. + /// </summary> + /// <param name="data">The data.</param> + /// <param name="sendOption">The SendOption to attach.</param> + /// <param name="offset"></param> + /// <param name="length"></param> + async ValueTask UnreliableSend(byte sendOption, byte[] data, int offset, int length) + { + byte[] bytes = new byte[length + 1]; + + //Add message type + bytes[0] = sendOption; + + //Copy data into new array + Buffer.BlockCopy(data, offset, bytes, bytes.Length - length, length); + + //Write to connection + await WriteBytesToConnection(bytes, bytes.Length); + + Statistics.LogUnreliableSend(length, bytes.Length); + } + + /// <summary> + /// Sends a hello packet to the remote endpoint. + /// </summary> + /// <param name="bytes"></param> + /// <param name="acknowledgeCallback">The callback to invoke when the hello packet is acknowledged.</param> + protected ValueTask SendHello(byte[] bytes, Action acknowledgeCallback) + { + //First byte of handshake is version indicator so add data after + byte[] actualBytes; + if (bytes == null) + { + actualBytes = new byte[1]; + } + else + { + actualBytes = new byte[bytes.Length + 1]; + Buffer.BlockCopy(bytes, 0, actualBytes, 1, bytes.Length); + } + + return HandleSend(actualBytes, (byte)UdpSendOption.Hello, acknowledgeCallback); + } + + /// <inheritdoc/> + protected override void Dispose(bool disposing) + { + if (disposing) + { + _isDisposing = true; + + Stop(); + DisposeKeepAliveTimer(); + DisposeReliablePackets(); + } + + base.Dispose(disposing); + } + } +} diff --git a/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnectionListener.cs b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnectionListener.cs new file mode 100644 index 0000000..573a00c --- /dev/null +++ b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnectionListener.cs @@ -0,0 +1,281 @@ +using System; +using System.Buffers; +using System.Collections.Concurrent; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Microsoft.Extensions.ObjectPool; +using Serilog; + +namespace Impostor.Hazel.Udp +{ + /// <summary> + /// Listens for new UDP connections and creates UdpConnections for them. + /// </summary> + /// <inheritdoc /> + public class UdpConnectionListener : NetworkConnectionListener + { + private static readonly ILogger Logger = Log.ForContext<UdpConnectionListener>(); + + /// <summary> + /// A callback for early connection rejection. + /// * Return false to reject connection. + /// * A null response is ok, we just won't send anything. + /// </summary> + public AcceptConnectionCheck AcceptConnection; + public delegate bool AcceptConnectionCheck(IPEndPoint endPoint, byte[] input, out byte[] response); + + private readonly UdpClient _socket; + private readonly ObjectPool<MessageReader> _readerPool; + private readonly MemoryPool<byte> _pool; + private readonly Timer _reliablePacketTimer; + private readonly ConcurrentDictionary<EndPoint, UdpServerConnection> _allConnections; + private readonly CancellationTokenSource _stoppingCts; + private readonly UdpConnectionRateLimit _connectionRateLimit; + private Task _executingTask; + + /// <summary> + /// Creates a new UdpConnectionListener for the given <see cref="IPAddress"/>, port and <see cref="IPMode"/>. + /// </summary> + /// <param name="endPoint">The endpoint to listen on.</param> + /// <param name="ipMode"></param> + public UdpConnectionListener(IPEndPoint endPoint, ObjectPool<MessageReader> readerPool, IPMode ipMode = IPMode.IPv4) + { + EndPoint = endPoint; + IPMode = ipMode; + + _readerPool = readerPool; + _pool = MemoryPool<byte>.Shared; + _socket = new UdpClient(endPoint); + + try + { + _socket.DontFragment = false; + } + catch (SocketException) + { + } + + _reliablePacketTimer = new Timer(ManageReliablePackets, null, 100, Timeout.Infinite); + + _allConnections = new ConcurrentDictionary<EndPoint, UdpServerConnection>(); + + _stoppingCts = new CancellationTokenSource(); + _stoppingCts.Token.Register(() => + { + _socket.Dispose(); + }); + + _connectionRateLimit = new UdpConnectionRateLimit(); + } + + public int ConnectionCount => this._allConnections.Count; + + private async void ManageReliablePackets(object state) + { + foreach (var kvp in _allConnections) + { + var sock = kvp.Value; + await sock.ManageReliablePackets(); + } + + try + { + this._reliablePacketTimer.Change(100, Timeout.Infinite); + } + catch { } + } + + /// <inheritdoc /> + public override Task StartAsync() + { + // Store the task we're executing + _executingTask = Task.Factory.StartNew(ListenAsync, TaskCreationOptions.LongRunning); + + // If the task is completed then return it, this will bubble cancellation and failure to the caller + if (_executingTask.IsCompleted) + { + return _executingTask; + } + + // Otherwise it's running + return Task.CompletedTask; + } + + private async Task StopAsync() + { + // Stop called without start + if (_executingTask == null) + { + return; + } + + try + { + // Signal cancellation to the executing method + _stoppingCts.Cancel(); + } + finally + { + // Wait until the task completes or the timeout triggers + await Task.WhenAny(_executingTask, Task.Delay(TimeSpan.FromSeconds(5))); + } + } + + /// <summary> + /// Instructs the listener to begin listening. + /// </summary> + private async Task ListenAsync() + { + try + { + while (!_stoppingCts.IsCancellationRequested) + { + UdpReceiveResult data; + + try + { + data = await _socket.ReceiveAsync(); + + if (data.Buffer.Length == 0) + { + Logger.Fatal("Hazel read 0 bytes from UDP server socket."); + continue; + } + } + catch (SocketException) + { + // Client no longer reachable, pretend it didn't happen + continue; + } + catch (ObjectDisposedException) + { + // Socket was disposed, don't care. + return; + } + + // Get client from active clients + if (!_allConnections.TryGetValue(data.RemoteEndPoint, out var client)) + { + // Check for malformed connection attempts + if (data.Buffer[0] != (byte)UdpSendOption.Hello) + { + continue; + } + + // Check rateLimit. + if (!_connectionRateLimit.IsAllowed(data.RemoteEndPoint.Address)) + { + Logger.Warning("Ratelimited connection attempt from {0}.", data.RemoteEndPoint); + continue; + } + + // Create new client + client = new UdpServerConnection(this, data.RemoteEndPoint, IPMode, _readerPool); + + // Store the client + if (!_allConnections.TryAdd(data.RemoteEndPoint, client)) + { + throw new HazelException("Failed to add a connection. This should never happen."); + } + + // Activate the reader loop of the client + await client.StartAsync(); + } + + // Write to client. + await client.Pipeline.Writer.WriteAsync(data.Buffer); + } + } + catch (Exception e) + { + Logger.Error(e, "Listen loop error"); + } + } + +#if DEBUG + public int TestDropRate = -1; + private int dropCounter = 0; +#endif + + /// <summary> + /// Sends data from the listener socket. + /// </summary> + /// <param name="bytes">The bytes to send.</param> + /// <param name="endPoint">The endpoint to send to.</param> + internal async ValueTask SendData(byte[] bytes, int length, IPEndPoint endPoint) + { + if (length > bytes.Length) return; + +#if DEBUG + if (TestDropRate > 0) + { + if (Interlocked.Increment(ref dropCounter) % TestDropRate == 0) + { + return; + } + } +#endif + + try + { + await _socket.SendAsync(bytes, length, endPoint); + } + catch (SocketException e) + { + Logger.Error(e, "Could not send data as a SocketException occurred"); + } + catch (ObjectDisposedException) + { + //Keep alive timer probably ran, ignore + return; + } + } + + /// <summary> + /// Sends data from the listener socket. + /// </summary> + /// <param name="bytes">The bytes to send.</param> + /// <param name="length"></param> + /// <param name="endPoint">The endpoint to send to.</param> + internal void SendDataSync(byte[] bytes, int length, IPEndPoint endPoint) + { + try + { + _socket.Send(bytes, length, endPoint); + } + catch (SocketException e) + { + Logger.Error(e, "Could not send data sync as a SocketException occurred"); + } + } + + /// <summary> + /// Removes a virtual connection from the list. + /// </summary> + /// <param name="endPoint">The endpoint of the virtual connection.</param> + internal void RemoveConnectionTo(EndPoint endPoint) + { + this._allConnections.TryRemove(endPoint, out var conn); + } + + /// <inheritdoc /> + public override async ValueTask DisposeAsync() + { + foreach (var kvp in _allConnections) + { + kvp.Value.Dispose(); + } + + await StopAsync(); + + await _reliablePacketTimer.DisposeAsync(); + + _connectionRateLimit.Dispose(); + + await base.DisposeAsync(); + } + } +} diff --git a/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnectionRateLimit.cs b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnectionRateLimit.cs new file mode 100644 index 0000000..64881d3 --- /dev/null +++ b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnectionRateLimit.cs @@ -0,0 +1,75 @@ +using System; +using System.Collections.Concurrent; +using System.Net; +using System.Threading; +using Serilog; + +namespace Impostor.Hazel.Udp +{ + public class UdpConnectionRateLimit : IDisposable + { + private static readonly ILogger Logger = Log.ForContext<UdpConnectionRateLimit>(); + + // Allow burst to 5 connections. + // Decrease by 1 every second. + private const int MaxConnections = 5; + private const int FalloffMs = 1000; + + private readonly ConcurrentDictionary<IPAddress, int> _connectionCount; + private readonly Timer _timer; + private bool _isDisposed; + + public UdpConnectionRateLimit() + { + _connectionCount = new ConcurrentDictionary<IPAddress, int>(); + _timer = new Timer(UpdateRateLimit, null, FalloffMs, Timeout.Infinite); + } + + private void UpdateRateLimit(object state) + { + try + { + foreach (var pair in _connectionCount) + { + var count = pair.Value - 1; + if (count > 0) + { + _connectionCount.TryUpdate(pair.Key, count, pair.Value); + } + else + { + _connectionCount.TryRemove(pair); + } + } + } + catch (Exception e) + { + Logger.Error(e, "Exception caught in UpdateRateLimit."); + } + finally + { + if (!_isDisposed) + { + _timer.Change(FalloffMs, Timeout.Infinite); + } + } + } + + public bool IsAllowed(IPAddress key) + { + if (_connectionCount.TryGetValue(key, out var value) && value >= MaxConnections) + { + return false; + } + + _connectionCount.AddOrUpdate(key, _ => 1, (_, i) => i + 1); + return true; + } + + public void Dispose() + { + _isDisposed = true; + _timer.Dispose(); + } + } +}
\ No newline at end of file diff --git a/Impostor-dev/src/Impostor.Hazel/Udp/UdpServerConnection.cs b/Impostor-dev/src/Impostor.Hazel/Udp/UdpServerConnection.cs new file mode 100644 index 0000000..22eed98 --- /dev/null +++ b/Impostor-dev/src/Impostor.Hazel/Udp/UdpServerConnection.cs @@ -0,0 +1,97 @@ +using System; +using System.Net; +using System.Threading.Tasks; +using Impostor.Api.Net.Messages; +using Microsoft.Extensions.ObjectPool; + +namespace Impostor.Hazel.Udp +{ + /// <summary> + /// Represents a servers's connection to a client that uses the UDP protocol. + /// </summary> + /// <inheritdoc/> + internal sealed class UdpServerConnection : UdpConnection + { + /// <summary> + /// The connection listener that we use the socket of. + /// </summary> + /// <remarks> + /// Udp server connections utilize the same socket in the listener for sends/receives, this is the listener that + /// created this connection and is hence the listener this conenction sends and receives via. + /// </remarks> + public UdpConnectionListener Listener { get; private set; } + + /// <summary> + /// Creates a UdpConnection for the virtual connection to the endpoint. + /// </summary> + /// <param name="listener">The listener that created this connection.</param> + /// <param name="endPoint">The endpoint that we are connected to.</param> + /// <param name="IPMode">The IPMode we are connected using.</param> + internal UdpServerConnection(UdpConnectionListener listener, IPEndPoint endPoint, IPMode IPMode, ObjectPool<MessageReader> readerPool) : base(listener, readerPool) + { + this.Listener = listener; + this.RemoteEndPoint = endPoint; + this.EndPoint = endPoint; + this.IPMode = IPMode; + + State = ConnectionState.Connected; + this.InitializeKeepAliveTimer(); + } + + /// <inheritdoc /> + protected override async ValueTask WriteBytesToConnection(byte[] bytes, int length) + { + await Listener.SendData(bytes, length, RemoteEndPoint); + } + + /// <inheritdoc /> + /// <remarks> + /// This will always throw a HazelException. + /// </remarks> + public override ValueTask ConnectAsync(byte[] bytes = null) + { + throw new InvalidOperationException("Cannot manually connect a UdpServerConnection, did you mean to use UdpClientConnection?"); + } + + /// <summary> + /// Sends a disconnect message to the end point. + /// </summary> + protected override async ValueTask<bool> SendDisconnect(MessageWriter data = null) + { + lock (this) + { + if (this._state != ConnectionState.Connected) return false; + this._state = ConnectionState.NotConnected; + } + + var bytes = EmptyDisconnectBytes; + if (data != null && data.Length > 0) + { + if (data.SendOption != MessageType.Unreliable) throw new ArgumentException("Disconnect messages can only be unreliable."); + + bytes = data.ToByteArray(true); + bytes[0] = (byte)UdpSendOption.Disconnect; + } + + try + { + await Listener.SendData(bytes, bytes.Length, RemoteEndPoint); + } + catch { } + + return true; + } + + protected override void Dispose(bool disposing) + { + Listener.RemoveConnectionTo(RemoteEndPoint); + + if (disposing) + { + SendDisconnect(); + } + + base.Dispose(disposing); + } + } +} |