using System.Threading;
using Pathfinding.Util;
using UnityEngine.Assertions;
namespace Pathfinding {
/// <summary>
/// Multi-producer-multi-consumer (MPMC) channel.
///
/// This is a channel that can be used to send data between threads.
/// It is thread safe and can be used by multiple threads at the same time.
///
/// Additionally, the channel can be put into a blocking mode, which will cause all calls to Receive to block until the channel is unblocked.
/// </summary>
/// <typeparam name="T">Type of the items sent through the channel. Must be a reference type, since null is used as the "no item" value by the receive methods.</typeparam>
internal class BlockableChannel<T> where T : class {
	/// <summary>Result of trying to receive an item from the channel</summary>
	public enum PopState {
		/// <summary>An item was received</summary>
		Ok,
		/// <summary>No item could be received right now because the channel is starving (empty or blocked). Try again later.</summary>
		Wait,
		/// <summary>The channel has been closed</summary>
		Closed,
	}

	/// <summary>All writes to the channel state (except the <see cref="waitingReceivers"/> counter) are made while holding this lock</summary>
	readonly System.Object lockObj = new System.Object();

	// NOTE(review): intentionally not marked readonly. If CircularBuffer is a struct, a readonly field would make
	// the mutating calls (PushStart/PushEnd/PopStart) operate on a defensive copy - confirm before changing.
	CircularBuffer<T> queue = new CircularBuffer<T>(16);

	/// <summary>Number of receivers created by <see cref="AddReceiver"/> which have not yet been closed</summary>
	public int numReceivers { get; private set; }

	// Marked as volatile such that the compiler will not try to optimize the allReceiversBlocked property too much (this is more of a theoretical concern than a practical issue).
	volatile int waitingReceivers;

#if !UNITY_WEBGL
	// Note the inverted meaning: the event is *reset* while the channel is starving and *set* when receivers may proceed.
	// It starts out reset, because the queue is initially empty.
	readonly ManualResetEvent starving = new ManualResetEvent(false);
#endif

	bool blocked;

	/// <summary>True if <see cref="Close"/> has been called, and the channel has not been reopened using <see cref="Reopen"/> since</summary>
	public bool isClosed { get; private set; }

	/// <summary>True if the queue is empty</summary>
	public bool isEmpty {
		get {
			lock (lockObj) {
				return queue.Length == 0;
			}
		}
	}

	/// <summary>True if blocking and all receivers are waiting for unblocking</summary>
	// Note: This is designed to be lock-free for performance. But it will only generate a useful value if called from the same thread that is blocking/unblocking the queue, otherwise the return value could become invalid at any time.
	public bool allReceiversBlocked => blocked && waitingReceivers == numReceivers;

	/// <summary>If true, all calls to Receive will block until this property is set to false</summary>
	public bool isBlocked {
		get => blocked;
		set {
			lock (lockObj) {
				blocked = value;
				// A closed channel must never look starving, otherwise receivers could not observe the closed state
				if (isClosed) return;
				isStarving = value || queue.Length == 0;
			}
		}
	}

	/// <summary>All calls to Receive and ReceiveNoBlock will now return PopState.Closed</summary>
	public void Close () {
		lock (lockObj) {
			isClosed = true;
			// Release any receivers waiting in Receive, so that they can see that the channel is closed
			isStarving = false;
		}
	}

	/// <summary>
	/// True if receivers cannot take an item right now, because the channel is blocked or the queue is empty.
	///
	/// Setting this to false releases receivers that are waiting in Receive.
	/// </summary>
	bool isStarving {
		get {
#if UNITY_WEBGL
			// In WebGL, semaphores are not supported.
			// They will compile, but they don't work properly.
			// So instead we directly use what the starving semaphore should indicate.
			return (blocked || queue.Length == 0) && !isClosed;
#else
			// A zero timeout makes this a non-blocking check of the event state
			return !starving.WaitOne(0);
#endif
		}
		set {
#if !UNITY_WEBGL
			if (value) starving.Reset();
			else starving.Set();
#endif
		}
	}

	/// <summary>
	/// Resets a closed channel so that it can be used again.
	///
	/// The existing queue is preserved.
	///
	/// This will throw an exception if there are any receivers still active.
	/// </summary>
	/// <exception cref="System.InvalidOperationException">If one or more receivers have not been closed</exception>
	public void Reopen () {
		lock (lockObj) {
			if (numReceivers != 0) throw new System.InvalidOperationException("Can only reopen a channel after Close has been called on all receivers");
			Assert.AreEqual(waitingReceivers, 0);
			isClosed = false;
			// Must be done after isClosed has been cleared, since the setter does not update the starving state of a closed channel
			isBlocked = false;
		}
	}

	/// <summary>
	/// Registers a new receiver for this channel.
	///
	/// The receiver should be closed using Receiver.Close when it is no longer used.
	/// </summary>
	/// <exception cref="System.InvalidOperationException">If the channel is closed</exception>
	public Receiver AddReceiver () {
		lock (lockObj) {
			if (isClosed) throw new System.InvalidOperationException("Channel is closed");
			this.numReceivers++;
			return new Receiver(this);
		}
	}

	/// <summary>Push a path to the front of the queue</summary>
	/// <exception cref="System.InvalidOperationException">If the channel is closed</exception>
	public void PushFront (T path) {
		lock (lockObj) {
			if (isClosed) throw new System.InvalidOperationException("Channel is closed");
			queue.PushStart(path);
			// While blocked, receivers must keep waiting even though there are items in the queue
			if (!blocked) isStarving = false;
		}
	}

	/// <summary>Push a path to the end of the queue</summary>
	/// <exception cref="System.InvalidOperationException">If the channel is closed</exception>
	public void Push (T path) {
		lock (lockObj) {
			if (isClosed) throw new System.InvalidOperationException("Channel is closed");
			queue.PushEnd(path);
			// While blocked, receivers must keep waiting even though there are items in the queue
			if (!blocked) isStarving = false;
		}
	}

	/// <summary>Allows receiving items from a channel</summary>
	public struct Receiver {
		BlockableChannel<T> channel;

		public Receiver(BlockableChannel<T> channel) {
			this.channel = channel;
		}

		/// <summary>
		/// Call when a receiver was terminated.
		///
		/// After this call, this receiver cannot be used for anything.
		/// </summary>
		public void Close () {
			lock (channel.lockObj) {
				Assert.IsTrue(channel.numReceivers > 0);
				// A receiver must not be closed while it is counted as waiting for an item
				Assert.IsTrue(channel.waitingReceivers < channel.numReceivers);
				channel.numReceivers--;
			}
			channel = null;
		}

		/// <summary>
		/// Receives the next item from the channel.
		/// This call will block if there are no items in the channel or if the channel is currently blocked.
		///
		/// Returns: PopState.Ok and a non-null item in the normal case. Returns PopState.Closed if the channel has been closed.
		/// </summary>
		/// <param name="item">The received item, or null if PopState.Closed is returned</param>
		/// <exception cref="System.NotSupportedException">Always thrown on WebGL, where blocking is not possible</exception>
		public PopState Receive (out T item) {
#if UNITY_WEBGL
			throw new System.NotSupportedException("Cannot block in WebGL. Use ReceiveNoBlock instead.");
#else
			// Count this receiver as waiting for the whole duration of the call
			Interlocked.Increment(ref channel.waitingReceivers);
			while (true) {
				channel.starving.WaitOne();
				// Note that right here, the channel could become blocked, closed or anything really. We have to check conditions again after locking.
				lock (channel.lockObj) {
					if (channel.isClosed) {
						Interlocked.Decrement(ref channel.waitingReceivers);
						item = null;
						return PopState.Closed;
					}
					if (channel.queue.Length == 0) channel.isStarving = true;
					// Leaves the lock and goes back to waiting on the event
					if (channel.isStarving) continue;
					Assert.IsFalse(channel.blocked);
					Interlocked.Decrement(ref channel.waitingReceivers);
					item = channel.queue.PopStart();
					return PopState.Ok;
				}
			}
#endif
		}

		/// <summary>
		/// Receives the next item from the channel, this call will not block.
		/// To ensure a consistent state, the caller must follow this pattern.
		/// 1. Call ReceiveNoBlock(false), if PopState.Wait is returned, wait for a bit (e.g yield return null in a Unity coroutine)
		/// 2. try again with ReceiveNoBlock(true), if PopState.Wait, wait for a bit
		/// 3. Repeat from step 2.
		/// </summary>
		/// <param name="blockedBefore">True if the previous call returned PopState.Wait. The receiver is then already counted as waiting, and will not be counted again.</param>
		/// <param name="item">The received item, or null unless PopState.Ok is returned</param>
		public PopState ReceiveNoBlock (bool blockedBefore, out T item) {
			item = null;
			// The receiver stays counted as waiting until a call returns PopState.Ok or PopState.Closed
			if (!blockedBefore) Interlocked.Increment(ref channel.waitingReceivers);
			while (true) {
				if (channel.isStarving) return PopState.Wait;
				// Note that right here, the channel could become blocked, closed or anything really. We have to check conditions again after locking.
				lock (channel.lockObj) {
					if (channel.isClosed) {
						Interlocked.Decrement(ref channel.waitingReceivers);
						return PopState.Closed;
					}
					if (channel.queue.Length == 0) channel.isStarving = true;
					// Leaves the lock and re-checks the starving state at the top of the loop
					if (channel.isStarving) continue;
					Assert.IsFalse(channel.blocked);
					Interlocked.Decrement(ref channel.waitingReceivers);
					item = channel.queue.PopStart();
					return PopState.Ok;
				}
			}
		}
	}
}
}