線程池原理及創建并C++實現

本文給出了一個通用的線程池框架,該框架將與線程執行相關的任務進行了高層次的抽象,使之與具體的執行任務無關。另外該線程池具有動態伸縮性,它能根據執行任務的輕重自動調整線程池中線程的數量。文章的最后,我們給出一個簡單示例程序,通過該示例程序,我們會發現,通過該線程池框架執行多線程任務是多么的簡單。
為什么需要線程池
目前的大多數網絡服務器,包括Web服務器、Email服務器以及數據庫服務器等都具有一個共同點,就是單位時間內必須處理數目巨大的連接請求,但處理時間卻相對較短。
傳統多線程方案中我們采用的服務器模型則是一旦接受到請求之后,即創建一個新的線程,由該線程執行任務。任務執行完畢后,線程退出,這就是是“即時創建,即時銷毀”的策略。盡管與創建進程相比,創建線程的時間已經大大的縮短,但是如果提交給線程的任務是執行時間較短,而且執行次數極其頻繁,那么服務器將處于不停的創建線程,銷毀線程的狀態。
我們將傳統方案中的線程執行過程分為三個過程:T1、T2、T3。
T1:線程創建時間
T2:線程執行時間,包括線程的同步等時間
T3:線程銷毀時間
那么我們可以看出,線程本身的開銷所占的比例為(T1+T3) / (T1+T2+T3)。如果線程執行的時間很短的話,這比開銷可能占到20%-50%左右。如果任務執行時間很頻繁的話,這筆開銷將是不可忽略的。
除此之外,線程池能夠減少創建的線程個數。通常線程池所允許的并發線程是有上界的,如果同時需要并發的線程數超過上界,那么一部分線程將會等待。而傳統方案中,如果同時請求數目為2000,那么最壞情況下,系統可能需要產生2000個線程。盡管這不是一個很大的數目,但是也有部分機器可能達不到這種要求。
因此線程池的出現正是著眼于減少線程池本身帶來的開銷。線程池采用預創建的技術,在應用程序啟動之后,將立即創建一定數量的線程(N1),放入空閑隊列中。這些線程都是處于阻塞(Suspended)狀態,不消耗CPU,但占用較小的內存空間。當任務到來后,緩沖池選擇一個空閑線程,把任務傳入此線程中運行。當N1個線程都在處理任務后,緩沖池自動創建一定數量的新線程,用于處理更多的任務。在任務執行完畢后線程也不退出,而是繼續保持在池中等待下一次的任務。當系統比較空閑時,大部分線程都一直處于暫停狀態,線程池自動銷毀一部分線程,回收系統資源。
基于這種預創建技術,線程池將線程創建和銷毀本身所帶來的開銷分攤到了各個具體的任務上,執行次數越多,每個任務所分擔到的線程本身開銷則越小,不過我們另外可能需要考慮進去線程之間同步所帶來的開銷。
?
構建線程池框架
一般線程池都必須具備下面幾個組成部分:
線程池管理器:用于創建并管理線程池
工作線程:?線程池中實際執行的線程
任務接口:?盡管線程池大多數情況下是用來支持網絡服務器,但是我們將線程執行的任務抽象出來,形成任務接口,從而是的線程池與具體的任務無關。
任務隊列:線程池的概念具體到實現則可能是隊列,鏈表之類的數據結構,其中保存執行線程。
我們實現的通用線程池框架由五個重要部分組成CThreadManage,CThreadPool,CThread,CJob,CWorkerThread,除此之外框架中還包括線程同步使用的類CThreadMutex和CCondition。
CJob是所有的任務的基類,其提供一個接口Run,所有的任務類都必須從該類繼承,同時實現Run方法。該方法中實現具體的任務邏輯。
CThread是Linux中線程的包裝,其封裝了Linux線程最經常使用的屬性和方法,它也是一個抽象類,是所有線程類的基類,具有一個接口Run。
CWorkerThread是實際被調度和執行的線程類,其從CThread繼承而來,實現了CThread中的Run方法。
CThreadPool是線程池類,其負責保存線程,釋放線程以及調度線程。
CThreadManage是線程池與用戶的直接接口,其屏蔽了內部的具體實現。
CThreadMutex用于線程之間的互斥。
CCondition則是條件變量的封裝,用于線程之間的同步。
它們的類的繼承關系如下圖所示:
?
線程池的時序很簡單,如下圖所示。CThreadManage直接跟客戶端打交道,其接受需要創建的線程初始個數,并接受客戶端提交的任務。這兒的任務是具體的非抽象的任務。CThreadManage的內部實際上調用的都是CThreadPool的相關操作。CThreadPool創建具體的線程,并把客戶端提交的任務分發給CWorkerThread,CWorkerThread實際執行具體的任務。
?
理解系統組件
下面我們分開來了解系統中的各個組件。

