using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Impostor.Api.Net.Messages;
namespace Impostor.Hazel.Udp
{
partial class UdpConnection
{
/// <summary>
/// The starting timeout, in milliseconds, at which data will be resent.
/// </summary>
/// <remarks>
/// <para>
/// For reliable delivery, data is resent at intervals unless an acknowledgement is received from the
/// receiving device. ResendTimeout is the delay before the first resend of a packet; each time the packet
/// is resent its timeout is increased (see <see cref="ResendPingMultiplier"/>) until the packet has been
/// outstanding for <see cref="DisconnectTimeout"/> milliseconds.
/// </para>
/// <para>
/// Leaving this at its default of 0 means the first timeout is <see cref="AveragePingMs"/> multiplied by
/// <see cref="ResendPingMultiplier"/> (capped at 300 ms), which gives a more dynamic resend that responds
/// to endpoints on slower or faster connections.
/// </para>
/// </remarks>
public volatile int ResendTimeout = 0;
/// <summary>
/// Max number of times a reliable packet is resent. 0 == no limit.
/// </summary>
/// <remarks>
/// When a packet would be resent more than this many times it is dropped and the connection is
/// disconnected instead.
/// </remarks>
public volatile int ResendLimit = 0;
/// <summary>
/// A compounding multiplier to back off the resend timeout.
/// </summary>
/// <remarks>
/// On every resend the packet's timeout grows by its current value times this multiplier (the increment is
/// capped at 1000 ms). The multiplier is also applied to <see cref="AveragePingMs"/> to compute the first
/// timeout when <see cref="ResendTimeout"/> == 0.
/// </remarks>
public volatile float ResendPingMultiplier = 2;
/// <summary>
/// Holds the last reliable ID allocated. Incremented atomically; the value is truncated to 16 bits when
/// it is turned into a packet id.
/// </summary>
private int lastIDAllocated = 0;
/// <summary>
/// The packets of data that have been transmitted reliably and not yet acknowledged, keyed by their
/// 16-bit reliable id.
/// </summary>
internal ConcurrentDictionary<ushort, Packet> reliableDataPacketsSent = new ConcurrentDictionary<ushort, Packet>();

/// <summary>
/// Packet ids that have not been received, but are expected. This set is also the lock object that guards
/// the receive-side bookkeeping, so it must only be touched while holding <c>lock (reliableDataPacketsMissing)</c>.
/// </summary>
private HashSet<ushort> reliableDataPacketsMissing = new HashSet<ushort>();
/// <summary>
/// The packet id that was received last. Starts at <see cref="ushort.MaxValue"/>, i.e. one step before id 0.
/// </summary>
private volatile ushort reliableReceiveLast = ushort.MaxValue;
/// <summary>
/// Lock taken while <see cref="AveragePingMs"/> is being updated.
/// </summary>
private object PingLock = new object();
/// <summary>
/// Returns the average ping to this endpoint, in milliseconds.
/// </summary>
/// <remarks>
/// Updated each time a reliable packet or a ping packet is acknowledged, as a weighted average of the
/// previous value (70%) and the new sample (30%), and never allowed to drop below 50.
/// NOTE(review): for reliable packets the sample is the time between the packet being stored for sending
/// and its acknowledgement being processed, which looks like a round trip rather than a one-way trip - confirm.
/// </remarks>
public float AveragePingMs = 500;
/// <summary>
/// The maximum time, in milliseconds, that a reliable packet may remain unacknowledged before the
/// endpoint is treated as disconnected.
/// </summary>
/// <remarks>
/// Reliable packets are resent at increasing intervals starting from <see cref="ResendTimeout"/>. Once a
/// packet has been outstanding for at least this long without being acknowledged, it is removed and the
/// connection is disconnected with a "reliable packet without response" error.
/// </remarks>
public volatile int DisconnectTimeout = 5000;
/// <summary>
/// Holds one reliably-sent packet together with the bookkeeping needed to resend it until it is
/// acknowledged. Instances are pooled; call <see cref="Set"/> after taking one from the pool.
/// </summary>
public class Packet : IRecyclable
{
    /// <summary>
    /// Object pool for packets.
    /// </summary>
    public static readonly ObjectPoolCustom<Packet> PacketPool = new ObjectPoolCustom<Packet>(() => new Packet());

