// threadWrapper.cpp
#include "threadWrapper.h"
/// Default constructor.
///
/// Delegates to the sized constructor so the pending-work counter, thread id,
/// thread-start callback and both flags start from the same known state.
/// With an empty body those members would only be initialized if the header
/// supplies default member initializers, and the destructor reads m_IsStop
/// through stop(). The two size arguments are not read by the sized
/// constructor, so 0 is as good a value as any.
threadWrapper::threadWrapper() : threadWrapper(0, 0) {}
/// Builds an idle wrapper: zero pending work, thread id 0, no thread-start
/// callback, and the update and stop flags both cleared.
///
/// @param n32BlockSize  Not read by this constructor.
/// @param n32BufferSize Not read by this constructor.
threadWrapper::threadWrapper(int n32BlockSize, int n32BufferSize)
    : m_PendingWorkNum(0),
      m_ThreadID(0),
      m_BeginInThreadCallback(NULL),
      m_IfUpdate(false),
      m_IsStop(false)
{
}
/// Destructor: requests shutdown through stop().
///
/// NOTE(review): nothing visible in this file joins or detaches m_Thread
/// after start() assigns it - confirm how the thread is retired before a
/// started wrapper is destroyed.
threadWrapper::~threadWrapper()
{
    this->stop();
}
/// Queues a buffer for the worker and wakes it if the queue was empty.
///
/// Calls made after stop() and calls with a null buffer are ignored.
///
/// @param apBuffer Buffer to enqueue. Consume() deletes each queued buffer
///                 once the callback has been invoked on it.
///
/// NOTE(review): a buffer passed after stop() is neither queued nor deleted
/// here - confirm the caller keeps ownership in that case.
void threadWrapper::send(Buffer* apBuffer)
{
    if (m_IsStop || nullptr == apBuffer) {
        return;
    }

    bool bNeedSig = false;
    {
        // Scoped guard instead of manual lock()/unlock(): the mutex is
        // released even if push() throws.
        std::lock_guard<std::mutex> guard(m_IOMutex);
        bNeedSig = m_Queue.empty();
        m_Queue.push(apBuffer);
        // Count the item while the mutex is still held. The consumer cannot
        // take the item before the guard is released, so its matching
        // decrement in Consume() can never run ahead of this increment.
        ++m_PendingWorkNum;
    }

    // Only an empty -> non-empty transition needs a wake-up; the notification
    // is sent after the mutex has been released.
    if (bNeedSig) {
        m_ConditionVar.notify_one();
    }
}
void threadWrapper::Run() {
if (m_BeginInThreadCallback){
m_BeginInThreadCallback();
}
while (true){
Consume();
}
}
/// Factory: heap-allocates a wrapper and installs its work callback.
///
/// No thread is launched here; start() does that.
///
/// @param aCallBack     Callback that Consume() invokes for each buffer.
/// @param n32BlockSize  Forwarded to the constructor.
/// @param n32BufferSize Forwarded to the constructor.
/// @return Pointer obtained from new. Nothing in this file deletes it -
///         presumably the caller owns it; confirm against callers.
threadWrapper* threadWrapper::createThread(Callback aCallBack, int n32BlockSize, int n32BufferSize)
{
    threadWrapper* const pWrapper = new threadWrapper(n32BlockSize, n32BufferSize);
    pWrapper->m_Callback = aCallBack;
    return pWrapper;
}
/// Replaces the callback that Consume() invokes for each dequeued buffer.
///
/// @param aCallBack New work callback.
void threadWrapper::setCallBack(Callback aCallBack)
{
    this->m_Callback = aCallBack;
}
bool threadWrapper::IfEmpty(){
return m_Queue.empty();
}
void threadWrapper::Consume(){
std::unique_lock<std::mutex> lock(m_IOMutex);
while(m_Queue.empty()){
m_ConditionVar.wait(lock);
}
m_SwapQueue.swap(m_Queue);
lock.unlock();
while(!m_SwapQueue.empty()){
Buffer* pBuffer = m_SwapQueue.front();
m_SwapQueue.pop();
m_Callback(pBuffer);
--m_PendingWorkNum;
if (pBuffer){
delete pBuffer;
}
}
}
/// Requests shutdown: logs the attempt, raises the stop flag, and signals
/// the condition variable once. Calls after the first are no-ops.
///
/// Once the flag is set, send() ignores further buffers.
void threadWrapper::stop()
{
    if (!m_IsStop) {
        printf("thread %u is try to stop\n", m_ThreadID);
        m_IsStop = true;
        m_ConditionVar.notify_one();
    }
}
unsigned threadWrapper::getThreadID(){
return m_ThreadID;
}
// Alternative run loop; as written it performs no work.
//
// The loop below only resets two local flags on every iteration and has no
// exit condition, so a call to this function never returns.
// NOTE(review): this looks like a stub or a body with its logic removed -
// neither the queue, m_IfUpdate nor m_IsStop is consulted. Confirm the
// intended behaviour before anything calls it.
void threadWrapper::RunWithUpdate(){
bool ifNotify = false;
bool ifQueueEmpty = false;
while(true){
// Both flags are reset here but never read anywhere in this function.
ifNotify = false;
ifQueueEmpty = false;
}
}
/// Launches the worker thread, which runs Run() on this object.
///
/// A repeated call while a thread is already attached is ignored:
/// move-assigning over a joinable std::thread would call std::terminate.
void threadWrapper::start()
{
    if (m_Thread.joinable()) {
        return;
    }
    m_Thread = std::thread(&threadWrapper::Run, this);
}
/// Stores the callback that Run() invokes once, before it begins processing
/// queued work.
///
/// @param pBeginInThreadCallback Callback to store; Run() skips it when it
///                               tests false.
void threadWrapper::setThreadStartCallback(BeginInThreadCallback pBeginInThreadCallback)
{
    this->m_BeginInThreadCallback = pBeginInThreadCallback;
}
int Buffer::GetMaxLength(){
return m_MaxLength;
}