我設計這個線程池的初衷是為了與socket對接的。線程池的實現千變萬化,我得這個并不一定是最好的,但卻是否和我心目中需求模型的。現把部分設計思路和代碼貼出,以期拋磚引玉。個人比較喜歡搞開源,所以大家如果覺得有什么需要改善的地方,歡迎給予評論。思前想后,也沒啥設計圖能表達出設計思想,就把類圖貼出來吧。
類圖設計如下:
Command類是我們的業務類。這個類里只能存放簡單的內置類型,這樣方便與socket的直接傳輸。我定義了一個cmd_成員用于存放命令字,arg_用于存放業務的參數。這個參數可以使用分隔符來分隔各個參數。我設計的只是簡單實現,如果有序列化操作了,完全不需要使用我這種方法啦。
ThreadProcess就是業務處理類,這里邊定義了各個方法用于進行業務處理,它將在ThreadPool中的Process函數中調用。
ThreadPool就是我們的線程池類。其中的成員變量都是靜態變量,Process就是線程處理函數。
#define MAX_THREAD_NUM 50 // 該值目前需要設定為初始線程數的整數倍
#define ADD_FACTOR 40 // 該值表示一個線程可以處理的最大任務數
#define THREAD_NUM 10 // 初始線程數
bshutdown_:用于線程退出。
command_:用于存放任務隊列
command_cond_:條件變量
command_mutex_:互斥鎖
icurr_thread_num_:當前線程池中的線程數
thread_id_map_:這個map用于存放線程對應的其它信息,我只存放了線程的狀態,0為正常,1為退出。還可以定義其它的結構來存放更多的信息,例如存放套接字。
InitializeThreads:用于初始化線程池,先創建THREAD_NUM個線程。后期擴容也需要這個函數。
Process:線程處理函數,這里邊會調用AddThread和DeleteThread在進行線程池的伸縮。
AddWork:往隊列中添加一個任務。
ThreadDestroy:線程銷毀函數。
AddThread:擴容THREAD_NUM個線程
DeleteThread:如果任務隊列為空,則將原來的線程池恢復到THREAD_NUM個。這里可以根據需要進行修改。
?
以下貼出代碼以供大家參考。
command.h
#ifndef COMMAND_H_ #define COMMAND_H_class Command { public:int get_cmd();char* get_arg();void set_cmd(int cmd);void set_arg(char* arg); private:int cmd_;char arg_[65]; };#endif /* COMMAND_H_ */
command.cpp
#include <string.h> #include "command.h"int Command::get_cmd() {return cmd_; }char* Command::get_arg() {return arg_; }void Command::set_cmd(int cmd) {cmd_ = cmd; }void Command::set_arg(char* arg) {if(NULL == arg){return;}strncpy(arg_,arg,64);arg_[64] = '\0'; }
thread_process.h
#ifndef THREAD_PROCESS_H_ #define THREAD_PROCESS_H_class ThreadProcess { public:void Process0(void* arg);void Process1(void* arg);void Process2(void* arg); };#endif /* THREAD_PROCESS_H_ */
thread_process.cpp
#include <pthread.h> #include <stdio.h> #include <unistd.h> #include "thread_process.h"void ThreadProcess::Process0(void* arg) {printf("thread %u is starting process %s\n",pthread_self(),arg);usleep(100*1000); } void ThreadProcess::Process1(void* arg) {printf("thread %u is starting process %s\n",pthread_self(),arg);usleep(100*1000); }void ThreadProcess::Process2(void* arg) {printf("thread %u is starting process %s\n",pthread_self(),arg);usleep(100*1000); }
thread_pool.h
#ifndef THREAD_POOL_H_ #define THREAD_POOL_H_#include <map> #include <vector> #include "command.h"#define MAX_THREAD_NUM 50 // 該值目前需要設定為初始線程數的整數倍 #define ADD_FACTOR 40 // 該值表示一個線程可以處理的最大任務數 #define THREAD_NUM 10 // 初始線程數class ThreadPool { public:ThreadPool() {};static void InitializeThreads();void AddWork(Command command);void ThreadDestroy(int iwait = 2); private:static void* Process(void* arg);static void AddThread();static void DeleteThread();static bool bshutdown_;static int icurr_thread_num_;static std::map<pthread_t,int> thread_id_map_;static std::vector<Command> command_;static pthread_mutex_t command_mutex_;static pthread_cond_t command_cond_; };#endif /* THREAD_POOL_H_ */
thread_pool.cpp
#include <pthread.h> #include <stdlib.h> #include "thread_pool.h" #include "thread_process.h" #include "command.h"bool ThreadPool::bshutdown_ = false; int ThreadPool::icurr_thread_num_ = THREAD_NUM; std::vector<Command> ThreadPool::command_; std::map<pthread_t,int> ThreadPool::thread_id_map_; pthread_mutex_t ThreadPool::command_mutex_ = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t ThreadPool::command_cond_ = PTHREAD_COND_INITIALIZER;void ThreadPool::InitializeThreads() {for (int i = 0; i < THREAD_NUM ; ++i){pthread_t tempThread;pthread_create(&tempThread, NULL, ThreadPool::Process, NULL);thread_id_map_[tempThread] = 0;} }void* ThreadPool::Process(void* arg) {ThreadProcess threadprocess;Command command;while (true){pthread_mutex_lock(&command_mutex_);// 如果線程需要退出,則此時退出if (1 == thread_id_map_[pthread_self()]){pthread_mutex_unlock(&command_mutex_);printf("thread %u will exit\n", pthread_self());pthread_exit(NULL);}// 當線程不需要退出且沒有需要處理的任務時,需要縮容的則縮容,不需要的則等待信號if (0 == command_.size() && !bshutdown_){if(MAX_THREAD_NUM != THREAD_NUM){DeleteThread();if (1 == thread_id_map_[pthread_self()]){pthread_mutex_unlock(&command_mutex_);printf("thread %u will exit\n", pthread_self());pthread_exit(NULL);}}pthread_cond_wait(&command_cond_,&command_mutex_);}// 線程池需要關閉,關閉已有的鎖,線程退出if(bshutdown_){pthread_mutex_unlock (&command_mutex_);printf ("thread %u will exit\n", pthread_self ());pthread_exit (NULL);}// 如果線程池的最大線程數不等于初始線程數,則表明需要擴容if(MAX_THREAD_NUM != THREAD_NUM){AddThread();}// 從容器中取出待辦任務std::vector<Command>::iterator iter = command_.begin();command.set_arg(iter->get_arg());command.set_cmd(iter->get_cmd());command_.erase(iter);pthread_mutex_unlock(&command_mutex_);// 開始業務處理switch(command.get_cmd()){case 0:threadprocess.Process0(command.get_arg());break;case 1:threadprocess.Process1(command.get_arg());break;case 2:threadprocess.Process2(command.get_arg());break;default:break;}}return NULL; // 完全為了消除警告(eclipse編寫的代碼,警告很煩人) }void ThreadPool::AddWork(Command command) {bool bsignal = false;pthread_mutex_lock(&command_mutex_);if (0 == command_.size()){bsignal = true;}command_.push_back(command);pthread_mutex_unlock(&command_mutex_);if (bsignal){pthread_cond_signal(&command_cond_);} }void ThreadPool::ThreadDestroy(int iwait) {while(0 != command_.size()){sleep(abs(iwait));}bshutdown_ = true;pthread_cond_broadcast(&command_cond_);std::map<pthread_t,int>::iterator iter = thread_id_map_.begin();for (; iter!=thread_id_map_.end(); ++iter){pthread_join(iter->first,NULL);}pthread_mutex_destroy(&command_mutex_);pthread_cond_destroy(&command_cond_); }void ThreadPool::AddThread() {if(((icurr_thread_num_*ADD_FACTOR) < command_.size())&& (MAX_THREAD_NUM != icurr_thread_num_)){InitializeThreads();icurr_thread_num_ += THREAD_NUM;} }void ThreadPool::DeleteThread() {int size = icurr_thread_num_ - THREAD_NUM;std::map<pthread_t,int>::iterator iter = thread_id_map_.begin();for(int i=0; i<size; ++i,++iter){iter->second = 1;} }
main.cpp
#include "thread_pool.h" #include "command.h"int main() {ThreadPool thread_pool;thread_pool.InitializeThreads();Command command;char arg[8] = {0};for(int i=1; i<=1000; ++i){command.set_cmd(i%3);sprintf(arg,"%d",i);command.set_arg(arg);thread_pool.AddWork(command);}sleep(10); // 用于測試線程池縮容 thread_pool.ThreadDestroy();return 0; }
?
代碼是按照google的開源c++編碼規范編寫。大家可以通過改變那幾個宏的值來調整線程池。有問題大家一起討論。