http://blog.csdn.net/tennysonsky/article/details/46490099#
線程池基本原理
在傳統服務器結構中,常是有一個總的監聽線程監聽有沒有新的用戶連接服務器,每當有一個新的用戶進入,服務器就開啟一個新的線程用戶處理這 個用戶的數據包。這個線程只服務于這個用戶,當用戶與服務器端關閉連接以后,服務器端銷毀這個線程。(關于并發服務器更多詳情,請看《并發服務器》)。
然而頻繁地開辟與銷毀線程極大地占用了系統的資源,而且在大量用戶的情況下,系統為了開辟和銷毀線程將浪費大量的時間和資源。線程池提供了一個解決外部大量用戶與服務器有限資源的矛盾。
線程池和傳統的一個用戶對應一個線程的處理方法不同,它的基本思想就是在程序開始時就在內存中開辟一些線程,線程的數目是固定的,他們獨自形成一個類,屏蔽了對外的操作,而服務器只需要將數據包交給線程池就可以了。當有新的客戶請求到達時,不是新創建一個線程為其服務,而是從“池子”中選擇一個空閑的線程為新的客戶請求服務,服務完畢后,線程進入空閑線程池中。如果沒有線程空閑的話,就將數據包暫時積累, 等待線程池內有線程空閑以后再進行處理。通過對多個任務重用已經存在的線程對象,降低了對線程對象創建和銷毀的開銷。當客戶請求 時,線程對象已經存在,可以提高請求的響應時間,從而整體地提高了系統服務的表現。
線程池應用實例
一般來說實現一個線程池主要包括以下幾個組成部分:
1)線程管理器:用于創建并管理線程池。
2)工作線程:線程池中實際執行任務的線程。在初始化線程時會預先創建好固定數目的線程在池中,這些初始化的線程一般處于空閑狀態,一般不占用 CPU,占用較小的內存空間。
3)任務接口:每個任務必須實現的接口,當線程池的任務隊列中有可執行任務時,被空閑的工作線程調去執行(線程的閑與忙是通過互斥量實現的),把任務抽象出來形成接口,可以做到線程池與具體的任務無關。
4)任務隊列:用來存放沒有處理的任務,提供一種緩沖機制,實現這種結構有好幾種方法,常用的是隊列,主要運用先進先出原理,另外一種是鏈表之類的數據結構,可以動態的為它分配內存空間,應用中比較靈活,此教程就是用到的鏈表。
什么時候需要創建線程池呢?簡單的說,如果一個應用需要頻繁的創建和銷毀線程,而任務執行的時間又非常短,這樣線程創建和銷毀的帶來的開銷就不容忽視,這時也是線程池該出場的機會了。如果線程創建和銷毀時間相比任務執行時間可以忽略不計,則沒有必要使用線程池了。
線程池實現示例代碼如下:
thread_pool.h 的示例代碼:
- #ifndef?__THREAD_POOL_H__??
- #define?__THREAD_POOL_H__??
- ??
- #include?<pthread.h>??
- ??
- ?/*********************************************************************?
- *?任務回調函數,也可根據需要自行修改?
- *********************************************************************/??
- typedef?void?*(*pool_task_f)(void?*arg);??
- ??
- /*********************************************************************?
- *?任務句柄?
- *********************************************************************/??
- typedef?struct?_task{??
- ????pool_task_f?process;/*回調函數,任務運行時會調用此函數,注意也可聲明成其它形式*/??
- ????void?*arg;?????/*回調函數的參數*/??
- ????struct?_task?*next;??
- }pool_task;??
- ??
- /*********************************************************************?
- *?線程池句柄?
- *********************************************************************/??
- typedef?struct??
- {??
- ????pthread_t?*threadid;????????/*?線程號?*/??
- ????int?threads_limit;??????????/*?線程池中允許的活動線程數目?*/??
- ????int?destroy_flag;???????????/*?是否銷毀線程池?,?0銷毀,1不銷毀*/??
- ????pool_task?*queue_head;??????/*?鏈表結構,線程池中所有等待任務?*/??
- ????int?task_in_queue;??????????/*?當前等待隊列的任務數目?*/??
- ????pthread_mutex_t?queue_lock;?/*?鎖?*/??
- ????pthread_cond_t?queue_ready;?/*?條件變量?*/??
- }pool_t;??
- ??
- /*********************************************************************?
- *功能:????????初始化線程池結構體并創建線程?
- *參數:?????????
- ????????????pool:線程池句柄?
- ????????????threads_limit:線程池中線程的數量?
- *返回值:???無?
- *********************************************************************/??
- void?pool_init(pool_t?*pool,?int?threads_limit);??
- ??
- /*********************************************************************?
- *功能:????????銷毀線程池,等待隊列中的任務不會再被執行,?
- ????????????但是正在運行的線程會一直,把任務運行完后再退出?
- *參數:????????線程池句柄?
- *返回值:???成功:0,失敗非0?
- *********************************************************************/??
- int?pool_uninit(pool_t?*pool);??
- ??
- /*********************************************************************?
- *功能:????????向線程池中添加一個任務?
- *參數:?????????
- ????????????pool:線程池句柄?
- ????????????process:任務處理函數?
- ????????????arg:任務參數?
- *返回值:???0?
- *********************************************************************/??
- int?pool_add_task(pool_t?*pool,?pool_task_f?process,?void?*arg);??
- ??
- ??
- #endif??
- #include?<stdio.h>??
- #include?<stdlib.h>??
- #include?<pthread.h>??
- #include?<assert.h>??
- ??
- #include?"thread_pool.h"??
- ??
- static?void?*pool_thread_server(void?*arg);??
- ??
- /*********************************************************************?
- *功能:????????初始化線程池結構體并創建線程?
- *參數:?????????
- ????????????pool:線程池句柄?
- ????????????threads_limit:線程池中線程的數量?
- *返回值:???無?
- *********************************************************************/??
- void?pool_init(pool_t?*pool,?int?threads_limit)??
- {??
- ????pool->threads_limit?=?threads_limit;??
- ????pool->queue_head?=?NULL;??
- ????pool->task_in_queue?=?0;??
- ????pool->destroy_flag?=?0;??
- ????/*創建存放線程ID的空間*/??
- ????pool->threadid?=?(pthread_t?*)calloc(threads_limit,?sizeof(pthread_t));??
- ????int?i?=?0;??
- ????/*初始化互斥鎖和條件變量*/??
- ????pthread_mutex_init(&(pool->queue_lock),?NULL);??
- ????pthread_cond_init(&(pool->queue_ready),?NULL);??
- ????/*循環創建threads_limit個線程*/??
- ????for?(i?=?0;?i?<?threads_limit;?i++){??
- ????????pthread_create(&(pool->threadid[i]),?NULL,?pool_thread_server,?pool);??
- ????}??
- ????return;??
- }??
- ??
- /*********************************************************************?
- *功能:????????銷毀線程池,等待隊列中的任務不會再被執行,?
- ????????????但是正在運行的線程會一直,把任務運行完后再退出?
- *參數:????????線程池句柄?
- *返回值:???成功:0,失敗非0?
- *********************************************************************/??
- int?pool_uninit(pool_t?*pool)??
- {??
- ????pool_task?*head?=?NULL;??
- ????int?i;??
- ??????
- ????pthread_mutex_lock(&(pool->queue_lock));??
- ????if(pool->destroy_flag)/*?防止兩次調用?*/??
- ????????return?-1;??
- ????pool->destroy_flag?=?1;??
- ????pthread_mutex_unlock(&(pool->queue_lock));??
- ????/*?喚醒所有等待線程,線程池要銷毀了?*/??
- ????pthread_cond_broadcast(&(pool->queue_ready));??
- ????/*?阻塞等待線程退出,否則就成僵尸了?*/??
- ????for?(i?=?0;?i?<?pool->threads_limit;?i++)??
- ????????pthread_join(pool->threadid[i],?NULL);??
- ????free(pool->threadid);??
- ????/*?銷毀等待隊列?*/??
- ????pthread_mutex_lock(&(pool->queue_lock));??
- ????while(pool->queue_head?!=?NULL){??
- ????????head?=?pool->queue_head;??
- ????????pool->queue_head?=?pool->queue_head->next;??
- ????????free(head);??
- ????}??
- ????pthread_mutex_unlock(&(pool->queue_lock));??
- ????/*條件變量和互斥量也別忘了銷毀*/??
- ????pthread_mutex_destroy(&(pool->queue_lock));??
- ????pthread_cond_destroy(&(pool->queue_ready));??
- ????return?0;??
- }??
- ??
- /*********************************************************************?
- *功能:????????向任務隊列中添加一個任務?
- *參數:?????????
- ????????????pool:線程池句柄?
- ????????????process:任務處理函數?
- ????????????arg:任務參數?
- *返回值:???無?
- *********************************************************************/??
- static?void?enqueue_task(pool_t?*pool,?pool_task_f?process,?void?*arg)??
- {??
- ????pool_task?*task?=?NULL;??
- ????pool_task?*member?=?NULL;??
- ??????
- ????pthread_mutex_lock(&(pool->queue_lock));??
- ??????
- ????if(pool->task_in_queue?>=?pool->threads_limit){??
- ????????printf("task_in_queue?>?threads_limit!\n");??
- ????????pthread_mutex_unlock?(&(pool->queue_lock));??
- ????????return;??
- ????}??
- ??????
- ????task?=?(pool_task?*)calloc(1,?sizeof(pool_task));??
- ????assert(task?!=?NULL);??
- ????task->process?=?process;??
- ????task->arg?=?arg;??
- ????task->next?=?NULL;??
- ????pool->task_in_queue++;??
- ????member?=?pool->queue_head;??
- ????if(member?!=?NULL){??
- ????????while(member->next?!=?NULL)??/*?將任務加入到任務鏈連的最后位置.?*/??
- ????????????member?=?member->next;??
- ????????member->next?=?task;??
- ????}else{??
- ????????pool->queue_head?=?task;?/*?如果是第一個任務的話,就指向頭?*/??
- ????}??
- ????printf("\ttasks?%d\n",?pool->task_in_queue);??
- ????/*?等待隊列中有任務了,喚醒一個等待線程?*/??
- ????pthread_cond_signal?(&(pool->queue_ready));??
- ????pthread_mutex_unlock?(&(pool->queue_lock));??
- }??
- ??
- /*********************************************************************?
- *功能:????????從任務隊列中取出一個任務?
- *參數:????????線程池句柄?
- *返回值:???任務句柄?
- *********************************************************************/??
- static?pool_task?*dequeue_task(pool_t?*pool)??
- {??
- ????pool_task?*task?=?NULL;??
- ??????
- ????pthread_mutex_lock(&(pool->queue_lock));??
- ????/*?判斷線程池是否要銷毀了?*/??
- ????if(pool->destroy_flag){??
- ????????pthread_mutex_unlock(&(pool->queue_lock));??
- ????????printf("thread?0x%lx?will?be?destroyed\n",?pthread_self());??
- ????????pthread_exit(NULL);??
- ????}??
- ????/*?如果等待隊列為0并且不銷毀線程池,則處于阻塞狀態?*/??
- ????if(pool->task_in_queue?==?0){??
- ????????while((pool->task_in_queue?==?0)?&&?(!pool->destroy_flag)){??
- ????????????printf("thread?0x%lx?is?waitting\n",?pthread_self());??
- ????????????/*?注意:pthread_cond_wait是一個原子操作,等待前會解鎖,喚醒后會加鎖?*/??
- ????????????pthread_cond_wait(&(pool->queue_ready),?&(pool->queue_lock));??
- ????????}??
- ????}else{??
- ????????/*?等待隊列長度減去1,并取出隊列中的第一個元素?*/??
- ????????pool->task_in_queue--;??
- ????????task?=?pool->queue_head;??
- ????????pool->queue_head?=?task->next;??
- ????????printf("thread?0x%lx?received?a?task\n",?pthread_self());??
- ????}??
- ????pthread_mutex_unlock(&(pool->queue_lock));??
- ????return?task;??
- }??
- ??
- /*********************************************************************?
- *功能:????????向線程池中添加一個任務?
- *參數:?????????
- ????????????pool:線程池句柄?
- ????????????process:任務處理函數?
- ????????????arg:任務參數?
- *返回值:???0?
- *********************************************************************/??
- int?pool_add_task(pool_t?*pool,?pool_task_f?process,?void?*arg)??
- {??
- ????enqueue_task(pool,?process,?arg);??
- ????return?0;??
- }??
- ??
- /*********************************************************************?
- *功能:????????線程池服務程序?
- *參數:????????略?
- *返回值:???略?
- *********************************************************************/??
- static?void?*pool_thread_server(void?*arg)??
- {??
- ????pool_t?*pool?=?NULL;??
- ??????
- ????pool?=?(pool_t?*)arg;??
- ????while(1){??
- ????????pool_task?*task?=?NULL;??
- ????????task?=?dequeue_task(pool);??
- ????????/*調用回調函數,執行任務*/??
- ????????if(task?!=?NULL){??
- ????????????printf?("thread?0x%lx?is?busy\n",?pthread_self());??
- ????????????task->process(task->arg);??
- ????????????free(task);??
- ????????????task?=?NULL;??
- ????????}??
- ????}??
- ????/*這一句應該是不可達的*/??
- ????pthread_exit(NULL);????
- ????return?NULL;??
- }??
下面是測試代碼:
- #include?<stdio.h>??
- #include?<unistd.h>??
- ??
- #include?"thread_pool.h"??
- ??
- void?*task_test(void?*arg)??
- {??
- ????printf("\t\tworking?on?task?%d\n",?(int)arg);??
- ????sleep(1);???????????/*休息一秒,延長任務的執行時間*/??
- ????return?NULL;??
- }??
- ??
- void?thread_pool_demo(void)??
- {??
- ????pool_t?pool;??
- ????int?i?=?0;??
- ??
- ????pool_init(&pool,?2);//初始化一個線程池,其中創建2個線程??
- ????sleep(1);??
- ????for(i?=?0;?i?<?5;?i++){??
- ????????sleep(1);??
- ????????pool_add_task(&pool,?task_test,?(void?*)i);//添加一個任務??
- ????}??
- ????sleep(4);??
- ??
- ????pool_uninit(&pool);//刪除線程池??
- }??
- ??
- int?main?(int?argc,?char?*argv[])??
- {????
- ????thread_pool_demo();??
- ????return?0;??
- }??
運行結果如下:
本教程示例代碼下載請點此處。
參考資料:http://blog.csdn.net/hubi0952