1.線程池簡介
線程池,顧名思義,就是一個“池子”里面放有多個線程。為什么要使用線程池呢?當我們編寫的代碼需要并發異步處理很多任務時候,一般的處理辦法是一個任務開啟一個線程去處理,處理結束后釋放線程。可是這樣頻繁的申請釋放線程,系統的開銷很大,為了解決這個問題,線程池就產生了。線程池實現原理是事先申請一定數量的線程存放在程序中,當外部有任務需要線程處理時,把這個任務放到這個“池子”里面,“池子”里面空閑的線程就去取這個任務進行處理,這樣既能實現多線程并發處理任務,又能減少系統頻繁創建刪除線程的開銷,這種技術叫做池化技術,相應的池化技術還有內存池、連接池等。
為了更形象理解線程池,舉一個例子:線程池就像雞圈里面雞,每一個雞比作一個線程,你手里有一把玉米,每一個顆玉米比作一個任務,雞吃玉米比作處理任務。當你把一把玉米撒入雞圈,一群雞就圍過來搶玉米,但是一次一只雞只能吃一顆玉米,吃完一顆繼續吃下一顆,直到雞圈里面的玉米吃完才停止。線程池處理任務也是一樣的,當任務鏈表上有任務時,通過條件變量通知線程池里面的線程,一群線程就圍過來了,但是一個線程一次只能取一個任務進行處理,處理完又去取下一個任務,池子里面每一個線程都是這樣處理,直到鏈表上面的任務全部處理完了,池子中的線程又空閑起來了。
2.線程池-設計實現
實現思路:通過向系統申請多個線程,創建一個任務鏈表,鏈表上面存放待處理的任務,當有新的任務加入時候,通過條件變量通知池子里面的線程從鏈表上面取任務,然后處理任務,一直這樣循環下去。
?首先定義結構體,定義如下:
/*** 定義的回調函數
*/
typedef void (*task_func_t)(void *args);/*** 定義的任務節點結構體
*/
typedef struct task_t
{void *args; //任務參數task_func_t func; //任務函數指針struct list_head node; //鏈表節點
}task_t;/*** 線程池信息
*/
typedef struct threadpool_t
{struct list_head hlist; //任務鏈表int thread_num; //線程池數量int max_ts_num; //最大任務數量volatile int curr_ts_num; //當前線程池存在的任務數volatile int is_exit; //是否退出線程池標志pthread_mutex_t mutex; //互斥鎖pthread_cond_t cond; //條件變量pthread_t *ths; //線程id數組
}threadpool_t;
這是線程池實現的程序:
/*** @brief:線程處理任務函數* @args: 傳入的參數* @return: NULL
*/
static void* _process_task_thread(void *args)
{threadpool_t* tp = (threadpool_t*)args;struct list_head *pos = NULL;task_t *task=NULL;if(!args) return NULL;while(1){pthread_mutex_lock(&tp->mutex);while(list_empty(&tp->hlist) && !tp->is_exit){pthread_cond_wait(&tp->cond, &tp->mutex);}if(tp->is_exit){ //判斷釋放退出線程池pthread_mutex_unlock(&tp->mutex);break;}pos = tp->hlist.next; //從任務鏈表取出頭節點list_del(pos); //從鏈表中刪除節點--tp->curr_ts_num; //更新任務數pthread_mutex_unlock(&tp->mutex);task = list_entry(pos, task_t, node); //從鏈表節點推出任務節點task->func(task->args); //執行任務free(task); //釋放任務內存}return NULL;
}/*** @brief:創建一個線程池* @thread_nums: 線程數量* @max_ts_num:線程池中最大的任務數量* @return: 線程池句柄
*/
threadpool_t* create_threadpool(int thread_nums, int max_ts_num)
{if(thread_nums <= 0) return NULL;threadpool_t* tp = (threadpool_t*)malloc(sizeof(threadpool_t));memset(tp, 0, sizeof(threadpool_t));INIT_LIST_HEAD(&tp->hlist);tp->is_exit = 0;tp->curr_ts_num = 0;tp->thread_num = thread_nums;tp->max_ts_num = max_ts_num;tp->ths = (pthread_t*)malloc(sizeof(pthread_t)*thread_nums);pthread_mutex_init(&tp->mutex, NULL);pthread_cond_init(&tp->cond, NULL);for(int i=0; i<tp->thread_num; ++i){pthread_create(&(tp->ths[i]), NULL, _process_task_thread, tp);}return tp;
} /*** @brief:往線程池中添加任務* @tp: 線程池句柄* @func:任務處理函數指針* @args:傳入任務參數* @priority: 優先級 1:優先處理 其他:添加到尾部* @return: 返回狀態 0:ok
*/
int add_task_threadpool(threadpool_t* tp, task_func_t func, void *args, int priority)
{if(!tp) return -1;if(!func) return -2;if(tp->curr_ts_num > tp->max_ts_num) return -3;task_t *task = (task_t*)malloc(sizeof(task_t)); //申請任務節點內存task->func = func; //給函數指針賦值task->args = args; //保持參數指針pthread_mutex_lock(&tp->mutex);if(priority==1) //高優先級,添加到頭部list_add(&task->node, &tp->hlist);else //添加到尾部list_add_tail(&task->node, &tp->hlist);++tp->curr_ts_num; //更新任務數pthread_mutex_unlock(&tp->mutex);pthread_cond_signal(&tp->cond); //通知線程取任務return 0;
}/*** @brief:獲取線程池中當前存在的任務數量* @tp: 線程池句柄* @return: 當前任務數量
*/
int get_ts_num_threadpool(threadpool_t* tp)
{return tp ? tp->curr_ts_num : -1;
}/*** @brief:釋放線程池資源* @tp:線程池句柄* @return: 0:ok
*/
int destory_threadpool(threadpool_t* tp)
{if(!tp) return -1;while(!list_empty(&tp->hlist)){ //等待線程池執行完鏈表中的任務continue; }tp->is_exit = 1; //更新標志,退出線程池pthread_cond_broadcast(&tp->cond);//通知所有線程函數for(int i=0; i<tp->thread_num; ++i){//等待所有線程函數結束pthread_join(tp->ths[i], NULL);}pthread_mutex_destroy(&tp->mutex); //釋放資源pthread_cond_destroy(&tp->cond);free(tp->ths);free(tp);tp = NULL;return 0;
}
相關視頻推薦
手把手實現線程池(120行),實現異步操作,解決項目性能問題?
手撕高性能線程池,準備好linux開發環境
線程池在3個開源框架的應用(redis、skynet、workflow)
免費學習地址:c/c++ linux服務器開發/后臺架構師
需要C/C++ Linux服務器架構師學習資料加qun812855908獲取(資料包括C/C++,Linux,golang技術,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg等),免費分享
?
3.線程池組-設計實現
有了線程池處理并發任務,為什么還要線程池組呢?原因在于線程池中,所有的線程都使用一個互斥鎖阻塞,當你創建的線程池中線程的個數比較多的情況下,存在很多線程對同一個線程搶占,這樣會影響線程池取任務處理的效率。因此由"小顆粒"(即每一個線程池中線程個數少)的線程池組成一個"線程池組"這樣就能減輕多個線程對同一個鎖搶占造成效率低的問題。
設計實現:將多個線程池封裝組合到一起,當外部有任務需要處理時,找到線程池組中線程池任務最少的池子,把任務給放進去。
?定義一個線程池組管理結構體:
typedef struct manange_thpool_t
{int thpool_nums; //線程池個數threadpool_t *thpools[MAX_THREADPOOL_NUMS]; //線程池結構體
}manange_thpool_t;
代碼實現:
/*** @brief:創建線程池組管理句柄* @tp_nums:線程池組中線程池個數* @thread_num:單個線程池中線程個數* @max_ts_n:單個線程池中最大的任務數量
*/
manange_thpool_t* create_group_threadpool(int tp_nums, int thread_num, int max_ts_n)
{manange_thpool_t* mtp = (manange_thpool_t*)malloc(sizeof(manange_thpool_t));if(!mtp) return NULL;memset(mtp, 0, sizeof(manange_thpool_t));mtp->thpool_nums = tp_nums;for(int i=0; i<tp_nums; ++i){mtp->thpools[i] = create_threadpool(thread_num, max_ts_n);}return mtp;
}/*** @brief:往線程池組中添加任務* @mtp:線程池組句柄* @func:任務函數* @args:任務函數的參數* @priority: 優先級 1:優先處理 其他:依次處理* @return: 0:ok 其他:err
*/
int add_task_group_threadpool(manange_thpool_t* mtp, task_func_t func, void *args, int priority)\
{int ts_num= INT_MAX;threadpool_t *tp=NULL;int index=0;for(register int i=0; i<mtp->thpool_nums; ++i){if(mtp->thpools[i]->curr_ts_num < ts_num){ts_num = mtp->thpools[i]->curr_ts_num;tp = mtp->thpools[i];index=i;}}if(!tp){tp = mtp->thpools[0];}return add_task_threadpool(tp, func, args, priority);
}/*** @brief:釋放線程池組函數* @tp: 線程池組句柄* @return:none
*/
void destory_group_threadpool(manange_thpool_t* tp)
{if(!tp) return;for(int i=0; i<tp->thpool_nums; ++i){if(tp->thpools[i]) destory_threadpool(tp->thpools[i]);}
}
4.測試
測試程序如下:
#include <stdio.h>
#include <unistd.h>
#include "list.h"
#include "threadpool.h"
#include "manange_threadpool.h"//任務傳遞的參數
typedef struct info_t
{int times;char buffer[32];
}info_t;void task1(void *args)
{info_t *info = (info_t*)args;printf("handle task1 pid=%lld times=%d buffer=%s\n", pthread_self(), info->times, info->buffer);free(args);
}void task2(void *args)
{info_t *info = (info_t*)args;printf("handle task2 pid=%lld times=%d buffer=%s\n", pthread_self(), info->times, info->buffer);free(args);
}void task3(void *args)
{info_t *info = (info_t*)args;printf("handle task3 pid=%lld times=%d buffer=%s\n", pthread_self(), info->times, info->buffer);free(args);
}//------------split-----------------void test_threadpool(void)
{threadpool_t* tp = create_threadpool(4, 128);info_t *info;for(int t=0; t<10; ++t){for(int i=0; i<32; ++i){info = (info_t *)malloc(sizeof(info_t));info->times=i;sprintf(info->buffer, "Test ThreadPool task1 info...");add_task_threadpool(tp, task1, info, 1); //往線程池組添加任務info = (info_t *)malloc(sizeof(info_t));info->times=i;sprintf(info->buffer, "Test ThreadPool task2 info...");add_task_threadpool(tp, task2, info, 0);info = (info_t *)malloc(sizeof(info_t));info->times=i;sprintf(info->buffer, "Test ThreadPool task3 info...");add_task_threadpool(tp, task3, info, 0);}sleep(1);}destory_threadpool(tp);printf("Test ThreadPool Finish...\n");
}void test_manange_threadpool(void)
{//創建線程池組句柄,有4個線程池,每個線程池使用4線程,每個線程池最大的任務數是32manange_thpool_t* mtp = create_group_threadpool(4, 4, 128);info_t *info;for(int t=0; t<10; ++t){for(int i=0; i<32; ++i){info = (info_t *)malloc(sizeof(info_t));info->times=i;sprintf(info->buffer, "Test task1 info...");add_task_group_threadpool(mtp, task1, info, 1); //往線程池組添加任務info = (info_t *)malloc(sizeof(info_t));info->times=i;sprintf(info->buffer, "Test task2 info...");add_task_group_threadpool(mtp, task2, info, 0);info = (info_t *)malloc(sizeof(info_t));info->times=i;sprintf(info->buffer, "Test task3 info...");add_task_group_threadpool(mtp, task3, info, 0);}sleep(1);}//釋放線程池組資源destory_group_threadpool(mtp);printf("Test Manage ThreadPool Finish...\n");
}int main(void)
{#if 1 //測試單個的線程池功能test_threadpool();
#else //測試線程池組功能test_manange_threadpool();
#endifreturn 0;
}
通過修改宏定義,決定使用線程池還是線程池組
-
測試線程池結果
?2.測試線程池組結果
?
5.總結
使用線程池情況:一般程序中有并發處理任務,但是處理的任務并發量不高時候采用線程池。
使用線程池組情況:程序中任務并發量很大情況下使用。