[cpp]?view plain?copy
  1. //CThreadManage??
  2. //CThreadManage的功能非常簡單,其提供最簡單的方法,其類定義如下:??
  3. class?CThreadManage??
  4. {??
  5. private:??
  6. ????CThreadPool*????m_Pool;??
  7. ????int??????????m_NumOfThread;??
  8. protected:??
  9. public:??
  10. ????void?????SetParallelNum(int?num);??
  11. ????CThreadManage();??
  12. ????CThreadManage(int?num);??
  13. ????virtual?~CThreadManage();??
  14. ???
  15. ????void????Run(CJob*?job,void*?jobdata);??
  16. ????void????TerminateAll(void);??
  17. };??
  18. //其中m_Pool指向實際的線程池;m_NumOfThread是初始創建時候允許創建的并發的線程個數。另外Run和TerminateAll方法也非常簡單,只是簡單的調用CThreadPool的一些相關方法而已。其具體的實現如下:??
  19. CThreadManage::CThreadManage(){??
  20. ????m_NumOfThread?=?10;??
  21. ????m_Pool?=?new?CThreadPool(m_NumOfThread);??
  22. ?}??
  23. CThreadManage::CThreadManage(int?num){??
  24. ????m_NumOfThread?=?num;??
  25. ????m_Pool?=?new?CThreadPool(m_NumOfThread);??
  26. }??
  27. CThreadManage::~CThreadManage(){??
  28. ????if(NULL?!=?m_Pool)??
  29. ????delete?m_Pool;??
  30. }??
  31. void?CThreadManage::SetParallelNum(int?num){??
  32. ????m_NumOfThread?=?num;??
  33. }??
  34. void?CThreadManage::Run(CJob*?job,void*?jobdata){??
  35. ????m_Pool->Run(job,jobdata);??
  36. }??
  37. void?CThreadManage::TerminateAll(void){??
  38. ????m_Pool->TerminateAll();??
  39. }??
  40. ??
  41. //CThread??
  42. //CThread?類實現了對Linux中線程操作的封裝,它是所有線程的基類,也是一個抽象類,提供了一個抽象接口Run,所有的CThread都必須實現該Run方法。CThread的定義如下所示:??
  43. class?CThread??
  44. {??
  45. private:??
  46. ????int??????????m_ErrCode;??
  47. ????Semaphore????m_ThreadSemaphore;??//the?inner?semaphore,?which?is?used?to?realize??
  48. ????unsigned?????long?m_ThreadID;?????
  49. ????bool?????????m_Detach;???????//The?thread?is?detached??
  50. ????bool?????????m_CreateSuspended;??//if?suspend?after?creating??
  51. ????char*????????m_ThreadName;??
  52. ????ThreadState?m_ThreadState;??????//the?state?of?the?thread??
  53. protected:??
  54. ????void?????SetErrcode(int?errcode){m_ErrCode?=?errcode;}??
  55. ????static?void*?ThreadFunction(void*);??
  56. public:??
  57. ????CThread();??
  58. ????CThread(bool?createsuspended,bool?detach);??
  59. ????virtual?~CThread();??
  60. ????virtual?void?Run(void)?=?0;??
  61. ????void?????SetThreadState(ThreadState?state){m_ThreadState?=?state;}??
  62. ???
  63. ????bool?????Terminate(void);????//Terminate?the?threa??
  64. ????bool?????Start(void);????????//Start?to?execute?the?thread??
  65. ????void?????Exit(void);??
  66. ????bool?????Wakeup(void);??
  67. ?????
  68. ????ThreadState??GetThreadState(void){return?m_ThreadState;}??
  69. ????int??????GetLastError(void){return?m_ErrCode;}??
  70. ????void?????SetThreadName(char*?thrname){strcpy(m_ThreadName,thrname);}??
  71. ????char*????GetThreadName(void){return?m_ThreadName;}??
  72. ????int??????GetThreadID(void){return?m_ThreadID;}??
  73. ???
  74. ????bool?????SetPriority(int?priority);??
  75. ????int??????GetPriority(void);??
  76. ????int??????GetConcurrency(void);??
  77. ????void?????SetConcurrency(int?num);??
  78. ????bool?????Detach(void);??
  79. ????bool?????Join(void);??
  80. ????bool?????Yield(void);??
  81. ????int??????Self(void);??
  82. };??
  83. //線程的狀態可以分為四種,空閑、忙碌、掛起、終止(包括正常退出和非正常退出)。由于目前Linux線程庫不支持掛起操作,因此,我們的此處的掛起操作類似于暫停。如果線程創建后不想立即執行任務,那么我們可以將其“暫停”,如果需要運行,則喚醒。有一點必須注意的是,一旦線程開始執行任務,將不能被掛起,其將一直執行任務至完畢。??
  84. //線程類的相關操作均十分簡單。線程的執行入口是從Start()函數開始,其將調用函數ThreadFunction,ThreadFunction再調用實際的Run函數,執行實際的任務。??
  85. ???
  86. //CThreadPool??
  87. //CThreadPool是線程的承載容器,一般可以將其實現為堆棧、單向隊列或者雙向隊列。在我們的系統中我們使用STL?Vector對線程進行保存。CThreadPool的實現代碼如下:??
  88. class?CThreadPool??
  89. {??
  90. friend?class?CWorkerThread;??
  91. private:??
  92. ????unsigned?int?m_MaxNum;???//the?max?thread?num?that?can?create?at?the?same?time??
  93. ????unsigned?int?m_AvailLow;?//The?min?num?of?idle?thread?that?shoule?kept??
  94. ????unsigned?int?m_AvailHigh;????//The?max?num?of?idle?thread?that?kept?at?the?same?time??
  95. ????unsigned?int?m_AvailNum;?//the?normal?thread?num?of?idle?num;??
  96. ????unsigned?int?m_InitNum;??//Normal?thread?num;??
  97. protected:??
  98. ????CWorkerThread*?GetIdleThread(void);??
  99. ???
  100. ????void????AppendToIdleList(CWorkerThread*?jobthread);??
  101. ????void????MoveToBusyList(CWorkerThread*?idlethread);??
  102. ????void????MoveToIdleList(CWorkerThread*?busythread);??
  103. ???
  104. ????void????DeleteIdleThread(int?num);??
  105. ????void????CreateIdleThread(int?num);??
  106. public:??
  107. ????CThreadMutex?m_BusyMutex;????//when?visit?busy?list,use?m_BusyMutex?to?lock?and?unlock??
  108. ????CThreadMutex?m_IdleMutex;????//when?visit?idle?list,use?m_IdleMutex?to?lock?and?unlock??
  109. ????CThreadMutex?m_JobMutex;?//when?visit?job?list,use?m_JobMutex?to?lock?and?unlock??
  110. ????CThreadMutex?m_VarMutex;??
  111. ???
  112. ????CCondition???????m_BusyCond;?//m_BusyCond?is?used?to?sync?busy?thread?list??
  113. ????CCondition???????m_IdleCond;?//m_IdleCond?is?used?to?sync?idle?thread?list??
  114. ????CCondition???????m_IdleJobCond;??//m_JobCond?is?used?to?sync?job?list??
  115. ????CCondition???????m_MaxNumCond;??
  116. ???
  117. ????vector<CWorkerThread*>???m_ThreadList;??
  118. ????vector<CWorkerThread*>???m_BusyList;?????//Thread?List??
  119. ????vector<CWorkerThread*>???m_IdleList;?//Idle?List??
  120. ???
  121. ????CThreadPool();??
  122. ????CThreadPool(int?initnum);??
  123. ????virtual?~CThreadPool();??
  124. ???
  125. ????void????SetMaxNum(int?maxnum){m_MaxNum?=?maxnum;}??
  126. ????int?????GetMaxNum(void){return?m_MaxNum;}??
  127. ????void????SetAvailLowNum(int?minnum){m_AvailLow?=?minnum;}??
  128. ????int?????GetAvailLowNum(void){return?m_AvailLow;}??
  129. ????void????SetAvailHighNum(int?highnum){m_AvailHigh?=?highnum;}??
  130. ????int?????GetAvailHighNum(void){return?m_AvailHigh;}??
  131. ????int?????GetActualAvailNum(void){return?m_AvailNum;}??
  132. ????int?????GetAllNum(void){return?m_ThreadList.size();}??
  133. ????int?????GetBusyNum(void){return?m_BusyList.size();}??
  134. ????void????SetInitNum(int?initnum){m_InitNum?=?initnum;}??
  135. ????int?????GetInitNum(void){return?m_InitNum;}??
  136. ????
  137. ????void????TerminateAll(void);??
  138. ????void????Run(CJob*?job,void*?jobdata);??
  139. };??
  140. ??
  141. CThreadPool::CThreadPool()??
  142. {??
  143. ????m_MaxNum?=?50;??
  144. ????m_AvailLow?=?5;??
  145. ????m_InitNum=m_AvailNum?=?10?;????
  146. ????m_AvailHigh?=?20;??
  147. ???
  148. ????m_BusyList.clear();??
  149. ????m_IdleList.clear();??
  150. ????for(int?i=0;i<m_InitNum;i++){??
  151. ????CWorkerThread*?thr?=?new?CWorkerThread();??
  152. ????thr->SetThreadPool(this);??
  153. ????AppendToIdleList(thr);??
  154. ????thr->Start();??
  155. ????}??
  156. }??
  157. ???
  158. CThreadPool::CThreadPool(int?initnum)??
  159. {??
  160. ????assert(initnum>0?&&?initnum<=30);??
  161. ????m_MaxNum???=?30;??
  162. ????m_AvailLow?=?initnum-10>0?initnum-10:3;??
  163. ????m_InitNum=m_AvailNum?=?initnum?;????
  164. ????m_AvailHigh?=?initnum+10;??
  165. ???
  166. ????m_BusyList.clear();??
  167. ????m_IdleList.clear();??
  168. ????for(int?i=0;i<m_InitNum;i++){??
  169. ????CWorkerThread*?thr?=?new?CWorkerThread();??
  170. ????AppendToIdleList(thr);??
  171. ????thr->SetThreadPool(this);??
  172. ????thr->Start();???????//begin?the?thread,the?thread?wait?for?job??
  173. ????}??
  174. }??
  175. ???
  176. CThreadPool::~CThreadPool()??
  177. {??
  178. ???TerminateAll();??
  179. }??
  180. ???
  181. void?CThreadPool::TerminateAll()??
  182. {??
  183. ????for(int?i=0;i?<?m_ThreadList.size();i++)?{??
  184. ????CWorkerThread*?thr?=?m_ThreadList[i];??
  185. ????thr->Join();??
  186. ????}??
  187. ????return;??
  188. }??
  189. ???
  190. CWorkerThread*?CThreadPool::GetIdleThread(void)??
  191. {??
  192. ????while(m_IdleList.size()?==0?)??
  193. ????m_IdleCond.Wait();??
  194. ?????
  195. ????m_IdleMutex.Lock();??
  196. ????if(m_IdleList.size()?>?0?)??
  197. ????{??
  198. ????CWorkerThread*?thr?=?(CWorkerThread*)m_IdleList.front();??
  199. ????printf("Get?Idle?thread?%dn",thr->GetThreadID());??
  200. ????m_IdleMutex.Unlock();??
  201. ????return?thr;??
  202. ????}??
  203. ????m_IdleMutex.Unlock();??
  204. ???
  205. ????return?NULL;??
  206. }??
  207. ???
  208. //add?an?idle?thread?to?idle?list??
  209. void?CThreadPool::AppendToIdleList(CWorkerThread*?jobthread)??
  210. {??
  211. ????m_IdleMutex.Lock();??
  212. ????m_IdleList.push_back(jobthread);??
  213. ????m_ThreadList.push_back(jobthread);??
  214. ????m_IdleMutex.Unlock();??
  215. }??
  216. ???
  217. //move?and?idle?thread?to?busy?thread??
  218. void?CThreadPool::MoveToBusyList(CWorkerThread*?idlethread)??
  219. {??
  220. ????m_BusyMutex.Lock();??
  221. ????m_BusyList.push_back(idlethread);??
  222. ????m_AvailNum--;??
  223. ????m_BusyMutex.Unlock();??
  224. ????
  225. ????m_IdleMutex.Lock();??
  226. ????vector<CWorkerThread*>::iterator?pos;??
  227. ????pos?=?find(m_IdleList.begin(),m_IdleList.end(),idlethread);??
  228. ????if(pos?!=m_IdleList.end())??
  229. ????m_IdleList.erase(pos);??
  230. ????m_IdleMutex.Unlock();??
  231. }??
  232. ???
  233. void?CThreadPool::MoveToIdleList(CWorkerThread*?busythread)??
  234. {??
  235. ????m_IdleMutex.Lock();??
  236. ????m_IdleList.push_back(busythread);??
  237. ????m_AvailNum++;??
  238. ????m_IdleMutex.Unlock();??
  239. ???
  240. ????m_BusyMutex.Lock();??
  241. ????vector<CWorkerThread*>::iterator?pos;??
  242. ????pos?=?find(m_BusyList.begin(),m_BusyList.end(),busythread);??
  243. ????if(pos!=m_BusyList.end())??
  244. ????m_BusyList.erase(pos);??
  245. ????m_BusyMutex.Unlock();??
  246. ???
  247. ????m_IdleCond.Signal();??
  248. ????m_MaxNumCond.Signal();??
  249. }??
  250. ???
  251. //create?num?idle?thread?and?put?them?to?idlelist??
  252. void?CThreadPool::CreateIdleThread(int?num)??
  253. {??
  254. ????for(int?i=0;i<num;i++){??
  255. ????CWorkerThread*?thr?=?new?CWorkerThread();??
  256. ????thr->SetThreadPool(this);??
  257. ????AppendToIdleList(thr);??
  258. ????m_VarMutex.Lock();??
  259. ????m_AvailNum++;??
  260. ????m_VarMutex.Unlock();??
  261. ????thr->Start();???????//begin?the?thread,the?thread?wait?for?job??
  262. ????}??
  263. }??
  264. ???
  265. void?CThreadPool::DeleteIdleThread(int?num)??
  266. {??
  267. ????printf("Enter?into?CThreadPool::DeleteIdleThreadn");??
  268. ????m_IdleMutex.Lock();??
  269. ????printf("Delete?Num?is?%dn",num);??
  270. ????for(int?i=0;i<num;i++){??
  271. ????CWorkerThread*?thr;??
  272. ????if(m_IdleList.size()?>?0?){??
  273. ????????????thr?=?(CWorkerThread*)m_IdleList.front();??
  274. ????????????printf("Get?Idle?thread?%dn",thr->GetThreadID());??
  275. ????}??
  276. ???
  277. ????vector<CWorkerThread*>::iterator?pos;??
  278. ????pos?=?find(m_IdleList.begin(),m_IdleList.end(),thr);??
  279. ????if(pos!=m_IdleList.end())??
  280. ????????m_IdleList.erase(pos);??
  281. ????m_AvailNum--;??
  282. ????printf("The?idle?thread?available?num:%d?n",m_AvailNum);??
  283. ????printf("The?idlelist??????????????num:%d?n",m_IdleList.size());??
  284. ????}??
  285. ????m_IdleMutex.Unlock();??
  286. }??
  287. ??
  288. void?CThreadPool::Run(CJob*?job,void*?jobdata)??
  289. {??
  290. ????assert(job!=NULL);??
  291. ?????
  292. ????//if?the?busy?thread?num?adds?to?m_MaxNum,so?we?should?wait??
  293. ????if(GetBusyNum()?==?m_MaxNum)??
  294. ????????m_MaxNumCond.Wait();??
  295. ???
  296. ????if(m_IdleList.size()<m_AvailLow)??
  297. ????{??
  298. ????if(GetAllNum()+m_InitNum-m_IdleList.size()?<?m_MaxNum?)??
  299. ????????CreateIdleThread(m_InitNum-m_IdleList.size());??
  300. ????else??
  301. ????????CreateIdleThread(m_MaxNum-GetAllNum());??
  302. ????}??
  303. ???
  304. ????CWorkerThread*??idlethr?=?GetIdleThread();??
  305. ????if(idlethr?!=NULL)??
  306. ????{??
  307. ????idlethr->m_WorkMutex.Lock();??
  308. ????MoveToBusyList(idlethr);??
  309. ????idlethr->SetThreadPool(this);??
  310. ????job->SetWorkThread(idlethr);??
  311. ????printf("Job?is?set?to?thread?%d?n",idlethr->GetThreadID());??
  312. ????idlethr->SetJob(job,jobdata);??
  313. ????}??
  314. }??
  315. //在CThreadPool中存在兩個鏈表,一個是空閑鏈表,一個是忙碌鏈表。Idle鏈表中存放所有的空閑進程,當線程執行任務時候,其狀態變為忙碌狀態,同時從空閑鏈表中刪除,并移至忙碌鏈表中。在CThreadPool的構造函數中,我們將執行下面的代碼:??
  316. for(int?i=0;i<m_InitNum;i++)??
  317. {??
  318. ????CWorkerThread*?thr?=?new?CWorkerThread();??
  319. ????AppendToIdleList(thr);??
  320. ????thr->SetThreadPool(this);??
  321. ????thr->Start();???????//begin?the?thread,the?thread?wait?for?job??
  322. }??

