using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Impostor.Api.Net.Messages;
using Microsoft.Extensions.ObjectPool;
using Serilog;

namespace Impostor.Hazel.Udp
{
    /// <summary>
    ///     Represents a connection that uses the UDP protocol.
    /// </summary>
    /// <inheritdoc />
    public abstract partial class UdpConnection : NetworkConnection
    {
        /// <summary>
        ///     A one-byte packet consisting only of the <see cref="UdpSendOption.Disconnect"/> header.
        /// </summary>
        protected static readonly byte[] EmptyDisconnectBytes = { (byte)UdpSendOption.Disconnect };

        // NOTE(review): the generic arguments on ForContext, ObjectPool and Channel below were
        // reconstructed from usage (new MessageReader(_readerPool), reader.Update(result)) - confirm.
        private static readonly ILogger Logger = Log.ForContext<UdpConnection>();

        private readonly ConnectionListener _listener;
        private readonly ObjectPool<MessageReader> _readerPool;
        private readonly CancellationTokenSource _stoppingCts;

        private bool _isDisposing;
        private bool _isFirst = true;
        private Task _executingTask;

        /// <summary>
        ///     Creates the connection and the unbounded single-reader/single-writer channel
        ///     through which received datagrams are handed to the read loop.
        /// </summary>
        /// <param name="listener">The listener notified about the first received message; may be null.</param>
        /// <param name="readerPool">The pool handed to the <see cref="MessageReader"/> used by the read loop.</param>
        protected UdpConnection(ConnectionListener listener, ObjectPool<MessageReader> readerPool)
        {
            _listener = listener;
            _readerPool = readerPool;
            _stoppingCts = new CancellationTokenSource();

            Pipeline = Channel.CreateUnbounded<byte[]>(new UnboundedChannelOptions
            {
                SingleReader = true,
                SingleWriter = true,
            });
        }

        /// <summary>
        ///     The channel that carries received data to <see cref="ReadAsync"/>.
        /// </summary>
        internal Channel<byte[]> Pipeline { get; }

        /// <summary>
        ///     Starts the background read loop.
        /// </summary>
        /// <returns>
        ///     The read-loop task if it has already finished (so that failure bubbles to the caller),
        ///     otherwise a completed task.
        /// </returns>
        public Task StartAsync()
        {
            // Task.Run unwraps the async delegate, so the stored task represents the whole read loop.
            // Task.Factory.StartNew would return a Task<Task> whose outer task finishes at the first
            // await, hiding any later fault of the loop.
            _executingTask = Task.Run(() => ReadAsync());

            // If the task is completed then return it, this will bubble cancellation and failure to the caller.
            if (_executingTask.IsCompleted)
            {
                return _executingTask;
            }

            // Otherwise it's running.
            return Task.CompletedTask;
        }

        /// <summary>
        ///     Stops the read loop and disposes the connection unless disposal is already in progress.
        ///     Does nothing when <see cref="StartAsync"/> was never called.
        /// </summary>
        public void Stop()
        {
            // Stop called without start.
            if (_executingTask == null)
            {
                return;
            }

            // Signal cancellation to methods.
            _stoppingCts.Cancel();

            // Complete the channel so the reader ends; TryComplete simply returns false when the
            // channel was already completed instead of throwing ChannelClosedException.
            Pipeline.Writer.TryComplete();

            // Remove references.
            if (!_isDisposing)
            {
                Dispose(true);
            }
        }

