diff options
Diffstat (limited to 'Runtime/Threads')
35 files changed, 3608 insertions, 0 deletions
diff --git a/Runtime/Threads/AtomicOps.h b/Runtime/Threads/AtomicOps.h new file mode 100644 index 0000000..a166cf4 --- /dev/null +++ b/Runtime/Threads/AtomicOps.h @@ -0,0 +1,180 @@ +#ifndef UNITY_ATOMIC_OPS_HPP_ +#define UNITY_ATOMIC_OPS_HPP_ + +// Primitive atomic instructions as defined by the CPU architecture. + +#include "Runtime/Allocator/MemoryMacros.h" // defines FORCE_INLINE (?!) + +// AtomicAdd - Returns the new value, after the operation has been performed (as defined by OSAtomicAdd32Barrier) +FORCE_INLINE int AtomicAdd (int volatile* i, int value); + +// AtomicSub - Returns the new value, after the operation has been performed (as defined by OSAtomicSub32Barrier) +FORCE_INLINE int AtomicSub (int volatile* i, int value); + +// AtomicIncrement - Returns the new value, after the operation has been performed (as defined by OSAtomicAdd32Barrier) +FORCE_INLINE int AtomicIncrement (int volatile* i); + +// AtomicDecrement - Returns the new value, after the operation has been performed (as defined by OSAtomicAdd32Barrier) +FORCE_INLINE int AtomicDecrement (int volatile* i); + +// AtomicCompareExchange - Returns value is the initial value of the Destination pointer (as defined by _InterlockedCompareExchange) +FORCE_INLINE bool AtomicCompareExchange (int volatile* i, int newValue, int expectedValue); + +// AtomicExchange - Returns the initial value pointed to by Target (as defined by _InterlockedExchange) +FORCE_INLINE int AtomicExchange (int volatile* i, int value); + +#define ATOMIC_API_GENERIC (UNITY_OSX || UNITY_IPHONE || UNITY_WIN || UNITY_XENON || UNITY_PS3 || UNITY_ANDROID || UNITY_PEPPER || UNITY_LINUX || UNITY_BB10 || UNITY_WII || UNITY_TIZEN) + +#if !ATOMIC_API_GENERIC && SUPPORT_THREADS +# include "PlatformAtomicOps.h" +#else + +// This file contains implementations for the platforms we already support. +// Going forward (some of) these implementations must move to the platform specific directories. + +#include "Runtime/Utilities/Utility.h" + +#if UNITY_WIN +#include <intrin.h> +typedef char UnityAtomicsTypesAssert_FailsIfIntSize_NEQ_LongSize[sizeof(int) == sizeof(LONG) ? 1 : -1]; +#elif UNITY_OSX + + + +#include <libkern/OSAtomic.h> + +#elif UNITY_IPHONE +#include <libkern/OSAtomic.h> +#elif UNITY_PS3 +#include <cell/atomic.h> +#elif UNITY_ANDROID || (UNITY_LINUX && defined(__GNUC__) && defined(__arm__)) +// use gcc builtin __sync_* +#elif UNITY_BB10 +#include <atomic.h> +#include <arm/smpxchg.h> +#endif + +#include "Runtime/Utilities/Utility.h" + +// AtomicAdd - Returns the new value, after the operation has been performed (as defined by OSAtomicAdd32Barrier) +FORCE_INLINE int AtomicAdd (int volatile* i, int value) { +#if UNITY_OSX || UNITY_IPHONE +#if defined(__ppc__) + #error "Atomic addition undefined for this platform" +#endif + + return OSAtomicAdd32Barrier (value, (int*)i); + +#elif UNITY_WIN || UNITY_XENON + return _InterlockedExchangeAdd ((long volatile*)i, value) + value; +#elif UNITY_PS3 + return cellAtomicAdd32((uint32_t*)i, value) + value; // on ps3 it returns the pre-increment value +#elif UNITY_ANDROID || UNITY_TIZEN + return __sync_add_and_fetch(i,value); +#elif UNITY_PEPPER + int temp = value; + __asm__ __volatile__("lock; xaddl %0,%1" + : "+r" (temp), "+m" (*i) + : : "memory"); + // temp now holds the old value of *ptr + // if (AtomicOps_Internalx86CPUFeatures.has_amd_lock_mb_bug) + __asm__ __volatile__("lfence" : : : "memory"); + return temp + value; +#elif UNITY_LINUX + return __sync_add_and_fetch(i, value); +#elif UNITY_BB10 + return atomic_add_value((unsigned int*)i, value)+value; +#elif UNITY_WII + int wasEnabled = OSDisableInterrupts(); + *i = (*i) + value; + OSRestoreInterrupts(wasEnabled); + return *i; +#elif !SUPPORT_THREADS + return *i+=value; +#else +#error "Atomic op undefined for this platform" +#endif +} + +// AtomicSub - Returns the new value, after the operation has been performed (as defined by OSAtomicSub32Barrier) +inline int AtomicSub (int volatile* i, int value) { +#if UNITY_PS3 + return cellAtomicSub32((uint32_t*)i, value) - value; // on ps3 it returns the pre-increment value +#elif UNITY_ANDROID || UNITY_TIZEN + return __sync_sub_and_fetch(i,value); +#elif UNITY_LINUX + return __sync_sub_and_fetch(i, value); +#elif UNITY_BB10 + return atomic_sub_value((unsigned int*)i, value) - value; +#else + return AtomicAdd(i, -value); +#endif +} + +// AtomicIncrement - Returns the new value, after the operation has been performed (as defined by OSAtomicAdd32Barrier) +FORCE_INLINE int AtomicIncrement (int volatile* i) { +#if UNITY_WIN || UNITY_XENON + return _InterlockedIncrement ((long volatile*)i); +#elif UNITY_PS3 + return cellAtomicIncr32((uint32_t*)i)+1; // on ps3 it returns the pre-increment value +#elif !SUPPORT_THREADS + return ++*i; +#else + return AtomicAdd(i, 1); +#endif +} + +// AtomicDecrement - Returns the new value, after the operation has been performed (as defined by OSAtomicAdd32Barrier) +FORCE_INLINE int AtomicDecrement (int volatile* i) { +#if UNITY_WIN || UNITY_XENON + return _InterlockedDecrement ((long volatile*)i); +#elif UNITY_PS3 + return cellAtomicDecr32((uint32_t*)i)-1; // on ps3 it returns the pre-increment value +#elif !SUPPORT_THREADS + return --*i; +#else + return AtomicSub(i, 1); +#endif +} + +// AtomicCompareExchange - Returns value is the initial value of the Destination pointer (as defined by _InterlockedCompareExchange) +#if UNITY_WIN || UNITY_XENON +FORCE_INLINE bool AtomicCompareExchange (int volatile* i, int newValue, int expectedValue) { + return _InterlockedCompareExchange ((long volatile*)i, (long)newValue, (long)expectedValue) == expectedValue; +} +#elif UNITY_OSX || UNITY_IPHONE +FORCE_INLINE bool AtomicCompareExchange (int volatile* i, int newValue, int expectedValue) { + return OSAtomicCompareAndSwap32Barrier (expectedValue, newValue, reinterpret_cast<volatile int32_t*>(i)); +} +#elif UNITY_LINUX || UNITY_PEPPER || UNITY_ANDROID || UNITY_BB10 || UNITY_TIZEN +FORCE_INLINE bool AtomicCompareExchange (int volatile* i, int newValue, int expectedValue) { +# if UNITY_BB10 + return _smp_cmpxchg((unsigned int*)i, expectedValue, newValue) == expectedValue; +# else + return __sync_bool_compare_and_swap(i, expectedValue, newValue); +#endif +} +#elif UNITY_PS3 +FORCE_INLINE bool AtomicCompareExchange (int volatile* i, int newValue, int expectedValue) { + return cellAtomicCompareAndSwap32((uint32_t*)i, (uint32_t)expectedValue, (uint32_t)newValue) == (uint32_t)expectedValue; +} +#endif + +// AtomicExchange - Returns the initial value pointed to by Target (as defined by _InterlockedExchange) +#if UNITY_WIN || UNITY_XENON +FORCE_INLINE int AtomicExchange (int volatile* i, int value) { + return (int)_InterlockedExchange ((long volatile*)i, (long)value); +} +#elif UNITY_ANDROID || UNITY_TIZEN || UNITY_OSX || UNITY_LINUX // fallback to loop +FORCE_INLINE int AtomicExchange (int volatile* i, int value) { + int prev; + do { prev = *i; } + while (!AtomicCompareExchange(i, value, prev)); + return prev; +} +#endif + +#endif // ATOMIC_API_GENERIC +#undef ATOMIC_API_GENERIC + +#endif //UNITY_ATOMIC_OPS_HPP_ diff --git a/Runtime/Threads/AtomicRefCounter.h b/Runtime/Threads/AtomicRefCounter.h new file mode 100644 index 0000000..9e6b5e7 --- /dev/null +++ b/Runtime/Threads/AtomicRefCounter.h @@ -0,0 +1,30 @@ +#ifndef __UNITY_ATOMIC_REFCOUNTER_H +#define __UNITY_ATOMIC_REFCOUNTER_H + +#include "AtomicOps.h" + +// Used for threadsafe refcounting +class AtomicRefCounter { +private: + volatile int m_Counter; +public: + // Upon the construction the self-counter is always set to 1, + // which means that this instance is already accounted for. + // This scheme shaves off the cycles that would be consumed + // during the unavoidable first Retain call otherwise. + AtomicRefCounter() : m_Counter(1) {} + + FORCE_INLINE void Retain () + { + AtomicIncrement(&m_Counter); + } + + FORCE_INLINE bool Release () + { + int afterDecrement = AtomicDecrement(&m_Counter); + AssertIf( afterDecrement < 0 ); // If we hit this assert, someone is Releasing without matching it with Retain + return afterDecrement == 0; + } +}; + +#endif // __UNITY_ATOMIC_REFCOUNTER_H diff --git a/Runtime/Threads/Event.h b/Runtime/Threads/Event.h new file mode 100644 index 0000000..2075bcf --- /dev/null +++ b/Runtime/Threads/Event.h @@ -0,0 +1,20 @@ +#ifndef __EVENT_H +#define __EVENT_H + +// Event synchronization object. + +#if SUPPORT_THREADS + +#if UNITY_WIN || UNITY_XENON +# include "Winapi/PlatformEvent.h" +#elif HAS_EVENT_OBJECT +# include "PlatformEvent.h" +#else +# include "Semaphore.h" + typedef Semaphore Event; +# pragma message("Event implementation missing. Using a Semaphore.") +#endif + +#endif // SUPPORT_THREADS + +#endif // __EVENT_H diff --git a/Runtime/Threads/JobGroupRecycler.h b/Runtime/Threads/JobGroupRecycler.h new file mode 100644 index 0000000..9305582 --- /dev/null +++ b/Runtime/Threads/JobGroupRecycler.h @@ -0,0 +1,59 @@ +#ifndef JOB_GROUP_RECYCLER_H +#define JOB_GROUP_RECYCLER_H + +#include "JobScheduler.h" + +#if ENABLE_MULTITHREADED_CODE + +template<int MaxGroups> +class JobGroupRecycler +{ +public: + // Limits the amount of groups used by finishing older ones first. + // Restrictions: Must always be called from the same thread. + // All jobs must be submitted before beginning a new group. + + JobGroupRecycler() + { + memset( m_SlotOwners, 0, sizeof(m_SlotOwners) ); + memset( m_JobGroups, 0, sizeof(m_JobGroups) ); + m_NextSlot = 0; + } + + int BeginGroup( void* owner, int maxJobs ) + { + int slot = m_NextSlot; + if( m_SlotOwners[slot] ) + GetJobScheduler().WaitForGroup( m_JobGroups[slot] ); + m_JobGroups[slot] = GetJobScheduler().BeginGroup( maxJobs ); + m_SlotOwners[slot] = owner; + m_NextSlot = (m_NextSlot + 1) % MaxGroups; + return slot; + } + + bool SubmitJob( void* owner, int slot, JobScheduler::JobFunction func, void *data, JobScheduler::ReturnCode *returnCode ) + { + Assert( slot >= 0 && slot < MaxGroups ); + Assert( m_SlotOwners[slot] == owner ); + return GetJobScheduler().SubmitJob( m_JobGroups[slot], func, data, returnCode ); + } + + void WaitForGroup( void* owner, int slot ) + { + // Maybe someone else reused the job group + if( m_SlotOwners[slot] != owner ) + return; + GetJobScheduler().WaitForGroup( m_JobGroups[slot] ); + m_SlotOwners[slot] = NULL; + } + +private: + JobScheduler::JobGroupID m_JobGroups[MaxGroups]; + void* m_SlotOwners[MaxGroups]; + int m_NextSlot; +}; + + +#endif // ENABLE_MULTITHREADED_CODE + +#endif diff --git a/Runtime/Threads/JobScheduler.cpp b/Runtime/Threads/JobScheduler.cpp new file mode 100644 index 0000000..cf28a71 --- /dev/null +++ b/Runtime/Threads/JobScheduler.cpp @@ -0,0 +1,447 @@ +#include "UnityPrefix.h" +#include "Configuration/UnityConfigure.h" +#include "JobScheduler.h" + +#if ENABLE_JOB_SCHEDULER + +#include "Thread.h" +#include "ThreadUtility.h" +#include "Runtime/Utilities/dynamic_array.h" +#include "Mutex.h" +#include "Semaphore.h" +#include "Event.h" +#if !UNITY_EXTERNAL_TOOL +#include "Runtime/Misc/SystemInfo.h" +#include "Runtime/Profiler/Profiler.h" +#endif +#include "AtomicOps.h" + +// ------------------------------------------------------------------------ + +struct JobInfo +{ + JobScheduler::JobFunction func; + void* userData; + JobScheduler::ReturnCode* returnCode; +}; + +// ------------------------------------------------------------------------ + +struct JobGroup +{ + enum + { + kSizeIncrement = 256, + kTaskCountUnused = -1 + }; + JobGroup() : taskCount(kTaskCountUnused), activeThreads(0), nextJob(0), jobsAdded(0) {} + + volatile int taskCount; + volatile int activeThreads; + volatile int nextJob; + volatile int jobsAdded; + dynamic_array<JobInfo> jobQueue; + Semaphore doneSemaphore; +}; + +// ------------------------------------------------------------------------ + +JobInfo* JobScheduler::FetchNextJob( int& activeGroup ) +{ + if( activeGroup == m_PriorityGroup ) + { + JobInfo* job = FetchJobInGroup(activeGroup); + if( job ) + return job; + } + + // We need a lock to change groups! + SimpleLock::AutoLock lock(m_Lock); + if( activeGroup != -1 ) + m_Groups[activeGroup].activeThreads--; + int group = m_PriorityGroup; + for( int i = 0; i < m_GroupCount; i++ ) + { + JobInfo* job = FetchJobInGroup(group); + if( job ) + { + m_Groups[group].activeThreads++; + // Set priority group to one that is actually available + // Less work for other threads to find a good group + m_PriorityGroup = group; + activeGroup = group; + return job; + } + if( ++group >= m_GroupCount ) + group = 0; + } + activeGroup = -1; + return NULL; +} + +JobInfo* JobScheduler::FetchJobInGroup( int group ) +{ + JobGroup& jg = m_Groups[group]; + int curJob = jg.nextJob; + while( curJob < jg.jobsAdded ) + { + if( AtomicCompareExchange(&jg.nextJob, curJob + 1, curJob) ) + return &jg.jobQueue[curJob]; + + curJob = jg.nextJob; + } + return NULL; +} + +void JobScheduler::ProcessJob( JobInfo& job, int group ) +{ + DebugAssert( job.func ); + void* ret = job.func(job.userData); + if( job.returnCode ) + { + UnityMemoryBarrier(); + *job.returnCode = ret; + } + + // Signal if we are the last to finish + JobGroup& jg = m_Groups[group]; + if( AtomicDecrement(&jg.taskCount) == 0 ) + jg.doneSemaphore.Signal(); +} + + +void JobScheduler::AwakeIdleWorkerThreads( int count ) +{ + for( int i = 0; i < count && m_ThreadsIdle > 0; i++ ) + { + if( AtomicDecrement(&m_ThreadsIdle) < 0 ) + { + AtomicIncrement(&m_ThreadsIdle); + break; + } + m_AwakeSemaphore.Signal(); + } +} + +void* JobScheduler::WorkLoop( void* data ) +{ + int activeGroup = -1; + JobScheduler* js = (JobScheduler*)data; + + #if ENABLE_JOB_SCHEDULER_PROFILER + profiler_initialize_thread ("Worker Thread", true); + int workThreadIndex = AtomicIncrement (&js->m_WorkerThreadCounter); + WorkerProfilerInfo* profInfo = &js->m_ProfInfo[workThreadIndex]; + bool insideFrame = false; + #endif + + while( !js->m_Quit ) + { + #if ENABLE_JOB_SCHEDULER_PROFILER + HandleProfilerFrames (profInfo, &insideFrame); + #endif + + JobInfo* job = js->FetchNextJob(activeGroup); + if( job ) + { + js->ProcessJob(*job, activeGroup); + } + else + { + AtomicIncrement(&js->m_ThreadsIdle); + js->m_AwakeSemaphore.WaitForSignal(); + } + } + + #if ENABLE_JOB_SCHEDULER_PROFILER + if (insideFrame) + { + //@TODO: Is this actually necessary? (It seems like cleanup thread should take care of killing it all?) + profiler_set_active_seperate_thread(false); + profiler_end_frame_seperate_thread(0); + } + profiler_cleanup_thread(); + #endif + + return NULL; +} + + +#if ENABLE_JOB_SCHEDULER_PROFILER +void JobScheduler::HandleProfilerFrames (WorkerProfilerInfo* profInfo, bool* insideFrame) +{ + // Don't do fancy synchnonization here; all we need is for worker + // threads to do begin/end profiler frame once in a while. + // Worst case, we'll get a missing profiler info for a frame. + int endFrameID = profInfo->endFrameID; + if (endFrameID != -1) + { + if (*insideFrame) + { + profiler_set_active_seperate_thread (false); + profiler_end_frame_seperate_thread (endFrameID); + *insideFrame = false; + } + profiler_begin_frame_seperate_thread (kProfilerGame); + profiler_set_active_seperate_thread (true); + *insideFrame = true; + + profInfo->endFrameID = -1; + UnityMemoryBarrier(); + } +} +#endif + + + +JobScheduler::JobScheduler( int numthreads, int maxGroups, int startProcessor ) +: m_ThreadCount( numthreads ) +, m_GroupCount(maxGroups) +, m_Quit(false) +, m_ThreadsIdle(0) +, m_PriorityGroup(0) +{ + m_Groups = new JobGroup[maxGroups]; + if( m_ThreadCount > 0 ) + { + #if ENABLE_JOB_SCHEDULER_PROFILER + m_WorkerThreadCounter = -1; + m_ProfInfo = new WorkerProfilerInfo[numthreads]; + memset (m_ProfInfo, -1, sizeof(m_ProfInfo[0])*numthreads); + #endif + + m_Threads = new Thread[numthreads]; + + for( int i = 0; i < numthreads; ++i ) + { + int processor = DEFAULT_UNITY_THREAD_PROCESSOR; + if( startProcessor >= 0 ) + { + processor = startProcessor + i; + } + m_Threads[i].SetName ("UnityWorker"); + m_Threads[i].Run( WorkLoop, this, DEFAULT_UNITY_THREAD_STACK_SIZE, processor ); + } + } + else + { + m_Threads = NULL; + #if ENABLE_JOB_SCHEDULER_PROFILER + m_WorkerThreadCounter = -1; + m_ProfInfo = NULL; + #endif + } +} + +JobScheduler::~JobScheduler() +{ + if( m_ThreadCount > 0 ) + { + m_Quit = true; + UnityMemoryBarrier(); + // wait while threads exit + // first signal as many times as there are threads, then wait for each to exit + for( int i = 0; i < m_ThreadCount; ++i ) + m_AwakeSemaphore.Signal(); + for( int i = 0; i < m_ThreadCount; ++i ) + m_Threads[i].WaitForExit(); + delete[] m_Threads; + + #if ENABLE_JOB_SCHEDULER_PROFILER + delete[] m_ProfInfo; + #endif + } + + delete[] m_Groups; +} + +bool JobScheduler::SubmitJob( JobGroupID group, JobFunction func, void *data, ReturnCode *returnCode ) +{ + AssertIf( func == NULL ); + if( m_ThreadCount <= 0 ) + { + // not multi-threaded: execute job right now + void* z = func( data ); + if( returnCode ) + *returnCode = z; + return true; + } + + if( group >= m_GroupCount || group < 0) + { + ErrorString( "Invalid job group ID" ); + return false; + } + + JobGroup& jg = m_Groups[group]; + AtomicIncrement(&jg.taskCount); + int jobIndex = jg.jobsAdded; + JobInfo& job = jg.jobQueue[jobIndex]; + job.func = func; + job.userData = data; + job.returnCode = returnCode; + int nextIndex = AtomicIncrement(&jg.jobsAdded); + // This may fail if you add jobs from multiple threads to the same group + Assert(nextIndex == jobIndex + 1); + AwakeIdleWorkerThreads(nextIndex - jg.nextJob); + return true; +} + +JobScheduler::JobGroupID JobScheduler::BeginGroup( int maxJobs ) +{ + // See if we can allocate a group without blocking. + for( int isBlocking = 0; isBlocking < 2; ++isBlocking ) + { + // If a group still has active threads we can't use it immediately after WaitForGroup(). + // By blocking we guarantee to find a group, as long as we stay within maxGroups. + JobGroupID id = BeginGroupInternal(maxJobs, isBlocking != 0); + if( id != -1 ) + return id; + } + ErrorString("JobScheduler: too many job groups"); + return -1; +} + +JobScheduler::JobGroupID JobScheduler::BeginGroupInternal( int maxJobs, bool isBlocking ) +{ + // Find unused group. We need a lock for that. + m_Lock.Lock(); + for( int i = 0; i < m_GroupCount; ++i ) + { + JobGroup& group = m_Groups[i]; + if( group.taskCount == JobGroup::kTaskCountUnused + && ( isBlocking || group.activeThreads == 0 ) ) + { + // We consider finishing group a pending task + // Keeps job group alive until everything is done + group.taskCount = 1; + + // Spin while worker threads are using our group + // Do this *after* we've marked it used (case 492417) + while( group.activeThreads != 0 ) + { + m_Lock.Unlock(); + m_Lock.Lock(); + } + group.jobsAdded = 0; + group.nextJob = 0; + const int rounding = JobGroup::kSizeIncrement; + int roundedSize = (maxJobs + rounding - 1) / rounding * rounding; + group.jobQueue.reserve(roundedSize); + group.jobQueue.resize_uninitialized(maxJobs); + m_Lock.Unlock(); + return i; + } + } + m_Lock.Unlock(); + return -1; +} + +bool JobScheduler::IsGroupFinished( JobGroupID group ) +{ + const JobGroup& jg = m_Groups[group]; + // Last reference is kept until WaitForGroup() + return jg.taskCount == 1; +} + +void JobScheduler::WaitForGroup( JobGroupID group ) +{ + if( group >= m_GroupCount ) + { + ErrorString( "Invalid job group ID" ); + return; + } + + JobGroup& jg = m_Groups[group]; + + // Release our reference to job group + // Pending jobs (if any) also have refs + if( AtomicDecrement(&jg.taskCount) != 0 ) + { + // Set our group as having priority so other threads fetch from it + m_PriorityGroup = group; + + for( ;; ) + { + JobInfo* job = FetchJobInGroup(group); + if( !job ) + break; + ProcessJob(*job, group); + } + + jg.doneSemaphore.WaitForSignal(); + Assert(jg.nextJob == jg.jobsAdded); + Assert(jg.taskCount == 0); + } + + // Set count to kTaskCountUnused (-1) + AtomicDecrement(&jg.taskCount); +} + + +#if ENABLE_JOB_SCHEDULER_PROFILER +void JobScheduler::EndProfilerFrame (UInt32 frameIDAndValidFlag) +{ + // Don't do fancy synchnonization here; all we need is for worker + // threads to do begin/end profiler frame once in a while. + // Worst case, we'll get a missing profiler info for a frame. + for (int i = 0; i < m_ThreadCount; ++i) + { + m_ProfInfo[i].endFrameID = frameIDAndValidFlag; + } + UnityMemoryBarrier(); +} +#endif + + +// ---------------------------------------------------------------------- + +#if !UNITY_EXTERNAL_TOOL + +static JobScheduler* g_Scheduler = NULL; + +void CreateJobScheduler() +{ + AssertIf( g_Scheduler ); + + const int kMaxJobGroups = 16; // increase if we ever run out of concurrent separate job groups +#if UNITY_XENON + // Use threads 1 (core 0) and 2/3 (core 1) + int startProcessor = 1; + int workerThreads = 3; +#elif UNITY_PS3 + int startProcessor = 1; + int workerThreads = 1; +#elif UNITY_OSX + int startProcessor = 1; + int workerThreads = systeminfo::GetNumberOfCores() - 1; + Thread::SetCurrentThreadProcessor(0); +#else + int startProcessor = -1; + int workerThreads = systeminfo::GetProcessorCount() - 1; +#endif + + // Don't use an unreasonable amount of threads on future hardware. + // Mono GC has a 256 thread limit for the process (case 443576). + if (workerThreads > 128) + workerThreads = 128; + + g_Scheduler = new JobScheduler( workerThreads, kMaxJobGroups, startProcessor ); +} + +void DestroyJobScheduler() +{ + delete g_Scheduler; + g_Scheduler = NULL; +} + +JobScheduler& GetJobScheduler() +{ + AssertIf( !g_Scheduler ); + return *g_Scheduler; +} + +#endif + + +#endif diff --git a/Runtime/Threads/JobScheduler.h b/Runtime/Threads/JobScheduler.h new file mode 100644 index 0000000..a638113 --- /dev/null +++ b/Runtime/Threads/JobScheduler.h @@ -0,0 +1,100 @@ +#ifndef JOB_SCHEDULER_H +#define JOB_SCHEDULER_H + +#include "SimpleLock.h" +#include "Semaphore.h" +#include "Runtime/Modules/ExportModules.h" + +// On single-CPU platforms, this should never be used +#define ENABLE_JOB_SCHEDULER (ENABLE_MULTITHREADED_CODE || ENABLE_MULTITHREADED_SKINNING) + +#if ENABLE_JOB_SCHEDULER + +#define ENABLE_JOB_SCHEDULER_PROFILER (!UNITY_EXTERNAL_TOOL && ENABLE_PROFILER) + + +struct JobGroup; +struct JobInfo; +class Thread; + +class EXPORT_COREMODULE JobScheduler +{ +public: + // Usage and restrictions: + // Jobs in the same group must be submitted from the same thread + // It's fine to submit jobs in separate groups from different threads + // It's also fine to WaitForGroup() on a another thread than BeginGroup() + // + // Submitting jobs is lockless and wakes up idle worker threads if needed. + // Worker threads are lockless when they keep consuming from the same queue. + // Changing queues or going idle requires a lock. This is to keep track of + // how many workers try to consume a queue when the group gets recycled. + // Hopefully this is none, but being lockless we don't really know if anyone + // got stuck during the size check on the queue and before reading the next + // element atomically. The size check can safely use old values except when + // the group is recycled. In that case we need to flush all worker threads + // from accessing the queue and doing an invalid comparison. + + typedef void* (*JobFunction)(void*); + typedef void* volatile ReturnCode; + typedef int JobGroupID; + + JobScheduler( int numthreads, int maxGroups, int startProcessor = -1 ); + ~JobScheduler(); + + JobGroupID BeginGroup( int maxJobs ); + bool IsGroupFinished( JobGroupID group ); + void WaitForGroup( JobGroupID group ); + + bool SubmitJob( JobGroupID group, JobFunction func, void *data, ReturnCode *returnCode ); + int GetThreadCount () const { return m_ThreadCount; } + + #if ENABLE_JOB_SCHEDULER_PROFILER + void EndProfilerFrame (UInt32 frameIDAndValidFlag); + #endif + +private: + JobGroupID BeginGroupInternal( int maxJobs, bool isBlocking ); + JobInfo* FetchNextJob( int& threadActiveGroup ); + JobInfo* FetchJobInGroup( int group ); + void ProcessJob( JobInfo& job, int group ); + void AwakeIdleWorkerThreads( int count ); + + #if ENABLE_JOB_SCHEDULER_PROFILER + struct WorkerProfilerInfo + { + int endFrameID; + int pad[15]; // pad so individual threads don't hammer same cache line + }; + static void HandleProfilerFrames (WorkerProfilerInfo* info, bool* insideFrame); + #endif + + static void* WorkLoop( void* data ); + +private: + JobGroup* m_Groups; + int m_GroupCount; + int m_ThreadCount; + Thread* m_Threads; + volatile bool m_Quit; + SimpleLock m_Lock; + Semaphore m_AwakeSemaphore; + volatile int m_ThreadsIdle; + volatile int m_PriorityGroup; + + #if ENABLE_JOB_SCHEDULER_PROFILER + WorkerProfilerInfo* m_ProfInfo; + int m_WorkerThreadCounter; + #endif +}; + +#if !UNITY_EXTERNAL_TOOL +void CreateJobScheduler(); +void DestroyJobScheduler(); +EXPORT_COREMODULE JobScheduler& GetJobScheduler(); +#endif + +#endif // ENABLE_MULTITHREADED_CODE + + +#endif diff --git a/Runtime/Threads/Mutex.cpp b/Runtime/Threads/Mutex.cpp new file mode 100644 index 0000000..9ca9991 --- /dev/null +++ b/Runtime/Threads/Mutex.cpp @@ -0,0 +1,70 @@ +#include "UnityPrefix.h" + +#if SUPPORT_THREADS + +#include "Mutex.h" + +#ifndef THREAD_LOCK_DEBUG +#define THREAD_LOCK_DEBUG 0 +#endif + +Mutex::Mutex() +{ +} + +Mutex::~Mutex () +{ +} + +bool Mutex::IsLocked() +{ + if(m_Mutex.TryLock()) + { + Unlock(); + return false; + } + else + { + return true; + } +} + +void Mutex::BlockUntilUnlocked() +{ + Lock(); + Unlock(); +} + +void Mutex::Lock() +{ +#if THREAD_LOCK_DEBUG + m_PerThreadLockDepth.SetIntValue(m_PerThreadLockDepth.GetIntValue() + 1); +#endif + m_Mutex.Lock(); +} + +void Mutex::Unlock() +{ +#if THREAD_LOCK_DEBUG + AssertIf(m_PerThreadLockDepth.GetIntValue() == 0); + m_PerThreadLockDepth.SetIntValue(m_PerThreadLockDepth.GetIntValue() - 1); +#endif + m_Mutex.Unlock(); +} + +bool Mutex::TryLock() +{ +#if THREAD_LOCK_DEBUG + if (m_Mutex.TryLock()) + { + m_PerThreadLockDepth.SetIntValue(m_PerThreadLockDepth.GetIntValue() + 1); + return true; + } + else + return false; +#else + return m_Mutex.TryLock(); +#endif +} + +#endif // SUPPORT_THREADS ; has dummy mutex implemented in headerfile diff --git a/Runtime/Threads/Mutex.h b/Runtime/Threads/Mutex.h new file mode 100644 index 0000000..91e9927 --- /dev/null +++ b/Runtime/Threads/Mutex.h @@ -0,0 +1,86 @@ +#ifndef __MUTEX_H +#define __MUTEX_H + +#if SUPPORT_THREADS + +#if UNITY_WIN || UNITY_XENON +# include "Winapi/PlatformMutex.h" +#elif UNITY_OSX || UNITY_IPHONE || UNITY_ANDROID || UNITY_PEPPER || UNITY_LINUX || UNITY_BB10 || UNITY_TIZEN +# include "Posix/PlatformMutex.h" +#else +# include "PlatformMutex.h" +#endif + +#include "Runtime/Utilities/NonCopyable.h" + +/** + * A mutex class. Always recursive (a single thread can lock multiple times). + */ +class Mutex : public NonCopyable +{ +public: + + class AutoLock + { + public: + AutoLock( Mutex& mutex ) + : m_Mutex(&mutex) + { + mutex.Lock(); + } + + ~AutoLock() + { + m_Mutex->Unlock(); + } + + private: + AutoLock(const AutoLock&); + AutoLock& operator=(const AutoLock&); + + private: + Mutex* m_Mutex; + }; + + Mutex(); + ~Mutex(); + + void Lock(); + void Unlock(); + + // Returns true if locking succeeded + bool TryLock(); + + // Returns true if the mutex is currently locked + bool IsLocked(); + + void BlockUntilUnlocked(); + +private: + + PlatformMutex m_Mutex; +}; + +#else + +// Used for threadsafe refcounting +class Mutex +{ +public: + + class AutoLock + { + public: + AutoLock( Mutex& mutex ){} + ~AutoLock() {} + }; + + bool TryLock () { return true; } + void Lock () { } + void Unlock () { } + bool IsLocked () { return false; } + void BlockUntilUnlocked() { } +}; + +#endif //SUPPORT_THREADS +#endif diff --git a/Runtime/Threads/Posix/PlatformMutex.cpp b/Runtime/Threads/Posix/PlatformMutex.cpp new file mode 100644 index 0000000..b48e138 --- /dev/null +++ b/Runtime/Threads/Posix/PlatformMutex.cpp @@ -0,0 +1,51 @@ +#include "UnityPrefix.h" + +#if SUPPORT_THREADS + +#ifndef MUTEX_API_PTHREAD +#define MUTEX_API_PTHREAD (UNITY_OSX || UNITY_IPHONE || UNITY_ANDROID || UNITY_PEPPER || UNITY_LINUX || UNITY_BB10 || UNITY_TIZEN) +#endif + +#endif // SUPPORT_THREADS + +#if MUTEX_API_PTHREAD +// ------------------------------------------------------------------------------------------------- +// pthreads + +#include "PlatformMutex.h" + +#if defined(__native_client__) +#define PTHREAD_MUTEX_RECURSIVE PTHREAD_MUTEX_RECURSIVE_NP +#endif + +PlatformMutex::PlatformMutex ( ) +{ + pthread_mutexattr_t attr; + + pthread_mutexattr_init(&attr); + pthread_mutexattr_settype( &attr, PTHREAD_MUTEX_RECURSIVE ); + pthread_mutex_init(&mutex, &attr); + pthread_mutexattr_destroy(&attr); +} + +PlatformMutex::~PlatformMutex () +{ + pthread_mutex_destroy(&mutex); +} + +void PlatformMutex::Lock() +{ + pthread_mutex_lock(&mutex); +} + +void PlatformMutex::Unlock() +{ + pthread_mutex_unlock(&mutex); +} + +bool PlatformMutex::TryLock() +{ + return pthread_mutex_trylock(&mutex) == 0; +} + +#endif // MUTEX_API_PTHREAD diff --git a/Runtime/Threads/Posix/PlatformMutex.h b/Runtime/Threads/Posix/PlatformMutex.h new file mode 100644 index 0000000..0071094 --- /dev/null +++ b/Runtime/Threads/Posix/PlatformMutex.h @@ -0,0 +1,29 @@ +#ifndef __PLATFORMMUTEX_H +#define __PLATFORMMUTEX_H + +#if SUPPORT_THREADS + +#include <pthread.h> +#include "Runtime/Utilities/NonCopyable.h" + +/** + * A platform/api specific mutex class. Always recursive (a single thread can lock multiple times). + */ +class PlatformMutex : public NonCopyable +{ + friend class Mutex; +protected: + PlatformMutex(); + ~PlatformMutex(); + + void Lock(); + void Unlock(); + bool TryLock(); + +private: + + pthread_mutex_t mutex; +}; + +#endif // SUPPORT_THREADS +#endif // __PLATFORMMUTEX_H diff --git a/Runtime/Threads/Posix/PlatformSemaphore.h b/Runtime/Threads/Posix/PlatformSemaphore.h new file mode 100644 index 0000000..bc20a55 --- /dev/null +++ b/Runtime/Threads/Posix/PlatformSemaphore.h @@ -0,0 +1,69 @@ +#ifndef __PLATFORMSEMAPHORE_H +#define __PLATFORMSEMAPHORE_H + +#if SUPPORT_THREADS + +#ifndef SEMAPHORE_API_PTHREAD +#define SEMAPHORE_API_PTHREAD (UNITY_LINUX || UNITY_PEPPER || UNITY_ANDROID || UNITY_PS3 || UNITY_BB10 || UNITY_TIZEN) +#endif + +#endif // SUPPORT_THREADS + +#if SEMAPHORE_API_PTHREAD + +#if UNITY_PEPPER +# include <errno.h> +# if defined(__native_client__) +# include <semaphore.h> +# else +# include <sys/semaphore.h> +# endif +#else +# include <semaphore.h> +# include <errno.h> +#endif + +#include "Runtime/Utilities/Word.h" +#include "Runtime/Utilities/NonCopyable.h" + +class PlatformSemaphore : public NonCopyable +{ + friend class Semaphore; +protected: + void Create(); + void Destroy(); + + void WaitForSignal(); + void Signal(); + +private: + sem_t m_Semaphore; +}; + +#define REPORT_SEM_ERROR(action) ErrorStringMsg ("Failed to %s a semaphore (%s)\n", action, strerror (errno)) + + inline void PlatformSemaphore::Create() { if (sem_init(&m_Semaphore, 0, 0) == -1) REPORT_SEM_ERROR ("open"); } + inline void PlatformSemaphore::Destroy() { if (sem_destroy(&m_Semaphore) == -1) REPORT_SEM_ERROR ("destroy"); } +#if !UNITY_BB10 + inline void PlatformSemaphore::WaitForSignal() { if (sem_wait(&m_Semaphore) == -1) REPORT_SEM_ERROR ("wait on"); } +#else + inline void PlatformSemaphore::WaitForSignal() { + int ret = 0; + while ((ret = sem_wait(&m_Semaphore)) == -1 && errno == EINTR) + { + continue; + } + + if( ret == -1 ) + REPORT_SEM_ERROR ("wait on"); + } +#endif + inline void PlatformSemaphore::Signal() { + if (sem_post(&m_Semaphore) == -1) + REPORT_SEM_ERROR ("post to"); + } + +#undef REPORT_SEM_ERROR + +#endif // SEMAPHORE_API_PTHREAD +#endif // __PLATFORMSEMAPHORE_H diff --git a/Runtime/Threads/Posix/PlatformThread.cpp b/Runtime/Threads/Posix/PlatformThread.cpp new file mode 100644 index 0000000..3b3a9e0 --- /dev/null +++ b/Runtime/Threads/Posix/PlatformThread.cpp @@ -0,0 +1,185 @@ +#include "UnityPrefix.h" + +#if SUPPORT_THREADS + +#ifndef THREAD_API_PTHREAD +#define THREAD_API_PTHREAD (UNITY_OSX || UNITY_PS3 || UNITY_IPHONE || UNITY_ANDROID || UNITY_PEPPER || UNITY_LINUX || UNITY_BB10 || UNITY_TIZEN) +#endif + +#endif // SUPPORT_THREADS + +#if THREAD_API_PTHREAD + +#include "PlatformThread.h" +#include "Runtime/Threads/Thread.h" +#include "Runtime/Threads/ThreadHelper.h" + +#include "Runtime/Utilities/Word.h" +//#include "Runtime/Utilities/Utility.h" + +// module@TODO : Move this to PlatformThread.h +#if UNITY_PS3 +# include <sys/timer.h> +# include "pthread_ext/pthread_ext.h" +# define pthread_create pthread_ext_create +#endif + +#if UNITY_PEPPER && defined(__native_client__) +#include <sys/time.h> +#include <sys/signal.h> +#include <sys/nacl_syscalls.h> +#endif + +#if UNITY_ANDROID +#include <jni.h> + JavaVM* GetJavaVm(); + extern "C" void* GC_lookup_thread(pthread_t id); + extern "C" void GC_delete_thread(pthread_t id); +#endif + +PlatformThread::PlatformThread() +: m_Thread((ThreadID)NULL) +{ +} + +PlatformThread::~PlatformThread() +{ + AssertMsg(m_Thread == (ThreadID)NULL, "***Thread was not cleaned up!***"); +} + + +void PlatformThread::Create(const Thread* thread, const UInt32 stackSize, const int processor) +{ + m_DefaultPriority = 0; + m_Processor = processor; + + if(stackSize) + { + pthread_attr_t attr; + memset(&attr, 0, sizeof(attr)); + +// module@TODO : Implement pthread_attr_init/pthread_attr_setstacksize in PlatformThread.h +#if UNITY_PS3 + attr.stacksize = stackSize; + attr.name = "_UNITY_"; +#else + pthread_attr_init(&attr); + pthread_attr_setstacksize(&attr, stackSize); +#endif + pthread_create(&m_Thread, &attr, Thread::RunThreadWrapper, (void*)thread); + } + else { + pthread_create(&m_Thread, NULL, Thread::RunThreadWrapper, (void*)thread); + } + +#if UNITY_OSX || UNITY_IPHONE || UNITY_PS3 + // Y U no want on Linux? + struct sched_param param; + int outputPolicy; + if (pthread_getschedparam(m_Thread, &outputPolicy, ¶m) == 0) + m_DefaultPriority = param.sched_priority; + AssertIf(m_DefaultPriority == 0); // Y U no like 0 priority? +#endif + + if (thread->m_Priority != kNormalPriority) + UpdatePriority(thread); +} + +void PlatformThread::Enter(const Thread* thread) +{ + ThreadHelper::SetThreadProcessor(thread, m_Processor); +} + +void PlatformThread::Exit(const Thread* thread, void* result) +{ +#if UNITY_ANDROID + if (GC_lookup_thread(pthread_self())) + GC_delete_thread(pthread_self()); + GetJavaVm()->DetachCurrentThread(); + pthread_exit(result); +#endif +} + +void PlatformThread::Join(const Thread* thread) +{ + if (Thread::EqualsCurrentThreadID(m_Thread)) + { + ErrorStringMsg("***Thread '%s' tried to join itself!***", thread->m_Name); + } + + if (m_Thread) + { + int error = pthread_join(m_Thread, NULL); + if (error) + ErrorString(Format("Error joining threads: %d", error)); + + m_Thread = 0; + } +} + +void PlatformThread::UpdatePriority(const Thread* thread) const +{ +#if UNITY_PEPPER || UNITY_BB10 + + // No thread priority in NaCl yet. + // For BB10 the user Unity is run as lacks permission to set priority. + +#else // Default POSIX impl below + + ThreadPriority p = thread->m_Priority; + +#if UNITY_OSX || UNITY_IPHONE || UNITY_PS3 + AssertIf(m_DefaultPriority == 0); +#endif + + struct sched_param param; + int policy; + ErrorIf(pthread_getschedparam(m_Thread, &policy, ¶m)); +#if UNITY_PS3 + int min = 3071; + int max = 0; +#else + int min = sched_get_priority_min(policy); + int max = sched_get_priority_max(policy); +#endif + + int iPriority; + switch (p) + { + case kLowPriority: + iPriority = min; + break; + + case kBelowNormalPriority: + iPriority = min + (m_DefaultPriority-min)/2; + break; + + case kNormalPriority: + iPriority = m_DefaultPriority; + break; + + case kHighPriority: + iPriority = max; + break; + + default: + iPriority = min; + AssertString("Undefined thread priority"); + break; + } + + if (param.sched_priority != iPriority) + { + param.sched_priority = iPriority; + ErrorIf(pthread_setschedparam(m_Thread, policy, ¶m)); + } + +#endif +} + +PlatformThread::ThreadID PlatformThread::GetCurrentThreadID() +{ + return (ThreadID)pthread_self(); +} + +#endif // THREAD_API_PTHREAD diff --git a/Runtime/Threads/Posix/PlatformThread.h b/Runtime/Threads/Posix/PlatformThread.h new file mode 100644 index 0000000..757d4d5 --- /dev/null +++ b/Runtime/Threads/Posix/PlatformThread.h @@ -0,0 +1,56 @@ +#ifndef PLATFORMTHREAD_H +#define PLATFORMTHREAD_H + +#if SUPPORT_THREADS + +#include <pthread.h> +#include "Runtime/Utilities/NonCopyable.h" + +class Thread; + +#if UNITY_PS3 // module@TODO : Move to Platforms/PS3/Include/PlatformThread.h +#define DEFAULT_UNITY_THREAD_STACK_SIZE 128*1024 +#endif + +#define UNITY_THREAD_FUNCTION_RETURNTYPE void* +#define UNITY_THREAD_FUNCTION_RETURN_SIGNATURE UNITY_THREAD_FUNCTION_RETURNTYPE + +class EXPORT_COREMODULE PlatformThread : public NonCopyable +{ + friend class Thread; + friend class ThreadHelper; + +protected: + typedef pthread_t ThreadID; + + // Starts a thread to execute within the calling process. + // Typically maps to 'CreateThread' (WinAPI) or 'pthread_create' (POSIX). + void Create(const Thread* thread, const UInt32 stackSize, const int processor); + // To be called from the thread's 'start_routine' (aka RunThreadWrapper) + // in order to boot-strap the thread (in terms of setting the thread affinity or similar). + void Enter(const Thread* thread); + // To be called as final exit/cleanup call when a thread's 'start_routine' (aka RunThreadWrapper) exits. + // Depending on the backing thread API this function may not return. + void Exit(const Thread* thread, void* result); + // The function waits for the thread specified by 'thread' to terminate. + // Typically maps to 'WaitForSingleObject' (WinAPI) or 'pthread_join' (POSIX). + void Join(const Thread* thread); + // Uses the thread->m_Priority to update the thread scheduler settings, if possible. + // Typically maps to 'SetThreadPriority' (WinAPI) or 'pthread_setschedparam' (POSIX). + // Depending on the process permissions this call may turn into a no-op. + void UpdatePriority(const Thread* thread) const; + // Returns a unique identifier for the currently executing thread. + static ThreadID GetCurrentThreadID(); + +private: + PlatformThread(); + ~PlatformThread(); + + pthread_t m_Thread; + int m_DefaultPriority; + int m_Processor; +}; + +#endif // SUPPORT_THREADS + +#endif // PLATFORMTHREAD_H diff --git a/Runtime/Threads/Posix/PlatformThreadSpecificValue.h b/Runtime/Threads/Posix/PlatformThreadSpecificValue.h new file mode 100644 index 0000000..7513834 --- /dev/null +++ b/Runtime/Threads/Posix/PlatformThreadSpecificValue.h @@ -0,0 +1,63 @@ +#ifndef PLATFORMTHREADSPECIFICVALUE_H +#define PLATFORMTHREADSPECIFICVALUE_H + +#if UNITY_DYNAMIC_TLS + +#include <pthread.h> +#include "Runtime/Utilities/LogAssert.h" +#include "Runtime/Utilities/Utility.h" + +class PlatformThreadSpecificValue +{ +public: + PlatformThreadSpecificValue(); + ~PlatformThreadSpecificValue(); + + void* GetValue() const; + void SetValue(void* value); + +private: + pthread_key_t m_TLSKey; +}; + +inline PlatformThreadSpecificValue::PlatformThreadSpecificValue() +{ + int rc = pthread_key_create(&m_TLSKey, NULL); + DebugAssertIf(rc != 0); + UNUSED(rc); +} + +inline PlatformThreadSpecificValue::~PlatformThreadSpecificValue() +{ + int rc = pthread_key_delete(m_TLSKey); + DebugAssertIf(rc != 0); + UNUSED(rc); +} + +inline void* PlatformThreadSpecificValue::GetValue() const +{ +#if !UNITY_LINUX + // 0 is a valid key on Linux and POSIX specifies keys as opaque objects, + // so technically we have no business snopping in them anyway... + DebugAssertIf(m_TLSKey == 0); +#endif + return pthread_getspecific(m_TLSKey); +} + +inline void PlatformThreadSpecificValue::SetValue(void* value) +{ +#if !UNITY_LINUX + // 0 is a valid key on Linux and POSIX specifies keys as opaque objects, + // so technically we have no business snopping in them anyway... + DebugAssertIf(m_TLSKey == 0); +#endif + pthread_setspecific(m_TLSKey, value); +} + +#else + + #error "POSIX doesn't define a static TLS path" + +#endif // UNITY_DYNAMIC_TLS + +#endif // PLATFORMTHREADSPECIFICVALUE_H diff --git a/Runtime/Threads/ProfilerMutex.h b/Runtime/Threads/ProfilerMutex.h new file mode 100644 index 0000000..1e82b7c --- /dev/null +++ b/Runtime/Threads/ProfilerMutex.h @@ -0,0 +1,118 @@ +#ifndef __PROFILERMUTEX_H +#define __PROFILERMUTEX_H + +#include "Mutex.h" + +#if ENABLE_PROFILER +#include "Runtime/Profiler/Profiler.h" +#endif + +#define THREAD_LOCK_WARNINGS 0 +#define THREAD_LOCK_TIMING 0 + +#if THREAD_LOCK_WARNINGS || (ENABLE_PROFILER && SUPPORT_THREADS) + +#define AQUIRE_AUTOLOCK(mutex,profilerInformation) ProfilerMutexAutoLock aquireAutoLock (mutex, #mutex, profilerInformation, __FILE__, __LINE__) +#define AQUIRE_AUTOLOCK_WARN_MAIN_THREAD(mutex,profilerInformation) ProfilerMutexAutoLock aquireAutoLock (mutex, #mutex, Thread::mainThreadId, profilerInformation, __FILE__, __LINE__) +#define LOCK_MUTEX(mutex,profilerInformation) ProfilerMutexLock(mutex,#mutex,profilerInformation,__FILE__,__LINE__) + +double GetTimeSinceStartup (); + +inline void ProfilerMutexLock (Mutex& mutex, char const* mutexName, ProfilerInformation& information, char const* file, int line) +{ +#if ENABLE_PROFILER || THREAD_LOCK_WARNINGS + + if (mutex.TryLock()) + return; + + #if THREAD_LOCK_WARNINGS + DebugStringToFile (std::string ("Mutex '") + mutexName + "' already locked: " + information.name, 0, file, line, kScriptingWarning); + #endif + + PROFILER_AUTO_THREAD_SAFE(information, NULL) + +#if THREAD_LOCK_TIMING + double start = GetTimeSinceStartup (); + while (!mutex.TryLock()) + Sleep (10); + + double duration = GetTimeSinceStartup () - start; + DebugStringToFile (std::string ("Mutex '") + mutexName + "' obtained after: " + FloatToString(duration, "%6.3f") + " s", 0, file, line, kScriptingWarning); +#else + mutex.Lock(); +#endif + +#else + mutex.Lock(); +#endif +} + +inline void ProfilerMutexLock (Mutex& mutex, char const* mutexName, Thread::ThreadID threadID, ProfilerInformation& information, char const* file, int line) +{ +#if ENABLE_PROFILER || THREAD_LOCK_WARNINGS + if (mutex.TryLock()) + return; + + #if THREAD_LOCK_WARNINGS + if (Thread::EqualsCurrentThreadID(threadID)) + { + DebugStringToFile (std::string ("Mutex '") + mutexName + "' already locked: " + information.name, 0, file, line, kScriptingWarning); + } + #endif + + PROFILER_AUTO_THREAD_SAFE(information, NULL) + +#if THREAD_LOCK_TIMING + double start = GetTimeSinceStartup (); + while (!mutex.TryLock()) + Sleep (10); + + double duration = GetTimeSinceStartup () - start; + DebugStringToFile (std::string ("Mutex '") + mutexName + "' obtained after: " + FloatToString(duration, "%6.3f") + " s", 0, file, line, kScriptingWarning); +#else + mutex.Lock(); +#endif + +#else + mutex.Lock(); +#endif +} + +class ProfilerMutexAutoLock +{ +public: + ProfilerMutexAutoLock (Mutex& mutex, char const* mutexName, ProfilerInformation& profilerInformation, char const* file, int line) + : m_Mutex (&mutex) + { + ProfilerMutexLock(mutex, mutexName, profilerInformation, file, line); + } + + ProfilerMutexAutoLock (Mutex& mutex, char const* mutexName, Thread::ThreadID threadID, ProfilerInformation& profilerInformation, char const* file, int line) + : m_Mutex (&mutex) + { + ProfilerMutexLock(mutex, mutexName, threadID, profilerInformation, file, line); + } + + ~ProfilerMutexAutoLock() + { + m_Mutex->Unlock(); + } + +private: + ProfilerMutexAutoLock(const ProfilerMutexAutoLock&); + ProfilerMutexAutoLock& operator=(const ProfilerMutexAutoLock&); + +private: + Mutex* m_Mutex; +}; + +#else + +#define AQUIRE_AUTOLOCK(mutex,profilerInformation) Mutex::AutoLock aquireAutoLock (mutex) +#define AQUIRE_AUTOLOCK_WARN_MAIN_THREAD(mutex,profilerInformation) Mutex::AutoLock aquireAutoLock (mutex) +#define LOCK_MUTEX(mutex,profilerInformation) (mutex).Lock () + + +#endif + +#endif diff --git a/Runtime/Threads/Semaphore.h b/Runtime/Threads/Semaphore.h new file mode 100644 index 0000000..151f64d --- /dev/null +++ b/Runtime/Threads/Semaphore.h @@ -0,0 +1,67 @@ +#ifndef __SEMAPHORE_H +#define __SEMAPHORE_H + +#if SUPPORT_THREADS + +#if UNITY_WIN || UNITY_XENON +# include "Winapi/PlatformSemaphore.h" +#elif UNITY_LINUX || UNITY_PEPPER || UNITY_ANDROID || UNITY_PS3 || UNITY_BB10 || UNITY_TIZEN +# include "Posix/PlatformSemaphore.h" +#else +# include "PlatformSemaphore.h" +#endif + +#include "Runtime/Utilities/NonCopyable.h" + +class Semaphore : public NonCopyable +{ +public: + Semaphore() { m_Semaphore.Create(); } + ~Semaphore() { m_Semaphore.Destroy(); } + void Reset() { m_Semaphore.Destroy(); m_Semaphore.Create(); } + void WaitForSignal() { m_Semaphore.WaitForSignal(); } + void Signal() { m_Semaphore.Signal(); } + +private: + PlatformSemaphore m_Semaphore; +}; + +class SuspendableSemaphore : public NonCopyable +{ +public: + explicit SuspendableSemaphore(bool suspended = true) : m_Suspended(suspended), m_SuspendedIndefinitely(false) { } + bool IsSuspended() const { return m_Suspended; } + void Reset() { m_Semaphore.Reset(); } + void WaitForSignal() { if (!m_Suspended) m_Semaphore.WaitForSignal(); }; + void Signal() { if (!m_Suspended) m_Semaphore.Signal(); } + void Resume(bool reset = true); + void Suspend(bool indefinitely = false); + +private: + volatile bool m_SuspendedIndefinitely; + volatile bool m_Suspended; + Semaphore m_Semaphore; +}; + +inline void SuspendableSemaphore::Resume(bool reset) +{ + if (reset) + Reset(); + + if (!m_SuspendedIndefinitely) + m_Suspended = false; +} + +inline void SuspendableSemaphore::Suspend(bool indefinitely) +{ + m_Suspended = true; + if (indefinitely) + m_SuspendedIndefinitely = indefinitely; + + m_Semaphore.Signal(); // release any waiting thread +} + + +#endif // SUPPORT_THREADS + +#endif // __SEMAPHORE_H diff --git a/Runtime/Threads/SimpleLock.h b/Runtime/Threads/SimpleLock.h new file mode 100644 index 0000000..ace47aa --- /dev/null +++ b/Runtime/Threads/SimpleLock.h @@ -0,0 +1,53 @@ +#ifndef SIMPLE_LOCK_H +#define SIMPLE_LOCK_H + +#if SUPPORT_THREADS + +#include "AtomicOps.h" +#include "Semaphore.h" + +// Simple, non-recursive mutual exclusion lock. Efficient when there is low contention. +// First tries to attain lock using atomic ops, then waits for lock using semaphores. +// Same idea as described here: http://preshing.com/20120226/roll-your-own-lightweight-mutex + +class SimpleLock : public NonCopyable +{ +public: + SimpleLock() : m_Count(0) {} + + class AutoLock : public NonCopyable + { + public: + AutoLock( SimpleLock& lock ) : m_Lock(lock) + { + m_Lock.Lock(); + } + + ~AutoLock() + { + m_Lock.Unlock(); + } + + private: + SimpleLock& m_Lock; + }; + + void Lock() + { + if (AtomicIncrement(&m_Count) != 1) + m_Semaphore.WaitForSignal(); + } + + void Unlock() + { + if (AtomicDecrement(&m_Count) != 0) + m_Semaphore.Signal(); + } + +private: + volatile int m_Count; + Semaphore m_Semaphore; +}; + +#endif // SUPPORT_THREADS +#endif diff --git a/Runtime/Threads/SpinlockMutex.h b/Runtime/Threads/SpinlockMutex.h new file mode 100644 index 0000000..63ca9aa --- /dev/null +++ b/Runtime/Threads/SpinlockMutex.h @@ -0,0 +1,63 @@ +#ifndef __SPINLOCK_MUTEX_H +#define __SPINLOCK_MUTEX_H + +#if UNITY_OSX + +#include <libkern/OSAtomic.h> + +class SpinlockMutex : public NonCopyable +{ +public: + + class AutoLock + { + public: + AutoLock( SpinlockMutex& mutex ) + : m_Mutex(&mutex) + { + mutex.Lock(); + } + + ~AutoLock() + { + m_Mutex->Unlock(); + } + + private: + AutoLock(const AutoLock&); + AutoLock& operator=(const AutoLock&); + + private: + SpinlockMutex* m_Mutex; + }; + + SpinlockMutex() + { + m_SpinLock = OS_SPINLOCK_INIT; + } + + ~SpinlockMutex() + {} + + void Lock() + { + OSSpinLockLock(&m_SpinLock); + } + + void Unlock() + { + OSSpinLockUnlock(&m_SpinLock); + } + +private: + + volatile OSSpinLock m_SpinLock; +}; + +#else + +typedef Mutex SpinlockMutex; + +#endif + +#endif
\ No newline at end of file diff --git a/Runtime/Threads/Thread.cpp b/Runtime/Threads/Thread.cpp new file mode 100644 index 0000000..c9b008c --- /dev/null +++ b/Runtime/Threads/Thread.cpp @@ -0,0 +1,123 @@ +#include "UnityPrefix.h" + +#if SUPPORT_THREADS +#include "Thread.h" +#include "ThreadHelper.h" +#include "ThreadUtility.h" +#include "Runtime/Utilities/Word.h" +#include "Runtime/Utilities/Utility.h" +#include "Runtime/Allocator/MemoryManager.h" + +#if !UNITY_EXTERNAL_TOOL +#include "Runtime/Utilities/LogAssert.h" +#include "Runtime/Allocator/MemoryMacros.h" +#endif + +#if SUPPORT_ERROR_EXIT +#include "Runtime/Utilities/ErrorExit.h" +#endif + +Thread::ThreadID Thread::mainThreadId = Thread::GetCurrentThreadID(); + +UNITY_THREAD_FUNCTION_RETURN_SIGNATURE Thread::RunThreadWrapper (void* ptr) +{ + Thread* thread = (Thread*)ptr; + + #if ENABLE_MEMORY_MANAGER + GetMemoryManager().ThreadInitialize(); + #endif + + thread->m_Thread.Enter(thread); // PlatformThread (Posix/Winapi/Custom) + + ThreadHelper::SetThreadName(thread); + + void *result = NULL; +#if SUPPORT_ERROR_EXIT + ERROR_EXIT_THREAD_ENTRY(); + result = thread->m_EntryPoint(thread->m_UserData); + ERROR_EXIT_THREAD_EXIT(); +#else // SUPPORT_ERROR_EXIT + result = thread->m_EntryPoint(thread->m_UserData); +#endif // SUPPORT_ERROR_EXIT + + // NOTE: code below will not execute if thread is terminated + thread->m_Running = false; + UnityMemoryBarrier(); + + #if ENABLE_MEMORY_MANAGER + GetMemoryManager().ThreadCleanup(); + #endif + + thread->m_Thread.Exit(thread, result); // PlatformThread (Posix/Winapi/Custom) + + return reinterpret_cast<UNITY_THREAD_FUNCTION_RETURNTYPE>(result); +} + +Thread::Thread () +: m_UserData(NULL) +, m_EntryPoint(NULL) +, m_Running(false) +, m_ShouldQuit(false) +, m_Priority(kNormalPriority) +, m_Name(NULL) +{ +} + +Thread::~Thread() +{ + AssertMsg(!m_Running, "***Thread '%s' is still running!***", m_Name); +} + +void Thread::Run(void* (*entry_point) (void*), void* data, const UInt32 stackSize, int processor) +{ + Assert(!m_Running); + + m_ShouldQuit = false; + m_UserData = data; + m_EntryPoint = entry_point; + m_Running = true; + + m_Thread.Create(this, stackSize, processor); +} + +void Thread::WaitForExit(bool signalQuit) +{ + if (m_Running && signalQuit) + SignalQuit(); + + m_Thread.Join(this); // PlatformThread (Posix/Winapi/Custom) + + Assert(!m_Running && "Thread shouldn't be running anymore"); + m_Running = false; +} + +Thread::ThreadID Thread::GetCurrentThreadID() +{ + return PlatformThread::GetCurrentThreadID(); +} + +void Thread::Sleep (double time) +{ + ThreadHelper::Sleep(time); +} + +void Thread::SetPriority(ThreadPriority prio) +{ + if (!m_Running || m_Priority == prio) + return; + + m_Priority = prio; + m_Thread.UpdatePriority(this); // PlatformThread (Posix/Winapi/Custom) +} + +void Thread::SetCurrentThreadProcessor(int processor) +{ + ThreadHelper::SetThreadProcessor(NULL, processor); +} + +double Thread::GetThreadRunningTime(ThreadID threadId) +{ + return ThreadHelper::GetThreadRunningTime(threadId); +} + +#endif diff --git a/Runtime/Threads/Thread.h b/Runtime/Threads/Thread.h new file mode 100644 index 0000000..2be0d8a --- /dev/null +++ b/Runtime/Threads/Thread.h @@ -0,0 +1,112 @@ +#ifndef THREAD_H +#define THREAD_H + +enum ThreadPriority { kLowPriority = 0, kBelowNormalPriority = 1, kNormalPriority = 2, kHighPriority = 4 }; + +#if SUPPORT_THREADS + +#define ASSERT_RUNNING_ON_MAIN_THREAD Assert(Thread::EqualsCurrentThreadIDForAssert(Thread::mainThreadId)); + +#if !UNITY_PLUGIN && !UNITY_EXTERNAL_TOOL +#include "Configuration/UnityConfigure.h" +#endif + +#if UNITY_WIN || UNITY_XENON +# include "Winapi/PlatformThread.h" +#elif UNITY_OSX || UNITY_PS3 || UNITY_IPHONE || UNITY_ANDROID || UNITY_PEPPER || UNITY_LINUX || UNITY_BB10 || UNITY_TIZEN +# include "Posix/PlatformThread.h" +#else +# include "PlatformThread.h" +#endif + + +#ifndef DEFAULT_UNITY_THREAD_STACK_SIZE +#define DEFAULT_UNITY_THREAD_STACK_SIZE 0 +#endif + +#ifndef DEFAULT_UNITY_THREAD_PROCESSOR +#define DEFAULT_UNITY_THREAD_PROCESSOR -1 +#endif + +#include "Runtime/Utilities/NonCopyable.h" + +/** + * A thread. + */ +class EXPORT_COREMODULE Thread : public NonCopyable +{ + friend class ThreadHelper; + friend class PlatformThread; + +public: + typedef PlatformThread::ThreadID ThreadID; + +public: + Thread(); + ~Thread(); + + void Run (void* (*entry_point) (void*), void* data, const UInt32 stackSize = DEFAULT_UNITY_THREAD_STACK_SIZE, int processor = DEFAULT_UNITY_THREAD_PROCESSOR); + bool IsRunning() const { return m_Running; } + + // Was the thread told to stop running? + bool IsQuitSignaled () const { return m_ShouldQuit; } + // Tells the thread to stop running, the thread main loop is responsible for checking this variable + void SignalQuit () { m_ShouldQuit = true; } + + // Signals quit and waits until the thread main function is exited! + void WaitForExit(bool signalQuit = true); + + void SetName (const char* name) { m_Name = name; } + + static void Sleep (double seconds); + + void SetPriority (ThreadPriority prior); + ThreadPriority GetPriority () const { return m_Priority; } + + static void SetCurrentThreadProcessor(int processor); + + static ThreadID GetCurrentThreadID(); + static bool EqualsCurrentThreadID(ThreadID thread) { return GetCurrentThreadID() == thread; } + static bool EqualsCurrentThreadIDForAssert(ThreadID thread) { return EqualsCurrentThreadID(thread); } + static bool EqualsThreadID (Thread::ThreadID lhs, Thread::ThreadID rhs) { return lhs == rhs; } + + void ExternalEarlySetRunningFalse() { m_Running = false; } + + static double GetThreadRunningTime(ThreadID thread); + + static ThreadID mainThreadId; + + static bool CurrentThreadIsMainThread() { return EqualsCurrentThreadID(mainThreadId); } + +private: + PlatformThread m_Thread; + + void* m_UserData; + void* (*m_EntryPoint) (void*); + + volatile bool m_Running; + volatile bool m_ShouldQuit; + ThreadPriority m_Priority; + const char* m_Name; + + static UNITY_THREAD_FUNCTION_RETURN_SIGNATURE RunThreadWrapper (void* ptr); +}; + + +#else // SUPPORT_THREADS + + +// No threads in this platform, stub minimal functionality + +#define ASSERT_RUNNING_ON_MAIN_THREAD + +class Thread { +public: + static void Sleep (double t) { } + static bool CurrentThreadIsMainThread() { return true; } +}; + + +#endif //SUPPORT_THREADS + +#endif diff --git a/Runtime/Threads/ThreadHelper.cpp b/Runtime/Threads/ThreadHelper.cpp new file mode 100644 index 0000000..913c606 --- /dev/null +++ b/Runtime/Threads/ThreadHelper.cpp @@ -0,0 +1,195 @@ +#include "UnityPrefix.h" + +#if SUPPORT_THREADS +#include "Thread.h" +#include "ThreadUtility.h" +#include "ThreadHelper.h" + +#if UNITY_OSX || UNITY_IPHONE +#include <mach/mach.h> +#include <dlfcn.h> +#endif +#if UNITY_PS3 +# include <sys/timer.h> +#endif +#if UNITY_PEPPER && defined(__native_client__) +#include <sys/time.h> +#include <sys/signal.h> +#include <sys/nacl_syscalls.h> +#endif +#if UNITY_ANDROID || UNITY_TIZEN +#include <sys/prctl.h> +#endif +#if UNITY_WINRT +#include "PlatformDependent/MetroPlayer/Win32Threads.h" +#endif + +void ThreadHelper::Sleep(double time) +{ +#if UNITY_WINRT + int milliseconds = (int)(time * 1000.0); + //::SleepEx(milliseconds, true); // Must be alertable so that mono runtime can request thread suspend + + HANDLE const event = CreateEventExW(nullptr, nullptr, CREATE_EVENT_MANUAL_RESET, EVENT_ALL_ACCESS); + Assert(NULL != event); + + if (NULL != event) + { + WaitForSingleObjectEx(event, milliseconds, TRUE); + + CloseHandle(event); + } + +#elif UNITY_WIN || UNITY_XENON + int milliseconds = (int)(time * 1000.0); + ::SleepEx(milliseconds, true); // Must be alertable so that mono runtime can request thread suspend +#elif UNITY_PS3 + sys_timer_usleep((int)(time * 1000.0)); +#elif UNITY_PEPPER + usleep((int)(time * 1000.0)); +#elif THREAD_API_PTHREAD || (UNITY_OSX || UNITY_IPHONE || UNITY_ANDROID || UNITY_LINUX || UNITY_BB10 || UNITY_TIZEN) + timespec ts; + int seconds = FloorfToInt(time); + double micro = (time - seconds) * 1000000.0; + int nano = (int)micro * 1000; + ts.tv_sec = seconds; + ts.tv_nsec = nano; + + // nanosleep takes a timespec that is an offset, not + // an absolute time. + nanosleep(&ts, 0); // note: the spleep is aborted if the thread receives a signal. It will return -1 and set errno to EINTR. +#else +#error Unknown OS API - not POSIX nor WinAPI + +#endif + +} + +void ThreadHelper::SetThreadName(const Thread* thread) +{ + if (!thread->m_Name) + return; + +#if (UNITY_WIN && !UNITY_WINRT) + const DWORD MS_VC_EXCEPTION=0x406D1388; + + #pragma pack(push,8) + typedef struct tagTHREADNAME_INFO + { + DWORD dwType; // Must be 0x1000. + LPCSTR szName; // Pointer to name (in user addr space). + DWORD dwThreadID; // Thread ID (-1=caller thread). + DWORD dwFlags; // Reserved for future use, must be zero. + } THREADNAME_INFO; + #pragma pack(pop) + + THREADNAME_INFO info; + info.dwType = 0x1000; + info.szName = thread->m_Name; + info.dwThreadID = thread->m_Thread.m_ThreadId; + info.dwFlags = 0; + + __try + { + RaiseException( MS_VC_EXCEPTION, 0, sizeof(info)/sizeof(ULONG_PTR), (ULONG_PTR*)&info ); + } + __except(EXCEPTION_EXECUTE_HANDLER) + { + } +#elif UNITY_WINRT + #pragma message("todo: implement") // ?!- +#elif UNITY_XENON + typedef struct tagTHREADNAME_INFO { + DWORD dwType; // Must be 0x1000 + LPCSTR szName; // Pointer to name (in user address space) + DWORD dwThreadID; // Thread ID (-1 for caller thread) + DWORD dwFlags; // Reserved for future use; must be zero + } THREADNAME_INFO; + + THREADNAME_INFO info; + info.dwType = 0x1000; + info.szName = thread->m_Name; + info.dwThreadID = thread->m_Thread.m_ThreadId; + info.dwFlags = 0; + + __try + { + RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD), (DWORD *)&info ); + } + __except( GetExceptionCode()==0x406D1388 ? EXCEPTION_CONTINUE_EXECUTION : EXCEPTION_EXECUTE_HANDLER ) + { + } +#elif UNITY_OSX + // pthread_setname_np is OSX 10.6 and later only + int (*dynamic_pthread_setname_np)(const char*); + *reinterpret_cast<void**>(&dynamic_pthread_setname_np) = dlsym(RTLD_DEFAULT, "pthread_setname_np"); + if (dynamic_pthread_setname_np) + dynamic_pthread_setname_np(thread->m_Name); +#elif UNITY_ANDROID || UNITY_TIZEN + prctl(PR_SET_NAME, (unsigned long)(thread->m_Name ? thread->m_Name : "<Unknown>"),0,0,0); +#endif +} + +void ThreadHelper::SetThreadProcessor(const Thread* thread, int processor) +{ + if (processor == DEFAULT_UNITY_THREAD_PROCESSOR) + return; + +#if UNITY_OSX + if (thread == NULL || + thread->m_Thread.m_Thread == Thread::GetCurrentThreadID()) + { + #define THREAD_AFFINITY_POLICY 4 + integer_t tap = 1 + processor; + thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY, (integer_t*) &tap, 1); + } +#elif UNITY_WIN + // We interpret 'processor' here as processor core +// HANDLE hThread = thread == NULL ? GetCurrentThread() : thread->m_Thread.m_Thread; +// DWORD affinity = systeminfo::GetCoreAffinityMask(processor); +// SetThreadAffinityMask(hThread, affinity); +#elif UNITY_XENON + HANDLE hThread = thread == NULL ? GetCurrentThread() : thread->m_Thread.m_Thread; + AssertIf(processor > 5); + XSetThreadProcessor(hThread, processor); +#endif +} + +double ThreadHelper::GetThreadRunningTime(Thread::ThreadID thread) +{ +#if UNITY_OSX || UNITY_IPHONE + mach_port_t mach_thread = pthread_mach_thread_np(thread); + thread_basic_info_data_t info; + mach_msg_type_number_t size = sizeof(thread_basic_info_data_t)/sizeof(integer_t); + if (thread_info (mach_thread, THREAD_BASIC_INFO, (integer_t*)&info, &size)) + return 0; + + return (double)info.user_time.microseconds / 1000000.0f + info.user_time.seconds + + (double)info.system_time.microseconds / 1000000.0f + info.system_time.seconds; +#elif (UNITY_WIN && !UNITY_WINRT) || UNITY_XENON + double time = 0.0; + + HANDLE threadHandle = OpenThread(THREAD_QUERY_INFORMATION, FALSE, thread); + + if (NULL != threadHandle) + { + FILETIME creationTime, exitTime, kernelTime, userTime; + + if (GetThreadTimes(threadHandle, &creationTime, &exitTime, &kernelTime, &userTime)) + { + ULARGE_INTEGER largeKernelTime = { kernelTime.dwLowDateTime, kernelTime.dwHighDateTime }; + ULARGE_INTEGER largeUserTime = { userTime.dwLowDateTime, userTime.dwHighDateTime }; + + time = ((largeKernelTime.QuadPart + largeUserTime.QuadPart) / 10000000.0); + } + + CloseHandle(threadHandle); + } + return time; +#else + return 0.0; +#endif +} + + +#endif diff --git a/Runtime/Threads/ThreadHelper.h b/Runtime/Threads/ThreadHelper.h new file mode 100644 index 0000000..3e21538 --- /dev/null +++ b/Runtime/Threads/ThreadHelper.h @@ -0,0 +1,30 @@ +#ifndef THREADHELPER_H +#define THREADHELPER_H + +#if SUPPORT_THREADS + +#include "Thread.h" + +// ThreadHelper is typically implemented on a per-platform basis, as it contains OS +// specific functionality outside regular POSIX / pthread / WinAPI threads. + +class ThreadHelper +{ + friend class Thread; + friend class PlatformThread; + +protected: + static void Sleep(double time); + + static void SetThreadName(const Thread* thread); + static void SetThreadProcessor(const Thread* thread, int processor); + + static double GetThreadRunningTime(Thread::ThreadID thread); + +private: + ThreadHelper(); +}; + +#endif //SUPPORT_THREADS + +#endif diff --git a/Runtime/Threads/ThreadSharedObject.h b/Runtime/Threads/ThreadSharedObject.h new file mode 100644 index 0000000..d53239a --- /dev/null +++ b/Runtime/Threads/ThreadSharedObject.h @@ -0,0 +1,30 @@ +#ifndef THREAD_SHARED_OBJECT_H +#define THREAD_SHARED_OBJECT_H + +#include "Runtime/Utilities/NonCopyable.h" +#include "Runtime/Threads/AtomicOps.h" + +class ThreadSharedObject : public NonCopyable +{ +public: + void AddRef() const { AtomicIncrement(&m_RefCount); } + void Release() const { if (AtomicDecrement(&m_RefCount) == 0) delete this; } + void Release(MemLabelId label) const + { + if (AtomicDecrement(&m_RefCount) == 0) + { + this->~ThreadSharedObject(); + UNITY_FREE(label,const_cast<ThreadSharedObject*>(this)); + } + } + int GetRefCount() const { return m_RefCount; } + +protected: + ThreadSharedObject(int refs = 1) : m_RefCount(refs) {} + virtual ~ThreadSharedObject() {} + +private: + volatile mutable int m_RefCount; +}; + +#endif diff --git a/Runtime/Threads/ThreadSpecificValue.cpp b/Runtime/Threads/ThreadSpecificValue.cpp new file mode 100644 index 0000000..6244754 --- /dev/null +++ b/Runtime/Threads/ThreadSpecificValue.cpp @@ -0,0 +1,7 @@ +#include "UnityPrefix.h" +#include "ThreadSpecificValue.h" + +#if UNITY_DYNAMIC_TLS + + +#endif // UNITY_DYNAMIC_TLS diff --git a/Runtime/Threads/ThreadSpecificValue.h b/Runtime/Threads/ThreadSpecificValue.h new file mode 100644 index 0000000..cf1cba8 --- /dev/null +++ b/Runtime/Threads/ThreadSpecificValue.h @@ -0,0 +1,108 @@ +#ifndef THREADSPECIFICVALUE_H +#define THREADSPECIFICVALUE_H + +#include "Configuration/UnityConfigure.h" + +#if !SUPPORT_THREADS + class PlatformThreadSpecificValue + { + public: + PlatformThreadSpecificValue(); + ~PlatformThreadSpecificValue(); + + void* GetValue() const; + void SetValue(void* value); + + private: + void* m_Value; + }; + + inline PlatformThreadSpecificValue::PlatformThreadSpecificValue() + { + m_Value = 0; + } + + inline PlatformThreadSpecificValue::~PlatformThreadSpecificValue() + { + //do nothing + } + + inline void* PlatformThreadSpecificValue::GetValue() const + { + return m_Value; + } + + inline void PlatformThreadSpecificValue::SetValue(void* value) + { + m_Value = value; + } +#elif UNITY_WIN || UNITY_XENON +# include "Winapi/PlatformThreadSpecificValue.h" +#elif UNITY_OSX || UNITY_IPHONE || UNITY_ANDROID || UNITY_PEPPER || UNITY_LINUX || UNITY_BB10 || UNITY_TIZEN +# include "Posix/PlatformThreadSpecificValue.h" +#else +# include "PlatformThreadSpecificValue.h" +#endif + +#if UNITY_DYNAMIC_TLS + +template<class T> class ThreadSpecificValue +{ + PlatformThreadSpecificValue m_TLSKey; + +public: + inline operator T () const + { + return (T)GetValue (); + } + + inline T operator -> () const + { + return (T)GetValue (); + } + + inline T operator ++ () + { + *this = *this + 1; + return *this; + } + + inline T operator -- () + { + *this = *this - 1; + return *this; + } + + inline T operator = (T value) + { + SetValue((void*)value); + return value; + } + +private: + + void* GetValue() const; + void SetValue(void* value); +}; + +template<class T> void* ThreadSpecificValue<T>::GetValue () const +{ + return m_TLSKey.GetValue(); +} + +template <class T> void ThreadSpecificValue<T>::SetValue (void* value) +{ + m_TLSKey.SetValue(value); +} + +#define UNITY_TLS_VALUE(type) ThreadSpecificValue<type> + +#else + +# ifndef UNITY_TLS_VALUE +# error "A static TLS mechanism is not defined for this platform" +# endif + +#endif // UNITY_DYNAMIC_TLS + +#endif diff --git a/Runtime/Threads/ThreadUtility.h b/Runtime/Threads/ThreadUtility.h new file mode 100644 index 0000000..b45a3a5 --- /dev/null +++ b/Runtime/Threads/ThreadUtility.h @@ -0,0 +1,91 @@ +#ifndef __THREAD_UTILITY_H +#define __THREAD_UTILITY_H + +#if UNITY_OSX +#ifndef __ppc__ +#include <libkern/OSAtomic.h> +#endif +#endif + +#if UNITY_IPHONE +#include <libkern/OSAtomic.h> +#endif + +#if UNITY_PEPPER +inline int NoBarrier_AtomicExchange(volatile int* ptr, + int new_value) { + __asm__ __volatile__("xchgl %1,%0" // The lock prefix is implicit for xchg. + : "=r" (new_value) + : "m" (*ptr), "0" (new_value) + : "memory"); + return new_value; // Now it's the previous value. +} +#endif + +// Memory barrier. +// +// Necessary to call this when using volatile to order writes/reads in multiple threads. +inline void UnityMemoryBarrier() +{ + #if UNITY_WIN || UNITY_XENON + #ifdef MemoryBarrier + MemoryBarrier(); + #else + long temp; + __asm xchg temp,eax; + #endif + + #elif UNITY_OSX + + OSMemoryBarrier(); + + #elif UNITY_PS3 + + __lwsync(); + + #elif UNITY_IPHONE + // No need for memory barriers on iPhone and Android - single CPU + OSMemoryBarrier(); + + #elif UNITY_PEPPER + + #if defined(__x86_64__) + + // 64-bit implementations of memory barrier can be simpler, because it + // "mfence" is guaranteed to exist. + __asm__ __volatile__("mfence" : : : "memory"); + + #else + +/* if (AtomicOps_Internalx86CPUFeatures.has_sse2) { + __asm__ __volatile__("mfence" : : : "memory"); + } else { // mfence is faster but not present on PIII*/ + int x = 0; + NoBarrier_AtomicExchange(&x, 0); // acts as a barrier on PIII +// } + + #endif + + #elif UNITY_ANDROID + + #elif UNITY_WII + + #elif UNITY_TIZEN + + __sync_synchronize(); + + #elif UNITY_LINUX || UNITY_BB10 + #ifdef __arm__ + __asm__ __volatile__ ("mcr p15, 0, %0, c7, c10, 5" : : "r" (0) : "memory"); + #else + __sync_synchronize(); + #endif + #elif UNITY_FLASH || UNITY_WEBGL + // Flash has no threads + #else + + #error "Unknown platform, implement memory barrier if at all possible" + #endif +} + +#endif diff --git a/Runtime/Threads/ThreadedStreamBuffer.cpp b/Runtime/Threads/ThreadedStreamBuffer.cpp new file mode 100644 index 0000000..87665f4 --- /dev/null +++ b/Runtime/Threads/ThreadedStreamBuffer.cpp @@ -0,0 +1,437 @@ +#include "UnityPrefix.h" + +#if SUPPORT_THREADS + +#include "ThreadedStreamBuffer.h" +#include "Runtime/Threads/Mutex.h" +#include "Runtime/Threads/Semaphore.h" +#include "Runtime/Threads/ThreadUtility.h" +#include "Runtime/Threads/AtomicOps.h" + +#if ENABLE_GFXDEVICE_REMOTE_PROCESS_CLIENT +#include <ppapi/cpp/instance.h> +#include "External/NPAPI2NaCl/Common/UnityInterfaces.h" +#include "PlatformDependent/PepperPlugin/UnityInstance.h" +#endif + +double GetTimeSinceStartup(); + +ThreadedStreamBuffer::ThreadedStreamBuffer() +{ + SetDefaults(); +} + +ThreadedStreamBuffer::ThreadedStreamBuffer(Mode mode, size_t size) +{ + SetDefaults(); + Create(mode, size); +} + +ThreadedStreamBuffer::~ThreadedStreamBuffer() +{ + Destroy(); +} + +void ThreadedStreamBuffer::CreateFromMemory(Mode mode, size_t size, void *buffer) +{ + Assert(mode != kModeReadOnly); + Assert(m_Buffer == NULL); + m_Mode = mode; + BufferHeader *header = (BufferHeader*)buffer; + m_Reader = &header->reader; + m_Writer = &header->writer; + m_Buffer = (char*)buffer + sizeof(BufferHeader); + m_BufferSize = size - sizeof(BufferHeader); + m_Reader->Reset(); + m_Writer->Reset(); + m_Writer->bufferEnd = size; + if (m_Mode == kModeThreaded) + { + m_Mutex = new Mutex; + m_ReadSemaphore = new Semaphore; + m_WriteSemaphore = new Semaphore; + } +} + +void ThreadedStreamBuffer::Create(Mode mode, size_t size) +{ + m_Reader = &m_Header.reader; + m_Writer = &m_Header.writer; + Assert(mode != kModeReadOnly); + Assert(m_Buffer == NULL); + m_Mode = mode; + if (size != 0) + m_Buffer = (char*)UNITY_MALLOC(kMemUtility, size); + m_BufferSize = size; + m_Reader->Reset(); + m_Writer->Reset(); + m_Writer->bufferEnd = size; + if (m_Mode == kModeThreaded) + { + m_Mutex = new Mutex; + m_ReadSemaphore = new Semaphore; + m_WriteSemaphore = new Semaphore; + } +} + +void ThreadedStreamBuffer::CreateReadOnly(const void* buffer, size_t size) +{ + m_Reader = &m_Header.reader; + m_Writer = &m_Header.writer; + m_Mode = kModeReadOnly; + m_Buffer = (char*)buffer; + m_BufferSize = size; + m_Reader->Reset(); + m_Writer->Reset(); + m_Reader->bufferEnd = size; +} + +void ThreadedStreamBuffer::ResetGrowable() +{ + Assert(m_Mode == kModeGrowable); + m_Reader->Reset(); + m_Writer->Reset(); + m_Writer->bufferEnd = m_BufferSize; +} + +void ThreadedStreamBuffer::Destroy() +{ + if (m_Buffer == NULL) return; + if (m_Mode != kModeReadOnly) + UNITY_FREE(kMemUtility, m_Buffer); + m_Reader->Reset(); + m_Writer->Reset(); + if (m_Mode == kModeThreaded) + { + delete m_Mutex; + delete m_ReadSemaphore; + delete m_WriteSemaphore; + } + SetDefaults(); +} + +ThreadedStreamBuffer::size_t ThreadedStreamBuffer::GetCurrentSize() const +{ + Assert(m_Mode != kModeThreaded); + if (m_Mode == kModeGrowable) + { + return m_Writer->bufferPos; + } + return m_BufferSize; +} + +const void* ThreadedStreamBuffer::GetBuffer() const +{ + //Assert(m_Mode != kModeThreaded); + return m_Buffer; +} + +void ThreadedStreamBuffer::ReadStreamingData(void* data, size_t size, size_t alignment, size_t step) +{ + Assert((step % alignment) == 0); + + // This should not be size_t, as the GfxDevice may run across processes of different + // bitness, and the data serialized in the command buffer must match. + size_t sz = ReadValueType<UInt32>(); + Assert(sz == size); + + char* dest = (char*)data; + for (size_t offset = 0; offset < size; offset += step) + { + size_t bytes = std::min(size - offset, step); + const void* src = GetReadDataPointer(bytes, alignment); + if (data) + UNITY_MEMCPY(dest, src, bytes); + + int magic = ReadValueType<int>(); + Assert(magic == 1234); + + ReadReleaseData(); + dest += step; + } +} + +ThreadedStreamBuffer::size_t ThreadedStreamBuffer::ReadStreamingData(DataConsumer consumer, void* userData, size_t alignment, size_t step) +{ + Assert((step % alignment) == 0); + Assert(consumer != NULL); + + size_t totalBytesRead = 0; + + bool moreData = false; + do + { + const void* src = GetReadDataPointer(step + sizeof(size_t), alignment); + const size_t bytesInBuffer = *(static_cast<const size_t*>(src)); + consumer(static_cast<const size_t*>(src) + 1, bytesInBuffer, userData); + totalBytesRead += bytesInBuffer; + int magic = ReadValueType<int>(); + Assert(magic == 1234); + moreData = ReadValueType<bool>(); + ReadReleaseData(); + } while (moreData); + + return totalBytesRead; +} + +void ThreadedStreamBuffer::ReadReleaseData() +{ + UnityMemoryBarrier(); + if (m_Reader->checkedWraps == m_Reader->bufferWraps) + { + // We only update the position + m_Reader->checkedPos = m_Reader->bufferPos; + } + else + { + // Both values need to be set atomically +#if !ENABLE_GFXDEVICE_REMOTE_PROCESS + Mutex::AutoLock lock(*m_Mutex); +#endif + m_Reader->checkedPos = m_Reader->bufferPos; + m_Reader->checkedWraps = m_Reader->bufferWraps; + } + UnityMemoryBarrier(); + SendReadSignal(); +} + +void ThreadedStreamBuffer::WriteStreamingData(const void* data, size_t size, size_t alignment, size_t step) +{ + // This should not be size_t, as the GfxDevice may run across processes of different + // bitness, and the data serialized in the command buffer must match. + WriteValueType<UInt32>(size); + Assert((step % alignment) == 0); + + const char* src = (const char*)data; + for (size_t offset = 0; offset < size; offset += step) + { + size_t bytes = std::min(size - offset, step); + void* dest = GetWriteDataPointer(bytes, alignment); + UNITY_MEMCPY(dest, src, bytes); + WriteValueType<int>(1234); + + // In the NaCl Web Player, make sure that only complete commands are submitted, as we are not truely + // asynchronous. + #if !ENABLE_GFXDEVICE_REMOTE_PROCESS + WriteSubmitData(); + #endif + + src += step; + } + WriteSubmitData(); +} + +void ThreadedStreamBuffer::WriteStreamingData(DataProvider provider, void* userData, size_t alignment, size_t step) +{ + Assert((step % alignment) == 0); + Assert(provider != NULL); + + bool moreData = false; + do + { + void* dest = GetWriteDataPointer(step + sizeof(size_t), alignment); + size_t outBytesWritten = 0; + moreData = provider(static_cast<size_t*>(dest) + 1, step, outBytesWritten, userData); + *((size_t*)dest) = outBytesWritten; + WriteValueType<int>(1234); + WriteValueType(moreData); + + // In the NaCl Web Player, make sure that only complete commands are submitted, as we are not truely + // asynchronous. + #if !ENABLE_GFXDEVICE_REMOTE_PROCESS + WriteSubmitData(); + #endif + } while (moreData); + + WriteSubmitData(); +} + +void ThreadedStreamBuffer::WriteSubmitData() +{ + UnityMemoryBarrier(); + if (m_Writer->checkedWraps == m_Writer->bufferWraps) + { + // We only update the position + m_Writer->checkedPos = m_Writer->bufferPos; + } + else + { + // Both values need to be set atomically +#if !ENABLE_GFXDEVICE_REMOTE_PROCESS + Mutex::AutoLock lock(*m_Mutex); +#endif + m_Writer->checkedPos = m_Writer->bufferPos; + m_Writer->checkedWraps = m_Writer->bufferWraps; + } + UnityMemoryBarrier(); + SendWriteSignal(); +} + +void ThreadedStreamBuffer::SetDefaults() +{ + m_Mode = kModeReadOnly; + m_Buffer = NULL; + m_BufferSize = 0; + m_GrowStepSize = 8*1024; + m_Mutex = NULL; + m_ReadSemaphore = NULL; + m_WriteSemaphore = NULL; + m_NeedsReadSignal = 0; + m_NeedsWriteSignal = 0; + m_ReadWaitTime = 0.0; + m_WriteWaitTime = 0.0; +}; + +bool ThreadedStreamBuffer::HasDataToRead() const +{ + if (m_Reader->bufferWraps == m_Writer->checkedWraps) + { + return (m_Reader->bufferPos < m_Writer->checkedPos) || (m_Reader->bufferPos < m_Reader->bufferEnd); + } + else + return true; +} + +void ThreadedStreamBuffer::HandleReadOverflow(size_t& dataPos, size_t& dataEnd) +{ +#if !ENABLE_GFXDEVICE_REMOTE_PROCESS + Assert(m_Mode == kModeThreaded); + Assert(m_Mutex != NULL); + Mutex::AutoLock lock(*m_Mutex); +#endif + + if (dataEnd > m_BufferSize) + { + dataEnd -= dataPos; + dataPos = 0; + m_Reader->bufferPos = 0; + m_Reader->bufferWraps++; + } + + for (;;) + { + // Get how many buffer lengths writer is ahead of reader + // This may be -1 if we are waiting for the writer to wrap + size_t comparedPos = m_Writer->checkedPos; + size_t comparedWraps = m_Writer->checkedWraps; + size_t wrapDist = comparedWraps - m_Reader->bufferWraps; + m_Reader->bufferEnd = (wrapDist == 0) ? comparedPos : (wrapDist == 1) ? m_BufferSize : 0; + + if (dataEnd <= m_Reader->bufferEnd) + { + break; + } +#if !ENABLE_GFXDEVICE_REMOTE_PROCESS + AtomicIncrement(&m_NeedsWriteSignal); + UnityMemoryBarrier(); + + m_Mutex->Unlock(); + if (comparedPos != m_Writer->checkedPos || comparedWraps != m_Writer->checkedWraps) + { + // Writer position changed while we requested a signal + // Request might be missed, so we signal ourselves to avoid deadlock + SendWriteSignal(); + } + SendReadSignal(); + // Wait for writer thread + double startTime = GetTimeSinceStartup(); + m_WriteSemaphore->WaitForSignal(); + m_ReadWaitTime += GetTimeSinceStartup() - startTime; + m_Mutex->Lock(); +#endif + } +} + +void ThreadedStreamBuffer::HandleWriteOverflow(size_t& dataPos, size_t& dataEnd) +{ + if (m_Mode == kModeGrowable) + { + size_t dataSize = dataEnd - dataPos; + size_t growSize = std::max(dataSize, m_GrowStepSize); + m_BufferSize += growSize; + m_Buffer = (char*)UNITY_REALLOC_(kMemUtility, m_Buffer, m_BufferSize); + m_Writer->bufferEnd = m_BufferSize; + return; + } +#if !ENABLE_GFXDEVICE_REMOTE_PROCESS + Assert(m_Mode == kModeThreaded); + Assert(m_Mutex != NULL); + + Mutex::AutoLock lock(*m_Mutex); +#endif + + if (dataEnd > m_BufferSize) + { + dataEnd -= dataPos; + dataPos = 0; + m_Writer->bufferPos = 0; + m_Writer->bufferWraps++; + } + + for (;;) + { + UnityMemoryBarrier(); + + // Get how many buffer lengths writer is ahead of reader + // This may be 2 if we are waiting for the reader to wrap + size_t comparedPos = m_Reader->checkedPos; + size_t comparedWraps = m_Reader->checkedWraps; + size_t wrapDist = m_Writer->bufferWraps - comparedWraps; + m_Writer->bufferEnd = (wrapDist == 0) ? m_BufferSize : (wrapDist == 1) ? comparedPos : 0; + + if (dataEnd <= m_Writer->bufferEnd) + { + break; + } +#if ENABLE_GFXDEVICE_REMOTE_PROCESS_CLIENT + struct UNITY_GfxDevice_1_0 *gfxInterface = (UNITY_GfxDevice_1_0*)pp::Module::Get()->GetBrowserInterface(UNITY_GFXDEVICE_INTERFACE_1_0); + gfxInterface->WaitForSignal(0); +#elif !ENABLE_GFXDEVICE_REMOTE_PROCESS + AtomicIncrement(&m_NeedsReadSignal); + UnityMemoryBarrier(); + + m_Mutex->Unlock(); + if (comparedPos != m_Reader->checkedPos || comparedWraps != m_Reader->checkedWraps) + { + // Reader position changed while we requested a signal + // Request might be missed, so we signal ourselves to avoid deadlock + SendReadSignal(); + } + SendWriteSignal(); + // Wait for reader thread + double startTime = GetTimeSinceStartup(); + m_ReadSemaphore->WaitForSignal(); + m_WriteWaitTime += GetTimeSinceStartup() - startTime; + m_Mutex->Lock(); +#endif + } +} + +void ThreadedStreamBuffer::SendReadSignal() +{ + if (AtomicCompareExchange(&m_NeedsReadSignal, 0, 1) ) + { + m_ReadSemaphore->Signal(); + } +} + +void ThreadedStreamBuffer::SendWriteSignal() +{ + if (AtomicCompareExchange(&m_NeedsWriteSignal, 0, 1) ) + { + m_WriteSemaphore->Signal(); + } +} +void ThreadedStreamBuffer::BufferState::Reset() volatile +{ + bufferPos = 0; + bufferEnd = 0; + bufferWraps = 0; + checkedPos = 0; + checkedWraps = 0; +#if !UNITY_RELEASE + totalBytes = 0; +#endif +} + +#endif diff --git a/Runtime/Threads/ThreadedStreamBuffer.h b/Runtime/Threads/ThreadedStreamBuffer.h new file mode 100644 index 0000000..5c213ce --- /dev/null +++ b/Runtime/Threads/ThreadedStreamBuffer.h @@ -0,0 +1,250 @@ +#ifndef THREADED_STREAM_BUFFER_H +#define THREADED_STREAM_BUFFER_H + +#if SUPPORT_THREADS + +#include "Runtime/Utilities/NonCopyable.h" +#include "Runtime/Utilities/Utility.h" +#include <new> // for placement new +#include "Runtime/Threads/Thread.h" + +class Mutex; +class Semaphore; + +/// A single producer, single consumer ringbuffer + +/// Most read and write operations are done even without atomic operations and use no expensive synchronization primitives +/// Each thread owns a part of the buffer and only locks when reaching the end + + +/// *** Common usage *** +/// * Create the ring buffer +/// ThreadedStreamBuffer buffer (kModeThreaded); + +/// * Producer thread... +/// buffer.WriteValueType<int>(5); +/// buffer.WriteValueType<int>(7); +/// buffer.WriteSubmitData(); + +/// * ConsumerThread... +/// print(buffer.ReadValueType<int>()); +/// print(buffer.ReadValueType<int>()); +/// buffer.ReadReleaseData(); + +class ThreadedStreamBuffer : public NonCopyable +{ +public: + + struct BufferState + { + // These should not be size_t, as the GfxDevice may run across processes of different + // bitness, and the data serialized in the command buffer must match. + void Reset() volatile; + volatile UInt32 bufferPos; + volatile UInt32 bufferEnd; + volatile UInt32 bufferWraps; + volatile UInt32 checkedPos; + volatile UInt32 checkedWraps; +#if !UNITY_RELEASE + volatile UInt32 totalBytes; +#endif + }; + + struct BufferHeader + { + BufferState reader; + BufferState writer; + }; + + typedef unsigned size_t; + + enum Mode + { + // This is the most common usage. One producer, one consumer on different threads. + kModeThreaded, + + // When in read mode, we pass a pointer to the external data which can then be read using ReadValueType and ReadReleaseData. + kModeReadOnly, + // When in growable you are only allowed to write into the ring buffer. Essentially like a std::vector. It will keep on growing as you write data. + kModeGrowable, + kModeCrossProcess, + }; + + ThreadedStreamBuffer(Mode mode, size_t size); + ThreadedStreamBuffer(); + ~ThreadedStreamBuffer(); + + enum + { + kDefaultAlignment = 4, + kDefaultStep = 4096 + }; + + // Read data from the ringbuffer + // This function blocks until data new data has arrived in the ringbuffer. + // It uses semaphores to wait on the producer thread in a efficient way. + template <class T> const T& ReadValueType(); + template <class T> T* ReadArrayType(int count); + // ReadReleaseData should be called when the data has been read & used completely. + // At this point the memory will become available to the producer to write into it again. + void ReadReleaseData(); + + // Write data into the ringbuffer + template <class T> void WriteValueType(const T& val); + template <class T> void WriteArrayType(const T* vals, int count); + template <class T> T* GetWritePointer(); + // WriteSubmitData should be called after data has been completely written and should be made available to the consumer thread to read it. + // Before WriteSubmitData is called, any data written with WriteValueType can not be read by the consumer. + void WriteSubmitData(); + + // Ringbuffer Streaming support. This will automatically call WriteSubmitData & ReadReleaseData. + // It splits the data into smaller chunks (step). So that the size of the ringbuffer can be smaller than the data size passed into this function. + // The consumer thread will be reading the streaming data while WriteStreamingData is still called on the producer thread. + void ReadStreamingData(void* data, size_t size, size_t alignment = kDefaultAlignment, size_t step = kDefaultStep); + void WriteStreamingData(const void* data, size_t size, size_t alignment = kDefaultAlignment, size_t step = kDefaultStep); + + + // Utility functions + void* GetReadDataPointer(size_t size, size_t alignment); + void* GetWriteDataPointer(size_t size, size_t alignment); + + size_t GetDebugReadPosition() const { return m_Reader->bufferPos; } + size_t GetDebugWritePosition() const { return m_Writer->bufferPos; } + + double GetReadWaitTime() const { return m_ReadWaitTime; } + double GetWriteWaitTime() const { return m_WriteWaitTime; } + void ResetReadWaitTime() { m_ReadWaitTime = 0.0; } + void ResetWriteWaitTime() { m_WriteWaitTime = 0.0; } + + + // Creation methods + void Create(Mode mode, size_t size); + void CreateReadOnly(const void* buffer, size_t size); + void CreateFromMemory(Mode mode, size_t size, void *buffer); + void ResetGrowable(); + void Destroy(); + + // + // Is there data available to be read + // typicall this is not used + bool HasData() const; + bool HasDataToRead() const; + + size_t GetAllocatedSize() const { return m_BufferSize; } + size_t GetCurrentSize() const; + const void* GetBuffer() const; + + + ////@TODO: Remove this + typedef void (*DataConsumer)(const void* buffer, size_t bufferSize, void* userData); + size_t ReadStreamingData(DataConsumer consumer, void* userData, size_t alignment = kDefaultAlignment, size_t step = kDefaultStep); + + typedef bool (*DataProvider)(void* dest, size_t bufferSize, size_t& bytesWritten, void* userData); + void WriteStreamingData(DataProvider provider, void* userData, size_t alignment = kDefaultAlignment, size_t step = kDefaultStep); + + +private: + FORCE_INLINE size_t Align(size_t pos, size_t alignment) const { return (pos+alignment-1)&~(alignment-1); } + + void SetDefaults(); + + void HandleReadOverflow(size_t& dataPos, size_t& dataEnd); + void HandleWriteOverflow(size_t& dataPos, size_t& dataEnd); + + void SendReadSignal(); + void SendWriteSignal(); + + Mode m_Mode; + char* m_Buffer; + size_t m_BufferSize; + size_t m_GrowStepSize; + BufferState *m_Reader; + BufferState *m_Writer; + BufferHeader m_Header; + Mutex* m_Mutex; + Semaphore* m_ReadSemaphore; + Semaphore* m_WriteSemaphore; + volatile int m_NeedsReadSignal; + volatile int m_NeedsWriteSignal; + double m_ReadWaitTime; + double m_WriteWaitTime; +}; + +FORCE_INLINE bool ThreadedStreamBuffer::HasData() const +{ + return (m_Reader->bufferPos != m_Writer->checkedPos); +} + +FORCE_INLINE void* ThreadedStreamBuffer::GetReadDataPointer(size_t size, size_t alignment) +{ + size = Align(size, alignment); + size_t dataPos = Align(m_Reader->bufferPos, alignment); + size_t dataEnd = dataPos + size; + if (dataEnd > m_Reader->bufferEnd) + { + HandleReadOverflow(dataPos, dataEnd); + } + m_Reader->bufferPos = dataEnd; +#if !UNITY_RELEASE + m_Reader->totalBytes += size; +#endif + return &m_Buffer[dataPos]; +} + +FORCE_INLINE void* ThreadedStreamBuffer::GetWriteDataPointer(size_t size, size_t alignment) +{ + size = Align(size, alignment); + Assert(size*2 <= m_BufferSize || m_Mode == kModeGrowable); + size_t dataPos = Align(m_Writer->bufferPos, alignment); + size_t dataEnd = dataPos + size; + if (dataEnd > m_Writer->bufferEnd) + { + HandleWriteOverflow(dataPos, dataEnd); + } + m_Writer->bufferPos = dataEnd; +#if !UNITY_RELEASE + m_Writer->totalBytes += size; +#endif + return &m_Buffer[dataPos]; +} + +template <class T> FORCE_INLINE const T& ThreadedStreamBuffer::ReadValueType() +{ + // Read simple data type from queue + const void* pdata = GetReadDataPointer(sizeof(T), ALIGN_OF(T)); + const T& src = *reinterpret_cast<const T*>(pdata); + return src; +} + +template <class T> FORCE_INLINE T* ThreadedStreamBuffer::ReadArrayType(int count) +{ + // Read array of data from queue- + void* pdata = GetReadDataPointer(count * sizeof(T), ALIGN_OF(T)); + T* src = reinterpret_cast<T*>(pdata); + return src; +} + +template <class T> FORCE_INLINE void ThreadedStreamBuffer::WriteValueType(const T& val) +{ + // Write simple data type to queue + void* pdata = GetWriteDataPointer(sizeof(T), ALIGN_OF(T)); + new (pdata) T(val); +} + +template <class T> FORCE_INLINE void ThreadedStreamBuffer::WriteArrayType(const T* vals, int count) +{ + // Write array of data to queue + T* pdata = (T*)GetWriteDataPointer(count * sizeof(T), ALIGN_OF(T)); + for (int i = 0; i < count; i++) + new (&pdata[i]) T(vals[i]); +} + +template <class T> FORCE_INLINE T* ThreadedStreamBuffer::GetWritePointer() +{ + // Write simple data type to queue + void* pdata = GetWriteDataPointer(sizeof(T), ALIGN_OF(T)); + return static_cast<T*>(pdata); +} + +#endif +#endif diff --git a/Runtime/Threads/Winapi/PlatformEvent.h b/Runtime/Threads/Winapi/PlatformEvent.h new file mode 100644 index 0000000..1aa590d --- /dev/null +++ b/Runtime/Threads/Winapi/PlatformEvent.h @@ -0,0 +1,70 @@ +#ifndef __PLATFORMEVENT_H +#define __PLATFORMEVENT_H + +// Event synchronization object. + +#if SUPPORT_THREADS + +#include "Runtime/Utilities/NonCopyable.h" + +class Event : public NonCopyable +{ +public: + explicit Event(); + ~Event(); + + void WaitForSignal(); + void Signal(); + +private: + HANDLE m_Event; +}; + +inline Event::Event() +{ +#if UNITY_WINRT + m_Event = CreateEventExW(nullptr, nullptr, 0, 0); // ?!- +#else + m_Event = CreateEvent(NULL, FALSE, FALSE, NULL); +#endif +} + +inline Event::~Event() +{ + if (m_Event != NULL) + CloseHandle(m_Event); +} + +inline void Event::WaitForSignal() +{ +#if UNITY_WINRT + WaitForSingleObjectEx(m_Event, INFINITE, FALSE); // ?!- +#else + while (1) + { + DWORD result = WaitForSingleObjectEx(m_Event, INFINITE, TRUE); + switch (result) + { + case WAIT_OBJECT_0: + // We got the event + return; + case WAIT_IO_COMPLETION: + // Allow thread to run IO completion task + Sleep(1); + break; + default: + Assert (false); + break; + } + } +#endif +} + +inline void Event::Signal() +{ + SetEvent(m_Event); +} + +#endif // SUPPORT_THREADS + +#endif // __PLATFORMEVENT_H diff --git a/Runtime/Threads/Winapi/PlatformMutex.cpp b/Runtime/Threads/Winapi/PlatformMutex.cpp new file mode 100644 index 0000000..3f2bd63 --- /dev/null +++ b/Runtime/Threads/Winapi/PlatformMutex.cpp @@ -0,0 +1,54 @@ +#include "UnityPrefix.h" + +#if SUPPORT_THREADS + +#ifndef MUTEX_API_WINAPI +#define MUTEX_API_WINAPI (UNITY_WIN || UNITY_XENON || UNITY_WINRT) +#endif + +#endif // SUPPORT_THREADS + +#if MUTEX_API_WINAPI + +#include "PlatformMutex.h" + +// ------------------------------------------------------------------------------------------------- +// windows + +// Note: TryEnterCriticalSection only exists on NT-derived systems. +// But we do not run on Win9x currently anyway, so just accept it. +#if !defined _WIN32_WINNT || _WIN32_WINNT < 0x0400 +extern "C" WINBASEAPI BOOL WINAPI TryEnterCriticalSection( IN OUT LPCRITICAL_SECTION lpCriticalSection ); +#endif + +PlatformMutex::PlatformMutex() +{ +#if UNITY_WINRT + BOOL const result = InitializeCriticalSectionEx(&crit_sec, 0, CRITICAL_SECTION_NO_DEBUG_INFO); + Assert(FALSE != result); +#else + InitializeCriticalSection( &crit_sec ); +#endif +} + +PlatformMutex::~PlatformMutex () +{ + DeleteCriticalSection( &crit_sec ); +} + +void PlatformMutex::Lock() +{ + EnterCriticalSection( &crit_sec ); +} + +void PlatformMutex::Unlock() +{ + LeaveCriticalSection( &crit_sec ); +} + +bool PlatformMutex::TryLock() +{ + return TryEnterCriticalSection( &crit_sec ) ? true : false; +} + +#endif // MUTEX_API_WINAPI diff --git a/Runtime/Threads/Winapi/PlatformMutex.h b/Runtime/Threads/Winapi/PlatformMutex.h new file mode 100644 index 0000000..11f174e --- /dev/null +++ b/Runtime/Threads/Winapi/PlatformMutex.h @@ -0,0 +1,28 @@ +#ifndef __PLATFORMMUTEX_H +#define __PLATFORMMUTEX_H + +#if SUPPORT_THREADS + +#include "Runtime/Utilities/NonCopyable.h" + +/** + * A platform/api specific mutex class. Always recursive (a single thread can lock multiple times). + */ +class PlatformMutex : public NonCopyable +{ + friend class Mutex; +protected: + PlatformMutex(); + ~PlatformMutex(); + + void Lock(); + void Unlock(); + bool TryLock(); + +private: + + CRITICAL_SECTION crit_sec; +}; + +#endif // SUPPORT_THREADS +#endif // __PLATFORMMUTEX_H diff --git a/Runtime/Threads/Winapi/PlatformSemaphore.h b/Runtime/Threads/Winapi/PlatformSemaphore.h new file mode 100644 index 0000000..0f00f20 --- /dev/null +++ b/Runtime/Threads/Winapi/PlatformSemaphore.h @@ -0,0 +1,59 @@ +#ifndef __PLATFORMSEMAPHORE_H +#define __PLATFORMSEMAPHORE_H + +#if SUPPORT_THREADS + +#include "Runtime/Utilities/NonCopyable.h" + +class PlatformSemaphore : public NonCopyable +{ + friend class Semaphore; +protected: + void Create(); + void Destroy(); + + void WaitForSignal(); + void Signal(); + +private: + HANDLE m_Semaphore; +}; + + inline void PlatformSemaphore::Create() + { +#if UNITY_WINRT + m_Semaphore = CreateSemaphoreExW(NULL, 0, 256, NULL, 0, (STANDARD_RIGHTS_REQUIRED | SEMAPHORE_MODIFY_STATE | SYNCHRONIZE)); +#else + m_Semaphore = CreateSemaphoreA( NULL, 0, 256, NULL ); +#endif + } + inline void PlatformSemaphore::Destroy(){ if( m_Semaphore ) CloseHandle(m_Semaphore); } + inline void PlatformSemaphore::WaitForSignal() + { +#if UNITY_WINRT // ?!- + WaitForSingleObjectEx(m_Semaphore, INFINITE, FALSE); +#else + while (1) + { + DWORD result = WaitForSingleObjectEx( m_Semaphore, INFINITE, TRUE ); + switch (result) + { + case WAIT_OBJECT_0: + // We got the signal + return; + case WAIT_IO_COMPLETION: + // Allow thread to run IO completion task + Sleep(1); + break; + default: + Assert(false); + break; + } + } +#endif + } + inline void PlatformSemaphore::Signal() { ReleaseSemaphore( m_Semaphore, 1, NULL ); } + +#endif // SUPPORT_THREADS + +#endif // __PLATFORMSEMAPHORE_H diff --git a/Runtime/Threads/Winapi/PlatformThread.cpp b/Runtime/Threads/Winapi/PlatformThread.cpp new file mode 100644 index 0000000..c0dd578 --- /dev/null +++ b/Runtime/Threads/Winapi/PlatformThread.cpp @@ -0,0 +1,140 @@ +#include "UnityPrefix.h" + +#if SUPPORT_THREADS + +#ifndef THREAD_API_WINAPI +#define THREAD_API_WINAPI (UNITY_WIN || UNITY_XENON || UNITY_WINRT) +#endif + +#endif // SUPPORT_THREADS + +#if THREAD_API_WINAPI + +#include "PlatformThread.h" +#include "Runtime/Threads/Thread.h" +#include "Runtime/Threads/ThreadHelper.h" + +#include "Runtime/Utilities/Word.h" + +// module@TODO : Move this to PlatformThread.h +#if UNITY_WINRT +#include "PlatformDependent/MetroPlayer/Win32Threads.h" +#endif + +PlatformThread::PlatformThread() +: m_Thread(NULL) +#if !UNITY_WINRT +, m_ThreadId(0) +#endif +{ +} + +PlatformThread::~PlatformThread() +{ + AssertMsg(m_Thread == NULL, "***Thread was not cleaned up!***"); +} + + +void PlatformThread::Create(const Thread* thread, const UInt32 stackSize, const int processor) +{ +#if UNITY_WINRT + m_Thread = win32::CreateThread(Thread::RunThreadWrapper, (LPVOID) thread); +#else // UNITY_WINRT + DWORD creationFlags = 0; +#if UNITY_XENON + if (processor != DEFAULT_UNITY_THREAD_PROCESSOR) + creationFlags = CREATE_SUSPENDED; +#endif + + m_Thread = ::CreateThread(NULL, stackSize, Thread::RunThreadWrapper, (LPVOID) thread, creationFlags, &m_ThreadId); + Assert(NULL != m_Thread); + +#if UNITY_XENON + if (processor != DEFAULT_UNITY_THREAD_PROCESSOR) + { + ThreadHelper::SetThreadProcessor(thread, processor); + ResumeThread(m_Thread); + } +#endif + +#endif // UNITY_WINRT + +} + +void PlatformThread::Enter(const Thread* thread) +{ + if (thread->m_Priority != kNormalPriority) + UpdatePriority(thread); +} + +void PlatformThread::Exit(const Thread* thread, void* result) +{ +} + +void PlatformThread::Join(const Thread* thread) +{ +#if !UNITY_WINRT // Why doesn't WINRT store the thread ID ? + if (Thread::EqualsCurrentThreadID(m_ThreadId)) + { + ErrorStringMsg("***Thread '%s' tried to join itself!***", thread->m_Name); + } +#endif + + if (thread->m_Running) + { + DWORD waitResult = WaitForSingleObjectEx(m_Thread, INFINITE, FALSE); + Assert(WAIT_OBJECT_0 == waitResult); + } + + if (m_Thread != NULL) + { + BOOL closeResult = CloseHandle(m_Thread); + Assert(FALSE != closeResult); + } + m_Thread = NULL; +} + +void PlatformThread::UpdatePriority(const Thread* thread) const +{ + ThreadPriority p = thread->m_Priority; + +#if UNITY_WINRT + + #pragma message("todo: implement") // ?!- + +#else + + int iPriority; + switch (p) + { + case kLowPriority: + iPriority = THREAD_PRIORITY_LOWEST; + break; + + case kBelowNormalPriority: + iPriority = THREAD_PRIORITY_BELOW_NORMAL; + break; + + case kNormalPriority: + iPriority = THREAD_PRIORITY_NORMAL; + break; + case kHighPriority: + iPriority = THREAD_PRIORITY_HIGHEST; + break; + + default: + AssertString("Undefined thread priority"); + } + + int res = SetThreadPriority(m_Thread, iPriority); + AssertIf(res == 0); + +#endif +} + +PlatformThread::ThreadID PlatformThread::GetCurrentThreadID() +{ + return GetCurrentThreadId(); +} + +#endif // THREAD_API_PTHREAD diff --git a/Runtime/Threads/Winapi/PlatformThread.h b/Runtime/Threads/Winapi/PlatformThread.h new file mode 100644 index 0000000..289e39e --- /dev/null +++ b/Runtime/Threads/Winapi/PlatformThread.h @@ -0,0 +1,67 @@ +#ifndef PLATFORMTHREAD_H +#define PLATFORMTHREAD_H + +#if SUPPORT_THREADS + +#include "Runtime/Utilities/NonCopyable.h" + +class Thread; + +#if UNITY_XENON // module@TODO : Move to Platforms/Xenon/Include/PlatformThread.h +#define DEFAULT_UNITY_THREAD_STACK_SIZE 128*1024 +#endif + +// The function signature is actually important here, +// and on Windows it must match the signature of LPTHREAD_START_ROUTINE, +// and no other way will be "ok". +// It does not suffice to cast to the more "general" version, because +// then you will run into ESP check failures sooner or later. Probably sooner. +#define UNITY_THREAD_FUNCTION_RETURNTYPE DWORD +#define UNITY_THREAD_FUNCTION_RETURN_SIGNATURE UNITY_THREAD_FUNCTION_RETURNTYPE WINAPI + +class EXPORT_COREMODULE PlatformThread : public NonCopyable +{ + friend class Thread; + friend class ThreadHelper; + +protected: + typedef DWORD ThreadID; + + // Starts a thread to execute within the calling process. + // Typically maps to 'CreateThread' (WinAPI) or 'pthread_create' (POSIX). + void Create(const Thread* thread, const UInt32 stackSize, const int processor); + // To be called from the thread's 'start_routine' (aka RunThreadWrapper) + // in order to boot-strap the thread (in terms of setting the thread affinity or similar). + void Enter(const Thread* thread); + // To be called as final exit/cleanup call when a thread's 'start_routine' (aka RunThreadWrapper) exits. + // Depending on the backing thread API this function may not return. + void Exit(const Thread* thread, void* result); + // The function waits for the thread specified by 'thread' to terminate. + // Typically maps to 'WaitForSingleObject' (WinAPI) or 'pthread_join' (POSIX). + void Join(const Thread* thread); + // Uses the thread->m_Priority to update the thread scheduler settings, if possible. + // Typically maps to 'SetThreadPriority' (WinAPI) or 'pthread_setschedparam' (POSIX). + // Depending on the process permissions this call may turn into a no-op. + void UpdatePriority(const Thread* thread) const; + // Returns a unique identifier for the currently executing thread. + static ThreadID GetCurrentThreadID(); + +private: + PlatformThread(); + ~PlatformThread(); + +#if UNITY_WINRT + + HANDLE m_Thread; + +#elif UNITY_WIN || UNITY_XENON + + HANDLE m_Thread; + DWORD m_ThreadId; + +#endif +}; + +#endif // SUPPORT_THREADS + +#endif // PLATFORMTHREAD_H diff --git a/Runtime/Threads/Winapi/PlatformThreadSpecificValue.h b/Runtime/Threads/Winapi/PlatformThreadSpecificValue.h new file mode 100644 index 0000000..8e6d42a --- /dev/null +++ b/Runtime/Threads/Winapi/PlatformThreadSpecificValue.h @@ -0,0 +1,61 @@ +#ifndef PLATFORMTHREADSPECIFICVALUE_H +#define PLATFORMTHREADSPECIFICVALUE_H + +#if UNITY_DYNAMIC_TLS + +#if UNITY_WINRT +#include "PlatformDependent/MetroPlayer/Win32Threads.h" +using win32::TlsAlloc; +using win32::TlsFree; +using win32::TlsGetValue; +using win32::TlsSetValue; +#endif + +class PlatformThreadSpecificValue +{ +public: + PlatformThreadSpecificValue(); + ~PlatformThreadSpecificValue(); + + void* GetValue() const; + void SetValue(void* value); + +private: + DWORD m_TLSKey; +}; + +inline PlatformThreadSpecificValue::PlatformThreadSpecificValue () +{ + m_TLSKey = TlsAlloc(); + AssertIf( m_TLSKey == TLS_OUT_OF_INDEXES ); +} + +inline PlatformThreadSpecificValue::~PlatformThreadSpecificValue () +{ + TlsFree( m_TLSKey ); +} + +inline void* PlatformThreadSpecificValue::GetValue () const +{ +#if UNITY_WIN + void* result = TlsGetValue(m_TLSKey); + DebugAssertIf( result == NULL && GetLastError() != ERROR_SUCCESS ); + return result; +#elif UNITY_XENON + return TlsGetValue(m_TLSKey); // on XENON TlsGetValue does not call SetLastError +#endif +} + +inline void PlatformThreadSpecificValue::SetValue (void* value) +{ + BOOL ok = TlsSetValue( m_TLSKey, value ); + DebugAssertIf( !ok ); +} + +#else + + #define UNITY_TLS_VALUE(type) __declspec(thread) type + +#endif // UNITY_DYNAMIC_TLS + +#endif // PLATFORMTHREADSPECIFICVALUE_H |