一.線程池開發框架
我所開發的線程池由以下幾部分組成:?
1.工作中的線程。也就是線程池中的線程,主要是執行分發來的task。?
2.管理線程池的監督線程。這個線程的創建獨立于線程池的創建,按照既定的管理方法進行管理線程池中的所有線程,主要任務是監聽任務的到來,喚醒線程池中的空閑線程,分發任務;如果任務增多,動態的創建一批線程加入原來的線程池中,進行工作;適當的銷毀線程,減少系統開銷。
這個線程池的開發涉及了以下幾個數據結構、設計模式和軟件結構:?
1.任務隊列。整個框架有兩個任務隊列,1)等待任務隊列(以下簡稱wait_queue)。2)正在執行中任務隊列(以下簡稱doing_queue)。隊列采用先進先出的數據結構。當一個任務來到時,會先被push到wait_queue,監督線程會一直監督wait_queue中的元素,一旦有任務,便會pop wait_queue中的元素,再push到doing_queue中。?
2.單例設計模式。線程池的類被設計成單例模式,防止一個程序中多次創建線程池對象,出現紊亂現象,用戶只能調用靜態方法初始化得到線程池的對象。?
3.回調函數。回調函數的設計主要是為了能夠把任務接口(也就是需要線程去執行的任務,通常是一個寫好的函數方法)提前初始化注冊,然后延遲調用。
下圖是所用類的的大概結構圖:?
程序整體結構如下:?
二.線程池開發具體實現
1.思路分析。?
線程池顧名思義就是同時有數個線程處于待執行狀態,編碼上的初始化的實現無非就是循環創建指定數量的線程,然后等待任務的到來喚醒空閑線程。以下是ThreadPoll的類:
class ThreadPool
{private:pthread_t *_thread; //線程池pthread_t *_thread_bak; //備用線程池,當任務過多會自動創建pthread_t taskqueue_thread; //管理線程int u4sequence;int wait_time;int CANCEL_SIGNAL;bool F_improve_ThrdPoll; //備用線程池創建標志Mutex *mutex; //互斥鎖CondThread *task_cond; //條件變量TaskFuncCallback callback; //聲明回調函數,即線程所需要執行的函數int _num_threads; //線程池數量//構造函數的實現為private屬性,禁止用戶用構造函數初始化對象。ThreadPool(int num_threads):_num_threads(num_threads),F_improve_ThrdPoll(0),wait_time(0),u4sequence(0),CANCEL_SIGNAL(0){init(); //一些變量的創建ManagerThreadInit(); //創建管理線程ThreadPoolInit(num_threads);//初始化線程池}public:LVQueue<TASK_QUEUE_T> task_wait_queue;//創建任務等待隊列LVQueue<TASK_QUEUE_T> task_doing_queue;//創建任務執行隊列~ThreadPool(){delete(mutex);delete(task_cond);delete(_thread);delete(_thread_bak);}//用戶通過調用此靜態方法得到線程池的對象(單例模式)static ThreadPool* createThreadPool(int num){ static ThreadPool *_pool = new ThreadPool(num);return _pool;}void init(){_thread = new pthread_t[_num_threads];mutex = new Mutex();task_cond = new CondThread();}API_RETURN_TYPE_T ThreadPoolInit(int num_thr);//線程池初始化,核心接口API_RETURN_TYPE_T run(); //線程執行函數API_RETURN_TYPE_T ManagerThreadInit();//管理線程初始化API_RETURN_TYPE_T managerThread();線程執行函數API_RETURN_TYPE_T wakeupThread(TaskFuncCallback p_func);//用戶調用此接口喚醒線程執行任務,參數為傳入的任務執行函數地址API_RETURN_TYPE_T AutoComputeOptimumThreadNum(int wait_que_num,int &_u4sequence);//一種自動計算需要增加多少線程到線程池,當任務繁多時會調到。API_RETURN_TYPE_T ThreadJoin();//所有線程阻塞API_RETURN_TYPE_T ReleaseSubThreadPool();//釋放備用線程池API_RETURN_TYPE_T DestroyThreadPool();//釋放線程池
};
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
下面是cpp的實現:
//線程池初始化的實現
API_RETURN_TYPE_T ThreadPool::ThreadPoolInit(int num_thr)
{printf("num = %d.\n",num_thr); if(num_thr == 0){return API_SUCCESS;}//設置創建線程的屬性為DETACHED,線程被釋放后,資源會被回收。pthread_attr_t attr;pthread_attr_init (&attr);pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);int i = 0;if(F_improve_ThrdPoll == 1)//備用線程池創建的標志位,初始化線程不會走這邊{_thread_bak = new pthread_t[num_thr];for(;i < num_thr;i++){if(RET_OK != pthread_create(&(_thread_bak[i]), &attr, &thread_func, this)){return API_FAIL;}}return API_SUCCESS;}//create thread pool.for(;i < num_thr;i++){if(RET_OK != pthread_create(&(_thread[i]), &attr, &thread_func, this)){return API_FAIL;}}pthread_attr_destroy (&attr);return API_SUCCESS;
}
//創建的所有線程都會跑到這個線程函數,最終指向run
void *thread_func(void *arg)
{ThreadPool *thread = (ThreadPool*)arg;thread->run();
}
//線程池核心內容
API_RETURN_TYPE_T ThreadPool::run()
{//printf("this is run thread.\n");void *arg;while(1)//線程池內部一直在循環{printf ("thread 0x%x begin\n", pthread_self ()); this->mutex->lock();//上鎖if((CANCEL_SIGNAL == 0) && (task_doing_queue.length() < _num_threads || F_improve_ThrdPoll == 1) )//以上條件第一個是備用線程釋放標志,第二個是任務執行隊列數量為0,第三個是備用線程創建標志(或的關系,為了滿足新增線程進入wait狀態),第一次這些條件都會滿足{printf ("thread 0x%x is waiting\n", pthread_self ()); this->task_cond->wait(mutex);//每次創建的新線程都會阻塞到這里,執行完任務的線程也會阻塞在這里,等待喚醒的signal,雖然是阻塞在這里,但是互斥鎖已經是unlock狀態了,這是linux的機制。}usleep(200000);this->mutex->unlock();//解鎖pthread_testcancel();//設置取消線程點pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);//(1)頭尾保護下面這段code,保證在執行任務的時候屏蔽外部的線程取消信號if(callback != NULL){callback(arg); //執行回調函數,此時的回調函數應該指向當前任務執行函數的地址callback = NULL;}task_doing_queue.popFront();//執行完任務,任務執行隊列出隊列元素一個pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);//同(1)printf("wait len =%d.\n",task_wait_queue.length());printf("thread 0x%x done.length() = %d.\n",pthread_self (),task_doing_queue.length());}return API_SUCCESS;}//管理線程的初始化
API_RETURN_TYPE_T ThreadPool::ManagerThreadInit()
{//create manager threadpool thread.if(RET_OK != pthread_create(&taskqueue_thread, NULL, &thread_task_queue, this)){ return API_FAIL;}return API_SUCCESS;
}
//管理線程的執行函數
void *thread_task_queue(void *arg)
{ ThreadPool *thread = (ThreadPool*)arg;thread->managerThread();}
//管理線程的核心內容
API_RETURN_TYPE_T ThreadPool::managerThread()
{while(1){usleep(400000);printf("managerThread!.\n");this->mutex->lock();//上鎖TASK_QUEUE_T task1; //初始化兩個隊列元素對象TASK_QUEUE_T task2;task1.sTask = TASK_DOING;if(task_wait_queue.length() != 0){//printf("len =%d.\n",task_doing_queue.length());if(task_doing_queue.length() < _num_threads)//只要任務執行隊列的數目小于線程池中線程的總數,就會執行{task2 = task_wait_queue.popFront();//pop任務等待隊列的元素,并得到這個元素的對象callback = task2.cTaskFunc;//獲得任務的執行函數地址task_doing_queue.pushBack(task1);//將任務push到任務執行隊列task_cond->signal();//發送信號,喚醒一個空閑線程printf("signal cond.\n");}}//當人任務隊列的等待任務數量大于線程池線程總數時會執行if(task_wait_queue.length() >= _num_threads && F_improve_ThrdPoll == 0){//通過簡單的機制計算當前是否需要另外新增線程到線程池AutoComputeOptimumThreadNum(task_wait_queue.length(),u4sequence);F_improve_ThrdPoll = 1;ThreadPoolInit(u4sequence);//如果需要新增線程,u4sequence不為0. sleep(2);//緩沖線程創建}if(F_improve_ThrdPoll == 1 ){//檢測到備用線程池的創建while(task_wait_queue.length() == 0 && task_doing_queue.length() == 0){//也就是當前任務等待隊列和任務執行隊列都沒有任務時printf("no task!.\n");usleep(500000);wait_time++;//計時等待一段時間if(wait_time == NO_TASK_TIMEOUT){this->mutex->unlock();ReleaseSubThreadPool();//釋放備用線程池printf("release!.\n");F_improve_ThrdPoll = 2;wait_time = 0;break;}}wait_time = 0;}if(F_improve_ThrdPoll != 2)this->mutex->unlock();}return API_SUCCESS;}//自動計算是否需要創建新的線程池的簡單機制,后續會結合讀取當前CPU的使用率進一步優化此機制
API_RETURN_TYPE_T ThreadPool::AutoComputeOptimumThreadNum(int wait_que_num,int &_u4sequence)
{if(wait_que_num >= 4*_num_threads){_u4sequence = _num_threads;}else if(wait_que_num >= 2*_num_threads){_u4sequence = _num_threads/2;}else{_u4sequence = 0;}return API_SUCCESS;
}//釋放備用線程池,待優化API_RETURN_TYPE_T ThreadPool::ReleaseSubThreadPool()
{this->mutex->lock();CANCEL_SIGNAL = 1;this->mutex->unlock();task_cond->broadcast();for(int i = 0;i < _num_threads;i++){if(RET_OK != pthread_cancel(_thread_bak[i])){return API_FAIL;}}this->mutex->lock();printf("4444.\n");CANCEL_SIGNAL = 0;this->mutex->unlock();return API_SUCCESS;
}
//摧毀線程池,待優化
API_RETURN_TYPE_T ThreadPool::DestroyThreadPool()
{//first ,destroy manager thread.if(RET_OK != pthread_cancel(taskqueue_thread)){return API_FAIL;}return API_SUCCESS;
}API_RETURN_TYPE_T ThreadPool::ThreadJoin()
{for(int i = 0;i < _num_threads;i++){pthread_join(_thread[i],NULL);}pthread_join(taskqueue_thread,NULL);return API_SUCCESS;}
//用戶調用此函數接口喚醒
API_RETURN_TYPE_T ThreadPool::wakeupThread(TaskFuncCallback p_func)
{printf("wakeupThread in .\n");this->mutex->lock();TASK_QUEUE_T task;task.cTaskFunc = p_func;//將函數執行地址賦值到隊列元素中task.sTask = TASK_WAIT;if(task_wait_queue.length() < MAX_TASK_NUM ){ this->task_wait_queue.pushBack(task); //push任務到等待任務隊列中}else{//線程池數量過多,此機制后續會優化printf("Current Thread Buffer is full!Please wait a moment!\n");this->mutex->unlock();return API_FAIL;}this->mutex->unlock();return API_SUCCESS;}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
- 155
- 156
- 157
- 158
- 159
- 160
- 161
- 162
- 163
- 164
- 165
- 166
- 167
- 168
- 169
- 170
- 171
- 172
- 173
- 174
- 175
- 176
- 177
- 178
- 179
- 180
- 181
- 182
- 183
- 184
- 185
- 186
- 187
- 188
- 189
- 190
- 191
- 192
- 193
- 194
- 195
- 196
- 197
- 198
- 199
- 200
- 201
- 202
- 203
- 204
- 205
- 206
- 207
- 208
- 209
- 210
- 211
- 212
- 213
- 214
- 215
- 216
- 217
- 218
- 219
- 220
- 221
- 222
- 223
- 224
- 225
- 226
- 227
- 228
- 229
- 230
- 231
- 232
- 233
- 234
- 235
- 236
- 237
- 238
- 239
- 240
- 241
- 242
- 243
- 244
- 245
- 246
- 247
- 248
- 249
下面新加的關于LVQueue的實現:
#ifndef QUEUE_H_INCLUDED
#define QUEUE_H_INCLUDEDtemplate < typename T >
class LVQueue {friend struct Iterator;struct Item {T value;Item * next;Item * prev;Item(T & v) : value(v), next(NULL), prev(NULL) {}};Item * head;Item * tail;int count;Item * remove(Item * p) {if (!p)return NULL;if (!p->prev)head = p->next;elsep->prev->next = p->next;if (!p->next)tail = p->prev;elsep->next->prev = p->prev;p->next = NULL;p->prev = NULL;count--;if (count == 0) {head = tail = NULL;}return p;}void moveToHead(Item * item) {Item * p = remove(item);if (head) {head->prev = p;p->next = head;head = p;} else {head = tail = p;}count++;}
public:struct Iterator {private:LVQueue * queue;Item * currentItem;public:Iterator(const Iterator & v) {queue = v.queue;currentItem = v.currentItem;}Iterator(LVQueue * _queue) : queue(_queue), currentItem(NULL) {}T get() { return currentItem ? currentItem->value : T(); }void set(T value) { if (currentItem) currentItem->value = value; }bool next() {if (!currentItem) {// first timecurrentItem = queue->head;} else {// continuecurrentItem = currentItem->next;}return currentItem != NULL;}T remove() {if (!currentItem)return T();Item * next = currentItem->next;Item * p = queue->remove(currentItem);currentItem = next;T res = p->value;delete p;return res;}void moveToHead() {if (currentItem)queue->moveToHead(currentItem);}};public:Iterator iterator() { return Iterator(this); }LVQueue() : head(NULL), tail(NULL), count(0) {}~LVQueue() { clear(); }
// T & operator [] (int index) {
// Item * p = head;
// for (int i = 0; i < index; i++) {
// if (!p)
// return
// }
// }int length() { return count; }void pushBack(T item) {Item * p = new Item(item);if (tail) {tail->next = p;p->prev = tail;tail = p;} else {head = tail = p;}count++;}void pushFront(T item) {Item * p = new Item(item);if (head) {head->prev = p;p->next = head;head = p;} else {head = tail = p;}count++;}T popFront() {if (!head)return T();Item * p = remove(head);T res = p->value;delete p;return res;}T popBack() {if (!tail)return T();Item * p = remove(tail);T res = p->value;delete p;return res;}void clear() {while (head) {Item * p = head;head = p->next;delete p;}head = NULL;tail = NULL;count = 0;}
};#endif // LVQUEUE_H_INCLUDED
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
以下是簡單的test程序:
ThreadPool *thread3 = ThreadPool::createThreadPool(8);//得到線程池對象printf("task coming.\n");//test threadpoolfor(int i = 0;i < 15;i++){thread3->wakeupThread(thread11_func);//每隔一秒喚醒線程,thread11_func一個函數的地址sleep(1);thread3->wakeupThread(thread3_func);}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
下面是test程序運行的結果,線程喚醒無一秒間隔
num = 8.
task coming.
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
wakeupThread in .
thread 0xd528c700 begin
thread 0xd528c700 is waiting
thread 0xd4a8b700 begin
thread 0xd4a8b700 is waiting
thread 0xd428a700 begin
thread 0xd428a700 is waiting
thread 0xd5a8d700 begin
thread 0xd5a8d700 is waiting
thread 0xd628e700 begin
thread 0xd628e700 is waiting
thread 0xd6a8f700 begin
thread 0xd6a8f700 is waiting
thread 0xd7290700 begin
thread 0xd7290700 is waiting
thread 0xd7a91700 begin
thread 0xd7a91700 is waiting
managerThread!.
signal cond.
num = 4.
thread 0xd2286700 begin
thread 0xd2a87700 begin
thread 0xd3288700 begin
thread 0xd3a89700 begin
thread 0xd2286700 is waiting
thread 0xd2a87700 is waiting
thread 0xd3288700 is waiting
thread 0xd3a89700 is waiting
managerThread!.
signal cond.
managerThread!.
signal cond.
managerThread!.
signal cond.
managerThread!.
signal cond.
managerThread!.
signal cond.
managerThread!.
signal cond.
managerThread!.
signal cond.
managerThread!.
managerThread!.
managerThread!.
managerThread!.
managerThread!.
rate = 4.82897
this is 0 task thread.
wait len =22.
thread 0xd528c700 done.length() = 7.
thread 0xd528c700 begin
thread 0xd528c700 is waiting
managerThread!.
signal cond.
rate = 4.64646
this is 1 task thread.
wait len =21.
thread 0xd4a8b700 done.length() = 7.
thread 0xd4a8b700 begin
thread 0xd4a8b700 is waiting
managerThread!.
signal cond.
rate = 4.64646
this is 2 task thread.
wait len =20.
thread 0xd428a700 done.length() = 7.
thread 0xd428a700 begin
thread 0xd428a700 is waiting
managerThread!.
signal cond.
rate = 4.25101
this is 3 task thread.
wait len =19.
thread 0xd5a8d700 done.length() = 7.
thread 0xd5a8d700 begin
thread 0xd5a8d700 is waiting
managerThread!.
signal cond.
rate = 4.23387
this is 4 task thread.
wait len =18.
thread 0xd628e700 done.length() = 7.
thread 0xd628e700 begin
thread 0xd628e700 is waiting
managerThread!.
signal cond.
rate = 4.04858
this is 5 task thread.
wait len =17.
thread 0xd6a8f700 done.length() = 7.
thread 0xd6a8f700 begin
thread 0xd6a8f700 is waiting
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
可以看到一次性喚醒了30個線程,創建了8個線程的線程池,后來通過優化計算又新增了4個線程到當前線程池中,每喚醒一個線程執行任務,大概是6s的時間,執行完后,又進入等待喚醒信號的狀態。管理線程檢測到當前所有線程都在執行,便會阻塞當前signal行為,直到有空余線程,馬上signal。
這些源代碼還有一些數據類型的封裝還沒公布出來,因為還在優化中,所以準備等到優化完畢,將會把完整的源代碼交到GitHub上托管,小弟資歷尚淺,如有出錯的地方,煩請不吝賜教。