Commit 4c7d8dde by xiaotong

updates

parent ecc8c6ed
...@@ -394,6 +394,8 @@ int XLeader::CountModels() ...@@ -394,6 +394,8 @@ int XLeader::CountModels()
} }
} }
CheckNTErrors(modelCount == jworkers.count, "We assume that a worker just has one model!");
return modelCount; return modelCount;
} }
......
...@@ -43,22 +43,44 @@ XLeaderAllReduce::XLeaderAllReduce() ...@@ -43,22 +43,44 @@ XLeaderAllReduce::XLeaderAllReduce()
/* destructor: release everything this leader owns (cloned optimizers etc.) */
XLeaderAllReduce::~XLeaderAllReduce()
{
    Clear();
}
/*
clear the leader state.
NOTE: slot 0 of "optimizers" holds the optimizer passed in by the caller
(see MakeAll) and is NOT owned here, so we intentionally start deleting
from index 1 - only the clones created for the extra models are freed.
*/
void XLeaderAllReduce::Clear()
{
    const int num = optimizers.count;
    for (int i = 1; i < num; i++)
        delete (XOptimizer*)optimizers[i];
    optimizers.Clear();
}
/* /*
create workers and other stuff create workers and other stuff
>> config - configuration >> config - configuration
>> model - the model that we run >> model - the model that we run
>> optimizer - the optimizer
>> devIDs - device ids of the workers (the first id is for server) >> devIDs - device ids of the workers (the first id is for server)
>> jobWorkerNum - number of job workers >> jobWorkerNum - number of job workers
*/ */
void XLeaderAllReduce::MakeAll(XConfig * config, XModel * model, const int * devIDs, const int jobWorkerNum) void XLeaderAllReduce::MakeAll(XConfig * config, XModel * model, XOptimizer * optimizer,
const int * devIDs, const int jobWorkerNum)
{ {
Clear();
Init(); Init();
AddJobWorker(model, jobWorkerNum, devIDs); AddJobWorker(model, jobWorkerNum, devIDs);
AddCollectWorker(); AddCollectWorker();
AddUpdateWorker();
AddAuxiliaryWorker(CountModels()); /* Model updaters. One updater for each model. */
AddUpdateWorker(jobWorkerNum);
/* Optimizers of updating models. One optimizer for each model. */
optimizers.Add(optimizer);
for(int i = 1; i < jobWorkerNum; i++){
XOptimizer * opt = optimizer->Clone(devIDs[i]);
optimizers.Add(opt);
}
XLeader::MakeAll(config, model); XLeader::MakeAll(config, model);
} }
...@@ -193,8 +215,10 @@ update the model in a standard server-worker manner ...@@ -193,8 +215,10 @@ update the model in a standard server-worker manner
*/ */
void XLeaderAllReduce::RunUpdate(XConfig* config, XOptimizer* optimizer, const int* active) void XLeaderAllReduce::RunUpdate(XConfig* config, XOptimizer* optimizer, const int* active)
{ {
XWorkerCollect* collecter = (XWorkerCollect*)cworkers.GetItem(0); XWorkerCollect * collecter = (XWorkerCollect*)cworkers.GetItem(0);
XWorkerUpdate * updaterPrime = (XWorkerUpdate*)uworkers.GetItem(0);
CheckNTErrors(modelNum == jworkers.count, "We assume that a worker has one model only!");
CheckNTErrors(uworkers.count >= modelNum, "No enough updaters!"); CheckNTErrors(uworkers.count >= modelNum, "No enough updaters!");
/* parameter map */ /* parameter map */
...@@ -253,9 +277,7 @@ void XLeaderAllReduce::RunUpdate(XConfig* config, XOptimizer* optimizer, const i ...@@ -253,9 +277,7 @@ void XLeaderAllReduce::RunUpdate(XConfig* config, XOptimizer* optimizer, const i
paramServer.grad = paramServer.tensor->grad; paramServer.grad = paramServer.tensor->grad;
/* check if all the models (or part of them) are ready */ /* check if all the models (or part of them) are ready */
for (int n = 0, i = 0; n < jworkers.count; n++) { for (int i = 0; i < jworkers.count; i++) {
XWorkerJob* worker = (XWorkerJob*)jworkers[n];
for (int m = 0; m < worker->GetModelNum(); m++, i++) {
/* skip the inactive model */ /* skip the inactive model */
if (modelFlag[i] == 0) if (modelFlag[i] == 0)
...@@ -290,8 +312,13 @@ void XLeaderAllReduce::RunUpdate(XConfig* config, XOptimizer* optimizer, const i ...@@ -290,8 +312,13 @@ void XLeaderAllReduce::RunUpdate(XConfig* config, XOptimizer* optimizer, const i
if (finishedCount[j] == activeModelCount) { if (finishedCount[j] == activeModelCount) {
paramServer.flag = PARAM_STATE_COLLECTED; paramServer.flag = PARAM_STATE_COLLECTED;
XList grads;
for(int k = 0; k < modelNum; k++)
grads.Add(&paramMap[j][i]);
/* run the all-reduce procedure to collect the gradient and share /* run the all-reduce procedure to collect the gradient and share
the gradient sum across models */ the gradient sum across models */
collecter->AddJobCollectGradAllReduce(NULL, &grads);
/* update on every model. NOTE THAT we do not worry about the /* update on every model. NOTE THAT we do not worry about the
inconsistence issue of updated parameters across models because inconsistence issue of updated parameters across models because
...@@ -309,7 +336,6 @@ void XLeaderAllReduce::RunUpdate(XConfig* config, XOptimizer* optimizer, const i ...@@ -309,7 +336,6 @@ void XLeaderAllReduce::RunUpdate(XConfig* config, XOptimizer* optimizer, const i
} }
} }
} }
}
/* finishes if all data tensors are processed */ /* finishes if all data tensors are processed */
if (finished == serverModel.paramNum * activeModelCount) if (finished == serverModel.paramNum * activeModelCount)
......
...@@ -43,6 +43,10 @@ namespace nts { // namespace nts(NiuTrans.Tensor) ...@@ -43,6 +43,10 @@ namespace nts { // namespace nts(NiuTrans.Tensor)
/* parameter server */ /* parameter server */
class XLeaderAllReduce : public XLeader class XLeaderAllReduce : public XLeader
{ {
protected:
/* optimizer for each model */
XList optimizers;
public: public:
/* constructor */ /* constructor */
XLeaderAllReduce(); XLeaderAllReduce();
...@@ -50,8 +54,11 @@ public: ...@@ -50,8 +54,11 @@ public:
/* deconstructor */ /* deconstructor */
~XLeaderAllReduce(); ~XLeaderAllReduce();
/* clear */
void Clear();
/* create workers and other stuff used in training */ /* create workers and other stuff used in training */
void MakeAll(XConfig * config, XModel * model, const int * devIDs, const int jobWorkerNum); void MakeAll(XConfig * config, XModel * model, XOptimizer * optimizer, const int * devIDs, const int jobWorkerNum);
/* wait for finished states (i.e., all workers finish their jobs) */ /* wait for finished states (i.e., all workers finish their jobs) */
void WaitForFinishing(const int * activeJobWorkers, const int isToUpdate); void WaitForFinishing(const int * activeJobWorkers, const int isToUpdate);
......
...@@ -223,6 +223,7 @@ void XLeaderPS::RunUpdate(XConfig* config, XOptimizer* optimizer, const int* act ...@@ -223,6 +223,7 @@ void XLeaderPS::RunUpdate(XConfig* config, XOptimizer* optimizer, const int* act
jobQueues.Add(worker->GetJobQueue()); jobQueues.Add(worker->GetJobQueue());
} }
CheckNTErrors(modelNum == jworkers.count, "We assume that a worker has one model only!");
CheckNTErrors(jobQueues.count == serverModel.paramNum, "Incompatiable model!"); CheckNTErrors(jobQueues.count == serverModel.paramNum, "Incompatiable model!");
/* jobs in queue 2 (say jobQueue): collect the (gradient) data. /* jobs in queue 2 (say jobQueue): collect the (gradient) data.
...@@ -253,10 +254,6 @@ void XLeaderPS::RunUpdate(XConfig* config, XOptimizer* optimizer, const int* act ...@@ -253,10 +254,6 @@ void XLeaderPS::RunUpdate(XConfig* config, XOptimizer* optimizer, const int* act
} }
} }
if (activeModelCount != jworkers.count) {
int nnn = 0;
}
XList* paramList = new XList[serverModel.paramNum]; XList* paramList = new XList[serverModel.paramNum];
CheckNTErrors(modelCount == modelNum, "Wrong model number!"); CheckNTErrors(modelCount == modelNum, "Wrong model number!");
...@@ -277,9 +274,7 @@ void XLeaderPS::RunUpdate(XConfig* config, XOptimizer* optimizer, const int* act ...@@ -277,9 +274,7 @@ void XLeaderPS::RunUpdate(XConfig* config, XOptimizer* optimizer, const int* act
paramServer.grad = paramServer.tensor->grad; paramServer.grad = paramServer.tensor->grad;
/* check if all the models (or part of them) are ready */ /* check if all the models (or part of them) are ready */
for (int n = 0, i = 0; n < jworkers.count; n++) { for (int i = 0; i < jworkers.count; i++) {
XWorkerJob* worker = (XWorkerJob*)jworkers[n];
for (int m = 0; m < worker->GetModelNum(); m++, i++) {
/* skip the inactive model */ /* skip the inactive model */
if (modelFlag[i] == 0) if (modelFlag[i] == 0)
...@@ -333,7 +328,6 @@ void XLeaderPS::RunUpdate(XConfig* config, XOptimizer* optimizer, const int* act ...@@ -333,7 +328,6 @@ void XLeaderPS::RunUpdate(XConfig* config, XOptimizer* optimizer, const int* act
} }
} }
} }
}
/* finishes if all data tensors are processed */ /* finishes if all data tensors are processed */
if (finished == serverModel.paramNum * activeModelCount) if (finished == serverModel.paramNum * activeModelCount)
......
...@@ -66,6 +66,32 @@ void XOptimizer::Reset() ...@@ -66,6 +66,32 @@ void XOptimizer::Reset()
{ {
} }
/*
clone the optimizer (with the data in it)
>> devID - device where the cloned data will be placed
<< the new optimizer; the caller takes ownership
*/
XOptimizer * XOptimizer::Clone(int devID)
{
    XOptimizer * cloned = new XOptimizer();
    Copy(this, cloned, devID);
    return cloned;
}
/*
copy data from one optimizer to another
>> source - where we copy the data from
>> target - where we copy the data to
>> devID - the device where the new data is placed (unused in the base
   class; kept for overrides that allocate tensors, e.g. Adam::Copy)
*/
void XOptimizer::Copy(XOptimizer * source, XOptimizer * target, int devID)
{
    CheckNTErrors(source != NULL, "No input source optimizer");
    /* BUGFIX: the message for the target check previously said "source" (copy-paste) */
    CheckNTErrors(target != NULL, "No input target optimizer");

    target->nstep  = source->nstep;
    target->nepoch = source->nepoch;
    target->lrate  = source->lrate;
}
void XOptimizer::ShowSettings() void XOptimizer::ShowSettings()
{ {
XPRINT(1, stderr, "[INFO] Optimizer Setup:\n"); XPRINT(1, stderr, "[INFO] Optimizer Setup:\n");
......
...@@ -67,6 +67,14 @@ public: ...@@ -67,6 +67,14 @@ public:
virtual virtual
void Reset(); void Reset();
/* clone the optimizer (with the data in it) */
virtual
XOptimizer * Clone(int devID);
/* copy data */
virtual
void Copy(XOptimizer * source, XOptimizer * target, int devID);
/* show settings */ /* show settings */
virtual virtual
void ShowSettings(); void ShowSettings();
......
...@@ -93,6 +93,52 @@ void Adam::Reset() ...@@ -93,6 +93,52 @@ void Adam::Reset()
adamBeta2T = 1.0F; adamBeta2T = 1.0F;
} }
/*
clone the optimizer (with the data in it)
>> devID - device where we place the data
<< the cloned Adam optimizer (as its base-class pointer); the caller
   takes ownership
*/
XOptimizer * Adam::Clone(int devID)
{
    Adam * opt = new Adam();
    Copy(this, opt, devID);

    /* derived-to-base conversion is implicit - no C-style cast needed */
    return opt;
}
/*
copy data of the Adam optimizer
>> source - where we copy the data from
>> target - where we copy the data to
>> devID - device where the copied moment tensors are placed
*/
void Adam::Copy(XOptimizer * source, XOptimizer * target, int devID)
{
    /* base-class fields (step counters, learning rate) */
    XOptimizer::Copy(source, target, devID);

    Adam * from = (Adam*)source;
    Adam * to   = (Adam*)target;

    /* scalar hyper-parameters and their running powers */
    to->adamBeta1  = from->adamBeta1;
    to->adamBeta2  = from->adamBeta2;
    to->adamDelta  = from->adamDelta;
    to->adamBeta1T = from->adamBeta1T;
    to->adamBeta2T = from->adamBeta2T;

    /* moment tensors: deep-copy each one onto devID.
       NOTE(review): Clear() does not appear to delete tensors already held
       by the target - confirm the target is always empty when copied into. */
    to->moments.Clear();
    for (int i = 0; i < from->moments.count; i++) {
        XTensor * m = from->moments[i];
        XTensor * dup = new XTensor();
        InitTensorV2(dup, m->order, m->dimSize, m->dataType, m->denseRatio, devID);
        _CopyValues(m, dup);
        to->moments.Add(dup);
    }

    /* second set of moment tensors, copied the same way */
    to->moments2nd.Clear();
    for (int i = 0; i < from->moments2nd.count; i++) {
        XTensor * m = from->moments2nd[i];
        XTensor * dup = new XTensor();
        InitTensorV2(dup, m->order, m->dimSize, m->dataType, m->denseRatio, devID);
        _CopyValues(m, dup);
        to->moments2nd.Add(dup);
    }
}
/* show settings */ /* show settings */
void Adam::ShowSettings() void Adam::ShowSettings()
{ {
......
...@@ -67,6 +67,12 @@ public: ...@@ -67,6 +67,12 @@ public:
/* reset the optimizer (re-start) */ /* reset the optimizer (re-start) */
void Reset(); void Reset();
/* clone the optimizer (with the data in it) */
XOptimizer * Clone(int devID);
/* copy data */
void Copy(XOptimizer * source, XOptimizer * target, int devID);
/* show settings */ /* show settings */
void ShowSettings(); void ShowSettings();
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论