在該代碼中,我們將創建m_InitNum個線程,創建之后即調用AppendToIdleList放入Idle鏈表中,由于目前沒有任務分發給這些線程,因此線程執行Start后將自己掛起。
事實上,線程池中容納的線程數目并不是一成不變的,其會根據執行負載進行自動伸縮。為此在CThreadPool中設定四個變量:
m_InitNum:初始創建時線程池中的線程的個數。
m_MaxNum:當前線程池中所允許并發存在的線程的最大數目。
m_AvailLow:當前線程池中所允許存在的空閑線程的最小數目,如果空閑數目低于該值,表明負載可能過重,此時有必要增加空閑線程池的數目。實現中我們總是將線程調整為m_InitNum個。
m_AvailHigh:當前線程池中所允許的空閑的線程的最大數目,如果空閑數目高于該值,表明當前負載可能較輕,此時將刪除多余的空閑線程,刪除后調整數也為m_InitNum個。
m_AvailNum:目前線程池中實際存在的線程的個數,其值介于m_AvailHigh和m_AvailLow之間。如果線程的個數始終維持在m_AvailLow和m_AvailHigh之間,則線程既不需要創建,也不需要刪除,保持平衡狀態。因此如何設定m_AvailLow和m_AvailHigh的值,使得線程池最大可能的保持平衡態,是線程池設計必須考慮的問題。
線程池在接受到新的任務之后,線程池首先要檢查是否有足夠的空閑池可用。檢查分為三個步驟:
(1)檢查當前處于忙碌狀態的線程是否達到了設定的最大值m_MaxNum,如果達到了,表明目前沒有空閑線程可用,而且也不能創建新的線程,因此必須等待直到有線程執行完畢返回到空閑隊列中。
(2)如果當前的空閑線程數目小于我們設定的最小的空閑數目m_AvailLow,則我們必須創建新的線程,默認情況下,創建后的線程數目應該為m_InitNum,因此創建的線程數目應該為(?當前空閑線程數與m_InitNum);但是有一種特殊情況必須考慮,就是現有的線程總數加上創建后的線程數可能超過m_MaxNum,因此我們必須對線程的創建區別對待。
??? if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum )
??? ??? CreateIdleThread(m_InitNum-m_IdleList.size());
??? else
??? ??? CreateIdleThread(m_MaxNum-GetAllNum());
如果創建后總數不超過m_MaxNum,則創建后的線程為m_InitNum;如果超過了,則只創建( m_MaxNum-當前線程總數)個。
(3)調用GetIdleThread方法查找空閑線程。如果當前沒有空閑線程,則掛起;否則將任務指派給該線程,同時將其移入忙碌隊列。
當線程執行完畢后,其會調用MoveToIdleList方法移入空閑鏈表中,其中還調用m_IdleCond.Signal()方法,喚醒GetIdleThread()中可能阻塞的線程。
?
[cpp]?view plain?copy
  1. //CWorkerThread??
  2. //CWorkerThread是CThread的派生類,是事實上的工作線程。在CThreadPool的構造函數中,我們創建了一定數量的CWorkerThread。一旦這些線程創建完畢,我們將調用Start()啟動該線程。Start方法最終會調用Run方法。Run方法是個無限循環的過程。在沒有接受到實際的任務的時候,m_Job為NULL,此時線程將調用Wait方法進行等待,從而處于掛起狀態。一旦線程池將具體的任務分發給該線程,其將被喚醒,從而通知線程從掛起的地方繼續執行。CWorkerThread的完整定義如下:??
  3. class?CWorkerThread:public?CThread??
  4. {??
  5. private:??
  6. ????CThreadPool*??m_ThreadPool;??
  7. ????CJob*????m_Job;??
  8. ????void*????m_JobData;??
  9. ?????
  10. ????CThreadMutex?m_VarMutex;??
  11. ????bool??????m_IsEnd;??
  12. protected:??
  13. public:??
  14. ????CCondition???m_JobCond;??
  15. ????CThreadMutex?m_WorkMutex;??
  16. ????CWorkerThread();??
  17. ????virtual?~CWorkerThread();??
  18. ????void?Run();??
  19. ????void????SetJob(CJob*?job,void*?jobdata);??
  20. ????CJob*???GetJob(void){return?m_Job;}??
  21. ????void????SetThreadPool(CThreadPool*?thrpool);??
  22. ????CThreadPool*?GetThreadPool(void){return?m_ThreadPool;}??
  23. };??
  24. CWorkerThread::CWorkerThread()??
  25. {??
  26. ????m_Job?=?NULL;??
  27. ????m_JobData?=?NULL;??
  28. ????m_ThreadPool?=?NULL;??
  29. ????m_IsEnd?=?false;??
  30. }??
  31. CWorkerThread::~CWorkerThread()??
  32. {??
  33. ????if(NULL?!=?m_Job)??
  34. ????delete?m_Job;??
  35. ????if(m_ThreadPool?!=?NULL)??
  36. ????delete?m_ThreadPool;??
  37. }??
  38. ???
  39. void?CWorkerThread::Run()??
  40. {??
  41. ????SetThreadState(THREAD_RUNNING);??
  42. ????for(;;)??
  43. ????{??
  44. ????while(m_Job?==?NULL)??
  45. ????????m_JobCond.Wait();??
  46. ???
  47. ????m_Job->Run(m_JobData);??
  48. ????m_Job->SetWorkThread(NULL);??
  49. ????m_Job?=?NULL;??
  50. ????m_ThreadPool->MoveToIdleList(this);??
  51. ????if(m_ThreadPool->m_IdleList.size()?>?m_ThreadPool->GetAvailHighNum())??
  52. ????{??
  53. ????m_ThreadPool->DeleteIdleThread(m_ThreadPool->m_IdleList.size()-m_ThreadPool->GetInitNum());??
  54. ????}??
  55. ????m_WorkMutex.Unlock();??
  56. ????}??
  57. }??
  58. void?CWorkerThread::SetJob(CJob*?job,void*?jobdata)??
  59. {??
  60. ????m_VarMutex.Lock();??
  61. ????m_Job?=?job;??
  62. ????m_JobData?=?jobdata;??
  63. ????job->SetWorkThread(this);??
  64. ????m_VarMutex.Unlock();??
  65. ????m_JobCond.Signal();??
  66. }??
  67. void?CWorkerThread::SetThreadPool(CThreadPool*?thrpool)??
  68. {??
  69. ????m_VarMutex.Lock();??
  70. ????m_ThreadPool?=?thrpool;??
  71. ????m_VarMutex.Unlock();??
  72. }??
  73. //當線程執行任務之前首先必須判斷空閑線程的數目是否低于m_AvailLow,如果低于,則必須創建足夠的空閑線程,使其數目達到m_InitNum個,然后將調用MoveToBusyList()移出空閑隊列,移入忙碌隊列。當任務執行完畢后,其又調用MoveToIdleList()移出忙碌隊列,移入空閑隊列,等待新的任務。??
  74. //除了Run方法之外,CWorkerThread中另外一個重要的方法就是SetJob,該方法將實際的任務賦值給線程。當沒有任何執行任務即m_Job為NULL的時候,線程將調用m_JobCond.Wait進行等待。一旦Job被賦值給線程,其將調用m_JobCond.Signal方法喚醒該線程。由于m_JobCond屬于線程內部的變量,每個線程都維持一個m_JobCond,只有得到任務的線程才被喚醒,沒有得到任務的將繼續等待。無論一個線程何時被喚醒,其都將從等待的地方繼續執行m_Job->Run(m_JobData),這是線程執行實際任務的地方。??
  75. //在線程執行給定Job期間,我們必須防止另外一個Job又賦給該線程,因此在賦值之前,通過m_VarMutex進行鎖定,?Job執行期間,其于的Job將不能關聯到該線程;任務執行完畢,我們調用m_VarMutex.Unlock()進行解鎖,此時,線程又可以接受新的執行任務。??
  76. //在線程執行任務結束后返回空閑隊列前,我們還需要判斷當前空閑隊列中的線程是否高于m_AvailHigh個。如果超過m_AvailHigh,則必須從其中刪除(m_ThreadPool->m_IdleList.size()-m_ThreadPool->GetInitNum())個線程,使線程數目保持在m_InitNum個。??
  77. ???
  78. //CJob??
  79. //CJob類相對簡單,其封裝了任務的基本的屬性和方法,其中最重要的是Run方法,代碼如下:??
  80. class?CJob??
  81. {??
  82. private:??
  83. ????int??????m_JobNo;????????//The?num?was?assigned?to?the?job??
  84. ????char*????m_JobName;??????//The?job?name??
  85. ????CThread??*m_pWorkThread;?????//The?thread?associated?with?the?job??
  86. public:??
  87. ????CJob(?void?);??
  88. ????virtual?~CJob();??
  89. ?????????
  90. ????int??????GetJobNo(void)?const?{?return?m_JobNo;?}??
  91. ????void?????SetJobNo(int?jobno){?m_JobNo?=?jobno;}??
  92. ????char*????GetJobName(void)?const?{?return?m_JobName;?}??
  93. ????void?????SetJobName(char*?jobname);??
  94. ????CThread?*GetWorkThread(void){?return?m_pWorkThread;?}??
  95. ????void?????SetWorkThread?(?CThread?*pWorkThread?){??
  96. ????????m_pWorkThread?=?pWorkThread;??
  97. ????}??
  98. ????virtual?void?Run?(?void?*ptr?)?=?0;??
  99. };??
[cpp]?view plain?copy
  1. CJob::CJob(void)??
  2. :m_pWorkThread(NULL)??
  3. ,m_JobNo(0)??
  4. ,m_JobName(NULL)??
  5. {??
  6. }??
  7. CJob::~CJob(){??
  8. ????if(NULL?!=?m_JobName)??
  9. ????free(m_JobName);??
  10. }??
  11. void?CJob::SetJobName(char*?jobname)??
  12. {??
  13. ????if(NULL?!=m_JobName)????{??
  14. ????????free(m_JobName);??
  15. ????????m_JobName?=?NULL;??
  16. ????}??
  17. ????if(NULL?==jobname)????{??
  18. ????????m_JobName?=?(char*)malloc(strlen(jobname)+1);??
  19. ????????strcpy(m_JobName,jobname);??
  20. ????}??
  21. }??
  22. //線程池使用示例??
  23. //至此我們給出了一個簡單的與具體任務無關的線程池框架。使用該框架非常的簡單,我們所需要的做的就是派生CJob類,將需要完成的任務實現在Run方法中。然后將該Job交由CThreadManage去執行。下面我們給出一個簡單的示例程序??
  24. class?CXJob:public?CJob??
  25. {??
  26. public:??
  27. ????CXJob(){i=0;}??
  28. ????~CXJob(){}??
  29. ????void?Run(void*?jobdata)????{??
  30. ????????printf("The?Job?comes?from?CXJOB/n");??
  31. ????????sleep(2);??
  32. ????}??
  33. };??
  34. ???
  35. class?CYJob:public?CJob??
  36. {??
  37. public:??
  38. ????CYJob(){i=0;}??
  39. ????~CYJob(){}??
  40. ????void?Run(void*?jobdata)????{??
  41. ????????printf("The?Job?comes?from?CYJob/n");??
  42. ????}??
  43. };??
  44. ???
  45. main()??
  46. {??
  47. ????CThreadManage*?manage?=?new?CThreadManage(10);??
  48. ????for(int?i=0;i<40;i++)??
  49. ????{??
  50. ????????CXJob*???job?=?new?CXJob();??
  51. ????????manage->Run(job,NULL);??
  52. ????}??
  53. ????sleep(2);??
  54. ????CYJob*?job?=?new?CYJob();??
  55. ????manage->Run(job,NULL);??
  56. ????manage->TerminateAll();??
  57. }??
  58. //CXJob和CYJob都是從Job類繼承而來,其都實現了Run接口。CXJob只是簡單的打印一句”The?Job?comes?from?CXJob”,CYJob也只打印”The?Job?comes?from?CYJob”,然后均休眠2秒鐘。在主程序中我們初始創建10個工作線程。然后分別執行40次CXJob和一次CYJob。??


