diff options
Diffstat (limited to 'Runtime/ClusterRenderer')
-rw-r--r-- | Runtime/ClusterRenderer/ClusterNetwork.cpp | 118 | ||||
-rw-r--r-- | Runtime/ClusterRenderer/ClusterNetwork.h | 40 | ||||
-rw-r--r-- | Runtime/ClusterRenderer/ClusterNode.cpp | 128 | ||||
-rw-r--r-- | Runtime/ClusterRenderer/ClusterNode.h | 64 | ||||
-rw-r--r-- | Runtime/ClusterRenderer/ClusterRendererDefines.h | 13 | ||||
-rw-r--r-- | Runtime/ClusterRenderer/ClusterRendererModule.cpp | 109 | ||||
-rw-r--r-- | Runtime/ClusterRenderer/ClusterRendererModule.h | 32 | ||||
-rw-r--r-- | Runtime/ClusterRenderer/ClusterRendererModule.jam | 177 | ||||
-rw-r--r-- | Runtime/ClusterRenderer/ClusterRendererModuleRegistration.cpp | 10 | ||||
-rw-r--r-- | Runtime/ClusterRenderer/ClusterTransfer.cpp | 93 | ||||
-rw-r--r-- | Runtime/ClusterRenderer/ClusterTransfer.h | 16 |
11 files changed, 800 insertions, 0 deletions
diff --git a/Runtime/ClusterRenderer/ClusterNetwork.cpp b/Runtime/ClusterRenderer/ClusterNetwork.cpp new file mode 100644 index 0000000..e68fe38 --- /dev/null +++ b/Runtime/ClusterRenderer/ClusterNetwork.cpp @@ -0,0 +1,118 @@ +#include "UnityPrefix.h" +#if ENABLE_CLUSTER_SYNC + +#include "ClusterNetwork.h" +#include "External/zmq/include/zmq.h" + +void* CreateZMQContext() +{ + return zmq_ctx_new(); +} + + +void DestroyZMQContext(void* context) +{ + zmq_term(context); +} + + +MasterSocket::MasterSocket(void* context, const char* mainUrl, const char* ctrlUrl) +: m_Context(context) +{ + // create them + m_MainSocket = zmq_socket(m_Context, ZMQ_XPUB); + m_CtrlSocket = zmq_socket(m_Context, ZMQ_PULL); + // bind them + zmq_bind(m_MainSocket, mainUrl); + zmq_bind(m_CtrlSocket, ctrlUrl); +} + + +MasterSocket::~MasterSocket() +{ + if(m_MainSocket) + zmq_close(m_MainSocket); + if(m_CtrlSocket) + zmq_close(m_CtrlSocket); +} + + +void MasterSocket::Publish(dynamic_array<UInt8>& buffer) +{ + zmq_send(m_MainSocket, buffer.data(), buffer.size(), 0); +} + + +bool MasterSocket::WaitForSubscriber() +{ + UInt8 buffer[3]; + int rc = zmq_recv(m_MainSocket, &buffer, 3, 0); + return (rc == 3 && buffer[0] == 1 && buffer[1] == 'S'); +} + + +bool MasterSocket::CheckForUnsubscribe() +{ + UInt8 buffer[3]; + int rc = zmq_recv(m_MainSocket, &buffer, 3, ZMQ_DONTWAIT); + return (rc == 3 && buffer[0] == 0 && buffer[1] == 'S'); +} + + +bool MasterSocket::GetAck() +{ + UInt8 buffer[1]; + int rc = zmq_recv(m_CtrlSocket, &buffer, 1, ZMQ_DONTWAIT); + return (rc == 1 && buffer[0] == 1); +} + + +SlaveSocket::SlaveSocket(void* context, const char* mainUrl, const char* ctrlUrl) +: m_Context(context) +{ + // create them + m_MainSocket = zmq_socket(m_Context, ZMQ_XSUB); + m_CtrlSocket = zmq_socket(m_Context, ZMQ_PUSH); + // connect them + zmq_connect(m_MainSocket, mainUrl); + zmq_connect(m_CtrlSocket, ctrlUrl); +} + + +SlaveSocket::~SlaveSocket() +{ + if(m_MainSocket) + zmq_close(m_MainSocket); + if(m_CtrlSocket) + zmq_close(m_CtrlSocket); +} + + +void SlaveSocket::Listen(dynamic_array<UInt8>& buffer) +{ + zmq_recv(m_MainSocket, buffer.data(), buffer.size(), 0); +} + + +void SlaveSocket::Subscribe(int slaveId) +{ + UInt8 buffer[] = {1, 'S', slaveId}; + zmq_send(m_MainSocket, buffer, 3, 0); + zmq_send(m_MainSocket, buffer, 1, 0); +} + + +void SlaveSocket::Unsubscribe(int slaveId) +{ + UInt8 buffer[] = {0, 'S', slaveId}; + zmq_send(m_MainSocket, buffer, 3, 0); +} + + +void SlaveSocket::SendAck() +{ + UInt8 buffer[] = {1}; + zmq_send(m_CtrlSocket, buffer, 1, 0); +} + +#endif
\ No newline at end of file diff --git a/Runtime/ClusterRenderer/ClusterNetwork.h b/Runtime/ClusterRenderer/ClusterNetwork.h new file mode 100644 index 0000000..0d84a08 --- /dev/null +++ b/Runtime/ClusterRenderer/ClusterNetwork.h @@ -0,0 +1,40 @@ +#pragma once +#if ENABLE_CLUSTER_SYNC +#include "Runtime/Utilities/dynamic_array.h" + +void* CreateZMQContext(); +void DestroyZMQContext(void* context); + +class MasterSocket +{ +public: + MasterSocket(void* context, const char* mainUrl, const char* ctrlUrl); + ~MasterSocket(); + void Publish(dynamic_array<UInt8>& buffer); + bool WaitForSubscriber(); + bool CheckForUnsubscribe(); + bool GetAck(); +private: + void* m_Context; + void* m_MainSocket; + void* m_CtrlSocket; +}; + + +class SlaveSocket +{ +public: + SlaveSocket(void* context, const char* mainUrl, const char* ctrlUrl); + ~SlaveSocket(); + void Listen(dynamic_array<UInt8>& buffer); + void Subscribe(int slaveId); + void Unsubscribe(int slaveId); + void SendAck(); +private: + void* m_Context; + void* m_MainSocket; + void* m_CtrlSocket; +}; + + +#endif
\ No newline at end of file diff --git a/Runtime/ClusterRenderer/ClusterNode.cpp b/Runtime/ClusterRenderer/ClusterNode.cpp new file mode 100644 index 0000000..c17dd50 --- /dev/null +++ b/Runtime/ClusterRenderer/ClusterNode.cpp @@ -0,0 +1,128 @@ +#include "UnityPrefix.h" +#if ENABLE_CLUSTER_SYNC +#include "ClusterNode.h" +#include "ClusterNetwork.h" +#include "ClusterTransfer.h" + +#ifdef DEBUG +#include "Runtime/Input/InputManager.h" +#include "ClusterRendererModule.h" +#endif + +MasterNode::~MasterNode() +{ + delete m_Socket; + delete m_Synchronizer; +} + + +void MasterNode::Sync() +{ +#ifdef DEBUG + if(ClusterRendererModule::IsInClusterTestMode) + { + static int count = 0; + if(count++ == 400) + { + GetInputManager().QuitApplication(); + return; + } + } +#endif + + // this may early out + WaitForSlavesToConnect(); + + // the write buffer + dynamic_array<UInt8> buffer(kMemTempAlloc); + // Transfer + m_Synchronizer->TransferToBuffer(buffer); + // in the end, send out the buffer via zmq + m_Socket->Publish(buffer); + + // sync up, this should block + WaitForSlavesToAck(); +} + + +void MasterNode::WaitForSlavesToConnect() +{ + // don't try if we have all initialy connected + if(m_AllSlavesConnected) return; + // check and wait for each slave + for(int i = 0; i < m_InitialSlaveCount;) + { + // This will block + if(m_Socket->WaitForSubscriber()) + i++; + } + // only do it once + m_AllSlavesConnected = true; +} + + +void MasterNode::WaitForSlavesToAck() +{ + int checkedInCount = 0; + // while there are slaves unaccounted for + while((m_CurrentSlaveCount - checkedInCount) > 0) + { + // check for unsubscription + if(m_Socket->CheckForUnsubscribe()) + m_CurrentSlaveCount--; + + // check control signal + if(m_Socket->GetAck()) + checkedInCount++; + } +} + + +SlaveNode::SlaveNode(SlaveSocket* socket, ClusterTransfer* sync, int param) +: ClusterNode(false) +, m_Socket(socket) +, m_Synchronizer(sync) +, m_SlaveId(param) +{ + // should this be here? + m_Socket->Subscribe(m_SlaveId); +} + + +SlaveNode::~SlaveNode() +{ + // send the ubsubscribe signal + m_Socket->Unsubscribe(m_SlaveId); + // destroy stuff + delete m_Socket; + delete m_Synchronizer; +} + + +void SlaveNode::Sync() +{ +#ifdef DEBUG + if(ClusterRendererModule::IsInClusterTestMode) + { + static int count = 0; + if(count++ == 300) + { + m_Synchronizer->TransferToFile(m_SlaveId); + GetInputManager().QuitApplication(); + return; + } + } +#endif + + // TODO: fix potential buffer overflow + // the read buffer + dynamic_array<UInt8> buffer(4096, kMemTempAlloc); + // wait for the server to send the buffer over + m_Socket->Listen(buffer); + // send control signal + m_Socket->SendAck(); + // use the data + m_Synchronizer->TransferFromBuffer(buffer); +} + +#endif
\ No newline at end of file diff --git a/Runtime/ClusterRenderer/ClusterNode.h b/Runtime/ClusterRenderer/ClusterNode.h new file mode 100644 index 0000000..2545702 --- /dev/null +++ b/Runtime/ClusterRenderer/ClusterNode.h @@ -0,0 +1,64 @@ +#pragma once +#if ENABLE_CLUSTER_SYNC + +class MasterSocket; +class SlaveSocket; +class ClusterTransfer; + +class ClusterNode +{ +public: + virtual ~ClusterNode() {} + virtual void Sync() = 0; + bool IsMaster() { return m_Master; } + +protected: + ClusterNode(bool master) + : m_Master(master) {} + +private: + bool m_Master; +}; + + +class MasterNode : public ClusterNode +{ +public: + MasterNode(MasterSocket* socket, ClusterTransfer* sync, int param) + : ClusterNode(true) + , m_Socket(socket) + , m_Synchronizer(sync) + , m_AllSlavesConnected(false) + , m_InitialSlaveCount(param) + , m_CurrentSlaveCount(param) {} + ~MasterNode(); + + virtual void Sync(); + +private: + void WaitForSlavesToConnect(); + void WaitForSlavesToAck(); + + MasterSocket* m_Socket; + ClusterTransfer* m_Synchronizer; + bool m_AllSlavesConnected; + int m_InitialSlaveCount; + int m_CurrentSlaveCount; +}; + + +class SlaveNode : public ClusterNode +{ +public: + SlaveNode(SlaveSocket* socket, ClusterTransfer* sync, int param); + ~SlaveNode(); + + virtual void Sync(); + +private: + int m_SlaveId; + SlaveSocket* m_Socket; + ClusterTransfer* m_Synchronizer; +}; + +#endif
\ No newline at end of file diff --git a/Runtime/ClusterRenderer/ClusterRendererDefines.h b/Runtime/ClusterRenderer/ClusterRendererDefines.h new file mode 100644 index 0000000..9b6ccda --- /dev/null +++ b/Runtime/ClusterRenderer/ClusterRendererDefines.h @@ -0,0 +1,13 @@ +#pragma once + +#if ENABLE_CLUSTER_SYNC + #define DECLARE_CLUSTER_SERIALIZE(x) \ + template<class TransferFunc> void ClusterTransfer (TransferFunc& transfer); + #define IMPLEMENT_CLUSTER_SERIALIZE(x) \ + void x##UnusedClusterTemplateInitializer_() { \ + x *a = NULL; StreamedBinaryWrite<false> *w = NULL; StreamedBinaryRead<false> *r = NULL; \ + a->ClusterTransfer(*r); a->ClusterTransfer(*w); } +#else + #define DECLARE_CLUSTER_SERIALIZE(x) + #define IMPLEMENT_CLUSTER_SERIALIZE(x) +#endif diff --git a/Runtime/ClusterRenderer/ClusterRendererModule.cpp b/Runtime/ClusterRenderer/ClusterRendererModule.cpp new file mode 100644 index 0000000..86e349c --- /dev/null +++ b/Runtime/ClusterRenderer/ClusterRendererModule.cpp @@ -0,0 +1,109 @@ +#include "UnityPrefix.h" +#if ENABLE_CLUSTER_SYNC +#include "ClusterRendererModule.h" +#include "ClusterNode.h" +#include "ClusterNetwork.h" +#include "ClusterTransfer.h" +#include "Runtime/Interfaces/IClusterRenderer.h" +#include "Runtime/Utilities/Argv.h" + +using namespace std; + +bool ClusterRendererModule::IsInClusterTestMode; + +ClusterRendererModule::ClusterRendererModule() +: m_Node(NULL) +, m_Context(NULL) +{ + +} + +ClusterRendererModule::~ClusterRendererModule() +{ + if(m_Node != NULL) + delete m_Node; + + if(m_Context != NULL) + DestroyZMQContext(m_Context); +} + +void ClusterRendererModule::ProcessServerArgs(vector<string> args) +{ + m_Context = CreateZMQContext(); + ClusterTransfer* sync = new ClusterTransfer(); + MasterSocket* socket = new MasterSocket(m_Context, args[1].c_str(), args[2].c_str()); + m_Node = new MasterNode(socket, sync, atoi(args[0].c_str())); +} + +void ClusterRendererModule::ProcessClientArgs(vector<string> args) +{ + m_Context = CreateZMQContext(); + ClusterTransfer* sync = new ClusterTransfer(); + SlaveSocket* socket = new SlaveSocket(m_Context, args[1].c_str(), args[2].c_str()); + m_Node = new SlaveNode(socket, sync, atoi(args[0].c_str())); +} + + +// TODO: user memory manager NEW +void ClusterRendererModule::InitCluster() +{ + AssertIf(m_Node != NULL); + vector<string> values; + + if(HasARGV("server")) + { + ProcessServerArgs(GetValuesForARGV("server")); + } + else if(HasARGV("client")) + { + ProcessClientArgs(GetValuesForARGV("client")); + } + +#ifdef DEBUG + IsInClusterTestMode = HasARGV("it"); + // output some logs to indicate we are ready to go + // for IntegrationTest + LogString("ClusterReady"); +#endif +} + + +void ClusterRendererModule::SynchronizeCluster() +{ + // TODO check for NULL sockets + if(m_Node!= NULL) + m_Node->Sync(); +} + +bool ClusterRendererModule::IsMasterOfCluster() +{ + return (m_Node != NULL) ? m_Node->IsMaster() : true; +} + +void ClusterRendererModule::ShutdownCluster() +{ + if(m_Node != NULL) + { + delete m_Node; + m_Node = NULL; + } + + if(m_Context != NULL) + { + DestroyZMQContext(m_Context); + } +} + +void InitializeClusterRendererModule () +{ + SetIClusterRenderer(UNITY_NEW_AS_ROOT(ClusterRendererModule, kMemClusterRenderer, "ClusterRendererInterface", "")); +} + +void CleanupClusterRendererModule () +{ + ClusterRendererModule* module = reinterpret_cast<ClusterRendererModule*> (GetIClusterRenderer ()); + UNITY_DELETE(module, kMemClusterRenderer); + SetIClusterRenderer (NULL); +} + +#endif
\ No newline at end of file diff --git a/Runtime/ClusterRenderer/ClusterRendererModule.h b/Runtime/ClusterRenderer/ClusterRendererModule.h new file mode 100644 index 0000000..d07aeb7 --- /dev/null +++ b/Runtime/ClusterRenderer/ClusterRendererModule.h @@ -0,0 +1,32 @@ +#pragma once +#if ENABLE_CLUSTER_SYNC +#include "Runtime/Interfaces/IClusterRenderer.h" + +class ClusterNode; + +using namespace std; + +class ClusterRendererModule : public IClusterRenderer +{ +public: + ClusterRendererModule(); + virtual ~ClusterRendererModule(); + virtual void InitCluster(); + virtual void SynchronizeCluster(); + virtual bool IsMasterOfCluster(); + virtual void ShutdownCluster(); +private: + void ProcessServerArgs(vector<string> args); + void ProcessClientArgs(vector<string> args); + ClusterNode* m_Node; + void* m_Context; +#ifdef DEBUG +public: + static bool IsInClusterTestMode; +#endif + +}; + +void InitializeClusterRendererModule (); +void CleanupClusterRendererModule (); +#endif
\ No newline at end of file diff --git a/Runtime/ClusterRenderer/ClusterRendererModule.jam b/Runtime/ClusterRenderer/ClusterRendererModule.jam new file mode 100644 index 0000000..1baf137 --- /dev/null +++ b/Runtime/ClusterRenderer/ClusterRendererModule.jam @@ -0,0 +1,177 @@ +rule ClusterRendererModule_ReportCpp +{ + return + Runtime/ClusterRenderer/ClusterRendererDefines.h + Runtime/ClusterRenderer/ClusterNetwork.cpp + Runtime/ClusterRenderer/ClusterNetwork.h + Runtime/ClusterRenderer/ClusterTransfer.cpp + Runtime/ClusterRenderer/ClusterTransfer.h + Runtime/ClusterRenderer/ClusterNode.cpp + Runtime/ClusterRenderer/ClusterNode.h + Runtime/ClusterRenderer/ClusterRendererModule.cpp + Runtime/ClusterRenderer/ClusterRendererModule.h + Runtime/ClusterRenderer/ClusterRendererModuleRegistration.cpp + + External/zmq/src/address.cpp + External/zmq/src/address.hpp + External/zmq/src/array.hpp + External/zmq/src/atomic_counter.hpp + External/zmq/src/atomic_ptr.hpp + External/zmq/src/blob.hpp + External/zmq/src/clock.cpp + External/zmq/src/clock.hpp + External/zmq/src/command.hpp + External/zmq/src/config.hpp + External/zmq/src/ctx.cpp + External/zmq/src/ctx.hpp + External/zmq/src/dealer.cpp + External/zmq/src/dealer.hpp + External/zmq/src/decoder.cpp + External/zmq/src/decoder.hpp + External/zmq/src/devpoll.cpp + External/zmq/src/devpoll.hpp + External/zmq/src/dist.cpp + External/zmq/src/dist.hpp + External/zmq/src/encoder.cpp + External/zmq/src/encoder.hpp + External/zmq/src/epoll.cpp + External/zmq/src/epoll.hpp + External/zmq/src/err.cpp + External/zmq/src/err.hpp + External/zmq/src/fd.hpp + External/zmq/src/fq.cpp + External/zmq/src/fq.hpp + External/zmq/src/i_decoder.hpp + External/zmq/src/i_encoder.hpp + External/zmq/src/i_engine.hpp + External/zmq/src/i_msg_sink.hpp + External/zmq/src/i_msg_source.hpp + External/zmq/src/i_poll_events.hpp + External/zmq/src/io_object.cpp + External/zmq/src/io_object.hpp + External/zmq/src/io_thread.cpp + External/zmq/src/io_thread.hpp + External/zmq/src/ip.cpp + External/zmq/src/ip.hpp + External/zmq/src/ipc_address.cpp + External/zmq/src/ipc_address.hpp + External/zmq/src/ipc_connecter.cpp + External/zmq/src/ipc_connecter.hpp + External/zmq/src/ipc_listener.cpp + External/zmq/src/ipc_listener.hpp + External/zmq/src/kqueue.cpp + External/zmq/src/kqueue.hpp + External/zmq/src/lb.cpp + External/zmq/src/lb.hpp + External/zmq/src/libzmq.pc.in + External/zmq/src/likely.hpp + External/zmq/src/mailbox.cpp + External/zmq/src/mailbox.hpp + External/zmq/src/msg.cpp + External/zmq/src/msg.hpp + External/zmq/src/mtrie.cpp + External/zmq/src/mtrie.hpp + External/zmq/src/mutex.hpp + External/zmq/src/object.cpp + External/zmq/src/object.hpp + External/zmq/src/options.cpp + External/zmq/src/options.hpp + External/zmq/src/own.cpp + External/zmq/src/own.hpp + External/zmq/src/pair.cpp + External/zmq/src/pair.hpp + External/zmq/src/pgm_receiver.cpp + External/zmq/src/pgm_receiver.hpp + External/zmq/src/pgm_sender.cpp + External/zmq/src/pgm_sender.hpp + External/zmq/src/pgm_socket.cpp + External/zmq/src/pgm_socket.hpp + External/zmq/src/pipe.cpp + External/zmq/src/pipe.hpp + External/zmq/src/platform.hpp.in + External/zmq/src/poll.cpp + External/zmq/src/poll.hpp + External/zmq/src/poller.hpp + External/zmq/src/poller_base.cpp + External/zmq/src/poller_base.hpp + External/zmq/src/precompiled.cpp + External/zmq/src/precompiled.hpp + External/zmq/src/proxy.cpp + External/zmq/src/proxy.hpp + External/zmq/src/pub.cpp + External/zmq/src/pub.hpp + External/zmq/src/pull.cpp + External/zmq/src/pull.hpp + External/zmq/src/push.cpp + External/zmq/src/push.hpp + External/zmq/src/random.cpp + External/zmq/src/random.hpp + External/zmq/src/reaper.cpp + External/zmq/src/reaper.hpp + External/zmq/src/rep.cpp + External/zmq/src/rep.hpp + External/zmq/src/req.cpp + External/zmq/src/req.hpp + External/zmq/src/router.cpp + External/zmq/src/router.hpp + External/zmq/src/select.cpp + External/zmq/src/select.hpp + External/zmq/src/session_base.cpp + External/zmq/src/session_base.hpp + External/zmq/src/signaler.cpp + External/zmq/src/signaler.hpp + External/zmq/src/socket_base.cpp + External/zmq/src/socket_base.hpp + External/zmq/src/stdint.hpp + External/zmq/src/stream_engine.cpp + External/zmq/src/stream_engine.hpp + External/zmq/src/sub.cpp + External/zmq/src/sub.hpp + External/zmq/src/tcp.cpp + External/zmq/src/tcp.hpp + External/zmq/src/tcp_address.cpp + External/zmq/src/tcp_address.hpp + External/zmq/src/tcp_connecter.cpp + External/zmq/src/tcp_connecter.hpp + External/zmq/src/tcp_listener.cpp + External/zmq/src/tcp_listener.hpp + External/zmq/src/thread.cpp + External/zmq/src/thread.hpp + External/zmq/src/trie.cpp + External/zmq/src/trie.hpp + External/zmq/src/v1_decoder.cpp + External/zmq/src/v1_decoder.hpp + External/zmq/src/v1_encoder.cpp + External/zmq/src/v1_encoder.hpp + External/zmq/src/v1_protocol.hpp + External/zmq/src/windows.hpp + External/zmq/src/wire.hpp + External/zmq/src/xpub.cpp + External/zmq/src/xpub.hpp + External/zmq/src/xsub.cpp + External/zmq/src/xsub.hpp + External/zmq/src/ypipe.hpp + External/zmq/src/yqueue.hpp + External/zmq/src/zmq.cpp + External/zmq/src/zmq_utils.cpp + ; +} + +rule ClusterRendererModule_ReportIncludes +{ + return + External/zmq/include + ; +} + +rule ClusterRendererModule_Init +{ + # enable this feature. This is only required now since modularization and licensing are not ON by default. + C.Defines : ENABLE_CLUSTER_SYNC=1 ; + + # register other override + OverrideModule ClusterRenderer : GetModule_Cpp : byOverridingWithMethod : ClusterRendererModule_ReportCpp ; + OverrideModule ClusterRenderer : GetModule_Inc : byOverridingWithMethod : ClusterRendererModule_ReportIncludes ; +} + +#RegisterModule ClusterRenderer ;
\ No newline at end of file diff --git a/Runtime/ClusterRenderer/ClusterRendererModuleRegistration.cpp b/Runtime/ClusterRenderer/ClusterRendererModuleRegistration.cpp new file mode 100644 index 0000000..73be20d --- /dev/null +++ b/Runtime/ClusterRenderer/ClusterRendererModuleRegistration.cpp @@ -0,0 +1,10 @@ +#include "UnityPrefix.h" +#if ENABLE_CLUSTER_SYNC +#include "ClusterRendererModule.h" + +extern "C" EXPORT_MODULE void RegisterModule_ClusterRenderer () +{ + InitializeClusterRendererModule (); +} + +#endif
\ No newline at end of file diff --git a/Runtime/ClusterRenderer/ClusterTransfer.cpp b/Runtime/ClusterRenderer/ClusterTransfer.cpp new file mode 100644 index 0000000..40b6633 --- /dev/null +++ b/Runtime/ClusterRenderer/ClusterTransfer.cpp @@ -0,0 +1,93 @@ +#include "UnityPrefix.h" +#if ENABLE_CLUSTER_SYNC +#include "ClusterTransfer.h" +#include "Runtime/Serialize/FileCache.h" +#include "Runtime/Serialize/CacheWrap.h" +#include "Runtime/Serialize/TransferFunctions/StreamedBinaryWrite.h" +#include "Runtime/Serialize/TransferFunctions/StreamedBinaryRead.h" +#include "Runtime/Input/InputManager.h" +#include "Runtime/Input/TimeManager.h" +#include "Runtime/Dynamics/PhysicsManager.h" + +#ifdef DEBUG +#include "Runtime/Dynamics/Rigidbody.h" +#include "Runtime/Graphics/Transform.h" +#endif + +template<class TransferFunc> +void TransferManagerStates (TransferFunc& transfer) +{ + // TODO: use macro incase manager is not present + GetInputManager().ClusterTransfer(transfer); + GetTimeManager().ClusterTransfer(transfer); + GetPhysicsManager().ClusterTransfer(transfer); +} + +void ClusterTransfer::TransferToBuffer(dynamic_array<UInt8>& buffer) +{ + StreamedBinaryWrite<false> writeStream; + CachedWriter& writeCache = writeStream.Init (0, BuildTargetSelection::NoTarget()); + MemoryCacheWriter memoryCache (buffer); + writeCache.InitWrite (memoryCache); + // transfer stuff + TransferManagerStates(writeStream); + // end of transfer + writeCache.CompleteWriting(); +} + +void ClusterTransfer::TransferFromBuffer(dynamic_array<UInt8>& buffer) +{ + // transfer setup + StreamedBinaryRead<false> readStream; + CachedReader& readCache = readStream.Init (0); + MemoryCacheReader memoryCache (buffer); + readCache.InitRead (memoryCache, 0, buffer.size()); + // transfer stuff + TransferManagerStates(readStream); + // end of transfer + readCache.End(); +} + +#ifdef DEBUG +void ClusterTransfer::TransferToFile(int slaveId) +{ + // create the buffer + dynamic_array<UInt8> buffer(kMemTempAlloc); + + // create the stream + StreamedBinaryWrite<false> writeStream; + CachedWriter& writeCache = writeStream.Init (0, BuildTargetSelection::NoTarget()); + MemoryCacheWriter memoryCache (buffer); + writeCache.InitWrite (memoryCache); + + // now write the data out to a file + for (int level=0;level<PhysicsManager::kMaxSortedActorsDepth;level++) + { + PhysicsManager::RigidbodyList& bodies = GetPhysicsManager().m_SortedActors[level]; + for (PhysicsManager::RigidbodyList::iterator i=bodies.begin();i != bodies.end();i++) + { + Rigidbody& body = **i; + + if (body.m_DisableReadUpdateTransform == 0) + { + GameObject& go = body.GetGameObject(); + Transform& transform = go.GetComponent (Transform); + transform.Transfer(writeStream); + } + } + } + + // end of transfer + writeCache.CompleteWriting(); + + + // now we have the buffer, we put it into a file, and compare that with other slaves. + char fileName[32]; + sprintf(fileName, "dump%d.dat\0", slaveId); + FILE* dump = fopen(fileName, "wb"); + fwrite(buffer.data(), sizeof(UInt8), buffer.size(), dump); + fclose(dump); +} +#endif // DEBUG + +#endif
\ No newline at end of file diff --git a/Runtime/ClusterRenderer/ClusterTransfer.h b/Runtime/ClusterRenderer/ClusterTransfer.h new file mode 100644 index 0000000..3b87fbd --- /dev/null +++ b/Runtime/ClusterRenderer/ClusterTransfer.h @@ -0,0 +1,16 @@ +#pragma once +#if ENABLE_CLUSTER_SYNC +#include "Runtime/Utilities/dynamic_array.h" + +class ClusterTransfer +{ +public: + void TransferToBuffer(dynamic_array<UInt8>& buffer); + void TransferFromBuffer(dynamic_array<UInt8>& buffer); +#ifdef DEBUG + void TransferToFile(int slaveId); +#endif +}; + + +#endif
\ No newline at end of file |