線程池的原理與實現
- 線程池簡介
- 1.線程池
- 1.線程池
- 2.數量固定的原因
- 3.線程數量如何確定
- 4.為什么需要線程池
- 5.線程池結構
- 線程池的實現
- 數據結構設計
- 1.任務結構
- 2.任務隊列結構
- 3.線程池結構
- 接口設計
- 線程池的應用
- reactor
- redis 中線程池
- skynet 中線程池
線程池簡介
1.線程池
1.線程池
一種管理維持固定線程數量的池式結構。
2.數量固定的原因
- 避免頻繁的創建和銷毀線程,造成資源浪費
- 隨著線程的數量不斷增多,并不能實現性能的提升,相反可能造成系統負擔
3.線程數量如何確定
- CPU密集型:proc(CPU核心數)
- IO密集型(網絡IO,磁盤IO):2 * proc(CPU核心數)
4.為什么需要線程池
某類任務執行特別耗時,造成阻塞,嚴重影響該線程執行其他任務
作用:
- 復用線程資源;
- 減少線程創建和銷毀的開銷;
- 可異步處理生產者線程的任務;
- 減少了多個任務(不是一個任務)的執行時間;
5.線程池結構
線程池屬于一種生產消費模型。
線程池運行環境構成:
生產者線程:發布任務
隊列:存儲任務,調度線程池
消費者線程:取出任務,執行任務
線程池的實現
數據結構設計
1.任務結構
typedef struct task_s {void *next;handler_pt func;void *arg;
} task_t;
2.任務隊列結構
typedef struct task_queue_s {void *head;void **tail; int block;spinlock_t lock;pthread_mutex_t mutex;pthread_cond_t cond;
} task_queue_t;
3.線程池結構
struct thrdpool_s {task_queue_t *task_queue;atomic_int quit;int thrd_count;pthread_t *threads;
};
接口設計
// 創建對象的時候,回滾式的
static task_queue_t *
__taskqueue_create() {task_queue_t *queue = (task_queue_t *)malloc(sizeof(*queue));if (!queue) return NULL;int ret;ret = pthread_mutex_init(&queue->mutex, NULL);if (ret == 0) {ret = pthread_cond_init(&queue->cond, NULL);if (ret == 0) {spinlock_init(&queue->lock);queue->head = NULL;queue->tail = &queue->head;queue->block = 1;return queue;}pthread_cond_destroy(&queue->cond);}pthread_mutex_destroy(&queue->mutex);return NULL;
}static void
__nonblock(task_queue_t *queue) {pthread_mutex_lock(&queue->mutex);queue->block = 0;pthread_mutex_unlock(&queue->mutex);pthread_cond_broadcast(&queue->cond);
}static inline void
__add_task(task_queue_t *queue, void *task) {void **link = (void **)task; // malloc *link = NULL; // task->next = NULL;spinlock_lock(&queue->lock);*queue->tail = link;queue->tail = link;spinlock_unlock(&queue->lock);pthread_cond_signal(&queue->cond);
}static inline void *
__pop_task(task_queue_t *queue) {spinlock_lock(&queue->lock);if (queue->head == NULL) {spinlock_unlock(&queue->lock);return NULL;}task_t *task;task = queue->head;queue->head = task->next;if (queue->head == NULL) {queue->tail = &queue->head;}spinlock_unlock(&queue->lock);return task;
}static inline void *
__get_task(task_queue_t *queue) {task_t *task;// 虛假喚醒while ((task = __pop_task(queue)) == NULL) {pthread_mutex_lock(&queue->mutex);if (queue->block == 0) {// break;pthread_mutex_unlock(&queue->mutex);return NULL;}// 1. 先 unlock(&mtx);// 2. 在 cond 休眠// ----- 生產者產生任務 signal// 3. 在 cond 喚醒// 4. 加上 lock(&mtx);pthread_cond_wait(&queue->cond, &queue->mutex);pthread_mutex_unlock(&queue->mutex);}return task;
}static void
__taskqueue_destroy(task_queue_t *queue) {task_t *task;while ((task = __pop_task(queue))) { // 任務的生命周期由 thrdpool 管理free(task);}spinlock_destroy(&queue->lock);pthread_cond_destroy(&queue->cond);pthread_mutex_destroy(&queue->mutex);free(queue);
}static void *
__thrdpool_worker(void *arg) {thrdpool_t *pool = (thrdpool_t*) arg;task_t *task;void *ctx;while (atomic_load(&pool->quit) == 0) {task = (task_t*)__get_task(pool->task_queue);if (!task) break;handler_pt func = task->func;ctx = task->arg;free(task);func(ctx);}return NULL;
}static void
__threads_terminate(thrdpool_t * pool) {atomic_store(&pool->quit, 1);__nonblock(pool->task_queue);int i;for (i=0; i<pool->thrd_count; i++) {pthread_join(pool->threads[i], NULL);}
}static int
__threads_create(thrdpool_t *pool, size_t thrd_count) {pthread_attr_t attr;int ret;ret = pthread_attr_init(&attr);if (ret == 0) {pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thrd_count);if (pool->threads) {int i = 0;for (; i < thrd_count; i++) {if (pthread_create(&pool->threads[i], &attr, __thrdpool_worker, pool) != 0) {break;}}pool->thrd_count = i;pthread_attr_destroy(&attr);if (i == thrd_count)return 0;__threads_terminate(pool);free(pool->threads);}ret = -1;}return ret;
}void
thrdpool_terminate(thrdpool_t * pool) {atomic_store(&pool->quit, 1);__nonblock(pool->task_queue);
}thrdpool_t *
thrdpool_create(int thrd_count) {thrdpool_t *pool;pool = (thrdpool_t*) malloc(sizeof(*pool));if (!pool) return NULL;task_queue_t *queue = __taskqueue_create();if (queue) {pool->task_queue = queue;atomic_init(&pool->quit, 0);if (__threads_create(pool, thrd_count) == 0) {return pool;}__taskqueue_destroy(pool->task_queue);}free(pool);return NULL;
}int
thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg) {if (atomic_load(&pool->quit) == 1) {return -1;}task_t *task = (task_t *)malloc(sizeof(task_t));if (!task) return -1;task->func = func;task->arg = arg;__add_task(pool->task_queue, task);return 0;
}void
thrdpool_waitdone(thrdpool_t *pool) {int i;for (i=0; i<pool->thrd_count; i++) {pthread_join(pool->threads[i], NULL);}__taskqueue_destroy(pool->task_queue);free(pool->threads);free(pool);
}
線程池的應用
reactor
在一個事件循環中,可以處理多個就緒事件,這些就緒事件在reactor 模型中時串行執行的,一個事件處理若耗時較長,會延遲其他同時觸發的事件的處理(對于客戶端而言,響應會變得較慢)。
redis 中線程池
作用:讀寫 io 處理以及數據包解析、壓縮;