線程池使用后記
線程池適合場合
事實上,線程池并不是萬能的。它有其特定的使用場合。線程池致力于減少線程本身的開銷對應用所產生的影響,這是有前提的,前提就是線程本身開銷與線程執行任務相比不可忽略。如果線程本身的開銷相對于線程任務執行開銷而言是可以忽略不計的,那么此時線程池所帶來的好處是不明顯的,比如對于FTP服務器以及Telnet服務器,通常傳送文件的時間較長,開銷較大,那么此時,我們采用線程池未必是理想的方法,我們可以選擇“即時創建,即時銷毀”的策略。
總之線程池通常適合下面的幾個場合:
(1)? 單位時間內處理任務頻繁而且任務處理時間短
(2)? 對實時性要求較高。如果接受到任務后在創建線程,可能滿足不了實時要求,因此必須采用線程池進行預創建。
(3)? 必須經常面對高突發性事件,比如Web服務器,如果有足球轉播,則服務器將產生巨大的沖擊。此時如果采取傳統方法,則必須不停的大量產生線程,銷毀線程。此時采用動態線程池可以避免這種情況的發生。
?
結束語
本文給出了一個簡單的通用的與任務無關的線程池的實現,通過該線程池能夠極大的簡化Linux下多線程的開發工作。該線程池的進一步完善開發工作還在進行中,希望能夠得到你的建議和支持。
參考資料
http://www-900.ibm.com/developerWorks/cn/java/j-jtp0730/index.shtml
POSIX多線程程序設計,David R.Butenhof??? 譯者:于磊 曾剛,中國電力出版社
C++面向對象多線程編程,CAMERON HUGHES等著 周良忠譯,人民郵電出版社
Java Pro,結合線程和分析器池,Edy Yu
關于作者
張中慶,西安交通大學軟件所,在讀碩士,目前研究方向為分布式網絡與移動中間件.

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/385005.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/385005.shtml
英文地址,請注明出處:http://en.pswp.cn/news/385005.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Linux 打印簡單日志(一)

