1 進程間通信的必要性
首先要明確進程間是相互獨立的(獨享一份虛擬地址空間,頁表,資源),那怎么樣才能使得兩個進程間實現資源的發送?所以,兩個進程一定需要看到同一份資源,并且?個進程需要向另?個或?組進程發送消息,通知它(它們)發生了某種事件(如進程終止時要通知父進程)。
進程間通信的目的:
1.數據傳輸
2.資源共享
3.通知事件(也就是?個進程向另?個或?組進程發送消息)
4.進程控制(一些進程希望完全控制另?個進程的執行)
1.1 進程間通信分類
管道:(基于文件的通信方法)
1.匿名管道pipe
2.命名管道
System V IPC:(單獨設計的通信模塊)
System V 消息隊列?
System V 共享內存
System V 信號量
?POSIX IPC:(網絡間進程通信)
消息隊列
共享內存
信號量
互斥量
條件變量
讀寫鎖
2 管道
管道是從?個進程連接到另?個進程的?個數據流。
管道的本質是一個基于文件系統的一個內存級的單向通信的文件,主要用于進程間通信(IPC)。
所以管道其實也是文件,在前面講的文件系統中,那個管道文件是不是也要創建,要打開,要有對應的路徑解析,要有對應的inode,那么文件使用的接口(write,read等管道也是可以直接使用的),但是這個文件不需要和磁盤進行IO,只需要存在內核中,所以就沒有路徑,沒有文件名,是內核中模擬的一個文件(所以叫匿名管道),創建一個緩沖區,利用緩沖區實現兩個進程看到同一份資源。
3 匿名管道
系統調用:
int pipe(int fd[2]);
這里fd是一個輸出型參數,fd所對應的也就是之前講的文件描述符,fd[0]表示讀端, fd[1]表示寫端,
成功返回0,失敗返回錯誤代碼。
3.1 用fork來共享管道原理
父進程創建管道后,創建子進程,子進程會拷貝父進程的資源,但是文件本身并未拷貝,而是訪問該文件的文件描述符,所對應的地址是同一個文件,這時父進程關閉讀端,子進程關閉寫端,(這里關閉讀寫端的操作可以不做也能使用,但是還是最好做,以防誤操作,畢竟是要實現單向通信)這樣父進程就可以向管道中寫入,子進程就可以從管道中讀取文件。
大致邏輯:
給出一個代碼例子:
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>int main(int argc, char *argv[])
{int pipefd[2];if (pipe(pipefd) == -1){perror("pipe");}pid_t pid;pid = fork();if (pid == -1){perror("fork");} if (pid == 0) {close(pipefd[0]);write(pipefd[1], "hello", 5);close(pipefd[1]);exit(0);}close(pipefd[1]);char buf[10] = {0};read(pipefd[0], buf, 10);printf("buf=%s\n", buf);return 0;
}
這里就是通過子進程向管道寫入“hello”,父進程讀取并打印。
3.2 管道讀寫規則
當沒有數據可讀時:
O_NONBLOCK disable:read調?阻塞,即進程暫停執行,?直等到有數據來到為止。
O_NONBLOCK enable:read調用返回-1,errno值為EAGAIN。
當管道滿的時候 :
O_NONBLOCK disable:write調?阻塞,直到有進程讀?數據?
O_NONBLOCK enable:調用返回-1,errno值為EAGAIN
如果所有管道寫端對應的文件描述符被關閉,則read返回0
如果所有管道讀端對應的文件描述符被關閉,則write操作會產生信號SIGPIPE,進而可能導致 write進程退出。
3.3?管道讀寫的幾種情況
情況一:管道里沒有數據,讀端正常
這時讀端就會阻塞,等待寫端寫入。
情況二:讀端不讀取,寫端一直寫
緩沖區寫滿了,就不會再寫入了。
情況三:寫端關閉(不寫),讀端正常
read之后,就會返回0,表示讀到文件結尾。
情況四:讀端關閉,寫端正常
OS直接殺掉該進程。(讀端關閉了,寫入就沒有意義了,OS不會做浪費時間和空間的事,所以直接殺掉該進程)
3.4 管道的特點
只能用于具有共同祖先的進程(具有親緣關系的進程)之間進行通信;通常,?個管道由?個進 程創建,然后該進程調用fork,此后父,子進程之間就可應用該管道。
管道提供流式服務
進程退出,管道釋放,所以管道的生命周期隨進程
內核會對管道操作進行同步與互斥
管道是半雙工的,數據只能向?個方向流動;需要雙方通信時,需要建立起兩個管道
3.5 創建進程池處理任務
進程池是用父進程創建一批子進程,通過父進程向管道里面寫入信息通知對應的進程完成對應的任務。
創建函數:
using callback_t =std::function<void(int fd)>;
bool InitProcessPool(callback_t cb){for (int i = 0; i < _processnum; i++){int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0){perror("pipe");return false;}pid_t id = fork();if (id < 0){perror("fork");return false;}else if (id == 0){close(pipefd[1]);cb(pipefd[0]);exit(0);}close(pipefd[0]);std::string name = "channel-" + std::to_string(i);_channels.emplace_back(pipefd[1], name, id);}return true;}
通過一個數組將這一批子進程(管道寫端)進行(channel)類的管理,這里的cb是父進程通知子進程后的處理對應任務的函數。
channel類:
class channel
{
public:channel(){}channel(int wfd, std::string name, pid_t id): _wfd(wfd), _name(name), _sub_target(id){}int fd() { return _wfd; }std::string name() { return _name; }pid_t target() { return _sub_target; }void Close(){close(_wfd);}void Wait(){pid_t rid = waitpid(_sub_target, nullptr, 0);(void)rid;}~channel(){}
private:int _wfd;std::string _name;pid_t _sub_target;
};
成員包括寫端的文件描述符,管道名稱,對應pid。以及關閉寫端的函數,回收子進程的函數。
任務處理函數:
這里通過一批打印函數來模擬任務的調用
#pragma once#include <iostream>
#include <string>
#include <vector>
#include <functional>
// 4種任務
// task_t[4];using task_t = std::function<void()>;void Download()
{std::cout << "我是一個downlowd任務" << std::endl;
}void MySql()
{std::cout << "我是一個 MySQL 任務" << std::endl;
}void Sync()
{std::cout << "我是一個數據刷新同步的任務" << std::endl;
}void Log()
{std::cout << "我是一個日志保存任務" << std::endl;
}std::vector<task_t> tasks;class Init
{
public:Init(){tasks.push_back(Download);tasks.push_back(MySql);tasks.push_back(Sync);tasks.push_back(Log);}
};Init ginit;
while(true){int code = 0;//std::cout << "子進程阻塞: " << getpid() << std::endl;ssize_t n = read(fd, &code, sizeof(code));if(n == sizeof(code)) // 任務碼{std::cout << "子進程被喚醒: " << getpid() << std::endl;if(code >= 0 && code < tasks.size()){tasks[code]();}else{std::cerr << "父進程給我的任務碼是不對的: " << code << std::endl;}}else if(n == 0){std::cout << "子進程應該退出了: " << getpid() << std::endl;break;}else{std::cerr << "read fd: " << fd << ", error" << std::endl;break;}}
讀取成功進行對應任務處理,為0說明寫端關閉了,子進程可以退出了。
控制發送任務的函數:
void PollingCtrlSubProcess(int count){if (count < 0)return;int index = 0;while (count){CtrlSubProcessHelper(index);count--;}}void CtrlSubProcessHelper(int &index){// 1. 選擇一個通道(進程)int who = index;index+=rand();index %= _channels.size();// 2. 選擇一個任務,隨機int x = rand() % tasks.size(); // [0, 3]// 3. 任務推送給子進程std::cout << "選擇信道: " << _channels[who].name() << ", subtarget : " << _channels[who].target() << std::endl;write(_channels[who].fd(), &x, sizeof(x));sleep(1);}
count表示需要處理的任務個數。
回收函數和回收子進程:
void WaitSubProcesses(){for (auto &c : _channels){c.Close();c.Wait();}}
注意這里關閉管道是并沒有關閉完的,是有問題的?
注意創建子進程時,會拷貝父進程的資源,創建第一個子進程時,父進程關閉讀端,第一個子進程會關閉寫端,再創建一個新的子進程,會繼承父進程的寫端,子進程關閉對應的寫端(但是關閉的是第二次創建管道的寫端,而第一次的寫端被這個新的子進程繼承下來了),所以往后再創建子進程,也會發送同樣的問題,所以需要再每次子進程關閉寫端時,需要將之前繼承下來的寫端也關閉掉。
加入這段代碼:
for(auto&e:_channels)
{std::cout << e.fd() << " ";e.Close();
}
這里也是對進程池進行了封裝:
processpool.hpp
#pragma once
#include"task.hpp"
#include <iostream>
#include<functional>
#include <cstdio>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
#include <string>
#include <sys/wait.h>
#include <vector>const int gprocess_num = 5;
using callback_t =std::function<void(int fd)>;class channel
{
public:channel(){}channel(int wfd, std::string name, pid_t id): _wfd(wfd), _name(name), _sub_target(id){}int fd() { return _wfd; }std::string name() { return _name; }pid_t target() { return _sub_target; }void Close(){close(_wfd);}void Wait(){pid_t rid = waitpid(_sub_target, nullptr, 0);(void)rid;}~channel(){}
private:int _wfd;std::string _name;pid_t _sub_target;
};class Processpool
{
private:void CtrlSubProcessHelper(int &index){// 1. 選擇一個通道(進程)int who = index;index++;index %= _channels.size();// 2. 選擇一個任務,隨機int x = rand() % tasks.size(); // [0, 3]// 3. 任務推送給子進程std::cout << "選擇信道: " << _channels[who].name() << ", subtarget : " << _channels[who].target() << std::endl;write(_channels[who].fd(), &x, sizeof(x));sleep(1);}
public:Processpool(int num=gprocess_num) : _processnum(num){srand(time(0));}bool InitProcessPool(callback_t cb){for (int i = 0; i < _processnum; i++){int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0){perror("pipe");return false;}pid_t id = fork();if (id < 0){perror("fork");return false;}else if (id == 0){std::cout << "進程:" << getpid() << ", 關閉了: ";for(auto&e:_channels)//處理關閉之前的讀端{std::cout << e.fd() << " ";e.Close();}std::cout<<std::endl;close(pipefd[1]);cb(pipefd[0]);exit(0);}sleep(1);close(pipefd[0]);std::string name = "channel-" + std::to_string(i);_channels.emplace_back(pipefd[1], name, id);}return true;}void PollingCtrlSubProcess(){int index = 0;while (true){CtrlSubProcessHelper(index);}}void PollingCtrlSubProcess(int count){if (count < 0)return;int index = 0;while (count){CtrlSubProcessHelper(index);count--;}}void WaitSubProcesses(){for (auto &c : _channels){c.Close();c.Wait();}}
private:std::vector<channel> _channels;int _processnum;
};
main.cc
#include"processpool.hpp"
#include<vector>int main()
{Processpool pool(5);pool.InitProcessPool([](int fd){while(true){int code = 0;//std::cout << "子進程阻塞: " << getpid() << std::endl;ssize_t n = read(fd, &code, sizeof(code));if(n == sizeof(code)) // 任務碼{std::cout << "子進程被喚醒: " << getpid() << std::endl;if(code >= 0 && code < tasks.size()){tasks[code]();}else{std::cerr << "父進程給我的任務碼是不對的: " << code << std::endl;}}else if(n == 0){std::cout << "子進程應該退出了: " << getpid() << std::endl;break;}else{std::cerr << "read fd: " << fd << ", error" << std::endl;break;}}});sleep(5);// 3. 控制進程池pool.PollingCtrlSubProcess(10);// 4. 結束線程池pool.WaitSubProcesses();std::cout << "父進程控制子進程完成,父進程結束" << std::endl;}
task.hpp
#pragma once#include <iostream>
#include <string>
#include <vector>
#include <functional>
// 4種任務
// task_t[4];using task_t = std::function<void()>;void Download()
{std::cout << "我是一個downlowd任務" << std::endl;
}void MySql()
{std::cout << "我是一個 MySQL 任務" << std::endl;
}void Sync()
{std::cout << "我是一個數據刷新同步的任務" << std::endl;
}void Log()
{std::cout << "我是一個日志保存任務" << std::endl;
}std::vector<task_t> tasks;class Init
{
public:Init(){tasks.push_back(Download);tasks.push_back(MySql);tasks.push_back(Sync);tasks.push_back(Log);}
};Init ginit;
運行結果: