summaryrefslogtreecommitdiff
path: root/Other/AstarPathfindingDemo/Packages/com.arongranberg.astar/Core/Pathfinding/BlockableChannel.cs
diff options
context:
space:
mode:
authorchai <215380520@qq.com>2024-05-23 10:08:29 +0800
committerchai <215380520@qq.com>2024-05-23 10:08:29 +0800
commit8722a9920c1f6119bf6e769cba270e63097f8e25 (patch)
tree2eaf9865de7fb1404546de4a4296553d8f68cc3b /Other/AstarPathfindingDemo/Packages/com.arongranberg.astar/Core/Pathfinding/BlockableChannel.cs
parent3ba4020b69e5971bb0df7ee08b31d10ea4d01937 (diff)
+ astar project
Diffstat (limited to 'Other/AstarPathfindingDemo/Packages/com.arongranberg.astar/Core/Pathfinding/BlockableChannel.cs')
-rw-r--r--Other/AstarPathfindingDemo/Packages/com.arongranberg.astar/Core/Pathfinding/BlockableChannel.cs212
1 files changed, 212 insertions, 0 deletions
diff --git a/Other/AstarPathfindingDemo/Packages/com.arongranberg.astar/Core/Pathfinding/BlockableChannel.cs b/Other/AstarPathfindingDemo/Packages/com.arongranberg.astar/Core/Pathfinding/BlockableChannel.cs
new file mode 100644
index 0000000..e3fce03
--- /dev/null
+++ b/Other/AstarPathfindingDemo/Packages/com.arongranberg.astar/Core/Pathfinding/BlockableChannel.cs
@@ -0,0 +1,212 @@
+using System.Threading;
+using Pathfinding.Util;
+using UnityEngine.Assertions;
+
+namespace Pathfinding {
+ /// <summary>
+ /// Multi-producer-multi-consumer (MPMC) channel.
+ ///
+ /// This is a channel that can be used to send data between threads.
+ /// It is thread safe and can be used by multiple threads at the same time.
+ ///
+ /// Additionally, the channel can be put into a blocking mode, which will cause all calls to Receive to block until the channel is unblocked.
+ /// </summary>
+ internal class BlockableChannel<T> where T : class {
+ public enum PopState {
+ Ok,
+ Wait,
+ Closed,
+ }
+
+ readonly System.Object lockObj = new System.Object();
+
+ CircularBuffer<T> queue = new CircularBuffer<T>(16);
+ public int numReceivers { get; private set; }
+
+ // Marked as volatile such that the compiler will not try to optimize the allReceiversBlocked property too much (this is more of a theoretical concern than a practical issue).
+ volatile int waitingReceivers;
+#if !UNITY_WEBGL
+ ManualResetEvent starving = new ManualResetEvent(false);
+#endif
+ bool blocked;
+
+ /// <summary>True if <see cref="Close"/> has been called</summary>
+ public bool isClosed { get; private set; }
+
+ /// <summary>True if the queue is empty</summary>
+ public bool isEmpty {
+ get {
+ lock (lockObj) {
+ return queue.Length == 0;
+ }
+ }
+ }
+
+ /// <summary>True if blocking and all receivers are waiting for unblocking</summary>
+ // Note: This is designed to be lock-free for performance. But it will only generate a useful value if called from the same thread that is blocking/unblocking the queue, otherwise the return value could become invalid at any time.
+ public bool allReceiversBlocked => blocked && waitingReceivers == numReceivers;
+
+ /// <summary>If true, all calls to Receive will block until this property is set to false</summary>
+ public bool isBlocked {
+ get => blocked;
+ set {
+ lock (lockObj) {
+ blocked = value;
+ if (isClosed) return;
+ isStarving = value || queue.Length == 0;
+ }
+ }
+ }
+
+ /// <summary>All calls to Receive and ReceiveNoBlock will now return PopState.Closed</summary>
+ public void Close () {
+ lock (lockObj) {
+ isClosed = true;
+ isStarving = false;
+ }
+ }
+
+ bool isStarving {
+ get {
+#if UNITY_WEBGL
+ // In WebGL, semaphores are not supported.
+ // They will compile, but they don't work properly.
+ // So instead we directly use what the starving semaphore should indicate.
+ return (blocked || queue.Length == 0) && !isClosed;
+#else
+ return !starving.WaitOne(0);
+#endif
+ }
+ set {
+#if !UNITY_WEBGL
+ if (value) starving.Reset();
+ else starving.Set();
+#endif
+ }
+ }
+
+ /// <summary>
+ /// Resets a closed channel so that it can be used again.
+ ///
+ /// The existing queue is preserved.
+ ///
+ /// This will throw an exception if there are any receivers still active.
+ /// </summary>
+ public void Reopen () {
+ lock (lockObj) {
+ if (numReceivers != 0) throw new System.InvalidOperationException("Can only reopen a channel after Close has been called on all receivers");
+ Assert.AreEqual(waitingReceivers, 0);
+ isClosed = false;
+ isBlocked = false;
+ }
+ }
+
+ public Receiver AddReceiver () {
+ lock (lockObj) {
+ if (isClosed) throw new System.InvalidOperationException("Channel is closed");
+ this.numReceivers++;
+ return new Receiver(this);
+ }
+ }
+
+ /// <summary>Push a path to the front of the queue</summary>
+ public void PushFront (T path) {
+ lock (lockObj) {
+ if (isClosed) throw new System.InvalidOperationException("Channel is closed");
+ queue.PushStart(path);
+ if (!blocked) isStarving = false;
+ }
+ }
+
+ /// <summary>Push a path to the end of the queue</summary>
+ public void Push (T path) {
+ lock (lockObj) {
+ if (isClosed) throw new System.InvalidOperationException("Channel is closed");
+ queue.PushEnd(path);
+ if (!blocked) isStarving = false;
+ }
+ }
+
+ /// <summary>Allows receiving items from a channel</summary>
+ public struct Receiver {
+ BlockableChannel<T> channel;
+
+ public Receiver(BlockableChannel<T> channel) {
+ this.channel = channel;
+ }
+
+ /// <summary>
+ /// Call when a receiver was terminated.
+ ///
+ /// After this call, this receiver cannot be used for anything.
+ /// </summary>
+ public void Close () {
+ lock (channel.lockObj) {
+ Assert.IsTrue(channel.numReceivers > 0);
+ Assert.IsTrue(channel.waitingReceivers < channel.numReceivers);
+ channel.numReceivers--;
+ }
+ channel = null;
+ }
+
+ /// <summary>
+ /// Receives the next item from the channel.
+ /// This call will block if there are no items in the channel or if the channel is currently blocked.
+ ///
+ /// Returns: PopState.Ok and a non-null item in the normal case. Returns PopState.Closed if the channel has been closed.
+ /// </summary>
+ public PopState Receive (out T item) {
+#if UNITY_WEBGL
+ throw new System.Exception("Cannot block in WebGL. Use ReceiveNoBlock instead.");
+#else
+ Interlocked.Increment(ref channel.waitingReceivers);
+ while (true) {
+ channel.starving.WaitOne();
+ // Note that right here, the channel could become blocked, closed or anything really. We have to check conditions again after locking.
+ lock (channel.lockObj) {
+ if (channel.isClosed) {
+ Interlocked.Decrement(ref channel.waitingReceivers);
+ item = null;
+ return PopState.Closed;
+ }
+ if (channel.queue.Length == 0) channel.isStarving = true;
+ if (channel.isStarving) continue;
+ Assert.IsFalse(channel.blocked);
+ Interlocked.Decrement(ref channel.waitingReceivers);
+ item = channel.queue.PopStart();
+ return PopState.Ok;
+ }
+ }
+#endif
+ }
+
+ /// <summary>
+ /// Receives the next item from the channel, this call will not block.
+ /// To ensure a consistent state, the caller must follow this pattern.
+ /// 1. Call ReceiveNoBlock(false), if PopState.Wait is returned, wait for a bit (e.g yield return null in a Unity coroutine)
+ /// 2. try again with PopNoBlock(true), if PopState.Wait, wait for a bit
+ /// 3. Repeat from step 2.
+ /// </summary>
+ public PopState ReceiveNoBlock (bool blockedBefore, out T item) {
+ item = null;
+ if (!blockedBefore) Interlocked.Increment(ref channel.waitingReceivers);
+ while (true) {
+ if (channel.isStarving) return PopState.Wait;
+ // Note that right here, the channel could become blocked, closed or anything really. We have to check conditions again after locking.
+ lock (channel.lockObj) {
+ if (channel.isClosed) {
+ Interlocked.Decrement(ref channel.waitingReceivers);
+ return PopState.Closed;
+ }
+ if (channel.queue.Length == 0) channel.isStarving = true;
+ if (channel.isStarving) continue;
+ Assert.IsFalse(channel.blocked);
+ Interlocked.Decrement(ref channel.waitingReceivers);
+ item = channel.queue.PopStart();
+ return PopState.Ok;
+ }
+ }
+ }
+ }
+ }
+}