Commit 78307f09 by xiaotong

updates with bugs

parent 726f5e21
...@@ -482,6 +482,91 @@ void XLeader::RunUpdate(XConfig * config, XOptimizer * optimizer, const int * ac ...@@ -482,6 +482,91 @@ void XLeader::RunUpdate(XConfig * config, XOptimizer * optimizer, const int * ac
} }
if(1){ if(1){
int finished = 0;
for (int j = 0; j < serverModel.paramNum; j++)
serverModel.params[j].flag = PARAM_STATE_NOT_READY;
/* check */
for (int i = 0; i < membersAll.count; i++) {
XModel * source = (XModel*)membersAll.GetItem(i);
CheckNTErrors(source->paramNum == serverModel.paramNum, "Incompatiable models!");
}
for (int i = 0; i < members.count; i++) {
XModel * source = (XModel*)members.GetItem(i);
CheckNTErrors(source->paramNum == serverModel.paramNum, "Incompatiable models!");
}
CheckNTErrors(jobQueues.count == serverModel.paramNum, "Incompatiable model!");
/* counts how many member models have been collected for each parameter */
int * finishedCount = new int[serverModel.paramNum];
memset(finishedCount, 0, sizeof(int) * serverModel.paramNum);
/* This is a simple implementation of the wait-and-collect process. But
there is a risk that some models are not available, that is, the
loop would never stop. A solution might be that we force the loop
to break after waiting for a short time. */
while (1) {
for (int j = 0; j < serverModel.paramNum; j++) {
XParamKeeper &paramServer = serverModel.params[j];
/* paramServer.param->isGradFinished is true only if the model finishes the computation
(in another process) */
if (paramServer.flag != PARAM_STATE_NOT_READY || !paramServer.param->isGradFinished)
continue;
/* check if all the models (or part of them) are ready */
for (int i = 0; i < members.count; i++) {
XModel * source = (XModel*)members.GetItem(i);
XParamKeeper &paramSource = source->params[j];
/* paramSource.param->isGradFinished is true only if the model finishes the computation
(in another process) */
if (paramSource.flag == PARAM_STATE_NOT_READY && paramSource.param->isGradFinished) {
/* data transmit */
CollectP2P(paramSource.param->grad, paramServer.param->grad);
/* reset the flag */
paramSource.flag = PARAM_STATE_COLLECTED;
finished++;
finishedCount[j]++;
/* we call model update (in another thread) and then
broadcast the new parameters to member models
(in another thread) */
if (finishedCount[j] == memberActive->count) {
paramServer.flag = PARAM_STATE_COLLECTED;
if (updater != NULL) {
XQueue* jobQueue = (XQueue*)jobQueues->GetItem(j);
/* update the parameters */
updater->AddJobUpdate(jobQueue, server, j, optimizer);
updater->AddJobEnqueueFinished(jobQueue);
/* broadcast the new parameter to other models*/
broadcaster->AddJobBroadcastSingle(jobQueue, server, memberAll, j);
broadcaster->AddJobEnqueueFinished(jobQueue);
}
}
else if (finishedCount[j] > memberActive->count) {
ShowNTErrors("Something is wrong with finishedCount!");
}
}
}
}
/* the collection finishes if all data tensors are processed */
if (finished == server->paramNum * memberActive->count)
break;
XSleep(sleepTime);
}
delete[] finishedCount;
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论