線程池
什么是線程池?
一次預先申請一批線程,讓這批線程有任務,就處理任務;沒任務,就處于等待狀態。
為什么要有線程池?
以空間換時間,預先申請一批線程,當有任務到來,可以直接指派給線程執行。
// task.hpp
#pragma once#include <functional>using namespace std;typedef function<int(int, int)> calc_func_t;class Task
{
public:Task() {}Task(int x, int y, calc_func_t func): _x(x), _y(y), _calc_func(func){}// 加法計算的任務int operator()() { return _calc_func(_x, _y); }int get_x() { return _x; }int get_y() { return _y; }
private:int _x;int _y;calc_func_t _calc_func;
};
// log.hpp
#pragma once#include <string>
#include <stdarg.h>
#include <unordered_map>using namespace std;#define LOG_FILE "./threadpool.log"// 日志是有日志級別的
enum LogLevel
{DEBUG,NORMAL,WARNING,ERROR,FATAL
};// 針對枚舉類型的哈希函數
template <typename T>
class EnumHash
{
public:size_t operator()(const T& t) const { return static_cast<size_t>(t); }
};
unordered_map<LogLevel, string, EnumHash<LogLevel>> logLevelMap = {{DEBUG, "DEBUG"},{NORMAL, "NORMAL"},{WARNING, "WARNING"},{ERROR , "ERROR"},{FATAL, "FATAL"}
};// 完整的日志功能,至少有:日志等級 時間 支持用戶自定義
void logMessage(LogLevel log_level, const char* format, ...)
{
#ifndef DEBUG_SHOWif(log_level == DEBUG) return; // DEBUG_SHOW沒有定義,不展示DEBUG信息
#endif char stdBuffer[1024]; // 標準部分char logBuffer[1024]; // 自定義部分time_t timestamp = time(nullptr);struct tm* ploct = localtime(×tamp);snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%04d-%02d-%02d %02d:%02d:%02d]", logLevelMap[log_level].c_str(),\1900 + ploct->tm_year, 1 + ploct->tm_mon, ploct->tm_mday, ploct->tm_hour, ploct->tm_min, ploct->tm_sec);va_list args;va_start(args, format);vsnprintf(logBuffer, sizeof logBuffer, format, args);va_end(args);FILE* log_file = fopen(LOG_FILE, "a");fprintf(log_file, "%s %s\n", stdBuffer, logBuffer);fclose(log_file);
}
va_*
系列函數與vprintf
系列函數配合使用可以格式化打印傳入的可變參數的內容。
// thread.hpp
#pragma once#include <string>
#include <cstdio>
#include <pthread.h>using namespace std;// 對應創建線程時的routine函數的類型
typedef void*(*func_t)(void*);class ThreadData
{
public:void* _ptpool; // 指向線程池對象string _name;
};class Thread
{
public:Thread(int num, func_t callBack, void* _ptpool): _func(callBack){char nameBuffer[64];snprintf(nameBuffer, sizeof(nameBuffer), "Thread_%d", num);_tdata._name = nameBuffer;_tdata._ptpool = _ptpool;}void start() { pthread_create(&_tid, nullptr, _func, (void*)&_tdata); }void join() { pthread_join(_tid, nullptr); }const string& name() { return _tdata._name; }
private:pthread_t _tid; // 線程IDfunc_t _func; // 線程routineThreadData _tdata; // 線程數據
};
// threadPool.hpp
#pragma once#include <vector>
#include <queue>
#include "thread.hpp"
#include "lockGuard.hpp"
#include "log.hpp"const int g_thread_num = 3;// 線程池:本質是生產消費模型
template<class T>
class threadPool
{
private:threadPool(int thread_num = g_thread_num): _thread_num(thread_num){pthread_mutex_init(&_lock, nullptr);pthread_cond_init(&_cond, nullptr);for(int i = 0; i < _thread_num; ++i){_threads.push_back(new Thread(i + 1/*線程編號*/, routine, this/*可以傳this指針*/));}}threadPool(const threadPool<T>&) = delete;const threadPool<T>& operator=(const threadPool<T>&) = delete;
public:// 考慮多個線程使用單例的情況static threadPool<T>* getThreadPool(int thread_num = g_thread_num){if(nullptr == _pthread_pool){lockGuard lock_guard(&_pool_lock);// 在單例創建好后,鎖也就沒有意義了// 將來任何一個線程要獲取單例,仍必須調用getThreadPool接口// 這樣一定會存在大量的申請和釋放鎖的行為// 所以外層if判斷,用于在單例創建的情況下,攔截大量的線程因請求單例而訪問鎖的行為if(nullptr == _pthread_pool){_pthread_pool = new threadPool<T>(thread_num);}}return _pthread_pool;}void run(){for(auto& pthread : _threads){pthread->start();logMessage(NORMAL, "%s %s", (pthread->name()).c_str(), "啟動成功");}}void pushTask(const T& task){lockGuard lock_guard(&_lock);_task_queue.push(task);pthread_cond_signal(&_cond);}~threadPool(){for(auto& pthread : _threads){pthread->join();delete pthread;}pthread_mutex_destroy(&_lock);pthread_cond_destroy(&_cond);}
public:pthread_mutex_t* getMutex(){return &_lock;}bool isEmpty(){return _task_queue.empty();}void waitCond(){pthread_cond_wait(&_cond, &_lock);}T& getTask(){T& task = _task_queue.front();_task_queue.pop();return task;}
private:// 消費過程static void* routine(void* args){ThreadData* tdata = (ThreadData*)args;threadPool<T>* tpool = (threadPool<T>*)tdata->_ptpool;while(true){T task;{lockGuard lock_guard(tpool->getMutex());while (tpool->isEmpty()) tpool->waitCond();task = tpool->getTask();}logMessage(WARNING, "%s 處理完成: %d + %d = %d", (tdata->_name).c_str(), task.get_x(), task.get_y(), task());}}
private:vector<Thread*> _threads; // 數組存放創建的線程的地址int _thread_num; // 創建的線程個數queue<T> _task_queue; // 阻塞式任務隊列pthread_mutex_t _lock; // 針對任務隊列的鎖pthread_cond_t _cond; // 隊列空滿情況的條件變量static threadPool<T>* _pthread_pool; // 餓漢式線程池static pthread_mutex_t _pool_lock; // 針對線程池的鎖
};template<class T>
threadPool<T>* threadPool<T>::_pthread_pool = nullptr;
template<class T>
pthread_mutex_t threadPool<T>::_pool_lock = PTHREAD_MUTEX_INITIALIZER;
// test.cc
#include "task.hpp"
#include "threadPool.hpp"
#include <unistd.h>
#include <ctime>void test1()
{srand((unsigned int)time(nullptr) ^ getpid());threadPool<Task>::getThreadPool()->run();while(true){// 生產的過程 - 制作任務的時候要花時間的int x = rand() % 100 + 1;usleep(2023);int y = rand() % 50 + 1;Task task(x, y, [](int x, int y){ return x + y; });logMessage(DEBUG, "制作任務完成: %d + %d = ?", x, y);// 推送任務到線程池threadPool<Task>::getThreadPool()->pushTask(task);sleep(1);}
}
# Makefile
test:test.ccg++ -o $@ $^ -std=c++11 -lpthread -DDEBUG_SHOW
.PHONY:clean
clean:rm -f test
運行結果:
自旋鎖
自旋鎖:本質是通過不斷檢測鎖的狀態,來確定資源是否就緒的方案。
什么時候使用自旋鎖?這個由臨界資源就緒的時間長短決定。
自旋鎖的初始化 & 銷毀:
自旋鎖的加鎖:
自旋鎖的解鎖:
讀者寫者問題
寫者與寫者:互斥關系
讀者與寫者:互斥 & 同步關系
讀者與讀者:共享關系
讀者寫者問題和生產消費模型的本質區別在于,消費者會拿走數據(做修改),而讀者不會。
讀寫鎖的初始化 & 銷毀:
讀寫鎖之讀者加鎖:
讀寫鎖之寫者加鎖:
讀寫鎖的解鎖:
關于是讀者還是寫者優先的問題,拋開應用場景去談技術細節就是耍流氓。
而pthread庫中的讀寫鎖默認采用讀者優先,這類的應用場景主要是:數據被讀取的頻率非常高,被修改的頻率非常低。