    /// <summary>
    /// Returns an instance of this object from the pool.
    /// </summary>
    /// <returns>A pooled packet; its fields are stale until <see cref="Set"/> is called.</returns>
    internal static Packet GetObject()
    {
        return PacketPool.GetObject();
    }

    /// <summary>The reliable id this packet was sent with.</summary>
    public ushort Id;

    private byte[] Data;
    private UdpConnection Connection;
    private int Length;

    /// <summary>Packet age, in milliseconds, at which the next resend is due.</summary>
    public int NextTimeout;

    /// <summary>False while the packet is in flight; set to true when it is recycled.</summary>
    public volatile bool Acknowledged;

    /// <summary>Optional callback invoked by the connection when the packet is acknowledged.</summary>
    public Action AckCallback;

    /// <summary>How many times this packet has been scheduled for resend.</summary>
    public int Retransmissions;

    /// <summary>Measures the packet's age since <see cref="Set"/> was last called.</summary>
    public Stopwatch Stopwatch = new Stopwatch();

    // Instances are only created by the pool's factory.
    private Packet()
    {
    }

    /// <summary>
    /// Initialises a pooled packet for a new send and restarts its age stopwatch.
    /// </summary>
    /// <param name="id">The reliable id written into the packet.</param>
    /// <param name="connection">The connection the packet is sent (and resent) on.</param>
    /// <param name="data">The full buffer to resend; the reference is stored, not copied.</param>
    /// <param name="length">The number of bytes of <paramref name="data"/> to send.</param>
    /// <param name="timeout">The age, in milliseconds, at which the first resend is due.</param>
    /// <param name="ackCallback">Optional callback to run when the packet is acknowledged.</param>
    internal void Set(ushort id, UdpConnection connection, byte[] data, int length, int timeout, Action ackCallback)
    {
        this.Id = id;
        this.Data = data;
        this.Connection = connection;
        this.Length = length;

        this.Acknowledged = false;
        this.NextTimeout = timeout;
        this.AckCallback = ackCallback;
        this.Retransmissions = 0;

        this.Stopwatch.Restart();
    }

    /// <summary>
    /// Resends the packet if its timeout has elapsed, or disconnects the connection if the packet has been
    /// outstanding for too long or has been resent too many times.
    /// </summary>
    /// <returns>1 if the packet was resent, otherwise 0.</returns>
    public async ValueTask<int> Resend()
    {
        // Copy the reference: Recycle() may null the field concurrently.
        var connection = this.Connection;
        if (this.Acknowledged || connection == null)
        {
            return 0;
        }

        long lifetime = this.Stopwatch.ElapsedMilliseconds;

        // Outstanding for too long: give up on the whole connection.
        if (lifetime >= connection.DisconnectTimeout)
        {
            // Only the caller that wins the removal disconnects and recycles.
            if (connection.reliableDataPacketsSent.TryRemove(this.Id, out Packet self))
            {
                await connection.DisconnectInternal(HazelInternalErrors.ReliablePacketWithoutResponse, $"Reliable packet {self.Id} (size={this.Length}) was not ack'd after {lifetime}ms ({self.Retransmissions} resends)");
                self.Recycle();
            }

            return 0;
        }

        // Not due yet.
        if (lifetime < this.NextTimeout)
        {
            return 0;
        }

        ++this.Retransmissions;
        if (connection.ResendLimit != 0
            && this.Retransmissions > connection.ResendLimit)
        {
            if (connection.reliableDataPacketsSent.TryRemove(this.Id, out Packet self))
            {
                await connection.DisconnectInternal(HazelInternalErrors.ReliablePacketWithoutResponse, $"Reliable packet {self.Id} (size={this.Length}) was not ack'd after {self.Retransmissions} resends ({lifetime}ms)");
                self.Recycle();
            }

            return 0;
        }

        // Back off: push the next deadline out, by at most one second per resend.
        this.NextTimeout += (int)Math.Min(this.NextTimeout * connection.ResendPingMultiplier, 1000);
        try
        {
            await connection.WriteBytesToConnection(this.Data, this.Length);
            connection.Statistics.LogMessageResent();
            return 1;
        }
        catch (InvalidOperationException)
        {
            await connection.DisconnectInternal(HazelInternalErrors.ConnectionDisconnected, "Could not resend data as connection is no longer connected");
        }

        return 0;
    }

