?
線程池框圖
C語言線程池詳解:從基礎到實現
通俗理解線程池
想象你開了一家快遞站,每天要處理很多包裹派送:
- ?沒有線程池?:每來一個包裹就雇一個新快遞員,送完就解雇
- 問題:頻繁招聘解雇成本高(線程創建銷毀開銷大)
- ?有線程池?:固定雇傭5個快遞員,包裹來了就分配給空閑的快遞員
- 優點:快遞員復用,效率高,管理方便
線程池就是這樣的"快遞員管理系統",預先創建一組線程,有任務時分配給空閑線程執行。
完整C語言線程池實現(帶詳細注釋)
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>#define THREAD_NUM 5 // 線程池中線程數量// 任務結構體(相當于快遞站的"包裹")
typedef struct {void (*function)(void *); // 任務函數指針void *arg; // 函數參數
} Task;// 線程池結構體(相當于"快遞站")
typedef struct {Task *task_queue; // 任務隊列(存放待處理的任務)int queue_capacity; // 隊列容量int queue_size; // 當前隊列中任務數量int queue_front; // 隊首索引int queue_rear; // 隊尾索引pthread_t *threads; // 工作線程數組(相當于"快遞員")pthread_mutex_t mutex; // 互斥鎖,保護任務隊列pthread_cond_t cond; // 條件變量,線程等待信號int shutdown; // 線程池關閉標志
} ThreadPool;// 創建線程池
ThreadPool* thread_pool_create(int capacity) {ThreadPool *pool = (ThreadPool *)malloc(sizeof(ThreadPool));// 初始化任務隊列pool->queue_capacity = capacity;pool->task_queue = (Task *)malloc(sizeof(Task) * capacity);pool->queue_size = 0;pool->queue_front = 0;pool->queue_rear = 0;// 初始化線程數組pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * THREAD_NUM);// 初始化互斥鎖和條件變量pthread_mutex_init(&pool->mutex, NULL);pthread_cond_init(&pool->cond, NULL);// 設置線程池運行狀態pool->shutdown = 0;// 創建工作線程for (int i = 0; i < THREAD_NUM; i++) {pthread_create(&pool->threads[i], NULL, worker, (void *)pool);}return pool;
}// 工作線程函數(相當于"快遞員的工作流程")
void* worker(void* arg) {ThreadPool *pool = (ThreadPool *)arg;while (1) {pthread_mutex_lock(&pool->mutex); // 加鎖保護任務隊列// 當任務隊列為空且線程池未關閉時,線程等待while (pool->queue_size == 0 && !pool->shutdown) {pthread_cond_wait(&pool->cond, &pool->mutex); // 等待條件變量}// 如果線程池已關閉且任務已處理完,線程退出if (pool->shutdown && pool->queue_size == 0) {pthread_mutex_unlock(&pool->mutex);pthread_exit(NULL);}// 從任務隊列中取出一個任務Task task;task.function = pool->task_queue[pool->queue_front].function;task.arg = pool->task_queue[pool->queue_front].arg;// 更新隊列狀態pool->queue_front = (pool->queue_front + 1) % pool->queue_capacity;pool->queue_size--;pthread_mutex_unlock(&pool->mutex); // 解鎖// 執行任務(快遞員開始派送包裹)printf("Thread %lu start working...\n", pthread_self());task.function(task.arg); // 調用任務函數printf("Thread %lu finish work!\n", pthread_self());}return NULL;
}// 添加任務到線程池
void thread_pool_add(ThreadPool *pool, void (*func)(void *), void *arg) {pthread_mutex_lock(&pool->mutex); // 加鎖保護任務隊列// 如果隊列已滿,等待(這里簡單處理,實際可以擴容或返回錯誤)while (pool->queue_size == pool->queue_capacity && !pool->shutdown) {pthread_cond_wait(&pool->cond, &pool->mutex);}if (pool->shutdown) {pthread_mutex_unlock(&pool->mutex);return;}// 添加任務到隊尾pool->task_queue[pool->queue_rear].function = func;pool->task_queue[pool->queue_rear].arg = arg;pool->queue_rear = (pool->queue_rear + 1) % pool->queue_capacity;pool->queue_size++;pthread_cond_signal(&pool->cond); // 喚醒一個等待的線程pthread_mutex_unlock(&pool->mutex); // 解鎖
}// 銷毀線程池
void thread_pool_destroy(ThreadPool *pool) {if (pool == NULL) return;// 設置關閉標志pool->shutdown = 1;// 喚醒所有等待的線程pthread_cond_broadcast(&pool->cond);// 等待所有線程退出for (int i = 0; i < THREAD_NUM; i++) {pthread_join(pool->threads[i], NULL);}// 釋放資源free(pool->threads);free(pool->task_queue);pthread_mutex_destroy(&pool->mutex);pthread_cond_destroy(&pool->cond);free(pool);
}// 示例任務函數1
void task_function1(void *arg) {int num = *(int *)arg;printf("Task1 processing number: %d\n", num);sleep(1); // 模擬耗時操作
}// 示例任務函數2
void task_function2(void *arg) {char *str = (char *)arg;printf("Task2 processing string: %s\n", str);sleep(2); // 模擬耗時操作
}int main() {// 創建線程池,任務隊列容量為10ThreadPool *pool = thread_pool_create(10);// 添加任務到線程池for (int i = 0; i < 10; i++) {int *num = (int *)malloc(sizeof(int));*num = i;thread_pool_add(pool, task_function1, (void *)num);}// 添加不同類型的任務char *str = "Hello ThreadPool";thread_pool_add(pool, task_function2, (void *)str);// 等待所有任務完成sleep(5);// 銷毀線程池thread_pool_destroy(pool);return 0;
}
關鍵組件詳細解釋
1. 任務隊列(Task Queue)
typedef struct {void (*function)(void *); // 函數指針void *arg; // 函數參數
} Task;
- ?作用?:存儲待執行的任務
- ?原理?:使用環形隊列實現,避免頻繁內存分配
- ?操作?:
- 隊尾添加任務
- 隊首取出任務
2. 線程池管理(ThreadPool)
typedef struct {Task *task_queue; // 任務隊列int queue_capacity; // 隊列容量int queue_size; // 當前任務數// ...其他成員
} ThreadPool;
- ?作用?:管理線程和任務隊列
- ?關鍵成員?:
threads
:工作線程數組mutex
:保護任務隊列的互斥鎖cond
:線程間通信的條件變量
3. 工作線程(Worker Thread)
void* worker(void* arg) {while (1) {// 1. 加鎖并檢查任務隊列// 2. 無任務時等待條件變量// 3. 取出任務并執行// 4. 解鎖}
}
- ?工作流程?:
- 檢查任務隊列
- 無任務則等待
- 有任務則取出執行
- 循環處理
4. 任務添加(thread_pool_add)
void thread_pool_add(ThreadPool *pool, void (*func)(void *), void *arg) {// 1. 加鎖// 2. 檢查隊列狀態// 3. 添加任務到隊列// 4. 喚醒一個等待線程// 5. 解鎖
}
- ?作用?:向線程池提交新任務
- ?關鍵點?:
- 隊列滿時等待
- 添加后喚醒工作線程
5. 線程池銷毀(thread_pool_destroy)
void thread_pool_destroy(ThreadPool *pool) {// 1. 設置關閉標志// 2. 喚醒所有線程// 3. 等待線程退出// 4. 釋放資源
}
- ?作用?:安全關閉線程池
- ?關鍵點?:
- 先通知所有線程
- 等待線程自然退出
線程池工作流程圖
主線程:[創建線程池] → [添加任務] → [銷毀線程池]| |↓ ↓
任務隊列: [任務1][任務2][任務3]...|↓
工作線程:[線程1取任務] → [執行] → [取下一個任務][線程2取任務] → [執行] → [取下一個任務]...
為什么需要這些機制?
?互斥鎖(mutex)??:
- 防止多個線程同時訪問任務隊列導致數據混亂
?條件變量(cond)??:
- 當任務隊列為空時,讓工作線程休眠等待
- 當有新任務時喚醒線程,避免忙等待
?環形隊列?:
- 高效利用內存,避免頻繁內存分配
- 先進先出(FIFO)的任務處理順序
?線程復用?:
- 避免頻繁創建銷毀線程的開銷
- 控制并發數量,防止系統過載
實際應用場景
- ?網絡服務器?:處理大量客戶端請求
- ?文件處理?:批量處理大量文件
- ?數據計算?:并行計算任務
- ?GUI應用?:后臺耗時任務處理
擴展改進建議
- ?動態調整線程數?:根據負載自動增減線程
- ?任務優先級?:支持高優先級任務插隊
- ?任務取消?:支持取消已提交但未執行的任務
- ?超時機制?:設置任務執行超時時間
- ?任務結果獲取?:提供獲取任務執行結果的機制