簡單日志輸出&#xff1a; #include<stdio.h> #include<string.h> #include<stdlib.h>void write(char* filename,char* szStr){FILE* fp;fp fopen(filename,"at");if(fp ! NULL){fwrite(szStr,256,1,fp); //fclose(fp);fp NULL;} }int main(int…

c++簡單線程池實現

線程池&#xff0c;簡單來說就是有一堆已經創建好的線程&#xff08;最大數目一定&#xff09;&#xff0c;初始時他們都處于空閑狀態&#xff0c;當有新的任務進來&#xff0c;從線程池中取出一個空閑的線程處理任務&#xff0c;然后當任務處理完成之后&#xff0c;該線程被重…

Linux 打印可變參數日志

實現了傳輸進去的字符串所在的文檔&#xff0c;函數和行數顯示功能。 實現了將傳入的可變參數打印到日志功能。 #include<stdio.h> #include<stdarg.h> #include<string.h>const char * g_path "/home/exbot/wangqinghe/log.txt"; #define LOG(fm…

C++強化之路之線程池開發整體框架(二)

一.線程池開發框架 我所開發的線程池由以下幾部分組成&#xff1a; 1.工作中的線程。也就是線程池中的線程&#xff0c;主要是執行分發來的task。 2.管理線程池的監督線程。這個線程的創建獨立于線程池的創建&#xff0c;按照既定的管理方法進行管理線程池中的所有線程&#xf…

