using System; using System.Collections.Concurrent; using System.Linq; using System.Net; using System.Net.Sockets; using System.Threading; namespace Hazel.Udp { /// /// Listens for new UDP connections and creates UdpConnections for them. /// /// public class UdpConnectionListener : NetworkConnectionListener { private const int SendReceiveBufferSize = 1024 * 1024; private const int BufferSize = ushort.MaxValue; private Socket socket; private ILogger Logger; private Timer reliablePacketTimer; private ConcurrentDictionary allConnections = new ConcurrentDictionary(); public override double AveragePing => this.allConnections.Values.Sum(c => c.AveragePingMs) / this.allConnections.Count; public override int ConnectionCount { get { return this.allConnections.Count; } } public override int ReceiveQueueLength => throw new NotImplementedException(); public override int SendQueueLength => throw new NotImplementedException(); /// /// Creates a new UdpConnectionListener for the given , port and . /// /// The endpoint to listen on. public UdpConnectionListener(IPEndPoint endPoint, IPMode ipMode = IPMode.IPv4, ILogger logger = null) { this.Logger = logger; this.EndPoint = endPoint; this.IPMode = ipMode; this.socket = UdpConnection.CreateSocket(this.IPMode); socket.ReceiveBufferSize = SendReceiveBufferSize; socket.SendBufferSize = SendReceiveBufferSize; reliablePacketTimer = new Timer(ManageReliablePackets, null, 100, Timeout.Infinite); } ~UdpConnectionListener() { this.Dispose(false); } private void ManageReliablePackets(object state) { foreach (var kvp in this.allConnections) { var sock = kvp.Value; sock.ManageReliablePackets(); } try { this.reliablePacketTimer.Change(100, Timeout.Infinite); } catch { } } /// public override void Start() { try { socket.Bind(EndPoint); } catch (SocketException e) { throw new HazelException("Could not start listening as a SocketException occurred", e); } StartListeningForData(); } /// /// Instructs the listener to begin listening. /// private void StartListeningForData() { EndPoint remoteEP = EndPoint; MessageReader message = null; try { message = MessageReader.GetSized(this.ReceiveBufferSize); socket.BeginReceiveFrom(message.Buffer, 0, message.Buffer.Length, SocketFlags.None, ref remoteEP, ReadCallback, message); } catch (SocketException sx) { message?.Recycle(); this.Logger?.WriteError("Socket Ex in StartListening: " + sx.Message); Thread.Sleep(10); StartListeningForData(); return; } catch (Exception ex) { message.Recycle(); this.Logger?.WriteError("Stopped due to: " + ex.Message); return; } } void ReadCallback(IAsyncResult result) { var message = (MessageReader)result.AsyncState; int bytesReceived; EndPoint remoteEndPoint = new IPEndPoint(this.EndPoint.Address, this.EndPoint.Port); //End the receive operation try { bytesReceived = socket.EndReceiveFrom(result, ref remoteEndPoint); message.Offset = 0; message.Length = bytesReceived; } catch (ObjectDisposedException) { message.Recycle(); return; } catch (SocketException sx) { message.Recycle(); if (sx.SocketErrorCode == SocketError.NotConnected) { this.InvokeInternalError(HazelInternalErrors.ConnectionDisconnected); return; } // Client no longer reachable, pretend it didn't happen // TODO should this not inform the connection this client is lost??? // This thread suggests the IP is not passed out from WinSoc so maybe not possible // http://stackoverflow.com/questions/2576926/python-socket-error-on-udp-data-receive-10054 this.Logger?.WriteError($"Socket Ex {sx.SocketErrorCode} in ReadCallback: {sx.Message}"); Thread.Sleep(10); StartListeningForData(); return; } catch (Exception ex) { // Idk, maybe a null ref after dispose? message.Recycle(); this.Logger?.WriteError("Stopped due to: " + ex.Message); return; } // I'm a little concerned about a infinite loop here, but it seems like it's possible // to get 0 bytes read on UDP without the socket being shut down. if (bytesReceived == 0) { message.Recycle(); this.Logger?.WriteInfo("Received 0 bytes"); Thread.Sleep(10); StartListeningForData(); return; } //Begin receiving again StartListeningForData(); bool aware = true; bool isHello = message.Buffer[0] == (byte)UdpSendOption.Hello; // If we're aware of this connection use the one already // If this is a new client then connect with them! UdpServerConnection connection; if (!this.allConnections.TryGetValue(remoteEndPoint, out connection)) { lock (this.allConnections) { if (!this.allConnections.TryGetValue(remoteEndPoint, out connection)) { // Check for malformed connection attempts if (!isHello) { message.Recycle(); return; } if (AcceptConnection != null) { if (!AcceptConnection((IPEndPoint)remoteEndPoint, message.Buffer, out var response)) { message.Recycle(); if (response != null) { SendData(response, response.Length, remoteEndPoint); } return; } } aware = false; connection = new UdpServerConnection(this, (IPEndPoint)remoteEndPoint, this.IPMode, this.Logger); if (!this.allConnections.TryAdd(remoteEndPoint, connection)) { throw new HazelException("Failed to add a connection. This should never happen."); } } } } // If it's a new connection invoke the NewConnection event. // This needs to happen before handling the message because in localhost scenarios, the ACK and // subsequent messages can happen before the NewConnection event sets up OnDataRecieved handlers if (!aware) { // Skip header and hello byte; message.Offset = 4; message.Length = bytesReceived - 4; message.Position = 0; InvokeNewConnection(message, connection); } // Inform the connection of the buffer (new connections need to send an ack back to client) connection.HandleReceive(message, bytesReceived); } #if DEBUG public int TestDropRate = -1; private int dropCounter = 0; #endif /// /// Sends data from the listener socket. /// /// The bytes to send. /// The endpoint to send to. internal void SendData(byte[] bytes, int length, EndPoint endPoint) { if (length > bytes.Length) return; #if DEBUG if (TestDropRate > 0) { if (Interlocked.Increment(ref dropCounter) % TestDropRate == 0) { return; } } #endif try { socket.BeginSendTo( bytes, 0, length, SocketFlags.None, endPoint, SendCallback, null); this.Statistics.AddBytesSent(length); } catch (SocketException e) { this.Logger?.WriteError("Could not send data as a SocketException occurred: " + e); } catch (ObjectDisposedException) { //Keep alive timer probably ran, ignore return; } } private void SendCallback(IAsyncResult result) { try { socket.EndSendTo(result); } catch { } } /// /// Sends data from the listener socket. /// /// The bytes to send. /// The endpoint to send to. internal void SendDataSync(byte[] bytes, int length, EndPoint endPoint) { try { socket.SendTo( bytes, 0, length, SocketFlags.None, endPoint ); this.Statistics.AddBytesSent(length); } catch { } } /// /// Removes a virtual connection from the list. /// /// The endpoint of the virtual connection. internal void RemoveConnectionTo(EndPoint endPoint) { this.allConnections.TryRemove(endPoint, out var conn); } /// protected override void Dispose(bool disposing) { foreach (var kvp in this.allConnections) { kvp.Value.Dispose(); } try { this.socket.Shutdown(SocketShutdown.Both); } catch { } try { this.socket.Close(); } catch { } try { this.socket.Dispose(); } catch { } this.reliablePacketTimer.Dispose(); base.Dispose(disposing); } } }