diff options
Diffstat (limited to 'source/modules/asura-utils/threading')
-rw-r--r-- | source/modules/asura-utils/threading/binding/_thread.cpp | 10 | ||||
-rw-r--r-- | source/modules/asura-utils/threading/conditional.cpp | 86 | ||||
-rw-r--r-- | source/modules/asura-utils/threading/conditional.h | 41 | ||||
-rw-r--r-- | source/modules/asura-utils/threading/mutex.cpp | 7 | ||||
-rw-r--r-- | source/modules/asura-utils/threading/mutex.h | 10 | ||||
-rw-r--r-- | source/modules/asura-utils/threading/semaphore.cpp | 25 | ||||
-rw-r--r-- | source/modules/asura-utils/threading/semaphore.h | 6 | ||||
-rw-r--r-- | source/modules/asura-utils/threading/task.cpp | 2 | ||||
-rw-r--r-- | source/modules/asura-utils/threading/thread.cpp | 43 | ||||
-rw-r--r-- | source/modules/asura-utils/threading/thread.h | 21 | ||||
-rw-r--r-- | source/modules/asura-utils/threading/thread_impl_std.h | 2 | ||||
-rw-r--r-- | source/modules/asura-utils/threading/thread_impl_win32.cpp | 13 | ||||
-rw-r--r-- | source/modules/asura-utils/threading/thread_impl_win32.h | 2 | ||||
-rw-r--r-- | source/modules/asura-utils/threading/threadable.h | 25 |
14 files changed, 239 insertions, 54 deletions
diff --git a/source/modules/asura-utils/threading/binding/_thread.cpp b/source/modules/asura-utils/threading/binding/_thread.cpp index a5aff03..ad2fb7f 100644 --- a/source/modules/asura-utils/threading/binding/_thread.cpp +++ b/source/modules/asura-utils/threading/binding/_thread.cpp @@ -23,7 +23,7 @@ namespace AsuraEngine { "IsStopped", _IsStopped }, { "IsCurrent", _IsCurrent }, { "Sleep", _Sleep }, - { "Post", _Post }, + { "Dispatch", _Dispatch }, { "GetName", _GetName }, { "GetType", _GetType }, { "GetState", _GetState } @@ -37,7 +37,7 @@ namespace AsuraEngine { "IMMEDIATE", THREAD_TYPE_IMMEDIATE } ); LUAX_REGISTER_ENUM(state, "EThreadState", - { "READY", THREAD_STATE_IDLE }, + { "IDLE", THREAD_STATE_IDLE }, { "RUNNING", THREAD_STATE_RUNNING }, { "PAUSED", THREAD_STATE_PAUSED }, { "STOPPED", THREAD_STATE_STOPPED } @@ -165,11 +165,11 @@ namespace AsuraEngine return 0; } - // thread:Post() - LUAX_IMPL_METHOD(Thread, _Post) + // thread:Dispatch() + LUAX_IMPL_METHOD(Thread, _Dispatch) { LUAX_PREPARE(L, Thread); - self->Post(); + self->Dispatch(); return 0; } diff --git a/source/modules/asura-utils/threading/conditional.cpp b/source/modules/asura-utils/threading/conditional.cpp new file mode 100644 index 0000000..eb26e82 --- /dev/null +++ b/source/modules/asura-utils/threading/conditional.cpp @@ -0,0 +1,86 @@ +#include "conditional.h" + +namespace AsuraEngine +{ + namespace Threading + { + + Conditional::Conditional() + : mWaiting(0) + , mSignals(0) + { + } + + Conditional::~Conditional() + { + } + + void Conditional::Signal() + { + mMutex.Lock(); + if (mWaiting > mSignals) + { + ++mSignals; + signal(mWaitSem); + mMutex.Unlock(); + wait(mDoneSem); + } + else + { + mMutex.Unlock(); + } + } + + void Conditional::Broadcast() + { + mMutex.Lock(); + if (mWaiting> mSignals) { + int i, num_waiting; + + num_waiting = (mWaiting - mSignals); + mSignals = mWaiting; + for (i = 0; i < num_waiting; ++i) { + signal(mWaitSem); + } + mMutex.Unlock(); + for (i = 0; i < num_waiting; ++i) { + wait(mDoneSem); + } + } + else { + mMutex.Unlock(); + } + + } + + bool Conditional::Wait(Mutex* mutex, int timeout /*= ASURA_MUTEX_MAXWAIT*/) + { + bool retval; + + mMutex.Lock(); + ++mWaiting; + mMutex.Unlock(); + + mutex->Unlock(); + + retval = wait(mWaitSem, timeout); + + mMutex.Lock(); + if (mSignals > 0) { + if (!retval) { + wait(mWaitSem); + } + signal(mDoneSem); + + --mSignals; + } + --mWaiting; + mMutex.Unlock(); + + mMutex.Lock(); + + return retval; + } + + } +}
\ No newline at end of file diff --git a/source/modules/asura-utils/threading/conditional.h b/source/modules/asura-utils/threading/conditional.h new file mode 100644 index 0000000..2f9633e --- /dev/null +++ b/source/modules/asura-utils/threading/conditional.h @@ -0,0 +1,41 @@ +#ifndef __ASURA_CONDITIONAL_H__ +#define __ASURA_CONDITIONAL_H__ + +#include "mutex.h" +#include "semaphore.h" + +namespace AsuraEngine +{ + namespace Threading + { + + /// + /// + /// + class Conditional + { + public: + + Conditional(); + ~Conditional(); + + void Signal(); + void Broadcast(); + bool Wait(Mutex* mutex, int timeout = ASURA_MUTEX_MAXWAIT); + + private: + + Mutex mMutex; + + Semaphore mWaitSem; + Semaphore mDoneSem; + + int mWaiting; + int mSignals; + + }; + + } +} + +#endif
\ No newline at end of file diff --git a/source/modules/asura-utils/threading/mutex.cpp b/source/modules/asura-utils/threading/mutex.cpp index ffe67f6..bd7d419 100644 --- a/source/modules/asura-utils/threading/mutex.cpp +++ b/source/modules/asura-utils/threading/mutex.cpp @@ -34,7 +34,8 @@ namespace AsuraEngine Mutex::~Mutex() { - delete mImpl; + if(mImpl) + delete mImpl; } void Mutex::Lock() @@ -92,7 +93,7 @@ namespace AsuraEngine void MutexImplWin32_KM::Lock() { - ::WaitForSingleObject(mHandle, INFINITE); + ::WaitForSingleObject(mHandle, ASURA_MUTEX_MAXWAIT); } void MutexImplWin32_KM::Unlock() @@ -103,4 +104,4 @@ namespace AsuraEngine #endif // ASURA_MUTEX_WIN32_KERNAL_MUTEX } -} +}
\ No newline at end of file diff --git a/source/modules/asura-utils/threading/mutex.h b/source/modules/asura-utils/threading/mutex.h index a3fbea5..7afbe35 100644 --- a/source/modules/asura-utils/threading/mutex.h +++ b/source/modules/asura-utils/threading/mutex.h @@ -14,6 +14,8 @@ namespace AsuraEngine namespace Threading { +#define ASURA_MUTEX_MAXWAIT (~(uint32)0) + class MutexImpl; class Mutex @@ -44,17 +46,15 @@ namespace AsuraEngine { m.Unlock(); } + operator bool() { return false; }; private: void* operator new(size_t); Mutex& m; }; // ڵջӴλÿʼջΪٽ -#define lock(mutex) _mutex_locker _asura_scoped_lock_0x0(mutex) -#define lock2(mutex) _mutex_locker _asura_scoped_lock_0x1(mutex) -#define lock3(mutex) _mutex_locker _asura_scoped_lock_0x2(mutex) -#define lock4(mutex) _mutex_locker _asura_scoped_lock_0x3(mutex) -#define lock5(mutex) _mutex_locker _asura_scoped_lock_0x4(mutex) +#define lock(m) \ + if(_mutex_locker _asura_mutex_locker = m){} else ASURA_ABSTRACT class MutexImpl { diff --git a/source/modules/asura-utils/threading/semaphore.cpp b/source/modules/asura-utils/threading/semaphore.cpp index 12d4aab..aa5d9dd 100644 --- a/source/modules/asura-utils/threading/semaphore.cpp +++ b/source/modules/asura-utils/threading/semaphore.cpp @@ -1,6 +1,7 @@ #include "../exceptions/exception.h" #include "../type.h" +#include "mutex.h" #include "semaphore.h" namespace AsuraEngine @@ -41,10 +42,10 @@ namespace AsuraEngine mImpl->Signal(); } - void Semaphore::Wait(int timeout) + bool Semaphore::Wait(int timeout /*= ASURA_MUTEX_MAXWAIT*/) { ASSERT(mImpl); - mImpl->Wait(timeout); + return mImpl->Wait(timeout); } #if ASURA_THREAD_WIN32 @@ -52,9 +53,13 @@ namespace AsuraEngine SemaphoreWin32::SemaphoreWin32(unsigned int init_value) : SemaphoreImpl(init_value) { - mSem = CreateSemaphore(NULL, init_value, UINT_MAX, NULL); + // UINT_MAX get error. + mSem = CreateSemaphore(NULL, init_value, INT_MAX, NULL); if (!mSem) - throw Exception("Cant use win32 semaphore."); + { + int errorCode = GetLastError(); + throw Exception("Cant use win32 semaphore. Error code: %d.", errorCode); + } } SemaphoreWin32::~SemaphoreWin32() @@ -72,14 +77,22 @@ namespace AsuraEngine bool SemaphoreWin32::Wait(int timeout) { int result; - result = WaitForSingleObject(mSem, timeout < 0 ? INFINITE : timeout); + result = WaitForSingleObject(mSem, timeout); if (result == WAIT_OBJECT_0) { InterlockedDecrement(&mCount); return true; } - else + else if(result == WAIT_TIMEOUT) + { + // ʱ return false; + } + else + { + // δ֪ + throw Exception("WaitForSingleObject() failed"); + } } #endif // ASURA_THREAD_WIN32 diff --git a/source/modules/asura-utils/threading/semaphore.h b/source/modules/asura-utils/threading/semaphore.h index 80773d8..1a4e3b7 100644 --- a/source/modules/asura-utils/threading/semaphore.h +++ b/source/modules/asura-utils/threading/semaphore.h @@ -25,7 +25,7 @@ namespace AsuraEngine ~Semaphore(); void Signal(); - void Wait(int timeout = 0); + bool Wait(int timeout = ASURA_MUTEX_MAXWAIT); private: SemaphoreImpl* mImpl; @@ -46,8 +46,8 @@ namespace AsuraEngine unsigned int mCount; }; -#define wait(sem) sem.Wait(); -#define signal(sem) sem.Signal(); +#define wait(sem, ...) sem.Wait(__VA_ARGS__) +#define signal(sem) sem.Signal() #if ASURA_THREAD_WIN32 diff --git a/source/modules/asura-utils/threading/task.cpp b/source/modules/asura-utils/threading/task.cpp index 2e84ed4..decb74c 100644 --- a/source/modules/asura-utils/threading/task.cpp +++ b/source/modules/asura-utils/threading/task.cpp @@ -1,5 +1,5 @@ #include "task.h" -#include "../scripting/lua_env.h" +#include "../scripting/portable.hpp" using namespace AEScripting; diff --git a/source/modules/asura-utils/threading/thread.cpp b/source/modules/asura-utils/threading/thread.cpp index 0f4f5da..fc397d2 100644 --- a/source/modules/asura-utils/threading/thread.cpp +++ b/source/modules/asura-utils/threading/thread.cpp @@ -41,8 +41,10 @@ namespace AsuraEngine bool Thread::AddTask(Task* task) { - lock(mTaskQueueMutex); - mTaskQueue.push(task); + lock(mTaskQueueMutex) + { + mTaskQueue.push(task); + } return true; } @@ -96,25 +98,31 @@ namespace AsuraEngine { ASSERT(mImpl); - lock(mStateMutex); - mState = THREAD_STATE_PAUSED; + lock(mStateMutex) + { + mState = THREAD_STATE_PAUSED; + } } void Thread::Resume() { ASSERT(mImpl); - lock(mStateMutex); - if(mState == THREAD_STATE_PAUSED) - mState = THREAD_STATE_RUNNING; + lock(mStateMutex) + { + if (mState == THREAD_STATE_PAUSED) + mState = THREAD_STATE_RUNNING; + } } void Thread::Stop() { ASSERT(mImpl); - lock(mStateMutex); - mState = THREAD_STATE_STOPPED; + lock(mStateMutex) + { + mState = THREAD_STATE_STOPPED; + } } void Thread::PauseSync() @@ -144,9 +152,10 @@ namespace AsuraEngine ThreadState Thread::GetState() { ThreadState state; - mStateMutex.Lock(); - state = mState; - mStateMutex.Unlock(); + lock(mStateMutex) + { + state = mState; + } return state; } @@ -183,7 +192,7 @@ namespace AsuraEngine return mName; } - void Thread::Process() + int Thread::Process() { LUAX_STATE(mLuaThread); @@ -203,6 +212,7 @@ namespace AsuraEngine } else if (mType == THREAD_TYPE_IMMEDIATE) { + // unsafe task->Invoke(mCallbackThread); this->LuaxRelease<Task>(state, task); } @@ -230,14 +240,17 @@ namespace AsuraEngine // ״̬ΪIdle Idle(); + + return 0; } /// /// ӳģʽص /// - void Thread::Post() + void Thread::Dispatch() { - ASSERT(mType == THREAD_TYPE_DEFERRED); + if (mType != THREAD_TYPE_DEFERRED) + return; LUAX_STATE(mLuaThread); while (!mFinishedTasks.empty()) diff --git a/source/modules/asura-utils/threading/thread.h b/source/modules/asura-utils/threading/thread.h index 340fca3..d365fd0 100644 --- a/source/modules/asura-utils/threading/thread.h +++ b/source/modules/asura-utils/threading/thread.h @@ -9,6 +9,7 @@ #include "task.h" #include "mutex.h" #include "semaphore.h" +#include "threadable.h" namespace AsuraEngine { @@ -19,7 +20,7 @@ namespace AsuraEngine /// /// ̵߳ļֲͬʵ֣ - /// 1: Deferredӳģʽ߳ϵɺҪ̵ֶ߳Post + /// 1: Deferredӳģʽ߳ϵɺҪ̵ֶ߳Dispatch /// ̵߳ص첽Ϊͬlua_Stateͻ⡣ /// 2: Immediateģʽÿһ߳άһlua_newthreadlua_State /// صڲͬlua_Stateеãⲻ̷ͬ߳ͬһlua_State @@ -27,7 +28,7 @@ namespace AsuraEngine enum ThreadType { THREAD_TYPE_DEFERRED, - THREAD_TYPE_IMMEDIATE, + THREAD_TYPE_IMMEDIATE, // unsafe }; enum ThreadState @@ -43,6 +44,7 @@ namespace AsuraEngine /// class Thread ASURA_FINAL : public AEScripting::Portable<Thread> + , public Threadable { public: @@ -102,14 +104,14 @@ namespace AsuraEngine /// /// ִС /// - void Process(); + int Process() override; const std::string& GetName(); /// /// ص /// - void Post(); + void Dispatch(); /// /// ߺ @@ -141,7 +143,7 @@ namespace AsuraEngine LUAX_DECL_METHOD(_IsStopped); LUAX_DECL_METHOD(_IsCurrent); LUAX_DECL_METHOD(_Sleep); - LUAX_DECL_METHOD(_Post); + LUAX_DECL_METHOD(_Dispatch); LUAX_DECL_METHOD(_GetName); LUAX_DECL_METHOD(_GetType); LUAX_DECL_METHOD(_GetState); @@ -149,14 +151,15 @@ namespace AsuraEngine //----------------------------------------------------------------------------// + ThreadImpl* mImpl; + + lua_State* mLuaThread; + /// /// ˴Ƿػģʽ /// bool mIsDaemon; - lua_State* mLuaThread; - - ThreadImpl* mImpl; std::string mName; ThreadType mType; uint mSleepTime; @@ -204,7 +207,7 @@ namespace AsuraEngine ThreadImpl() {}; virtual ~ThreadImpl() {}; - virtual bool Start(Thread* thread, uint32 stacksize = 0) = 0; + virtual bool Start(Threadable* thread, uint32 stacksize = 0) = 0; virtual void Join() = 0; virtual void Kill() = 0; diff --git a/source/modules/asura-utils/threading/thread_impl_std.h b/source/modules/asura-utils/threading/thread_impl_std.h index 0e7d3da..4cc5dfe 100644 --- a/source/modules/asura-utils/threading/thread_impl_std.h +++ b/source/modules/asura-utils/threading/thread_impl_std.h @@ -24,7 +24,7 @@ namespace AsuraEngine ThreadImplSTD(); ~ThreadImplSTD(); - bool Start(Thread* thread, uint32 stacksize) override; + bool Start(Threadable* thread, uint32 stacksize) override; void Join() override; void Kill() override; diff --git a/source/modules/asura-utils/threading/thread_impl_win32.cpp b/source/modules/asura-utils/threading/thread_impl_win32.cpp index 6871c2d..0e1569c 100644 --- a/source/modules/asura-utils/threading/thread_impl_win32.cpp +++ b/source/modules/asura-utils/threading/thread_impl_win32.cpp @@ -3,6 +3,8 @@ #include <iostream> +#if ASURA_THREAD_WIN32 + namespace AsuraEngine { namespace Threading @@ -10,9 +12,8 @@ namespace AsuraEngine static DWORD WINAPI _thread_win32_runner(LPVOID param) { - Thread* thread = (Thread*)param; - thread->Process(); - return 0; + Threadable* thread = (Threadable*)param; + return thread->Process(); // β } ThreadImplWin32::ThreadImplWin32() @@ -26,7 +27,7 @@ namespace AsuraEngine mHandle = 0; } - bool ThreadImplWin32::Start(Thread* thread, uint32 stacksize/*=0*/) + bool ThreadImplWin32::Start(Threadable* thread, uint32 stacksize/*=0*/) { assert(!IsRunning()); mHandle = ::CreateThread( @@ -73,4 +74,6 @@ namespace AsuraEngine } } -}
\ No newline at end of file +} + +#endif // ASURA_THREAD_WIN32
\ No newline at end of file diff --git a/source/modules/asura-utils/threading/thread_impl_win32.h b/source/modules/asura-utils/threading/thread_impl_win32.h index a22aeef..fdcfc2b 100644 --- a/source/modules/asura-utils/threading/thread_impl_win32.h +++ b/source/modules/asura-utils/threading/thread_impl_win32.h @@ -24,7 +24,7 @@ namespace AsuraEngine ThreadImplWin32(); ~ThreadImplWin32(); - bool Start(Thread* thread, uint32 stacksize) override; + bool Start(Threadable* thread, uint32 stacksize) override; void Join() override; void Kill() override; diff --git a/source/modules/asura-utils/threading/threadable.h b/source/modules/asura-utils/threading/threadable.h new file mode 100644 index 0000000..08f807d --- /dev/null +++ b/source/modules/asura-utils/threading/threadable.h @@ -0,0 +1,25 @@ +#ifndef __ASURA_THREADABLE_H__ +#define __ASURA_THREADABLE_H__ + +#include "../type.h" + +namespace AsuraEngine +{ + namespace Threading + { + + ASURA_ABSTRACT class Threadable + { + public: + + Threadable() {}; + virtual ~Threadable() {}; + + virtual int Process() = 0; + + }; + + } +} + +#endif
\ No newline at end of file |