線程池,簡單來說就是有一堆已經創建好的線程(最大數目一定),初始時他們都處于空閑狀態,當有新的任務進來,從線程池中取出一個空閑的線程處理任務,然后當任務處理完成之后,該線程被重新放回到線程池中,供其他的任務使用,當線程池中的線程都在處理任務時,就沒有空閑線程供使用,此時,若有新的任務產生,只能等待線程池中有線程結束任務空閑才能執行,下面是線程池的工作原理圖:
我們為什么要使用線程池呢?
簡單來說就是線程本身存在開銷,我們利用多線程來進行任務處理,單線程也不能濫用,無止禁的開新線程會給系統產生大量消耗,而線程本來就是可重用的資源,不需要每次使用時都進行初始化,因此可以采用有限的線程個數處理無限的任務。
?
廢話少說,直接上代碼
首先是用條件變量和互斥量封裝的一個狀態,用于保護線程池的狀態
condition.h
#ifndef _CONDITION_H_ #define _CONDITION_H_#include <pthread.h>//封裝一個互斥量和條件變量作為狀態 typedef struct condition {pthread_mutex_t pmutex;pthread_cond_t pcond; }condition_t;//對狀態的操作函數 int condition_init(condition_t *cond); int condition_lock(condition_t *cond); int condition_unlock(condition_t *cond); int condition_wait(condition_t *cond); int condition_timedwait(condition_t *cond, const struct timespec *abstime); int condition_signal(condition_t* cond); int condition_broadcast(condition_t *cond); int condition_destroy(condition_t *cond);#endif
condition.c
#include "condition.h"//初始化 int condition_init(condition_t *cond) {int status;if((status = pthread_mutex_init(&cond->pmutex, NULL)))return status;if((status = pthread_cond_init(&cond->pcond, NULL)))return status;return 0; }//加鎖 int condition_lock(condition_t *cond) {return pthread_mutex_lock(&cond->pmutex); }//解鎖 int condition_unlock(condition_t *cond) {return pthread_mutex_unlock(&cond->pmutex); }//等待 int condition_wait(condition_t *cond) {return pthread_cond_wait(&cond->pcond, &cond->pmutex); }//固定時間等待 int condition_timedwait(condition_t *cond, const struct timespec *abstime) {return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime); }//喚醒一個睡眠線程 int condition_signal(condition_t* cond) {return pthread_cond_signal(&cond->pcond); }//喚醒所有睡眠線程 int condition_broadcast(condition_t *cond) {return pthread_cond_broadcast(&cond->pcond); }//釋放 int condition_destroy(condition_t *cond) {int status;if((status = pthread_mutex_destroy(&cond->pmutex)))return status;if((status = pthread_cond_destroy(&cond->pcond)))return status;return 0; }
然后是線程池對應的threadpool.h和threadpool.c
#ifndef _THREAD_POOL_H_ #define _THREAD_POOL_H_//線程池頭文件 #include "condition.h"//封裝線程池中的對象需要執行的任務對象 typedef struct task {void *(*run)(void *args); //函數指針,需要執行的任務void *arg; //參數struct task *next; //任務隊列中下一個任務 }task_t;//下面是線程池結構體 typedef struct threadpool {condition_t ready; //狀態量task_t *first; //任務隊列中第一個任務task_t *last; //任務隊列中最后一個任務int counter; //線程池中已有線程數int idle; //線程池中kongxi線程數int max_threads; //線程池最大線程數int quit; //是否退出標志 }threadpool_t;//線程池初始化 void threadpool_init(threadpool_t *pool, int threads);//往線程池中加入任務 void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg);//摧毀線程池 void threadpool_destroy(threadpool_t *pool);#endif
#include "threadpool.h" #include <stdlib.h> #include <stdio.h> #include <string.h> #include <errno.h> #include <time.h>//創建的線程執行 void *thread_routine(void *arg) {struct timespec abstime;int timeout;printf("thread %d is starting\n", (int)pthread_self());threadpool_t *pool = (threadpool_t *)arg;while(1){timeout = 0;//訪問線程池之前需要加鎖condition_lock(&pool->ready);//空閑pool->idle++;//等待隊列有任務到來 或者 收到線程池銷毀通知while(pool->first == NULL && !pool->quit){//否則線程阻塞等待printf("thread %d is waiting\n", (int)pthread_self());//獲取從當前時間,并加上等待時間, 設置進程的超時睡眠時間clock_gettime(CLOCK_REALTIME, &abstime); abstime.tv_sec += 2;int status;status = condition_timedwait(&pool->ready, &abstime); //該函數會解鎖,允許其他線程訪問,當被喚醒時,加鎖if(status == ETIMEDOUT){printf("thread %d wait timed out\n", (int)pthread_self());timeout = 1;break;}}pool->idle--;if(pool->first != NULL){//取出等待隊列最前的任務,移除任務,并執行任務task_t *t = pool->first;pool->first = t->next;//由于任務執行需要消耗時間,先解鎖讓其他線程訪問線程池condition_unlock(&pool->ready);//執行任務t->run(t->arg);//執行完任務釋放內存free(t);//重新加鎖condition_lock(&pool->ready);}//退出線程池if(pool->quit && pool->first == NULL){pool->counter--;//當前工作的線程數-1//若線程池中沒有線程,通知等待線程(主線程)全部任務已經完成if(pool->counter == 0){condition_signal(&pool->ready);}condition_unlock(&pool->ready);break;}//超時,跳出銷毀線程if(timeout == 1){pool->counter--;//當前工作的線程數-1condition_unlock(&pool->ready);break;}condition_unlock(&pool->ready);}printf("thread %d is exiting\n", (int)pthread_self());return NULL;}//線程池初始化 void threadpool_init(threadpool_t *pool, int threads) {condition_init(&pool->ready);pool->first = NULL;pool->last =NULL;pool->counter =0;pool->idle =0;pool->max_threads = threads;pool->quit =0;}//增加一個任務到線程池 void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg) {//產生一個新的任務task_t *newtask = (task_t *)malloc(sizeof(task_t));newtask->run = run;newtask->arg = arg;newtask->next=NULL;//新加的任務放在隊列尾端//線程池的狀態被多個線程共享,操作前需要加鎖condition_lock(&pool->ready);if(pool->first == NULL)//第一個任務加入 {pool->first = newtask;} else {pool->last->next = newtask;}pool->last = newtask; //隊列尾指向新加入的線程//線程池中有線程空閑,喚醒if(pool->idle > 0){condition_signal(&pool->ready);}//當前線程池中線程個數沒有達到設定的最大值,創建一個新的線性else if(pool->counter < pool->max_threads){pthread_t tid;pthread_create(&tid, NULL, thread_routine, pool);pool->counter++;}//結束,訪問condition_unlock(&pool->ready); }//線程池銷毀 void threadpool_destroy(threadpool_t *pool) {//如果已經調用銷毀,直接返回if(pool->quit){return;}//加鎖condition_lock(&pool->ready);//設置銷毀標記為1pool->quit = 1;//線程池中線程個數大于0if(pool->counter > 0){//對于等待的線程,發送信號喚醒if(pool->idle > 0){condition_broadcast(&pool->ready);}//正在執行任務的線程,等待他們結束任務while(pool->counter){condition_wait(&pool->ready);}}condition_unlock(&pool->ready);condition_destroy(&pool->ready); }
測試代碼:
#include "threadpool.h" #include <unistd.h> #include <stdlib.h> #include <stdio.h>void* mytask(void *arg) {printf("thread %d is working on task %d\n", (int)pthread_self(), *(int*)arg);sleep(1);free(arg);return NULL; }//測試代碼 int main(void) {threadpool_t pool;//初始化線程池,最多三個線程threadpool_init(&pool, 3);int i;//創建十個任務for(i=0; i < 10; i++){int *arg = malloc(sizeof(int));*arg = i;threadpool_add_task(&pool, mytask, arg);}threadpool_destroy(&pool);return 0; }
輸出結果:
可以看出程序先后創建了三個線程進行工作,當沒有任務空閑時,等待2s直接退出銷毀線程