    /// <summary>
    /// Returns this object back to the object pool from whence it came.
    /// </summary>
    public void Recycle()
    {
        // Mark as acknowledged and detach first so an in-progress Resend() backs off.
        this.Acknowledged = true;
        this.Connection = null;

        PacketPool.PutObject(this);
    }
}
/// <summary>
/// Gives every unacknowledged reliable packet a chance to resend itself (or to time the connection out).
/// </summary>
/// <returns>The number of packets that were resent during this pass.</returns>
internal async ValueTask<int> ManageReliablePackets()
{
    int output = 0;

    // Enumerating a ConcurrentDictionary is safe while it is being modified and is simply empty when there
    // is nothing to do, so no Count pre-check is needed (Count has to take every internal lock).
    foreach (var kvp in this.reliableDataPacketsSent)
    {
        Packet pkt = kvp.Value;

        try
        {
            output += await pkt.Resend();
        }
        catch
        {
            // Deliberately best-effort: a failure on one packet must not stop the others being serviced.
        }
    }

    return output;
}
/// <summary>
/// Adds a 2 byte ID to the packet at offset and stores the packet reference for retransmission.
/// </summary>
/// <param name="buffer">The buffer to attach to. The reference is kept for resends, not copied.</param>
/// <param name="offset">The offset to attach at; the id is written big-endian at offset and offset + 1.</param>
/// <param name="sendLength">The number of bytes of <paramref name="buffer"/> to send on a resend.</param>
/// <param name="ackCallback">The callback to make once the packet has been acknowledged.</param>
/// <exception cref="Exception">
/// A packet with the newly allocated id is still awaiting acknowledgement.
/// </exception>
protected void AttachReliableID(byte[] buffer, int offset, int sendLength, Action ackCallback = null)
{
    // Ids are 16 bit and wrap around; the counter itself is a 32-bit int incremented atomically.
    ushort id = (ushort)Interlocked.Increment(ref lastIDAllocated);

    buffer[offset] = (byte)(id >> 8);
    buffer[offset + 1] = (byte)id;

    // Read the volatile setting once so the test and the value used cannot disagree.
    int configuredTimeout = ResendTimeout;
    int firstTimeout = configuredTimeout > 0
        ? configuredTimeout
        : (int)Math.Min(AveragePingMs * this.ResendPingMultiplier, 300);

    Packet packet = Packet.GetObject();
    packet.Set(
        id,
        this,
        buffer,
        sendLength,
        firstTimeout,
        ackCallback);

    if (!reliableDataPacketsSent.TryAdd(id, packet))
    {
        // Nobody else holds this packet yet, so hand it back to the pool instead of dropping it.
        packet.Recycle();
        throw new Exception("That shouldn't be possible");
    }
}
/// <summary>
/// Truncates <paramref name="value"/> to an int after limiting it to the given range.
/// </summary>
/// <param name="value">The value to clamp.</param>
/// <param name="min">Returned when <paramref name="value"/> is below it; checked first.</param>
/// <param name="max">Returned when <paramref name="value"/> is above it.</param>
/// <returns>The bound that was exceeded, otherwise <paramref name="value"/> truncated toward zero.</returns>
public static int ClampToInt(float value, int min, int max)
{
    return value < min ? min
        : value > max ? max
        : (int)value;
}
/// <summary>
/// Sends the bytes reliably and stores the send so it can be retransmitted until acknowledged.
/// </summary>
/// <param name="sendOption">The message type byte placed at the start of the packet.</param>
/// <param name="data">The payload to send; it is copied behind a 3 byte header.</param>
/// <param name="ackCallback">The callback to make once the packet has been acknowledged.</param>
private async ValueTask ReliableSend(byte sendOption, byte[] data, Action ackCallback = null)
{
    // Header layout: [message type][reliable id, 2 bytes], followed by the payload.
    const int headerLength = 3;

    // A reliable send counts as traffic, so hold off the keepalive for a while.
    ResetKeepAliveTimer();

    var packet = new byte[data.Length + headerLength];
    packet[0] = sendOption;

    // Writes the id into packet[1..2] and registers the buffer for retransmission.
    AttachReliableID(packet, 1, packet.Length, ackCallback);

    Buffer.BlockCopy(data, 0, packet, headerLength, data.Length);

    await WriteBytesToConnection(packet, packet.Length);

    Statistics.LogReliableSend(data.Length, packet.Length);
}
/// <summary>
/// Handles a reliable message being received and invokes the data event.
/// </summary>
/// <param name="message">The message received, still including its 3 byte reliable header.</param>
private async ValueTask ReliableMessageReceive(MessageReader message)
{
    // Capture the size before the header is trimmed (and before handlers get hold of the reader), so the
    // statistics are the same for new and duplicate packets.
    int totalLength = message.Length;

    if (await ProcessReliableReceive(message.Buffer, 1))
    {
        // Skip the message type byte and the 2 byte reliable id.
        message.Offset += 3;
        message.Length -= 3;
        message.Position = 0;

        await InvokeDataReceived(message, MessageType.Reliable);
    }

    Statistics.LogReliableReceive(totalLength - 3, totalLength);
}
/// <summary>
/// Handles receives from reliable packets: acknowledges the packet and works out whether it is new.
/// </summary>
/// <param name="bytes">The buffer containing the data.</param>
/// <param name="offset">The offset of the 2 byte (big-endian) reliable id within the buffer.</param>
/// <returns>Whether the packet was a new packet (true) or a duplicate (false).</returns>
private async ValueTask<bool> ProcessReliableReceive(ReadOnlyMemory<byte> bytes, int offset)
{
    var b1 = bytes.Span[offset];
    var b2 = bytes.Span[offset + 1];

    //Get the ID from the packet
    var id = (ushort)((b1 << 8) + b2);

    //Send an acknowledgement (always, even for duplicates, so the sender stops resending)
    await SendAck(id);

    /*
     * It gets a little complicated here.
     *
     * In a simple world if our id is greater than the last reliable packet received (reliableReceiveLast)
     * then it is guaranteed to be a new packet, if it's not we can see if we are missing that packet (lookup
     * in reliableDataPacketsMissing).
     *
     * --------rrl#############             (1)
     *
     * (where --- are packets received already and #### are packets that will be counted as new)
     *
     * Unfortunately if id becomes greater than 65535 it will loop back to zero so we add a pointer that
     * specifies any packets with an id behind it are also new (overwritePointer).
     *
     * ####op----------rrl#####             (2)
     *
     * ------rrl#########op----             (3)
     *
     * Anything behind the reliableReceiveLast pointer (but greater than the overwritePointer) is either a
     * missing packet or something we've already received, so when we change the pointers we need to make sure
     * we keep note of what hasn't been received yet (reliableDataPacketsMissing).
     */
    lock (reliableDataPacketsMissing)
    {
        //Calculate overwritePointer: half the id space behind the last received id
        ushort overwritePointer = (ushort)(reliableReceiveLast - 32768);

        //Calculate if it is a new packet by examining if it is within the range
        bool isNew;
        if (overwritePointer < reliableReceiveLast)
            isNew = id > reliableReceiveLast || id <= overwritePointer;     //Figure (2)
        else
            isNew = id > reliableReceiveLast && id <= overwritePointer;     //Figure (3)

        if (isNew)
        {
            // Mark items between the most recent receive and the id received as missing
            if (id > reliableReceiveLast)
            {
                for (ushort i = (ushort)(reliableReceiveLast + 1); i < id; i++)
                {
                    reliableDataPacketsMissing.Add(i);
                }
            }
            else
            {
                // The id wrapped past 65535. The skipped ids are reliableReceiveLast + 1 .. 65535 followed by
                // 0 .. id - 1, which is exactly cnt ids, so the bound is inclusive; stopping one short would
                // leave id - 1 unmarked and it would later be rejected as a duplicate.
                int cnt = (ushort.MaxValue - reliableReceiveLast) + id;
                for (int i = 1; i <= cnt; ++i)
                {
                    reliableDataPacketsMissing.Add((ushort)(i + reliableReceiveLast));
                }
            }

            //Update the most recently received
            reliableReceiveLast = id;
        }
        else
        {
            //Not new: accept it only if we were still waiting for it, otherwise it is a duplicate
            if (!reliableDataPacketsMissing.Remove(id))
            {
                return false;
            }
        }
    }

    return true;
}
/// <summary>
/// Handles acknowledgement packets sent to us.
/// </summary>
/// <param name="bytes">
/// The received packet: message type, a 2 byte big-endian id and, when the packet is exactly 4 bytes long,
/// a bitmask in which bit i-1 acknowledges the packet with id - i.
/// </param>
private void AcknowledgementMessageReceive(ReadOnlySpan<byte> bytes)
{
    // The packet comes off the network; one too short to carry an id acknowledges nothing, so ignore it
    // rather than indexing past the end of the span.
    if (bytes.Length < 3)
    {
        return;
    }

    this.pingsSinceAck = 0;

    ushort id = (ushort)((bytes[1] << 8) + bytes[2]);
    AcknowledgeMessageId(id);

    if (bytes.Length == 4)
    {
        byte recentPackets = bytes[3];
        for (int i = 1; i <= 8; ++i)
        {
            if ((recentPackets & 1) != 0)
            {
                // The cast makes id - i wrap around within the 16-bit id space.
                AcknowledgeMessageId((ushort)(id - i));
            }

            recentPackets >>= 1;
        }
    }

    Statistics.LogReliableReceive(0, bytes.Length);
}
/// <summary>
/// Marks the reliable packet (or, failing that, the ping packet) with the given id as acknowledged and
/// folds its age into <see cref="AveragePingMs"/>. Unknown ids are ignored.
/// </summary>
/// <param name="id">The id being acknowledged.</param>
private void AcknowledgeMessageId(ushort id)
{
    // A reliable data packet takes precedence over a ping packet with the same id.
    if (reliableDataPacketsSent.TryRemove(id, out Packet acked))
    {
        // Sample the age before the callback runs so the callback's cost is not counted as latency.
        float elapsedMs = acked.Stopwatch.ElapsedMilliseconds;

        acked.AckCallback?.Invoke();
        acked.Recycle();

        FoldIntoAveragePing(elapsedMs);
        return;
    }

    if (this.activePingPackets.TryRemove(id, out PingPacket ackedPing))
    {
        float elapsedMs = ackedPing.Stopwatch.ElapsedMilliseconds;
        ackedPing.Recycle();

        FoldIntoAveragePing(elapsedMs);
    }

    // Weighted average: 70% history, 30% new sample, never below 50.
    void FoldIntoAveragePing(float sampleMs)
    {
        lock (PingLock)
        {
            this.AveragePingMs = Math.Max(50, this.AveragePingMs * .7f + sampleMs * .3f);
        }
    }
}
/// <summary>
/// Sends an acknowledgement for a packet given its id.
/// </summary>
/// <param name="id">The id of the packet being acknowledged.</param>
/// <remarks>
/// The acknowledgement is 4 bytes: message type, the id (big-endian) and a bitmask covering the 8 ids
/// before <paramref name="id"/>. Bit i-1 is set when id - i is not in the set of missing packets.
/// A send that fails with <see cref="InvalidOperationException"/> is ignored.
/// </remarks>
private async ValueTask SendAck(ushort id)
{
    byte recentPackets = 0;
    lock (this.reliableDataPacketsMissing)
    {
        for (int bit = 0; bit < 8; ++bit)
        {
            // The cast makes the subtraction wrap around within the 16-bit id space.
            ushort earlierId = (ushort)(id - (bit + 1));
            if (this.reliableDataPacketsMissing.Contains(earlierId))
            {
                continue;
            }

            recentPackets |= (byte)(1 << bit);
        }
    }

    var ack = new byte[4];
    ack[0] = (byte)UdpSendOption.Acknowledgement;
    ack[1] = (byte)(id >> 8);
    ack[2] = (byte)(id >> 0);
    ack[3] = recentPackets;

    try
    {
        await WriteBytesToConnection(ack, ack.Length);
    }
    catch (InvalidOperationException)
    {
        // Ignored on purpose: the ack is simply not sent.
    }
}
/// <summary>
/// Removes every packet still awaiting acknowledgement and returns it to the pool.
/// </summary>
private void DisposeReliablePackets()
{
    foreach (var entry in reliableDataPacketsSent)
    {
        // Another thread may have removed (and recycled) the packet already; only recycle what we removed.
        if (!this.reliableDataPacketsSent.TryRemove(entry.Key, out var pending))
        {
            continue;
        }

        pending.Recycle();
    }
}
}
}