Commit 70b8f94b by xiaotong

new pipelines for updater and broadcaster

parent 959545df
...@@ -142,28 +142,38 @@ void XWorker::DequeueFinished(XList* args) ...@@ -142,28 +142,38 @@ void XWorker::DequeueFinished(XList* args)
worker->DequeueFinishedJob(); worker->DequeueFinishedJob();
} }
/* add a job of enqueuing a counting a finished job */ /*
void XWorker::AddJobEnqueueFinished() add a job of enqueuing a counting a finished job
>> jobQueue - the queue where we push the job
*/
void XWorker::AddJobEnqueueFinished(XQueue* jobQueue)
{ {
XList args; XList args;
args.Add(this); args.Add(this);
XQueue& queueRun = jobQueue != NULL ? *jobQueue : queue;
if (isInstantRun) if (isInstantRun)
XWorker::EnqueueFinished(&args); XWorker::EnqueueFinished(&args);
else else
queue.EnqueueJob((void*)(char*)XWorker::EnqueueFinished, &args); queueRun.EnqueueJob((void*)(char*)XWorker::EnqueueFinished, &args);
} }
/* add a job of dequeuing a counting a finished job */ /*
void XWorker::AddJobDequeueFinished() add a job of dequeuing a counting a finished job
>> jobQueue - the queue where we push the job
*/
void XWorker::AddJobDequeueFinished(XQueue* jobQueue)
{ {
XList args; XList args;
args.Add(this); args.Add(this);
XQueue& queueRun = jobQueue != NULL ? *jobQueue : queue;
if (isInstantRun) if (isInstantRun)
XWorker::DequeueFinished(&args); XWorker::DequeueFinished(&args);
else else
queue.EnqueueJob((void*)(char*)XWorker::DequeueFinished, &args); queueRun.EnqueueJob((void*)(char*)XWorker::DequeueFinished, &args);
} }
......
...@@ -122,10 +122,10 @@ public: ...@@ -122,10 +122,10 @@ public:
void DequeueFinished(XList* args); void DequeueFinished(XList* args);
/* add a job of enqueuing a counting a finished job */ /* add a job of enqueuing a counting a finished job */
void AddJobEnqueueFinished(); void AddJobEnqueueFinished(XQueue * jobQueue = NULL);
/* add a job of dequeuing a counting a finished job */ /* add a job of dequeuing a counting a finished job */
void AddJobDequeueFinished(); void AddJobDequeueFinished(XQueue* jobQueue = NULL);
}; };
} }
......
...@@ -51,14 +51,11 @@ void XWorkerBroadcast::SetBroadcastMode(DATA_BROADCAST_TYPE myMode) ...@@ -51,14 +51,11 @@ void XWorkerBroadcast::SetBroadcastMode(DATA_BROADCAST_TYPE myMode)
/* /*
broadcast data for a parameter broadcast data for a parameter
>> jobQueue - the queue where we push jobs
>> source - the data (as a model) that we want to broadcast >> source - the data (as a model) that we want to broadcast
>> targetList - the target places that we recieve the data >> targetList - the target places that we recieve the data
>> pid - the parameter index >> pid - the parameter index
*/ */
void XWorkerBroadcast::BroadcastDataSingle(XQueue * jobQueue, void XWorkerBroadcast::BroadcastDataSingle(XModel * source, XList * targetList, int pid)
XModel * source, XList * targetList,
int pid)
{ {
CheckNTErrors(source->params[pid].flag == PARAM_STATE_UPDATED, CheckNTErrors(source->params[pid].flag == PARAM_STATE_UPDATED,
"The parameter is not ready for broadcasting"); "The parameter is not ready for broadcasting");
...@@ -83,7 +80,6 @@ void XWorkerBroadcast::BroadcastSingle(XList * args) ...@@ -83,7 +80,6 @@ void XWorkerBroadcast::BroadcastSingle(XList * args)
int paramCount = 0; int paramCount = 0;
XWorkerBroadcast * broadcaster = (XWorkerBroadcast*)args->GetItem(paramCount++); XWorkerBroadcast * broadcaster = (XWorkerBroadcast*)args->GetItem(paramCount++);
XQueue * jobQueue = (XQueue*)args->GetItem(paramCount++);
XModel * source = (XModel*)args->GetItem(paramCount++); XModel * source = (XModel*)args->GetItem(paramCount++);
/* target models */ /* target models */
...@@ -97,7 +93,7 @@ void XWorkerBroadcast::BroadcastSingle(XList * args) ...@@ -97,7 +93,7 @@ void XWorkerBroadcast::BroadcastSingle(XList * args)
/* parameter index */ /* parameter index */
int p = args->GetInt(paramCount++); int p = args->GetInt(paramCount++);
broadcaster->BroadcastDataSingle(jobQueue, source, &target, p); broadcaster->BroadcastDataSingle(source, &target, p);
} }
/* /*
...@@ -117,7 +113,7 @@ void XWorkerBroadcast::BroadcastP2P(XTensor * source, XTensor * target) ...@@ -117,7 +113,7 @@ void XWorkerBroadcast::BroadcastP2P(XTensor * source, XTensor * target)
/* /*
add a new job of broadcasting data (for a parameter) add a new job of broadcasting data (for a parameter)
>> jobQueue - the queue that we push jobs here >> jobQueue - the queue where we push jobs
>> source - the data that we want to broadcast >> source - the data that we want to broadcast
>> targetList - the target places that we recieve the data >> targetList - the target places that we recieve the data
>> pid - the parameter index >> pid - the parameter index
...@@ -130,16 +126,17 @@ bool XWorkerBroadcast::AddJobBroadcastSingle(XQueue * jobQueue, XModel * source, ...@@ -130,16 +126,17 @@ bool XWorkerBroadcast::AddJobBroadcastSingle(XQueue * jobQueue, XModel * source,
XList args; XList args;
args.Add(this); args.Add(this);
args.Add(jobQueue);
args.Add(source); args.Add(source);
args.AddInt(targetList->count); args.AddInt(targetList->count);
args.AddList(targetList); args.AddList(targetList);
args.AddInt(pid); args.AddInt(pid);
XQueue& queueRun = jobQueue != NULL ? *jobQueue : queue;
if (isInstantRun) if (isInstantRun)
XWorkerBroadcast::BroadcastSingle(&args); XWorkerBroadcast::BroadcastSingle(&args);
else else
queue.EnqueueJob((void*)(char*)XWorkerBroadcast::BroadcastSingle, &args); queueRun.EnqueueJob((void*)(char*)XWorkerBroadcast::BroadcastSingle, &args);
return true; return true;
} }
......
...@@ -61,7 +61,7 @@ public: ...@@ -61,7 +61,7 @@ public:
void SetBroadcastMode(DATA_BROADCAST_TYPE myMode); void SetBroadcastMode(DATA_BROADCAST_TYPE myMode);
/* broadcast data for a parameter */ /* broadcast data for a parameter */
void BroadcastDataSingle(XQueue * jobQueue, XModel * source, XList * targetList, int pid); void BroadcastDataSingle(XModel * source, XList * targetList, int pid);
/* wrapper of BroadcastDataSingle */ /* wrapper of BroadcastDataSingle */
static static
......
...@@ -129,10 +129,15 @@ void XWorkerCollect::UpdateDataAll(XList * jobQueues, XList * memberActive, XLis ...@@ -129,10 +129,15 @@ void XWorkerCollect::UpdateDataAll(XList * jobQueues, XList * memberActive, XLis
if (finishedCount[j] == memberActive->count) { if (finishedCount[j] == memberActive->count) {
paramServer.flag = PARAM_STATE_COLLECTED; paramServer.flag = PARAM_STATE_COLLECTED;
if (updater != NULL) { if (updater != NULL) {
updater->AddJobUpdate((XQueue*)jobQueues->GetItem(j), XQueue* jobQueue = (XQueue*)jobQueues->GetItem(j);
server, memberAll, j,
optimizer, broadcaster); /* update the parameters */
updater->AddJobEnqueueFinished(); updater->AddJobUpdate(jobQueue, server, j, optimizer);
updater->AddJobEnqueueFinished(jobQueue);
/* broadcast the new parameter to other models*/
broadcaster->AddJobBroadcastSingle(jobQueue, server, memberAll, j);
broadcaster->AddJobEnqueueFinished(jobQueue);
} }
} }
else if (finishedCount[j] > memberActive->count) { else if (finishedCount[j] > memberActive->count) {
......
...@@ -54,15 +54,12 @@ XOptimizer * XWorkerUpdate::GetOptimizer() ...@@ -54,15 +54,12 @@ XOptimizer * XWorkerUpdate::GetOptimizer()
/* /*
update a parameter of a model update a parameter of a model
>> jobQueue - the queue that place the jobs called here
>> model - the model that we want to update (on the server side) >> model - the model that we want to update (on the server side)
>> members - models that would share the updated parameters
>> pid - the parameter index >> pid - the parameter index
>> optimizer - the optimizer >> optimizer - the optimizer
>> broadcaster - the worker that would broadcast the new parameter to members
*/ */
void XWorkerUpdate::UpdateParameter(XQueue * jobQueue, XModel * server, XList * members, int pid, void XWorkerUpdate::UpdateParameter(XModel * server, int pid,
XOptimizer * optimizer, XWorkerBroadcast * broadcaster) XOptimizer * optimizer)
{ {
CheckNTErrors(server->params[pid].flag == PARAM_STATE_COLLECTED, "The state of the parameter is wrong!"); CheckNTErrors(server->params[pid].flag == PARAM_STATE_COLLECTED, "The state of the parameter is wrong!");
...@@ -77,10 +74,6 @@ void XWorkerUpdate::UpdateParameter(XQueue * jobQueue, XModel * server, XList * ...@@ -77,10 +74,6 @@ void XWorkerUpdate::UpdateParameter(XQueue * jobQueue, XModel * server, XList *
/* set the flag */ /* set the flag */
server->params[pid].flag = PARAM_STATE_UPDATED; server->params[pid].flag = PARAM_STATE_UPDATED;
/* broadcast the new parameter to other models (in anotehr worker/thread) */
broadcaster->AddJobBroadcastSingle(jobQueue, server, members, pid);
broadcaster->AddJobEnqueueFinished();
} }
/* /*
...@@ -91,60 +84,45 @@ void XWorkerUpdate::Update(XList * args) ...@@ -91,60 +84,45 @@ void XWorkerUpdate::Update(XList * args)
{ {
int paramCount = 0; int paramCount = 0;
CheckNTErrors(args != NULL && args->count >= 6, "Illegal argument list!"); CheckNTErrors(args != NULL && args->count >= 4, "Illegal argument list!");
XWorkerUpdate * updater = (XWorkerUpdate*)args->GetItem(paramCount++); XWorkerUpdate * updater = (XWorkerUpdate*)args->GetItem(paramCount++);
XQueue * jobQueue = (XQueue*)args->GetItem(paramCount++);
XModel * server = (XModel*)args->GetItem(paramCount++); XModel * server = (XModel*)args->GetItem(paramCount++);
int memNum = args->GetInt(paramCount++);
XList members;
for (int i = 0; i < memNum; i++) {
XModel * member = (XModel*)args->GetItem(paramCount++);
members.Add(member);
}
int pid = args->GetInt(paramCount++); int pid = args->GetInt(paramCount++);
XOptimizer * optimizer = (XOptimizer*)args->GetItem(paramCount++); XOptimizer * optimizer = (XOptimizer*)args->GetItem(paramCount++);
XWorkerBroadcast * broadcaster = (XWorkerBroadcast*)args->GetItem(paramCount++);
if(updater != NULL) if(updater != NULL)
updater->UpdateParameter(jobQueue, server, &members, pid, optimizer, broadcaster); updater->UpdateParameter(server, pid, optimizer);
} }
/* /*
add a new job of model update (for a parameter) add a new job of model update (for a parameter)
>> jobQueue - the queue for sub-jobs executed in the job >> jobQueue - the queue for sub-jobs executed in the job
>> model - the model that we want to update (on the server side) >> model - the model that we want to update (on the server side)
>> members - models that would share the updated parameters
>> pid - the parameter index >> pid - the parameter index
>> optimizer - the optimizer >> optimizer - the optimizer
>> broadcaster - the worker that would broadcast the new parameter to members
*/ */
bool XWorkerUpdate::AddJobUpdate(XQueue * jobQueue, bool XWorkerUpdate::AddJobUpdate(XQueue * jobQueue,
XModel * model, XList * members, int pid, XModel * model, int pid,
XOptimizer * optimizer, XWorkerBroadcast * broadcaster) XOptimizer * optimizer)
{ {
CheckNTErrors(model != NULL, "No input model!"); CheckNTErrors(model != NULL, "No input model!");
CheckNTErrors(members != NULL, "No member model list!");
CheckNTErrors(optimizer != NULL, "No optimizer!"); CheckNTErrors(optimizer != NULL, "No optimizer!");
CheckNTErrors(broadcaster != NULL, "No broadcaster!");
CheckNTErrors(pid >= 0 && pid < model->paramNum, "Illegal parameter index!"); CheckNTErrors(pid >= 0 && pid < model->paramNum, "Illegal parameter index!");
XList args; XList args;
args.Add(this); args.Add(this);
args.Add(jobQueue);
args.Add(model); args.Add(model);
args.AddInt(members->count);
args.AddList(members);
args.AddInt(pid); args.AddInt(pid);
args.Add(optimizer); args.Add(optimizer);
args.Add(broadcaster);
XQueue& queueRun = jobQueue != NULL ? *jobQueue : queue;
if (isInstantRun) if (isInstantRun)
XWorkerUpdate::Update(&args); XWorkerUpdate::Update(&args);
else else
queue.EnqueueJob((void*)(char*)XWorkerUpdate::Update, &args); queueRun.EnqueueJob((void*)(char*)XWorkerUpdate::Update, &args);
return true; return true;
} }
......
...@@ -57,8 +57,7 @@ public: ...@@ -57,8 +57,7 @@ public:
XOptimizer * GetOptimizer(); XOptimizer * GetOptimizer();
/* update the parameter */ /* update the parameter */
void UpdateParameter(XQueue * jobQueue, XModel * server, XList * members, int pid, void UpdateParameter(XModel * server, int pid, XOptimizer * optimizer);
XOptimizer * optimizer, XWorkerBroadcast * broadcaster);
/* wrapper of UpdateParameter */ /* wrapper of UpdateParameter */
...@@ -67,8 +66,7 @@ public: ...@@ -67,8 +66,7 @@ public:
/* add a new job of model update (for a parameter) */ /* add a new job of model update (for a parameter) */
bool AddJobUpdate(XQueue * jobQueue, XModel * model, XList * members, int pid, bool AddJobUpdate(XQueue * jobQueue, XModel * model, int pid, XOptimizer * optimizer);
XOptimizer * optimizer, XWorkerBroadcast * broadcaster);
}; };
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论