/* NiuTrans.Tensor - an open-source tensor library
 * Copyright (C) 2017, Natural Language Processing Lab, Northestern University.
 * All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 *
 * a naive implementation of thread pool (actually it is a pool)
 *
 * $Created by: XIAO Tong (xiaotong@mail.neu.edu.cn) 2016-03-08
 *
 */

#include "XGlobal.h"
#include "XThread.h"

/* the nts (NiuTrans.Tensor) namespace */
namespace nts{

/*
constructor
initializes the synchronization objects and puts the thread object into
the "not started, no job" state
*/
XThread::XThread()
{
#ifdef USE_PTHREAD
    MUTEX_INIT(mutex);
    COND_INIT(cond);
#endif
    MUTEX_INIT(gMutex);
    function = NULL;
    argv = NULL;
    toBreak = false;
    jobCount = 0;
    working = 0;
    MUTEX_INIT(workingMutex);
    COND_INIT(jobCond);
    isRunning = false;
    hnd = 0;
}

/*
destructor
stops the worker (if it was started) before the synchronization
objects it uses are destroyed
*/
XThread::~XThread()
{
    End();
#ifdef USE_PTHREAD
    MUTEX_DELE(mutex);
    COND_DELE(cond);
#endif
    MUTEX_DELE(gMutex);
    MUTEX_DELE(workingMutex);
    COND_DELE(jobCond);
}

/*
a wrapper for the start-routine parameter in pthread_create
>> ptr - the XThread object whose Run() is executed in the new thread
<< return - always 0
*/
void * XThread::Wrapper(void * ptr)
{
    XThread * p = static_cast<XThread *>(ptr);
    p->Run();
    return 0;
}

/*
Running for this thread. It is a very naive implementation.
We loop and wait for a signal to activate the job processing.
After that, we wait again if there is no new job.

The loop is left only when toBreak is set AND no job is pending.
Leaving while jobCount is still positive would make End() wait
forever for a counter that nobody decrements anymore.
*/
void XThread::Run()
{
    while(true){

#ifdef USE_PTHREAD
        /* waiting for a job or for the stop request; COND_WAIT unlocks
           the mutex while sleeping and re-locks it before returning */
        MUTEX_LOCK(mutex);
        while(jobCount == 0 && !toBreak){
            COND_WAIT(cond, mutex);
        }

        if(toBreak && jobCount <= 0){
            MUTEX_UNLOCK(mutex);
            break;
        }
#else
#ifdef _WIN32
        COND_WAIT(jobCond, gMutex);

        /* jobCount is updated under workingMutex on this platform */
        MUTEX_LOCK(workingMutex);
        const bool noPendingJob = (jobCount <= 0);
        MUTEX_UNLOCK(workingMutex);

        if(toBreak && noPendingJob)
            break;
#else
        if(toBreak)
            break;
#endif
#endif

        /* do what you want to do */
        function(argv);

#ifdef USE_PTHREAD
        /* the mutex has been held for the whole job, so LetItGo() and
           End() block until this point */
        jobCount--;
        MUTEX_UNLOCK(mutex);
#else
#ifdef _WIN32
        MUTEX_LOCK(workingMutex);
        working = 0;
        jobCount--;
        MUTEX_UNLOCK(workingMutex);
#endif
#endif
    }
}

/*
create and run the thread
<< return - true if the worker was created (or, without thread support,
            after Run() has returned); false if thread creation failed
*/
bool XThread::Start()
{
    toBreak = false;
    isRunning = true;

#ifdef USE_PTHREAD
    const int r = pthread_create(&hnd, NULL, &Wrapper, static_cast<void *>(this));
    if(r != 0){
        /* no thread exists: End() must not signal or join it later */
        isRunning = false;
        return false;
    }
#else
#ifdef _WIN32
    DWORD id;
    hnd = BEGINTHREAD(0, 0, &Wrapper, this, 0, &id);
    if(hnd == 0){
        /* no thread exists: End() must not signal or join it later */
        isRunning = false;
        return false;
    }
#else
    /* no thread support: run the loop in the calling thread */
    Run();
#endif
#endif

    return true;
}

/*
end the thread
sets the stop flag, polls until the pending jobs are finished, wakes
the worker so that it can see the flag, and joins it
*/
void XThread::End()
{
    /* plain (unsynchronized) flag write, as before; the lock taken below
       is what guarantees that the worker sees it before sleeping again */
    toBreak = true;

    if(!isRunning)
        return;

    /* let the worker finish what has been submitted */
    while(jobCount > 0){
#ifdef _WIN32
        Sleep(200);
#else
        usleep(200 * 1000);
#endif
    }

#ifdef USE_PTHREAD
    /* wake the worker while holding the mutex: it is either already
       waiting (and gets the broadcast) or has not checked the predicate
       yet (and will find toBreak set), so the wake-up cannot be lost.
       No fake job is counted, so jobCount stays 0 for a later Start(). */
    MUTEX_LOCK(mutex);
    COND_BROADCAST(cond);
    MUTEX_UNLOCK(mutex);
#else
    COND_SIGNAL(jobCond);
#endif

    Join();
    isRunning = false;
}

/* wait for thread termination */
void XThread::Join()
{
#ifdef USE_PTHREAD
    pthread_join(hnd, 0);
#else
#ifdef _WIN32
    WaitForSingleObject(hnd, INFINITE);
    CloseHandle(hnd); // are you sure if you want to do this?
    hnd = 0;          // the handle is closed; do not keep a stale value
#endif
#endif
}

/*
let the thread process a job
counts one more job and wakes the worker; function and argv are
used as they are when the worker picks the job up
*/
void XThread::LetItGo()
{
#ifdef USE_PTHREAD
    MUTEX_LOCK(mutex);
    jobCount++;
    /* inform the worker, as the Windows branch does; it is the waiter
       that Run() puts on this condition. NOTE(review): if callers
       already signal cond themselves, this extra wake-up is harmless
       because Run() re-checks its predicate */
    COND_BROADCAST(cond);
    MUTEX_UNLOCK(mutex);
#else
#ifdef _WIN32
    /* reset various locks */
    MUTEX_LOCK(workingMutex);
    jobCount++;
    COND_RESET(jobCond);
    MUTEX_UNLOCK(workingMutex);

    /* inform the job */
    COND_SIGNAL(jobCond);
#endif
#endif
}

/*
wait for a signal
>> c - the condition to wait on
>> m - the mutex associated with the condition
NOTE: this is a single wait without a predicate, so the caller must be
prepared for a wake-up that is not caused by the expected signal
*/
void XThread::Wait(COND_HANDLE * c, MUTEX_HANDLE * m)
{
#ifdef USE_PTHREAD
    MUTEX_LOCK(*m);
    COND_WAIT(*c, *m);
    MUTEX_UNLOCK(*m);
#else
#ifdef _WIN32
    COND_WAIT(*c, *m);
#endif
#endif
}

/***********************************************
a counter with mutex
*/

/* constructor: the counter starts at 0 */
XCounter::XCounter()
{
    count = 0;
    MUTEX_INIT(mutex);
}

/* destructor */
XCounter::~XCounter()
{
    MUTEX_DELE(mutex);
}

/* add the counter by 1 (under the mutex) */
void XCounter::Add()
{
    MUTEX_LOCK(mutex);
    count++;
    MUTEX_UNLOCK(mutex);
}

/*
get the counting number
<< return - the value of the counter, read under the mutex
*/
int XCounter::Get()
{
    MUTEX_LOCK(mutex);
    const int c = count;
    MUTEX_UNLOCK(mutex);
    return c;
}

} /* end of the nts (NiuTrans.Tensor) namespace */