1. 作用
線程池內部維護了多個工作線程,每個工作線程都會去任務隊列中拿取任務并執行,當執行完一個任務后不是馬上銷毀,而是繼續保留執行其它任務。顯然,線程池提高了多線程的復用率,減少了創建和銷毀線程的時間。
2. 實現原理
線程池內部由任務隊列、工作線程和管理者線程組成。
任務隊列:存儲需要處理的任務。每個任務其實就是具體的函數,在任務隊列中存儲函數指針和對應的實參。當工作線程獲取任務后,就能根據函數指針來調用指定的函數。其實現可以是數組、鏈表、STL容器等。
工作線程:有N個工作線程,每個工作線程會去任務隊列中拿取任務,然后執行具體的任務。當任務被處理后,任務隊列中就不再有該任務了。當任務隊列中沒有任務時,工作線程就會阻塞。
管理者線程:周期性檢測忙碌的工作線程數量和任務數量。當任務較多線程不夠用時,管理者線程就會多創建幾個工作線程來加快處理(不會超過工作線程數量的上限)。當任務較少線程空閑多時,管理者線程就會銷毀幾個工作線程來減少內存占用(不會低于工作線程數量的下限)。
注意:線程池中沒有維護“生產者線程”,所謂的“生產者線程”就是往任務隊列中添加任務的線程。
3. 手撕線程池
參考來源:愛編程的大丙。
【1】threadpool.c:
#include "threadpool.h"
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdio.h>#define NUMBER 2 //管理者線程增加或減少的工作線程數量//任務結構體
typedef struct Task {void (*func)(void* arg);void* arg;
} Task;//線程池結構體
struct ThreadPool {//任務隊列,視為環形隊列Task* taskQ;int queueCapacity; //隊列容量int queueSize; //當前任務個數int queueFront; //隊頭 -> 取任務int queueRear; //隊尾 -> 加任務//線程相關pthread_t managerID; //管理者線程IDpthread_t* threadIDs; //工作線程IDint minNum; //工作線程最小數量int maxNum; //工作線程最大數量int busyNum; //工作線程忙的數量int liveNum; //工作線程存活數量int exitNum; //要銷毀的工作線程數量pthread_mutex_t mutexPool; //鎖整個線程池pthread_mutex_t mutexBusy; //鎖busyNumpthread_cond_t notFull; //任務隊列是否滿pthread_cond_t notEmpty; //任務隊列是否空//線程池是否銷毀int shutdown; //釋放為1,否則為0
};/**************************************************************** 函 數: threadPoolCreate* 功 能: 創建線程池并初始化* 參 數: min---工作線程的最小數量* max---工作線程的最大數量* capacity---任務隊列的最大容量* 返回值: 創建的線程池的地址**************************************************************/
ThreadPool* threadPoolCreate(int min, int max, int capacity)
{//申請線程池空間ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));do {//此處循環只是為了便于失敗釋放空間,只會執行一次if (pool == NULL) {printf("pool create error!\n");break;}//申請任務隊列空間,并初始化pool->taskQ = (Task*)malloc(sizeof(Task) * capacity);if (pool->taskQ == NULL) {printf("Task create error!\n");break;}pool->queueCapacity = capacity;pool->queueSize = 0;pool->queueFront = 0;pool->queueRear = 0;//初始化互斥鎖和條件變量if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||pthread_cond_init(&pool->notFull, NULL) != 0 ||pthread_cond_init(&pool->notEmpty, NULL) != 0){printf("mutex or cond create error!\n");break;}//初始化shutdownpool->shutdown = 0;//初始化線程相關參數pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);if (pool->threadIDs == NULL) {printf("threadIDs create error!\n");break;}memset(pool->threadIDs, 0, sizeof(pthread_t) * max);pool->minNum = min;pool->maxNum = max;pool->busyNum = 0;pool->liveNum = min;pool->exitNum = 0;//創建管理者線程和工作線程pthread_create(&pool->managerID, NULL, manager, pool);//創建管理線程for (int i = 0; i < min; ++i) {pthread_create(&pool->threadIDs[i], NULL, worker, pool);//創建工作線程}return pool;} while (0);//申請資源失敗,釋放已分配的資源if (pool && pool->taskQ) free(pool->taskQ);if (pool && pool->threadIDs) free(pool->threadIDs);if (pool) free(pool);return NULL;
}/**************************************************************** 函 數: threadPoolDestroy* 功 能: 銷毀線程池* 參 數: pool---要銷毀的線程池* 返回值: 0表示銷毀成功,-1表示銷毀失敗**************************************************************/
int threadPoolDestroy(ThreadPool* pool)
{if (!pool) return -1;//關閉線程池pool->shutdown = 1;//阻塞回收管理者線程pthread_join(pool->managerID, NULL);//喚醒所有工作線程,讓其自殺for (int i = 0; i < pool->liveNum; ++i) {pthread_cond_signal(&pool->notEmpty);}//釋放所有互斥鎖和條件變量pthread_mutex_destroy(&pool->mutexBusy);pthread_mutex_destroy(&pool->mutexPool);pthread_cond_destroy(&pool->notEmpty);pthread_cond_destroy(&pool->notFull);//釋放堆空間if (pool->taskQ) {free(pool->taskQ);pool->taskQ = NULL;}if (pool->threadIDs) {free(pool->threadIDs);pool->threadIDs = NULL;}free(pool);pool = NULL;return 0;
}/**************************************************************** 函 數: threadPoolAdd* 功 能: 生產者往線程池的任務隊列中添加任務* 參 數: pool---線程池* func---函數指針,要執行的任務地址* arg---func指向的函數的實參* 返回值: 無**************************************************************/
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{pthread_mutex_lock(&pool->mutexPool);//任務隊列滿,阻塞生產者while (pool->queueSize == pool->queueCapacity && !pool->shutdown) {pthread_cond_wait(&pool->notFull, &pool->mutexPool);}//判斷線程池是否關閉if (pool->shutdown) {pthread_mutex_unlock(&pool->mutexPool);return;}//添加任務進pool->taskQpool->taskQ[pool->queueRear].func = func;pool->taskQ[pool->queueRear].arg = arg;pool->queueSize++;pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;pthread_cond_signal(&pool->notEmpty);//喚醒工作線程pthread_mutex_unlock(&pool->mutexPool);
}/**************************************************************** 函 數: getThreadPoolBusyNum* 功 能: 獲取線程池忙的工作線程數量* 參 數: pool---線程池* 返回值: 忙的工作線程數量**************************************************************/
int getThreadPoolBusyNum(ThreadPool* pool)
{pthread_mutex_lock(&pool->mutexBusy);int busyNum = pool->busyNum;pthread_mutex_unlock(&pool->mutexBusy);return busyNum;
}/**************************************************************** 函 數: getThreadPoolAliveNum* 功 能: 獲取線程池存活的工作線程數量* 參 數: pool---線程池* 返回值: 存活的工作線程數量**************************************************************/
int getThreadPoolAliveNum(ThreadPool* pool)
{pthread_mutex_lock(&pool->mutexPool);int liveNum = pool->liveNum;pthread_mutex_unlock(&pool->mutexPool);return liveNum;
}/**************************************************************** 函 數: worker* 功 能: 工作線程的執行函數* 參 數: arg---實參傳入,這里傳入的是線程池* 返回值: 空指針**************************************************************/
void* worker(void* arg)
{ThreadPool* pool = (ThreadPool*)arg;while (1) {/* 1.取出任務隊列中的隊頭任務 */pthread_mutex_lock(&pool->mutexPool);//無任務就阻塞線程while (pool->queueSize == 0 && !pool->shutdown) {pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);//喚醒后,判斷是不是要銷毀線程if (pool->exitNum > 0) {//線程自殺pool->exitNum--;//銷毀指標-1if (pool->liveNum > pool->minNum) {pool->liveNum--;//活著的工作線程-1pthread_mutex_unlock(&pool->mutexPool);threadExit(pool);}}}//線程池關閉了就退出線程if (pool->shutdown) {pthread_mutex_unlock(&pool->mutexPool);threadExit(pool);}//取出pool中taskQ的任務Task task;task.func = pool->taskQ[pool->queueFront].func;task.arg = pool->taskQ[pool->queueFront].arg;pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;//移動隊頭pool->queueSize--;//通知生產者添加任務pthread_cond_signal(&pool->notFull);pthread_mutex_unlock(&pool->mutexPool);/* 2.設置pool的busyNum+1 */pthread_mutex_lock(&pool->mutexBusy);pool->busyNum++;pthread_mutex_unlock(&pool->mutexBusy);/* 3.執行取出的任務 */printf("thread %ld start working ...\n", pthread_self());task.func(task.arg);free(task.arg);task.arg = NULL;printf("thread %ld end working ...\n", pthread_self());/* 4.設置pool的busyNum-1 */pthread_mutex_lock(&pool->mutexBusy);pool->busyNum--;pthread_mutex_unlock(&pool->mutexBusy);}return NULL;
}/**************************************************************** 函 數: manager* 功 能: 管理者線程的執行函數* 參 數: arg---實參傳入,這里傳入的是線程池* 返回值: 空指針**************************************************************/
void* manager(void* arg)
{ThreadPool* pool = (ThreadPool*)arg;while (!pool->shutdown) {/* 每隔3秒檢測一次 */sleep(3);/* 獲取pool中相關變量 */pthread_mutex_lock(&pool->mutexPool);int taskNum = pool->queueSize; //任務隊列中的任務數量int liveNum = pool->liveNum; //存活的工作線程數量int busyNum = pool->busyNum; //忙碌的工作線程數量pthread_mutex_unlock(&pool->mutexPool);/* 功能一:增加工作線程,每次增加NUMBER個 *///當任務個數大于存活工作線程數,且存活工作線程數小于最大值if (taskNum > liveNum && liveNum < pool->maxNum) {pthread_mutex_lock(&pool->mutexPool);int counter = 0;for (int i = 0; i < pool->maxNum && counter < NUMBER&& pool->liveNum < pool->maxNum; ++i){if (pool->threadIDs[i] == 0) {pthread_create(&pool->threadIDs[i], NULL, worker, pool);counter++;pool->liveNum++;}}pthread_mutex_unlock(&pool->mutexPool);}/* 功能二:銷毀工作線程,每次銷毀NUMBER個 *///當忙的線程數*2 < 存活線程數,且存活線程數 > 最小線程數if (busyNum * 2 < liveNum && liveNum > pool->minNum) {pthread_mutex_lock(&pool->mutexPool);pool->exitNum = NUMBER;//喚醒NUMBER個工作線程,讓其解除阻塞,在worker函數中自殺for (int i = 0; i < NUMBER; ++i) {pthread_cond_signal(&pool->notEmpty);}pthread_mutex_unlock(&pool->mutexPool);}}return NULL;
}/**************************************************************** 函 數: threadExit* 功 能: 工作線程退出函數,將工作線程的ID置為0,然后退出* 參 數: pool---線程池* 返回值: 無**************************************************************/
void threadExit(ThreadPool* pool)
{//將pool->threadIDs中的ID改為0pthread_t tid = pthread_self();for (int i = 0; i < pool->maxNum; i++) {if (pool->threadIDs[i] == tid) {pool->threadIDs[i] = 0;printf("threadExit() called, %ld exiting...\n", tid);break;}}pthread_exit(NULL);//退出
}
【2】threadpool.h:
#ifndef _THREADPOOL_H
#define _THREADPOOL_Htypedef struct ThreadPool ThreadPool;//創建線程池并初始化
ThreadPool* threadPoolCreate(int min, int max, int capacity);//銷毀線程池
int threadPoolDestroy(ThreadPool* pool);//給線程池添加任務
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);//獲取當前忙碌的工作線程的數量
int getThreadPoolBusyNum(ThreadPool* pool);//獲取當前存活的工作線程的數量
int getThreadPoolAliveNum(ThreadPool* pool);/*********************其它函數**********************/
void* worker(void* arg);//工作線程的執行函數
void* manager(void* arg);//管理者線程的執行函數
void threadExit(ThreadPool* pool);//線程退出函數#endif
【3】main.c:
#include <stdio.h>
#include "threadpool.h"
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>//任務函數,所有線程都執行此任務
void testFunc(void* arg)
{int* num = (int*)arg;printf("thread %ld is working, number = %d\n", pthread_self(), *num);sleep(1);
}int main()
{//創建線程池: 最少3個工作線程,最多10個,任務隊列容量為100ThreadPool* pool = threadPoolCreate(3, 10, 100);//加入100個任務于任務隊列for (int i = 0; i < 100; ++i) {int* num = (int*)malloc(sizeof(int));*num = i + 100;threadPoolAdd(pool, testFunc, num);}//銷毀線程池sleep(30);//保證任務全部運行完畢threadPoolDestroy(pool);return 0;
}
【4】運行結果:
......