diff options
Diffstat (limited to 'source/libs/asura-lib-utils/threading/thread.cpp')
-rw-r--r-- | source/libs/asura-lib-utils/threading/thread.cpp | 217 |
1 files changed, 195 insertions, 22 deletions
diff --git a/source/libs/asura-lib-utils/threading/thread.cpp b/source/libs/asura-lib-utils/threading/thread.cpp index 628eee4..0f4f5da 100644 --- a/source/libs/asura-lib-utils/threading/thread.cpp +++ b/source/libs/asura-lib-utils/threading/thread.cpp @@ -10,14 +10,50 @@ namespace AsuraEngine namespace Threading { - Thread::Thread(ThreadType type, Luax::LuaxState& luaThread, const std::string& name/* = ""*/) + Thread::Thread(lua_State* luaThread, ThreadType type /*= THREAD_TYPE_DEFERRED*/, uint sleepTime /*= 0*/, const std::string& name /*= ""*/) : mName(name) + , mState(THREAD_STATE_IDLE) + , mType(type) + , mLuaThread(luaThread) + , mCallbackThread(nullptr) + , mSleepTime(sleepTime) { + LUAX_STATE(luaThread); + if (type == THREAD_TYPE_IMMEDIATE) + { + Luax::LuaxVM* vm = state.GetVM(); + ASSERT(vm); + mCallbackThread = vm->CreateThread(); + ASSERT(mCallbackThread); + SetLuaxMemberRef(state, mCallbackThreadRef, -1); + state.Pop(); // callback thread + } } Thread::~Thread() { - delete mImpl; + if (mImpl) + { + delete mImpl; + mImpl = nullptr; + } + } + + bool Thread::AddTask(Task* task) + { + lock(mTaskQueueMutex); + mTaskQueue.push(task); + return true; + } + + uint Thread::GetTaskCount() + { + return mTaskQueue.size(); + } + + void Thread::Idle() + { + mState = THREAD_STATE_IDLE; } #define try_start_thread(impl)\ @@ -31,20 +67,72 @@ namespace AsuraEngine } \ } - bool Thread::AddTask(Task* task) + bool Thread::Start(bool isDaemon /*= true*/, uint32 stacksize /*= 0*/) { - lock(mMutex); - mTaskQueue.push(task); - return true; - } + if (mState != THREAD_STATE_IDLE) + return false; + + // Ѿһ֮ǰģر + if (mImpl) + { + delete mImpl; + mImpl = nullptr; + } - void Thread::Start(uint32 stacksize) - { #if ASURA_THREAD_WIN32 try_start_thread(ThreadImplWin32); #endif + if (!mImpl) + return false; + + mIsDaemon = isDaemon; + mStateMutex.Lock(); + mState = THREAD_STATE_RUNNING; + mStateMutex.Unlock(); + } + + void Thread::Pause() + { + ASSERT(mImpl); + + lock(mStateMutex); + mState = THREAD_STATE_PAUSED; + } + + void Thread::Resume() + { + ASSERT(mImpl); + + lock(mStateMutex); + if(mState == THREAD_STATE_PAUSED) + mState = THREAD_STATE_RUNNING; + } + + void Thread::Stop() + { ASSERT(mImpl); + + lock(mStateMutex); + mState = THREAD_STATE_STOPPED; + } + + void Thread::PauseSync() + { + Pause(); + wait(mSemPause); + } + + void Thread::ResumeSync() + { + Resume(); + wait(mSemResume); + } + + void Thread::StopSync() + { + Stop(); + wait(mSemStop); } void Thread::Join() @@ -53,21 +141,40 @@ namespace AsuraEngine mImpl->Join(); } - void Thread::Kill() + ThreadState Thread::GetState() { - ASSERT(mImpl); - mImpl->Kill(); + ThreadState state; + mStateMutex.Lock(); + state = mState; + mStateMutex.Unlock(); + return state; } bool Thread::IsRunning() { ASSERT(mImpl); - return mImpl->IsRunning(); + + return GetState() == THREAD_STATE_RUNNING; + } + + bool Thread::IsPaused() + { + ASSERT(mImpl); + + return GetState() == THREAD_STATE_PAUSED; + } + + bool Thread::IsStopped() + { + ASSERT(mImpl); + + return GetState() == THREAD_STATE_STOPPED; } bool Thread::IsCurrent() { ASSERT(mImpl); + return mImpl->IsCurrent(); } @@ -78,22 +185,88 @@ namespace AsuraEngine void Thread::Process() { - LUAX_STATE(AEScripting::LuaEnv::Get()->GetMainThread()); - while (!mTaskQueue.empty()) + LUAX_STATE(mLuaThread); + + do{ + if (IsRunning()) + { + while (!mTaskQueue.empty()) + { + Task* task = mTaskQueue.front(); + if (task && task->Execute()) + { + if (mType == THREAD_TYPE_DEFERRED) + { + mFinishedMutex.Lock(); + mFinishedTasks.push(task); + mFinishedMutex.Unlock(); + } + else if (mType == THREAD_TYPE_IMMEDIATE) + { + task->Invoke(mCallbackThread); + this->LuaxRelease<Task>(state, task); + } + mTaskQueueMutex.Lock(); + mTaskQueue.pop(); + mTaskQueueMutex.Unlock(); + } + } + } + + // ˳ѭ + if (IsStopped()) + break; + + // CPUʹ + Sleep(mSleepTime); + + } while (mIsDaemon); + + // ػ̣߳еstop״̬ + if (!mIsDaemon) + Stop(); + + signal(mSemStop); + + // ״̬ΪIdle + Idle(); + } + + /// + /// ӳģʽص + /// + void Thread::Post() + { + ASSERT(mType == THREAD_TYPE_DEFERRED); + + LUAX_STATE(mLuaThread); + while (!mFinishedTasks.empty()) { - Task* task = mTaskQueue.front(); - if (task && task->Execute()) + Task* task = mFinishedTasks.front(); + if (task) { - // unsafe - task->Invoke(); + task->Invoke(mLuaThread); this->LuaxRelease<Task>(state, task); + mFinishedMutex.Lock(); + mFinishedTasks.pop(); + mFinishedMutex.Unlock(); } + } + } - mMutex.Lock(); - mTaskQueue.pop(); - mMutex.Unlock(); + void Thread::Sleep(uint ms) + { + ASSERT(mImpl); + if (mImpl) + { + mImpl->Sleep(ms); } } + void Thread::SetSleepTime(uint ms) + { + mSleepTime = ms; + } + } }
\ No newline at end of file |