        /// <summary>
        ///     Read loop: takes received buffers from <see cref="Pipeline"/> and passes them to
        ///     <see cref="HandleReceive"/> until the connection is stopped or handling fails.
        /// </summary>
        private async Task ReadAsync()
        {
            var reader = new MessageReader(_readerPool);

            try
            {
                while (!_stoppingCts.IsCancellationRequested)
                {
                    var result = await Pipeline.Reader.ReadAsync(_stoppingCts.Token);

                    try
                    {
                        reader.Update(result);

                        await HandleReceive(reader);
                    }
                    catch (Exception e)
                    {
                        Logger.Error(e, "Exception during ReadAsync");
                        Dispose(true);
                        break;
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Stop() cancelled the token while we were waiting for data: normal shutdown.
            }
            catch (ChannelClosedException)
            {
                // Stop() completed the channel and no data is left: normal shutdown.
            }
        }

        /// <summary>
        ///     Writes the given bytes to the connection.
        /// </summary>
        /// <param name="bytes">The bytes to write.</param>
        /// <param name="length">The number of bytes to write.</param>
        protected abstract ValueTask WriteBytesToConnection(byte[] bytes, int length);

        /// <inheritdoc />
        public override async ValueTask SendAsync(IMessageWriter msg)
        {
            if (this._state != ConnectionState.Connected)
            {
                throw new InvalidOperationException("Could not send data as this Connection is not connected. Did you disconnect?");
            }

            // Send a private copy so the writer's own buffer is left untouched.
            byte[] buffer = new byte[msg.Length];
            Buffer.BlockCopy(msg.Buffer, 0, buffer, 0, msg.Length);

            switch (msg.SendOption)
            {
                case MessageType.Reliable:
                    ResetKeepAliveTimer();

                    AttachReliableID(buffer, 1, buffer.Length);
                    await WriteBytesToConnection(buffer, buffer.Length);
                    Statistics.LogReliableSend(buffer.Length - 3, buffer.Length);
                    break;

                default:
                    await WriteBytesToConnection(buffer, buffer.Length);
                    Statistics.LogUnreliableSend(buffer.Length - 1, buffer.Length);
                    break;
            }
        }

        /// <inheritdoc />
        /// <remarks>
        ///     <para>
        ///         Udp connections can currently send messages using <see cref="MessageType.Unreliable"/> and
        ///         <see cref="MessageType.Reliable"/>. Fragmented messages are not currently supported and will
        ///         default to <see cref="MessageType.Unreliable"/> until implemented.
        ///     </para>
        /// </remarks>
        public override async ValueTask SendBytes(byte[] bytes, MessageType sendOption = MessageType.Unreliable)
        {
            // Add header information and send.
            await HandleSend(bytes, (byte)sendOption);
        }

        /// <summary>
        ///     Handles the reliable/fragmented sending from this connection.
        /// </summary>
        /// <param name="data">The data being sent.</param>
        /// <param name="sendOption">The send option specified as its byte value.</param>
        /// <param name="ackCallback">The callback to invoke when this packet is acknowledged.</param>
        protected async ValueTask HandleSend(byte[] data, byte sendOption, Action ackCallback = null)
        {
            switch (sendOption)
            {
                case (byte)UdpSendOption.Ping:
                case (byte)MessageType.Reliable:
                case (byte)UdpSendOption.Hello:
                    await ReliableSend(sendOption, data, ackCallback);
                    break;

                // Treat all else as unreliable.
                default:
                    await UnreliableSend(sendOption, data);
                    break;
            }
        }

        /// <summary>
        ///     Handles the receiving of data.
        /// </summary>
        /// <param name="message">The reader containing the bytes received.</param>
        protected async ValueTask HandleReceive(MessageReader message)
        {
            // The first message received on this connection is handed to the listener as the handshake.
            if (_isFirst)
            {
                _isFirst = false;

                // Slice 4 bytes to get handshake data.
                if (_listener != null)
                {
                    using (var handshake = message.Copy(4))
                    {
                        await _listener.InvokeNewConnection(handshake, this);
                    }
                }
            }

            // The first byte of every packet is the send option.
            switch (message.Buffer[0])
            {
                // Handle reliable receives.
                case (byte)MessageType.Reliable:
                    await ReliableMessageReceive(message);
                    break;

                // Handle acknowledgments.
                case (byte)UdpSendOption.Acknowledgement:
                    AcknowledgementMessageReceive(message.Buffer);
                    break;

                // We need to acknowledge hello and ping messages but don't want to invoke any events!
                // Both kinds are recorded through LogHelloReceive.
                case (byte)UdpSendOption.Ping:
                case (byte)UdpSendOption.Hello:
                    await ProcessReliableReceive(message.Buffer, 1);
                    Statistics.LogHelloReceive(message.Length);
                    break;

                case (byte)UdpSendOption.Disconnect:
                    using (var reader = message.Copy(1))
                    {
                        await DisconnectRemote("The remote sent a disconnect request", reader);
                    }

                    break;

                // Treat everything else as unreliable.
                default:
                    using (var reader = message.Copy(1))
                    {
                        await InvokeDataReceived(reader, MessageType.Unreliable);
                    }

                    Statistics.LogUnreliableReceive(message.Length - 1, message.Length);
                    break;
            }
        }

        /// <summary>
        ///     Sends bytes using the unreliable UDP protocol.
        /// </summary>
        /// <param name="sendOption">The SendOption to attach.</param>
        /// <param name="data">The data.</param>
        ValueTask UnreliableSend(byte sendOption, byte[] data)
        {
            return UnreliableSend(sendOption, data, 0, data.Length);
        }

        /// <summary>
        ///     Sends bytes using the unreliable UDP protocol.
        /// </summary>
        /// <param name="sendOption">The SendOption to attach.</param>
        /// <param name="data">The data.</param>
        /// <param name="offset">The index in <paramref name="data"/> at which the payload starts.</param>
        /// <param name="length">The number of payload bytes to send.</param>
        async ValueTask UnreliableSend(byte sendOption, byte[] data, int offset, int length)
        {
            byte[] bytes = new byte[length + 1];

            // Add message type.
            bytes[0] = sendOption;

            // Copy data into new array, right after the one-byte header.
            Buffer.BlockCopy(data, offset, bytes, bytes.Length - length, length);

            // Write to connection.
            await WriteBytesToConnection(bytes, bytes.Length);

            Statistics.LogUnreliableSend(length, bytes.Length);
        }

        /// <summary>
        ///     Sends a hello packet to the remote endpoint.
        /// </summary>
        /// <param name="bytes">The handshake data to append after the leading byte; may be null.</param>
        /// <param name="acknowledgeCallback">The callback to invoke when the hello packet is acknowledged.</param>
        protected ValueTask SendHello(byte[] bytes, Action acknowledgeCallback)
        {
            // First byte of handshake is version indicator so add data after.
            byte[] actualBytes;
            if (bytes == null)
            {
                actualBytes = new byte[1];
            }
            else
            {
                actualBytes = new byte[bytes.Length + 1];
                Buffer.BlockCopy(bytes, 0, actualBytes, 1, bytes.Length);
            }

            return HandleSend(actualBytes, (byte)UdpSendOption.Hello, acknowledgeCallback);
        }

        /// <inheritdoc />
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                // Set the flag first so that Stop() does not call back into Dispose.
                _isDisposing = true;

                Stop();
                DisposeKeepAliveTimer();
                DisposeReliablePackets();
            }

            base.Dispose(disposing);
        }
    }
}