summaryrefslogtreecommitdiff
path: root/source/modules/asura-utils/threading
diff options
context:
space:
mode:
Diffstat (limited to 'source/modules/asura-utils/threading')
-rw-r--r--source/modules/asura-utils/threading/binding/_thread.cpp10
-rw-r--r--source/modules/asura-utils/threading/conditional.cpp86
-rw-r--r--source/modules/asura-utils/threading/conditional.h41
-rw-r--r--source/modules/asura-utils/threading/mutex.cpp7
-rw-r--r--source/modules/asura-utils/threading/mutex.h10
-rw-r--r--source/modules/asura-utils/threading/semaphore.cpp25
-rw-r--r--source/modules/asura-utils/threading/semaphore.h6
-rw-r--r--source/modules/asura-utils/threading/task.cpp2
-rw-r--r--source/modules/asura-utils/threading/thread.cpp43
-rw-r--r--source/modules/asura-utils/threading/thread.h21
-rw-r--r--source/modules/asura-utils/threading/thread_impl_std.h2
-rw-r--r--source/modules/asura-utils/threading/thread_impl_win32.cpp13
-rw-r--r--source/modules/asura-utils/threading/thread_impl_win32.h2
-rw-r--r--source/modules/asura-utils/threading/threadable.h25
14 files changed, 239 insertions, 54 deletions
diff --git a/source/modules/asura-utils/threading/binding/_thread.cpp b/source/modules/asura-utils/threading/binding/_thread.cpp
index a5aff03..ad2fb7f 100644
--- a/source/modules/asura-utils/threading/binding/_thread.cpp
+++ b/source/modules/asura-utils/threading/binding/_thread.cpp
@@ -23,7 +23,7 @@ namespace AsuraEngine
{ "IsStopped", _IsStopped },
{ "IsCurrent", _IsCurrent },
{ "Sleep", _Sleep },
- { "Post", _Post },
+ { "Dispatch", _Dispatch },
{ "GetName", _GetName },
{ "GetType", _GetType },
{ "GetState", _GetState }
@@ -37,7 +37,7 @@ namespace AsuraEngine
{ "IMMEDIATE", THREAD_TYPE_IMMEDIATE }
);
LUAX_REGISTER_ENUM(state, "EThreadState",
- { "READY", THREAD_STATE_IDLE },
+ { "IDLE", THREAD_STATE_IDLE },
{ "RUNNING", THREAD_STATE_RUNNING },
{ "PAUSED", THREAD_STATE_PAUSED },
{ "STOPPED", THREAD_STATE_STOPPED }
@@ -165,11 +165,11 @@ namespace AsuraEngine
return 0;
}
- // thread:Post()
- LUAX_IMPL_METHOD(Thread, _Post)
+ // thread:Dispatch()
+ LUAX_IMPL_METHOD(Thread, _Dispatch)
{
LUAX_PREPARE(L, Thread);
- self->Post();
+ self->Dispatch();
return 0;
}
diff --git a/source/modules/asura-utils/threading/conditional.cpp b/source/modules/asura-utils/threading/conditional.cpp
new file mode 100644
index 0000000..eb26e82
--- /dev/null
+++ b/source/modules/asura-utils/threading/conditional.cpp
@@ -0,0 +1,86 @@
+#include "conditional.h"
+
+namespace AsuraEngine
+{
+ namespace Threading
+ {
+
+ Conditional::Conditional()
+ : mWaiting(0)
+ , mSignals(0)
+ {
+ }
+
+ Conditional::~Conditional()
+ {
+ }
+
+ void Conditional::Signal()
+ {
+ mMutex.Lock();
+ if (mWaiting > mSignals)
+ {
+ ++mSignals;
+ signal(mWaitSem);
+ mMutex.Unlock();
+ wait(mDoneSem);
+ }
+ else
+ {
+ mMutex.Unlock();
+ }
+ }
+
+ void Conditional::Broadcast()
+ {
+ mMutex.Lock();
+ if (mWaiting> mSignals) {
+ int i, num_waiting;
+
+ num_waiting = (mWaiting - mSignals);
+ mSignals = mWaiting;
+ for (i = 0; i < num_waiting; ++i) {
+ signal(mWaitSem);
+ }
+ mMutex.Unlock();
+ for (i = 0; i < num_waiting; ++i) {
+ wait(mDoneSem);
+ }
+ }
+ else {
+ mMutex.Unlock();
+ }
+
+ }
+
+ bool Conditional::Wait(Mutex* mutex, int timeout /*= ASURA_MUTEX_MAXWAIT*/)
+ {
+ bool retval;
+
+ mMutex.Lock();
+ ++mWaiting;
+ mMutex.Unlock();
+
+ mutex->Unlock();
+
+ retval = wait(mWaitSem, timeout);
+
+ mMutex.Lock();
+ if (mSignals > 0) {
+ if (!retval) {
+ wait(mWaitSem);
+ }
+ signal(mDoneSem);
+
+ --mSignals;
+ }
+ --mWaiting;
+ mMutex.Unlock();
+
+ mMutex.Lock();
+
+ return retval;
+ }
+
+ }
+} \ No newline at end of file
diff --git a/source/modules/asura-utils/threading/conditional.h b/source/modules/asura-utils/threading/conditional.h
new file mode 100644
index 0000000..2f9633e
--- /dev/null
+++ b/source/modules/asura-utils/threading/conditional.h
@@ -0,0 +1,41 @@
+#ifndef __ASURA_CONDITIONAL_H__
+#define __ASURA_CONDITIONAL_H__
+
+#include "mutex.h"
+#include "semaphore.h"
+
+namespace AsuraEngine
+{
+ namespace Threading
+ {
+
+ ///
+ ///
+ ///
+ class Conditional
+ {
+ public:
+
+ Conditional();
+ ~Conditional();
+
+ void Signal();
+ void Broadcast();
+ bool Wait(Mutex* mutex, int timeout = ASURA_MUTEX_MAXWAIT);
+
+ private:
+
+ Mutex mMutex;
+
+ Semaphore mWaitSem;
+ Semaphore mDoneSem;
+
+ int mWaiting;
+ int mSignals;
+
+ };
+
+ }
+}
+
+#endif \ No newline at end of file
diff --git a/source/modules/asura-utils/threading/mutex.cpp b/source/modules/asura-utils/threading/mutex.cpp
index ffe67f6..bd7d419 100644
--- a/source/modules/asura-utils/threading/mutex.cpp
+++ b/source/modules/asura-utils/threading/mutex.cpp
@@ -34,7 +34,8 @@ namespace AsuraEngine
Mutex::~Mutex()
{
- delete mImpl;
+ if(mImpl)
+ delete mImpl;
}
void Mutex::Lock()
@@ -92,7 +93,7 @@ namespace AsuraEngine
void MutexImplWin32_KM::Lock()
{
- ::WaitForSingleObject(mHandle, INFINITE);
+ ::WaitForSingleObject(mHandle, ASURA_MUTEX_MAXWAIT);
}
void MutexImplWin32_KM::Unlock()
@@ -103,4 +104,4 @@ namespace AsuraEngine
#endif // ASURA_MUTEX_WIN32_KERNAL_MUTEX
}
-}
+} \ No newline at end of file
diff --git a/source/modules/asura-utils/threading/mutex.h b/source/modules/asura-utils/threading/mutex.h
index a3fbea5..7afbe35 100644
--- a/source/modules/asura-utils/threading/mutex.h
+++ b/source/modules/asura-utils/threading/mutex.h
@@ -14,6 +14,8 @@ namespace AsuraEngine
namespace Threading
{
+#define ASURA_MUTEX_MAXWAIT (~(uint32)0)
+
class MutexImpl;
class Mutex
@@ -44,17 +46,15 @@ namespace AsuraEngine
{
m.Unlock();
}
+ operator bool() { return false; };
private:
void* operator new(size_t);
Mutex& m;
};
// ڵջӴλÿʼջΪٽ
-#define lock(mutex) _mutex_locker _asura_scoped_lock_0x0(mutex)
-#define lock2(mutex) _mutex_locker _asura_scoped_lock_0x1(mutex)
-#define lock3(mutex) _mutex_locker _asura_scoped_lock_0x2(mutex)
-#define lock4(mutex) _mutex_locker _asura_scoped_lock_0x3(mutex)
-#define lock5(mutex) _mutex_locker _asura_scoped_lock_0x4(mutex)
+#define lock(m) \
+ if(_mutex_locker _asura_mutex_locker = m){} else
ASURA_ABSTRACT class MutexImpl
{
diff --git a/source/modules/asura-utils/threading/semaphore.cpp b/source/modules/asura-utils/threading/semaphore.cpp
index 12d4aab..aa5d9dd 100644
--- a/source/modules/asura-utils/threading/semaphore.cpp
+++ b/source/modules/asura-utils/threading/semaphore.cpp
@@ -1,6 +1,7 @@
#include "../exceptions/exception.h"
#include "../type.h"
+#include "mutex.h"
#include "semaphore.h"
namespace AsuraEngine
@@ -41,10 +42,10 @@ namespace AsuraEngine
mImpl->Signal();
}
- void Semaphore::Wait(int timeout)
+ bool Semaphore::Wait(int timeout /*= ASURA_MUTEX_MAXWAIT*/)
{
ASSERT(mImpl);
- mImpl->Wait(timeout);
+ return mImpl->Wait(timeout);
}
#if ASURA_THREAD_WIN32
@@ -52,9 +53,13 @@ namespace AsuraEngine
SemaphoreWin32::SemaphoreWin32(unsigned int init_value)
: SemaphoreImpl(init_value)
{
- mSem = CreateSemaphore(NULL, init_value, UINT_MAX, NULL);
+ // UINT_MAX get error.
+ mSem = CreateSemaphore(NULL, init_value, INT_MAX, NULL);
if (!mSem)
- throw Exception("Cant use win32 semaphore.");
+ {
+ int errorCode = GetLastError();
+ throw Exception("Cant use win32 semaphore. Error code: %d.", errorCode);
+ }
}
SemaphoreWin32::~SemaphoreWin32()
@@ -72,14 +77,22 @@ namespace AsuraEngine
bool SemaphoreWin32::Wait(int timeout)
{
int result;
- result = WaitForSingleObject(mSem, timeout < 0 ? INFINITE : timeout);
+ result = WaitForSingleObject(mSem, timeout);
if (result == WAIT_OBJECT_0)
{
InterlockedDecrement(&mCount);
return true;
}
- else
+ else if(result == WAIT_TIMEOUT)
+ {
+ // ʱ
return false;
+ }
+ else
+ {
+ // δ֪
+ throw Exception("WaitForSingleObject() failed");
+ }
}
#endif // ASURA_THREAD_WIN32
diff --git a/source/modules/asura-utils/threading/semaphore.h b/source/modules/asura-utils/threading/semaphore.h
index 80773d8..1a4e3b7 100644
--- a/source/modules/asura-utils/threading/semaphore.h
+++ b/source/modules/asura-utils/threading/semaphore.h
@@ -25,7 +25,7 @@ namespace AsuraEngine
~Semaphore();
void Signal();
- void Wait(int timeout = 0);
+ bool Wait(int timeout = ASURA_MUTEX_MAXWAIT);
private:
SemaphoreImpl* mImpl;
@@ -46,8 +46,8 @@ namespace AsuraEngine
unsigned int mCount;
};
-#define wait(sem) sem.Wait();
-#define signal(sem) sem.Signal();
+#define wait(sem, ...) sem.Wait(__VA_ARGS__)
+#define signal(sem) sem.Signal()
#if ASURA_THREAD_WIN32
diff --git a/source/modules/asura-utils/threading/task.cpp b/source/modules/asura-utils/threading/task.cpp
index 2e84ed4..decb74c 100644
--- a/source/modules/asura-utils/threading/task.cpp
+++ b/source/modules/asura-utils/threading/task.cpp
@@ -1,5 +1,5 @@
#include "task.h"
-#include "../scripting/lua_env.h"
+#include "../scripting/portable.hpp"
using namespace AEScripting;
diff --git a/source/modules/asura-utils/threading/thread.cpp b/source/modules/asura-utils/threading/thread.cpp
index 0f4f5da..fc397d2 100644
--- a/source/modules/asura-utils/threading/thread.cpp
+++ b/source/modules/asura-utils/threading/thread.cpp
@@ -41,8 +41,10 @@ namespace AsuraEngine
bool Thread::AddTask(Task* task)
{
- lock(mTaskQueueMutex);
- mTaskQueue.push(task);
+ lock(mTaskQueueMutex)
+ {
+ mTaskQueue.push(task);
+ }
return true;
}
@@ -96,25 +98,31 @@ namespace AsuraEngine
{
ASSERT(mImpl);
- lock(mStateMutex);
- mState = THREAD_STATE_PAUSED;
+ lock(mStateMutex)
+ {
+ mState = THREAD_STATE_PAUSED;
+ }
}
void Thread::Resume()
{
ASSERT(mImpl);
- lock(mStateMutex);
- if(mState == THREAD_STATE_PAUSED)
- mState = THREAD_STATE_RUNNING;
+ lock(mStateMutex)
+ {
+ if (mState == THREAD_STATE_PAUSED)
+ mState = THREAD_STATE_RUNNING;
+ }
}
void Thread::Stop()
{
ASSERT(mImpl);
- lock(mStateMutex);
- mState = THREAD_STATE_STOPPED;
+ lock(mStateMutex)
+ {
+ mState = THREAD_STATE_STOPPED;
+ }
}
void Thread::PauseSync()
@@ -144,9 +152,10 @@ namespace AsuraEngine
ThreadState Thread::GetState()
{
ThreadState state;
- mStateMutex.Lock();
- state = mState;
- mStateMutex.Unlock();
+ lock(mStateMutex)
+ {
+ state = mState;
+ }
return state;
}
@@ -183,7 +192,7 @@ namespace AsuraEngine
return mName;
}
- void Thread::Process()
+ int Thread::Process()
{
LUAX_STATE(mLuaThread);
@@ -203,6 +212,7 @@ namespace AsuraEngine
}
else if (mType == THREAD_TYPE_IMMEDIATE)
{
+ // unsafe
task->Invoke(mCallbackThread);
this->LuaxRelease<Task>(state, task);
}
@@ -230,14 +240,17 @@ namespace AsuraEngine
// ״̬ΪIdle
Idle();
+
+ return 0;
}
///
/// ӳģʽص
///
- void Thread::Post()
+ void Thread::Dispatch()
{
- ASSERT(mType == THREAD_TYPE_DEFERRED);
+ if (mType != THREAD_TYPE_DEFERRED)
+ return;
LUAX_STATE(mLuaThread);
while (!mFinishedTasks.empty())
diff --git a/source/modules/asura-utils/threading/thread.h b/source/modules/asura-utils/threading/thread.h
index 340fca3..d365fd0 100644
--- a/source/modules/asura-utils/threading/thread.h
+++ b/source/modules/asura-utils/threading/thread.h
@@ -9,6 +9,7 @@
#include "task.h"
#include "mutex.h"
#include "semaphore.h"
+#include "threadable.h"
namespace AsuraEngine
{
@@ -19,7 +20,7 @@ namespace AsuraEngine
///
/// ̵߳ļֲͬʵ֣
- /// 1: Deferredӳģʽ߳ϵɺҪ̵ֶ߳Post
+ /// 1: Deferredӳģʽ߳ϵɺҪ̵ֶ߳Dispatch
/// ̵߳ص첽Ϊͬlua_Stateͻ⡣
/// 2: Immediateģʽÿһ߳άһlua_newthreadlua_State
/// صڲͬlua_Stateеãⲻ̷ͬ߳ͬһlua_State
@@ -27,7 +28,7 @@ namespace AsuraEngine
enum ThreadType
{
THREAD_TYPE_DEFERRED,
- THREAD_TYPE_IMMEDIATE,
+ THREAD_TYPE_IMMEDIATE, // unsafe
};
enum ThreadState
@@ -43,6 +44,7 @@ namespace AsuraEngine
///
class Thread ASURA_FINAL
: public AEScripting::Portable<Thread>
+ , public Threadable
{
public:
@@ -102,14 +104,14 @@ namespace AsuraEngine
///
/// ִС
///
- void Process();
+ int Process() override;
const std::string& GetName();
///
/// ص
///
- void Post();
+ void Dispatch();
///
/// ߺ
@@ -141,7 +143,7 @@ namespace AsuraEngine
LUAX_DECL_METHOD(_IsStopped);
LUAX_DECL_METHOD(_IsCurrent);
LUAX_DECL_METHOD(_Sleep);
- LUAX_DECL_METHOD(_Post);
+ LUAX_DECL_METHOD(_Dispatch);
LUAX_DECL_METHOD(_GetName);
LUAX_DECL_METHOD(_GetType);
LUAX_DECL_METHOD(_GetState);
@@ -149,14 +151,15 @@ namespace AsuraEngine
//----------------------------------------------------------------------------//
+ ThreadImpl* mImpl;
+
+ lua_State* mLuaThread;
+
///
/// ˴Ƿػģʽ
///
bool mIsDaemon;
- lua_State* mLuaThread;
-
- ThreadImpl* mImpl;
std::string mName;
ThreadType mType;
uint mSleepTime;
@@ -204,7 +207,7 @@ namespace AsuraEngine
ThreadImpl() {};
virtual ~ThreadImpl() {};
- virtual bool Start(Thread* thread, uint32 stacksize = 0) = 0;
+ virtual bool Start(Threadable* thread, uint32 stacksize = 0) = 0;
virtual void Join() = 0;
virtual void Kill() = 0;
diff --git a/source/modules/asura-utils/threading/thread_impl_std.h b/source/modules/asura-utils/threading/thread_impl_std.h
index 0e7d3da..4cc5dfe 100644
--- a/source/modules/asura-utils/threading/thread_impl_std.h
+++ b/source/modules/asura-utils/threading/thread_impl_std.h
@@ -24,7 +24,7 @@ namespace AsuraEngine
ThreadImplSTD();
~ThreadImplSTD();
- bool Start(Thread* thread, uint32 stacksize) override;
+ bool Start(Threadable* thread, uint32 stacksize) override;
void Join() override;
void Kill() override;
diff --git a/source/modules/asura-utils/threading/thread_impl_win32.cpp b/source/modules/asura-utils/threading/thread_impl_win32.cpp
index 6871c2d..0e1569c 100644
--- a/source/modules/asura-utils/threading/thread_impl_win32.cpp
+++ b/source/modules/asura-utils/threading/thread_impl_win32.cpp
@@ -3,6 +3,8 @@
#include <iostream>
+#if ASURA_THREAD_WIN32
+
namespace AsuraEngine
{
namespace Threading
@@ -10,9 +12,8 @@ namespace AsuraEngine
static DWORD WINAPI _thread_win32_runner(LPVOID param)
{
- Thread* thread = (Thread*)param;
- thread->Process();
- return 0;
+ Threadable* thread = (Threadable*)param;
+ return thread->Process(); // β
}
ThreadImplWin32::ThreadImplWin32()
@@ -26,7 +27,7 @@ namespace AsuraEngine
mHandle = 0;
}
- bool ThreadImplWin32::Start(Thread* thread, uint32 stacksize/*=0*/)
+ bool ThreadImplWin32::Start(Threadable* thread, uint32 stacksize/*=0*/)
{
assert(!IsRunning());
mHandle = ::CreateThread(
@@ -73,4 +74,6 @@ namespace AsuraEngine
}
}
-} \ No newline at end of file
+}
+
+#endif // ASURA_THREAD_WIN32 \ No newline at end of file
diff --git a/source/modules/asura-utils/threading/thread_impl_win32.h b/source/modules/asura-utils/threading/thread_impl_win32.h
index a22aeef..fdcfc2b 100644
--- a/source/modules/asura-utils/threading/thread_impl_win32.h
+++ b/source/modules/asura-utils/threading/thread_impl_win32.h
@@ -24,7 +24,7 @@ namespace AsuraEngine
ThreadImplWin32();
~ThreadImplWin32();
- bool Start(Thread* thread, uint32 stacksize) override;
+ bool Start(Threadable* thread, uint32 stacksize) override;
void Join() override;
void Kill() override;
diff --git a/source/modules/asura-utils/threading/threadable.h b/source/modules/asura-utils/threading/threadable.h
new file mode 100644
index 0000000..08f807d
--- /dev/null
+++ b/source/modules/asura-utils/threading/threadable.h
@@ -0,0 +1,25 @@
+#ifndef __ASURA_THREADABLE_H__
+#define __ASURA_THREADABLE_H__
+
+#include "../type.h"
+
+namespace AsuraEngine
+{
+ namespace Threading
+ {
+
+ ASURA_ABSTRACT class Threadable
+ {
+ public:
+
+ Threadable() {};
+ virtual ~Threadable() {};
+
+ virtual int Process() = 0;
+
+ };
+
+ }
+}
+
+#endif \ No newline at end of file