線程池
概念:
一堆線程+任務隊列
作用
- 避免大量線程頻繁的創建/銷毀時間成本
- 避免瞬間大量線程創建耗盡資源,程序崩潰危險
實現
創建固定數量的線程+創建一個線程安全的任務隊列
一種線程使用模式。
- 線程過多會帶來調度開銷,進而影響緩存局部性和整體性能。
- 而線程池維護著多個 線程,等待著監督管理者分配可并發執行的任務。這避免了在處理短時間任務時創建與銷毀線程的代價。
- 線程池不 僅能夠保證內核的充分利用,還能防止過分調度。可用線程數量應該取決于可用的并發處理器、處理器內核、內 存、網絡sockets等的數量
線程池應用場景
- 需要大量的線程來完成任務,且完成任務的時間比較短。 WEB服務器完成網頁請求這樣的任務,使用線程池技術是非常合適的。因為單個任務小,而任務數量巨大,你可以想象一個熱門網站的點擊次數。 但對于長時間的任務,比如一個Telnet連接請求,線程池的優點就不明顯了。因為Telnet會話時間比線程的創建時間大 多了。
- 對性能要求苛刻的應用,比如要求服務器迅速響應客戶請求。
- **接受突發性的大量請求,但不至于使服務器因此產生大量線程的應用。**突發性大量客戶請求,在沒 有線程池情況下,將產生大量線程,雖然理論上大部分操作系統線程數目最大值不是問題,短時間內產生大量線程 可能使內存到達極限,出現錯誤
線程池的分類
- FixThreadPool------固定線程池
- CachedThreadPool-----緩存線程池
- ScheduledThreadPool—調度線程池
- SingleThreadPool-----單任務線程池
特點介紹
FixedThreadPool
- 通過Exector的newFixedThreadPool靜態方法來創建
- 線程數量固定的線程池
- 只有核心線程切并且不會被回收
- 當所有線程都處于活動狀態時,新任務都會處于等待狀態,直到有線程空閑出來
CachedThreadPool
- 通過Exector的newCachedThreadPool靜態靜態方法來創建
- 線程數量不定的線程池
- 只有非核心線程,最大線程數量為Integer.MAX_VALUE,可視為任意大
- 有超時機制,時長為60s,即超過60s的空閑線程就會被回收
- 當線程池中的線程都處于活動狀態時,線程池會創建新的線程來處理新任務,否則就會利用空閑的線程來處理新任務。因此任何任務都會被立即執行
- 該線程池比較適合執行大量耗時較少的任務
ScheduledThreadPool
- 通過Exector的newScheduledThreadPool靜態方法來創建
- 核心線程數量是固定的,而非核心線程數不固定的,并且非核心線程有超時機制,只要處于閑置狀態就會被立即回收
- 該線程池主要用于執行定時任務和具有固定周期的重復任務
SingleThreadPool
- 通過Exector的newSingleThreadPool靜態方法來創建
- 只有一個核心線程,它確保所有的任務都在同一個線程中按順序執行。因此在這些任務之間不需要處理線程同步的問題
線程池實現
#include <iostream>
#include <queue>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <pthread.h>typedef bool (*task_callback)(int data);
class Task
{public:Task(){} Task(int data, task_callback handler){_data = data;_handler = handler;} ~Task(){} public://設置任務處理的數據以及處理方法void SetTask(int data, task_callback handler){_data = data;_handler = handler;} //執行任務bool Run() {return _handler(_data);} private:int _data;task_callback _handler;
};
#define MAX_THR 5
#define MAX_QUE 10
class ThreadPool
{public:ThreadPool(int qmax = MAX_QUE, int tmax = MAX_THR):_thr_max(tmax), _capacity(qmax), _thr_cur(tmax){pthread_mutex_init(&_mutex, NULL);pthread_cond_init(&_cond_con, NULL);pthread_cond_init(&_cond_pro, NULL);}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond_con);pthread_cond_destroy(&_cond_pro);}public:static void *thr_start(void *arg) {ThreadPool *pool = (ThreadPool*)arg;while(1) {pool->QueueLock();while(pool->QueueIsEmpty()){pool->ConWait();}Task tt;pool->QueuePop(&tt);pool->ProWakeUp();pool->QueueUnLock();//為了防止處理時間過長導致其它線程無法獲取鎖//因此解鎖之后才進行處理tt.Run();}return NULL;}bool ThreadPoolInit() {pthread_t tid;int ret, i;for (i = 0; i < _thr_max; i++) {ret = pthread_create(&tid, NULL, thr_start,(void*)this);if (ret != 0) {std::cout<<"thread create error\n";return false; }pthread_detach(tid);}return true;}void AddTask(Task tt) {//向線程池添加任務QueueLock();while(QueueIsFull()) {ProWait();}QueuePush(tt);ConWakeUp();QueueUnLock();}void ThreadPoolQuit(){//退出線程池中所有的線程_quit_flag = true;while(_thr_cur > 0) {ConWakeUpAll(); usleep(1000);}return;}private:void QueuePush(Task tt){_queue.push(tt);}void QueuePop(Task *tt){*tt = _queue.front();_queue.pop();}void QueueLock(){pthread_mutex_lock(&_mutex);}void QueueUnLock(){pthread_mutex_unlock(&_mutex);}void ProWait(){pthread_cond_wait(&_cond_pro, &_mutex);} void ProWakeUp(){pthread_cond_signal(&_cond_pro);}void ConWait(){//進入這個函表示現在沒有任務if (_quit_flag == true) {//若線程池要求退出_thr_cur--;std::cout<<"thread:"<<pthread_self()<<"exit\n";pthread_mutex_unlock(&_mutex);pthread_exit(NULL);}pthread_cond_wait(&_cond_con, &_mutex);}void ConWakeUp(){pthread_cond_signal(&_cond_con);}void ConWakeUpAll(){pthread_cond_broadcast(&_cond_con);}bool QueueIsFull(){return (_queue.size() == _capacity);}bool QueueIsEmpty(){return _queue.empty();}private:int _thr_max;int _thr_cur;int _quit_flag;std::queue<Task> _queue;int _capacity;pthread_mutex_t _mutex;pthread_cond_t _cond_pro;pthread_cond_t _cond_con;
};
bool task_handler(int data){//休眠一段時間srand(time(NULL));int sec = rand()%5;std::cout<<"thread:"<<pthread_self()<<" sleep "<<sec<<"second\n";sleep(sec);return true;
}
int main()
{ThreadPool pool;Task tt[10];pool.ThreadPoolInit();int i;for (i = 0; i < 10; i++) {tt[i].SetTask(i, task_handler);pool.AddTask(tt[i]);}pool.ThreadPoolQuit();return 0;
}