vfprintf()函數

函數聲明&#xff1a;int vfprintf(FILE *stream, const char *format, va_list arg) 函數參數&#xff1a; stream—這是指向了FILE對象的指針&#xff0c;該FILE對象標識了流。 format—c語言字符串&#xff0c;包含了要被寫入到流stream中的文本。它可以包含嵌入的format標簽…

Makefile(二)

將生產的.o文件放進指定的文件中&#xff08;先創建該文件夾&#xff09; src $(wildcard ./*.cpp) obj $(patsubst %.cpp,./output/%.o,$(src))target test$(target) : $(obj)g $(obj) -o $(target) %.o: %.cppg -c $< -o output/$.PHONY:clean clean:rm -f $(target) $…

TCP粘包問題分析和解決(全)

TCP通信粘包問題分析和解決&#xff08;全&#xff09;在socket網絡程序中&#xff0c;TCP和UDP分別是面向連接和非面向連接的。因此TCP的socket編程&#xff0c;收發兩端&#xff08;客戶端和服務器端&#xff09;都要有成對的socket&#xff0c;因此&#xff0c;發送端為了將…

UML類圖符號 各種關系說明以及舉例

UML中描述對象和類之間相互關系的方式包括&#xff1a;依賴&#xff0c;關聯&#xff0c;聚合&#xff0c;組合&#xff0c;泛化&#xff0c;實現等。表示關系的強弱&#xff1a;組合>聚合>關聯>依賴 相互間關系 聚合是表明對象之間的整體與部分關系的關聯&#xff0c…

