進程間通信介紹
?進程間通信目的
- 數據傳輸:一個進程需要將它的數據發送給另一個進程
- 資源共享:多個進程之間共享同樣的資源。
- 通知事件:一個進程需要向另一個或一組進程發送消息,通知它(它們)發生了某種事件(如進程終止時要通知父進程)。
- 進程控制:有些進程希望完全控制另一個進程的執行(如Debug進程),此時控制進程希望能夠攔截另一個進程的所有陷入和異常,并能夠及時知道它的狀態改變。
?管道
什么是管道
管道是Unix中最古老的進程間通信的形式。我們把從一個進程連接到另一個進程的一個數據流稱為一個“管道”如:
?用fork來共享管道原理
?站在文件描述符角度-深度理解管道 (內存級)
?左邊是進程管理,右邊是文件管理,進程通過全局變量能找到文件描述符標,也就找到了對應的file文件也能打開磁盤上的文件拷貝到緩沖區里進行讀寫,父進程創建子進程,發生寫實拷貝,類似于淺拷貝,此時不做文件操作,文件描述符里面的對于關系和父進程一樣,struc_file沒有關閉,因為父進程還在,?struc_file的對應的引用計數不為0就一定不會關閉,既然訪問的同一個struc_file,也可以同時訪問其中的file文件,通過緩沖區可以進行文件的讀寫
- 父進程通過讀寫兩種方式打開內存級的文件返回給上層完成管道的創建
- 子進程繼承父進程的文件描述符表,發生淺拷貝,也能拿到父進程以讀寫打開的管道文件
- 父子都看得到,讓父子單向通信,父進程寫,子進程讀,各自關閉掉自己不需要的文件描述符
這個是管道是OS單獨設計的,得配上單獨的系統調用:pipe 內存級的,不需要文件路徑,沒有文件名,所以叫匿名管道,那我們怎么保證,兩個進程打開的是同一個管道的?
子進程繼承了父進程的文件描述符表
站在內核角度-管道本質
?匿名管道
#include <unistd.h>
功能:創建??名管道
原型
int pipe(int fd[2]);
參數
fd:?件描述符數組,其中fd[0]表?讀端, fd[1]表?寫端
返回值:成功返回0,失敗返回錯誤代碼
#include<iostream>
#include<unistd.h>
using namespace std;
int main()
{int fds[2]={0};int n=pipe(fds);if(n<0){cerr<<"Pipe error"<<endl;return 1;}cout<<"fds[0]"<<fds[0]<<endl;cout<<"fds[1]"<<fds[1]<<endl;return 0;
}
0,1,2被三個標準占用了,從3,4開始
父寫子讀,實現通信
#include<iostream>
#include<unistd.h>
#include<sys/wait.h>
#include<sys/types.h>
#include<cstring>
using namespace std;void ChildWrite(int fd)
{
char buffer[1024];
int cnt=0;
while(true)
{
snprintf(buffer,sizeof(buffer),"I am child,pid:%d cnt:%d\n",getpid(),cnt++);
write(fd,buffer,strlen(buffer));
sleep(1);
}
}
void FatherRead(int fd)
{char buffer[1024];while(true){buffer[0]=0;ssize_t n=read(fd,buffer,sizeof(buffer)-1);if(n>0){buffer[n]=0;cout<<"child says: "<<buffer<<endl;}}
}
int main()
{ //1.創建管道int fds[2]={0}; //fds[0]讀端,fds[1]寫端int n=pipe(fds);if(n<0){cerr<<"Pipe error"<<endl;return 1;}cout<<"fds[0]"<<fds[0]<<endl;cout<<"fds[1]"<<fds[1]<<endl;//3.創建子進程 f->r c->wpid_t id=fork();if(id==0){//childclose(fds[0]); //關閉讀端ChildWrite(fds[1]);close(fds[1]);exit(0);}
close(fds[1]); //關閉寫端
FatherRead(fds[0]);
waitpid(id,nullptr,0);
close(fds[0]);return 0;
}
父進程寫的cnt在不斷變化,子進程能讀到,說明實現了管道通信
五種特性
1.匿名管道,只能用來進行具有血緣關系的進程進行進程間通信(常用父與子,如上述代碼)
2.管道文件,自帶同步機制(父子進程進行IO同時進行,一個讀一個寫,不管是父快還是子快,父不斷寫,子read讀不到會阻塞住直到讀到為止)
3.管道是面向字節流的
4.管道是單向通信的(要么父寫子讀,要么子寫父讀)
5.?(管道)文件的生命周期隨進程
4種通信情況?
1.寫慢,讀快------>讀端阻塞等待寫端(進程)
2.寫快,讀慢------>緩沖區寫滿了,寫要阻塞等待讀端
3.寫關,讀開------>read會讀到返回值0,表示文件結尾
//寫一條就關
void ChildWrite(int fd)
{
char buffer[1024];
int cnt=0;
while(true)
{
snprintf(buffer,sizeof(buffer),"I am child,pid:%d cnt:%d\n",getpid(),cnt++);
write(fd,buffer,strlen(buffer));
sleep(1);
break;
}
}
//觀察n的返回值
void FatherRead(int fd)
{char buffer[1024];while(true){buffer[0]=0;ssize_t n=read(fd,buffer,sizeof(buffer)-1);if(n>0){buffer[n]=0;cout<<"child says: "<<buffer<<endl;}else{cout<< "n:"<<n<<endl;}}
}
?4.讀關,寫開------->寫端再寫沒有意義,OS會殺掉寫端進程
#include<iostream>
#include<unistd.h>
#include<sys/wait.h>
#include<sys/types.h>
#include<cstring>
using namespace std;void ChildWrite(int fd)
{
char buffer[1024];
int cnt=0;
while(true)
{
snprintf(buffer,sizeof(buffer),"I am child,pid:%d cnt:%d\n",getpid(),cnt++);
write(fd,buffer,strlen(buffer));
sleep(1);
}
}
void FatherRead(int fd)
{char buffer[1024];while(true){buffer[0]=0;ssize_t n=read(fd,buffer,sizeof(buffer)-1);if(n>0){buffer[n]=0;cout<<"child says: "<<buffer<<endl;}else{cout<< "n:"<<n<<endl;sleep(4);}break;}
}
int main()
{ //1.創建管道int fds[2]={0}; //fds[0]讀端,fds[1]寫端int n=pipe(fds);if(n<0){cerr<<"Pipe error"<<endl;return 1;} cout<<"fds[0]"<<fds[0]<<endl;cout<<"fds[1]"<<fds[1]<<endl;//3.創建子進程 f->r c->wpid_t id=fork();if(id==0){//childclose(fds[0]); //關閉讀端ChildWrite(fds[1]);close(fds[1]);exit(0);}
close(fds[1]); //關閉寫端
FatherRead(fds[0]);
close(fds[0]);int status;
int ret=waitpid(id,&status,0);
//獲取到子進程的退出狀態
if(ret>0)
{printf("child code: %d exited status: %d\n",(status)>>8&0xff,(status)&0x7f);
}
return 0;
}
發送異常信號13 SIGPIPE?
?基于匿名管道----進程池
父進程創建多個子進程,用匿名管道分派任務
?1.構建進程鏈,為進程池做準備
#ifndef __PROCESS__POOL_HPP__
#define __PROCESS__POOL_HPP__
#include <iostream>
#include <vector>
#include <unistd.h>
#include<cstdlib>
using namespace std;
const int gdefaultnum = 5; // 要創建幾個進程
// 先描述 單個進程
class Channel
{
public:Channel() {}~Channel() {}private:int _wfd;
};// 在組織 進程鏈
class ChannelManager
{
public:ChannelManager() {}~ChannelManager() {}private:vector<Channel> _channels;
};
?2.創建進程池,提供管道條件
// 進程池
class ProcessPool
{
public:ProcessPool(int num) : _process_num(num) {}~ProcessPool() {}bool Create(){int pipefd[2] = {0};for (int i = 0; i < _process_num; i++){// 1.創建管道int n = pipe(pipefd);if (n < 0)return false;}// 2.創建子進程 各自關閉不需要的文件描述符pid_t id = fork();if (id < 0)return false;else if (id == 0){// 子進程 --->讀// 3.關閉不需要的文件描述符close(pipefd[1]);exit(0);}else{// 父進程 --->寫// 3.關閉不需要的文件描述符close(pipefd[0]);}return true;}private:ChannelManager _cm; // 進程鏈int _process_num; // 進程個數
};
3.父子各自打印驗證是否通信
void BuildChannel(int wfd, pid_t subid){_channels.emplace_back(wfd, subid);// Channel c(wfd,subid);// _channels.push_back(c);}void Print()
{for(auto &chnnel : _channels){
cout<<chnnel.Name()<<endl;}
}void Work(int rfd){while (true){cout << "我是子進程,我的rfd是:" << rfd << endl;sleep(2);}}
//ProcessPool.hpp
#ifndef __PROCESS__POOL_HPP__
#define __PROCESS__POOL_HPP__
#include <iostream>
#include <vector>
#include <unistd.h>
#include <cstdlib>
using namespace std;
const int gdefaultnum = 5; // 要創建幾個進程
// 先描述 單個進程
class Channel
{
public:Channel(int fd, pid_t id) : _wfd(fd), _subid(id) { _name = "chnnel-" + std::to_string(_wfd) + "-" + std::to_string(_subid); }~Channel() {}int Fd(){return _wfd;}pid_t Subid(){return _subid;}string Name(){return _name;}
private:int _wfd;pid_t _subid;std::string _name;
};// 在組織 進程鏈
class ChannelManager
{
public:ChannelManager() {}~ChannelManager() {}void BuildChannel(int wfd, pid_t subid){_channels.emplace_back(wfd, subid);// Channel c(wfd,subid);// _channels.push_back(c);}void Print()
{for(auto &chnnel : _channels){
cout<<chnnel.Name()<<endl;}
}
private:vector<Channel> _channels;
};// 進程池
class ProcessPool
{
public:ProcessPool(int num) : _process_num(num) {}~ProcessPool() {}void Work(int rfd){while (true){cout << "我是子進程,我的rfd是:" << rfd << endl;sleep(2);}}bool Create(){for (int i = 0; i < _process_num; i++){int pipefd[2] = {0};// 1.創建管道int n = pipe(pipefd);if (n < 0)return false;// 2.創建子進程 父子各自關閉不需要的文件描述符pid_t id = fork();if (id < 0)return false;else if (id == 0){// 子進程 --->讀// 3.關閉不需要的文件描述符close(pipefd[1]);Work(pipefd[0]);exit(0);}else{// 父進程 --->寫// 3.關閉不需要的文件描述符close(pipefd[0]);_cm.BuildChannel(pipefd[1], id);close(pipefd[1]);}}return true;}void Debug(){_cm.Print();}
private:ChannelManager _cm; // 進程鏈int _process_num; // 進程個數
};#endif
//Main.cc#include"ProcessPool.hpp"int main()
{ProcessPool pp(gdefaultnum);//創建進程池pp.Create();//打印進程池pp.Debug();sleep(1000);return 0;
}
實現通信?
?4.分配任務,子寫父讀
void Work(int rfd){while (true){int code = 0;ssize_t n = read(rfd, &code, sizeof(code));if (n > 0){if (n == sizeof(code)){continue;}cout << "子進程[]"<<getpid()<<"]收到一個任務碼:" << code << endl;}else if (n == 0){cout << "子進程退出" << endl;break;}else{// 讀失敗cout << "讀取錯誤" << endl;break;}}}void PushTack(int taskcode){// 1.選擇一個子進程,采用輪詢,防止負載均衡和負載不均衡auto &c = _cm.Select();cout << "選擇一個子進程:" << c.Name() << endl;// 2.發送任務c.Send(taskcode);cout << "發送了一個任務碼:" << taskcode << endl;}
完整代碼
//ProcessPool.hpp
#ifndef __PROCESS__POOL_HPP__
#define __PROCESS__POOL_HPP__
#include <iostream>
#include <vector>
#include <unistd.h>
#include <cstdlib>
using namespace std;
const int gdefaultnum = 5; // 要創建幾個進程
// 先描述 單個進程
class Channel
{
public:Channel(int fd, pid_t id) : _wfd(fd), _subid(id) { _name = "chnnel-" + std::to_string(_wfd) + "-" + std::to_string(_subid); }~Channel() {}int Fd() { return _wfd; }pid_t Subid() { return _subid; }string Name() { return _name; }void Send(int code){int n = write(_wfd, &code, sizeof(code));(void)n;}private:int _wfd;pid_t _subid;std::string _name;
};// 在組織 進程鏈
class ChannelManager
{
public:ChannelManager() : _next(0) {}~ChannelManager() {}void BuildChannel(int wfd, pid_t subid){_channels.emplace_back(wfd, subid);// Channel c(wfd,subid);// _channels.push_back(c);}// 輪詢Channel &Select(){auto &c = _channels[_next];_next++;_next %= _channels.size();return c;}void Print(){for (auto &chnnel : _channels){cout << chnnel.Name() << endl;}}private:vector<Channel> _channels;int _next;
};// 進程池
class ProcessPool
{
public:ProcessPool(int num) : _process_num(num) {}~ProcessPool() {}void Work(int rfd){while (true){int code = 0;ssize_t n = read(rfd, &code, sizeof(code));if (n > 0){if (n == sizeof(code)){continue;}cout << "子進程[]"<<getpid()<<"]收到一個任務碼:" << code << endl;}else if (n == 0){cout << "子進程退出" << endl;break;}else{// 讀失敗cout << "讀取錯誤" << endl;break;}}}bool Create(){for (int i = 0; i < _process_num; i++){int pipefd[2] = {0};// 1.創建管道int n = pipe(pipefd);if (n < 0)return false;// 2.創建子進程 父子各自關閉不需要的文件描述符pid_t id = fork();if (id < 0)return false;else if (id == 0){// 子進程 --->讀// 3.關閉不需要的文件描述符close(pipefd[1]);Work(pipefd[0]);exit(0);}else{// 父進程 --->寫// 3.關閉不需要的文件描述符close(pipefd[0]);_cm.BuildChannel(pipefd[1], id);close(pipefd[1]);}}return true;}void Debug(){_cm.Print();}void PushTack(int taskcode){// 1.選擇一個子進程,采用輪詢,防止負載均衡和負載不均衡auto &c = _cm.Select();cout << "選擇一個子進程:" << c.Name() << endl;// 2.發送任務c.Send(taskcode);cout << "發送了一個任務碼:" << taskcode << endl;}private:ChannelManager _cm; // 進程鏈int _process_num; // 進程個數
};#endif
//Main.cc#include"ProcessPool.hpp"int main()
{ProcessPool pp(gdefaultnum);//創建進程池pp.Create();//打印進程池//pp.Debug();int task_code = 1;while(true){pp.PushTack(task_code++);sleep(1);}return 0;
}
創建進程池后,OS關閉了沒有意義的管道,每次選擇一個管道接受消息?