Commit 58181c8d by liyinqiao

1. Clean the codes;

2. Merge with Xiao branch.
parent d1714e17
...@@ -166,6 +166,8 @@ int T2TBatchLoader::LoadBuf(FILE * file, bool isSorted, int step) ...@@ -166,6 +166,8 @@ int T2TBatchLoader::LoadBuf(FILE * file, bool isSorted, int step)
if(wordCount >= bufSize - MAX_SEQUENCE_LENGTH) if(wordCount >= bufSize - MAX_SEQUENCE_LENGTH)
break; break;
CheckNTErrors(seqCount % step == 0, "Wrong number of sequences!");
} }
nseqBuf = seqCount; nseqBuf = seqCount;
......
...@@ -293,10 +293,10 @@ void T2TSearch::Generate(T2TStateBundle * beam) ...@@ -293,10 +293,10 @@ void T2TSearch::Generate(T2TStateBundle * beam)
CopyValues(index, preID); CopyValues(index, preID);
/* "preID" represents the id (or the offset) of previous state used to make the current /* "preID" represents the id (or the offset) of the previous state used to make the current
hypothesis. Note that we reshape the "score" tensor into a matrix where each hypothesis. Note that we reshape the "score" tensor into a matrix where each
row means a previous state. The column number is size-of-beam * vocab-size. We, row means a previous state. The column number is size-of-beam \times vocab-size. We,
therefore, divide entries of the top-k index by vocab-size to compute the id of therefore, divide entries of the top-k index by vocab-size to compute the id of the
previous state for each hypothesis in the top-k list. */ previous state for each hypothesis in the top-k list. */
Descale(preID, sizeVocab); Descale(preID, sizeVocab);
......
...@@ -201,7 +201,8 @@ void XDevice::SetGPUDevice(int devID) ...@@ -201,7 +201,8 @@ void XDevice::SetGPUDevice(int devID)
cudaError_t error = cudaSetDevice(devID); cudaError_t error = cudaSetDevice(devID);
if (error != cudaSuccess){ if (error != cudaSuccess){
fprintf(stderr, "Error! Calling cudaSetDevice(%d) fails(%d:%s)\n", devID, error, cudaGetErrorString(error)); fprintf(stderr, "Error! Calling cudaSetDevice(%d) fails(%d:%s)\n",
devID, error, cudaGetErrorString(error));
exit(1); exit(1);
} }
#else #else
...@@ -216,7 +217,7 @@ void XDevice::SetGPUDeviceFast(int devID) ...@@ -216,7 +217,7 @@ void XDevice::SetGPUDeviceFast(int devID)
SetFastFlags(); SetFastFlags();
} }
/* switch to a get current dev */ /* get the id of the current GPU device */
int XDevice::GetGPUDevice() int XDevice::GetGPUDevice()
{ {
#ifdef USE_CUDA #ifdef USE_CUDA
...@@ -224,7 +225,8 @@ int XDevice::GetGPUDevice() ...@@ -224,7 +225,8 @@ int XDevice::GetGPUDevice()
cudaError_t error = cudaGetDevice(&devID); cudaError_t error = cudaGetDevice(&devID);
if (error != cudaSuccess){ if (error != cudaSuccess){
fprintf(stderr, "Error! Calling cudaGetDevice(%d) fails(%d:%s)\n", devID, error, cudaGetErrorString(error)); fprintf(stderr, "Error! Calling cudaGetDevice(%d) fails(%d:%s)\n",
devID, error, cudaGetErrorString(error));
exit(1); exit(1);
} }
...@@ -248,7 +250,7 @@ void XDevice::SetFastFlags() ...@@ -248,7 +250,7 @@ void XDevice::SetFastFlags()
#endif #endif
} }
/* reset cuda flag for more efficient cuda execution (all devices) */ /* reset the cuda flag for more efficient cuda execution (all devices) */
void XDevice::SetFastFlagsAllDevices() void XDevice::SetFastFlagsAllDevices()
{ {
#ifdef USE_CUDA #ifdef USE_CUDA
...@@ -274,7 +276,7 @@ XDevManager::~XDevManager() ...@@ -274,7 +276,7 @@ XDevManager::~XDevManager()
} }
/* initialize it and get the CPU and GPU information */ /* initialization */
void XDevManager::Init() void XDevManager::Init()
{ {
srand((unsigned int)time(NULL)); srand((unsigned int)time(NULL));
...@@ -318,7 +320,7 @@ void XDevManager::Clear() ...@@ -318,7 +320,7 @@ void XDevManager::Clear()
#ifdef USE_CUDA #ifdef USE_CUDA
/* get the handle of GPU */ /* get the handle of a given GPU */
cublasHandle_t * XDevManager::GetCudaHandle(const int devID) cublasHandle_t * XDevManager::GetCudaHandle(const int devID)
{ {
CheckNTErrors(devID < nGPU, "index of GPU is out of range."); CheckNTErrors(devID < nGPU, "index of GPU is out of range.");
...@@ -326,7 +328,7 @@ cublasHandle_t * XDevManager::GetCudaHandle(const int devID) ...@@ -326,7 +328,7 @@ cublasHandle_t * XDevManager::GetCudaHandle(const int devID)
return GPUs[devID].GetCublasHandle(); return GPUs[devID].GetCublasHandle();
} }
/* get the stream of cuda */ /* get the stream of a given GPU */
cudaStream_t * XDevManager::GetCudaStream(const int devID) cudaStream_t * XDevManager::GetCudaStream(const int devID)
{ {
CheckNTErrors(devID < nGPU, "index of GPU is out of range."); CheckNTErrors(devID < nGPU, "index of GPU is out of range.");
...@@ -523,7 +525,7 @@ get device ids for the given device information ...@@ -523,7 +525,7 @@ get device ids for the given device information
devInfo = "0:CPU-1 1:GPU-0 2:CPU-1" devInfo = "0:CPU-1 1:GPU-0 2:CPU-1"
means that the first device is CPU, the second device means that the first device is CPU, the second device
is GPU-0, the third device is CPU. is GPU-0, the third device is CPU.
>> devIDs - device sequence specified by devInfo >> devIDs - device IDs specified by devInfo
<< return - number of devices << return - number of devices
*/ */
int XDevManager::GetDeviceIDs(char * devInfo, int * devIDs) int XDevManager::GetDeviceIDs(char * devInfo, int * devIDs)
...@@ -565,7 +567,7 @@ int XDevManager::GetDeviceIDs(char * devInfo, int * devIDs) ...@@ -565,7 +567,7 @@ int XDevManager::GetDeviceIDs(char * devInfo, int * devIDs)
return devCount; return devCount;
} }
/* show id sequence */ /* show device IDs */
void XDevManager::ShowDeviceIDs(char * devInfo, char * msg) void XDevManager::ShowDeviceIDs(char * devInfo, char * msg)
{ {
msg[0] = 0; msg[0] = 0;
......
...@@ -63,7 +63,7 @@ constructor ...@@ -63,7 +63,7 @@ constructor
>> myMode - mode of running the memory pool >> myMode - mode of running the memory pool
UNI_FREE: free all the space at the end of using the memory pool UNI_FREE: free all the space at the end of using the memory pool
FREE_ON_THE_FLY: normal "malloc" and "free" mode FREE_ON_THE_FLY: normal "malloc" and "free" mode
>> myBlockSize - size of memory block >> myBlockSize - size of a memory block
>> myBlockNum - number of memory blocks >> myBlockNum - number of memory blocks
>> myBufSize - size of buffer >> myBufSize - size of buffer
*/ */
...@@ -108,7 +108,7 @@ initialize it ...@@ -108,7 +108,7 @@ initialize it
>> myMode - mode of running the memory pool >> myMode - mode of running the memory pool
UNI_FREE: free all the space at the end of using the memory pool UNI_FREE: free all the space at the end of using the memory pool
FREE_ON_THE_FLY: normal "malloc" and "free" mode FREE_ON_THE_FLY: normal "malloc" and "free" mode
>> myBlockSize - size of memory block >> myBlockSize - size of a memory block
>> myBlockNum - number of memory blocks >> myBlockNum - number of memory blocks
>> myBufSize - size of buffer >> myBufSize - size of buffer
*/ */
...@@ -222,8 +222,8 @@ void XMem::Free(int myDevID, void * mem) ...@@ -222,8 +222,8 @@ void XMem::Free(int myDevID, void * mem)
} }
/* /*
get signature get the signature
<< return - return the signature << return - the signature
*/ */
MTYPE XMem::GetSignature() MTYPE XMem::GetSignature()
{ {
...@@ -231,7 +231,7 @@ MTYPE XMem::GetSignature() ...@@ -231,7 +231,7 @@ MTYPE XMem::GetSignature()
} }
/* /*
use string as the name of the memory pool set the name of the memory pool
>> myName - name of the memory pool >> myName - name of the memory pool
*/ */
void XMem::SetName(const char * myName) void XMem::SetName(const char * myName)
...@@ -264,7 +264,7 @@ void XMem::SetDevice(int myDevID) ...@@ -264,7 +264,7 @@ void XMem::SetDevice(int myDevID)
} }
/* /*
switch to the device (with fast cuda execution mode) we want to work switch to the device (with fast cuda execution mode) we intend to work on
>> myDevID - device id(-1: CPU memory, >=0: GPU device ID) >> myDevID - device id(-1: CPU memory, >=0: GPU device ID)
*/ */
void XMem::SetDeviceFast(int myDevID) void XMem::SetDeviceFast(int myDevID)
...@@ -280,7 +280,7 @@ void XMem::SetDeviceFast(int myDevID) ...@@ -280,7 +280,7 @@ void XMem::SetDeviceFast(int myDevID)
} }
/* /*
run in static mode run in the static mode
>> myIsStatic - specify if the memory allocation is static >> myIsStatic - specify if the memory allocation is static
*/ */
void XMem::SetStaticMode(bool myIsStatic) void XMem::SetStaticMode(bool myIsStatic)
......
/* NiuTrans.Tensor - an open-source tensor library
* Copyright (C) 2017, Natural Language Processing Lab, Northeastern University.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
*
* This is an implementation of queue. Actually we intend to use it to maintain
* a priority job list
*
* $Created by: XIAO Tong (xiaotong@mail.neu.edu.cn) 2017-04-05
*
*/
#include <stdio.h>
#include <stdlib.h>
#include "XQueue.h"
#include "XDevice.h"
#include "XList.h"
#include "XUtility.h"
/* the nts (NiuTrans.Tensor) namespace */
namespace nts{
/**************************************
job item used in queues
*/
/* constructor: start with no job function and an empty argument list */
JobQueueNode::JobQueueNode()
    : job(NULL),
      args(new TensorList(1))
{
}

/* destructor: release the argument list allocated by the constructor */
JobQueueNode::~JobQueueNode()
{
    delete args;
}
/**************************************
This class provides standard utilities of Queue.
*/
/*
constructor
>> mySize - number of slots allocated for the queue
*/
XQueue::XQueue(int mySize)
{
    /* synchronization primitives used by the queue and by the job counter */
    MUTEX_INIT(enqueueMutex);
    MUTEX_INIT(dequeueMutex);
    COND_INIT(queueCond);
    MUTEX_INIT(jobQueueMutex);

    /* the circular buffer: all slots are zeroed, and head/tail start at slot 0 */
    size = mySize;
    queue = new void*[mySize];
    memset(queue, 0, sizeof(void*) * mySize);
    head = 0;
    tail = 0;
    itemCount = 0;

    /* state of the job consumer: not running, no pending job, no break request */
    isJobQueue = false;
    jobDequeuerArgs = new TensorList(1);
    jobDequeuerBreak = false;
    runningJobCount = 0;

    /* job streams are created on demand by MakeJobStreams() */
    jobStream = NULL;
    jobStream1 = NULL;
    jobStream2 = NULL;
}
/*
destructor
release the slot array, the argument list of the consumer thread, the job
streams and the synchronization primitives
*/
XQueue::~XQueue()
{
/* only the array of slots is freed here; the items stored in the slots are not deleted */
delete[] queue;
delete jobDequeuerArgs;
delete jobStream;
delete jobStream1;
delete jobStream2;
/* NOTE(review): stopping the job consumer is disabled below, so the destructor does not
stop the consumer itself - presumably callers must call StopJobConsumer() beforehand; confirm */
//if(isJobQueue)
// StopJobConsumer();
MUTEX_DELE(enqueueMutex);
MUTEX_DELE(dequeueMutex);
COND_DELE(queueCond);
MUTEX_DELE(jobQueueMutex);
}
/*
append an item to the tail of the queue and signal the queue condition
>> item - the item we intend to add into the queue
*/
void XQueue::Enqueue(void * item)
{
    MUTEX_LOCK(enqueueMutex);
    MUTEX_LOCK(dequeueMutex);

    /* the buffer has a fixed number of slots; adding to a full queue is an error */
    CheckNTErrors((itemCount < size), "Put too many items into the queue!");

    /* store the item, then move the tail forward and wrap it around at the end */
    queue[tail] = item;
    tail++;
    if(tail == size)
        tail = 0;
    itemCount++;

    COND_SIGNAL(queueCond);

    MUTEX_UNLOCK(dequeueMutex);
    MUTEX_UNLOCK(enqueueMutex);
}
/*
fetch an item from the head of the queue; the call waits while the queue is empty
<< return - the head item of the queue
*/
void * XQueue::Dequeue()
{
MUTEX_LOCK(dequeueMutex);
/* wait in a loop so that the item count is re-checked after every wake-up */
while(itemCount == 0)
{
#ifdef WIN32
/* NOTE(review): on WIN32 the mutex is released by hand around the wait, presumably because
COND_WAIT does not release it there - confirm against the macro definitions */
MUTEX_UNLOCK(dequeueMutex);
#endif
COND_WAIT(queueCond, dequeueMutex);
#ifdef WIN32
MUTEX_LOCK(dequeueMutex);
#endif
}
/* take the item at the head and move the head forward, wrapping around at the end */
void * r = queue[head];
head = (head + 1) % size;
itemCount--;
MUTEX_UNLOCK(dequeueMutex);
return r;
}
/*
check whether the queue holds no item (the item count is read without locking)
<< return - true if the queue is empty, false otherwise
*/
bool XQueue::IsEmpty()
{
    if(itemCount != 0)
        return false;

    return true;
}
/* wait until the queue is empty */
void XQueue::WaitForEmptyJobQueue()
{
while(runningJobCount > 0){
XSleep(10);
}
if(jobStream != NULL){
CheckNTErrors((jobStream->IsFinished()), "None fineished jobs remain");
jobStream->Clear();
}
if(jobStream1 != NULL){
CheckNTErrors((jobStream1->IsFinished()), "None fineished jobs remain");
jobStream1->Clear();
}
if(jobStream2 != NULL){
CheckNTErrors((jobStream2->IsFinished()), "None fineished jobs remain");
jobStream2->Clear();
}
}
int devids[16] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
int cpuid = -1;
/*
run job consumer (in another thread)
>> jobDevID - id of the device for running the jobs
*/
void XQueue::RunJobConsumer(int jobDevID)
{
CheckNTErrors((jobDevID < 16), "device id is out of scope!");
isJobQueue = true;
jobDequeuerArgs->Clear();
jobDequeuerArgs->Add(this);
jobDequeuerArgs->Add(jobDevID >= 0 ? devids + jobDevID : &cpuid);
jobDequeuer.function = (TFunction)DequeueJobs;
jobDequeuer.argv = jobDequeuerArgs;
jobDequeuer.Start();
jobDequeuer.LetItGo();
}
/*
stop the job consumer: raise the break flag, enqueue an empty job, call End()
on the consumer thread object and mark the queue as no longer being a job queue
*/
void XQueue::StopJobConsumer()
{
jobDequeuerBreak = true;
/* NOTE(review): the purpose of this short sleep is not visible here - presumably it gives
the consumer time to observe the flag; confirm */
XSleep(10);
/* enqueue a job with no function and no arguments so that a consumer waiting in Dequeue() gets an item */
EnqueueJob(NULL, NULL);
jobDequeuer.End();
isJobQueue = false;
}
/* add a job item to process */
void XQueue::EnqueueJob(void * job, TensorList * jobArgs)
{
MUTEX_LOCK(jobQueueMutex);
runningJobCount++;
MUTEX_UNLOCK(jobQueueMutex);
JobQueueNode * node = new JobQueueNode();
node->job = job;
if(jobArgs != NULL)
node->args->AddList(jobArgs);
Enqueue(node);
}
/*
job item consumer: dequeue job items one by one and run them until the break
flag of the queue is raised
>> args - a list of exactly two items: the queue to consume from (XQueue*)
          and a pointer to the id of the device for running the jobs (int*)
*/
void XQueue::DequeueJobs(TensorList * args)
{
    CheckNTErrors((args->count == 2), "Illegal arguments!");

    XQueue * q = (XQueue*)args->GetItem(0);
    int devID = *(int*)args->GetItem(1);

    /* remember the current device so that it can be restored at the end */
    int devIDBackup = XDevice::GetGPUDevice();

    if(devID >= 0)
        XDevice::SetGPUDevice(devID);

    while(1){
        JobQueueNode * node = (JobQueueNode*)q->Dequeue();

        /* the flag is checked after the dequeue, so the item that wakes the
           consumer up for stopping is never executed */
        const bool stop = q->GetJobBreak();

        if(!stop){
            CheckNTErrors((node != NULL), "Illegal job!");

            /* process a job */
            ((TFunction)node->job)(node->args);
        }

        /* release the item and take it off the pending-job counter. This is also
           done for the item dropped on stopping: it was counted by EnqueueJob(),
           so skipping it would leak the item and leave the counter above zero. */
        if(node != NULL){
            delete node;

            MUTEX_LOCK(q->jobQueueMutex);
            q->runningJobCount--;
            MUTEX_UNLOCK(q->jobQueueMutex);
        }

        if(stop)
            break;
    }

    if(devID >= 0)
        XDevice::SetGPUDevice(devIDBackup);
}
/*
get the break flag
<< return - the current value of jobDequeuerBreak (the flag is read without locking)
*/
bool XQueue::GetJobBreak()
{
return jobDequeuerBreak;
}
/*
get a job stream
>> n - index of the stream (0, 1 or 2); any other value is reported as an error
<< return - the requested stream (NULL if it has not been created)
*/
XStream * XQueue::GetJobStream(int n)
{
    switch(n){
    case 0:
        return jobStream;
    case 1:
        return jobStream1;
    case 2:
        return jobStream2;
    default:
        ShowNTErrors("invalid stream id!");
        break;
    }

    return NULL;
}
/*
make job streams
A stream is (re)created for every device id that is not INVALID_DEVICE_ID;
the stream of an invalid id is left untouched.
>> devID - id of the device for the first stream
>> devID1 - id of the device for the second stream
>> devID2 - id of the device for the third stream
*/
void XQueue::MakeJobStreams(int devID, int devID1, int devID2)
{
    /* a stream made by an earlier call is deleted before it is replaced, so that
       calling this method again does not leak it (deleting NULL is a no-op) */
    if(devID != INVALID_DEVICE_ID){
        delete jobStream;
        jobStream = new XStream(0, devID);
    }

    if(devID1 != INVALID_DEVICE_ID){
        delete jobStream1;
        jobStream1 = new XStream(0, devID1);
    }

    if(devID2 != INVALID_DEVICE_ID){
        delete jobStream2;
        jobStream2 = new XStream(0, devID2);
    }
}
} /* end of the nts (NiuTrans.Tensor) namespace */
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论