Commit 2b03a447 by xiaotong

updates of XThead

parent dd7a67bc
...@@ -146,7 +146,7 @@ run a set of jobs in parallel ...@@ -146,7 +146,7 @@ run a set of jobs in parallel
>> jobArgs - the list of arguments for each job >> jobArgs - the list of arguments for each job
>> sleepTime - time to sleep (in ms) for each round >> sleepTime - time to sleep (in ms) for each round
*/ */
void XPRunner::Run(TensorList * jobFunctions, TensorList * jobArgs, float sleepTime) void XPRunner::Run(XList * jobFunctions, XList * jobArgs, float sleepTime)
{ {
if(threadNum <= 0){ if(threadNum <= 0){
XPRINT(1, stderr, "Error! No threads were created!\n"); XPRINT(1, stderr, "Error! No threads were created!\n");
...@@ -195,13 +195,12 @@ void XPRunner::Run(TensorList * jobFunctions, TensorList * jobArgs, float sleepT ...@@ -195,13 +195,12 @@ void XPRunner::Run(TensorList * jobFunctions, TensorList * jobArgs, float sleepT
TFunction function = (TFunction)jobFunctions->GetItem(jobArgs->count - c); TFunction function = (TFunction)jobFunctions->GetItem(jobArgs->count - c);
/* the arguments that are passed to the function */ /* the arguments that are passed to the function */
volatile TensorList * args = (TensorList*)jobArgs->GetItem(jobArgs->count - c); XList * args = (XList*)jobArgs->GetItem(jobArgs->count - c);
/* thread */ /* thread */
XThread * thread = threads + availableThreads[i]; XThread * thread = threads + availableThreads[i];
thread->argv = args; thread->SetFunc(function, args);
thread->function = function;
MUTEX_LOCK(thread->workingMutex); MUTEX_LOCK(thread->workingMutex);
thread->working = 1; thread->working = 1;
......
...@@ -106,7 +106,7 @@ public: ...@@ -106,7 +106,7 @@ public:
void KillThreads(); void KillThreads();
/* run a set of jobs in parallel */ /* run a set of jobs in parallel */
void Run(TensorList * jobFunctions, TensorList * jobArgs, float sleepTime = 0); void Run(XList * jobFunctions, XList * jobArgs, float sleepTime = 0);
/* get the number of parallel jobs to run */ /* get the number of parallel jobs to run */
int GetJobNum(int size); int GetJobNum(int size);
......
...@@ -42,7 +42,7 @@ job item used in queues ...@@ -42,7 +42,7 @@ job item used in queues
JobQueueNode::JobQueueNode() JobQueueNode::JobQueueNode()
{ {
job = NULL; job = NULL;
args = new TensorList(1); args = new XList(1);
} }
/* de-constructor */ /* de-constructor */
...@@ -67,7 +67,7 @@ XQueue::XQueue(int mySize) ...@@ -67,7 +67,7 @@ XQueue::XQueue(int mySize)
head = 0; head = 0;
tail = 0; tail = 0;
isJobQueue = false; isJobQueue = false;
jobDequeuerArgs = new TensorList(1); jobDequeuerArgs = new XList(1);
jobDequeuerBreak = false; jobDequeuerBreak = false;
runningJobCount = 0; runningJobCount = 0;
...@@ -171,11 +171,10 @@ void XQueue::RunJobConsumer(int jobDevID) ...@@ -171,11 +171,10 @@ void XQueue::RunJobConsumer(int jobDevID)
jobDequeuerArgs->Clear(); jobDequeuerArgs->Clear();
// warning: this may cause unknown error // warning: this may cause unknown error
jobDequeuerArgs->Add((XTensor*)this); jobDequeuerArgs->Add(this);
jobDequeuerArgs->Add(jobDevID >= 0 ? (XTensor*)(devids + jobDevID) : (XTensor*)&cpuid); jobDequeuerArgs->Add(jobDevID >= 0 ? (devids + jobDevID) : &cpuid);
jobDequeuer.function = (TFunction)DequeueJobs; jobDequeuer.SetFunc((TFunction)DequeueJobs, jobDequeuerArgs);
jobDequeuer.argv = jobDequeuerArgs;
jobDequeuer.Start(); jobDequeuer.Start();
jobDequeuer.LetItGo(); jobDequeuer.LetItGo();
...@@ -194,7 +193,7 @@ void XQueue::StopJobConsumer() ...@@ -194,7 +193,7 @@ void XQueue::StopJobConsumer()
} }
/* add a job item to process */ /* add a job item to process */
void XQueue::EnqueueJob(void * job, TensorList * jobArgs) void XQueue::EnqueueJob(void * job, XList * jobArgs)
{ {
MUTEX_LOCK(jobQueueMutex); MUTEX_LOCK(jobQueueMutex);
runningJobCount++; runningJobCount++;
...@@ -208,7 +207,7 @@ void XQueue::EnqueueJob(void * job, TensorList * jobArgs) ...@@ -208,7 +207,7 @@ void XQueue::EnqueueJob(void * job, TensorList * jobArgs)
} }
/* job item consumer */ /* job item consumer */
void XQueue::DequeueJobs(TensorList * args) void XQueue::DequeueJobs(XList * args)
{ {
CheckNTErrors((args->count == 2), "Illegal arguments!"); CheckNTErrors((args->count == 2), "Illegal arguments!");
......
...@@ -51,7 +51,7 @@ public: ...@@ -51,7 +51,7 @@ public:
void * job; void * job;
/* arguments of the job */ /* arguments of the job */
TensorList * args; XList * args;
public: public:
/* constructor */ /* constructor */
...@@ -101,7 +101,7 @@ private: ...@@ -101,7 +101,7 @@ private:
XThread jobDequeuer; XThread jobDequeuer;
/* argument list of jobDequeuer */ /* argument list of jobDequeuer */
TensorList * jobDequeuerArgs; XList * jobDequeuerArgs;
/* indicates whether jobDequeuer stops */ /* indicates whether jobDequeuer stops */
bool jobDequeuerBreak; bool jobDequeuerBreak;
...@@ -135,11 +135,11 @@ public: ...@@ -135,11 +135,11 @@ public:
void StopJobConsumer(); void StopJobConsumer();
/* add a job item to process */ /* add a job item to process */
void EnqueueJob(void * job, TensorList * jobArgs); void EnqueueJob(void * job, XList * jobArgs);
/* job item consumer */ /* job item consumer */
static static
void DequeueJobs(TensorList * args); void DequeueJobs(XList * args);
/* get the break flag */ /* get the break flag */
bool GetJobBreak(); bool GetJobBreak();
......
...@@ -38,7 +38,7 @@ XThread::XThread() ...@@ -38,7 +38,7 @@ XThread::XThread()
#endif #endif
MUTEX_INIT(gMutex); MUTEX_INIT(gMutex);
function = NULL; function = NULL;
argv = NULL; argv.Clear();
toBreak = false; toBreak = false;
jobCount = 0; jobCount = 0;
working = 0; working = 0;
...@@ -69,6 +69,18 @@ void * XThread::Wrapper(void * ptr) ...@@ -69,6 +69,18 @@ void * XThread::Wrapper(void * ptr)
return 0; return 0;
} }
/*
initialize the thread with the function and its parameters
>> myFunc - the function to run
>> myArgv - arguments of the function
*/
void XThread::SetFunc(TFunction myFunc, XList * myArgv)
{
function = myFunc;
argv.Clear();
argv.AddList(myArgv);
}
/* /*
Tunning for this thread. It is very very native implementation. Tunning for this thread. It is very very native implementation.
...@@ -77,6 +89,10 @@ After that, we wait again if there is no new job. ...@@ -77,6 +89,10 @@ After that, we wait again if there is no new job.
*/ */
void XThread::Run() void XThread::Run()
{ {
if (function == NULL) {
ShowNTErrors("You are running a thread with no function specified!");
}
#ifdef _WIN32 #ifdef _WIN32
//COND_RESET(gCond); //COND_RESET(gCond);
#endif #endif
...@@ -104,7 +120,7 @@ void XThread::Run() ...@@ -104,7 +120,7 @@ void XThread::Run()
} }
/* do what you want to do*/ /* do what you want to do*/
function(argv); function(&argv);
#ifdef USE_PTHREAD #ifdef USE_PTHREAD
jobCount--; jobCount--;
......
...@@ -85,7 +85,7 @@ namespace nts{ ...@@ -85,7 +85,7 @@ namespace nts{
#endif #endif
typedef void (*TFunction) (volatile TensorList*); typedef void (*TFunction) (volatile XList*);
/* /*
This is a class that wraps the standard implementation of threading This is a class that wraps the standard implementation of threading
...@@ -128,12 +128,10 @@ public: ...@@ -128,12 +128,10 @@ public:
public: public:
/* function to run */ /* function to run */
volatile
TFunction function; TFunction function;
/* arguments (for the function to run) */ /* arguments (for the function to run) */
volatile XList argv;
TensorList * argv;
/* a flag to break */ /* a flag to break */
volatile volatile
...@@ -154,6 +152,9 @@ public: ...@@ -154,6 +152,9 @@ public:
/* a wrapper for the start-routine parameter in pthread_create */ /* a wrapper for the start-routine parameter in pthread_create */
static void * Wrapper(void * ptr); static void * Wrapper(void * ptr);
/* initialize the thread with the function and its parameters */
void SetFunc(TFunction myFunc, XList * myArgv);
/* /*
Core of the thread. It is very very native impelementation. Core of the thread. It is very very native impelementation.
We loop and wait for a singnal to activate the job processing. We loop and wait for a singnal to activate the job processing.
......
...@@ -51,7 +51,7 @@ void RunParallel2D(XPRunner * parallelRunner, void * job, ...@@ -51,7 +51,7 @@ void RunParallel2D(XPRunner * parallelRunner, void * job,
CheckNTErrors(jobNum != 0, "TODO!"); CheckNTErrors(jobNum != 0, "TODO!");
/* argument list of the jobs */ /* argument list of the jobs */
TensorList * jobArgList = new TensorList(argNum); XList * jobArgList = new XList(argNum);
va_list ap; va_list ap;
va_start(ap, argNum); va_start(ap, argNum);
...@@ -62,8 +62,8 @@ void RunParallel2D(XPRunner * parallelRunner, void * job, ...@@ -62,8 +62,8 @@ void RunParallel2D(XPRunner * parallelRunner, void * job,
va_end(ap); va_end(ap);
/* prepare the neccesary argument list for parallel processing */ /* prepare the neccesary argument list for parallel processing */
TensorList * jobs = new TensorList(jobNum); XList * jobs = new XList(jobNum);
TensorList * args = new TensorList(jobNum); XList * args = new XList(jobNum);
int * indexList = new int[jobNum * 4 * 4]; int * indexList = new int[jobNum * 4 * 4];
...@@ -78,7 +78,7 @@ void RunParallel2D(XPRunner * parallelRunner, void * job, ...@@ -78,7 +78,7 @@ void RunParallel2D(XPRunner * parallelRunner, void * job,
*/ */
for (int i = 0; i < jobNum; i++) { for (int i = 0; i < jobNum; i++) {
IntList* indexArgs = new IntList(4); IntList* indexArgs = new IntList(4);
TensorList * blockArgs = new TensorList(argNum); XList * blockArgs = new XList(argNum);
int * blockIndex = indexList + i * 4; int * blockIndex = indexList + i * 4;
indexArgs->Add(blockIndex[0]); indexArgs->Add(blockIndex[0]);
...@@ -89,10 +89,10 @@ void RunParallel2D(XPRunner * parallelRunner, void * job, ...@@ -89,10 +89,10 @@ void RunParallel2D(XPRunner * parallelRunner, void * job,
for (int j = 0; j < argNum; j++) for (int j = 0; j < argNum; j++)
blockArgs->Add(jobArgList->GetItem(j)); blockArgs->Add(jobArgList->GetItem(j));
args->Add((XTensor*)indexArgs); args->Add((void*)indexArgs);
args->Add((XTensor*)blockArgs); args->Add((void*)blockArgs);
jobs->Add((XTensor*)job); jobs->Add((void*)job);
} }
args->count = jobNum * 2; args->count = jobNum * 2;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论