尋找數組中第二大數

設置兩個數值來表示最大數和第二大數&#xff0c;在循環比較賦值即可 //找給定數組中第二大的數int get_smax(int *arr,int length) {int max;int smax;if(arr[0] > arr[1]){max arr[0];smax arr[1];}else{max arr[1];smax arr[0];}for(int i 2; i < length; i){if(…

timerfd API使用總結

timerfd 介紹 timerfd 是在Linux內核2.6.25版本中添加的接口&#xff0c;其是Linux為用戶提供的一個定時器接口。這個接口基于文件描述符&#xff0c;所以可以被用于select/poll/epoll的場景。當使用timerfd API創建多個定時器任務并置于poll中進行事件監聽&#xff0c;當沒有可…

#if/#else/#endif

在linux環境下寫c代碼時會嘗試各種方法或調整路徑&#xff0c;需要用到#if #include<stdio.h>int main(){int i; #if 0i 1; #elsei 2; #endifprintf("i %d",i);return 0; } 有時候會調整代碼&#xff0c;但是又不是最終版本的更換某些值&#xff0c;就需要注…

內存分配調用

通過函數給實參分配內存&#xff0c;可以通過二級指針實現 #include<stdio.h> #incldue<stdlib.h>void getheap(int *p) //錯誤的模型 {p malloc(100); }void getheap(int **p) //正確的模型 {*p malloc(100); } int main() {int *p NULL;getheap(&p);free(p…

ESP傳輸模式拆解包流程

一、 ESP簡介ESP&#xff0c;封裝安全載荷協議(Encapsulating SecurityPayloads)&#xff0c;是一種Ipsec協議&#xff0c;用于對IP協議在傳輸過程中進行數據完整性度量、來源認證、加密以及防回放攻擊。可以單獨使用&#xff0c;也可以和AH一起使用。在ESP頭部之前的IPV4…

結構體成員內存對齊

#include<stdio.h> struct A {int A; };int main() {struct A a;printf("%d\n",sizeof(a));return 0; } 運行結果&#xff1a;4 #include<stdio.h> struct A {int a;int b&#xff1b; };int main() {struct A a;printf("%d\n",sizeof(a))…

C庫函數-fgets()

函數聲明&#xff1a;char *fgets(char *str,int n,FILE *stream) 函數介紹&#xff1a;從指定的stream流中讀取一行&#xff0c;并把它存儲在str所指向的字符串中。當讀取到&#xff08;n-1&#xff09;個字符時&#xff0c;獲取讀取到換行符時&#xff0c;或者到達文件末尾時…

linux內核netfilter模塊分析之:HOOKs點的注冊及調用

1: 為什么要寫這個東西?最近在找工作,之前netfilter 這一塊的代碼也認真地研究過&#xff0c;應該每個人都是這樣的你懂 不一定你能很準確的表達出來。 故一定要化些時間把這相關的東西總結一下。 0&#xff1a;相關文檔linux 下 nf_conntrack_tuple 跟蹤記錄 其中可以根據內…

指定結構體元素的位字段

struct B {char a:4; //a這個成員值占了4bitchar b:2;char c:2; } 占了1個字節 struct B {int a:4; //a這個成員值占了4bitchar b:2;char c:2; } 占了8個字節 控制LED燈的結構體&#xff1a; struct E {char a1:1;char a2:1;char a3:1;char a4:1;char a5:1;char a6:1;char a7:1…

網絡抓包工具 wireshark 入門教程

Wireshark&#xff08;前稱Ethereal&#xff09;是一個網絡數據包分析軟件。網絡數據包分析軟件的功能是截取網絡數據包&#xff0c;并盡可能顯示出最為詳細的網絡數據包數據。Wireshark使用WinPCAP作為接口&#xff0c;直接與網卡進行數據報文交換。網絡管理員使用Wireshark來…

結構體中指針

結構體中帶有指針的情況 #include<stdio.h>struct man {char *name;int age; };int main() {struct man m {"tom",20};printf("name %s, age %d\n",m.name,m.age);return 0; } 運行結果&#xff1a; exbotubuntu:~/wangqinghe/C/20190714$ gcc st…

python使用opencv提取視頻中的每一幀、最后一幀,并存儲成圖片

提取視頻每一幀存儲圖片 最近在搞視頻檢測問題&#xff0c;在用到將視頻分幀保存為圖片時&#xff0c;圖片可以保存&#xff0c;但是會出現(-215:Assertion failed) !_img.empty() in function cv::imwrite問題而不能正常運行&#xff0c;在檢查代碼、檢查路徑等措施均無果后&…