Linux第十講:進程間通信IPC
- 1.進程間通信介紹
- 1.1什么是進程間通信
- 1.2為什么要進程間通信
- 1.3怎么進行進程間通信
- 2.管道
- 2.1理解管道
- 2.2匿名管道的實現代碼
- 2.3管道的五種特性
- 2.3.1匿名管道,只能用來進行具有血緣關系的進程進行通信(通常是父子)
- 2.3.2管道文件,自帶同步機制
- 2.3.3管道是面向字節流的
- 2.3.4管道是單向通信的
- 2.3.5(管道)文件的聲明周期,是隨進程的
- 2.4管道的四種通信情況
- 2.4.1寫快,讀慢
- 2.4.2寫慢,讀快
- 2.4.3寫關,繼續讀
- 2.4.4讀關閉,寫繼續
- 2.5知識補充
- 3.進程池的實現
- 3.1什么是進程池
- 3.2進程池框架搭建
- 3.3在進程池中創建多個進程,并分配管道
- 3.4任務分配和執行
- 3.5任務執行策略 && 任務執行代碼實現
- 3.6進程池的回收
- 3.7結果展示
- 3.8問題處理
- 3.8.1解決方法1
- 3.8.2解決方法2
- 4.命名管道
- 4.1什么是命名管道 && 命名管道的原理
- 4.2命名管道的創建 && 進程間通信使用
- 4.3進程間通信的封裝實現
- 4.4命名管道與匿名管道的區別
- 5.system V共享內存
- 5.1什么是共享內存
- 5.2共享內存接口的使用
- 5.3代碼實現兩個進程看到同一份資源
- 5.4進程間通信代碼封裝實現
- 5.5共享內存實現的進程間通信操作
- 5.5.1基于共享內存實現的進程間通信的優缺點分析
- 5.5.2處理共享內存的缺點
- 5.6 shmdt
- 5.7一個細節問題
- 5.8描述共享內存的數據結構
- 6.system V信號量
- 6.1前提知識補充
- 6.2什么是信號量
- 6.3信號量的接口和系統調用
- 7.附錄 -- 進程池完整代碼
1.進程間通信介紹
進程都是獨立的,都有自己獨立的內容和數據,有著自己獨立的結構體,如果是父子進程的話,修改數據還會發生寫時拷貝,那么究竟是怎么進行通信的呢?
1.1什么是進程間通信
進程間通信(IPC)是指運行在一臺計算機或不同計算機上的多個進程之間進行數據交換和通信的技術
1.2為什么要進程間通信
1.3怎么進行進程間通信
我們現在只需要知道:進程間通信的本質,是讓不同的進程,先看到同一份資源,然后才有通信的條件,那么這份資源是誰提供的呢?是進程提供的嗎?不是!因為進程還沒有這么大的權限,這份資源是由操作系統提供的,而操作系統要提供資源,就需要進行系統調用,所以說就必須要有特定的通信接口!我們學習進程間通信,不僅僅要學習原理,還要學習接口的使用:
2.管道
2.1理解管道
管道是很早以前實現的一種進程間通信的方式它的本質是基于文件的通信:
我們先了解一下管道的簡單實現思想:
然后我們再看一下真正的管道的實現原理:
2.2匿名管道的實現代碼
理解管道之后,我們就要掌握管道的創建方法了:
我們先看一個創建管道的系統調用:
創建管道的具體操作:
#include <iostream>
#include <unistd.h>
#include <sys/types.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/wait.h>void Child_Write(int wfd)
{char buffer[1024];int cnt = 0;while(true){sleep(1);snprintf(buffer, sizeof(buffer), "I am child, pid: %d, cnt: %d", getpid(), cnt++);//這里我們使用C語言提供的函數write(wfd, buffer, strlen(buffer));}
}void Read_Parent(int rfd)
{char buffer[1024];while(true){buffer[0] = 0;sleep(1);ssize_t n = read(rfd, buffer, sizeof(buffer)-1);if(n > 0){buffer[n] = 0;std::cout << "Child say: " << buffer << std::endl;}}
}int main()
{//1.創建管道int fds[2] = {0};int n = pipe(fds);if(n < 0){std::cout << "pipe error" << std::endl;return 1;}//2.創建子進程pid_t id = fork();if(id == 0){//假設我們讓子進程寫,父進程讀close(fds[0]);//子進程寫入,需要關閉讀端Child_Write(fds[1]);close(fds[1]);//無論是父進程還是子進程,最后都需要進行關閉所有文件的操作exit(0);}close(fds[1]);Read_Parent(fds[0]);waitpid(id, nullptr, 0);close(fds[0]);return 0;
}
結果:
這樣我們就可以驗證得出,進程運行時拿取到的數據,因為子進程創建了一個cnt的臨時變量
上面講的是匿名管道的原理以及通信方式,而匿名管道經常用于父子進程之間、兄弟進程之間的通信,所以我們還要掌握其它的通信方式:
2.3管道的五種特性
我們先理解管道,然后再學習其它通信方式:
總結:(這里有的知識理解起來比較困難,以后會講到,比如字節流)
1.匿名管道,只能用來進行具有血緣關系的進程進行通信(通常是父子)
2.管道文件,自帶同步機制
3.管道是面向字節流的
4.管道是單向通信的
5.(管道)文件的生命周期,是隨進程的
2.3.1匿名管道,只能用來進行具有血緣關系的進程進行通信(通常是父子)
這個在上面講過了,因為只有存在血緣關系的進程之間才會對files_struct進行淺拷貝
2.3.2管道文件,自帶同步機制
我們先看一個代碼:
void Child_Write(int wfd)
{char buffer[1024];int cnt = 0;while(true){sleep(1);snprintf(buffer, sizeof(buffer), "I am child, pid: %d, cnt: %d", getpid(), cnt++);write(wfd, buffer, strlen(buffer));}
}void Read_Parent(int rfd)
{char buffer[1024];while(true){buffer[0] = 0;ssize_t n = read(rfd, buffer, sizeof(buffer)-1);if(n > 0){buffer[n] = 0;std::cout << "Child say: " << buffer << std::endl;}}
}
當子進程進行管道寫入,每次寫入都要sleep上1s,父進程不斷進行讀取,會發生什么呢?:
我們先來說一下同步的概念:同步的本質就是按照順序、有規律地進行操作。比如買火車票,必須排隊依次買票,不能插隊。
父進程一直需要從文件中讀取數據,但是子進程還沒有寫入,這時,父進程就要進行阻塞狀態,當子進程寫入之后,父進程進行讀取,然后再次進入阻塞狀態,所以說,文件自帶同步機制
2.3.3管道是面向字節流的
我們再來看一個場景:
void Child_Write(int wfd)
{char buffer[1024];int cnt = 0;while(true){//sleep(1);snprintf(buffer, sizeof(buffer), "I am child, pid: %d, cnt: %d", getpid(), cnt++);write(wfd, buffer, strlen(buffer));}
}void Read_Parent(int rfd)
{char buffer[1024];while(true){sleep(5);buffer[0] = 0;ssize_t n = read(rfd, buffer, sizeof(buffer)-1);if(n > 0){buffer[n] = 0;std::cout << "Child say: " << buffer << std::endl;}}
}
當子進程一直寫,而父進程每5秒讀一次,會發生什么呢?:
可以看出,每次讀取一塊,先輸出一個概念:管道是有一個特定的大小的,當寫入的數據達到一定大小,就不能再寫入了,就需要父進程讀取數據,然后再進行寫入。
然而,對于文件流,我們之后再說,我們只需要知道有這個東西就行
2.3.4管道是單向通信的
半雙工:任何一個時刻,一個發,一個收(就像是上課,老師講,自己聽)
全雙工:任何一個時刻,可以同時發收(就像吵架,你不僅要輸出,同時還要聽)
而管道屬于半雙工的一種特殊情況,當收發確定時,就不可再更改了
2.3.5(管道)文件的聲明周期,是隨進程的
管道其實就是一個文件,子進程和父進程同時指向管道,當子進程/父進程結束了,管道就要被回收
2.4管道的四種通信情況
總結:
1.寫快,讀慢
2.寫慢,讀快
3.寫關,繼續讀
4.讀關,繼續寫
2.4.1寫快,讀慢
這個上面已經舉過例子了,也就是當一直寫,但是每個5秒進行讀時。
管道文件的大小固定,當寫入固定大小數據之后,就不再進行寫入了,此時寫要進入阻塞狀態,等待讀端
2.4.2寫慢,讀快
這個上面也講過了,也就是一直讀,但是每個1s才寫。
此時讀端進入阻塞狀態,等待寫端寫入
2.4.3寫關,繼續讀
void Child_Write(int wfd)
{char buffer[1024];int cnt = 0;while(true){//sleep(1);snprintf(buffer, sizeof(buffer), "I am child, pid: %d, cnt: %d", getpid(), cnt++);write(wfd, buffer, strlen(buffer));close(wfd);exit(1);}
}void Read_Parent(int rfd)
{char buffer[1024];while(true){sleep(1);buffer[0] = 0;ssize_t n = read(rfd, buffer, sizeof(buffer)-1);if(n > 0){buffer[n] = 0;std::cout << "Child say: " << buffer << std::endl;}else if(n == 0){std::cout << "n: " << n << std::endl;std::cout << "Child退出,我也退出" << std::endl;break;}else{break;}}
}
結果:
此時,read會讀取數據,讀到返回值為0,表示文件結尾,但是此時子進程不會退出,而是不再進行寫入操作,但是還會執行其它的指令
2.4.4讀關閉,寫繼續
void Child_Write(int wfd)
{char buffer[1024];int cnt = 0;while(true){sleep(1);snprintf(buffer, sizeof(buffer), "I am child, pid: %d, cnt: %d", getpid(), cnt++);write(wfd, buffer, strlen(buffer));//close(wfd);exit(1);//printf("hello");}
}void Read_Parent(int rfd)
{char buffer[1024];while(true){//sleep(1);buffer[0] = 0;ssize_t n = read(rfd, buffer, sizeof(buffer)-1);close(rfd);if(n > 0){buffer[n] = 0;std::cout << "Child say: " << buffer << std::endl;}else if(n == 0){std::cout << "n: " << n << std::endl;std::cout << "Child退出,我也退出" << std::endl;break;}else{break;}}printf("test\n");
}
當提前關閉讀時,此時寫入管道就沒有任何意義了,因為寫入了也沒有什么用,而操作系統不會允許這樣的情況發生,所以操作系統就會向子進程發送13號(SIGPIPE)信號,從而終止子進程,我們可以進行驗證:
結果:
退出碼是13!
2.5知識補充
1.管道的容量我們可以通過每次寫入一個字節,cnt計數++的方法來確定,在我的ubunto系統下,是64kb
2.管道的寫入是原子性的,其實也就是指如果要執行寫入操作的話,就必須將整段話寫完,才能進行讀取。比如說,當我們寫入hello world,只有當hello world寫完之后,才能進行讀取,這個以后會講到
3.進程池的實現
進程池實現的目的在于加深對于管道的認識和使用:
3.1什么是進程池
3.2進程池框架搭建
// 先描述 -- 管道
class Channel
{
public:Channel() {};~Channel() {};private:int _wfd; // 讀端fdpid_t _subid; // 進程pidstd::string _name; // 用來方便管道標識
};// 再組織 -- 管道管理
class ChannelManager
{
public:ChannelManager() {};~ChannelManager() {};private:std::vector<Channel> _channels; // 用來管理所有的管道
};// 進程池
class ProcessPoll
{
public:ProcessPoll() {};~ProcessPoll() {};private:ChannelManager _cm; // 對管道進行管理int _process_num; // 創建多少個進程
};
3.3在進程池中創建多個進程,并分配管道
進程池的框架搭好之后,需要啟動進程池,從而創建出需要的進程和管道:
int main()
{//創建進程池,假如進程池中需要5個進程ProcessPoll pp(5);//啟動進程池,執行進程創建工作pp.start();return 0;
}
// 先描述
class Channel
{
public:Channel(int wfd, pid_t subid):_wfd(wfd), _subid(subid){_name = "channel-" + std::to_string(_wfd) + "-" + std::to_string(_subid);}~Channel() {};private:int _wfd; // 讀端fdpid_t _subid; // 進程pidstd::string _name; // 用來方便管道標識
};// 再組織
class ChannelManager
{
public:ChannelManager() {};void Insert(int wfd, pid_t subid){_channels.push_back({wfd, subid});}~ChannelManager() {};private:std::vector<Channel> _channels; // 用來管理所有的管道
};// 進程池
class ProcessPoll
{
public:ProcessPoll(int num):_process_num(num){};//子進程工作bool Work(int rfd) {}//在進程池中創建進程,并將管道進行管理bool start(){for(int i = 0; i<_process_num; i++)//創建定量的進程{//1.創建管道int pidfd[2] = {0};int n = pipe(pidfd);if(n < 0) return false;//2.創建子進程pid_t subid = fork();if(subid < 0){printf("創建子進程失敗\n");return false;}else if(subid == 0){//子進程 -- 讀端 -- pidfd[0]close(pidfd[1]);Work(pidfd[0]);close(pidfd[0]);//work執行完畢,證明進程結束,關閉文件exit(1);}else{//父進程 -- 寫端 -- pidfd[1]close(pidfd[0]);_cm.Insert(pidfd[1], subid);//將創建的進程和管道進行管理//此時還不確定需要執行什么任務,所以只能讓子進程處于阻塞狀態//通過其它函數確定好執行任務之后,父進程才進行寫入}}}~ProcessPoll(){};private:ChannelManager _cm; // 對管道進行管理int _process_num; // 創建多少個進程
};
在看上面的代碼時,先看ProcessPoll(進程池)結構體,然后再看其它的就行
3.4任務分配和執行
int main()
{//創建進程池,假如進程池中需要5個進程ProcessPoll pp(5);//啟動進程池,執行進程創建工作pp.start();//有了進程池,就能夠分配任務并讓進程池中的進程執行了int cnt = 10;while(cnt--){pp.Run();//執行10次命令sleep(1);}return 0;
}
我們假設任務是隨機分配的,我們將任務清單和任務管理再寫一個文件:
#pragma once
#include <iostream>
#include <vector>
#include <ctime>typedef void (*task_t)();
/Task/
void PrintLog()
{std::cout << "我是一個打印日志的任務" << std::endl;
}void Download()
{std::cout << "我是一個下載的任務" << std::endl;
}void Upload()
{std::cout << "我是一個上傳的任務" << std::endl;
}
/Task/class TaskManager
{
public:TaskManager(){srand(time(nullptr));}//將任務插入到任務管理表中void Register(task_t task){_tasks.push_back(task);}//任務碼(每一個任務碼對應一個任務,拿到一個任務碼,就執行對應的任務)int Code(){return rand() % _tasks.size();}//任務執行void Execute(int code)//拿到任務碼執行任務{if(code>=0 && code<_tasks.size()){_tasks[code]();}}~TaskManager() {};private:std::vector<task_t> _tasks;
};
所以說,我們的進程池中將需要執行的任務進行插入,然后再拿到任務碼執行任務就可以了:
// 進程池
class ProcessPoll
{
public:ProcessPoll(int num):_process_num(num){//將任務進行插入_tm.Register(PrintLog);_tm.Register(Download);_tm.Register(Upload);}//執行命令void Run(){//假如執行的任務是隨機分配的int taskcode = _tm.Code();//任務執行_tm.Execute(taskcode);}~ProcessPoll(){};private:ChannelManager _cm; // 對管道進行管理int _process_num; // 創建多少個進程TaskManager _tm;
};
但是這里會有問題:如果任務都讓一個進程執行的話,那么會導致累的累死,閑的閑死的情況,所以我們要通過一定的策略進行任務的執行
3.5任務執行策略 && 任務執行代碼實現
下面我們來實現代碼:
// 先描述
class Channel
{
public://任務碼的發送void Send(int code){//拿到需要執行的任務之后,我們需要將任務碼先發送出去,子進程拿到任務碼之后,就可以進行任務執行了int n = write(_wfd, &code, sizeof(code));(void)n;//防止警告}private:int _wfd; // 讀端fdpid_t _subid; // 進程pidstd::string _name; // 用來方便管道標識
};// 再組織
class ChannelManager
{
public:// 選擇一個需要執行的進程Channel &Select(){auto &c = _channels[_next];_next++;_next %= _channels.size();return c;}private:std::vector<Channel> _channels; // 用來管理所有的管道int _next; // 下一個需要執行的進程
};// 進程池
class ProcessPoll
{
public:// 子進程工作void Work(int rfd) {//創建好子進程之后,子進程就進入該函數,進入阻塞狀態//一旦父進程通過函數調用寫入了一個任務碼,就可以進行任務執行while(true){int code = 0;ssize_t n = read(rfd, &code, sizeof(code));if(n > 0)//讀取成功{if(n != sizeof(code)) continue;//讀取不符std::cout << "子進程[" << getpid() << "]收到一個任務碼: " << code << std::endl;_tm.Execute(code);}else if(n == 0)//父進程關閉寫端{std::cout << "子進程退出" << std::endl;break;}else{printf("read失敗\n");break;}}}// 執行命令void Run(){// 假如執行的任務是隨機分配的int taskcode = _tm.Code();// 任務執行 -- 采用輪詢的方式,所以我們要先知道該哪個進程執行了// 1.選擇一個能夠執行任務的進程auto &c = _cm.Select();std::cout << "選擇了一個子進程: " << c.Name() << std::endl;// 2. 發送任務碼c.Send(taskcode);std::cout << "發送了一個任務碼: " << taskcode << std::endl;}// 在進程池中創建進程,并將管道進行管理bool start(){for (int i = 0; i < _process_num; i++) // 創建定量的進程{// 1.創建管道int pidfd[2] = {0};int n = pipe(pidfd);if (n < 0)return false;// 2.創建子進程pid_t subid = fork();if (subid < 0){printf("創建子進程失敗\n");return false;}else if (subid == 0){close(pidfd[1]);//創建子進程之后,子進程直接進入工作狀態Work(pidfd[0]);close(pidfd[0]);exit(1);}else{close(pidfd[0]);_cm.Insert(pidfd[1], subid);}}return true;}private:ChannelManager _cm; // 對管道進行管理int _process_num; // 創建多少個進程TaskManager _tm;
};
上面的代碼并不是完整的代碼,因為完整的代碼容易造成混亂
3.6進程池的回收
進程池工作完畢之后,要進行資源的回收:
#pragma once#include <iostream>
#include <vector>
#include <unistd.h>
#include <sys/types.h>
#include <cstdlib>
#include <sys/wait.h>
#include "Task.hpp"// 先描述
class Channel
{
public:void Close(){close(_wfd);}void Wait(){pid_t rid = waitpid(_subid, nullptr, 0);(void)rid;}private:int _wfd; // 讀端fdpid_t _subid; // 進程pidstd::string _name; // 用來方便管道標識
};// 再組織
class ChannelManager
{
public:ChannelManager():_next(0) {}//管道的回收 -- 關閉wfdvoid StopSubProcess(){for (auto &channel : _channels){channel.Close();std::cout << "關閉: " << channel.Name() << std::endl;}}//進程回收 -- waitpidvoid WaitSubProcess(){for (auto &channel : _channels){channel.Wait();std::cout << "回收: " << channel.Name() << std::endl;}}private:std::vector<Channel> _channels; // 用來管理所有的管道int _next; // 下一個需要執行的進程
};// 進程池
class ProcessPoll
{
public:ProcessPoll(int num) : _process_num(num){_tm.Register(PrintLog);_tm.Register(Download);_tm.Register(Upload);}//進程的回收void Stop(){//管道的回收,只需要將父進程的讀端關閉即可_cm.StopSubProcess();//子進程的回收,需要waitpid_cm.WaitSubProcess();}private:ChannelManager _cm; // 對管道進行管理int _process_num; // 創建多少個進程TaskManager _tm;
};
3.7結果展示
確實很好的執行了我們想要的效果!
3.8問題處理
但是這里會出現一個問題,我們來看:
當回收進程時,我們直接使用一個函數調用進行回收,會發生什么?會發生進程池沒有正確地被回收,卡著的情況,我們講一下原理:
3.8.1解決方法1
我們很容易能夠想到的解決方法是:從后向前進行進程的關閉不就行了?
void CloseAndWait()
{//解決方法1:倒著回收for(int i = _channels.size()-1; i>=0; i--){_channels[i].Close();_channels[i].Wait();}
}
3.8.2解決方法2
難道不可以真的只讓父進程有w端的指向嗎?:
void CloseAndWait()
{//解決方法2:只讓父進程有w端的指向for (auto &channel : _channels){channel.Close();channel.Wait();}
}void CloseAll()
{for(auto& channel : _channels){channel.Close();}
}else if (subid == 0)
{// 子進程 -- 讀端 -- pidfd[0]_cm.CloseAll();//在子進程創建出來之后就將子進程多余的指向進行關閉close(pipefd[1]);Work(pipefd[0]);close(pipefd[0]);exit(1);
}
但是我們可能會有疑問:子進程執行關閉工作的時候,父進程也在將創建的子進程進行插入到管道表中呀,如果將子進程自己插入到表中了,然后子進程恰好把自己關了,那不就錯了嗎?
上面的疑問是對于寫時拷貝不清楚,當子進程創建之后,就繼承有父進程的一張管道管理表,而這張表中是沒有自己進程的數據的,因為自己剛創建出來,當父進程需要向表中進行寫入工作時,發生寫時拷貝,所以子進程永遠拿不到有自己進程號的那張表,只會拿到之前的進程表!
4.命名管道
4.1什么是命名管道 && 命名管道的原理
之前我們講的匿名管道會有一個局限性,也就是只能滿足有血緣關系的進程之間的通信需要,不能滿足兩個完全不同的進程之間的通信需求,所以我們還需要學習命名管道:
4.2命名管道的創建 && 進程間通信使用
我們可以使用mkfifo命令來實現命名管道的創建操作:
#include <sys/types.h>
#include <sys/stat.h>int mkfifo(const char *pathname, mode_t mode);
使用,我們先掌握一下命令的使用和刪除管道操作(當然可以使用rm命令):
可以看出,命名管道是p開頭的文件,而該函數第二個參數其實就是設置權限就行,返回值:成功返回0,否則返回-1:
int main()
{umask(0);int n = mkfifo(FIFO_FILE, 0666);if(n != 0){std::cout << "mkfifo error" << std::endl;return 1;}return 0;
}
那么兩個完全不同的進程之間的通信的實現是如何完成的?:
// sercer.cc文件 -- 用于創建出管道,并向管道中進行讀取操作int main()
{// 創建管道umask(0);int n = mkfifo(FIFO_FILE, 0666);if (n == -1){std::cout << "mkfifo filed" << std::endl;exit(0);}std::cout << "mkfifo success" << std::endl;// 進行管道的讀取操作int fd = open(FIFO_FILE, O_RDONLY);if (fd > 0){while (true){// 先打開管道文件char buffer[1024];ssize_t number = read(fd, buffer, sizeof(buffer) - 1);if (number > 0){buffer[number] = 0;std::cout << buffer << std::endl;}else{// TODO}}}else{std::cout << "read open filed" << std::endl;}close(fd);// 刪除管道n = unlink(FIFO_FILE);if (n == -1){std::cout << "unlink filed" << std::endl;exit(0);}std::cout << "unlink success" << std::endl;return 0;
}////client.cc文件 -- 進行管道的寫入操作int main()
{int fd = open(FIFO_FILE, O_WRONLY);if(fd > 0){std::string message;int cnt = 1;pid_t id = getpid();while(true){std::getline(std::cin, message);message += (",message number: " + std::to_string(cnt) + ", [" + std::to_string(id) + "]");ssize_t number = write(fd, message.c_str(), sizeof(message));}}else{std::cout << "write open filed" << std::endl;}close(fd);return 0;
}
當我們運行管道的讀端server.cc文件,結果為:
也就是說此時并沒有進行文件的創建操作
我們直接輸出結論:當write方法沒有執行open的時候,read方法就會在open內部進行阻塞,知道有人把管道文件打開了,open才會返回
此時還有一個問題,當寫端關閉的時候,讀端并沒有關閉,原因如下:
// 進行管道的讀取操作
int fd = open(FIFO_FILE, O_RDONLY);
if (fd > 0)
{while (true){// 先打開管道文件char buffer[1024];ssize_t number = read(fd, buffer, sizeof(buffer) - 1);if (number > 0){buffer[number] = 0;std::cout << buffer << std::endl;}else if(number == 0){// TODOstd::cout << "write close, me too" << std::endl;break;}else{//處理其它的情況std::cerr << "read error" << std::endl;break;}}
}
4.3進程間通信的封裝實現
下面我們要將對于管道的創建、管道的讀取等操作進行一系列的封裝,來達到隱藏內部細節,在外部可以直接使用的目的:
//comm.hpp文件 -- 對命名管道的封裝操作// 命名管道類實現 -- 包含管道的創建和析構操作
class Namedfifo
{
public:Namedfifo(const std::string &path, const std::string &name): _path(path), _name(name){// 構造函數中,需要創建一個管道umask(0);_fifoname = _path + "/" + _name;int n = mkfifo(_fifoname.c_str(), 0666);if (n == -1){std::cout << "mkfifo filed" << std::endl;exit(0);}std::cout << "mkfifo success" << std::endl;}~Namedfifo(){// 刪除管道int n = unlink(_fifoname.c_str());if (n == -1){std::cout << "unlink filed" << std::endl;exit(0);}std::cout << "unlink success" << std::endl;}private:std::string _path;std::string _name;std::string _fifoname;
};// 文件操作類實現 -- 包含對管道的一系列操作
class Fileoper
{
public:Fileoper(const std::string &path, const std::string &name): _path(path), _name(name){_fifoname = _path + "/" + _name;}// 管道讀操作void OpenForRead(){_fd = open(_fifoname.c_str(), O_RDONLY);if (_fd > 0){while (true){// 先打開管道文件char buffer[1024];ssize_t number = read(_fd, buffer, sizeof(buffer) - 1);if (number > 0){buffer[number] = 0;std::cout << buffer << std::endl;}else if (number == 0){std::cout << "write close, me too" << std::endl;break;}else{// 處理其它的情況std::cerr << "read error" << std::endl;break;}}}else{std::cout << "read open filed" << std::endl;}close(_fd);}// 管道寫操作void OpenForWrite(){_fd = open(_fifoname.c_str(), O_WRONLY);if (_fd > 0){std::string message;int cnt = 1;pid_t id = getpid();while (true){std::getline(std::cin, message);message += (",message number: " + std::to_string(cnt) + ", [" + std::to_string(id) + "]");ssize_t number = write(_fd, message.c_str(), sizeof(message));}}else{std::cout << "write open filed" << std::endl;}close(_fd);}void Close(){if (_fd > 0)close(_fd);}private:std::string _path;std::string _name;std::string _fifoname;int _fd;
};
這樣就極大的簡化了操作:
// sercer.cc文件 -- 用于創建出管道,并向管道中進行讀取操作int main()
{Namedfifo fifo(".", "fifo");// 進行管道的讀取操作Fileoper readfile(".", "fifo");readfile.OpenForRead();return 0;
}// sercer.cc文件 -- 用于創建出管道,并向管道中進行讀取操作int main()
{Namedfifo fifo(".", "fifo");// 進行管道的讀取操作Fileoper readfile(".", "fifo");readfile.OpenForRead();return 0;
}
但是我們每次進行write或open等操作時,對于錯誤都要進行單獨的處理,比較麻煩,所以我們可以使用宏函數來處理這個問題:
#define ERR_EXIT(m) \do \{ \perror(m); \exit(EXIT_FAILURE); \} while (0)Namedfifo(const std::string &path, const std::string &name): _path(path), _name(name)
{umask(0);_fifoname = _path + "/" + _name;int n = mkfifo(_fifoname.c_str(), 0666);if (n == -1){//在使用時,就可以直接使用了:ERR_EXIT("mkfifo");}std::cout << "mkfifo success" << std::endl;
}
4.4命名管道與匿名管道的區別
命名管道與匿名管道的區別只有:命名管道可以用來進行不相關的進程之間的通信
其它的特性和匿名管道相同(五種特性都是一樣的)
5.system V共享內存
system V其實是一種標準,是早期UNIX操作系統的一個版本,但是它提供了一套豐富的IPC機制,我們已經了解過,只有在某一領域非常領先之后才可以指定標準。Linux內核支持了這種標準,專門為此設計了一個IPC通信模塊,其實也就是提供了通信的接口設計
5.1什么是共享內存
我們都知道,IPC的本質是:讓不同的進程,看到同一份資源,了解共享內存才可以對其進行使用:
5.2共享內存接口的使用
我們先認識一下使用的接口:
我們通過代碼來使用這些接口,直接進行封裝了:
const std::string pathname = ".";
const int projid = 0x66;
const int gdefaultid = -1;
const int gsize = 4096;#define ERR_EXIT(m) \do \{ \perror(m); \exit(EXIT_FAILURE); \} while (0);class Shm
{
public:Shm() :_shmid(gdefaultid), _size(gsize){}// 創建一個共享內存void Creat(){key_t k = ftok(pathname.c_str(), projid);if (k < 0){ERR_EXIT("ftok");}printf("ftok return value is : 0x%x\n", k);_shmid = shmget(k, _size, IPC_CREAT | IPC_EXCL);if(_shmid < 0){ERR_EXIT("shmget");}printf("shmget return value is : %d\n", _shmid);}~Shm(){}private:int _shmid;int _size;
};
運行之后,我們可以使用ipcs -m來查看我們剛創建的共享內存:
當我們關閉xshell之后,再次查看共享內存,會發現,共享內存還在,其實,共享內存的聲明周期是隨內核的,只有當操作系統重啟之后,共享內存才會被回收,那么回收使用的指令級操作是什么呢?:
我們寫代碼時肯定不會寫指令級的代碼,那么有沒有一個函數,是用來進行共享內存的操作的呢?有!shmctl:
那么代碼實現共享內存的操作如下:
//刪除共享內存
void Destroy()
{if(_shmid == gdefaultid) return;int n = shmctl(_shmid, IPC_RMID, nullptr);if(n < 0){ERR_EXIT("shmctl");}printf("shmctl delete shm: %d success\n", _shmid);
}
5.3代碼實現兩個進程看到同一份資源
我們上面講過,讓兩個進程看到同一個共享空間,需要讓虛擬進程空間的指針通過頁表與共享空間的指針進行對應,這個操作稱為關聯,而shmat(at: attach)就是代碼級別的操作:
那么共享內存的關聯操作的代碼實現如下:
// 共享內存的關聯操作
void Attach()
{if (_shmid == gdefaultid)return;_start_mem = shmat(_shmid, nullptr, 0);if ((long long)_start_mem < 0)ERR_EXIT("shmat");printf("shmat success\n");
}
// 返回關聯的虛擬空間內的地址
void *VirtualAddr()
{printf("VirtualAddr: %p\n", _start_mem);return _start_mem;
}
但是我們運行時可能會報錯:Permission Denied,也就是說,我們的進程沒有權限訪問共享內存,下面我們進行講解:
我們實現了共享內存的創建、刪除和關聯操作,那么為了實現讓兩個進程都可以看到同一個共享內存,還差一個操作,也就是如何讓另一個進程看到這個共享內存,原理其實就是找到同一個key值,然后進行關聯即可:
// 創建一個共享內存
void Creat()
{key_t k = ftok(pathname.c_str(), projid);if (k < 0){ERR_EXIT("ftok");}printf("ftok return value is : 0x%x\n", k);_shmid = shmget(k, _size, IPC_CREAT | IPC_EXCL | 0666);if (_shmid < 0){ERR_EXIT("shmget");}printf("shmid is : %d\n", _shmid);
}
// 讓另外一個進程獲取共享內存
void Get()
{key_t k = ftok(pathname.c_str(), projid);if (k < 0){ERR_EXIT("ftok");}printf("ftok return value is : 0x%x\n", k);_shmid = shmget(k, _size, IPC_CREAT);if (_shmid < 0){ERR_EXIT("shmget");}printf("shmid is : %d\n", _shmid);
}
我們可以發現,這兩個操作很相似,知識shmget的參數有一點區別,所以我們進行封裝:
private:void CreatHelper(int flg){key_t k = ftok(pathname.c_str(), projid);if (k < 0){ERR_EXIT("ftok");}printf("ftok return value is : 0x%x\n", k);_shmid = shmget(k, _size, flg);if (_shmid < 0){ERR_EXIT("shmget");}printf("shmid is : %d\n", _shmid);}public:// 創建一個共享內存void Creat(){CreatHelper(IPC_CREAT | IPC_EXCL | 0666);}// 讓另外一個進程獲取共享內存void Get(){CreatHelper(IPC_CREAT);}
完整代碼實現:
const std::string pathname = ".";
const int projid = 0x66;
const int gdefaultid = -1;
const int gsize = 4096;#define ERR_EXIT(m) \do \{ \perror(m); \exit(EXIT_FAILURE); \} while (0);class Shm
{
private:void CreatHelper(int flg){key_t k = ftok(pathname.c_str(), projid);if (k < 0){ERR_EXIT("ftok");}printf("ftok return value is : 0x%x\n", k);_shmid = shmget(k, _size, flg);if (_shmid < 0){ERR_EXIT("shmget");}printf("shmid is : %d\n", _shmid);}public:Shm() : _shmid(gdefaultid), _size(gsize), _start_mem(nullptr){}// 創建一個共享內存void Creat(){CreatHelper(IPC_CREAT | IPC_EXCL | 0666);}// 讓另外一個進程獲取共享內存void Get(){CreatHelper(IPC_CREAT);}// 共享內存的關聯操作void Attach(){if (_shmid == gdefaultid)return;_start_mem = shmat(_shmid, nullptr, 0);if ((long long)_start_mem < 0)ERR_EXIT("shmat");printf("shmat success\n");}// 返回關聯的虛擬空間內的地址void *VirtualAddr(){printf("VirtualAddr: %p\n", _start_mem);return _start_mem;}// 刪除共享內存void Destroy(){if (_shmid == gdefaultid)return;int n = shmctl(_shmid, IPC_RMID, nullptr);if (n < 0){ERR_EXIT("shmctl");}printf("shmctl delete shm: %d success\n", _shmid);}~Shm(){}private:int _shmid;int _size;void *_start_mem;
};
5.4進程間通信代碼封裝實現
上面我們實現了讓共享內存被兩個不同的進程成功拿到并映射,本質是拿到同一份資源,但是我們并不想讓結構體中的太多函數暴露給上層,所以我們要對我們的代碼再進行一定的優化:
const std::string pathname = ".";
const int projid = 0x66;
const int gdefaultid = -1;
const int gsize = 4096;
#define CREATER "creater"
#define USER "user"#define ERR_EXIT(m) \do \{ \perror(m); \exit(EXIT_FAILURE); \} while (0);class Shm
{
private:void CreatHelper(int flg){key_t k = ftok(pathname.c_str(), projid);if (k < 0){ERR_EXIT("ftok");}printf("ftok return value is : 0x%x\n", k);_shmid = shmget(k, _size, flg);if (_shmid < 0){ERR_EXIT("shmget");}printf("shmid is : %d\n", _shmid);}// 共享內存的關聯操作void Attach(){if (_shmid == gdefaultid)return;_start_mem = shmat(_shmid, nullptr, 0);if ((long long)_start_mem < 0)ERR_EXIT("shmat");printf("shmat success\n");}// 刪除共享內存void Destroy(){if (_shmid == gdefaultid)return;int n = shmctl(_shmid, IPC_RMID, nullptr);if (n < 0){ERR_EXIT("shmctl");}printf("shmctl delete shm: %d success\n", _shmid);}public:Shm(const std::string &pathname, const std::string &usertype): _shmid(gdefaultid),_size(gsize),_start_mem(nullptr),_pathname(pathname),_usertype(usertype){if (_usertype == CREATER)CreatHelper(IPC_CREAT | IPC_EXCL | 0666);else if (_usertype == USER)CreatHelper(IPC_CREAT);else{}Attach();}// 返回共享內存的起始地址void *VirtualAddr(){printf("VirtualAddr: %p\n", _start_mem);return _start_mem;}// 返回共享內存的大小int Size(){printf("Shm Size is: %d\n", _size);return _size;}~Shm(){//只有創建共享內存的需要執行刪除操作if(_usertype == CREATER)Destroy();}private:int _shmid;int _size;void *_start_mem;std::string _pathname;std::string _usertype;
};
此時我們的代碼就會被優化很多了
5.5共享內存實現的進程間通信操作
我們之前進行的操作的核心在于讓不同的進程看到同一份資源,那么怎么實現依靠于共享內存進程間通信呢?:
5.5.1基于共享內存實現的進程間通信的優缺點分析
5.5.2處理共享內存的缺點
對于共享內存的這個缺點,我們可以使用什么方法來解決呢?可以使用命名管道的方法解決,但是這個方法顯然不是正規的解決方法,我們先來看該方法的思路:
為什么說這個方法不正規呢?因為根本不會有人使用這個方法,而且這個方法引出的問題也很多,我們只是學習,下面來看:
完整代碼實現:
5.6 shmdt
我們始終都忽略了一個接口,也就是我們只實現了共享內存的掛起,也就是將共享內存映射到了物理內存中,建立了頁表的映射,但是我們忽略了共享內存使用結束之后的取消掛起操作,也就是取消進程頁表和物理內存的映射關系:
5.7一個細節問題
我們開辟的共享內存為4096,那么我們如果開辟了4097的空間,那么操作系統給我們的空間是多少?我們實際可以使用的空間又是多少?
在內核中,共享內存在創建的時候,它的大小,必須是4096的倍數,所以操作系統給我們的實際空間大小為4096*2,但是我們只能使用4096的空間!
5.8描述共享內存的數據結構
6.system V信號量
system V標準其實并不重要,想要了解ststem V消息隊列的可以自己進行搜索學習,但是我們要通過該標準引入信號和信號量
6.1前提知識補充
6.2什么是信號量
6.3信號量的接口和系統調用
因為system V信號量有它自己的缺點,所以這里我么不再進行講述,想再了解的直接搜索:system V信號量,但是我們進行一些結論的輸出:
共享內存、消息隊列、信號量的接口,都是使用key值來進行唯一性的區分的,OS將這三種當作成了同一種資源,這也是為什么這三個叫做system V標準了!在底層,因為這三種資源都是使用key進行唯一性的區分的,所以OS使用了一個結構體,就對這三種資源進行了統一的管理!!!
7.附錄 – 進程池完整代碼
進程池實現:
#pragma once#include <iostream>
#include <vector>
#include <unistd.h>
#include <sys/types.h>
#include <cstdlib>
#include <sys/wait.h>
#include "Task.hpp"// 先描述
class Channel
{
public:Channel(int wfd, pid_t subid) : _wfd(wfd), _subid(subid){_name = "channel-" + std::to_string(_wfd) + "-" + std::to_string(_subid);}//任務碼的發送void Send(int code){//拿到需要執行的任務之后,我們需要將任務碼先發送出去,子進程拿到任務碼之后,就可以進行任務執行了ssize_t n = write(_wfd, &code, sizeof(code));(void)n;//防止警告}void Close(){close(_wfd);std::cout << "關閉: " << _name << std::endl;}void Wait(){pid_t rid = waitpid(_subid, nullptr, 0);std::cout << "回收: " << _name << std::endl;(void)rid;}int Fd() { return _wfd; }pid_t SubId() { return _subid; }std::string Name() { return _name; }~Channel() {}private:int _wfd; // 讀端fdpid_t _subid; // 進程pidstd::string _name; // 用來方便管道標識
};// 再組織
class ChannelManager
{
public:ChannelManager():_next(0) {}void Insert(int wfd, pid_t subid){_channels.emplace_back(wfd, subid);}// 選擇一個需要執行的進程Channel &Select(){auto &c = _channels[_next];_next++;_next %= _channels.size();return c;}void PrintChannel(){for (auto &channel : _channels){std::cout << channel.Name() << std::endl;}}//管道的回收 -- 關閉wfdvoid StopSubProcess(){for (auto &channel : _channels){channel.Close();std::cout << "關閉: " << channel.Name() << std::endl;}}//進程回收 -- waitpidvoid WaitSubProcess(){for (auto &channel : _channels){channel.Wait();std::cout << "回收: " << channel.Name() << std::endl;}}void CloseAndWait(){// //解決方法1:倒著回收// for(int i = _channels.size()-1; i>=0; i--)// {// _channels[i].Close();// _channels[i].Wait();// }//解決方法2:只讓父進程有w端的指向for (auto &channel : _channels){channel.Close();channel.Wait();}}void CloseAll(){for(auto& channel : _channels){channel.Close();}}~ChannelManager() {}private:std::vector<Channel> _channels; // 用來管理所有的管道int _next; // 下一個需要執行的進程
};const int gdefaultnum = 5;// 進程池
class ProcessPoll
{
public:ProcessPoll(int num) : _process_num(num){_tm.Register(PrintLog);_tm.Register(Download);_tm.Register(Upload);}// 子進程工作void Work(int rfd) {//創建好子進程之后,子進程就進入該函數,進入阻塞狀態//一旦父進程通過函數調用寫入了一個任務碼,就可以進行任務執行while(true){int code = 0;ssize_t n = read(rfd, &code, sizeof(code));if(n > 0)//讀取成功{if(n != sizeof(code)) continue;//讀取不符std::cout << "子進程[" << getpid() << "]收到一個任務碼: " << code << std::endl;_tm.Execute(code);}else if(n == 0)//父進程關閉寫端{std::cout << "子進程退出" << std::endl;break;}else{printf("read失敗\n");break;}}}// 執行命令void Run(){// 假如執行的任務是隨機分配的int taskcode = _tm.Code();// 任務執行 -- 采用輪詢的方式,所以我們要先知道該哪個進程執行了// 1.選擇一個能夠執行任務的進程auto &c = _cm.Select();std::cout << "選擇了一個子進程: " << c.Name() << std::endl;// 2. 發送任務碼c.Send(taskcode);std::cout << "發送了一個任務碼: " << taskcode << std::endl;}// 在進程池中創建進程,并將管道進行管理bool Start(){for (int i = 0; i < _process_num; i++) // 創建定量的進程{// 1.創建管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)return false;// 2.創建子進程pid_t subid = fork();if (subid < 0){printf("創建子進程失敗\n");return false;}else if (subid == 0){// 子進程 -- 讀端 -- pidfd[0]_cm.CloseAll();close(pipefd[1]);Work(pipefd[0]);close(pipefd[0]); // work執行完畢,證明進程結束,關閉文件exit(1);}else{// 父進程 -- 寫端 -- pidfd[1]close(pipefd[0]);_cm.Insert(pipefd[1], subid); // 將創建的進程和管道進行管理// 此時還不確定需要執行什么任務,所以只能讓子進程處于阻塞狀態// 通過其它函數確定好執行任務之后,父進程才進行寫入}}return true;}void Debug(){_cm.PrintChannel();}//進程的回收void Stop(){// //管道的回收,只需要將父進程的讀端關閉即可// _cm.StopSubProcess();// //子進程的回收,需要waitpid// _cm.WaitSubProcess();_cm.CloseAndWait();}~ProcessPoll() {}private:ChannelManager _cm; // 對管道進行管理int _process_num; // 創建多少個進程TaskManager _tm;
};
任務實現:
#pragma once
#include <iostream>
#include <vector>
#include <ctime>typedef void (*task_t)();/Task/
void PrintLog()
{std::cout << "我是一個打印日志的任務" << std::endl;
}void Download()
{std::cout << "我是一個下載的任務" << std::endl;
}void Upload()
{std::cout << "我是一個上傳的任務" << std::endl;
}
/Task/class TaskManager
{
public:TaskManager(){srand(time(nullptr));}//將任務插入到任務管理表中void Register(task_t task){_tasks.push_back(task);}//任務碼(每一個任務碼對應一個任務,拿到一個任務碼,就執行對應的任務)int Code(){return rand() % _tasks.size();}//任務執行void Execute(int code)//拿到任務碼執行任務{if(code>=0 && code<_tasks.size()){_tasks[code]();}}~TaskManager(){}private:std::vector<task_t> _tasks;
};
Main:
#include <iostream>
#include "ProcessPoll.hpp"int main()
{//創建進程池,假如進程池中需要5個進程ProcessPoll pp(gdefaultnum);//啟動進程池,執行進程創建工作pp.Start();//有了進程池,就能夠分配任務并讓進程池中的進程執行了int cnt = 10;while(cnt--){pp.Run();//執行10次命令sleep(1);}//進程的回收pp.Stop();return 0;
}