http://blog.csdn.net/wzjking0929/article/details/20312675
線程池:簡單地說,線程池 就是預先創建好一批線程,方便、快速地處理收到的業務。比起傳統的到來一個任務,即時創建一個線程來處理,節省了線程的創建和回收的開銷,響應更快,效率更高。
?
在linux中,使用的是posix線程庫,首先介紹幾個常用的函數:
1 線程的創建和取消函數
pthread_create
創建線程
pthread_join
合并線程
pthread_cancel
取消線程
2 線程同步函數
pthread_mutex_lock
pthread_mutex_unlock
pthread_cond_signal
pthread_cond_wait
?
關于函數的詳細說明,參考man手冊
?
線程池的實現:
線程池的實現主要分為三部分,線程的創建、添加任務到線程池中、工作線程從任務隊列中取出任務進行處理。
主要有兩個類來實現,CTask,CThreadPool
/**
執行任務的類,設置任務數據并執行
**/
- class?CTask??
- {??
- protected:??
- ?string?m_strTaskName;??//任務的名稱??
- ?void*?m_ptrData;???????//要執行的任務的具體數據??
- public:??
- ?CTask(){}??
- ?CTask(string?taskName)??
- ?{??
- ??this->m_strTaskName?=?taskName;??
- ??m_ptrData?=?NULL;??
- ?}??
- ?virtual?int?Run()=?0;??
- ?void?SetData(void*?data);????//設置任務數據??
- };??
?
任務類是個虛類,所有的任務要從CTask類中繼承 ,實現run接口,run接口中需要實現的就是具體解析任務的邏輯。m_ptrData是指向任務數據的指針,可以是簡單數據類型,也可以是自定義的復雜數據類型。
?
線程池類
/**
線程池
**/
- class?CThreadPool??
- {??
- private:??
- ?vector<CTask*>?m_vecTaskList;?????????//任務列表??
- ?int?m_iThreadNum;????????????????????????????//線程池中啟動的線程數?????????????
- ?static?vector<pthread_t>?m_vecIdleThread;???//當前空閑的線程集合??
- ?static?vector<pthread_t>?m_vecBusyThread;???//當前正在執行的線程集合??
- ?static?pthread_mutex_t?m_pthreadMutex;????//線程同步鎖??
- ?static?pthread_cond_t?m_pthreadCond;????//線程同步的條件變量??
- protected:??
- ?static?void*?ThreadFunc(void?*?threadData);?//新線程的線程函數??
- ?static?int?MoveToIdle(pthread_t?tid);???//線程執行結束后,把自己放入到空閑線程中??
- ?static?int?MoveToBusy(pthread_t?tid);???//移入到忙碌線程中去??
- ?int?Create();??????????//創建所有的線程??
- public:??
- ?CThreadPool(int?threadNum);??
- ?int?AddTask(CTask?*task);??????//把任務添加到線程池中??
- ?int?StopAll();??
- };??
?
當線程池對象創建后,啟動一批線程,并把所有的線程放入空閑列表中,當有任務到達時,某一個線程取出任務并進行處理。
線程之間的同步用線程鎖和條件變量。
這個類的對外接口有兩個:
AddTask函數把任務添加到線程池的任務列表中,并通知線程進行處理。當任務到到時,把任務放入m_vecTaskList任務列表中,并用pthread_cond_signal喚醒一個線程進行處理。
StopAll函數停止所有的線程
?
- ************************************************??
- ??
- 代碼:??
- ??
- ××××××××××××××××××××CThread.h??
- ??
- ???
- ??
- #ifndef?__CTHREAD??
- #define?__CTHREAD??
- #include?<vector>??
- #include?<string>??
- #include?<pthread.h>??
- ??
- using?namespace?std;??
- ??
- /**?
- 執行任務的類,設置任務數據并執行?
- **/??
- class?CTask??
- {??
- protected:??
- ?string?m_strTaskName;??//任務的名稱??
- ?void*?m_ptrData;???????//要執行的任務的具體數據??
- public:??
- ?CTask(){}??
- ?CTask(string?taskName)??
- ?{??
- ??this->m_strTaskName?=?taskName;??
- ??m_ptrData?=?NULL;??
- ?}??
- ?virtual?int?Run()=?0;??
- ?void?SetData(void*?data);????//設置任務數據??
- };??
- ??
- /**?
- 線程池?
- **/??
- class?CThreadPool??
- {??
- private:??
- ?vector<CTask*>?m_vecTaskList;?????????//任務列表??
- ?int?m_iThreadNum;????????????????????????????//線程池中啟動的線程數?????????????
- ?static?vector<pthread_t>?m_vecIdleThread;???//當前空閑的線程集合??
- ?static?vector<pthread_t>?m_vecBusyThread;???//當前正在執行的線程集合??
- ?static?pthread_mutex_t?m_pthreadMutex;????//線程同步鎖??
- ?static?pthread_cond_t?m_pthreadCond;????//線程同步的條件變量??
- protected:??
- ?static?void*?ThreadFunc(void?*?threadData);?//新線程的線程函數??
- ?static?int?MoveToIdle(pthread_t?tid);???//線程執行結束后,把自己放入到空閑線程中??
- ?static?int?MoveToBusy(pthread_t?tid);???//移入到忙碌線程中去??
- ?int?Create();??????????//創建所有的線程??
- public:??
- ?CThreadPool(int?threadNum);??
- ?int?AddTask(CTask?*task);??????//把任務添加到線程池中??
- ?int?StopAll();??
- };??
- ??
- #endif??
- ??
- ???
- ??
- ???
- ??
- ???
- ??
- 類的實現為:??
- ??
- ××××××××××××××××××××CThread.cpp??
- ??
- ???
- ??
- #include?"CThread.h"??
- #include?<string>??
- #include?<iostream>??
- ??
- using?namespace?std;??
- ??
- void?CTask::SetData(void?*?data)??
- {??
- ?m_ptrData?=?data;??
- }??
- ??
- vector<pthread_t>?CThreadPool::m_vecBusyThread;??
- vector<pthread_t>?CThreadPool::m_vecIdleThread;??
- pthread_mutex_t?CThreadPool::m_pthreadMutex?=?PTHREAD_MUTEX_INITIALIZER;??
- pthread_cond_t?CThreadPool::m_pthreadCond?=?PTHREAD_COND_INITIALIZER;??
- ??
- CThreadPool::CThreadPool(int?threadNum)??
- {??
- ?this->m_iThreadNum?=?threadNum;??
- ?Create();??
- }??
- int?CThreadPool::MoveToIdle(pthread_t?tid)??
- {??
- ?vector<pthread_t>::iterator?busyIter?=?m_vecBusyThread.begin();??
- ?while(busyIter?!=?m_vecBusyThread.end())??
- ?{??
- ??if(tid?==?*busyIter)??
- ??{??
- ???break;??
- ??}??
- ??busyIter++;??
- ?}??
- ?m_vecBusyThread.erase(busyIter);??
- ?m_vecIdleThread.push_back(tid);??
- ?return?0;??
- }??
- ??
- int?CThreadPool::MoveToBusy(pthread_t?tid)??
- {??
- ?vector<pthread_t>::iterator?idleIter?=?m_vecIdleThread.begin();??
- ?while(idleIter?!=?m_vecIdleThread.end())??
- ?{??
- ??if(tid?==?*idleIter)??
- ??{??
- ???break;??
- ??}??
- ??idleIter++;??
- ?}??
- ?m_vecIdleThread.erase(idleIter);??
- ?m_vecBusyThread.push_back(tid);??
- ?return?0;??
- }??
- void*?CThreadPool::ThreadFunc(void?*?threadData)??
- {??
- ?pthread_t?tid?=?pthread_self();??
- ?while(1)??
- ?{??
- ??pthread_mutex_lock(&m_pthreadMutex);??
- ??pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex);??
- ??cout?<<?"tid:"?<<?tid?<<?"?run"?<<?endl;??
- ??//get?task??
- ??vector<CTask*>*?taskList?=?(vector<CTask*>*)threadData;??
- ??vector<CTask*>::iterator?iter?=?taskList->begin();??
- ??while(iter?!=?taskList->end())??
- ??{??
- ?????
- ???MoveToBusy(tid);??
- ???break;??
- ??}??
- ??CTask*?task?=?*iter;??
- ??taskList->erase(iter);??
- ??pthread_mutex_unlock(&m_pthreadMutex);??
- ??cout?<<?"idel?thread?number:"?<<?CThreadPool::m_vecIdleThread.size()?<<?endl;??
- ??cout?<<?"busy?thread?number:"?<<?CThreadPool::m_vecBusyThread.size()?<<?endl;??
- ??//cout?<<?"task?to?be?run:"?<<?taskList->size()?<<?endl;??
- ??task->Run();??
- ????
- ??//cout?<<?"CThread::thread?work"?<<?endl;??
- ??cout?<<?"tid:"?<<?tid?<<?"?idle"?<<?endl;??
- ????
- ?}??
- ?return?(void*)0;??
- }??
- ??
- int?CThreadPool::AddTask(CTask?*task)??
- {??
- ?this->m_vecTaskList.push_back(task);??
- ?pthread_cond_signal(&m_pthreadCond);??
- ?return?0;??
- }??
- int?CThreadPool::Create()??
- {??
- ?for(int?i?=?0;?i?<?m_iThreadNum;i++)??
- ?{??
- ??pthread_t?tid?=?0;??
- ??pthread_create(&tid,NULL,ThreadFunc,&m_vecTaskList);??
- ??m_vecIdleThread.push_back(tid);??
- ?}??
- ?return?0;??
- }??
- ??
- int?CThreadPool::StopAll()??
- {??
- ?vector<pthread_t>::iterator?iter?=?m_vecIdleThread.begin();??
- ?while(iter?!=?m_vecIdleThread.end())??
- ?{??
- ??pthread_cancel(*iter);??
- ??pthread_join(*iter,NULL);??
- ??iter++;??
- ?}??
- ??
- ?iter?=?m_vecBusyThread.begin();??
- ?while(iter?!=?m_vecBusyThread.end())??
- ?{??
- ??pthread_cancel(*iter);??
- ??pthread_join(*iter,NULL);??
- ??iter++;??
- ?}??
- ???
- ?return?0;??
- }??
- ??
- 簡單示例:??
- ??
- ××××××××test.cpp??
- ??
- #include?"CThread.h"??
- #include?<iostream>??
- ??
- using?namespace?std;??
- ??
- class?CWorkTask:?public?CTask??
- {??
- public:??
- ?CWorkTask()??
- ?{}??
- ?int?Run()??
- ?{??
- ??cout?<<?(char*)this->m_ptrData?<<?endl;??
- ??sleep(10);??
- ??return?0;??
- ?}??
- };??
- int?main()??
- {??
- ?CWorkTask?taskObj;??
- ?char?szTmp[]?=?"this?is?the?first?thread?running,haha?success";??
- ?taskObj.SetData((void*)szTmp);??
- ?CThreadPool?threadPool(10);??
- ?for(int?i?=?0;i?<?11;i++)??
- ?{??
- ??threadPool.AddTask(&taskObj);??
- ?}??
- ?while(1)??
- ?{??
- ??sleep(120);??
- ?}??
- ?return?0;??
- } ?