http://blog.csdn.net/qq_25425023/article/details/53914609
線程池中的線程,在任務隊列為空的時候,等待任務的到來,任務隊列中有任務時,則依次獲取任務來執行,任務隊列需要同步。
? Linux線程同步有多種方法:互斥量、信號量、條件變量等。
? 下面是根據互斥量、信號量、條件變量封裝的三個類。
? 線程池中用到了互斥量和信號量。
?
[cpp]?view plain?copy
- #ifndef?_LOCKER_H_??
- #define?_LOCKER_H_??
- ??
- #include?<pthread.h>??
- #include?<stdio.h>??
- #include?<semaphore.h>??
- ??
- /*信號量的類*/??
- class?sem_locker??
- {??
- private:??
- ????sem_t?m_sem;??
- ??
- public:??
- ????//初始化信號量??
- ????sem_locker()??
- ????{??
- ????if(sem_init(&m_sem,?0,?0)?!=?0)??
- ????????printf("sem?init?error\n");??
- ????}??
- ????//銷毀信號量??
- ????~sem_locker()??
- ????{??
- ????sem_destroy(&m_sem);??
- ????}??
- ??
- ????//等待信號量??
- ????bool?wait()??
- ????{??
- ????return?sem_wait(&m_sem)?==?0;??
- ????}??
- ????//添加信號量??
- ????bool?add()??
- ????{??
- ????return?sem_post(&m_sem)?==?0;??
- ????}??
- };??
- ??
- ??
- /*互斥?locker*/??
- class?mutex_locker??
- {??
- private:??
- ????pthread_mutex_t?m_mutex;??
- ??
- public:??
- ????mutex_locker()??
- ????{??
- ????????if(pthread_mutex_init(&m_mutex,?NULL)?!=?0)??
- ????????printf("mutex?init?error!");??
- ????}??
- ????~mutex_locker()??
- ????{??
- ????pthread_mutex_destroy(&m_mutex);??
- ????}??
- ??
- ????bool?mutex_lock()??//lock?mutex??
- ????{??
- ????return?pthread_mutex_lock(&m_mutex)?==?0;??
- ????}??
- ????bool?mutex_unlock()???//unlock??
- ????{??
- ????return?pthread_mutex_unlock(&m_mutex)?==?0;??
- ????}??
- };??
- ??
- /*條件變量?locker*/??
- class?cond_locker??
- {??
- private:??
- ????pthread_mutex_t?m_mutex;??
- ????pthread_cond_t?m_cond;??
- ??
- public:??
- ????//?初始化?m_mutex?and?m_cond??
- ????cond_locker()??
- ????{??
- ????if(pthread_mutex_init(&m_mutex,?NULL)?!=?0)??
- ????????printf("mutex?init?error");??
- ????if(pthread_cond_init(&m_cond,?NULL)?!=?0)??
- ????{???//條件變量初始化是被,釋放初始化成功的mutex??
- ????????pthread_mutex_destroy(&m_mutex);??
- ????????printf("cond?init?error");??
- ????}??
- ????}??
- ????//?destroy?mutex?and?cond??
- ????~cond_locker()??
- ????{??
- ????pthread_mutex_destroy(&m_mutex);??
- ????pthread_cond_destroy(&m_cond);??
- ????}??
- ????//等待條件變量??
- ????bool?wait()??
- ????{??
- ????int?ans?=?0;??
- ????pthread_mutex_lock(&m_mutex);??
- ????ans?=?pthread_cond_wait(&m_cond,?&m_mutex);??
- ????pthread_mutex_unlock(&m_mutex);??
- ????return?ans?==?0;??
- ????}??
- ????//喚醒等待條件變量的線程??
- ????bool?signal()??
- ????{??
- ????return?pthread_cond_signal(&m_cond)?==?0;??
- ????}??
- ??
- };??
- ??
- #endif??
下面的是線程池類,是一個模版類:
[cpp]?view plain?copy
- #ifndef?_PTHREAD_POOL_??
- #define?_PTHREAD_POOL_??
- ??
- #include?"locker.h"??
- #include?<list>??
- #include?<stdio.h>??
- #include?<exception>??
- #include?<errno.h>??
- #include?<pthread.h>??
- #include?<iostream>??
- ??
- template<class?T>??
- class?threadpool??
- {??
- private:??
- ????int?thread_number;??//線程池的線程數??
- ????int?max_task_number;??//任務隊列中的最大任務數??
- ????pthread_t?*all_threads;???//線程數組??
- ????std::list<T?*>?task_queue;?//任務隊列??
- ????mutex_locker?queue_mutex_locker;??//互斥鎖??
- ????sem_locker?queue_sem_locker;???//信號量??
- ????bool?is_stop;?//是否結束線程??
- public:??
- ????threadpool(int?thread_num?=?20,?int?max_task_num?=?30);??
- ????~threadpool();??
- ????bool?append_task(T?*task);??
- ????void?start();??
- ????void?stop();??
- private:??
- ????//線程運行的函數。執行run()函數??
- ????static?void?*worker(void?*arg);??
- ????void?run();??
- };??
- ??
- template?<class?T>??
- threadpool<T>::threadpool(int?thread_num,?int?max_task_num):??
- ????thread_number(thread_num),?max_task_number(max_task_num),??
- ????is_stop(false),?all_threads(NULL)??
- {??
- ????if((thread_num?<=?0)?||?max_task_num?<=?0)??
- ????printf("threadpool?can't?init?because?thread_number?=?0"??
- ????????"?or?max_task_number?=?0");??
- ??
- ????all_threads?=?new?pthread_t[thread_number];??
- ????if(!all_threads)??
- ????????printf("can't?init?threadpool?because?thread?array?can't?new");??
- }??
- ??
- template?<class?T>??
- threadpool<T>::~threadpool()??
- {??
- ????delete?[]all_threads;??
- ????is_stop?=?true;??
- }??
- ??
- template?<class?T>??
- void?threadpool<T>::stop()??
- {??
- ????is_stop?=?true;??
- ????//queue_sem_locker.add();??
- }??
- ??
- template?<class?T>??
- void?threadpool<T>::start()??
- {??
- ????for(int?i?=?0;?i?<?thread_number;?++i)??
- ????{??
- ????printf("create?the?%dth?pthread\n",?i);??
- ????if(pthread_create(all_threads?+?i,?NULL,?worker,?this)?!=?0)??
- ????{//創建線程失敗,清除成功申請的資源并拋出異常??
- ????????delete?[]all_threads;??
- ????????throw?std::exception();??
- ????}??
- ????if(pthread_detach(all_threads[i]))??
- ????{//將線程設置為脫離線程,失敗則清除成功申請的資源并拋出異常??
- ????????delete?[]all_threads;??
- ????????throw?std::exception();??
- ????}??
- ????}??
- }??
- //添加任務進入任務隊列??
- template?<class?T>??
- bool?threadpool<T>::append_task(T?*task)??
- {???//獲取互斥鎖??
- ????queue_mutex_locker.mutex_lock();??
- ????//判斷隊列中任務數是否大于最大任務數??
- ????if(task_queue.size()?>?max_task_number)??
- ????{//是則釋放互斥鎖??
- ????queue_mutex_locker.mutex_unlock();??
- ????return?false;??
- ????}??
- ????//添加進入隊列??
- ????task_queue.push_back(task);??
- ????queue_mutex_locker.mutex_unlock();??
- ????//喚醒等待任務的線程??
- ????queue_sem_locker.add();??
- ????return?true;??
- }??
- ??
- template?<class?T>??
- void?*threadpool<T>::worker(void?*arg)??
- {??
- ????threadpool?*pool?=?(threadpool?*)arg;??
- ????pool->run();??
- ????return?pool;??
- }??
- ??
- template?<class?T>??
- void?threadpool<T>::run()??
- {??
- ????while(!is_stop)??
- ????{???//等待任務??
- ????queue_sem_locker.wait();??????
- ????if(errno?==?EINTR)??
- ????{??
- ????????printf("errno");??
- ????????continue;??
- ????}??
- ????//獲取互斥鎖??
- ????queue_mutex_locker.mutex_lock();??
- ????//判斷任務隊列是否為空??
- ????if(task_queue.empty())??
- ????{??
- ????????queue_mutex_locker.mutex_unlock();??
- ????????continue;??
- ????}??
- ????//獲取隊頭任務并執行??
- ????T?*task?=?task_queue.front();??
- ????task_queue.pop_front();??
- ????queue_mutex_locker.mutex_unlock();??
- ????if(!task)??
- ????????continue;??
- //??printf("pthreadId?=?%ld\n",?(unsigned?long)pthread_self());???
- ????task->doit();??//doit是T對象中的方法??
- ????}??
- ????//測試用??
- ????printf("close?%ld\n",?(unsigned?long)pthread_self());??
- }??
- ??
- #endif??
以上參考《Linux高性能服務器編程》
寫個程序對線程池進行測試:
[cpp]?view plain?copy
- #include?<stdio.h>??
- #include?<iostream>??
- #include?<unistd.h>??
- ??
- #include?"thread_pool.h"??
- ??
- class?task??
- {??
- private:??
- ????int?number;??
- ??
- public:??
- ????task(int?num)?:?number(num)??
- ????{??
- ????}??
- ????~task()??
- ????{??
- ????}??
- ??
- ????void?doit()??
- ????{??
- ????printf("this?is?the?%dth?task\n",?number);??
- ????}??
- };??
- ??
- int?main()??
- {??
- ????task?*ta;??
- ????threadpool<task>?pool(10,?15);??
- //????pool.start();??
- ????for(int?i?=?0;?i?<?20;?++i)??
- ????{??
- ????ta?=?new?task(i);??
- //??sleep(2);??
- ????pool.append_task(ta);??
- ????}??
- ????pool.start();??
- ????sleep(10);??
- ????printf("close?the?thread?pool\n");??
- ????pool.stop();??
- ????pause();??
- ????return?0;??
- }??
經測試,線程池可以正常使用。
下一篇博客,使用線程池來實現回射服務器,測試可以達到多大的并發量。