From e9ea621b93fbb58d9edfca8375918791637bbd52 Mon Sep 17 00:00:00 2001 From: chai Date: Wed, 30 Dec 2020 20:59:04 +0800 Subject: +init --- .../Impostor.Hazel/Udp/UdpConnection.Reliable.cs | 491 +++++++++++++++++++++ 1 file changed, 491 insertions(+) create mode 100644 Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.Reliable.cs (limited to 'Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.Reliable.cs') diff --git a/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.Reliable.cs b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.Reliable.cs new file mode 100644 index 0000000..a7a4309 --- /dev/null +++ b/Impostor-dev/src/Impostor.Hazel/Udp/UdpConnection.Reliable.cs @@ -0,0 +1,491 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using Impostor.Api.Net.Messages; + +namespace Impostor.Hazel.Udp +{ + partial class UdpConnection + { + /// + /// The starting timeout, in miliseconds, at which data will be resent. + /// + /// + /// + /// For reliable delivery data is resent at specified intervals unless an acknowledgement is received from the + /// receiving device. The ResendTimeout specifies the interval between the packets being resent, each time a packet + /// is resent the interval is increased for that packet until the duration exceeds the value. + /// + /// + /// Setting this to its default of 0 will mean the timeout is 2 times the value of the average ping, usually + /// resulting in a more dynamic resend that responds to endpoints on slower or faster connections. + /// + /// + public volatile int ResendTimeout = 0; + + /// + /// Max number of times to resend. 0 == no limit + /// + public volatile int ResendLimit = 0; + + /// + /// A compounding multiplier to back off resend timeout. + /// Applied to ping before first timeout when ResendTimeout == 0. + /// + public volatile float ResendPingMultiplier = 2; + + /// + /// Holds the last ID allocated. + /// + private int lastIDAllocated = 0; + + /// + /// The packets of data that have been transmitted reliably and not acknowledged. + /// + internal ConcurrentDictionary reliableDataPacketsSent = new ConcurrentDictionary(); + + /// + /// Packet ids that have not been received, but are expected. + /// + private HashSet reliableDataPacketsMissing = new HashSet(); + + /// + /// The packet id that was received last. + /// + private volatile ushort reliableReceiveLast = ushort.MaxValue; + + private object PingLock = new object(); + + /// + /// Returns the average ping to this endpoint. + /// + /// + /// This returns the average ping for a one-way trip as calculated from the reliable packets that have been sent + /// and acknowledged by the endpoint. + /// + public float AveragePingMs = 500; + + /// + /// The maximum times a message should be resent before marking the endpoint as disconnected. + /// + /// + /// Reliable packets will be resent at an interval defined in for the number of times + /// specified here. Once a packet has been retransmitted this number of times and has not been acknowledged the + /// connection will be marked as disconnected and the Disconnected event + /// will be invoked. + /// + public volatile int DisconnectTimeout = 5000; + + /// + /// Class to hold packet data + /// + public class Packet : IRecyclable + { + /// + /// Object pool for this event. + /// + public static readonly ObjectPoolCustom PacketPool = new ObjectPoolCustom(() => new Packet()); + + /// + /// Returns an instance of this object from the pool. + /// + /// + internal static Packet GetObject() + { + return PacketPool.GetObject(); + } + + public ushort Id; + private byte[] Data; + private UdpConnection Connection; + private int Length; + + public int NextTimeout; + public volatile bool Acknowledged; + + public Action AckCallback; + + public int Retransmissions; + public Stopwatch Stopwatch = new Stopwatch(); + + Packet() + { + } + + internal void Set(ushort id, UdpConnection connection, byte[] data, int length, int timeout, Action ackCallback) + { + this.Id = id; + this.Data = data; + this.Connection = connection; + this.Length = length; + + this.Acknowledged = false; + this.NextTimeout = timeout; + this.AckCallback = ackCallback; + this.Retransmissions = 0; + + this.Stopwatch.Restart(); + } + + // Packets resent + public async ValueTask Resend() + { + var connection = this.Connection; + if (!this.Acknowledged && connection != null) + { + long lifetime = this.Stopwatch.ElapsedMilliseconds; + if (lifetime >= connection.DisconnectTimeout) + { + if (connection.reliableDataPacketsSent.TryRemove(this.Id, out Packet self)) + { + await connection.DisconnectInternal(HazelInternalErrors.ReliablePacketWithoutResponse, $"Reliable packet {self.Id} (size={this.Length}) was not ack'd after {lifetime}ms ({self.Retransmissions} resends)"); + + self.Recycle(); + } + + return 0; + } + + if (lifetime >= this.NextTimeout) + { + ++this.Retransmissions; + if (connection.ResendLimit != 0 + && this.Retransmissions > connection.ResendLimit) + { + if (connection.reliableDataPacketsSent.TryRemove(this.Id, out Packet self)) + { + await connection.DisconnectInternal(HazelInternalErrors.ReliablePacketWithoutResponse, $"Reliable packet {self.Id} (size={this.Length}) was not ack'd after {self.Retransmissions} resends ({lifetime}ms)"); + + self.Recycle(); + } + + return 0; + } + + this.NextTimeout += (int)Math.Min(this.NextTimeout * connection.ResendPingMultiplier, 1000); + try + { + await connection.WriteBytesToConnection(this.Data, this.Length); + connection.Statistics.LogMessageResent(); + return 1; + } + catch (InvalidOperationException) + { + await connection.DisconnectInternal(HazelInternalErrors.ConnectionDisconnected, "Could not resend data as connection is no longer connected"); + } + } + } + + return 0; + } + + /// + /// Returns this object back to the object pool from whence it came. + /// + public void Recycle() + { + this.Acknowledged = true; + this.Connection = null; + + PacketPool.PutObject(this); + } + } + + internal async ValueTask ManageReliablePackets() + { + int output = 0; + if (this.reliableDataPacketsSent.Count > 0) + { + foreach (var kvp in this.reliableDataPacketsSent) + { + Packet pkt = kvp.Value; + + try + { + output += await pkt.Resend(); + } + catch { } + } + } + + return output; + } + + /// + /// Adds a 2 byte ID to the packet at offset and stores the packet reference for retransmission. + /// + /// The buffer to attach to. + /// The offset to attach at. + /// The callback to make once the packet has been acknowledged. + protected void AttachReliableID(byte[] buffer, int offset, int sendLength, Action ackCallback = null) + { + ushort id = (ushort)Interlocked.Increment(ref lastIDAllocated); + + buffer[offset] = (byte)(id >> 8); + buffer[offset + 1] = (byte)id; + + Packet packet = Packet.GetObject(); + packet.Set( + id, + this, + buffer, + sendLength, + ResendTimeout > 0 ? ResendTimeout : (int)Math.Min(AveragePingMs * this.ResendPingMultiplier, 300), + ackCallback); + + if (!reliableDataPacketsSent.TryAdd(id, packet)) + { + throw new Exception("That shouldn't be possible"); + } + } + + public static int ClampToInt(float value, int min, int max) + { + if (value < min) return min; + if (value > max) return max; + return (int)value; + } + + /// + /// Sends the bytes reliably and stores the send. + /// + /// + /// The byte array to write to. + /// The callback to make once the packet has been acknowledged. + private async ValueTask ReliableSend(byte sendOption, byte[] data, Action ackCallback = null) + { + //Inform keepalive not to send for a while + ResetKeepAliveTimer(); + + byte[] bytes = new byte[data.Length + 3]; + + //Add message type + bytes[0] = sendOption; + + //Add reliable ID + AttachReliableID(bytes, 1, bytes.Length, ackCallback); + + //Copy data into new array + Buffer.BlockCopy(data, 0, bytes, bytes.Length - data.Length, data.Length); + + //Write to connection + await WriteBytesToConnection(bytes, bytes.Length); + + Statistics.LogReliableSend(data.Length, bytes.Length); + } + + /// + /// Handles a reliable message being received and invokes the data event. + /// + /// The buffer received. + private async ValueTask ReliableMessageReceive(MessageReader message) + { + if (await ProcessReliableReceive(message.Buffer, 1)) + { + message.Offset += 3; + message.Length -= 3; + message.Position = 0; + + await InvokeDataReceived(message, MessageType.Reliable); + } + + Statistics.LogReliableReceive(message.Length - 3, message.Length); + } + + /// + /// Handles receives from reliable packets. + /// + /// The buffer containing the data. + /// The offset of the reliable header. + /// Whether the packet was a new packet or not. + private async ValueTask ProcessReliableReceive(ReadOnlyMemory bytes, int offset) + { + var b1 = bytes.Span[offset]; + var b2 = bytes.Span[offset + 1]; + + //Get the ID form the packet + var id = (ushort)((b1 << 8) + b2); + + //Send an acknowledgement + await SendAck(id); + + /* + * It gets a little complicated here (note the fact I'm actually using a multiline comment for once...) + * + * In a simple world if our data is greater than the last reliable packet received (reliableReceiveLast) + * then it is guaranteed to be a new packet, if it's not we can see if we are missing that packet (lookup + * in reliableDataPacketsMissing). + * + * --------rrl############# (1) + * + * (where --- are packets received already and #### are packets that will be counted as new) + * + * Unfortunately if id becomes greater than 65535 it will loop back to zero so we will add a pointer that + * specifies any packets with an id behind it are also new (overwritePointer). + * + * ####op----------rrl##### (2) + * + * ------rll#########op---- (3) + * + * Anything behind than the reliableReceiveLast pointer (but greater than the overwritePointer is either a + * missing packet or something we've already received so when we change the pointers we need to make sure + * we keep note of what hasn't been received yet (reliableDataPacketsMissing). + * + * So... + */ + + lock (reliableDataPacketsMissing) + { + //Calculate overwritePointer + ushort overwritePointer = (ushort)(reliableReceiveLast - 32768); + + //Calculate if it is a new packet by examining if it is within the range + bool isNew; + if (overwritePointer < reliableReceiveLast) + isNew = id > reliableReceiveLast || id <= overwritePointer; //Figure (2) + else + isNew = id > reliableReceiveLast && id <= overwritePointer; //Figure (3) + + //If it's new or we've not received anything yet + if (isNew) + { + // Mark items between the most recent receive and the id received as missing + if (id > reliableReceiveLast) + { + for (ushort i = (ushort)(reliableReceiveLast + 1); i < id; i++) + { + reliableDataPacketsMissing.Add(i); + } + } + else + { + int cnt = (ushort.MaxValue - reliableReceiveLast) + id; + for (ushort i = 1; i < cnt; ++i) + { + reliableDataPacketsMissing.Add((ushort)(i + reliableReceiveLast)); + } + } + + //Update the most recently received + reliableReceiveLast = id; + } + + //Else it could be a missing packet + else + { + //See if we're missing it, else this packet is a duplicate as so we return false + if (!reliableDataPacketsMissing.Remove(id)) + { + return false; + } + } + } + + return true; + } + + /// + /// Handles acknowledgement packets to us. + /// + /// The buffer containing the data. + private void AcknowledgementMessageReceive(ReadOnlySpan bytes) + { + this.pingsSinceAck = 0; + + ushort id = (ushort)((bytes[1] << 8) + bytes[2]); + AcknowledgeMessageId(id); + + if (bytes.Length == 4) + { + byte recentPackets = bytes[3]; + for (int i = 1; i <= 8; ++i) + { + if ((recentPackets & 1) != 0) + { + AcknowledgeMessageId((ushort)(id - i)); + } + + recentPackets >>= 1; + } + } + + Statistics.LogReliableReceive(0, bytes.Length); + } + + private void AcknowledgeMessageId(ushort id) + { + // Dispose of timer and remove from dictionary + if (reliableDataPacketsSent.TryRemove(id, out Packet packet)) + { + float rt = packet.Stopwatch.ElapsedMilliseconds; + + packet.AckCallback?.Invoke(); + packet.Recycle(); + + lock (PingLock) + { + this.AveragePingMs = Math.Max(50, this.AveragePingMs * .7f + rt * .3f); + } + } + else if (this.activePingPackets.TryRemove(id, out PingPacket pingPkt)) + { + float rt = pingPkt.Stopwatch.ElapsedMilliseconds; + + pingPkt.Recycle(); + + lock (PingLock) + { + this.AveragePingMs = Math.Max(50, this.AveragePingMs * .7f + rt * .3f); + } + } + } + + /// + /// Sends an acknowledgement for a packet given its identification bytes. + /// + /// The first identification byte. + /// The second identification byte. + private async ValueTask SendAck(ushort id) + { + byte recentPackets = 0; + lock (this.reliableDataPacketsMissing) + { + for (int i = 1; i <= 8; ++i) + { + if (!this.reliableDataPacketsMissing.Contains((ushort)(id - i))) + { + recentPackets |= (byte)(1 << (i - 1)); + } + } + } + + byte[] bytes = new byte[] + { + (byte)UdpSendOption.Acknowledgement, + (byte)(id >> 8), + (byte)(id >> 0), + recentPackets + }; + + try + { + await WriteBytesToConnection(bytes, bytes.Length); + } + catch (InvalidOperationException) { } + } + + private void DisposeReliablePackets() + { + foreach (var kvp in reliableDataPacketsSent) + { + if (this.reliableDataPacketsSent.TryRemove(kvp.Key, out var pkt)) + { + pkt.Recycle(); + } + } + } + } +} -- cgit v1.1-26-g67d0