author    chai <chaifix@163.com>  2019-08-14 22:50:43 +0800
committer chai <chaifix@163.com>  2019-08-14 22:50:43 +0800
commit    15740faf9fe9fe4be08965098bbf2947e096aeeb (patch) (HEAD, master)
tree      a730ec236656cc8cab5b13f088adfaed6bb218fb /Runtime/Threads/JobScheduler.cpp

    +Unity Runtime code

Diffstat (limited to 'Runtime/Threads/JobScheduler.cpp')
 -rw-r--r--  Runtime/Threads/JobScheduler.cpp | 447
 1 file changed, 447 insertions(+), 0 deletions(-)
diff --git a/Runtime/Threads/JobScheduler.cpp b/Runtime/Threads/JobScheduler.cpp
new file mode 100644
index 0000000..cf28a71
--- /dev/null
+++ b/Runtime/Threads/JobScheduler.cpp
@@ -0,0 +1,447 @@
+#include "UnityPrefix.h"
+#include "Configuration/UnityConfigure.h"
+#include "JobScheduler.h"
+
+#if ENABLE_JOB_SCHEDULER
+
+#include "Thread.h"
+#include "ThreadUtility.h"
+#include "Runtime/Utilities/dynamic_array.h"
+#include "Mutex.h"
+#include "Semaphore.h"
+#include "Event.h"
+#if !UNITY_EXTERNAL_TOOL
+#include "Runtime/Misc/SystemInfo.h"
+#include "Runtime/Profiler/Profiler.h"
+#endif
+#include "AtomicOps.h"
+
+// ------------------------------------------------------------------------
+
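+// A single unit of work: the function to run, its user data pointer, and an
+// optional slot where the job's return value gets published.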
+struct JobInfo
+{
+ JobScheduler::JobFunction func;
+ void* userData;
+ JobScheduler::ReturnCode* returnCode;
+};
+
+// ------------------------------------------------------------------------
+
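+// Bookkeeping for one batch of jobs. taskCount doubles as a reference count:
+// kTaskCountUnused (-1) marks a free group, BeginGroupInternal() sets it to 1
+// (the creator's reference), each submitted job adds one, and whoever drops
+// it to zero signals doneSemaphore.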
+struct JobGroup
+{
+ enum
+ {
+ kSizeIncrement = 256,
+ kTaskCountUnused = -1
+ };
+ JobGroup() : taskCount(kTaskCountUnused), activeThreads(0), nextJob(0), jobsAdded(0) {}
+
+ volatile int taskCount;
+ volatile int activeThreads;
+ volatile int nextJob;
+ volatile int jobsAdded;
+ dynamic_array<JobInfo> jobQueue;
+ Semaphore doneSemaphore;
+};
+
+// ------------------------------------------------------------------------
+
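+// Fetches the next job for a worker thread. The fast path is lock-free while
+// the thread's current group is still the priority group; otherwise we take
+// the lock, leave the old group and scan all groups, starting at the
+// priority one.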
+JobInfo* JobScheduler::FetchNextJob( int& activeGroup )
+{
+ if( activeGroup == m_PriorityGroup )
+ {
+ JobInfo* job = FetchJobInGroup(activeGroup);
+ if( job )
+ return job;
+ }
+
+ // We need a lock to change groups!
+ SimpleLock::AutoLock lock(m_Lock);
+ if( activeGroup != -1 )
+ m_Groups[activeGroup].activeThreads--;
+ int group = m_PriorityGroup;
+ for( int i = 0; i < m_GroupCount; i++ )
+ {
+ JobInfo* job = FetchJobInGroup(group);
+ if( job )
+ {
+ m_Groups[group].activeThreads++;
+ // Set the priority group to one that actually has work available,
+ // so other threads spend less time searching for a good group.
+ m_PriorityGroup = group;
+ activeGroup = group;
+ return job;
+ }
+ if( ++group >= m_GroupCount )
+ group = 0;
+ }
+ activeGroup = -1;
+ return NULL;
+}
+
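+// Claims the next unprocessed job in a group without locking: keep trying to
+// advance nextJob by one with a compare-exchange until we win a slot or the
+// queue is drained.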
+JobInfo* JobScheduler::FetchJobInGroup( int group )
+{
+ JobGroup& jg = m_Groups[group];
+ int curJob = jg.nextJob;
+ while( curJob < jg.jobsAdded )
+ {
+ if( AtomicCompareExchange(&jg.nextJob, curJob + 1, curJob) )
+ return &jg.jobQueue[curJob];
+
+ curJob = jg.nextJob;
+ }
+ return NULL;
+}
+
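+// Runs one job, publishes its return code (behind a memory barrier, so the
+// result is visible before the task count drops) and signals the group's
+// semaphore if this was the last outstanding task.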
+void JobScheduler::ProcessJob( JobInfo& job, int group )
+{
+ DebugAssert( job.func );
+ void* ret = job.func(job.userData);
+ if( job.returnCode )
+ {
+ UnityMemoryBarrier();
+ *job.returnCode = ret;
+ }
+
+ // Signal if we are the last to finish
+ JobGroup& jg = m_Groups[group];
+ if( AtomicDecrement(&jg.taskCount) == 0 )
+ jg.doneSemaphore.Signal();
+}
+
+
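+// Wakes up to 'count' sleeping workers. m_ThreadsIdle counts threads parked
+// on m_AwakeSemaphore; each successful decrement claims one wakeup, and a
+// decrement that goes negative is undone since there was no one left to wake.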
+void JobScheduler::AwakeIdleWorkerThreads( int count )
+{
+ for( int i = 0; i < count && m_ThreadsIdle > 0; i++ )
+ {
+ if( AtomicDecrement(&m_ThreadsIdle) < 0 )
+ {
+ AtomicIncrement(&m_ThreadsIdle);
+ break;
+ }
+ m_AwakeSemaphore.Signal();
+ }
+}
+
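+// Worker thread entry point: fetch and run jobs until m_Quit is set, going
+// idle on m_AwakeSemaphore whenever no group has work.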
+void* JobScheduler::WorkLoop( void* data )
+{
+ int activeGroup = -1;
+ JobScheduler* js = (JobScheduler*)data;
+
+ #if ENABLE_JOB_SCHEDULER_PROFILER
+ profiler_initialize_thread ("Worker Thread", true);
+ int workThreadIndex = AtomicIncrement (&js->m_WorkerThreadCounter);
+ WorkerProfilerInfo* profInfo = &js->m_ProfInfo[workThreadIndex];
+ bool insideFrame = false;
+ #endif
+
+ while( !js->m_Quit )
+ {
+ #if ENABLE_JOB_SCHEDULER_PROFILER
+ HandleProfilerFrames (profInfo, &insideFrame);
+ #endif
+
+ JobInfo* job = js->FetchNextJob(activeGroup);
+ if( job )
+ {
+ js->ProcessJob(*job, activeGroup);
+ }
+ else
+ {
+ AtomicIncrement(&js->m_ThreadsIdle);
+ js->m_AwakeSemaphore.WaitForSignal();
+ }
+ }
+
+ #if ENABLE_JOB_SCHEDULER_PROFILER
+ if (insideFrame)
+ {
+ //@TODO: Is this actually necessary? (It seems like cleanup thread should take care of killing it all?)
+ profiler_set_active_seperate_thread(false);
+ profiler_end_frame_seperate_thread(0);
+ }
+ profiler_cleanup_thread();
+ #endif
+
+ return NULL;
+}
+
+
+#if ENABLE_JOB_SCHEDULER_PROFILER
+void JobScheduler::HandleProfilerFrames (WorkerProfilerInfo* profInfo, bool* insideFrame)
+{
+ // Don't do fancy synchronization here; all we need is for worker
+ // threads to do begin/end profiler frame once in a while.
+ // Worst case, we'll get missing profiler info for a frame.
+ int endFrameID = profInfo->endFrameID;
+ if (endFrameID != -1)
+ {
+ if (*insideFrame)
+ {
+ profiler_set_active_seperate_thread (false);
+ profiler_end_frame_seperate_thread (endFrameID);
+ *insideFrame = false;
+ }
+ profiler_begin_frame_seperate_thread (kProfilerGame);
+ profiler_set_active_seperate_thread (true);
+ *insideFrame = true;
+
+ profInfo->endFrameID = -1;
+ UnityMemoryBarrier();
+ }
+}
+#endif
+
+
+
+JobScheduler::JobScheduler( int numthreads, int maxGroups, int startProcessor )
+: m_ThreadCount( numthreads )
+, m_GroupCount(maxGroups)
+, m_Quit(false)
+, m_ThreadsIdle(0)
+, m_PriorityGroup(0)
+{
+ m_Groups = new JobGroup[maxGroups];
+ if( m_ThreadCount > 0 )
+ {
+ #if ENABLE_JOB_SCHEDULER_PROFILER
+ m_WorkerThreadCounter = -1;
+ m_ProfInfo = new WorkerProfilerInfo[numthreads];
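+ // memset with -1 fills every byte, so each endFrameID reads as -1 ("no frame pending")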
+ memset (m_ProfInfo, -1, sizeof(m_ProfInfo[0])*numthreads);
+ #endif
+
+ m_Threads = new Thread[numthreads];
+
+ for( int i = 0; i < numthreads; ++i )
+ {
+ int processor = DEFAULT_UNITY_THREAD_PROCESSOR;
+ if( startProcessor >= 0 )
+ {
+ processor = startProcessor + i;
+ }
+ m_Threads[i].SetName ("UnityWorker");
+ m_Threads[i].Run( WorkLoop, this, DEFAULT_UNITY_THREAD_STACK_SIZE, processor );
+ }
+ }
+ else
+ {
+ m_Threads = NULL;
+ #if ENABLE_JOB_SCHEDULER_PROFILER
+ m_WorkerThreadCounter = -1;
+ m_ProfInfo = NULL;
+ #endif
+ }
+}
+
+JobScheduler::~JobScheduler()
+{
+ if( m_ThreadCount > 0 )
+ {
+ m_Quit = true;
+ UnityMemoryBarrier();
+ // Wait for the threads to exit:
+ // first signal as many times as there are threads, then wait for each one
+ for( int i = 0; i < m_ThreadCount; ++i )
+ m_AwakeSemaphore.Signal();
+ for( int i = 0; i < m_ThreadCount; ++i )
+ m_Threads[i].WaitForExit();
+ delete[] m_Threads;
+
+ #if ENABLE_JOB_SCHEDULER_PROFILER
+ delete[] m_ProfInfo;
+ #endif
+ }
+
+ delete[] m_Groups;
+}
+
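+// Typical usage, as a sketch (MyJobFunction, inputs and kJobCount are
+// placeholders; a single submitting thread per group is assumed, matching
+// the Assert on jobsAdded below):
+//
+//   JobScheduler& js = GetJobScheduler();
+//   JobScheduler::JobGroupID gid = js.BeginGroup( kJobCount );
+//   JobScheduler::ReturnCode results[kJobCount];
+//   for( int i = 0; i < kJobCount; ++i )
+//       js.SubmitJob( gid, MyJobFunction, &inputs[i], &results[i] );
+//   js.WaitForGroup( gid ); // helps run remaining jobs, then blocks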
+bool JobScheduler::SubmitJob( JobGroupID group, JobFunction func, void *data, ReturnCode *returnCode )
+{
+ AssertIf( func == NULL );
+ if( m_ThreadCount <= 0 )
+ {
+ // not multi-threaded: execute job right now
+ void* z = func( data );
+ if( returnCode )
+ *returnCode = z;
+ return true;
+ }
+
+ if( group >= m_GroupCount || group < 0 )
+ {
+ ErrorString( "Invalid job group ID" );
+ return false;
+ }
+
+ JobGroup& jg = m_Groups[group];
+ AtomicIncrement(&jg.taskCount);
+ int jobIndex = jg.jobsAdded;
+ JobInfo& job = jg.jobQueue[jobIndex];
+ job.func = func;
+ job.userData = data;
+ job.returnCode = returnCode;
+ int nextIndex = AtomicIncrement(&jg.jobsAdded);
+ // This may fail if you add jobs from multiple threads to the same group
+ Assert(nextIndex == jobIndex + 1);
+ AwakeIdleWorkerThreads(nextIndex - jg.nextJob);
+ return true;
+}
+
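+// Allocates a job group that can hold up to maxJobs jobs. The first pass is
+// non-blocking; if every free group still has worker threads inside it, a
+// second, blocking pass waits for one to drain.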
+JobScheduler::JobGroupID JobScheduler::BeginGroup( int maxJobs )
+{
+ // See if we can allocate a group without blocking.
+ for( int isBlocking = 0; isBlocking < 2; ++isBlocking )
+ {
+ // If a group still has active threads we can't use it immediately after WaitForGroup().
+ // By blocking we guarantee to find a group, as long as we stay within maxGroups.
+ JobGroupID id = BeginGroupInternal(maxJobs, isBlocking != 0);
+ if( id != -1 )
+ return id;
+ }
+ ErrorString("JobScheduler: too many job groups");
+ return -1;
+}
+
+JobScheduler::JobGroupID JobScheduler::BeginGroupInternal( int maxJobs, bool isBlocking )
+{
+ // Find unused group. We need a lock for that.
+ m_Lock.Lock();
+ for( int i = 0; i < m_GroupCount; ++i )
+ {
+ JobGroup& group = m_Groups[i];
+ if( group.taskCount == JobGroup::kTaskCountUnused
+ && ( isBlocking || group.activeThreads == 0 ) )
+ {
+ // We count finishing the group as a pending task;
+ // this keeps the job group alive until everything is done
+ group.taskCount = 1;
+
+ // Spin while worker threads are using our group
+ // Do this *after* we've marked it used (case 492417)
+ while( group.activeThreads != 0 )
+ {
+ m_Lock.Unlock();
+ m_Lock.Lock();
+ }
+ group.jobsAdded = 0;
+ group.nextJob = 0;
+ const int rounding = JobGroup::kSizeIncrement;
+ int roundedSize = (maxJobs + rounding - 1) / rounding * rounding;
+ group.jobQueue.reserve(roundedSize);
+ group.jobQueue.resize_uninitialized(maxJobs);
+ m_Lock.Unlock();
+ return i;
+ }
+ }
+ m_Lock.Unlock();
+ return -1;
+}
+
+bool JobScheduler::IsGroupFinished( JobGroupID group )
+{
+ const JobGroup& jg = m_Groups[group];
+ // Last reference is kept until WaitForGroup()
+ return jg.taskCount == 1;
+}
+
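+// Blocks until every job in the group has run. The calling thread helps by
+// executing remaining jobs itself rather than just sleeping, then releases
+// the group back to the unused state.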
+void JobScheduler::WaitForGroup( JobGroupID group )
+{
+ if( group >= m_GroupCount || group < 0 )
+ {
+ ErrorString( "Invalid job group ID" );
+ return;
+ }
+
+ JobGroup& jg = m_Groups[group];
+
+ // Release our reference to job group
+ // Pending jobs (if any) also have refs
+ if( AtomicDecrement(&jg.taskCount) != 0 )
+ {
+ // Set our group as having priority so other threads fetch from it
+ m_PriorityGroup = group;
+
+ for( ;; )
+ {
+ JobInfo* job = FetchJobInGroup(group);
+ if( !job )
+ break;
+ ProcessJob(*job, group);
+ }
+
+ jg.doneSemaphore.WaitForSignal();
+ Assert(jg.nextJob == jg.jobsAdded);
+ Assert(jg.taskCount == 0);
+ }
+
+ // Set count to kTaskCountUnused (-1)
+ AtomicDecrement(&jg.taskCount);
+}
+
+
+#if ENABLE_JOB_SCHEDULER_PROFILER
+void JobScheduler::EndProfilerFrame (UInt32 frameIDAndValidFlag)
+{
+ // Don't do fancy synchronization here; all we need is for worker
+ // threads to do begin/end profiler frame once in a while.
+ // Worst case, we'll get missing profiler info for a frame.
+ for (int i = 0; i < m_ThreadCount; ++i)
+ {
+ m_ProfInfo[i].endFrameID = frameIDAndValidFlag;
+ }
+ UnityMemoryBarrier();
+}
+#endif
+
+
+// ----------------------------------------------------------------------
+
+#if !UNITY_EXTERNAL_TOOL
+
+static JobScheduler* g_Scheduler = NULL;
+
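+// Creates the global scheduler, roughly one worker per hardware core with
+// one core left for the main thread (see the platform-specific cases below).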
+void CreateJobScheduler()
+{
+ AssertIf( g_Scheduler );
+
+ const int kMaxJobGroups = 16; // increase if we ever run out of concurrent separate job groups
+#if UNITY_XENON
+ // Use threads 1 (core 0) and 2/3 (core 1)
+ int startProcessor = 1;
+ int workerThreads = 3;
+#elif UNITY_PS3
+ int startProcessor = 1;
+ int workerThreads = 1;
+#elif UNITY_OSX
+ int startProcessor = 1;
+ int workerThreads = systeminfo::GetNumberOfCores() - 1;
+ Thread::SetCurrentThreadProcessor(0);
+#else
+ int startProcessor = -1;
+ int workerThreads = systeminfo::GetProcessorCount() - 1;
+#endif
+
+ // Don't use an unreasonable number of threads on future hardware.
+ // Mono GC has a 256 thread limit for the process (case 443576).
+ if (workerThreads > 128)
+ workerThreads = 128;
+
+ g_Scheduler = new JobScheduler( workerThreads, kMaxJobGroups, startProcessor );
+}
+
+void DestroyJobScheduler()
+{
+ delete g_Scheduler;
+ g_Scheduler = NULL;
+}
+
+JobScheduler& GetJobScheduler()
+{
+ AssertIf( !g_Scheduler );
+ return *g_Scheduler;
+}
+
+#endif
+
+
+#endif