Commit 726f5e21 by xiaotong

updates

parent 70b8f94b
......@@ -117,6 +117,12 @@ void XLeader::SetServerModel(XConfig * config, XModel * model)
SetServerModel(config, model, &members);
}
/* get server model */
XModel * XLeader::GetServerModel()
{
return &serverModel;
}
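The new accessor pairs with SetServerModel: the leader now owns the server-side model, so callers read parameters back through it instead of passing a model into every Run call. A minimal usage sketch; the leader, config and model objects here are hypothetical stand-ins, not code from this commit:

/* hypothetical: hand the model to the leader once, read it back later */
leader.SetServerModel(&config, &model);
XModel * server = leader.GetServerModel();  /* e.g. for saving a checkpoint */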
/* initialize the models for running them */
void XLeader::InitForRun()
......@@ -356,12 +362,10 @@ void XLeader::AddJobParamterWorker(int n)
run the model (for one time). Basically this is a map-reduce process.
>> config - the configuration
>> dataDistributor - to load batches of samples
>> optimizer - the optimization method
<< return - whether new data could be fetched
*/
bool XLeader::Run(XConfig * config, DataDistributeBase * dataDistributor, XOptimizer * optimizer)
{
CheckNTErrors(jworkers.count > 0, "No jworkers!");
CheckNTErrors(cworkers.count > 0, "No cworkers!");
......@@ -376,30 +380,58 @@ bool XLeader::Run(XConfig * config, DataDistributeBase * dataDistributor,
InitForRun();
/* run models on job workers */
activeJobCount = RunModel(config, dataDistributor, active);
/* update the model on the server side */
if (activeJobCount > 0 && isToUpdate)
RunUpdate(config, optimizer, active);
WaitForFinishing(active, isToUpdate);
for (int i = 0; i < jworkers.count; i++) {
XWorkerJob * worker = (XWorkerJob*)jworkers[i];
worker->Clear();
}
delete[] active;
return activeJobCount > 0;
}
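To make the active-flag bookkeeping concrete: RunModel marks each job worker that managed to fetch a batch, and RunUpdate reduces gradients only from those members while still broadcasting the result to everyone. A hedged sketch with a hypothetical four-worker setup (the driver function itself is illustrative, not part of this commit):

/* hypothetical driver: 4 job workers, only 3 batches left in the distributor */
void OneStepSketch(XLeader & leader, XConfig & config,
                   DataDistributeBase & distributor, XOptimizer & optimizer)
{
    int active[4] = {0, 0, 0, 0};
    int activeJobCount = leader.RunModel(&config, &distributor, active);
    /* e.g. active == {1, 1, 1, 0} and activeJobCount == 3:
       workers 0-2 fetched a batch and queued jobs, worker 3 found no data */
    if (activeJobCount > 0)
        leader.RunUpdate(&config, &optimizer, active);
    /* gradients are reduced from the 3 active members, yet the broadcast
       pushes the updated parameters to all 4 workers */
}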
/*
run the model
>> config - the configuration
>> dataDistributor - to load batches of samples
>> active - flag for each job worker (1 = active, 0 = not active)
<< return - number of active job workers
*/
int XLeader::RunModel(XConfig * config, DataDistributeBase * dataDistributor, int * active)
{
int activeJobCount = 0;
for (int i = 0; i < jworkers.count; i++)
active[i] = 0;
/* Feed the input to each worker and generate the output.
For each worker, we define a job queue and enqueue jobs
into it. */
for (int i = 0; i < jworkers.count; i++) {
XWorkerJob * worker = (XWorkerJob*)jworkers[i];
XModel * jmodel = worker->GetModel();
/* get a batch of samples */
bool fetched = dataDistributor->GetBatchSimple(worker->GetInput(), worker->GetGold());
if (fetched) {
/* job in queue 1: refresh the model */
worker->AddJobRefresh(jmodel);
/* job in queue 1: run the model */
worker->AddJobNeuralNet(jmodel,
worker->GetInput(), worker->GetOutput(),
worker->GetGold(), worker->GetLoss());
/* job in queue 1: make a record of the run */
worker->AddJobRecord(&serverRecord);
......@@ -412,55 +444,57 @@ bool XLeader::Run(XConfig * config, DataDistributeBase * dataDistributor,
}
}
return activeJobCount;
}
/*
update the model
>> config - the configuration
>> optimizer - the optimizer
>> active - flag for each job worker (1 = active, 0 = not active)
*/
void XLeader::RunUpdate(XConfig * config, XOptimizer * optimizer, const int * active)
{
/* workers */
XWorkerCollect * collecter = (XWorkerCollect*)cworkers.GetItem(0);
XWorkerUpdate * updater = (XWorkerUpdate*)uworkers.GetItem(0);
XWorkerBroadcast * broadcaster = (XWorkerBroadcast*)bworkers.GetItem(0);
/* all member models */
XList membersAll(jworkers.count);
/* member models that are active in this run */
XList members(jworkers.count);
/* job queues */
XList jobQueues;
for (int i = 0; i < jworkers.count; i++) {
XWorkerJob * worker = (XWorkerJob*)jworkers[i];
membersAll.Add(worker->GetModel());
if (active[i] == 1)
members.Add(worker->GetModel());
}
for (int i = 0; i < pworkers.count; i++) {
XWorker * worker = (XWorker*)pworkers[i];
jobQueues.Add(worker->GetJobQueue());
}
/* jobs in queue 2: collect the (gradient) data and other stuff. This
is a reduce process. The collector will add a job in queue 3
to update the model. The updater will add a job in queue 4 to
broadcast the latest parameters to the workers. NOTE that we update
a worker to the latest model parameters even if it is not involved
in this run. */
collecter->AddJobUpdateAll(&jobQueues,
&members, &membersAll, &serverModel,
optimizer, updater, broadcaster);
collecter->AddJobEnqueueFinished();
}
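Conceptually, the job chain queued above performs a serial reduce-update-broadcast. The sketch below spells that out; the helper names (CollectGradients, ApplyOptimizer, CopyParameters) are assumptions for illustration, since the real logic lives in XWorkerCollect, XWorkerUpdate and XWorkerBroadcast, which this commit does not touch:

/* illustrative serial equivalent of the queue-2/3/4 job chain */
void SerialUpdateSketch(XList & members, XList & membersAll,
                        XModel & serverModel, XOptimizer * optimizer)
{
    for (int i = 0; i < members.count; i++) {
        XModel * member = (XModel*)members[i];
        CollectGradients(member, &serverModel);  /* hypothetical reduce step */
    }
    ApplyOptimizer(&serverModel, optimizer);     /* hypothetical update step */
    for (int i = 0; i < membersAll.count; i++) {
        XModel * member = (XModel*)membersAll[i];
        CopyParameters(&serverModel, member);    /* hypothetical broadcast step */
    }
}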
} /* end of the nts (NiuTrans.Tensor) namespace */
......@@ -115,6 +115,9 @@ public:
/* set the server model */
void SetServerModel(XConfig * config, XModel * model);
/* get server model */
XModel * GetServerModel();
/* initialize the models for running them */
void InitForRun();
......@@ -158,9 +161,14 @@ public:
/* add a parameter worker (or a pipeline) */
void AddJobParamterWorker(int n);
/* run the model and update it (for one time) */
bool Run(XConfig * config, DataDistributeBase * dataDistributor, XOptimizer * optimizer);
/* run the model */
int RunModel(XConfig * config, DataDistributeBase * dataDistributor, int * active);
/* update the model */
void RunUpdate(XConfig * config, XOptimizer * optimizer, const int * active);
};
}
......
......@@ -144,7 +144,7 @@ void XTrainer::Run(XConfig * config, DataDistributeBase * dataDistributor,
optimizer->SetLearningRate(LRScheduler.MakeLRTransformer(lrate, step + 1, nwarmup));
/* one step of update */
ok = leader.Run(config, dataDistributor, optimizer);
float loss = leader.GetLoss() / leader.GetSampleNum();
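The MakeLRTransformer call above applies a warmup-aware learning-rate schedule whose definition is outside this diff. As one plausible reading only, here is the common linear-warmup / inverse-square-root rule such a transformer might implement; this is an assumption, not the library's code:

#include <cmath>
#include <algorithm>

/* hypothetical stand-in for LRScheduler.MakeLRTransformer:
   linear ramp for the first nwarmup steps, then 1/sqrt(step) decay */
float WarmupInvSqrtLR(float lrate, int step, int nwarmup)
{
    float warm = (float)step / (float)nwarmup;
    float decay = std::sqrt((float)nwarmup / (float)step);
    return lrate * std::min(warm, decay);
}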
......@@ -159,7 +159,7 @@ void XTrainer::Run(XConfig * config, DataDistributeBase * dataDistributor,
}
else {
/* one step with no update */
ok = leader.Run(config, dataDistributor, NULL);
}
}
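Seen end to end, the API change means the training driver no longer threads the model through each step; the leader owns it. A minimal sketch of the new calling convention; the driver function is hypothetical and the worker-creation calls are omitted:

/* hypothetical end-to-end driver; worker setup calls are omitted */
void TrainSketch(XConfig & config, XModel & model,
                 DataDistributeBase & distributor, XOptimizer & optimizer)
{
    XLeader leader;
    leader.SetServerModel(&config, &model);  /* the leader owns the model now */
    leader.InitForRun();
    bool ok = true;
    while (ok) {
        /* one step = map (RunModel) + reduce (RunUpdate), both inside Run */
        ok = leader.Run(&config, &distributor, &optimizer);
    }
}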
......