文章目錄
- 一、進程間通信(IPC)的理解
- 1.為什么進程間要通信(IPC)
- 2.如何進行通信
- 二、匿名管道
- 1.管道的理解
- 2.匿名管道的使用
- 3.管道的五種特性
- 4.管道的四種通信情況
- 5.管道緩沖區容量
- 三、進程池
- 1.進程池的理解
- 2.進程池的制作
- 四、源碼
- ProcessPool.hpp
- Task.hpp
- Main.cc
一、進程間通信(IPC)的理解
1.為什么進程間要通信(IPC)
首先進程之間是相互獨立的,盡管是父子進程之間,它們雖然資源共享,但當子進程需要修改數據時仍然需要 進行寫時拷貝,保持獨立性。
而讓進程間通信可以實現數據之間的交互,資源共享,事件通知,又或者是讓一個進程對另一個進程進行控制。
進程間通信是操作系統中實現進程間協作和數據交換的重要機制 ,它使得多個進程能夠共同完成任務,提高系統的效率和可靠性。
2.如何進行通信
進程間通信的原理其實很簡單,只需要兩個進程共同訪問一個資源,而一個進程對資源的更改能被另一進程感知到,從而做出相應的操作。
所以通信的前提是進程之間能夠訪問同一個資源,而且該資源是公共的,而不是某進程內部的。
IPC 的典型方式對比
二、匿名管道
1.管道的理解
我們把進程之間通信的介質(資源)叫作管道。
開發者在設計管道技術時文件系統已經比較成熟,所以為了方便管理該資源就使用文件來實現, 而對文件的讀寫就是通信的過程 ,但它與一般的文件還是有些區別,文件都是儲存到磁盤上的,而進程之間通信用的文件并不需要把它儲存到磁盤上,它只是作為一個傳輸介質。
它比較特殊,所以起名為管道。管道其實是一個內存級的文件。
注意:父子進程之間的管道叫作匿名管道
,顧名思義就是沒有名字,也不需要名字,因為子進程能夠繼承下來父進程開辟的管道資源。
2.匿名管道的使用
創建匿名管道常用的接口是:
int pipe(int pipefd[2]);
需要包含頭文件:
#include<unistd.h>
- 返回值:創建成功返回0,失敗返回-1
- 參數:這個是一個輸出型參數,傳入一個int類型長度為2的數組,然后得到
pipefd[0]:以讀的方式打開的文件描述符
pipefd[1]:以寫的方式打開的文件描述符。
示例:
#include <iostream>
#include <sys/types.h>
#include <unistd.h>
int main()
{int pipefd[2];pipe(pipefd);int rfd = pipefd[0],wfd = pipefd[1];pid_t id = fork();if(id == 0){close(wfd);//關閉子進程的寫文件,只讓它讀int k=0;while(true){read(rfd,&k,sizeof(k));printf("read:%d\n",k);}}else{close(rfd);//關閉父進程的讀文件,只讓它寫。int num=0;while(true){write(wfd,&num,sizeof(num));num++;sleep(1);}}return 0;
}
要記住pipefd[2]中哪個是讀哪個是寫有一個小技巧,0像嘴巴,所以下標為0的是讀,1像鋼筆,所以1下標是寫。
3.管道的五種特性
- 匿名管道,只能用來進行具有血緣關系的進程間通信(用于父與子)。
管道文件,自帶同步機制
。如上代碼示例,父進程寫一次休眠一秒,而子進程是一直不斷地讀,快的一端會遷就于慢的一端,最后實現同步。- 管道是面向字節流的。怎么讀與怎么寫并沒有聯系,比如寫入“hello world”,但可能讀到“hel”,這取決于你要讀多少字節。
管道是單向通信的
。也就是a(表示進程)寫的時候b讀。b寫的時候a在讀。而不是既在寫同時也在讀。- 管道(文件)的生命周期是隨進程的。進程結束管道也隨之銷毀。
4.管道的四種通信情況
- 寫慢,讀快 — 讀端就要阻塞(等待寫端寫入)。
- 寫快,讀慢 —到管道容量滿了后,寫端就要阻塞(等待讀端讀取數據,然后就可以覆蓋式地繼續往管道寫入)。
- 寫關閉,讀繼續 — read就會返回0,表示文件結尾。
- 寫繼續,讀關閉 — 寫端不再有意義,系統會殺掉寫端進程。
5.管道緩沖區容量
管道緩沖區容量為64kb,大家可以根據管道的性質與通信特點,自行進行測試。
三、進程池
1.進程池的理解
在程序使用內存的時候,比如vector擴容機制,會提前給你開辟一塊空間供你使用,盡管現在用不到,相當于做一下預備。減少開辟空間的頻次,從而達到提高效率的效果。
那么進程池也同樣,給父進程提前開辟一些子進程,提供父進程使用。其中我們使用匿名管道建立聯系。
在父進程給子進程派發任務時,為了提高效率會讓每個子進程均勻地分配到任務(稱為負載均勻),而不是把大部分的任務都派發到一個子進程上,通常會有以下策略:
- 輪詢:按順序一一分配。
- 隨機:隨機進行分配。
- 負載因子:設計一個負載因子,讓子進程按負載因子的大小排成一個小根堆,每次取出堆頭的子進程派發任務,然后更新負載因子插回到堆中。
2.進程池的制作
在面向對象的編程中最重要的就是對對象的描述與組織,這里我們的核心就是對管道進行管理。那么首先需要一個類對管道進行描述。
class Channel
{
public:Channel(int fd,pid_t id): _wfd(fd),_subid(id){ }//... ...~Channel(){}
private:int _wfd;int _subid;
};
_wfd是該管道對應寫端的fd,_subid是該管道對應的子進程的pid。
這里我們不必把rfd(讀端fd)加入,因為我們現在對管道的描述組織,目的是方便父進程管理,而rfd是給子進程用的,所以不用添加為變量。
這里我們就以輪詢
的方式派發任務,剛才創建的Channel相當于對管道的描述,接下來創建ChannelManage進行組織。這里選擇使用數組來管理,派發任務方式選擇輪詢,所以需要記錄下一個需要派發到的管道的下標。
class ChannelManage
{
public:ChannelManage():_next(0){}//... ...~ChannelManage(){}
private:vector<Channel> _channels;int _next;
};
接下來還需要創建一個類對整體的進程池做管理。
class ProcessPool
{
public:ProcessPool(int num) : __process_num(num){}// ... ...~ProcessPool(){}
private:ChannelManage _cm;int _process_num;
};
其中_process_num表示需要創建多少子進程,這是由使用者來決定的。
在ProcessPool中我們準備實現這些方法
-
bool Start():用于創建子進程。
由于我們是要生成多個通道所以需要循環來進行,而單趟循環需要做以下這些操作:1.創建管道,然后創建子進程。(這樣能讓子進程繼承到管道信息)
2.關于子進程:寫端關閉,然后執行Work(),最后把讀端關閉,并exit退出。
3.關于父進程:讀端關閉,然后把wfd,pid存入_cm中。
-
void Work(int rfd):用于子進程讀取任務碼并執行命令。
-
void Run():用于獲取并派發任務。
-
void Stop():用于關閉寫端并回收子進程。
最后為方便測試我們還需要一個管理任務的類和方法。我們可以單獨創建一個Task.hpp文件。
typedef void (*task_t)();class TaskManage
{
public:TaskManage(){ //隨機數種子srand((unsigned int)time(nullptr));}int Code(){ //隨機生成任務碼(數組下標)return rand()%_tasks.size();}void Execute(int code){ //執行任務_tasks[code]();}// ... ...~TaskManage(){}
private:vector<function<task_t>> _tasks;//用于儲存任務的數組
};
然后需要在ProcessPool中放入TaskManage成員變量,并在ProcessPool的構造函數中完成對_tasks中內容的插入。具體操作參考下面源碼。
四、源碼
ProcessPool.hpp
#ifndef _PROCESS_POOL_HPP_
#define _PROCESS_POOL_HPP_#include <iostream>
#include <vector>
#include <unistd.h>
#include <sys/wait.h>
#include "Task.hpp"
using namespace std;//先描述
class Channel
{
public:Channel(int fd,pid_t id): _wfd(fd),_subid(id){_name = "channel-" + to_string(_wfd) + "-" + to_string(_subid);}~Channel(){}void Send(int code){int n = write(_wfd,&code,sizeof(code));(void)n;}void Close(){close(_wfd);}void Wait(){pid_t rid = waitpid(_subid, nullptr, 0);(void)rid;}int Fd(){return _wfd;}pid_t SubId(){return _subid;}string Name(){return _name;}private:int _wfd;pid_t _subid;string _name;//int _loadnum;
};//再組織
class ChannelManager
{
public:ChannelManager(): _next(0){}~ChannelManager(){}void Insert(int wfd,pid_t subid){_channels.emplace_back(wfd,subid);// Channel c(wfd,subid);// _channels.push_back(move(c));}Channel& Select(){auto& c = _channels[_next];_next++;_next %= _channels.size();return c;}void PrintChannel(){for(auto& channel : _channels){cout << channel.Name() << endl;}}void CloseAll(){for(auto& channel: _channels){channel.Close();}}void StopSubProcess(){for(auto& channel: _channels){channel.Close();cout << "關閉: " << channel.Name() << endl;}}void WaitSubProcess(){for(auto& channel: _channels){channel.Wait();cout << "回收: " << channel.Name() << endl;}}void CloseAndWait(){for(auto& channel: _channels){channel.Close();cout << "關閉: " << channel.Name() << endl;channel.Wait();cout << "回收: " << channel.Name() << endl;}//解決方法1 倒著關閉// for(int i = _channels.size() - 1;i >= 0;i--)// {// _channels[i].Close();// cout << "關閉: " << _channels[i].Name() << endl;// _channels[i].Wait();// cout << "回收: " << _channels[i].Name() << endl;// }}private:vector<Channel> _channels;int _next;
};const int gdefaultnum = 5;class ProcessPool
{
public:ProcessPool(int num): _process_num(num){_tm.Register(PrintLog);_tm.Register(DownLoad);_tm.Register(UpLoad);}~ProcessPool(){}void Work(int rfd){while(true){int code = 0;size_t n = read(rfd,&code,sizeof(code));if(n > 0){if(n != sizeof(code)){continue;}cout << "子進程[" << getpid() << "]收到一個任務碼: " << code << endl;_tm.Execute(code);}else if(n == 0){cout << "子進程退出" << endl;break;}else{cout << "讀取錯誤" << endl;break;}}}bool Start(){for(int i = 0;i < _process_num;i++){//1.創建管道int pipefd[2] = { 0 };int n = pipe(pipefd);if(n < 0){return false;}//2.創建子進程pid_t subid = fork();if(subid < 0){return false;}else if(subid == 0){//子進程//關閉子進程繼承的哥哥的w端_cm.CloseAll();//3.關閉不需要的文件描述符close(pipefd[1]);Work(pipefd[0]);close(pipefd[0]);exit(0);}else{//父進程//3.關閉不需要的文件描述符close(pipefd[0]);_cm.Insert(pipefd[1],subid);}}return true;}void Debug(){_cm.PrintChannel();}void Run(){//1.選擇一個任務int taskcode = _tm.Code();//2.選擇一個信道[子進程],負載均衡的選擇一個子進程,完成任務auto& c = _cm.Select();cout << "選擇了一個子進程: " << c.Name() << endl;//3.發送任務c.Send(taskcode);cout << "發送了一個任務碼: " << taskcode << endl;}void Stop(){//關閉父進程//_cm.StopSubProcess();//回收所有的子進程//_cm.WaitSubProcess();_cm.CloseAndWait();}private:ChannelManager _cm;int _process_num;TaskManager _tm;
};#endif
Task.hpp
#pragma once#include <iostream>
#include <vector>
#include <ctime>
using namespace std;typedef void (*task_t)();void PrintLog()
{cout << "我是一個打印日志的任務" << endl;
}void DownLoad()
{cout << "我是一個下載的任務" << endl;
}void UpLoad()
{cout << "我是一個上傳的任務" << endl;
}class TaskManager
{
public:TaskManager(){srand((unsigned int)time(nullptr));}~TaskManager(){}void Register(task_t t){_tasks.push_back(t);}int Code(){return rand() % _tasks.size();}void Execute(int code){if(code >= 0 && code < _tasks.size()){_tasks[code]();}}private:vector<task_t> _tasks;
};
Main.cc
#include "ProcessPool.hpp"int main()
{//創建進程池對象ProcessPool pp(gdefaultnum);//啟動進程池pp.Start();//自動派發任務int cnt = 10;while(cnt--){pp.Run();sleep(1);}//回收,結束進程池pp.Stop();return 0;
}