線程池流程總結:
1、構造函數中創建線程,并添加到線程池(構造函數返回時,線程自動啟動,并停在等待wait:從線程池取出一個任務處);
2、主線程中添加任務,到任務隊列。并用“條件變量”通知一個線程,從線程池取出一個任務;
3、取出任務后,執行線程的任務函數 =》回調添加的“實際的線程函數”;
4、主線程執行完,return返回 =》調用線程池析構函數;
5、“條件變量”通知所有線程停止,使得線程循環退出,并等待所有線程完成任務;
6、主線程main結束。
一、C++線程池1
1、用c++封裝線程池
#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>using namespace std;class ThreadPool {
public:ThreadPool(size_t threads) : stop(false) {for (size_t i = 0; i < threads; ++i) {//一、lambda表達式返回值是一個線程對象,為什么沒有看見創建線程的語句?// thread是什么時候創建的呢?// //二、lambda表達式什么時候執行?// 1.當線程池的構造函數返回時,線程池中的線程才開始運行// 2.當你創建一個std::thread對象并傳入一個函數時(對象的實例化),// 這個線程會自動開始執行該函數。因此,通常你不需要顯式調用start()方法!//lambda表達式創建線程,并將線程加入線程池workers.emplace_back([this] {//線程循環,不斷從任務隊列中取出任務并執行while (true) {//取出任務std::function<void()> task;{//互斥鎖保護:任務隊列和線程池停止狀態std::unique_lock<std::mutex> lock(this->queueMutex);bool empty = this->tasks.empty();bool stopped = this->stop;//等待條件變量通知或線程池停止this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });//線程池停止且任務隊列為空,退出線程循環if (this->stop && this->tasks.empty()) return;//取出任務task = std::move(this->tasks.front());//從任務隊列中刪除任務this->tasks.pop();}//執行任務task();}});}}//定義任務入隊函數模板template<class F, class... Args>void enqueue(F&& fun, Args&&... args) // 添加任務到任務隊列(傳遞:線程函數、參數){//將任務封裝成std::functionauto task = std::bind(std::forward<F>(fun), std::forward<Args>(args)...);{//互斥鎖保護:任務隊列和線程池停止狀態std::unique_lock<std::mutex> lock(queueMutex);//線程池停止if (stop) throw std::runtime_error("Enqueue on stopped ThreadPool");//將任務加入任務隊列tasks.emplace(task);}//通知一個線程condition.notify_one();}// 析構函數:等待所有線程完成任務,并停止線程池~ThreadPool() {{//互斥鎖保護:線程池停止狀態std::unique_lock<std::mutex> lock(queueMutex);stop = true;}//通知所有線程condition.notify_all();//等待所有線程完成任務for (std::thread& worker : workers) {worker.join();}}
private:std::vector<std::thread> workers;//線程池std::queue<std::function<void()>> tasks;//任務隊列std::mutex queueMutex;//互斥鎖(保護任務隊列tasks 和線程池停止狀態stop)std::condition_variable condition;//條件變量(加入任務到任務隊列時通知一個線程)bool stop;//線程池是否停止
};
2、main測試
int main() {ThreadPool pool(2); // 創建一個包含4個線程的線程池for (int i = 0; i < 2; ++i) { // 添加2個任務到線程池中執行//任務入隊函數模板:輸出任務編號和線程IDpool.enqueue([i] { std::cout << "Task " << i << " is executed by thread " << std::this_thread::get_id() << std::endl; });}system("pause");return 0; // 主線程等待所有工作線程完成(在析構函數中處理)
}
3、通過測試,可以看出“線程池執行步驟”:
1、主線程 ThreadPool pool(2); // 創建一個包含4個線程的線程池
ThreadPool構造函數中:向“線程池workers”中添加lambda表達式形式的線程
//1.1線程循環,不斷從任務隊列中取出任務并執行
while(true)
{//1.2等待:條件變量通知或線程池停止this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
}2、主線程中添加任務,到任務隊列 函數模板:輸出任務編號和線程ID
pool.enqueue([i] {?std::cout << "Task " << i << " is executed by thread " << std::this_thread::get_id() << std::endl;?});//2.1將任務加入任務隊列(先用包裝器function包裝任務)
tasks.emplace(task);//2.2條件變量通知一個線程(每加入一個任務,就通知一次線程執行任務!!)
condition.notify_one();//2.3線程循環中的等待(條件變量通知),滿足條件,開始向下執行!
this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });//2.4從任務隊列取出任務
task = std::move(this->tasks.front());//2.5 執行任務(回調)task();=》回調“實際的線程函數”=》就是添加進來的lambda表達式:std::cout << "Task " << i << " is executed by thread " << std::this_thread::get_id() << std::endl;?3、主線程執行完。return 0; // 主線程返回//3.1 調用線程池析構函數
~ThreadPool()?
{//1.設置線程停止標識為truestop = true;//2.通知所有線程condition.notify_all();=》//線程循環while(true){//while循環一直在這等待!(lock滿足條件,向下執行!)this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });//當線程任務執行完成:stop = true;//線程池停止且任務隊列為空,退出線程循環if (this->stop && this->tasks.empty()) return;}3.//等待所有線程完成任務worker.join();
}4、主線程結束
二、C++線程池2
1、線程池.h頭文件
#ifndef _thread_pool_HPP
#define _thread_pool_HPP#include <vector>
#include <deque>
#include <thread>
#include <functional>
#include <mutex>
#include <condition_variable>//!
//! convenience macro to log with file and line information
//!
#ifdef __SOLA_LOGGING_ENABLED
#define __SOLA_LOG(level, msg) sola::level(msg, __FILE__, __LINE__);
#else
#define __SOLA_LOG(level, msg)
#endif /* __SOLA_LOGGING_ENABLED */namespace sola {class logger_iface {
public://! ctorlogger_iface(void) = default;//! dtorvirtual ~logger_iface(void) = default;//! copy ctorlogger_iface(const logger_iface&) = default;//! assignment operatorlogger_iface& operator=(const logger_iface&) = default;public://!//! debug logging//!//! \param msg message to be logged//! \param file file from which the message is coming//! \param line line in the file of the message//!virtual void debug(const std::string& msg, const std::string& file, std::size_t line) = 0;//!//! info logging//!//! \param msg message to be logged//! \param file file from which the message is coming//! \param line line in the file of the message//!virtual void info(const std::string& msg, const std::string& file, std::size_t line) = 0;//!//! warn logging//!//! \param msg message to be logged//! \param file file from which the message is coming//! \param line line in the file of the message//!virtual void warn(const std::string& msg, const std::string& file, std::size_t line) = 0;//!//! error logging//!//! \param msg message to be logged//! \param file file from which the message is coming//! \param line line in the file of the message//!virtual void error(const std::string& msg, const std::string& file, std::size_t line) = 0;
};//!
//! default logger class provided by the library
//!
class logger : public logger_iface {
public://!//! log level//!enum class log_level {error = 0,warn = 1,info = 2,debug = 3};public://! ctorlogger(log_level level = log_level::info);//! dtor~logger(void) = default;//! copy ctorlogger(const logger&) = default;//! assignment operatorlogger& operator=(const logger&) = default;public://!//! debug logging//!//! \param msg message to be logged//! \param file file from which the message is coming//! \param line line in the file of the message//!void debug(const std::string& msg, const std::string& file, std::size_t line);//!//! info logging//!//! \param msg message to be logged//! \param file file from which the message is coming//! \param line line in the file of the message//!void info(const std::string& msg, const std::string& file, std::size_t line);//!//! warn logging//!//! \param msg message to be logged//! \param file file from which the message is coming//! \param line line in the file of the message//!void warn(const std::string& msg, const std::string& file, std::size_t line);//!//! error logging//!//! \param msg message to be logged//! \param file file from which the message is coming//! \param line line in the file of the message//!void error(const std::string& msg, const std::string& file, std::size_t line);private://!//! current log level in use//!log_level m_level;//!//! mutex used to serialize logs in multithreaded environment//!std::mutex m_mutex;
};//!
//! variable containing the current logger
//! by default, not set (no logs)
//!
extern std::unique_ptr<logger_iface> active_logger;//!
//! debug logging
//! convenience function used internally to call the logger
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
void debug(const std::string& msg, const std::string& file, std::size_t line);//!
//! info logging
//! convenience function used internally to call the logger
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
void info(const std::string& msg, const std::string& file, std::size_t line);//!
//! warn logging
//! convenience function used internally to call the logger
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
void warn(const std::string& msg, const std::string& file, std::size_t line);//!
//! error logging
//! convenience function used internally to call the logger
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
void error(const std::string& msg, const std::string& file, std::size_t line);class thread_pool{public://任務包裝器typedef std::function<void()> task_t;thread_pool(int init_size = 3);~thread_pool();void stop();void add_task(const task_t&); //thread safe; 添加任務private:thread_pool(const thread_pool&);//禁止復制拷貝.const thread_pool& operator=(const thread_pool&);bool is_started() { return m_is_started; }void start();//啟動線程池void thread_loop();//線程循環函數task_t take();//取任務函數private:typedef std::vector<std::thread*> threads_t;//線程容器typedef std::deque<task_t> tasks_t;//任務隊列int m_init_threads_size;//初始線程數量threads_t m_threads;//線程容器tasks_t m_tasks;//任務隊列std::mutex m_mutex;//互斥鎖std::condition_variable m_cond;//條件變量bool m_is_started;//線程池是否啟動};}
#endif
2、線程池cpp文件
#include <assert.h>
#include <iostream>
#include <sstream>
#include "thread_pool.hpp"namespace sola {std::unique_ptr<logger_iface> active_logger = nullptr;static const char black[] = {0x1b, '[', '1', ';', '3', '0', 'm', 0};static const char red[] = {0x1b, '[', '1', ';', '3', '1', 'm', 0};static const char yellow[] = {0x1b, '[', '1', ';', '3', '3', 'm', 0};static const char blue[] = {0x1b, '[', '1', ';', '3', '4', 'm', 0};static const char normal[] = {0x1b, '[', '0', ';', '3', '9', 'm', 0};logger::logger(log_level level): m_level(level) {}voidlogger::debug(const std::string& msg, const std::string& file, std::size_t line) {if (m_level >= log_level::debug) {std::lock_guard<std::mutex> lock(m_mutex);std::cout << "[" << black << "DEBUG" << normal << "][sola::logger][" << file << ":" << line << "] " << msg << std::endl;}}voidlogger::info(const std::string& msg, const std::string& file, std::size_t line) {if (m_level >= log_level::info) {std::lock_guard<std::mutex> lock(m_mutex);std::cout << "[" << blue << "INFO " << normal << "][sola::logger][" << file << ":" << line << "] " << msg << std::endl;}}voidlogger::warn(const std::string& msg, const std::string& file, std::size_t line) {if (m_level >= log_level::warn) {std::lock_guard<std::mutex> lock(m_mutex);std::cout << "[" << yellow << "WARN " << normal << "][sola::logger][" << file << ":" << line << "] " << msg << std::endl;}}voidlogger::error(const std::string& msg, const std::string& file, std::size_t line) {if (m_level >= log_level::error) {std::lock_guard<std::mutex> lock(m_mutex);std::cerr << "[" << red << "ERROR" << normal << "][sola::logger][" << file << ":" << line << "] " << msg << std::endl;}}voiddebug(const std::string& msg, const std::string& file, std::size_t line) {if (active_logger)active_logger->debug(msg, file, line);}voidinfo(const std::string& msg, const std::string& file, std::size_t line) {if (active_logger)active_logger->info(msg, file, line);}voidwarn(const std::string& msg, const std::string& file, std::size_t line) {if (active_logger)active_logger->warn(msg, file, line);}voiderror(const std::string& msg, const std::string& file, std::size_t line) {if (active_logger)active_logger->error(msg, file, line);}static std::stringget_tid(){std::stringstream tmp;tmp << std::this_thread::get_id();return tmp.str();}//線程池構造函數thread_pool::thread_pool(int init_size):m_init_threads_size(init_size),m_mutex(),m_cond(),m_is_started(false){//構造函數中自動啟動線程池start();}thread_pool::~thread_pool(){//如果線程池已經啟動,則先停止if(m_is_started){stop();}}//啟動線程池void thread_pool::start(){assert(m_threads.empty());m_is_started = true;m_threads.reserve(m_init_threads_size);//預先給線程容器分配空間for (int i = 0; i < m_init_threads_size; ++i){//創建線程并加入線程容器(線程函數為thread_loop)m_threads.push_back(new std::thread(std::bind(&thread_pool::thread_loop, this)));}}//停止線程池void thread_pool::stop(){__SOLA_LOG(debug, "thread_pool::stop() stop.");{std::unique_lock<std::mutex> lock(m_mutex);m_is_started = false;//通知所有線程停止m_cond.notify_all();__SOLA_LOG(debug, "thread_pool::stop() notifyAll().");}for (threads_t::iterator it = m_threads.begin(); it != m_threads.end() ; ++it){(*it)->join(); //等待線程退出delete *it; //釋放線程資源}m_threads.clear();//清空線程容器}//線程池線程函數void thread_pool::thread_loop(){__SOLA_LOG(debug, "thread_pool::threadLoop() tid : " + get_tid() + " start.");//線程池線程循環while(m_is_started){//從任務隊列中取出一個任務task_t task = take();//如果取出的任務不為空,則執行任務(std::function類型可以直接判斷是否為空!)if(task){//執行任務:回調實際的任務函數task();}}__SOLA_LOG(debug, "thread_pool::threadLoop() tid : " + get_tid() + " exit.");}//向線程池添加任務void thread_pool::add_task(const task_t& task){std::unique_lock<std::mutex> lock(m_mutex);/*while(m_tasks.isFull()){//when m_tasks have maxsizecond2.notify_one();}*///向任務隊列中添加任務m_tasks.push_back(task);//通知一個線程m_cond.notify_one();}//從線程池取出一個任務thread_pool::task_t thread_pool::take(){std::unique_lock<std::mutex> lock(m_mutex);//always use a while-loop, due to spurious wakeup//如果任務隊列為空 + 線程池沒有停止,則等待while(m_tasks.empty() && m_is_started){__SOLA_LOG(debug, "thread_pool::take() tid : " + get_tid() + " wait.");m_cond.wait(lock);}__SOLA_LOG(debug, "thread_pool::take() tid : " + get_tid() + " wakeup.");task_t task;tasks_t::size_type size = m_tasks.size();//如果任務隊列不為空 + 線程池沒有停止,則取出一個任務if(!m_tasks.empty() && m_is_started){task = m_tasks.front();m_tasks.pop_front();assert(size - 1 == m_tasks.size());/*if (TaskQueueSize_ > 0){cond2.notify_one();}*/}return task;}
}
3、測試main函數
#include <iostream>
#include <chrono>
#include <condition_variable>
#include "thread_pool.hpp"std::mutex g_mutex;void priorityFunc()
{for (int i = 1; i < 4; ++i){std::this_thread::sleep_for(std::chrono::seconds(1));std::lock_guard<std::mutex> lock(g_mutex);std::cout << "priorityFunc() [" << i << "] at thread [ " << std::this_thread::get_id() << "] output" << std::endl;}}void testFunc()
{// loop to print character after a random period of timefor (int i = 1; i < 4; ++i){std::this_thread::sleep_for(std::chrono::seconds(1));std::lock_guard<std::mutex> lock(g_mutex);std::cout << "testFunc() [" << i << "] at thread [ " << std::this_thread::get_id() << "] output" << std::endl;}}int main()
{sola::active_logger = std::unique_ptr<sola::logger>(new sola::logger(sola::logger::log_level::debug));sola::thread_pool thread_pool;// add tasks to the thread poolfor(int i = 0; i < 2 ; i++)thread_pool.add_task(testFunc);system("pause");return 0;
}
4、執行流程
1、主線程構造函數
thread_pool thread_pool;
{//構造函數中自動啟動線程池start();
}start()
{//創建線程并加入線程容器(線程函數為thread_loop)m_threads.push_back(new std::thread(std::bind(&thread_pool::thread_loop, this)));
}//線程池線程函數
void thread_pool::thread_loop()
{//線程池線程循環while(m_is_started){//從任務隊列中取出一個任務task_t task = take();//如果取出的任務不為空,則執行任務(std::function類型可以直接判斷是否為空!)if(task){//執行任務:回調實際的任務函數task();}}
}//等待:從線程池取出一個任務
thread_pool::task_t thread_pool::take()
{//如果任務隊列為空 + 線程池沒有停止,則等待while(m_tasks.empty() && m_is_started){m_cond.wait(lock);}
}2、主線程中添加任務,到任務隊列
thread_pool.add_task(testFunc);thread_pool::add_task(testFunc)
{//向任務隊列中添加任務m_tasks.push_back(task);//通知一個線程m_cond.notify_one();
}//wait條件滿足:從線程池取出一個任務
thread_pool::task_t thread_pool::take()
{//如果任務隊列為空 + 線程池沒有停止,則等待while(m_tasks.empty() && m_is_started){m_cond.wait(lock);}//如果任務隊列不為空 + 線程池沒有停止,則取出一個任務if(!m_tasks.empty() && m_is_started){task = m_tasks.front();m_tasks.pop_front();}return task;
}//線程池線程函數:執行線程的任務函數
void thread_pool::thread_loop()
{//線程池線程循環while(m_is_started){//從任務隊列中取出一個任務task_t task = take();//如果取出的任務不為空,則執行任務(std::function類型可以直接判斷是否為空!)if(task){//執行任務:回調實際的任務函數task();}}
}//task()回調任務函數
void testFunc()
{
}3、主線程執行完。return 0; // 主線程等待所有工作線程完成(在析構函數中處理)//3.1 調用線程池析構函數
~ThreadPool()
{
//1.調用stop
stop()
{//1.1通知所有線程停止m_cond.notify_all();=》//1.2使得線程循環退出!while(true){//while循環一直在這等待!(lock滿足條件,向下執行!)//如果任務隊列為空 + 線程池沒有停止,則等待while(m_tasks.empty() && m_is_started){m_cond.wait(lock);}//當線程任務執行完成:stop = true;//線程池停止且任務隊列為空,退出線程循環//如果任務隊列不為空 + 線程池沒有停止,則取出一個任務if(!m_tasks.empty() && m_is_started){task = m_tasks.front();m_tasks.pop_front();}return task;}1.3//等待所有線程完成任務(*it)->join(); //等待線程退出
}4、主線程結束