using System.Threading; using Pathfinding.Util; using UnityEngine.Assertions; namespace Pathfinding { /// /// Multi-producer-multi-consumer (MPMC) channel. /// /// This is a channel that can be used to send data between threads. /// It is thread safe and can be used by multiple threads at the same time. /// /// Additionally, the channel can be put into a blocking mode, which will cause all calls to Receive to block until the channel is unblocked. /// internal class BlockableChannel where T : class { public enum PopState { Ok, Wait, Closed, } readonly System.Object lockObj = new System.Object(); CircularBuffer queue = new CircularBuffer(16); public int numReceivers { get; private set; } // Marked as volatile such that the compiler will not try to optimize the allReceiversBlocked property too much (this is more of a theoretical concern than a practical issue). volatile int waitingReceivers; #if !UNITY_WEBGL ManualResetEvent starving = new ManualResetEvent(false); #endif bool blocked; /// True if has been called public bool isClosed { get; private set; } /// True if the queue is empty public bool isEmpty { get { lock (lockObj) { return queue.Length == 0; } } } /// True if blocking and all receivers are waiting for unblocking // Note: This is designed to be lock-free for performance. But it will only generate a useful value if called from the same thread that is blocking/unblocking the queue, otherwise the return value could become invalid at any time. public bool allReceiversBlocked => blocked && waitingReceivers == numReceivers; /// If true, all calls to Receive will block until this property is set to false public bool isBlocked { get => blocked; set { lock (lockObj) { blocked = value; if (isClosed) return; isStarving = value || queue.Length == 0; } } } /// All calls to Receive and ReceiveNoBlock will now return PopState.Closed public void Close () { lock (lockObj) { isClosed = true; isStarving = false; } } bool isStarving { get { #if UNITY_WEBGL // In WebGL, semaphores are not supported. // They will compile, but they don't work properly. // So instead we directly use what the starving semaphore should indicate. return (blocked || queue.Length == 0) && !isClosed; #else return !starving.WaitOne(0); #endif } set { #if !UNITY_WEBGL if (value) starving.Reset(); else starving.Set(); #endif } } /// /// Resets a closed channel so that it can be used again. /// /// The existing queue is preserved. /// /// This will throw an exception if there are any receivers still active. /// public void Reopen () { lock (lockObj) { if (numReceivers != 0) throw new System.InvalidOperationException("Can only reopen a channel after Close has been called on all receivers"); Assert.AreEqual(waitingReceivers, 0); isClosed = false; isBlocked = false; } } public Receiver AddReceiver () { lock (lockObj) { if (isClosed) throw new System.InvalidOperationException("Channel is closed"); this.numReceivers++; return new Receiver(this); } } /// Push a path to the front of the queue public void PushFront (T path) { lock (lockObj) { if (isClosed) throw new System.InvalidOperationException("Channel is closed"); queue.PushStart(path); if (!blocked) isStarving = false; } } /// Push a path to the end of the queue public void Push (T path) { lock (lockObj) { if (isClosed) throw new System.InvalidOperationException("Channel is closed"); queue.PushEnd(path); if (!blocked) isStarving = false; } } /// Allows receiving items from a channel public struct Receiver { BlockableChannel channel; public Receiver(BlockableChannel channel) { this.channel = channel; } /// /// Call when a receiver was terminated. /// /// After this call, this receiver cannot be used for anything. /// public void Close () { lock (channel.lockObj) { Assert.IsTrue(channel.numReceivers > 0); Assert.IsTrue(channel.waitingReceivers < channel.numReceivers); channel.numReceivers--; } channel = null; } /// /// Receives the next item from the channel. /// This call will block if there are no items in the channel or if the channel is currently blocked. /// /// Returns: PopState.Ok and a non-null item in the normal case. Returns PopState.Closed if the channel has been closed. /// public PopState Receive (out T item) { #if UNITY_WEBGL throw new System.Exception("Cannot block in WebGL. Use ReceiveNoBlock instead."); #else Interlocked.Increment(ref channel.waitingReceivers); while (true) { channel.starving.WaitOne(); // Note that right here, the channel could become blocked, closed or anything really. We have to check conditions again after locking. lock (channel.lockObj) { if (channel.isClosed) { Interlocked.Decrement(ref channel.waitingReceivers); item = null; return PopState.Closed; } if (channel.queue.Length == 0) channel.isStarving = true; if (channel.isStarving) continue; Assert.IsFalse(channel.blocked); Interlocked.Decrement(ref channel.waitingReceivers); item = channel.queue.PopStart(); return PopState.Ok; } } #endif } /// /// Receives the next item from the channel, this call will not block. /// To ensure a consistent state, the caller must follow this pattern. /// 1. Call ReceiveNoBlock(false), if PopState.Wait is returned, wait for a bit (e.g yield return null in a Unity coroutine) /// 2. try again with PopNoBlock(true), if PopState.Wait, wait for a bit /// 3. Repeat from step 2. /// public PopState ReceiveNoBlock (bool blockedBefore, out T item) { item = null; if (!blockedBefore) Interlocked.Increment(ref channel.waitingReceivers); while (true) { if (channel.isStarving) return PopState.Wait; // Note that right here, the channel could become blocked, closed or anything really. We have to check conditions again after locking. lock (channel.lockObj) { if (channel.isClosed) { Interlocked.Decrement(ref channel.waitingReceivers); return PopState.Closed; } if (channel.queue.Length == 0) channel.isStarving = true; if (channel.isStarving) continue; Assert.IsFalse(channel.blocked); Interlocked.Decrement(ref channel.waitingReceivers); item = channel.queue.PopStart(); return PopState.Ok; } } } } } }