線程系列:
Linux–線程的認識(一)
Linux–線程的分離、線程庫的地址關系的理解、線程的簡單封裝(二)
線程的互斥:臨界資源只能在同一時間被一個線程使用
生產消費模型
信號量
線程池
線程池(Thread Pool)是一種基于池化技術設計用于管理和復用線程的機制。
在多線程編程中,頻繁地創建和銷毀線程會消耗大量的系統資源,并且由于線程的創建和銷毀需要時間,這也會降低程序的執行效率。線程池通過預先創建一定數量的線程并放入池中,需要時從池中取出線程執行任務,執行完畢后線程并不銷毀而是重新放回池中等待下一次使用,從而避免了線程的頻繁創建和銷毀所帶來的開銷。
主要優點
- 降低資源消耗:通過復用已存在的線程,減少線程創建和銷毀的開銷。
- 提高響應速度:當任務到達時,可以立即分配線程進行處理,減少了等待時間。
- 提高線程的可管理性:線程池可以統一管理線程,包括線程的創建、調度、執行和銷毀等。
關鍵參數
- 核心線程數:線程池維護線程的最少數量,即使這些線程處于空閑狀態,線程池也不會回收它們(一般為主線程)。
- 最大線程數:線程池中允許的最大線程數。
- 阻塞隊列:用于存放待執行的任務的隊列。當線程池中的所有線程都忙時,新任務會被添加到這個隊列中等待處理。
- 非核心線程空閑存活時間:當線程池中的線程數量超過核心線程數時,如果線程空閑時間超過這個時間,多余的線程將被終止。
- 線程工廠:用于創建新線程的工廠。
- 拒絕策略:當線程池和隊列都滿了時,對于新來的任務將采取的拒絕策略。
實現原理
線程池的實現原理通常包括以下幾個關鍵部分:
- 線程池管理器:負責創建并管理線程池,包括初始化線程池、創建工作線程、銷毀線程池等。
- 工作線程:線程池中的線程,負責執行具體的任務。工作線程通常會不斷從任務隊列中取出任務并執行,直- 到線程池被銷毀或所有任務都執行完畢。
- 任務接口:每個任務必須實現的接口,用于定義任務的執行邏輯。在Java中,這通常是通過實現Runnable或Callable接口來實現的。
- 任務隊列:用于存放待處理的任務。當線程池中的工作線程數量達到最大值時,新到達的任務會被放入任務隊列中等待處理。任務隊列的實現通常依賴于Java的BlockingQueue接口。
實現流程
- 當有新任務提交給線程池時,線程池會首先判斷當前正在運行的線程數量是否小于核心線程數。如果是,則直接創建新的線程來執行任務;否則,將任務加入任務隊列等待處理。
- 如果任務隊列已滿,且當前正在運行的線程數量小于最大線程數(maximumPoolSize),則創建新的線程來處理任務;如果線程數量已經達到最大線程數,則根據配置的拒絕策略來處理新任務(如拋出異常、直接丟棄等)。
- 當一個線程完成任務后,它會從任務隊列中取下一個任務來執行;如果沒有任務可供執行,并且線程池中的線程數量超過了核心線程數,且這些線程空閑時間超過了設定的存活時間,則這些線程會被銷毀,直到線程池中的線程數量減少到核心線程數為止。
實例
Thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__#include<iostream>
#include<string>
#include<pthread.h>
#include<functional>
#include<unistd.h>using namespace std;namespace ThreadMdule
{using func_t = std::function<void(string)>;class Thread{public:void Excute(){_func(_threadname);}Thread(func_t func, const std::string &name="none-name"): _func(func), _threadname(name), _stop(true){}static void* threadroutine(void* args){Thread* self=static_cast<Thread*>(args);self->Excute();return nullptr;}bool start(){int n=pthread_create(&_tid,nullptr,threadroutine,this);if(!n){_stop = false;return true;}else{return false;}}void Detach(){if(!_stop){pthread_detach(_tid);}}void Join(){if(!_stop){pthread_join(_tid,nullptr);}}string name(){return _threadname;}void Stop(){_stop = true;}~Thread() {}private:pthread_t _tid;std::string _threadname;func_t _func;bool _stop;};
}#endif
ThreadPool.hpp
#pragma once#include<iostream>
#include<vector>
#include<queue>
#include<pthread.h>
#include"Thread.hpp"
#include"Log.hpp"
#include"LockGuard.hpp"
using namespace ThreadMdule;
using namespace std;
const static int gdefaultthreadnum=3;//默認線程池的線程數template <class T>
class ThreadPool
{
public:ThreadPool(int threadnum=gdefaultthreadnum) :_threadnum(threadnum),_waitnum(0),_isrunning(false){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_cond,nullptr);LOG(INFO,"ThreadPool COnstruct.");}//各個線程獨立的任務函數void HandlerTask(string name){LOG(INFO,"%s is running...",name.c_str());while(true){LockQueue();//開啟保護//等到有任務時才退出循環執行下列語句while(_task_queue.empty()&&_isrunning){_waitnum++;ThreadSleep();_waitnum--;}//當任務隊列空并且線程池停止時線程退出if(_task_queue.empty()&&!_isrunning){UnlockQueue();cout<<name<<" quit "<<endl;sleep(1);break;}//1.任務隊列不為空&&線程池開啟//2.任務隊列不為空&&線程池關閉,直到任務隊列為空//所以,只要有任務,就要處理任務T t=_task_queue.front();//取出對應任務_task_queue.pop();UnlockQueue();LOG(DEBUG,"%s get a task",name.c_str());//處理任務t();LOG(DEBUG,"%s handler a task,result is: %s",name.c_str(),t.ResultToString().c_str());}}//線程池中線程的構建void InitThreadPool(){for(int i=0;i<_threadnum;i++){string name="thread-"+to_string(i+1);_threads.emplace_back(bind(&ThreadPool::HandlerTask,this,placeholders::_1),name);LOG(INFO,"init thread %s done",name.c_str());}_isrunning=true;}//線程池的啟動void Start(){for(auto& thread:_threads){thread.start();}}//線程池停止void Stop(){LockQueue();_isrunning=false;ThreadWakeupAll();UnlockQueue();}void Wait(){for(auto& thread:_threads){thread.Join();LOG(INFO,"%s is quit...",thread.name().c_str());}}//將任務入隊列bool Enqueue(const T& t){bool ret=false;LockQueue();if(_isrunning){_task_queue.push(t);//如果有空閑的線程,那么喚醒線程讓其執行任務if(_waitnum>0){ThreadWakeup();}LOG(DEBUG,"enqueue task success");ret=true;}UnlockQueue();return ret;}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}
private:void LockQueue(){pthread_mutex_lock(&_mutex);}void UnlockQueue(){pthread_mutex_unlock(&_mutex);}void ThreadSleep(){pthread_cond_wait(&_cond, &_mutex);}void ThreadWakeup(){pthread_cond_signal(&_cond);}void ThreadWakeupAll(){pthread_cond_broadcast(&_cond);}int _threadnum;//線程數vector<Thread> _threads;//存儲線程的vectorqueue<T> _task_queue;//輸入的任務隊列pthread_mutex_t _mutex;//互斥鎖pthread_cond_t _cond;//條件變量int _waitnum;//空閑的線程數bool _isrunning;//表示線程池是否啟動};
Task.hpp
#include<iostream>
#include<string>
#include<functional>
using namespace std;class Task
{
public:Task(){}Task(int a,int b): _a(a),_b(b),_result(0){}void Excute(){_result=_a+_b;}string ResultToString(){return to_string(_a) + "+"+to_string(_b)+"="+to_string(_result);}string DebugToString(){return to_string(_a) + "+" + to_string(_b) + "= ?";}void operator()(){Excute();}
private:int _a;int _b;int _result;
};
main.cc
int main()
{srand(time(nullptr)^getpid()^pthread_self());//EnableScreen();EnableFile();unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>(5));tp->InitThreadPool();tp->Start();int tasknum=10;while(tasknum){sleep(1);int a=rand()%10+1;usleep(1024);int b=rand()%20+1;Task t(a,b);LOG(INFO,"main thread push task: %s",t.DebugToString().c_str());tp->Enqueue(t);tasknum--;}tp->Stop();tp->Wait();
}
Loh.hpp
#pragma once#include<iostream>
#include<fstream>
#include<ctime>
#include<cstdarg>
#include<string>
#include<sys/types.h>
#include<unistd.h>
#include<cstdio>
#include"LockGuard.hpp"using namespace std;bool gIsSave=false;//默認輸出到屏幕
const string logname="log.txt";
//1.日志是有等級的
enum Level
{DEBUG=0,INFO,WARNING,ERROR,FATAL
};void SaveFile(const string& filename,const string& messages)
{ofstream out(filename,ios::app);if(!out.is_open()){return;}out<<messages;out.close();
}
//等級轉化為字符串
string LevelToString(int level)
{switch (level){case DEBUG:return "Debug";case INFO:return "Info";case WARNING:return "Warning";case ERROR:return "Error";case FATAL:return "Fatal";default:return "Unkonwn";}
}//獲取當前時間
string GetTimeString()
{time_t curr_time=time(nullptr);//時間戳struct tm* format_time=localtime(&curr_time);//轉化為時間結構if(format_time==nullptr)return "None";char time_buffer[1024];snprintf(time_buffer,sizeof(time_buffer),"%d-%d-%d %d:%d:%d",format_time->tm_year + 1900,format_time->tm_mon + 1,format_time->tm_mday,format_time->tm_hour,format_time->tm_min,format_time->tm_sec);return time_buffer;}pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
//獲取日志信息
void LogMessage(string filename,int line,bool issave,int level,char* format,...)
{string levelstr=LevelToString(level);string timestr=GetTimeString();pid_t selfid=getpid();char buffer[1024];va_list arg;va_start(arg,format);vsnprintf(buffer,sizeof(buffer),format,arg);va_end(arg);string message= "[" + timestr + "]" + "[" + levelstr + "]" +"[" + std::to_string(selfid) + "]" +"[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";LockGuard lockguard(&lock);if(!issave){cout<<message;}else{SaveFile(logname,message);}}#define LOG(level,format,...) \do \{ \LogMessage(__FILE__,__LINE__,gIsSave,level,format,##__VA_ARGS__); \} while (0)#define EnableFile() \do \{ \gIsSave=true; \ } while (0)#define EnableScreen() \do \{ \gIsSave=false; \ } while (0)
線程池解釋(代碼)
日志解釋(代碼)
日志是系統或程序記錄所有發生事件的文件: