Commit 53c5f571 by xiaotong

bug fixes

parent d39baab3
...@@ -58,6 +58,9 @@ void XWorkerCollect::CollectData(XList * sourceList, XModel * target, long sleep ...@@ -58,6 +58,9 @@ void XWorkerCollect::CollectData(XList * sourceList, XModel * target, long sleep
{ {
TensorList & tp = target->params; TensorList & tp = target->params;
int finished = 0; int finished = 0;
for (int j = 0; j < tp.count; j++)
target->flags[j] = PARAM_STATE_NOT_READY;
/* check */ /* check */
for (int i = 0; i < sourceList->count; i++) { for (int i = 0; i < sourceList->count; i++) {
...@@ -73,9 +76,9 @@ void XWorkerCollect::CollectData(XList * sourceList, XModel * target, long sleep ...@@ -73,9 +76,9 @@ void XWorkerCollect::CollectData(XList * sourceList, XModel * target, long sleep
if (collectMode == DATA_COLLECT_P2P) { if (collectMode == DATA_COLLECT_P2P) {
for (int j = 0; j < tp.count; j++) { for (int j = 0; j < tp.count; j++) {
/* target->flags[j] is ready only if model finishes the computation /* tp[j]->isGradFinished is true only if the model finishes the computation
(in another process) */ (in another process) */
if (target->flags[j] != PARAM_STATE_READY) if (target->flags[j] == PARAM_STATE_COLLECTED || !tp[j]->isGradFinished)
continue; continue;
/* check if all the models (or part of them) are ready */ /* check if all the models (or part of them) are ready */
...@@ -83,9 +86,9 @@ void XWorkerCollect::CollectData(XList * sourceList, XModel * target, long sleep ...@@ -83,9 +86,9 @@ void XWorkerCollect::CollectData(XList * sourceList, XModel * target, long sleep
XModel * source = (XModel*)sourceList->GetItem(i); XModel * source = (XModel*)sourceList->GetItem(i);
TensorList & sp = source->params; TensorList & sp = source->params;
/* source->flags[j] is ready only if model finishes the computation /* sp[j]->isGradFinished is true only if the model finishes the computation
(in another process) */ (in another process) */
if (source->flags[j] == PARAM_STATE_READY) { if (source->flags[j] != PARAM_STATE_COLLECTED && sp[j]->isGradFinished) {
/* data transmit */ /* data transmit */
CollectP2P(sp.GetItem(j), tp.GetItem(j)); CollectP2P(sp.GetItem(j), tp.GetItem(j));
...@@ -103,9 +106,9 @@ void XWorkerCollect::CollectData(XList * sourceList, XModel * target, long sleep ...@@ -103,9 +106,9 @@ void XWorkerCollect::CollectData(XList * sourceList, XModel * target, long sleep
bool ready = true; bool ready = true;
/* target->flags[j] is ready only if model finishes the computation /* tp[j]->isGradFinished is true only if the model finishes the computation
(in another process) */ (in another process) */
if (target->flags[j] != PARAM_STATE_READY) if (target->flags[j] == PARAM_STATE_COLLECTED || !tp[j]->isGradFinished)
continue; continue;
/* check if all the models (or part of them) are ready */ /* check if all the models (or part of them) are ready */
...@@ -113,12 +116,16 @@ void XWorkerCollect::CollectData(XList * sourceList, XModel * target, long sleep ...@@ -113,12 +116,16 @@ void XWorkerCollect::CollectData(XList * sourceList, XModel * target, long sleep
XModel * source = (XModel*)sourceList->GetItem(i); XModel * source = (XModel*)sourceList->GetItem(i);
TensorList & sp = source->params; TensorList & sp = source->params;
/* source->flags[j] is ready only if model finishes the computation /* sp[j]->isGradFinished is true only if the model finishes the computation
(in another process) */ (in another process) */
if (source->flags[j] != PARAM_STATE_READY) { if (source->flags[j] == PARAM_STATE_COLLECTED || !sp[j]->isGradFinished) {
ready = false; ready = false;
break; break;
} }
else if(source->flags[j] == PARAM_STATE_NOT_READY){
source->flags[j] = PARAM_STATE_READY;
}
} }
if (ready) { if (ready) {
...@@ -152,14 +159,13 @@ void XWorkerCollect::CollectData(XList * sourceList, XModel * target, long sleep ...@@ -152,14 +159,13 @@ void XWorkerCollect::CollectData(XList * sourceList, XModel * target, long sleep
break; break;
/* reset the flags */ /* reset the flags */
for (int i = 0; i < tp.count; i++) { for (int j = 0; j < tp.count; j++)
target->flags[i] = PARAM_STATE_COLLECTED; target->flags[j] = PARAM_STATE_COLLECTED;
}
#ifdef _WIN32 #ifdef _WIN32
Sleep((DWORD)sleepTime); Sleep((DWORD)sleepTime);
#else #else
sleep(sleepTime / 1000); sleep((unsigned)sleepTime / 1000);
#endif #endif
} }
} }
...@@ -230,4 +236,4 @@ void XWorkerCollect::CollectAllReduce(XList * all) ...@@ -230,4 +236,4 @@ void XWorkerCollect::CollectAllReduce(XList * all)
ShowNTErrors("TODO!"); ShowNTErrors("TODO!");
} }
} }
\ No newline at end of file
...@@ -89,7 +89,7 @@ bool XWorkerJob::AddJobRefresh(XModel * myModel) ...@@ -89,7 +89,7 @@ bool XWorkerJob::AddJobRefresh(XModel * myModel)
XList args(1); XList args(1);
args.Add(myModel); args.Add(myModel);
queue.EnqueueJob(XModel::Refresh, &args); queue.EnqueueJob((void*)(char*)XModel::Refresh, &args);
return true; return true;
} }
...@@ -112,7 +112,7 @@ bool XWorkerJob::AddJobNeuralNet(XModel * myModel, XList * inputs, XList * outpu ...@@ -112,7 +112,7 @@ bool XWorkerJob::AddJobNeuralNet(XModel * myModel, XList * inputs, XList * outpu
args.AddList(inputs); args.AddList(inputs);
args.AddList(outputs); args.AddList(outputs);
queue.EnqueueJob(XModel::Run, &args); queue.EnqueueJob((void*)(char*)XModel::Run, &args);
return true; return true;
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论