一、進程間通信介紹
進程間通信目的:數據傳輸:一個進程需要將它的數據發送給另?個進程資源共享:多個進程之間共享同樣的資源。通知事件:一個進程需要向另一個或一組進程發送消息,通知它(它們)發生了某種事件(如進程終止時要通知父進程)。進程控制:有些進程希望完全控制另一個進程的執行(如Debug進程),此時控制進程希望能夠攔截另一個進程的所有陷入和異常,并能夠及時知道它的狀態改變。怎么通信?進程間通信本質:先讓不同的進程,看到同一份資源(內存)(然后才有通信的條件)不能由任何一個進程提供,進程間數據隔離->OS提供系統調用->設計統一的通信接口。什么是通信?![]()
二、具體通信方式
1)基于文件的,管道通信
2)System V 本機通信
1.背景
基于已有的技術,直接進行通信。
2.原理
單獨設計了一個內存級的文件,管道(復用了文件管理的代碼)。
獨特的系統調用:
/* On all other architectures */
int pipe(int pipefd[2]);? //數組第一個參數是讀的fd,第二個參數的寫的fd。
返回值:成功返回0,失敗返回-1,錯誤碼被設置。
管道:通過創建子進程,子進程拷貝一份和父進程一樣的文件描述符表,指向同一個“文件”(管道),關閉相應的讀寫端,使得單向通信。
3.demo代碼,測試接口。
測試代碼如下:
#include <iostream> #include <cstdio> #include <cstdlib> #include <cstring> #include <unistd.h> #include <sys/types.h> #include <sys/wait.h>void ChildWrite(int wfd) {std::cout << "子進程wfd:" << wfd << std::endl;char buff[1024];int cnt = 0;while (true){sleep(1);snprintf(buff, sizeof(buff), "第%d次:子進程寫入\n", cnt++);ssize_t ret = write(wfd, buff, strlen(buff));(void)ret;} }void FatherRead(int rfd) {std::cout << "父進程rfd:" << rfd << std::endl;char buff[1024];int cnt = 1;while (cnt--){sleep(5);ssize_t ret = read(rfd, buff, sizeof(buff) - 1);buff[ret] = 0;std::cout << buff << std::endl;if (ret == 0){std::cout << "寫關閉了,讀關閉" << std::endl;break;}} }int main() {// 1.創建管道int pipefd[2] = {0};int ret = pipe(pipefd);if (ret == -1)exit(1);// 2.創建子進程,子進程寫,父進程讀pid_t pid = fork();if (pid < 0)exit(1);else if (pid == 0){// 子進程close(pipefd[0]);ChildWrite(pipefd[1]);close(pipefd[1]);exit(0);}else{// 父進程close(pipefd[1]);FatherRead(pipefd[0]);close(pipefd[0]);int status;waitpid(pid, &status, 0);std::cout << "子進程 exit code:" << ((status >> 8) & 0xFF) << " exit signal:" << (status & 0x7F) << std::endl; }return 0; }
5種特性(重點):
1)匿名管道,只能用來進行具有血緣關系的進程進行進程間通信(常用于父子)
2)管道文件,自帶同步機制
3)管道是面向字節流的。(怎么讀和怎么寫沒有必然關系)
4)管道是單向通信的。
屬于半雙工的一種特殊情況。
半雙工:任何時刻,一個發,一個收。
全雙工:任何時刻,可以同時收發(吵架)。
5)(管道)文件的生命周期是隨進程的(引用計數)。
4中通信情況:
1)寫慢,讀快?------ 讀端就要阻塞(進程)
2)寫快,讀慢 ------ 滿了的時候,寫端就要阻塞等待
3)寫關,讀繼續 ------ read讀到返回值為0,表示文件結尾。
4)寫繼續,讀關 ------ 寫端寫入無意義,OS不會做無意義的事情->OS會殺掉寫端進程->發送異常信號 13 SIGPIPE?
在小于pipe_buf時,管道的寫入被要求是原子性的(一次要寫全寫完)。
測試管道容量:一次寫一個字節,寫入測試信息。
4.基于匿名管道 --- 進程池?
原理:
父進程通過向指定的管道寫入的方式來向子進程發送對應的任務。
父進程的wfd關閉,子進程會讀到0,可以退出。
總體結構:
????????要對匿名管道進行管理,先描述在組織,要有對應的類Channel,記錄父進程的wfd和子進程的pid,在有對應管理多個類Channel的數據結構vector,對于匿名管道的管理就轉換成了對于vector的增刪查改。對于任務也做管理,與匿名管道管理類似。
邏輯:
? ? ? ? 父進程創建若干個子進程,讓子進程阻塞在read中,并死循環執行,子進程一直阻塞,等待父進程發送對應的任務碼,根據任務碼執行相應的任務。父進程采用輪詢的方式,確保能夠負載均衡,選擇子進程發送隨機的任務碼,即向對應的匿名管道內寫入任務碼。
易錯點:
回收時需注意,每次創建子進程時,子進程會繼承上一次父進程的wfd,導致第1個管道有n個引用,第2個管道有n-1個引用 ... 第n個管道有一個引用。
解決方法1:倒著關閉
解決方法2:隨便關閉。創建子進程后,子進程把之前從父進程繼承下來的wfd全部關閉。
完整代碼如下:
Processpool.hpp
#pragma once#include <vector> #include <cstdlib> #include <cstdio> #include <unistd.h> #include <sys/types.h> #include <sys/wait.h> #include "Task.hpp"class Channel { public:Channel(int wfd, int pid): _wfd(wfd), _pid(pid){}~Channel(){}int Wfd() const { return _wfd; }int Pid() const { return _pid; }void Close() const { close(_wfd); }void Wait() const { waitpid(_pid, nullptr, 0); }private:int _wfd; // 控制子進程int _pid; // 拿到子進程pid,方便回收 };class ChannelManager { public:ChannelManager() : _next(0){}// 為了保證負載均衡,采用輪詢的方式const Channel& SelectChannel(){const Channel& c = _channels[_next];// 選出下標++_next;_next %= _channels.size();return c;}void InsertChannel(int wfd, int pid){_channels.emplace_back(wfd, pid);}void CloseFd() const{for(const auto &e : _channels){e.Close();}}void CloseChannels() const{// 2.正著關,創建子進程時就關閉從父進程繼承下來的wfdfor (size_t i = 0; i < _channels.size(); i++){// 關閉父進程的寫fd,讓子進程讀到0個字節退出。_channels[i].Close();_channels[i].Wait();printf("等待成功,回收了子進程:%d\n", _channels[i].Pid());}// // 1.倒著關閉// for (int i = _channels.size() - 1; i >= 0; i--)// {// // 關閉父進程的寫fd,讓子進程讀到0個字節退出。// _channels[i].Close();// _channels[i].Wait();// printf("等待成功,回收了子進程:%d\n", _channels[i].Pid());// }}~ChannelManager(){}private:std::vector<Channel> _channels;int _next; };class ProcessPool { public:ProcessPool(){}~ProcessPool(){}void ChildRead(int rfd) const{int taskcode = 0;while (true){ssize_t ret = read(rfd, &taskcode, sizeof(taskcode));// 父進程寫端關閉了,子進程要結束if (ret == 0){std::cout << "父進程寫端關閉,子進程:" << getpid() << "退出" << std::endl;break;}// 讀到的不是4字節,丟棄,重新讀if (ret != sizeof(taskcode)){printf("丟棄\n");return;}// 執行相應任務printf("進程:%d ExcuteTask開始,ret:%d,taskcode:%d\n", getpid(), (int)ret, taskcode);_tm.ExecuteTask(taskcode);}}void Create(int num){for (int i = 0; i < num; i++){// 1.創建管道int pipefd[2] = {0};int ret = pipe(pipefd);if (ret != 0)exit(1);// 2.創建子進程pid_t pid = fork();// 3.關閉父讀,子寫if (pid < 0)exit(1);else if (pid == 0){// 關掉從父進程繼承下來的wfd_cm.CloseFd();// 子進程關閉寫close(pipefd[1]);// 子進程工作printf("ChildRead開始,進程為%d\n", getpid());ChildRead(pipefd[0]);close(pipefd[0]);// 子進程完成工作,退出exit(0);}// 父進程關閉讀close(pipefd[0]);// emplace_back直接構造,插入到_channels_cm.InsertChannel(pipefd[1], pid);// 循環num次}}// 選擇一個子進程,隨機發送任務void Run(){// 1.選擇一個子進程const Channel &c = _cm.SelectChannel();printf("挑選的子進程為:%d\n", c.Pid());// 2.獲取任務碼int taskcode = _tm.TaskCode();// 3.發送任務碼給子進程,子進程執行(寫給子進程)printf("父進程:%d 寫入taskcode:%d\n", getpid(), taskcode);ssize_t ret = write(c.Wfd(), &taskcode, sizeof(taskcode));}void Close() const{_cm.CloseChannels();}private:TaskManager _tm;ChannelManager _cm; };
task.hpp
#pragma once#include <iostream> #include <cstdio> #include <vector> #include <cstdlib> #include <ctime>void Open() {std::cout << "這是一個打開的任務" << std::endl; } void Download() {std::cout << "這是一個下載的任務" << std::endl; } void Upload() {std::cout << "這是一個上傳的任務" << std::endl; }typedef void (*task_t)(); class Task { public:Task(task_t task) : _task(task){}~Task(){}void operator()() const{_task();}task_t getTask() const {return _task;} private:task_t _task; };class TaskManager { public:TaskManager(){// 設置種子數,采用隨機任務形式srand((unsigned)time(nullptr));Register(Open);Register(Download);Register(Upload);}// 注冊任務void Register(task_t t){_tasks.emplace_back(t);}// 返回任務碼int TaskCode() const{int ret = rand() % _tasks.size();return ret;}void ExecuteTask(int taskcode) const{if (taskcode < 0 || taskcode >= _tasks.size()){std::cout << "讀取的任務碼無效" << std::endl;return;}printf("進程:%d執行任務,taskcode為:%d\n",getpid(),taskcode);_tasks[taskcode]();printf("任務:%d 執行完畢\n",taskcode);}~TaskManager(){}private:std::vector<Task> _tasks; };
main.cc
#include <iostream> #include <unistd.h> #include <sys/wait.h> #include "ProcessPool.hpp"int main() {pid_t pid = fork();if (pid == 0){ProcessPool pool;pool.Create(5);int cnt = 5;while (cnt--){sleep(1);pool.Run();}sleep(10);pool.Close();exit(0);}int status;waitpid(pid, &status, 0);std::cout << "main exit code:" << ((status >> 8) & 0xFF) << " signal code:" << (status & 0x7F) << std::endl;return 0; }