文章目錄
- 進程池
- 什么進程池
- 進程池的作用
- 用代碼模擬進程池
- 管道信息
- 任務類
- InitProcesspool()
- DisPatchTasks()
- 任務的執行邏輯(Work)
- CleanProcessPool()
- 封裝
- main.cc
- Channel.hpp
- ProcessPool.hpp
- Task.hpp
- Makefile
- 總結
- 總結
進程池
什么進程池
進程池(Process Pool)是一種用于管理進程的技術,它通過預先創建一定數量的進程來避免頻繁創建和銷毀進程的開銷。進程池通常用于需要并發執行大量任務的場景,特別是在處理CPU密集型任務時。
上面這種模型就是進程池,父進程通過創建多個子進程,然后通過管道連接,分別向各個子進程派發任務。
上面的父進程稱為master進程,子進程稱為worker進程或者slaver進程。
進程池的作用
進程池的作用主要體現在以下幾個方面:
-
提高性能: 通過預先創建一定數量的進程,減少了頻繁創建和銷毀進程的開銷,尤其在需要大量并發任務處理時,能有效提升整體執行效率。
-
減少系統資源消耗: 在系統中創建進程是一個資源密集型操作,頻繁創建和銷毀進程會導致資源浪費。進程池通過復用已有進程,避免了這一問題。
-
提升任務響應速度: 由于進程池中的進程是預先創建的,所以當需要處理任務時,可以快速分配一個空閑進程,而不需要等待進程的創建。
-
更好的資源管理: 進程池可以限制系統中的最大并發進程數,避免過多進程同時運行,導致系統資源(如CPU、內存)耗盡,從而保護系統的穩定性。
-
并行處理: 對于CPU密集型任務,進程池通過并行化處理多個任務,可以顯著提升處理效率,尤其是在多核CPU的環境中。
進程池適合用于大規模并發任務的處理,如Web爬蟲、數據處理、大規模計算等場景。
用代碼模擬進程池
管道信息
首先實現進程池,我們要控制創建多少個進程,所以可以用cin,手動輸入,但是還有一種控制創建子進程格式的方法就是通過main函數的參數進行控制子進程的創建個數
enum
{OK = 0,UsageError,PipeError,ForkError,
};
void Usage(string proc)
{cout<<"Usage: "<<proc<<"process-num"<<endl;
}
int main(int argc, char *argv[])
{if(argc != 2){Usage(argv[0]);return UsageError;}return 0;
}
main函數的第一個參數是數組元素個數,已知數組的第一個元素是可執行程序的名稱(./XXXX),所以第二個元素是創建子進程個數,如果輸入的命令行的格式錯誤我們有一個函數Usage會輸出,類型錯誤,并且返回枚舉類型UsageError。
有了需要創建的子進程的個數,接下來我們就要創建管道并且創建子進程了。
我們封裝一個類用這個類來管理進程池
using work_t = function<void()>;
class ProcessPool
{
public:ProcessPool(int n,work_t w):num(n),work(w){}~ProcessPool(){}int InitProcesspool(){}void DisPatchTasks(){}void CleanProcessPool(){}
private:vector<Channel> channels;//管道int num;//進程的個數work_t work;//函數類型(工作類型(void()))
};
上面vector中的類型也是一個我們自己封裝的類,用來管理管道。
class Channel
{
public://構造函數Channel(int wfd,pid_t who):_wfd(wfd),_who(who){_name="Channel-"+to_string(wfd)+'-'+to_string(who);}//獲取名字string Name(){return _name;}void Send(int cmd)//cmd是任務碼{write(_wfd,&cmd,sizeof(cmd));}void Close()//關閉寫端{close(_wfd);}pid_t Id()//獲取寫端對應的子進程{return _who;}//析構函數~Channel(){}
private:int _wfd;//master的寫端string _name;//對應worker的名字pid_t _who;//記錄哪個子進程的管道
};
Channel的作用:保存寫端的fd,記錄寫端對應的子進程的名字,還有子進程的pid。
除了需要管理channels的類,我們還需要管理任務的類,因為我們還沒有任務。
任務類
先模擬出三種任務
void Download()
{cout<<"我是下載任務....."<<endl;
}
void Log()
{cout<<"我是日志任務....."<<endl;
}
void Sql()
{cout<<"我是數據庫同步任務....."<<endl;
}
任務我們用TaskManeger來表示,直接用容器unordered_map<int,task_t>來存儲任務,task_t表示返回值是void的函數類型,識別到任務碼直接調取函數即可,存儲的就是上面三個模擬的函數。選擇任務我們用隨機數選取,只需要模上任務的數量即可。
static int number = 0;
class TaskManager
{
public:TaskManager(){//srand(time(nullptr));InsertTask(Download);InsertTask(Log);InsertTask(Sql);}~TaskManager(){}//插入任務void InsertTask(task_t t){//number表示有多少個任務tasks[number++]=t;}//選擇任務int SelectTask(){//選擇一個任務return rand() % number;}//執行任務void Excute(int number){//沒找到說明方法是空的if(tasks.find(number) == tasks.end()) return;//找到了執行這個方法tasks[number]();}
private://int(任務碼)--->任務種類unordered_map<int,task_t> tasks;
};TaskManager tmp;//函數作用:傳進來的參數不對,這個函數就告訴我們如何傳遞參數
void Usage(string proc)
{cout<<"Usage: "<<proc<<"process-num"<<endl;
}
接下來我們來完成這三個函數:
InitProcesspool()
首先根據成員提供的需要創建的子進程的個數創建出管道,然后判斷管道是否創建成功,管道創建之后,創建子進程,子進程創建失敗返回錯誤碼,如果子進程創建成功判斷fork的返回值,返回值是0則是子進程,需要關閉寫端,讀端的fd原本是pipefd2將其重定向到0中也就是標準輸入之后,進入提前定義好的work中,完成work之后,子進程直接退出exit()出來之后只可能是父進程,因為子進程在if中已經退出了,所以關閉父進程的讀端,然后將數據插入到channels當中。
int InitProcesspool()
{for (int i = 0; i < num; i++){// 1. 創建管道// 管道初始化成0int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)return PipeError; // 管道創建失敗// 2. 創建進程pid_t id = fork();if (id < 0)return ForkError; // 子進程創建失敗// 創建通信信道if (id == 0){close(pipefd[1]); // 關閉子進程的寫窗口// 子進程// 原本讀的是pipefd0的內容,現在重定向到標準輸入,現在讀端讀取的就是標準輸入dup2(pipefd[0], 0);work(); // 子進程需要做的工作exit(0); // 子進程干完工作直接結束}// 因為上面子進程執行完任務會直接退出,所以下面的代碼只能是父進程執行close(pipefd[0]); // 關閉父進程的讀窗口channels.emplace_back(pipefd[1], id);}return OK;
}
DisPatchTasks()
為了保證每個進程都在不停的工作,不會出現一個進程一直休息,一個進程一直工作的情況,我們采用輪詢派發任務,也就是遍歷數組依次派發任務,直到沒有任務為止(這里任務數量是我們自己定的),派發完任務之后利用send發送給子進程。
void DisPatchTasks()
{int who = 0;// 派發任務int num = 20;while (num--){// a.選擇一個任務,整數int task = tmp.SelectTask();// b.選擇一個子進程channelChannel &curr = channels[who];who++;who %= channels.size(); // 防止數組越界cout << "########################" << endl;cout << "send" << task << "to" << curr.Name() << ",任務還剩" << num << "個" << endl;cout << "########################" << endl;// c.派發任務curr.Send(task);sleep(1);}
}
任務的執行邏輯(Work)
子進程進入Work之后不會出子進程,會一直死循環在這個Work當中,只有當寫端關閉才會break掉
void Worker()
{while(true){int cmd = 0;int n = read(0,&cmd,sizeof(cmd));if(n == sizeof(cmd)) tmp.Excute(cmd);else if(n == 0)//寫端關閉{cout<<"pid: "<<getpid()<<"quit..."<<endl;break;}}
}
CleanProcessPool()
最后回收管道并且回收子進程即可
void CleanProcessPool(){for (auto &c : channels){// 調用關閉函數c.Close();}for (auto &e : channels){// 0:阻塞式等待pid_t rid = waitpid(e.Id(), nullptr, 0);if (rid > 0){cout << "child " << rid << " wait...sucess" << endl;}}}
封裝
main.cc
#include "ProcessPool.hpp"
#include "Task.hpp"//我們自己就是master進程
int main(int argc, char *argv[])
{//可執行程序的名字占一個,后面的數字占一個if(argc != 2){Usage(argv[0]);return UsageError;}//將字符串轉化為整數,表示一共創建多少個子進程int num = stoi(argv[1]);// vector<Channel> channels;//將所有的管道存儲在容器當中// //初始化進程池// InitProcesspool(num,channels,Worker);// //派發任務// DisPatchTasks(channels);// //退出進程池// CleanProcessPool(channels);ProcessPool *pp = new ProcessPool(num,Worker);pp->InitProcesspool();pp->DisPatchTasks();pp->CleanProcessPool();delete pp;return 0;
}
Channel.hpp
#ifndef __CHANNEL_HPP__
#define __CHANNEL_HPP__#include <iostream>
#include <string>
#include <unistd.h>
using namespace std; #endif//先描述再組織
class Channel
{
public://構造函數Channel(int wfd,pid_t who):_wfd(wfd),_who(who){_name="Channel-"+to_string(wfd)+'-'+to_string(who);}//獲取名字string Name(){return _name;}void Send(int cmd){write(_wfd,&cmd,sizeof(cmd));}void Close(){close(_wfd);}pid_t Id(){return _who;}//析構函數~Channel(){}
private:int _wfd;//master的寫端string _name;//對應worker的名字pid_t _who;//記錄哪個子進程的管道
};
ProcessPool.hpp
#include <string>
#include <unistd.h>
#include <cstdlib>
#include <vector>
#include <sys/types.h>
#include <functional>
#include <sys/wait.h>
#include "Channel.hpp"
#include "Task.hpp"// typedef function<void()> work_t;
using work_t = function<void()>;enum
{OK = 0,UsageError,PipeError,ForkError,
};class ProcessPool
{
public:ProcessPool(int n,work_t w):num(n),work(w){}~ProcessPool(){}// work_t work:回調int InitProcesspool(){for (int i = 0; i < num; i++){// 1. 創建管道// 管道初始化成0int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)return PipeError; // 管道創建失敗// 2. 創建進程pid_t id = fork();if (id < 0)return ForkError; // 子進程創建失敗// 創建通信信道if (id == 0){close(pipefd[1]); // 關閉子進程的寫窗口// 子進程// 原本讀的是pipefd0的內容,現在重定向到標準輸入,現在讀端讀取的就是標準輸入dup2(pipefd[0], 0);work(); // 子進程需要做的工作exit(0); // 子進程干完工作直接結束}// 因為上面子進程執行完任務會直接退出,所以下面的代碼只能是父進程執行close(pipefd[0]); // 關閉父進程的讀窗口channels.emplace_back(pipefd[1], id);}return OK;}void DisPatchTasks(){int who = 0;// 派發任務int num = 20;while (num--){// a.選擇一個任務,整數int task = tmp.SelectTask();// b.選擇一個子進程channelChannel &curr = channels[who];who++;who %= channels.size(); // 防止數組越界cout << "########################" << endl;cout << "send" << task << "to" << curr.Name() << ",任務還剩" << num << "個" << endl;cout << "########################" << endl;// c.派發任務curr.Send(task);sleep(1);}}void CleanProcessPool(){for (auto &c : channels){// 調用關閉函數c.Close();}for (auto &e : channels){// 0:阻塞式等待pid_t rid = waitpid(e.Id(), nullptr, 0);if (rid > 0){cout << "child " << rid << " wait...sucess" << endl;}}}void DebugPrint(){for (auto e : channels){cout << e.Name() << endl;}}
private:vector<Channel> channels;int num;work_t work;
};
Task.hpp
#pragma once#include <iostream>
#include <unordered_map>
#include <functional>
#include <ctime>
using namespace std;using task_t = function<void()>;void Download()
{cout<<"我是下載任務....."<<endl;
}void Log()
{cout<<"我是日志任務....."<<endl;
}void Sql()
{cout<<"我是數據庫同步任務....."<<endl;
}static int number = 0;
class TaskManager
{
public:TaskManager(){//srand(time(nullptr));InsertTask(Download);InsertTask(Log);InsertTask(Sql);}~TaskManager(){}//插入任務void InsertTask(task_t t){//number表示有多少個任務tasks[number++]=t;}//選擇任務int SelectTask(){//選擇一個任務return rand() % number;}//執行任務void Excute(int number){//沒找到說明方法是空的if(tasks.find(number) == tasks.end()) return;//找到了執行這個方法tasks[number]();}
private://int(任務碼)--->任務種類unordered_map<int,task_t> tasks;
};TaskManager tmp;//函數作用:傳進來的參數不對,這個函數就告訴我們如何傳遞參數
void Usage(string proc)
{cout<<"Usage: "<<proc<<"process-num"<<endl;
}void Worker()
{while(true){int cmd = 0;int n = read(0,&cmd,sizeof(cmd));if(n == sizeof(cmd)) tmp.Excute(cmd);else if(n == 0)//寫端關閉{cout<<"pid: "<<getpid()<<"quit..."<<endl;break;}else//讀取失敗{}}
}
Makefile
BIN=processpool
cc=g++
FLAGS=-c -Wall -std=c++11
LDFLAGS=-o
SRC=$(shell ls *.cc)
OBJ=$(SRC:.cc=.o)$(BIN):$(OBJ)$(cc) $(LDFLAGS) $@ $^
%.o:%.cc$(cc) $(FLAGS) $<
.PHONY:clean
clean:rm -f $(BIN) $(OBJ)
.PHONY:test
test:echo $(SRC)
總結
總結
本文詳細介紹了進程池的概念及其在實際應用中的作用。通過代碼模擬,我們展示了如何初始化進程池、分發任務、執行任務邏輯以及清理進程池。文章還涵蓋了相關的封裝類和文件結構,如main.cc
、Channel.hpp
、ProcessPool.hpp
、Task.hpp
和Makefile
,這些內容為理解和實現進程池提供了全面的指導。
進程池是一種有效的資源管理技術,能夠提高多任務處理的效率和系統性能。通過合理的設計和實現,進程池可以在復雜的系統中發揮重要作用,減少資源浪費并提升任務執行的穩定性。希望本文的內容能為讀者在實際項目中應用進程池提供有價值的參考。