1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
|
using System.Threading;
using Pathfinding.Util;
using UnityEngine.Assertions;
namespace Pathfinding {
/// <summary>
/// Multi-producer-multi-consumer (MPMC) channel.
///
/// This is a channel that can be used to send data between threads.
/// It is thread safe and can be used by multiple threads at the same time.
///
/// Additionally, the channel can be put into a blocking mode, which will cause all calls to Receive to block until the channel is unblocked.
/// </summary>
internal class BlockableChannel<T> where T : class {
public enum PopState {
Ok,
Wait,
Closed,
}
readonly System.Object lockObj = new System.Object();
CircularBuffer<T> queue = new CircularBuffer<T>(16);
public int numReceivers { get; private set; }
// Marked as volatile such that the compiler will not try to optimize the allReceiversBlocked property too much (this is more of a theoretical concern than a practical issue).
volatile int waitingReceivers;
#if !UNITY_WEBGL
ManualResetEvent starving = new ManualResetEvent(false);
#endif
bool blocked;
/// <summary>True if <see cref="Close"/> has been called</summary>
public bool isClosed { get; private set; }
/// <summary>True if the queue is empty</summary>
public bool isEmpty {
get {
lock (lockObj) {
return queue.Length == 0;
}
}
}
/// <summary>True if blocking and all receivers are waiting for unblocking</summary>
// Note: This is designed to be lock-free for performance. But it will only generate a useful value if called from the same thread that is blocking/unblocking the queue, otherwise the return value could become invalid at any time.
public bool allReceiversBlocked => blocked && waitingReceivers == numReceivers;
/// <summary>If true, all calls to Receive will block until this property is set to false</summary>
public bool isBlocked {
get => blocked;
set {
lock (lockObj) {
blocked = value;
if (isClosed) return;
isStarving = value || queue.Length == 0;
}
}
}
/// <summary>All calls to Receive and ReceiveNoBlock will now return PopState.Closed</summary>
public void Close () {
lock (lockObj) {
isClosed = true;
isStarving = false;
}
}
bool isStarving {
get {
#if UNITY_WEBGL
// In WebGL, semaphores are not supported.
// They will compile, but they don't work properly.
// So instead we directly use what the starving semaphore should indicate.
return (blocked || queue.Length == 0) && !isClosed;
#else
return !starving.WaitOne(0);
#endif
}
set {
#if !UNITY_WEBGL
if (value) starving.Reset();
else starving.Set();
#endif
}
}
/// <summary>
/// Resets a closed channel so that it can be used again.
///
/// The existing queue is preserved.
///
/// This will throw an exception if there are any receivers still active.
/// </summary>
public void Reopen () {
lock (lockObj) {
if (numReceivers != 0) throw new System.InvalidOperationException("Can only reopen a channel after Close has been called on all receivers");
Assert.AreEqual(waitingReceivers, 0);
isClosed = false;
isBlocked = false;
}
}
public Receiver AddReceiver () {
lock (lockObj) {
if (isClosed) throw new System.InvalidOperationException("Channel is closed");
this.numReceivers++;
return new Receiver(this);
}
}
/// <summary>Push a path to the front of the queue</summary>
public void PushFront (T path) {
lock (lockObj) {
if (isClosed) throw new System.InvalidOperationException("Channel is closed");
queue.PushStart(path);
if (!blocked) isStarving = false;
}
}
/// <summary>Push a path to the end of the queue</summary>
public void Push (T path) {
lock (lockObj) {
if (isClosed) throw new System.InvalidOperationException("Channel is closed");
queue.PushEnd(path);
if (!blocked) isStarving = false;
}
}
/// <summary>Allows receiving items from a channel</summary>
public struct Receiver {
BlockableChannel<T> channel;
public Receiver(BlockableChannel<T> channel) {
this.channel = channel;
}
/// <summary>
/// Call when a receiver was terminated.
///
/// After this call, this receiver cannot be used for anything.
/// </summary>
public void Close () {
lock (channel.lockObj) {
Assert.IsTrue(channel.numReceivers > 0);
Assert.IsTrue(channel.waitingReceivers < channel.numReceivers);
channel.numReceivers--;
}
channel = null;
}
/// <summary>
/// Receives the next item from the channel.
/// This call will block if there are no items in the channel or if the channel is currently blocked.
///
/// Returns: PopState.Ok and a non-null item in the normal case. Returns PopState.Closed if the channel has been closed.
/// </summary>
public PopState Receive (out T item) {
#if UNITY_WEBGL
throw new System.Exception("Cannot block in WebGL. Use ReceiveNoBlock instead.");
#else
Interlocked.Increment(ref channel.waitingReceivers);
while (true) {
channel.starving.WaitOne();
// Note that right here, the channel could become blocked, closed or anything really. We have to check conditions again after locking.
lock (channel.lockObj) {
if (channel.isClosed) {
Interlocked.Decrement(ref channel.waitingReceivers);
item = null;
return PopState.Closed;
}
if (channel.queue.Length == 0) channel.isStarving = true;
if (channel.isStarving) continue;
Assert.IsFalse(channel.blocked);
Interlocked.Decrement(ref channel.waitingReceivers);
item = channel.queue.PopStart();
return PopState.Ok;
}
}
#endif
}
/// <summary>
/// Receives the next item from the channel, this call will not block.
/// To ensure a consistent state, the caller must follow this pattern.
/// 1. Call ReceiveNoBlock(false), if PopState.Wait is returned, wait for a bit (e.g yield return null in a Unity coroutine)
/// 2. try again with PopNoBlock(true), if PopState.Wait, wait for a bit
/// 3. Repeat from step 2.
/// </summary>
public PopState ReceiveNoBlock (bool blockedBefore, out T item) {
item = null;
if (!blockedBefore) Interlocked.Increment(ref channel.waitingReceivers);
while (true) {
if (channel.isStarving) return PopState.Wait;
// Note that right here, the channel could become blocked, closed or anything really. We have to check conditions again after locking.
lock (channel.lockObj) {
if (channel.isClosed) {
Interlocked.Decrement(ref channel.waitingReceivers);
return PopState.Closed;
}
if (channel.queue.Length == 0) channel.isStarving = true;
if (channel.isStarving) continue;
Assert.IsFalse(channel.blocked);
Interlocked.Decrement(ref channel.waitingReceivers);
item = channel.queue.PopStart();
return PopState.Ok;
}
}
}
}
}
}
|