blob: a638113b8189336236bf5d57717c4ed66009a815 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
#ifndef JOB_SCHEDULER_H
#define JOB_SCHEDULER_H
#include "SimpleLock.h"
#include "Semaphore.h"
#include "Runtime/Modules/ExportModules.h"
// On single-CPU platforms, this should never be used
#define ENABLE_JOB_SCHEDULER (ENABLE_MULTITHREADED_CODE || ENABLE_MULTITHREADED_SKINNING)
#if ENABLE_JOB_SCHEDULER
#define ENABLE_JOB_SCHEDULER_PROFILER (!UNITY_EXTERNAL_TOOL && ENABLE_PROFILER)
struct JobGroup;
struct JobInfo;
class Thread;
// Schedules jobs, organised into groups, onto a set of worker threads.
// Only declared when ENABLE_JOB_SCHEDULER is non-zero.
class EXPORT_COREMODULE JobScheduler
{
public:
// Usage and restrictions:
// Jobs in the same group must be submitted from the same thread.
// It's fine to submit jobs in separate groups from different threads.
// It's also fine to call WaitForGroup() on a different thread than the one
// that called BeginGroup().
//
// Submitting jobs is lockless and wakes up idle worker threads if needed.
// Worker threads are lockless when they keep consuming from the same queue.
// Changing queues or going idle requires a lock. This is to keep track of
// how many workers try to consume a queue when the group gets recycled.
// Hopefully this is none, but being lockless we don't really know if anyone
// got stuck during the size check on the queue and before reading the next
// element atomically. The size check can safely use old values except when
// the group is recycled. In that case we need to flush all worker threads
// from accessing the queue and doing an invalid comparison.

// Signature of a job entry point: takes an opaque pointer, returns an opaque pointer.
typedef void* (*JobFunction)(void*);
// Volatile slot holding a void*; SubmitJob() takes a pointer to one.
// NOTE(review): presumably receives the job function's return value - confirm in the .cpp.
typedef void* volatile ReturnCode;
// Integer handle identifying a job group.
typedef int JobGroupID;

// NOTE(review): numthreads/maxGroups presumably size the worker pool and the group
// table; the meaning of startProcessor (default -1) is not visible in this header.
JobScheduler( int numthreads, int maxGroups, int startProcessor = -1 );
~JobScheduler();

// Starts a new group and returns its handle.
// NOTE(review): maxJobs is presumably the upper bound on jobs submitted to the group - confirm.
JobGroupID BeginGroup( int maxJobs );
// Queries whether the given group has finished.
bool IsGroupFinished( JobGroupID group );
// Waits for the given group; may be called from a different thread than BeginGroup().
void WaitForGroup( JobGroupID group );
// Submits func(data) to the given group. All jobs of one group must be submitted
// from the same thread (see usage notes above).
// NOTE(review): the meaning of the bool result is not visible here - check the implementation.
bool SubmitJob( JobGroupID group, JobFunction func, void *data, ReturnCode *returnCode );
// Returns the stored thread count (m_ThreadCount).
int GetThreadCount () const { return m_ThreadCount; }
#if ENABLE_JOB_SCHEDULER_PROFILER
// Profiler hook; only available when ENABLE_JOB_SCHEDULER_PROFILER is set.
void EndProfilerFrame (UInt32 frameIDAndValidFlag);
#endif
private:
// Internal helpers; their exact semantics are defined in the implementation file.
JobGroupID BeginGroupInternal( int maxJobs, bool isBlocking );
JobInfo* FetchNextJob( int& threadActiveGroup );
JobInfo* FetchJobInGroup( int group );
void ProcessJob( JobInfo& job, int group );
void AwakeIdleWorkerThreads( int count );
#if ENABLE_JOB_SCHEDULER_PROFILER
// Profiler bookkeeping record, padded to 16 ints in total.
struct WorkerProfilerInfo
{
int endFrameID;
int pad[15]; // pad so individual threads don't hammer same cache line
};
static void HandleProfilerFrames (WorkerProfilerInfo* info, bool* insideFrame);
#endif
// Static function with a void*(void*) signature.
// NOTE(review): presumably the worker thread entry point, with data being the scheduler - confirm.
static void* WorkLoop( void* data );
private:
JobGroup* m_Groups;
int m_GroupCount;
int m_ThreadCount; // value returned by GetThreadCount()
Thread* m_Threads;
// NOTE(review): presumably tells worker threads to exit - confirm.
volatile bool m_Quit;
SimpleLock m_Lock;
Semaphore m_AwakeSemaphore;
volatile int m_ThreadsIdle;
volatile int m_PriorityGroup;
#if ENABLE_JOB_SCHEDULER_PROFILER
WorkerProfilerInfo* m_ProfInfo;
int m_WorkerThreadCounter;
#endif
};
#if !UNITY_EXTERNAL_TOOL
// Free functions declared only when UNITY_EXTERNAL_TOOL is not set.
// GetJobScheduler() returns a reference to a JobScheduler.
// NOTE(review): presumably Create/Destroy manage the single instance that
// GetJobScheduler() returns, so Create must be called first - confirm in the .cpp.
void CreateJobScheduler();
void DestroyJobScheduler();
EXPORT_COREMODULE JobScheduler& GetJobScheduler();
#endif
#endif // ENABLE_JOB_SCHEDULER
#endif
|