Commit 6a0f0557 by xiaotong

bug fixes for dealing with inactive job workers

parent 51e02fd6
......@@ -306,28 +306,27 @@ void XLeader::AddCollectWorker(DATA_COLLECT_TYPE mode)
cworkers.Add(worker);
}
/*
add a model-update worker
>> model - the model
*/
void XLeader::AddUpdateWorker(XModel * model)
/* add model-update workers */
void XLeader::AddUpdateWorker(int n)
{
XWorkerUpdate * worker = new XWorkerUpdate();
for (int i = 0; i < n; i++) {
XWorkerUpdate* worker = new XWorkerUpdate();
uworkers.Add(worker);
}
}
/* add a data-broadcasting worker (appended to the bworkers list) */
void XLeader::AddBroadcastWorker()
{
    /* ownership passes to bworkers — presumably freed with the leader;
       verify against the XLeader destructor */
    XWorkerBroadcast* worker = new XWorkerBroadcast();
    bworkers.Add(worker);
}
/*
add a parameter worker (or a pipeline)
>> n - number of parameters
add parameter worker (or a pipeline)
>> n - number of workers
*/
void XLeader::AddParamterWorker(int n)
void XLeader::AddAuxiliaryWorker(int n)
{
for (int i = 0; i < n; i++) {
XWorker * worker = new XWorker();
......@@ -349,17 +348,7 @@ void XLeader::DestroyParamMap()
/* generate the map of parameters */
void XLeader::MakeParamMap()
{
int modelCount = 0;
for (int i = 0; i < jworkers.count; i++) {
XWorker * worker = (XWorker*)jworkers[i];
if (worker->GetWorkerType() == XWORKER_TYPE_JOB) {
modelCount += worker->GetModelNum();
CheckNTErrors(worker->GetModelNum() == 1, "Wrong model number!");
}
else {
ShowNTErrors("TODO: support a new XWorker type!");
}
}
int modelCount = CountModels();
if(modelCount != modelNum){
DestroyParamMap();
......@@ -390,4 +379,22 @@ void XLeader::MakeParamMap()
modelNum = modelCount;
}
/* count all the models */
int XLeader::CountModels()
{
int modelCount = 0;
for (int i = 0; i < jworkers.count; i++) {
XWorker* worker = (XWorker*)jworkers[i];
if (worker->GetWorkerType() == XWORKER_TYPE_JOB) {
modelCount += worker->GetModelNum();
CheckNTErrors(worker->GetModelNum() == 1, "Wrong model number!");
}
else {
ShowNTErrors("TODO: support a new XWorker type!");
}
}
return modelCount;
}
} /* end of the nts (NiuTrans.Tensor) namespace */
......@@ -161,20 +161,23 @@ public:
/* add a data-collecting worker */
void AddCollectWorker(DATA_COLLECT_TYPE mode = DATA_COLLECT_P2P);

/* add model-update workers */
void AddUpdateWorker(int n = 1);

/* add a data-broadcasting worker */
void AddBroadcastWorker();

/* add auxiliary workers (or a pipeline) */
void AddAuxiliaryWorker(int n);

/* destroy the parameter map (and gradient map) */
void DestroyParamMap();

/* generate the map of parameters */
void MakeParamMap();

/* count all the models */
int CountModels();
};
}
......
......@@ -57,8 +57,8 @@ void XLeaderAllReduce::MakeAll(XConfig * config, XModel * model, const int * dev
Init();
AddJobWorker(model, jobWorkerNum, devIDs);
AddCollectWorker();
for(int i = 0; i < jobWorkerNum; i++)
AddUpdateWorker(model);
AddUpdateWorker();
AddAuxiliaryWorker(CountModels());
XLeader::MakeAll(config, model);
}
......@@ -193,10 +193,9 @@ update the model in a standard server-worker manner
*/
void XLeaderAllReduce::RunUpdate(XConfig* config, XOptimizer* optimizer, const int* active)
{
/* workers */
XWorkerCollect* collecter = (XWorkerCollect*)cworkers.GetItem(0);
XWorkerUpdate* updater = (XWorkerUpdate*)uworkers.GetItem(0);
XWorkerBroadcast* broadcaster = (XWorkerBroadcast*)bworkers.GetItem(0);
CheckNTErrors(uworkers.count >= modelNum, "No enough updaters!");
/* parameter map */
MakeParamMap();
......@@ -204,26 +203,13 @@ void XLeaderAllReduce::RunUpdate(XConfig* config, XOptimizer* optimizer, const i
/* all member models */
XList membersAll(jworkers.count);
/* job queues */
XList jobQueues;
for (int i = 0; i < jworkers.count; i++) {
XWorkerJob* worker = (XWorkerJob*)jworkers[i];
membersAll.Add(worker->GetModel());
}
for (int i = 0; i < aworkers.count; i++) {
XWorker* worker = (XWorker*)aworkers[i];
jobQueues.Add(worker->GetJobQueue());
}
CheckNTErrors(jobQueues.count == serverModel.paramNum, "Incompatiable model!");
/* jobs in queue 2 (say jobQueue): collect the (gradient) data.
This is a reduce process. Then we add a job to to update the model. followed
by a job to broadcast the lastest parameters to workers. NOTE that we
would update a worker to the latest model parameters, even if it is not
involved in this run. */
/* we reduce gradient across all job workers and update the parameter
on each job worker. */
int finished = 0;
......@@ -270,6 +256,11 @@ void XLeaderAllReduce::RunUpdate(XConfig* config, XOptimizer* optimizer, const i
for (int n = 0, i = 0; n < jworkers.count; n++) {
XWorkerJob* worker = (XWorkerJob*)jworkers[n];
for (int m = 0; m < worker->GetModelNum(); m++, i++) {
/* skip the inactive model */
if (modelFlag[i] == 0)
continue;
XTensorKeeper& paramWorker = paramMap[j][i];
/* isGradFinished is true only if the model finishes the computation
......@@ -277,14 +268,11 @@ void XLeaderAllReduce::RunUpdate(XConfig* config, XOptimizer* optimizer, const i
if (paramWorker.flag == PARAM_STATE_NOT_READY && paramWorker.tensor->isGradFinished) {
/* get the gradient */
paramWorker.grad = paramWorker.tensor->grad;
/* the job queue of updating parameter j */
XQueue* jobQueue = (XQueue*)jobQueues.GetItem(j);
//paramWorker.grad = paramWorker.tensor->grad;
/* data transmit */
collecter->AddJobCollectDataP2P(jobQueue, paramWorker.grad, paramServer.grad);
collecter->AddJobEnqueueFinished(jobQueue);
//collecter->AddJobCollectDataP2P(NULL, paramWorker.grad, paramServer.grad);
//collecter->AddJobEnqueueFinished();
/* We keep the worker parameter in a list. It would be used when we broadcast
the updated paramter to the workers, that is, this is a list of worker
......@@ -301,15 +289,18 @@ void XLeaderAllReduce::RunUpdate(XConfig* config, XOptimizer* optimizer, const i
(in another thread) */
if (finishedCount[j] == activeModelCount) {
paramServer.flag = PARAM_STATE_COLLECTED;
if (updater != NULL) {
/* update the parameters */
updater->AddJobUpdate(jobQueue, &paramServer, optimizer);
updater->AddJobEnqueueFinished(jobQueue);
/* broadcast the new parameter to other models */
broadcaster->AddJobBroadcast(jobQueue, &paramServer, &paramList[j]);
broadcaster->AddJobEnqueueFinished(jobQueue);
/* call the all-reduce method to collect the gradient and share
the gradient sum across models */
/* update on every model. NOTE THAT we do not worry about the
inconsistence issue of updated parameters across models because
the all-reduce method can garantee that the model shared the same
copy of the gradient. */
for (int k = 0; k < modelNum; k++) {
XWorkerUpdate* updater = (XWorkerUpdate*)uworkers[k];
updater->AddJobUpdate(NULL, &paramServer, optimizer);
updater->AddJobEnqueueFinished();
}
}
else if (finishedCount[j] > activeModelCount) {
......
......@@ -55,9 +55,9 @@ void XLeaderPS::MakeAll(XConfig * config, XModel * model, const int * devIDs, co
Init();
AddJobWorker(model, jobWorkerNum, devIDs);
AddCollectWorker();
AddUpdateWorker(model);
AddUpdateWorker();
AddBroadcastWorker();
AddParamterWorker(model->paramNum);
AddAuxiliaryWorker(model->paramNum);
XLeader::MakeAll(config, model);
}
......@@ -253,6 +253,10 @@ void XLeaderPS::RunUpdate(XConfig* config, XOptimizer* optimizer, const int* act
}
}
if (activeModelCount != jworkers.count) {
int nnn = 0;
}
XList* paramList = new XList[serverModel.paramNum];
CheckNTErrors(modelCount == modelNum, "Wrong model number!");
......@@ -276,6 +280,11 @@ void XLeaderPS::RunUpdate(XConfig* config, XOptimizer* optimizer, const int* act
for (int n = 0, i = 0; n < jworkers.count; n++) {
XWorkerJob* worker = (XWorkerJob*)jworkers[n];
for (int m = 0; m < worker->GetModelNum(); m++, i++) {
/* skip the inactive model */
if (modelFlag[i] == 0)
continue;
XTensorKeeper& paramWorker = paramMap[j][i];
/* isGradFinished is true only if the model finishes the computation
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论