Commit 51e02fd6 by xiaotong

rename pworker as aworker

parent c0bcb78b
......@@ -69,9 +69,9 @@ void XLeader::Init()
delete (XWorkerBroadcast*)bworkers.GetItem(i);
bworkers.Clear();
for(int i = 0; i < pworkers.count; i++)
delete (XWorker*)pworkers.GetItem(i);
pworkers.Clear();
for(int i = 0; i < aworkers.count; i++)
delete (XWorker*)aworkers.GetItem(i);
aworkers.Clear();
serverRecord.Clear();
}
......@@ -142,7 +142,7 @@ void XLeader::InitForRun()
workers.AddList(&cworkers);
workers.AddList(&uworkers);
workers.AddList(&bworkers);
workers.AddList(&pworkers);
workers.AddList(&aworkers);
for (int i = 0; i < workers.count; i++) {
XWorker* worker = (XWorker*)workers[i];
......@@ -234,8 +234,8 @@ void XLeader::SetInstantRun(bool flag)
worker->SetInstantRun(flag);
}
for (int i = 0; i < pworkers.count; i++) {
XWorker * worker = (XWorker*)pworkers.GetItem(i);
for (int i = 0; i < aworkers.count; i++) {
XWorker * worker = (XWorker*)aworkers.GetItem(i);
worker->SetInstantRun(flag);
}
}
......@@ -266,8 +266,8 @@ void XLeader::Start()
worker->Start();
}
for (int i = 0; i < pworkers.count; i++) {
XWorker * worker = (XWorker*)pworkers.GetItem(i);
for (int i = 0; i < aworkers.count; i++) {
XWorker * worker = (XWorker*)aworkers.GetItem(i);
worker->Start();
}
}
......@@ -331,7 +331,7 @@ void XLeader::AddParamterWorker(int n)
{
for (int i = 0; i < n; i++) {
XWorker * worker = new XWorker();
pworkers.Add(worker);
aworkers.Add(worker);
}
}
......@@ -378,7 +378,7 @@ void XLeader::MakeParamMap()
paramMap[i][c].tensor = model->params[i].tensor;
paramMap[i][c].grad = model->params[i].tensor->grad;
paramMap[i][c].flag = PARAM_STATE_NOT_READY;
paramMap[i][c].trainFlag = PARAM_STATE_NOT_READY;;
paramMap[i][c].trainFlag = PARAM_STATE_NOT_READY;
c++;
}
else {
......
......@@ -89,11 +89,10 @@ protected:
/* data-broadcasting workers */
XList bworkers;
/* parameter workers (each for a paramter). cworkers,
uworkers, and bworkers would push their jobs into
parameter workers. So they are actually pipelines
of jobs. */
XList pworkers;
/* auxiliary workers. These workers are pipelines of
updating parameters (one for each parameter, or one
for each job worker) */
XList aworkers;
/* map of parameters. (x,y) indexes parameter x of
worker y. Note that a worker keeps a copy of the
......
......@@ -59,7 +59,6 @@ void XLeaderAllReduce::MakeAll(XConfig * config, XModel * model, const int * dev
AddCollectWorker();
for(int i = 0; i < jobWorkerNum; i++)
AddUpdateWorker(model);
AddParamterWorker(model->paramNum);
XLeader::MakeAll(config, model);
}
......@@ -111,7 +110,7 @@ bool XLeaderAllReduce::Run(XConfig* config, DataDistributeBase* dataDistributor,
CheckNTErrors(cworkers.count > 0, "No cworkers!");
CheckNTErrors(uworkers.count > 0, "No uworkers!");
CheckNTErrors(bworkers.count > 0, "No bworkers!");
CheckNTErrors(pworkers.count > 0, "No pworkers!");
CheckNTErrors(aworkers.count > 0, "No pworkers!");
bool isToUpdate = (optimizer != NULL);
int activeJobCount = 0;
......@@ -213,8 +212,8 @@ void XLeaderAllReduce::RunUpdate(XConfig* config, XOptimizer* optimizer, const i
membersAll.Add(worker->GetModel());
}
for (int i = 0; i < pworkers.count; i++) {
XWorker* worker = (XWorker*)pworkers[i];
for (int i = 0; i < aworkers.count; i++) {
XWorker* worker = (XWorker*)aworkers[i];
jobQueues.Add(worker->GetJobQueue());
}
......@@ -333,4 +332,4 @@ void XLeaderAllReduce::RunUpdate(XConfig* config, XOptimizer* optimizer, const i
delete[] paramList;
}
}
\ No newline at end of file
}
......@@ -116,7 +116,7 @@ bool XLeaderPS::Run(XConfig* config, DataDistributeBase* dataDistributor, XOptim
CheckNTErrors(cworkers.count > 0, "No cworkers!");
CheckNTErrors(uworkers.count > 0, "No uworkers!");
CheckNTErrors(bworkers.count > 0, "No bworkers!");
CheckNTErrors(pworkers.count > 0, "No pworkers!");
CheckNTErrors(aworkers.count > 0, "No pworkers!");
bool isToUpdate = (optimizer != NULL);
int activeJobCount = 0;
......@@ -218,8 +218,8 @@ void XLeaderPS::RunUpdate(XConfig* config, XOptimizer* optimizer, const int* act
membersAll.Add(worker->GetModel());
}
for (int i = 0; i < pworkers.count; i++) {
XWorker* worker = (XWorker*)pworkers[i];
for (int i = 0; i < aworkers.count; i++) {
XWorker* worker = (XWorker*)aworkers[i];
jobQueues.Add(worker->GetJobQueue());
}
......@@ -338,4 +338,4 @@ void XLeaderPS::RunUpdate(XConfig* config, XOptimizer* optimizer, const int* act
delete[] paramList;
}
}
\ No newline at end of file
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论