Commit ecc8c6ed by xiaotong

define the functions of all-reduce collection

parent 6a0f0557
......@@ -290,15 +290,15 @@ void XLeaderAllReduce::RunUpdate(XConfig* config, XOptimizer* optimizer, const i
 if (finishedCount[j] == activeModelCount) {
     paramServer.flag = PARAM_STATE_COLLECTED;

-    /* call the all-reduce method to collect the gradient and share
+    /* run the all-reduce procedure to collect the gradient and share
        the gradient sum across models */

     /* update on every model. NOTE THAT we do not worry about the
        inconsistency issue of updated parameters across models because
-       the all-reduce method can garantee that the model shared the same
-       copy of the gradient. */
+       the all-reduce method can guarantee that all the models share
+       the same copy of the gradient. */
     for (int k = 0; k < modelNum; k++) {
-        XWorkerUpdate* updater = (XWorkerUpdate*)uworkers[k];
+        XWorkerUpdate * updater = (XWorkerUpdate*)uworkers[k];
         updater->AddJobUpdate(NULL, &paramServer, optimizer);
         updater->AddJobEnqueueFinished();
     }
......
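For orientation: the loop above only fires once `finishedCount[j] == activeModelCount`, i.e. after every active model has produced its gradient for parameter `j`, and the per-model updates can then run independently because all-reduce leaves each model with the identical gradient sum. A minimal sketch of how the collection step could be wired to the API this commit adds; `paramMap` (the keeper of parameter `j` on model `k`) and `collecter` are hypothetical names here, while `AddJobCollectGradAllReduce` and `AddJobEnqueueFinished` come from the diff below:

```cpp
/* Hedged sketch, not the actual XLeaderAllReduce code: gather the
   per-model gradient keepers for parameter j and enqueue a single
   all-reduce job before the update loop shown above runs. */
XList grads;
for (int k = 0; k < modelNum; k++)
    grads.Add(&paramMap[k][j]);                  /* hypothetical keeper map */
collecter->AddJobCollectGradAllReduce(jobQueue, &grads);
collecter->AddJobEnqueueFinished(jobQueue);
```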
......@@ -298,7 +298,7 @@ void XLeaderPS::RunUpdate(XConfig* config, XOptimizer* optimizer, const int* act
 XQueue* jobQueue = (XQueue*)jobQueues.GetItem(j);

 /* data transmit */
-collecter->AddJobCollectDataP2P(jobQueue, paramWorker.grad, paramServer.grad);
+collecter->AddJobCollectGradP2P(jobQueue, &paramWorker, &paramServer);
 collecter->AddJobEnqueueFinished(jobQueue);

 /* We keep the worker parameter in a list. It would be used when we broadcast
......
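The one-line change above is the visible half of the refactor: the parameter-server leader now hands the collector whole `XTensorKeeper` objects rather than raw gradient tensors, and the collector decides via `isGrad` whether to move `->grad` or `->tensor` (see `CollectP2P` further down). A self-contained stand-in illustrating that dispatch; `TensorLike`/`KeeperLike` are invented types, not the repo's classes:

```cpp
#include <cstdio>

/* invented stand-ins for XTensor / XTensorKeeper, illustration only */
struct TensorLike { const char * name; };
struct KeeperLike { TensorLike * tensor; TensorLike * grad; };

/* mirrors the new CollectP2P contract: one entry point, and the flag
   selects which member of the keeper pair gets transmitted */
void CollectP2PLike(KeeperLike * src, KeeperLike * tgt, bool isGrad)
{
    TensorLike * s = isGrad ? src->grad : src->tensor;
    TensorLike * t = isGrad ? tgt->grad : tgt->tensor;
    std::printf("collect %s -> %s\n", s->name, t->name);
}

int main()
{
    TensorLike wg{"worker.grad"}, sg{"server.grad"};
    KeeperLike worker{nullptr, &wg}, server{nullptr, &sg};
    CollectP2PLike(&worker, &server, true);   /* gradient mode, as in the diff */
    return 0;
}
```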
......@@ -76,35 +76,24 @@ void XWorkerCollect::CollectP2P(XTensor * source, XTensor * target)
         delete sourceOnSite;
     }
 }

-/*
-sum-reduce for given tensors
-target += source_0
-target += source_1
-...
-target += source_n
+/*
+P2P data collection
+target += source
+>> source - the source tensor
+>> target - the target tensor
+>> isGrad - indicates whether we want to collect gradient
 */
-void XWorkerCollect::CollectReduceSum(XList * source, XTensor * target)
+void XWorkerCollect::CollectP2P(XTensorKeeper * source, XTensorKeeper * target, const bool isGrad)
 {
-    for (int i = 0; i < source->count; i++) {
-        XTensor * s = (XTensor*)source->GetItem(i);
-        CollectP2P(s, target);
-    }
-}
+    CheckNTErrors(source != NULL, "The source tensor keeper should not be NULL!");
+    CheckNTErrors(target != NULL, "The target tensor keeper should not be NULL!");

-/*
-all-reduce: the well-known all-reduce method
-every tensor is involved in every data transmition. The final outcome
-is that all input tensors share the same value (i.e., the sum of them).
->> all - the tensors for sum
-*/
-void XWorkerCollect::CollectAllReduce(XList * all)
-{
-    ShowNTErrors("TODO!");
+    if(isGrad)
+        CollectP2P(source->grad, target->grad);
+    else
+        CollectP2P(source->tensor, target->tensor);
 }

 /* wrapper of Collect */
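The keeper-level overload above bottoms out in the original tensor-level `CollectP2P`, whose contract is `target += source`; the `delete sourceOnSite` at the top of this hunk suggests the source is first staged next to the target before accumulation. A plain-array sketch of that behaviour, under those assumptions:

```cpp
#include <vector>

/* plain-array sketch of the tensor-level "target += source" collect;
   with real XTensors the staging copy would be a cross-device transfer */
void CollectP2PSketch(const std::vector<float>& source, std::vector<float>& target)
{
    std::vector<float> sourceOnSite(source);     /* stage source near target */
    for (size_t i = 0; i < target.size(); i++)
        target[i] += sourceOnSite[i];            /* accumulate into target */
}                                                /* staging copy freed here */
```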
......@@ -113,11 +102,12 @@ void XWorkerCollect::CollectDataP2P(XList * args)
     int paramCount = 0;

     XWorkerCollect * collecter = (XWorkerCollect*)args->GetItem(paramCount++);
-    XTensor * source = (XTensor*)args->GetItem(paramCount++);
-    XTensor * target = (XTensor*)args->GetItem(paramCount++);
+    XTensorKeeper * source = (XTensorKeeper*)args->GetItem(paramCount++);
+    XTensorKeeper * target = (XTensorKeeper*)args->GetItem(paramCount++);
+    bool isGrad = (bool)args->GetInt(paramCount++);

     if(collecter != NULL)
-        collecter->CollectP2P(source, target);
+        collecter->CollectP2P(source, target, isGrad);
 }

 /*
......@@ -125,8 +115,9 @@ add a new job of collecting data
 >> jobQueue - the queue where we run the job
 >> source - where we collect the data from
 >> target - where we place the data (on the server end)
+>> isGrad - indicates whether we want to collect gradient
 */
-bool XWorkerCollect::AddJobCollectDataP2P(XQueue * jobQueue, XTensor * source, XTensor * target)
+bool XWorkerCollect::AddJobCollectDataP2P(XQueue * jobQueue, XTensorKeeper * source, XTensorKeeper * target, const bool isGrad)
 {
     CheckNTErrors(source != NULL, "No input source tensor!");
     CheckNTErrors(target != NULL, "No input target tensor!");
......@@ -135,6 +126,7 @@ bool XWorkerCollect::AddJobCollectDataP2P(XQueue * jobQueue, XTensor * source, X
     args.Add(this);
     args.Add(source);
     args.Add(target);
+    args.AddInt((int)isGrad);

     XQueue& queueRun = jobQueue != NULL ? *jobQueue : queue;
......@@ -146,4 +138,89 @@ bool XWorkerCollect::AddJobCollectDataP2P(XQueue * jobQueue, XTensor * source, X
     return true;
 }
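The hunks above share one job-dispatch pattern: `AddJobCollectDataP2P` packs its arguments into an `XList` in a fixed order, now followed by `(int)isGrad`, and the static wrapper `CollectDataP2P` reads them back in exactly that order on the worker thread, recovering the bool from the int. A tiny self-contained model of the invariant; `MiniArgList` is invented and much simpler than the repo's `XList`:

```cpp
#include <cassert>
#include <vector>

/* invented stand-in: pointers and ints share one running index,
   and values come out in the order they were packed */
struct MiniArgList {
    std::vector<void*> items;
    std::vector<int>   ints;
    void   Add(void * p)  { items.push_back(p); }
    void   AddInt(int x)  { ints.push_back(x); }
    void * GetItem(int k) { return items[(size_t)k]; }
    int    GetInt(int k)  { return ints[(size_t)k - items.size()]; }
};

int main()
{
    int self = 0, src = 1, tgt = 2;          /* stand-ins for real pointers */
    MiniArgList args;
    args.Add(&self); args.Add(&src); args.Add(&tgt);
    args.AddInt((int)true);                  /* (int)isGrad, as in the diff */

    int paramCount = 0;                      /* unpack in the same order */
    assert(args.GetItem(paramCount++) == &self);
    assert(args.GetItem(paramCount++) == &src);
    assert(args.GetItem(paramCount++) == &tgt);
    assert((bool)args.GetInt(paramCount++) == true);
    return 0;
}
```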
+/*
+add a new job of collecting gradient
+>> jobQueue - the queue where we run the job
+>> source - where we collect the data from
+>> target - where we place the data (on the server end)
+*/
+bool XWorkerCollect::AddJobCollectGradP2P(XQueue * jobQueue, XTensorKeeper * source, XTensorKeeper * target)
+{
+    return AddJobCollectDataP2P(jobQueue, source, target, true);
+}
+
+/*
+add a new job of collecting data in standard tensors
+>> jobQueue - the queue where we run the job
+>> source - where we collect the data from
+>> target - where we place the data (on the server end)
+*/
+bool XWorkerCollect::AddJobCollectTensorP2P(XQueue * jobQueue, XTensorKeeper * source, XTensorKeeper * target)
+{
+    return AddJobCollectDataP2P(jobQueue, source, target, false);
+}
+
+/*
+all-reduce: the well-known all-reduce method
+every tensor is involved in every data transmission. The final outcome
+is that all input tensors share the same value (i.e., the sum of them).
+>> all - the tensors for sum
+>> isGrad - indicates whether we collect gradient
+*/
+void XWorkerCollect::CollectAllReduce(XList * all, const bool isGrad)
+{
+    ShowNTErrors("TODO!");
+}
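`CollectAllReduce` is declared here but its body is still `ShowNTErrors("TODO!")`; only the contract is fixed by the comment above: afterwards, every tensor in `all` holds the elementwise sum of all of them. A minimal reference sketch of that end state over plain vectors (reduce into the first buffer, then broadcast it back), not the repo's eventual implementation:

```cpp
#include <vector>

/* reference semantics for the TODO above: after the call, every buffer
   holds the elementwise sum of all buffers; plain vectors stand in for
   the XTensor items the real code would traverse */
void AllReduceSumSketch(std::vector<std::vector<float>>& all)
{
    if (all.size() < 2) return;
    std::vector<float>& acc = all[0];
    for (size_t k = 1; k < all.size(); k++)      /* reduce: acc += all[k] */
        for (size_t i = 0; i < acc.size(); i++)
            acc[i] += all[k][i];
    for (size_t k = 1; k < all.size(); k++)      /* broadcast the sum back */
        all[k] = acc;
}
```

This naive reduce-then-broadcast funnels all traffic through one participant; a production version would typically use a ring or tree schedule so that per-node traffic stays roughly constant as the number of models grows.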
+/* wrapper of CollectAllReduce */
+void XWorkerCollect::CollectDataAllReduce(XList * args)
+{
+    int paramCount = 0;
+
+    XWorkerCollect * collecter = (XWorkerCollect*)args->GetItem(paramCount++);
+    XList * all = (XList*)args->GetItem(paramCount++);
+    bool isGrad = (bool)args->GetInt(paramCount++);
+
+    if(collecter != NULL)
+        collecter->CollectAllReduce(all, isGrad);
+}
+
+/*
+add a new job of collecting data via all-reduce
+>> jobQueue - the queue where we run the job
+>> all - the tensors for sum
+>> isGrad - indicates whether we collect gradient
+*/
+bool XWorkerCollect::AddJobCollectDataAllReduce(XQueue * jobQueue, XList * all, const bool isGrad)
+{
+    CheckNTErrors(all != NULL, "No input tensor keeper list!");
+
+    XList args;
+    args.Add(this);
+    args.Add(all);
+    args.AddInt((int)isGrad);
+
+    XQueue& queueRun = jobQueue != NULL ? *jobQueue : queue;
+
+    if (isInstantRun)
+        XWorkerCollect::CollectDataAllReduce(&args);
+    else
+        queueRun.EnqueueJob((void*)(char*)XWorkerCollect::CollectDataAllReduce, &args);
+
+    return true;
+}
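`AddJobCollectDataAllReduce` reuses the worker's usual dispatch switch: under `isInstantRun` the job body executes synchronously on the caller's thread, otherwise it is enqueued, falling back to the worker's own `queue` when `jobQueue` is NULL. A compact stand-in for that toggle; `MiniWorker` and the `std::function` jobs are illustrative, not the repo's `XQueue` machinery:

```cpp
#include <functional>
#include <queue>

/* invented stand-in for the isInstantRun dispatch used above */
struct MiniWorker {
    bool isInstantRun = false;
    std::queue<std::function<void()>> queue;     /* the worker's own queue */

    void AddJob(std::queue<std::function<void()>> * jobQueue,
                std::function<void()> job)
    {
        if (isInstantRun) {
            job();                               /* run on the caller's thread */
        } else {
            auto& queueRun = jobQueue != nullptr ? *jobQueue : queue;
            queueRun.push(std::move(job));       /* defer to the queue consumer */
        }
    }
};
```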
+/* add a new job of collecting gradient via all-reduce */
+bool XWorkerCollect::AddJobCollectGradAllReduce(XQueue * jobQueue, XList * all)
+{
+    return AddJobCollectDataAllReduce(jobQueue, all, true);
+}
+
+/* add a new job of collecting data in standard tensors via all-reduce */
+bool XWorkerCollect::AddJobCollectTensorAllReduce(XQueue * jobQueue, XList * all)
+{
+    return AddJobCollectDataAllReduce(jobQueue, all, false);
+}
 }
......@@ -67,21 +67,38 @@ public:
 /* P2P data collection */
 void CollectP2P(XTensor * source, XTensor * target);

-/* sum-reduce for given tensors */
-void CollectReduceSum(XList * source, XTensor * target);
-
-/* all-reduce */
-void CollectAllReduce(XList * all);
-
-/* wrapper of Collect */
+/* P2P data collection */
+void CollectP2P(XTensorKeeper * source, XTensorKeeper * target, const bool isGrad);
+
+/* wrapper of CollectP2P */
 static
 void CollectDataP2P(XList * args);

-/* add a new job of collecting data */
-bool AddJobCollectDataP2P(XQueue * jobQueue, XTensor * source, XTensor * target);
+/* add a new job of collecting data via p2p data transmission */
+bool AddJobCollectDataP2P(XQueue * jobQueue, XTensorKeeper * source, XTensorKeeper * target, const bool isGrad);
+
+/* add a new job of collecting gradient via p2p data transmission */
+bool AddJobCollectGradP2P(XQueue * jobQueue, XTensorKeeper * source, XTensorKeeper * target);
+
+/* add a new job of collecting data in standard tensors via p2p data transmission */
+bool AddJobCollectTensorP2P(XQueue * jobQueue, XTensorKeeper * source, XTensorKeeper * target);
+
+/* data collection via all-reduce */
+void CollectAllReduce(XList * all, const bool isGrad);
+
+/* wrapper of CollectAllReduce */
+static
+void CollectDataAllReduce(XList * args);
+
+/* add a new job of collecting data via all-reduce */
+bool AddJobCollectDataAllReduce(XQueue * jobQueue, XList * all, const bool isGrad);
+
+/* add a new job of collecting gradient via all-reduce */
+bool AddJobCollectGradAllReduce(XQueue * jobQueue, XList * all);
+
+/* add a new job of collecting data in standard tensors via all-reduce */
+bool AddJobCollectTensorAllReduce(XQueue * jobQueue, XList * all);
 };
......
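Taken together, the header now exposes symmetric P2P and all-reduce entry points. A hypothetical call site showing how a leader might choose between them; the `AddJob*` calls come from this diff, while the include path and the surrounding function are illustrative assumptions:

```cpp
#include "XWorkerCollect.h"   /* assumed header name for the class above */

/* hypothetical driver: collect gradients either point-to-point
   (parameter-server style) or with one all-reduce job across models */
void CollectGradients(XWorkerCollect * collecter, XQueue * jobQueue,
                      XTensorKeeper * workerParam, XTensorKeeper * serverParam,
                      XList * allGrads, bool useAllReduce)
{
    if (useAllReduce)
        collecter->AddJobCollectGradAllReduce(jobQueue, allGrads);
    else
        collecter->AddJobCollectGradP2P(jobQueue, workerParam, serverParam);

    collecter->AddJobEnqueueFinished(jobQueue);   /* as in XLeaderPS::RunUpdate */
}
```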