Commit 6c4d849b by xiaotong

updates of XWorkerUpdate

parent 4c7d8dde
......@@ -268,7 +268,7 @@ void XLeaderAllReduce::RunUpdate(XConfig* config, XOptimizer* optimizer, const i
XTensorKeeper& paramServer = serverModel.params[j];
/* isGradFinished is true only if the model finishes the computation
(in another thread) */
if (paramServer.flag != PARAM_STATE_NOT_READY || !paramServer.tensor->isGradFinished)
continue;
......@@ -289,13 +289,6 @@ void XLeaderAllReduce::RunUpdate(XConfig* config, XOptimizer* optimizer, const i
(in another thread) */
if (paramWorker.flag == PARAM_STATE_NOT_READY && paramWorker.tensor->isGradFinished) {
/* get the gradient */
//paramWorker.grad = paramWorker.tensor->grad;
/* data transmit */
//collecter->AddJobCollectDataP2P(NULL, paramWorker.grad, paramServer.grad);
//collecter->AddJobEnqueueFinished();
/* We keep the worker parameter in a list. It will be used when we
broadcast the updated parameter to the workers, that is, this is a
list of worker parameters. */
......@@ -316,19 +309,23 @@ void XLeaderAllReduce::RunUpdate(XConfig* config, XOptimizer* optimizer, const i
for(int k = 0; k < modelNum; k++)
grads.Add(&paramMap[k][i]);
/* here we use the queue of the collecter as the job queue. First,
we put an all-reduce call in the queue. Then, we put the update call
in the queue. The update call forks a number of jobs, each of which
updates one model. These jobs run simultaneously, each in its
own queue. */
XQueue * queue = collecter->GetJobQueue();
/* run the all-reduce procedure to collect the gradient and share
the gradient sum across models */
collecter->AddJobCollectGradAllReduce(NULL, &grads);
collecter->AddJobCollectGradAllReduce(queue, &grads);
collecter->AddJobEnqueueFinished(queue);
/* update on every model. NOTE THAT we do not worry about the
inconsistency issue of updated parameters across models because
the all-reduce method guarantees that all the models share
the same copy of the gradient. */
for (int k = 0; k < modelNum; k++) {
XWorkerUpdate * updater = (XWorkerUpdate*)uworkers[k];
updater->AddJobUpdate(NULL, &paramServer, optimizer);
updater->AddJobEnqueueFinished();
}
updaterPrime->AddJobUpdateBatch(queue, &uworkers, &paramList[i], &optimizers);
}
else if (finishedCount[j] > activeModelCount) {
ShowNTErrors("Something is wrong with finishedCount!");
......@@ -344,6 +341,11 @@ void XLeaderAllReduce::RunUpdate(XConfig* config, XOptimizer* optimizer, const i
XSleep(SLEEP_TIME_IN_WAITING_JOB_WORKERS);
}
if (activeModelCount < modelNum) {
/* TODO: broadcast the latest parameters to the models that
are not involved in the update. */
}
delete[] finishedCount;
delete[] modelFlag;
delete[] paramList;
......
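Aside: the scheduling pattern above (one primary queue that serializes the all-reduce job and the update job, with the update job fanning per-model work out to per-worker queues) can be illustrated with a small standalone C++ sketch. Everything below (JobQueue, Enqueue, the lambdas) is hypothetical illustration code, not the NiuTensor API:

// A toy, standalone analogue of the scheduling pattern (hypothetical names,
// not the NiuTensor API). A primary queue runs jobs one by one; the second
// job fans per-model work out to each worker's own queue.
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class JobQueue {
public:
    JobQueue() : worker(&JobQueue::Loop, this) {}
    ~JobQueue() {
        Enqueue(nullptr);            // an empty job tells the loop to stop
        worker.join();
    }
    void Enqueue(std::function<void()> job) {
        { std::lock_guard<std::mutex> lock(m); jobs.push(std::move(job)); }
        cv.notify_one();
    }
private:
    void Loop() {                    // runs in this queue's own thread
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(m);
                cv.wait(lock, [this] { return !jobs.empty(); });
                job = std::move(jobs.front());
                jobs.pop();
            }
            if (!job) return;        // stop signal
            job();
        }
    }
    std::mutex m;
    std::condition_variable cv;
    std::queue<std::function<void()>> jobs;
    std::thread worker;
};

int main() {
    const int modelNum = 4;
    std::vector<JobQueue> workers(modelNum);    // one queue per model
    JobQueue primary;                           // plays the collecter's queue

    /* job 1: all-reduce, so every model ends up with the same gradient sum */
    primary.Enqueue([] { std::puts("all-reduce gradients"); });

    /* job 2: fork one update job per model into each worker's own queue */
    primary.Enqueue([&] {
        for (int k = 0; k < modelNum; k++)
            workers[k].Enqueue([k] { std::printf("update model %d\n", k); });
    });
    return 0;   // destructors drain and join all queues
}

Because the fan-out job sits behind the all-reduce job in the same FIFO queue, every per-model update starts from the same fully reduced gradient; this is the property the NOTE in the comment above relies on.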
......@@ -122,4 +122,83 @@ bool XWorkerUpdate::AddJobUpdate(XQueue * jobQueue,
return true;
}
/*
update a number of parameters simultaneously
>> updaters - a batch of updaters
>> paramKeepers - a batch of parameter keepers
>> optimizers - a batch of optimizers
*/
void XWorkerUpdate::UpdateParameterBatch(XList * updaters,
XList * paramKeepers,
XList * optimizers)
{
CheckNTErrors(updaters != NULL, "No updaters!");
CheckNTErrors(paramKeepers != NULL, "No parameter keepers!");
CheckNTErrors(optimizers != NULL, "No optimizers!");
CheckNTErrors(updaters->count == paramKeepers->count,
"Updaters and parameter keepers are not of the same number!");
CheckNTErrors(updaters->count == optimizers->count,
"Updaters and optimizers are not of the same number!");
for (int i = 0; i < updaters->count; i++) {
XWorkerUpdate * updater = (XWorkerUpdate*)updaters->GetItem(i);
XTensorKeeper * param = (XTensorKeeper*)paramKeepers->GetItem(i);
XOptimizer * optimizer = (XOptimizer*)optimizers->GetItem(i);
XQueue * queue = updater->GetJobQueue();
/* we update the parameter in each individual queue */
updater->AddJobUpdate(queue, param, optimizer);
updater->AddJobEnqueueFinished(queue);
}
}
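A hedged usage sketch for UpdateParameterBatch: the three lists must be parallel, so that element k of each list belongs to model k. The variable names below (modelNum, uworkers, params, optList, primaryUpdater) are hypothetical stand-ins for whatever state the caller holds:

XList updaters;      /* one XWorkerUpdate* per model */
XList paramKeepers;  /* the matching XTensorKeeper* for each model */
XList optimizers;    /* the matching XOptimizer* for each model */
for (int k = 0; k < modelNum; k++) {
    updaters.Add(uworkers[k]);
    paramKeepers.Add(&params[k]);
    optimizers.Add(optList[k]);
}
/* each copy of the parameter is then updated in its own worker queue */
primaryUpdater->UpdateParameterBatch(&updaters, &paramKeepers, &optimizers);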
/* wrapper of UpdateParameterBatch */
void XWorkerUpdate::UpdateBatch(XList * args)
{
int paramCount = 0;
CheckNTErrors(args != NULL && args->count == 4, "Illegal argument list!");
XWorkerUpdate * primitiveUpdater = (XWorkerUpdate*)args->GetItem(paramCount++);
XList * updaters = (XList*)args->GetItem(paramCount++);
XList * paramKeepers = (XList*)args->GetItem(paramCount++);
XList * optimizers = (XList*)args->GetItem(paramCount++);
CheckNTErrors(primitiveUpdater != NULL, "No updater!");
primitiveUpdater->UpdateParameterBatch(updaters, paramKeepers, optimizers);
}
/*
add a new job of parameter update (for a batch)
>> jobQueue - the queue for running the primitive job
>> updaters - a batch of updaters
>> paramKeepers - a batch of parameter keepers
>> optimizers - a batch of optimizers
*/
bool XWorkerUpdate::AddJobUpdateBatch(XQueue * jobQueue,
XList * updaters,
XList * paramKeepers,
XList * optimizers)
{
CheckNTErrors(updaters != NULL, "No updaters!");
CheckNTErrors(paramKeepers != NULL, "No parameter keepers!");
CheckNTErrors(optimizers != NULL, "No optimizers!");
XList args;
args.Add(this);
args.Add(updaters);
args.Add(paramKeepers);
args.Add(optimizers);
XQueue& queueRun = jobQueue != NULL ? *jobQueue : queue;
if (isInstantRun)
XWorkerUpdate::UpdateBatch(&args);
else
queueRun.EnqueueJob((void*)(char*)XWorkerUpdate::UpdateBatch, &args);
return true;
}
}
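For reference, the call site added in XLeaderAllReduce::RunUpdate (first file above) shows how AddJobUpdateBatch is meant to compose with the collecter's queue; condensed, with identifiers as they appear in that diff:

XQueue * queue = collecter->GetJobQueue();
/* job 1: all-reduce the gradients so all models share the same sum */
collecter->AddJobCollectGradAllReduce(queue, &grads);
collecter->AddJobEnqueueFinished(queue);
/* job 2: fork one update job per model into each updater's own queue */
updaterPrime->AddJobUpdateBatch(queue, &uworkers, &paramList[i], &optimizers);

Running both jobs through one queue means the update can never see a partially reduced gradient, while the per-updater queues inside UpdateParameterBatch let the individual model updates proceed in parallel.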
......@@ -63,8 +63,18 @@ public:
static
void Update(XList * args);
/* add a new job of model update (for a parameter) */
/* add a new job of parameter update */
bool AddJobUpdate(XQueue * jobQueue, XTensorKeeper * paramKeeper, XOptimizer * optimizer);
/* update a number of parameters simultaneously */
void UpdateParameterBatch(XList * updaters, XList * paramKeepers, XList * optimizers);
/* wrapper of UpdateParameterBatch */
static
void UpdateBatch(XList * args);
/* add a new job of parameter update (for a batch) */
bool AddJobUpdateBatch(XQueue * jobQueue, XList * updaters, XList * paramKeepers, XList * optimizers);
};
}
......