Commit 4a3a47f1 by hello

Merge with branch xiaotong.

parent 69589a2c
@@ -162,6 +162,7 @@ void XNet::BackwardNode(XTensor * node, bool isEfficent)
}
else{
node->visitMark = NODE_FINISHED;
+ node->isGradFinished = true;
}
}
......
@@ -21,8 +21,8 @@
#include "Decoder.h"
#include "Utility.h"
- #include "module/LayerNorm.h"
+ #include "submodel/LayerNorm.h"
- #include "module/CommonModules.h"
+ #include "submodel/CommonModules.h"
#include "../../tensor/core/CHeader.h"
namespace nmt
......
@@ -21,8 +21,8 @@
#include "Encoder.h"
#include "Utility.h"
- #include "module/LayerNorm.h"
+ #include "submodel/LayerNorm.h"
- #include "module/CommonModules.h"
+ #include "submodel/CommonModules.h"
#include "../../tensor/core/CHeader.h"
namespace nmt
......
@@ -23,10 +23,10 @@
#define __ENCODER_H__
#include "Utility.h"
- #include "module/FNN.h"
+ #include "submodel/FNN.h"
- #include "module/Attention.h"
+ #include "submodel/Attention.h"
- #include "module/Embedding.h"
+ #include "submodel/Embedding.h"
- #include "module/LayerNorm.h"
+ #include "submodel/LayerNorm.h"
#include "../../network/XNet.h"
using namespace nts;
......
@@ -24,10 +24,10 @@
#include "Encoder.h"
#include "Decoder.h"
- #include "module/FNN.h"
+ #include "submodel/FNN.h"
- #include "module/Output.h"
+ #include "submodel/Output.h"
#include "Utility.h"
- #include "module/Attention.h"
+ #include "submodel/Attention.h"
namespace nmt
{
......
@@ -98,6 +98,21 @@ public:
XTensor* batchDec, XTensor* paddingDec, XTensor* label,
size_t minSentBatch, size_t batchSize, int devID);
/* load the samples into the buffer (a list) */
bool LoadBatchToBuf(XList * buf);
/* load the samples into tensors from the buffer */
static
bool LoadBatch(XList * buf,
XTensor* batchEnc, XTensor* paddingEnc,
XTensor* batchDec, XTensor* paddingDec, XTensor* label,
size_t minSentBatch, size_t batchSize, int devID,
int &wc, int &sc);
/* release the samples in a buffer */
static
void ClearSamples(XList * buf);
/* initialization function */
void Init(const char* dataFile, int bucketSize, bool training);
......
@@ -22,7 +22,7 @@
#include <iostream>
#include "Predictor.h"
- #include "../module/NNUtil.h"
+ #include "../submodel/NNUtil.h"
using namespace nts;
......
@@ -42,7 +42,6 @@ XDevManager GDevs;
/* constructor */
XDevice::XDevice()
{
stream = NULL;
isInitialized = false;
Clear();
@@ -141,8 +140,6 @@ void XDevice::Init(int myDevID)
}
else
sprintf(name2, "GPU-%d %s", devID, name);
stream = new XStream(0, devID);
#endif
}
@@ -176,10 +173,6 @@ void XDevice::Clear()
curandDestroyGenerator(gen);
isGenReady = false;
}
if (stream != NULL) {
delete stream;
stream = NULL;
}
#endif
isInitialized = false;
}
@@ -227,17 +220,6 @@ cublasHandle_t * XDevice::GetCublasHandle()
return &cublasHandle;
}
/* get the stream of cuda */
cudaStream_t * XDevice::GetCudaStream()
{
if (!isInitialized)
Init(devID);
CheckNTErrors(stream != NULL, "the stream is not initialized!");
return &stream->stream;
}
#endif // USE_CUDA
/* switch to a device */
@@ -286,6 +268,28 @@ int XDevice::GetGPUDevice()
#endif
}
/*
switch to a device (CPU or GPU)
>> devID - device id
*/
void XDevice::SetDevice(int devID)
{
if(devID >= 0)
SetGPUDevice(devID);
}
/*
switch to a device (CPU or GPU) with a backup of the device id
>> devID - device id
>> backupDevID - backup of the device id
*/
void XDevice::SetDevice(int devID, int &backupDevID)
{
backupDevID = GetGPUDevice();
if (devID >= 0)
SetGPUDevice(devID);
}
/* reset cuda flag for more efficient cuda execution. It should be called after "SetGPUDevice" when
no GPU context has been established. */
void XDevice::SetFastFlags()
@@ -312,13 +316,6 @@ void XDevice::SetFastFlagsAllDevices()
#endif
}
/* delete the default stream for the device */
void XDevice::DelDeviceStream()
{
if(stream != NULL)
delete stream;
}
/* constructor */
XDevManager::XDevManager()
{
@@ -391,14 +388,6 @@ cublasHandle_t * XDevManager::GetCudaHandle(const int devID)
return GPUs[devID].GetCublasHandle();
}
/* get the stream of a given GPU */
cudaStream_t * XDevManager::GetCudaStream(const int devID)
{
CheckNTErrors(devID < nGPU, "index of GPU is out of range.");
return GPUs[devID].GetCudaStream();
}
#endif
/*
@@ -620,16 +609,5 @@ char * XDevManager::GetDevString(int devID)
}
}
/* delete the streams for all devices */
void XDevManager::DelDeviceStream()
{
for(int i = 0; i < GDevs.nCPU; i++) {
GDevs.CPUs[i].DelDeviceStream();
}
for(int i = 0; i < GDevs.nGPU; i++) {
GDevs.GPUs[i].DelDeviceStream();
}
}
} /* end of the nts (NiuTrans.Tensor) namespace */
@@ -25,7 +25,6 @@
#define __XDEVICE_H__
#include "XThread.h"
#include "XStream.h"
#ifdef USE_CUDA
@@ -97,9 +96,6 @@ public:
/* specify whether Unified Virtual Address Space (UVA) is supported */
bool isUVASupported;
/* default stream for the device */
XStream * stream;
/* seed for random number generation */
int seed;
@@ -140,12 +136,9 @@ public:
#ifdef USE_CUDA
/* get cublas handle */
cublasHandle_t * GetCublasHandle();
/* get the stream of cuda */
cudaStream_t * GetCudaStream();
#endif
- /* switch to a device */
+ /* switch to a GPU device */
static
void SetGPUDevice(int devID);
@@ -153,10 +146,18 @@ public:
static
void SetGPUDeviceFast(int devID);
- /* switch to a get current dev */
+ /* get current dev */
static
int GetGPUDevice();
/* switch to a device (CPU or GPU) */
static
void SetDevice(int devID);
/* switch to a device (CPU or GPU) with a backup of the device id */
static
void SetDevice(int devID, int &backupDevID);
/* reset cuda flag for more efficient cuda execution */
static
void SetFastFlags();
@@ -164,9 +165,6 @@ public:
/* reset cuda flag for more efficient cuda execution (all devices) */
static
void SetFastFlagsAllDevices();
/* delete the default stream for the device (call it before deleting the XDevice) */
void DelDeviceStream();
};
/*
@@ -206,9 +204,6 @@ public:
#ifdef USE_CUDA
/* get the handle of GPU */
cublasHandle_t * GetCudaHandle(const int devID);
/* get the stream of cuda */
cudaStream_t * GetCudaStream(const int devID);
#endif
/* get grid and block sizes that max potential */
@@ -228,10 +223,6 @@ public:
/* get the device information in string */
char * GetDevString(int devID);
/* delete the streams for all devices */
static
void DelDeviceStream();
};
/* managing the devices */
......
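The two new XDevice::SetDevice overloads wrap SetGPUDevice so that callers no longer have to guard on devID themselves, and the second form also records the device that was active before the switch. A minimal usage sketch, not part of the diff (the worker device id is a made-up example):

void RunOnDevice(int workerDevID)
{
    int devIDBackup = -1;
    XDevice::SetDevice(workerDevID, devIDBackup);   /* switch and remember the previous device */
    /* ... launch kernels on workerDevID ... */
    XDevice::SetDevice(devIDBackup);                /* restore the previous device */
}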
@@ -26,8 +26,6 @@
#ifndef __XLINK_H__
#define __XLINK_H__
#include "XGlobal.h"
namespace nts{ // namespace nts(NiuTrans.Tensor)
/* cross reference */
......
@@ -146,7 +146,7 @@ run a set of jobs in parallel
>> jobArgs - the list of arguments for each job
>> sleepTime - time to sleep (in ms) for each round
*/
- void XPRunner::Run(TensorList * jobFunctions, TensorList * jobArgs, float sleepTime)
+ void XPRunner::Run(XList * jobFunctions, XList * jobArgs, float sleepTime)
{
if(threadNum <= 0){
XPRINT(1, stderr, "Error! No threads were created!\n");
@@ -195,13 +195,12 @@ void XPRunner::Run(TensorList * jobFunctions, TensorList * jobArgs, float sleepT
TFunction function = (TFunction)jobFunctions->GetItem(jobArgs->count - c);
/* the arguments that are passed to the function */
- volatile TensorList * args = (TensorList*)jobArgs->GetItem(jobArgs->count - c);
+ XList * args = (XList*)jobArgs->GetItem(jobArgs->count - c);
/* thread */
XThread * thread = threads + availableThreads[i];
- thread->argv = args;
- thread->function = function;
+ thread->SetFunc(function, args);
MUTEX_LOCK(thread->workingMutex);
thread->working = 1;
......
@@ -106,7 +106,7 @@ public:
void KillThreads();
/* run a set of jobs in parallel */
- void Run(TensorList * jobFunctions, TensorList * jobArgs, float sleepTime = 0);
+ void Run(XList * jobFunctions, XList * jobArgs, float sleepTime = 0);
/* get the number of parallel jobs to run */
int GetJobNum(int size);
......
@@ -42,7 +42,7 @@ job item used in queues
JobQueueNode::JobQueueNode()
{
job = NULL;
- args = new TensorList(1);
+ args = new XList(1);
}
/* de-constructor */
@@ -67,12 +67,9 @@ XQueue::XQueue(int mySize)
head = 0;
tail = 0;
isJobQueue = false;
- jobDequeuerArgs = new TensorList(1);
+ jobDequeuerArgs = new XList(1);
jobDequeuerBreak = false;
runningJobCount = 0;
jobStream = NULL;
jobStream1 = NULL;
jobStream2 = NULL;
MUTEX_INIT(enqueueMutex);
MUTEX_INIT(dequeueMutex);
@@ -85,9 +82,6 @@ XQueue::~XQueue()
{
delete[] queue;
delete jobDequeuerArgs;
delete jobStream;
delete jobStream1;
delete jobStream2;
//if(isJobQueue)
// StopJobConsumer();
@@ -160,19 +154,6 @@ void XQueue::WaitForEmptyJobQueue()
while(runningJobCount > 0){
XSleep(10);
}
if(jobStream != NULL){
CheckNTErrors((jobStream->IsFinished()), "None fineished jobs remain");
jobStream->Clear();
}
if(jobStream1 != NULL){
CheckNTErrors((jobStream1->IsFinished()), "None fineished jobs remain");
jobStream1->Clear();
}
if(jobStream2 != NULL){
CheckNTErrors((jobStream2->IsFinished()), "None fineished jobs remain");
jobStream2->Clear();
}
}
int devids[16] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
@@ -189,12 +170,11 @@ void XQueue::RunJobConsumer(int jobDevID)
isJobQueue = true;
jobDequeuerArgs->Clear();
- // warning: this may cause unknown error
+ /* warning: this may cause unknown errors */
- jobDequeuerArgs->Add((XTensor*)this);
+ jobDequeuerArgs->Add(this);
- jobDequeuerArgs->Add(jobDevID >= 0 ? (XTensor*)(devids + jobDevID) : (XTensor*)&cpuid);
+ jobDequeuerArgs->Add(jobDevID >= 0 ? (devids + jobDevID) : &cpuid);
- jobDequeuer.function = (TFunction)DequeueJobs;
- jobDequeuer.argv = jobDequeuerArgs;
+ jobDequeuer.SetFunc((TFunction)DequeueJobs, jobDequeuerArgs);
jobDequeuer.Start();
jobDequeuer.LetItGo();
@@ -213,7 +193,7 @@ void XQueue::StopJobConsumer()
}
/* add a job item to process */
- void XQueue::EnqueueJob(void * job, TensorList * jobArgs)
+ void XQueue::EnqueueJob(void * job, XList * jobArgs)
{
MUTEX_LOCK(jobQueueMutex);
runningJobCount++;
@@ -227,17 +207,15 @@ void XQueue::EnqueueJob(void * job, TensorList * jobArgs)
}
/* job item consumer */
- void XQueue::DequeueJobs(TensorList * args)
+ void XQueue::DequeueJobs(XList * args)
{
CheckNTErrors((args->count == 2), "Illegal arguments!");
XQueue * q = (XQueue*)args->GetItem(0);
int devID = *(int*)args->GetItem(1);
- int devIDBackup = XDevice::GetGPUDevice();
- if(devID >= 0)
- XDevice::SetGPUDevice(devID);
+ int devIDBackup = -1;
+ XDevice::SetDevice(devID, devIDBackup);
while(1){
JobQueueNode * node = (JobQueueNode*)q->Dequeue();
@@ -258,8 +236,7 @@ void XQueue::DequeueJobs(TensorList * args)
}
- if(devID >= 0)
- XDevice::SetGPUDevice(devIDBackup);
+ XDevice::SetDevice(devIDBackup);
}
/* get the break flag */
@@ -268,31 +245,10 @@ bool XQueue::GetJobBreak()
return jobDequeuerBreak;
}
- /* get job stream */
+ /* get the number of jobs */
- XStream * XQueue::GetJobStream(int n)
+ int XQueue::GetJobNum()
{
if(n == 0)
return jobStream;
else if(n == 1)
return jobStream1;
else if(n == 2)
return jobStream2;
else{
ShowNTErrors("invalid stream id!");
}
return NULL;
}
- /* make job streams */
- void XQueue::MakeJobStreams(int devID, int devID1, int devID2)
{
- if(devID != INVALID_DEVICE_ID)
- jobStream = new XStream(0, devID);
- if(devID1 != INVALID_DEVICE_ID)
- jobStream1 = new XStream(0, devID1);
- if(devID2 != INVALID_DEVICE_ID)
- jobStream2 = new XStream(0, devID2);
+ return runningJobCount;
}
} /* end of the nts (NiuTrans.Tensor) namespace */
@@ -33,7 +33,6 @@
#include "XGlobal.h"
#include "XThread.h"
#include "XStream.h"
#include "XDevice.h" #include "XDevice.h"
#include "XList.h" #include "XList.h"
...@@ -52,7 +51,7 @@ public: ...@@ -52,7 +51,7 @@ public:
void * job; void * job;
/* arguments of the job */ /* arguments of the job */
TensorList * args; XList * args;
public:
/* constructor */
@@ -102,7 +101,7 @@ private:
XThread jobDequeuer;
/* argument list of jobDequeuer */
- TensorList * jobDequeuerArgs;
+ XList * jobDequeuerArgs;
/* indicates whether jobDequeuer stops */
bool jobDequeuerBreak;
@@ -110,11 +109,6 @@ private:
/* running job count */
int runningJobCount;
/* job streams (we think that three streams is enough :)) */
XStream * jobStream;
XStream * jobStream1;
XStream * jobStream2;
public:
/* constuctor */
XQueue(int mySize = MAX_QUEUE_SIZE);
@@ -135,26 +129,23 @@ public:
void WaitForEmptyJobQueue();
/* run the job consumer */
- void RunJobConsumer(int jobDevID = 0);
+ void RunJobConsumer(int jobDevID = -1);
/* stop the job consumer */
void StopJobConsumer();
/* add a job item to process */
- void EnqueueJob(void * job, TensorList * jobArgs);
+ void EnqueueJob(void * job, XList * jobArgs);
/* job item consumer */
static
- void DequeueJobs(TensorList * args);
+ void DequeueJobs(XList * args);
/* get the break flag */
bool GetJobBreak();
- /* get job stream */
+ /* get the number of jobs */
- XStream * GetJobStream(int n = 0);
+ int GetJobNum();
/* make job streams */
void MakeJobStreams(int devID = INVALID_DEVICE_ID, int devID1 = INVALID_DEVICE_ID, int devID2 = INVALID_DEVICE_ID);
};
} /* end of the nts (NiuTrans.Tensor) namespace */
......
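With the XStream members gone, XQueue now exposes only the XList-based job interface declared above. A minimal sketch of how it might be driven, not part of the diff (MyJob, Demo, and the input tensor are hypothetical):

/* hypothetical job function; the consumer calls it with the XList passed to EnqueueJob */
void MyJob(volatile XList * args)
{
    /* ... read the tensors from args and do the actual work here ... */
}

void Demo(XTensor * input)
{
    XQueue q;
    q.RunJobConsumer(-1);                 /* start the consumer thread (CPU; pass a GPU id to pin it) */
    XList * jobArgs = new XList(1);
    jobArgs->Add(input);                  /* arguments are plain XList items now */
    q.EnqueueJob((void*)MyJob, jobArgs);
    q.WaitForEmptyJobQueue();             /* blocks until runningJobCount drops back to 0 */
    q.StopJobConsumer();
}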
/* NiuTrans.Tensor - an open-source tensor library
* Copyright (C) 2017, Natural Language Processing Lab, Northeastern University.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
*
* This is for streaming (on GPU), i.e., run jobs in different stream for
* GPU Async capabilities.
*
*
* $Created by: XIAO Tong (xiaotong@mail.neu.edu.cn) 2016-03-09
*
*/
#include "stdio.h"
#include "stdlib.h"
#include "XGlobal.h"
#include "XStream.h"
#include "XDevice.h"
/* the nts (NiuTrans.Tensor) namespace */
namespace nts{
/*
This class defines the stream used in pipelining jobs. E.g., one can put
a sequence of jobs in a stream and asynchronously do something else. Basically
we can use multiply streams to hide the data transfer cost on GPUs by using
job overlaps.
*/
/* constructor */
XStream::XStream(int priority, int myDevID, int myMaxEventNum)
{
devID = myDevID;
#ifdef USE_CUDA
if(myDevID >= 0){
int backupDevID = XDevice::GetGPUDevice();
XDevice::SetGPUDevice(myDevID);
events = new cudaEvent_t[myMaxEventNum];
XDevice::SetGPUDevice(backupDevID);
maxEventNum = myMaxEventNum;
usedEventNum = 0;
}
else{
maxEventNum = 0;
usedEventNum = 0;
}
#endif
Create(priority, devID);
}
/* deconstructor */
XStream::~XStream()
{
Destroy();
#ifdef USE_CUDA
delete[] events;
#endif
}
/* create the stream */
void XStream::Create(int priority, int myDevID)
{
if(myDevID < 0)
return;
#ifdef USE_CUDA
int backupDevID = XDevice::GetGPUDevice();
XDevice::SetGPUDevice(myDevID);
//cudaStreamCreateWithPriority(&stream, cudaStreamDefault, priority);
CheckNTErrors((cudaStreamCreate(&stream) == cudaSuccess),
"cannot create the cuda stream!");
XDevice::SetGPUDevice(backupDevID);
#endif
devID = myDevID;
}
/* destroy the stream */
void XStream::Destroy()
{
if(devID < 0)
return;
#ifdef USE_CUDA
int backupDevID = XDevice::GetGPUDevice();
XDevice::SetGPUDevice(devID);
cudaStreamDestroy(stream);
XDevice::SetGPUDevice(backupDevID);
Clear();
#endif
}
/* clear it */
void XStream::Clear()
{
#ifdef USE_CUDA
int backupDevID = XDevice::GetGPUDevice();
XDevice::SetGPUDevice(devID);
for(int i = 0; i < usedEventNum; i++){
cudaEventDestroy(events[i]);
}
usedEventNum = 0;
XDevice::SetGPUDevice(backupDevID);
#endif
}
/* judge if all the jobs in the stream have been finished */
bool XStream::IsFinished()
{
#ifdef USE_CUDA
if(cudaStreamQuery(stream) == cudaSuccess)
return true;
else
return false;
#else
return true;
#endif
}
void XStream::StreamSynchronize()
{
#ifdef USE_CUDA
int devIDBackup = XDevice::GetGPUDevice();
if(devID != devIDBackup)
XDevice::SetGPUDevice(devID);
cudaStreamSynchronize(stream);
if(devID != devIDBackup)
XDevice::SetGPUDevice(devIDBackup);
#endif
}
void XStream::ThreadSynchronize()
{
#ifdef USE_CUDA
#if CUDART_VERSION < 10000
cudaThreadSynchronize();
#else
ShowNTErrors("TODO!");
#endif
#endif
}
void XStream::DeviceSynchronize(int devID)
{
#ifdef USE_CUDA
int devIDBackup = XDevice::GetGPUDevice();
cudaGetDevice(&devIDBackup);
if(devID != devIDBackup)
XDevice::SetGPUDevice(devID);
cudaDeviceSynchronize();
if(devID != devIDBackup)
XDevice::SetGPUDevice(devIDBackup);
#endif
}
/* make a dependency of two streams. i.e., current stream must wait for the last job finished in another stream */
void XStream::MakeDependency(XStream * precedingStream)
{
#ifdef USE_CUDA
cudaEvent_t * e = precedingStream->MakeEvent();
cudaEventRecord(*e, precedingStream->stream);
cudaStreamWaitEvent(stream, *e, 0);
#endif
}
/* get the stream */
#ifdef USE_CUDA
inline cudaStream_t * XStream::Get()
{
return &stream;
}
/* make a event */
inline cudaEvent_t * XStream::MakeEvent()
{
int backupDevID = XDevice::GetGPUDevice();
XDevice::SetGPUDevice(devID);
CheckNTErrors((usedEventNum < maxEventNum), "Too many events are required!");
cudaEvent_t * e = events + usedEventNum++;
cudaEventCreate(e);
XDevice::SetGPUDevice(backupDevID);
return e;
}
#endif
} /* end of the nts (NiuTrans.Tensor) namespace */
/* NiuTrans.Tensor - an open-source tensor library
* Copyright (C) 2017, Natural Language Processing Lab, Northeastern University.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
*
* This is for streaming (on GPU), i.e., run jobs in different stream for
* GPU Async capabilities.
*
* $Created by: XIAO Tong (xiaotong@mail.neu.edu.cn) 2016-03-09
*
*/
#ifndef __XSTREAM_H__
#define __XSTREAM_H__
/* the CUDA stuff */
#ifdef USE_CUDA
#include <cuda_runtime.h>
#include <cublas_v2.h>
#include <cuda_fp16.h>
#endif
/* the nts (NiuTrans.Tensor) namespace */
namespace nts{
#define MAX_CUDA_EVENT_NUM_IN_A_STREAM 128
/*
This class defines the stream used in pipelining jobs. E.g., one can put
a sequence of jobs in a stream and asychronously do something else. Basically
we can use multiply streams to hide the data transfer cost on GPUs by using
job overlaps.
*/
class XStream
{
public:
#ifdef USE_CUDA
/* the cuda stream */
cudaStream_t stream;
/* list of cuda events for synchronize different streams */
cudaEvent_t * events;
/* max number of the events */
int maxEventNum;
/* number of used events */
int usedEventNum;
#else
/* virtual pointer */
void * stream;
#endif
/* device that holds the stream */
int devID;
public:
/* constructor */
XStream(int priority = 0, int devID = 0, int maxEventNum = MAX_CUDA_EVENT_NUM_IN_A_STREAM);
/* deconstructor */
~XStream();
/* create the stream */
void Create(int priority = 0, int devID = 0);
/* destroy the stream */
void Destroy();
/* clear it */
void Clear();
/* judge if all the jobs in the stream have been finished */
bool IsFinished();
/* stream synchronize */
void StreamSynchronize();
/* thread synchronize */
static
void ThreadSynchronize();
/* device synchronize */
static
void DeviceSynchronize(int devID);
/* make a dependency of two streams. i.e., current stream must wait for the last job finished in another stream */
void MakeDependency(XStream * precedingStream);
#ifdef USE_CUDA
/* get the stream */
cudaStream_t * Get();
/* make a event */
cudaEvent_t * MakeEvent();
#endif
};
} /* end of the nts (NiuTrans.Tensor) namespace */
#endif
@@ -89,10 +89,6 @@ XTensor::XTensor()
Init();
id = MakeTensorID();
isDefaultDType = true;
isInGlobalMem = false;
isInit = false;
isTmp = false;
reserved = 0;
}
@@ -277,6 +273,7 @@ void XTensor::Init()
isTmp = false;
isGrad = false;
isVar = false;
+ isGradFinished = false;
enableGrad = X_ENABLE_GRAD;
visitMark = 0;
grad = NULL;
@@ -772,10 +769,9 @@ MTYPE XTensor::GetOffset3D(int d0, int d1, int d2) const
}
/*
- a vector with all entries of 0
+ a tensor with all entries of 0
- >> stream - stream for the job pipeline
*/
- void XTensor::SetZeroAll(XStream* stream)
+ void XTensor::SetZeroAll()
{
if(data == NULL)
return;
@@ -788,12 +784,7 @@ void XTensor::SetZeroAll(XStream* stream)
int devIDBackup = 0;
cudaGetDevice(&devIDBackup);
cudaSetDevice(devID);
- if(stream == NULL)
- cudaMemset(data, 0, size);
- else
- cudaMemsetAsync(data, 0, size, stream->stream);
+ cudaMemset(data, 0, size);
cudaSetDevice(devIDBackup);
#endif
}
@@ -807,13 +798,8 @@ void XTensor::SetZeroAll(XStream* stream)
#ifdef USE_CUDA
int devIDBackup = 0;
cudaGetDevice(&devIDBackup);
cudaSetDevice(devID);
- if(stream == NULL)
- cudaMemset(data, 0, unitNum * unitSize);
- else
- cudaMemsetAsync(data, 0, unitNum * unitSize, stream->stream);
+ cudaMemset(data, 0, unitNum * unitSize);
cudaSetDevice(devIDBackup);
#endif
}
......
@@ -31,7 +31,6 @@
#include <math.h>
#include "XGlobal.h"
#include "XPRunner.h"
#include "XStream.h"
#include "XHeap.h" #include "XHeap.h"
#include "XList.h" #include "XList.h"
#include "XDataType.h" #include "XDataType.h"
...@@ -157,6 +156,11 @@ public: ...@@ -157,6 +156,11 @@ public:
/* mark for traversing the gragh */ /* mark for traversing the gragh */
unsigned int visitMark; unsigned int visitMark;
/* indicates whether the gradient of the tensor has been computed (in the backward process)
Note that the indicator could be modified by XNet (in back propagation) and be accessed
in XTrainer (and related classes). */
bool isGradFinished;
/* gradient (for back-propagation) */
XTensor * grad;
@@ -303,7 +307,7 @@ public:
MTYPE GetOffset3D(int d0, int d1, int d2) const;
/* a tensor with all entries of 0 */
- void SetZeroAll(XStream * stream = NULL);
+ void SetZeroAll();
/* set the tensor with an data array */
void SetData(const void * d, int num, int beg = 0);
......
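The new isGradFinished flag is written by XNet::BackwardNode (first hunk of this commit) and, per the comment, is meant to be read from the training side. A minimal reader sketch, not part of the diff (WaitForGradient is a hypothetical helper):

void WaitForGradient(XTensor * param)
{
    while (!param->isGradFinished)
        XSleep(1);               /* the same polling helper XQueue::WaitForEmptyJobQueue uses */
    /* param->grad is now safe to read */
}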
@@ -38,7 +38,7 @@ XThread::XThread()
#endif
MUTEX_INIT(gMutex);
function = NULL;
- argv = NULL;
+ argv.Clear();
toBreak = false;
jobCount = 0;
working = 0;
@@ -69,6 +69,18 @@ void * XThread::Wrapper(void * ptr)
return 0;
}
/*
initialize the thread with the function and its parameters
>> myFunc - the function to run
>> myArgv - arguments of the function
*/
void XThread::SetFunc(TFunction myFunc, XList * myArgv)
{
function = myFunc;
argv.Clear();
argv.AddList(myArgv);
}
/*
Tunning for this thread. It is very very native implementation.
@@ -77,6 +89,10 @@ After that, we wait again if there is no new job.
*/
void XThread::Run()
{
if (function == NULL) {
ShowNTErrors("You are running a thread with no function specified!");
}
#ifdef _WIN32
//COND_RESET(gCond);
#endif
@@ -104,7 +120,7 @@ void XThread::Run()
}
/* do what you want to do*/
- function(argv);
+ function(&argv);
#ifdef USE_PTHREAD
jobCount--;
......
@@ -85,7 +85,7 @@ namespace nts{
#endif
- typedef void (*TFunction) (volatile TensorList*);
+ typedef void (*TFunction) (volatile XList*);
/*
This is a class that wraps the standard implementation of threading
@@ -128,12 +128,10 @@ public:
public:
/* function to run */
- volatile
TFunction function;
/* arguments (for the function to run) */
- volatile
- TensorList * argv;
+ XList argv;
/* a flag to break */
volatile
@@ -154,6 +152,9 @@ public:
/* a wrapper for the start-routine parameter in pthread_create */
static void * Wrapper(void * ptr);
/* initialize the thread with the function and its parameters */
void SetFunc(TFunction myFunc, XList * myArgv);
/*
Core of the thread. It is very very native impelementation.
We loop and wait for a singnal to activate the job processing.
......
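SetFunc copies the caller's argument list into the thread's own argv member (now an XList value rather than a raw pointer), so the caller's list does not need to outlive the call. A minimal sketch, not part of the diff (Worker and Demo are hypothetical):

/* hypothetical worker matching TFunction, i.e. void (*)(volatile XList*) */
void Worker(volatile XList * args)
{
    /* ... */
}

void Demo()
{
    XList args(2);
    /* ... fill args ... */
    XThread t;
    t.SetFunc((TFunction)Worker, &args);   /* copies args into the thread's own argv list */
    t.Start();                             /* spawn the thread; it waits for a signal */
    t.LetItGo();                           /* release it to execute function(&argv) */
}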
@@ -311,44 +311,6 @@ void XMemCopy2D(void * t, size_t tPitch, int devIDT, const void * s, size_t sPit
#endif
}
void XMemCopy2DAsync(void * t, size_t tPitch, int devIDT, const void * s, size_t sPitch, int devIDS, size_t mSize, int n, XStream * stream)
{
if (t == s)
return;
if (devIDT < 0 && devIDS < 0) {
for(int i = 0; i < n; i++)
memcpy((char*)t + tPitch * i, (char*)s + sPitch * i, mSize);
return;
}
#ifdef USE_CUDA
else{
CheckNTErrors(stream != NULL, "No stream found!");
cudaStream_t &cstream = stream->stream;
if (devIDT >= 0 && devIDS < 0) {
cudaError_t error = cudaMemcpy2DAsync(t, tPitch, s, sPitch, mSize, n, cudaMemcpyHostToDevice, cstream);
if(error != cudaSuccess){
ShowNTErrors("cudaMemcpy2D error (cudaMemcpyHostToDevice)");
}
}
else if (devIDT < 0 && devIDS >= 0) {
cudaError_t error = cudaMemcpy2DAsync(t, tPitch, s, sPitch, mSize, n, cudaMemcpyDeviceToHost, cstream);
if(error != cudaSuccess){
ShowNTErrors("cudaMemcpy error (cudaMemcpyDeviceToHost)");
}
}
else {
cudaError_t error = cudaMemcpy2DAsync(t, tPitch, s, sPitch, mSize, n, cudaMemcpyDeviceToDevice, cstream);
if (error != cudaSuccess) {
ShowNTErrors("cudaMemcpy error (cudaMemcpyDeviceToDevice)");
}
}
}
#else
ShowNTErrors("Please specify USE_CUDA and recompile the code!");
#endif
}
void * XMemAlloc(int devID, size_t size)
{
void * p = NULL;
......
@@ -42,7 +42,6 @@ extern void XMemSet(void * p, int value, size_t size);
extern void XMemSet(int devID, void * p, int value, size_t size);
extern void XMemCopy(void * t, int devIDT, const void * s, int devIDS, size_t size);
extern void XMemCopy2D(void * t, size_t tPitch, int devIDT, const void * s, size_t sPitch, int devIDS, size_t mSize, int n);
extern void XMemCopy2DAsync(void * t, size_t tPitch, int devIDT, const void * s, size_t sPitch, int devIDS, size_t mSize, int n, XStream * stream);
extern void * XMemAlloc(int devID, size_t size);
extern void * XMemAllocOnDev(int devID, size_t size);
extern void XMemFree(int devID, void * p);
......
@@ -42,12 +42,11 @@ where trans() return the transposed matrix if the flag is fired
>> alpha - a coefficient
>> beta - another coefficient
>> parallelRunner - parallel processing module
- >> stream - the string for creating the job pipeline
*/
void _MatrixMul2D(const XTensor * a, MATRIX_TRANS_TYPE transposedA,
const XTensor * b, MATRIX_TRANS_TYPE transposedB,
XTensor * c, DTYPE alpha, DTYPE beta,
- XPRunner * parallelRunner, XStream * stream)
+ XPRunner * parallelRunner)
{
CheckNTErrors((a && b && c), "Empty input tensors!");
CheckNTErrors((a->dataType == b->dataType), "Input tensors should have the same data type!");
@@ -69,7 +68,7 @@ void _MatrixMul2D(const XTensor * a, MATRIX_TRANS_TYPE transposedA,
#ifdef USE_CUDA
if (a->devID >= 0 || b->devID >= 0 || c->devID >= 0) {
- _CudaMatrixMul2D(a, transposedA, b, transposedB, c, alpha, beta, stream);
+ _CudaMatrixMul2D(a, transposedA, b, transposedB, c, alpha, beta);
return;
}
#endif
......
@@ -119,11 +119,10 @@ where trans() return the transposed matrix if the flag is fired
>> c - where we put a*b
>> alpha - a coefficient
>> beta - another coefficient
- >> stream - the string for creating the job pipeline
*/
void _CudaMatrixMul2D(const XTensor * a, MATRIX_TRANS_TYPE transposedA,
const XTensor * b, MATRIX_TRANS_TYPE transposedB,
- XTensor * c, DTYPE alpha, DTYPE beta, XStream * stream)
+ XTensor * c, DTYPE alpha, DTYPE beta)
{
int an = transposedA == X_TRANS ? a->dimSize[1] : a->dimSize[0];
int am = transposedA == X_TRANS ? a->dimSize[0] : a->dimSize[1];
@@ -152,10 +151,6 @@ void _CudaMatrixMul2D(const XTensor * a, MATRIX_TRANS_TYPE transposedA,
cublasHandle_t * handle = a->mem == NULL ? GDevs.GetCudaHandle(a->devID) : a->mem->GetCublasHandle();
- /* !!!! might have problems */
- if (stream != NULL)
- cublasSetStream(*handle, stream->stream);
if (beta == 0)
c->SetZeroAll();
......
@@ -43,7 +43,7 @@ c = trans(a) * trans(b) * alpha + c * beta
where trans() return the transposed matrix if the flag is fired
*/
void _CudaMatrixMul2D(const XTensor * a, MATRIX_TRANS_TYPE transposedA, const XTensor * b, MATRIX_TRANS_TYPE transposedB, XTensor * c,
- DTYPE alpha = (DTYPE)1.0, DTYPE beta = 0, XStream * stream = NULL);
+ DTYPE alpha = (DTYPE)1.0, DTYPE beta = 0);
#endif // USE_CUDA
......
@@ -32,7 +32,7 @@ c = trans(a) * trans(b) * alpha + c * beta
where trans() return the transposed matrix if the flag is fired
*/
void _MatrixMul2D(const XTensor * a, MATRIX_TRANS_TYPE transposedA, const XTensor * b, MATRIX_TRANS_TYPE transposedB, XTensor * c,
- DTYPE alpha = (DTYPE)1.0, DTYPE beta = 0, XPRunner * parallelRunner = NULL, XStream * stream = NULL);
+ DTYPE alpha = (DTYPE)1.0, DTYPE beta = 0, XPRunner * parallelRunner = NULL);
} // namespace nts(NiuTrans.Tensor)
......
@@ -32,9 +32,8 @@ copy s to t
>> s - source
>> t - target
- >> stream - the stream for creating the job pipeline
*/
- void _CopyValues(const XTensor * s, XTensor * t, XStream * stream)
+ void _CopyValues(const XTensor * s, XTensor * t)
{
if(s->data == NULL && t->data == NULL)
return;
@@ -55,7 +54,7 @@ void _CopyValues(const XTensor * s, XTensor * t, XStream * stream)
#ifdef USE_CUDA
if (s->devID >= 0 || t->devID >= 0) {
- _CudaCopyValues(s, t, stream);
+ _CudaCopyValues(s, t);
return;
}
#endif
@@ -82,9 +81,8 @@ copy s to t
>> sLen - length of the segment
>> t - target
>> tBeg - beginning of the segment on the target side
- >> stream - the stream for creating the job pipeline
*/
- void _CopyValues(const XTensor * s, const int sBeg, const int sLen, XTensor * t, const int tBeg, XStream * stream)
+ void _CopyValues(const XTensor * s, const int sBeg, const int sLen, XTensor * t, const int tBeg)
{
if(s->data == NULL && t->data == NULL)
return;
@@ -108,13 +106,12 @@ void _CopyValues(const XTensor * s, const int sBeg, const int sLen, XTensor * t,
/*
copy s to t (rename _CopyValues)
>> s - source
>> t - target
- >> stream - the stream for creating the job pipeline
*/
- void CopyValues(const XTensor &s, XTensor &t, XStream * stream)
+ void CopyValues(const XTensor &s, XTensor &t)
{
- _CopyValues(&s, &t, stream);
+ _CopyValues(&s, &t);
}
/*
@@ -122,16 +119,15 @@ copy s to t (return an XTensor structure)
make a new tensor to keep the result and return it
>> s - source
- >> stream - the stream for creating the job pipeline
<< return - the copyed tensor t
*/
- XTensor CopyValues(const XTensor &s, XStream * stream)
+ XTensor CopyValues(const XTensor &s)
{
XTensor t(&s);
t.SetTMPFlag();
/* call _CopyValues function */
- _CopyValues(&s, &t, stream);
+ _CopyValues(&s, &t);
/* tensor connection */
if (s.enableGrad) {
......
@@ -32,10 +32,9 @@ namespace nts { // namespace nts(NiuTrans.Tensor)
copy a range of elements from a source vector to a target vector
>> s - source matrix
>> t - target matrix
- >> stream - the stream for creating the job pipeline
<< return - succeed or not
*/
- void _CudaCopyValues(const XTensor * s, XTensor * t, XStream * stream)
+ void _CudaCopyValues(const XTensor * s, XTensor * t)
{
CheckNTErrors(s != NULL && t != NULL, "The input tensor and output tensor must be nonempty!");
CheckNTErrors(s->dataType == t->dataType, "Unmatched data type!");
@@ -45,10 +44,7 @@ void _CudaCopyValues(const XTensor * s, XTensor * t, XStream * stream)
/* dense -> dense */
if (!s->isSparse && !t->isSparse) {
- if (stream == NULL)
- XMemCopy(t->data, t->devID, s->data, s->devID, s->unitSize * s->unitNum);
- else
- XMemCopyAsync(t->data, t->devID, s->data, s->devID, s->unitSize * s->unitNum, stream->stream, stream->devID);
+ XMemCopy(t->data, t->devID, s->data, s->devID, s->unitSize * s->unitNum);
}
/* dense -> sparse */
else if (!s->isSparse && t->isSparse &&
@@ -72,11 +68,8 @@ void _CudaCopyValues(const XTensor * s, XTensor * t, XStream * stream)
int num = s->unitNumNonZero;
int size = sizeof(int) + num * (s->unitSize + sizeof(int));
- if (stream == NULL)
- XMemCopy(t->data, t->devID, s->data, s->devID, size);
- else
- XMemCopyAsync(t->data, t->devID, s->data, s->devID, size, stream->stream, stream->devID);
+ XMemCopy(t->data, t->devID, s->data, s->devID, size);
t->unitNumNonZero = num;
}
else {
......
@@ -29,7 +29,7 @@ namespace nts { // namespace nts(NiuTrans.Tensor)
#ifdef USE_CUDA
/* copy all elements from a source matrix to a target matrix */
- void _CudaCopyValues(const XTensor * s, XTensor * t, XStream * stream = NULL);
+ void _CudaCopyValues(const XTensor * s, XTensor * t);
#endif // USE_CUDA
......
@@ -27,19 +27,19 @@
namespace nts { // namespace nts(NiuTrans.Tensor)
/* copy s to t */
- void _CopyValues(const XTensor * s, XTensor * t, XStream * stream = NULL);
+ void _CopyValues(const XTensor * s, XTensor * t);
/* copy a segment of s to t */
- void _CopyValues(const XTensor * s, const int sBeg, const int sLen, XTensor * t, const int tBeg, XStream * stream = NULL);
+ void _CopyValues(const XTensor * s, const int sBeg, const int sLen, XTensor * t, const int tBeg);
/* copy s to t (rename _CopyValues) */
- void CopyValues(const XTensor &s, XTensor &t, XStream * stream = NULL);
+ void CopyValues(const XTensor &s, XTensor &t);
/*
copy s to t (return an XTensor structure)
make a new tensor to keep the result and return it
*/
- XTensor CopyValues(const XTensor &s, XStream * stream = NULL);
+ XTensor CopyValues(const XTensor &s);
} // namespace nts(NiuTrans.Tensor)
......
@@ -96,25 +96,11 @@ void _Split(const XTensor * s, XTensor * t, int whereToSplit, int splitNum)
}
}
else{
#ifdef USE_CUDA
#ifdef STREAMED_MEMCPOPY
XStream * stream = GDevs.GPUs[t->devID].stream;
for (int k = 0; k < splitNum; k++) {
XMemCopy2DAsync((char*)t->data + k * tStep, tPitch, t->devID,
(char*)s->data + k * sStep, sPitch, s->devID,
mSize, n, stream);
}
stream->StreamSynchronize();
#else
for (int k = 0; k < splitNum; k++) {
XMemCopy2D((char*)t->data + k * tStep, tPitch, t->devID,
(char*)s->data + k * sStep, sPitch, s->devID,
mSize, n);
}
#endif
#else
ShowNTErrors("Please specify USE_CUDA and recompile the code!");
#endif
}
}
else {
@@ -321,27 +307,12 @@ void _Split(const XTensor * big, TensorList * smalls, int whereToSplit, int spli
}
}
else{
#ifdef USE_CUDA
#ifdef STREAMED_MEMCPOPY
XStream * stream = GDevs.GPUs[big->devID].stream;
for (int k = 0; k < splitNum; k++) {
XTensor * t = (XTensor*)smalls->GetItem(k);
XMemCopy2DAsync((char*)t->data + k * tStep, tPitch, t->devID,
(char*)big->data + k * sStep, sPitch, big->devID,
mSize, n, stream);
}
stream->StreamSynchronize();
#else
for (int k = 0; k < splitNum; k++) {
XTensor * t = (XTensor*)smalls->GetItem(k);
XMemCopy2D((char*)t->data + k * tStep, tPitch, t->devID,
(char*)big->data + k * sStep, sPitch, big->devID,
mSize, n);
}
#endif
#else
ShowNTErrors("Please specify USE_CUDA and recompile the code!");
#endif
}
}
/* splitting with fewer kernel/api calls??? (i'm not sure about it!! may remove this later) */
......
@@ -51,7 +51,7 @@ void RunParallel2D(XPRunner * parallelRunner, void * job,
CheckNTErrors(jobNum != 0, "TODO!");
/* argument list of the jobs */
- TensorList * jobArgList = new TensorList(argNum);
+ XList * jobArgList = new XList(argNum);
va_list ap;
va_start(ap, argNum);
@@ -62,8 +62,8 @@ void RunParallel2D(XPRunner * parallelRunner, void * job,
va_end(ap);
/* prepare the neccesary argument list for parallel processing */
- TensorList * jobs = new TensorList(jobNum);
+ XList * jobs = new XList(jobNum);
- TensorList * args = new TensorList(jobNum);
+ XList * args = new XList(jobNum);
int * indexList = new int[jobNum * 4 * 4];
@@ -78,7 +78,7 @@ void RunParallel2D(XPRunner * parallelRunner, void * job,
*/
for (int i = 0; i < jobNum; i++) {
IntList* indexArgs = new IntList(4);
- TensorList * blockArgs = new TensorList(argNum);
+ XList * blockArgs = new XList(argNum);
int * blockIndex = indexList + i * 4;
indexArgs->Add(blockIndex[0]);
@@ -89,10 +89,10 @@ void RunParallel2D(XPRunner * parallelRunner, void * job,
for (int j = 0; j < argNum; j++)
blockArgs->Add(jobArgList->GetItem(j));
- args->Add((XTensor*)indexArgs);
+ args->Add((void*)indexArgs);
- args->Add((XTensor*)blockArgs);
+ args->Add((void*)blockArgs);
- jobs->Add((XTensor*)job);
+ jobs->Add((void*)job);
}
args->count = jobNum * 2;
......
/* NiuTrans.Tensor - an open-source tensor library
* Copyright (C) 2021
* Natural Language Processing Lab, Northeastern University
* and
* NiuTrans Research
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
*
* $Created by: XIAO Tong (xiaotong@mail.neu.edu.cn) 2021-02-23
*
*/
#include "XTrainer.h"
/* the nts (NiuTrans.Tensor) namespace */
namespace nts {
/* constructor */
XTrainer::XTrainer()
{
}
/* de-constructor */
XTrainer::~XTrainer()
{
}
} /* end of the nts (NiuTrans.Tensor) namespace */
\ No newline at end of file
/* NiuTrans.Tensor - an open-source tensor library
* Copyright (C) 2021
* Natural Language Processing Lab, Northeastern University
* and
* NiuTrans Research
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* This class organizes the training process of neural models, e.g., nmt and lm models
* Distributed training is supported.
*
* $Created by: XIAO Tong (xiaotong@mail.neu.edu.cn) 2021-02-23
* I started coding this in 2021, one year after I last typed C code.
* BUT I was a GOOD TeX writer in 2020 :)
*/
#ifndef __XTRAINER_H__
#define __XTRAINER_H__
#include "../network/XNet.h"
#include "../tensor/XQueue.h"
namespace nts { // namespace nts(NiuTrans.Tensor)
/*
Training of neural networks with gradient methods. Here we suppose that we
are training NLP models. The routine could be:
1). initialize all we need
2). data preparation
3). loop until convergence
a). read a batch of samples from the input file
b). reset the worker
c). forward computation with the input
d). backward computation with respect to the loss
e). collect the gradient (necessary when several workers are available)
f). update the model (on the server end)
g). distribute the new model to each worker
Here a worker processes one batch of samples at a time, and works
independently of the other workers. The server is the organizer. It distributes
the jobs to the workers and maintains the model.
*/
class XTrainer
{
private:
public:
/* constructor */
XTrainer();
/* de-constructor */
~XTrainer();
};
}
#endif // __XTRAINER_H__
\ No newline at end of file