summaryrefslogtreecommitdiff
path: root/Runtime/ClusterRenderer/ClusterNode.cpp
diff options
context:
space:
mode:
authorchai <chaifix@163.com>2019-08-14 22:50:43 +0800
committerchai <chaifix@163.com>2019-08-14 22:50:43 +0800
commit15740faf9fe9fe4be08965098bbf2947e096aeeb (patch)
treea730ec236656cc8cab5b13f088adfaed6bb218fb /Runtime/ClusterRenderer/ClusterNode.cpp
+Unity Runtime codeHEADmaster
Diffstat (limited to 'Runtime/ClusterRenderer/ClusterNode.cpp')
-rw-r--r--Runtime/ClusterRenderer/ClusterNode.cpp128
1 files changed, 128 insertions, 0 deletions
diff --git a/Runtime/ClusterRenderer/ClusterNode.cpp b/Runtime/ClusterRenderer/ClusterNode.cpp
new file mode 100644
index 0000000..c17dd50
--- /dev/null
+++ b/Runtime/ClusterRenderer/ClusterNode.cpp
@@ -0,0 +1,128 @@
+#include "UnityPrefix.h"
+#if ENABLE_CLUSTER_SYNC
+#include "ClusterNode.h"
+#include "ClusterNetwork.h"
+#include "ClusterTransfer.h"
+
+#ifdef DEBUG
+#include "Runtime/Input/InputManager.h"
+#include "ClusterRendererModule.h"
+#endif
+
+MasterNode::~MasterNode()
+{
+ delete m_Socket;
+ delete m_Synchronizer;
+}
+
+
+void MasterNode::Sync()
+{
+#ifdef DEBUG
+ if(ClusterRendererModule::IsInClusterTestMode)
+ {
+ static int count = 0;
+ if(count++ == 400)
+ {
+ GetInputManager().QuitApplication();
+ return;
+ }
+ }
+#endif
+
+ // this may early out
+ WaitForSlavesToConnect();
+
+ // the write buffer
+ dynamic_array<UInt8> buffer(kMemTempAlloc);
+ // Transfer
+ m_Synchronizer->TransferToBuffer(buffer);
+ // in the end, send out the buffer via zmq
+ m_Socket->Publish(buffer);
+
+ // sync up, this should block
+ WaitForSlavesToAck();
+}
+
+
+void MasterNode::WaitForSlavesToConnect()
+{
+ // don't try if we have all initialy connected
+ if(m_AllSlavesConnected) return;
+ // check and wait for each slave
+ for(int i = 0; i < m_InitialSlaveCount;)
+ {
+ // This will block
+ if(m_Socket->WaitForSubscriber())
+ i++;
+ }
+ // only do it once
+ m_AllSlavesConnected = true;
+}
+
+
+void MasterNode::WaitForSlavesToAck()
+{
+ int checkedInCount = 0;
+ // while there are slaves unaccounted for
+ while((m_CurrentSlaveCount - checkedInCount) > 0)
+ {
+ // check for unsubscription
+ if(m_Socket->CheckForUnsubscribe())
+ m_CurrentSlaveCount--;
+
+ // check control signal
+ if(m_Socket->GetAck())
+ checkedInCount++;
+ }
+}
+
+
+SlaveNode::SlaveNode(SlaveSocket* socket, ClusterTransfer* sync, int param)
+: ClusterNode(false)
+, m_Socket(socket)
+, m_Synchronizer(sync)
+, m_SlaveId(param)
+{
+ // should this be here?
+ m_Socket->Subscribe(m_SlaveId);
+}
+
+
+SlaveNode::~SlaveNode()
+{
+ // send the ubsubscribe signal
+ m_Socket->Unsubscribe(m_SlaveId);
+ // destroy stuff
+ delete m_Socket;
+ delete m_Synchronizer;
+}
+
+
+void SlaveNode::Sync()
+{
+#ifdef DEBUG
+ if(ClusterRendererModule::IsInClusterTestMode)
+ {
+ static int count = 0;
+ if(count++ == 300)
+ {
+ m_Synchronizer->TransferToFile(m_SlaveId);
+ GetInputManager().QuitApplication();
+ return;
+ }
+ }
+#endif
+
+ // TODO: fix potential buffer overflow
+ // the read buffer
+ dynamic_array<UInt8> buffer(4096, kMemTempAlloc);
+ // wait for the server to send the buffer over
+ m_Socket->Listen(buffer);
+ // send control signal
+ m_Socket->SendAck();
+ // use the data
+ m_Synchronizer->TransferFromBuffer(buffer);
+}
+
+#endif \ No newline at end of file