using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using Impostor.Api.Net.Messages; namespace Impostor.Hazel.Udp { partial class UdpConnection { /// /// The starting timeout, in miliseconds, at which data will be resent. /// /// /// /// For reliable delivery data is resent at specified intervals unless an acknowledgement is received from the /// receiving device. The ResendTimeout specifies the interval between the packets being resent, each time a packet /// is resent the interval is increased for that packet until the duration exceeds the value. /// /// /// Setting this to its default of 0 will mean the timeout is 2 times the value of the average ping, usually /// resulting in a more dynamic resend that responds to endpoints on slower or faster connections. /// /// public volatile int ResendTimeout = 0; /// /// Max number of times to resend. 0 == no limit /// public volatile int ResendLimit = 0; /// /// A compounding multiplier to back off resend timeout. /// Applied to ping before first timeout when ResendTimeout == 0. /// public volatile float ResendPingMultiplier = 2; /// /// Holds the last ID allocated. /// private int lastIDAllocated = 0; /// /// The packets of data that have been transmitted reliably and not acknowledged. /// internal ConcurrentDictionary reliableDataPacketsSent = new ConcurrentDictionary(); /// /// Packet ids that have not been received, but are expected. /// private HashSet reliableDataPacketsMissing = new HashSet(); /// /// The packet id that was received last. /// private volatile ushort reliableReceiveLast = ushort.MaxValue; private object PingLock = new object(); /// /// Returns the average ping to this endpoint. /// /// /// This returns the average ping for a one-way trip as calculated from the reliable packets that have been sent /// and acknowledged by the endpoint. /// public float AveragePingMs = 500; /// /// The maximum times a message should be resent before marking the endpoint as disconnected. /// /// /// Reliable packets will be resent at an interval defined in for the number of times /// specified here. Once a packet has been retransmitted this number of times and has not been acknowledged the /// connection will be marked as disconnected and the Disconnected event /// will be invoked. /// public volatile int DisconnectTimeout = 5000; /// /// Class to hold packet data /// public class Packet : IRecyclable { /// /// Object pool for this event. /// public static readonly ObjectPoolCustom PacketPool = new ObjectPoolCustom(() => new Packet()); /// /// Returns an instance of this object from the pool. /// /// internal static Packet GetObject() { return PacketPool.GetObject(); } public ushort Id; private byte[] Data; private UdpConnection Connection; private int Length; public int NextTimeout; public volatile bool Acknowledged; public Action AckCallback; public int Retransmissions; public Stopwatch Stopwatch = new Stopwatch(); Packet() { } internal void Set(ushort id, UdpConnection connection, byte[] data, int length, int timeout, Action ackCallback) { this.Id = id; this.Data = data; this.Connection = connection; this.Length = length; this.Acknowledged = false; this.NextTimeout = timeout; this.AckCallback = ackCallback; this.Retransmissions = 0; this.Stopwatch.Restart(); } // Packets resent public async ValueTask Resend() { var connection = this.Connection; if (!this.Acknowledged && connection != null) { long lifetime = this.Stopwatch.ElapsedMilliseconds; if (lifetime >= connection.DisconnectTimeout) { if (connection.reliableDataPacketsSent.TryRemove(this.Id, out Packet self)) { await connection.DisconnectInternal(HazelInternalErrors.ReliablePacketWithoutResponse, $"Reliable packet {self.Id} (size={this.Length}) was not ack'd after {lifetime}ms ({self.Retransmissions} resends)"); self.Recycle(); } return 0; } if (lifetime >= this.NextTimeout) { ++this.Retransmissions; if (connection.ResendLimit != 0 && this.Retransmissions > connection.ResendLimit) { if (connection.reliableDataPacketsSent.TryRemove(this.Id, out Packet self)) { await connection.DisconnectInternal(HazelInternalErrors.ReliablePacketWithoutResponse, $"Reliable packet {self.Id} (size={this.Length}) was not ack'd after {self.Retransmissions} resends ({lifetime}ms)"); self.Recycle(); } return 0; } this.NextTimeout += (int)Math.Min(this.NextTimeout * connection.ResendPingMultiplier, 1000); try { await connection.WriteBytesToConnection(this.Data, this.Length); connection.Statistics.LogMessageResent(); return 1; } catch (InvalidOperationException) { await connection.DisconnectInternal(HazelInternalErrors.ConnectionDisconnected, "Could not resend data as connection is no longer connected"); } } } return 0; } /// /// Returns this object back to the object pool from whence it came. /// public void Recycle() { this.Acknowledged = true; this.Connection = null; PacketPool.PutObject(this); } } internal async ValueTask ManageReliablePackets() { int output = 0; if (this.reliableDataPacketsSent.Count > 0) { foreach (var kvp in this.reliableDataPacketsSent) { Packet pkt = kvp.Value; try { output += await pkt.Resend(); } catch { } } } return output; } /// /// Adds a 2 byte ID to the packet at offset and stores the packet reference for retransmission. /// /// The buffer to attach to. /// The offset to attach at. /// The callback to make once the packet has been acknowledged. protected void AttachReliableID(byte[] buffer, int offset, int sendLength, Action ackCallback = null) { ushort id = (ushort)Interlocked.Increment(ref lastIDAllocated); buffer[offset] = (byte)(id >> 8); buffer[offset + 1] = (byte)id; Packet packet = Packet.GetObject(); packet.Set( id, this, buffer, sendLength, ResendTimeout > 0 ? ResendTimeout : (int)Math.Min(AveragePingMs * this.ResendPingMultiplier, 300), ackCallback); if (!reliableDataPacketsSent.TryAdd(id, packet)) { throw new Exception("That shouldn't be possible"); } } public static int ClampToInt(float value, int min, int max) { if (value < min) return min; if (value > max) return max; return (int)value; } /// /// Sends the bytes reliably and stores the send. /// /// /// The byte array to write to. /// The callback to make once the packet has been acknowledged. private async ValueTask ReliableSend(byte sendOption, byte[] data, Action ackCallback = null) { //Inform keepalive not to send for a while ResetKeepAliveTimer(); byte[] bytes = new byte[data.Length + 3]; //Add message type bytes[0] = sendOption; //Add reliable ID AttachReliableID(bytes, 1, bytes.Length, ackCallback); //Copy data into new array Buffer.BlockCopy(data, 0, bytes, bytes.Length - data.Length, data.Length); //Write to connection await WriteBytesToConnection(bytes, bytes.Length); Statistics.LogReliableSend(data.Length, bytes.Length); } /// /// Handles a reliable message being received and invokes the data event. /// /// The buffer received. private async ValueTask ReliableMessageReceive(MessageReader message) { if (await ProcessReliableReceive(message.Buffer, 1)) { message.Offset += 3; message.Length -= 3; message.Position = 0; await InvokeDataReceived(message, MessageType.Reliable); } Statistics.LogReliableReceive(message.Length - 3, message.Length); } /// /// Handles receives from reliable packets. /// /// The buffer containing the data. /// The offset of the reliable header. /// Whether the packet was a new packet or not. private async ValueTask ProcessReliableReceive(ReadOnlyMemory bytes, int offset) { var b1 = bytes.Span[offset]; var b2 = bytes.Span[offset + 1]; //Get the ID form the packet var id = (ushort)((b1 << 8) + b2); //Send an acknowledgement await SendAck(id); /* * It gets a little complicated here (note the fact I'm actually using a multiline comment for once...) * * In a simple world if our data is greater than the last reliable packet received (reliableReceiveLast) * then it is guaranteed to be a new packet, if it's not we can see if we are missing that packet (lookup * in reliableDataPacketsMissing). * * --------rrl############# (1) * * (where --- are packets received already and #### are packets that will be counted as new) * * Unfortunately if id becomes greater than 65535 it will loop back to zero so we will add a pointer that * specifies any packets with an id behind it are also new (overwritePointer). * * ####op----------rrl##### (2) * * ------rll#########op---- (3) * * Anything behind than the reliableReceiveLast pointer (but greater than the overwritePointer is either a * missing packet or something we've already received so when we change the pointers we need to make sure * we keep note of what hasn't been received yet (reliableDataPacketsMissing). * * So... */ lock (reliableDataPacketsMissing) { //Calculate overwritePointer ushort overwritePointer = (ushort)(reliableReceiveLast - 32768); //Calculate if it is a new packet by examining if it is within the range bool isNew; if (overwritePointer < reliableReceiveLast) isNew = id > reliableReceiveLast || id <= overwritePointer; //Figure (2) else isNew = id > reliableReceiveLast && id <= overwritePointer; //Figure (3) //If it's new or we've not received anything yet if (isNew) { // Mark items between the most recent receive and the id received as missing if (id > reliableReceiveLast) { for (ushort i = (ushort)(reliableReceiveLast + 1); i < id; i++) { reliableDataPacketsMissing.Add(i); } } else { int cnt = (ushort.MaxValue - reliableReceiveLast) + id; for (ushort i = 1; i < cnt; ++i) { reliableDataPacketsMissing.Add((ushort)(i + reliableReceiveLast)); } } //Update the most recently received reliableReceiveLast = id; } //Else it could be a missing packet else { //See if we're missing it, else this packet is a duplicate as so we return false if (!reliableDataPacketsMissing.Remove(id)) { return false; } } } return true; } /// /// Handles acknowledgement packets to us. /// /// The buffer containing the data. private void AcknowledgementMessageReceive(ReadOnlySpan bytes) { this.pingsSinceAck = 0; ushort id = (ushort)((bytes[1] << 8) + bytes[2]); AcknowledgeMessageId(id); if (bytes.Length == 4) { byte recentPackets = bytes[3]; for (int i = 1; i <= 8; ++i) { if ((recentPackets & 1) != 0) { AcknowledgeMessageId((ushort)(id - i)); } recentPackets >>= 1; } } Statistics.LogReliableReceive(0, bytes.Length); } private void AcknowledgeMessageId(ushort id) { // Dispose of timer and remove from dictionary if (reliableDataPacketsSent.TryRemove(id, out Packet packet)) { float rt = packet.Stopwatch.ElapsedMilliseconds; packet.AckCallback?.Invoke(); packet.Recycle(); lock (PingLock) { this.AveragePingMs = Math.Max(50, this.AveragePingMs * .7f + rt * .3f); } } else if (this.activePingPackets.TryRemove(id, out PingPacket pingPkt)) { float rt = pingPkt.Stopwatch.ElapsedMilliseconds; pingPkt.Recycle(); lock (PingLock) { this.AveragePingMs = Math.Max(50, this.AveragePingMs * .7f + rt * .3f); } } } /// /// Sends an acknowledgement for a packet given its identification bytes. /// /// The first identification byte. /// The second identification byte. private async ValueTask SendAck(ushort id) { byte recentPackets = 0; lock (this.reliableDataPacketsMissing) { for (int i = 1; i <= 8; ++i) { if (!this.reliableDataPacketsMissing.Contains((ushort)(id - i))) { recentPackets |= (byte)(1 << (i - 1)); } } } byte[] bytes = new byte[] { (byte)UdpSendOption.Acknowledgement, (byte)(id >> 8), (byte)(id >> 0), recentPackets }; try { await WriteBytesToConnection(bytes, bytes.Length); } catch (InvalidOperationException) { } } private void DisposeReliablePackets() { foreach (var kvp in reliableDataPacketsSent) { if (this.reliableDataPacketsSent.TryRemove(kvp.Key, out var pkt)) { pkt.Recycle(); } } } } }