預備知識:
進程通信
進程需要某種協同,協同的前提條件是通信。有些數據是用來通知就緒的,有些是單純的傳輸數據,還有一些是控制相關信息。
進程具有獨立性,所以通信的成本可能稍微高一點;進程間通信前提是讓不同進程看到同一份(操作系統)資源(“一段內存”)。
一定是某一個進程先需要通信,讓OS創建一個共享資源,OS必須提供很多系統調用,OS創建的共享資源不同,系統調用不同,進程通信就會有不同種類。
System V
System V是一套本地通信的標準,通信方式有:
1.消息隊列
2.共享內存
3.信息量
直接復用內核代碼直接通信:命名管道和匿名管道。
管道
匿名管道
父子進程的文件表類似于淺拷貝,不會創建第二份struct file;進程會默認打開標準輸入輸出,就是拷貝了bash的files_struct,繼承了它的文件描述符;struct file內部也包含了引用計數(內存級),因此子進程fd關閉不影響父進程文件使用。
在這里,父子進程看到同一份資源(內核級緩沖區),這個資源叫管道文件,父進程從緩沖區讀,子進程緩沖區寫,就能進行通信,當然,這里的管道通信是單向的。
讓父子進程關掉不需要的描述符,緩沖區不需要刷新,因為不需要寫到磁盤文件,而且寫入效率也低。
創建管道文件
如果想要雙向通信,可以創建兩個管道,單項通信更簡單一些,如果在一個緩沖區雙方都要讀寫,那么會更復雜。
代碼:
#include<iostream>
#include<unistd.h>
#include<cstring>
using namespace std;
void wrprc(int fd)
{string s = "syx 666";int ct = 0;int id = getpid();while (1){string message = to_string(id) + ":" + s + to_string(ct++);write(fd, message.c_str(), message.size());cout << message << endl;sleep(1);}
}
void rdprc(int fd)
{int id = getpid();char buffer[512];while (1){ssize_t n = read(fd, buffer, 511);if(n>0){buffer[511] = '\0';cout << id << ' ' << n << ":" << buffer << endl;sleep(1);}}
}
int main()
{int pipefd[2];int n = pipe(pipefd);if(n!=0){cerr << "error:" << errno << "errorstring" << strerror(errno) << endl;return 1;}//0 read 1 writecout << pipefd[0] << ':' << pipefd[1] << endl;pid_t id = fork();if(id==0){close(pipefd[0]);wrprc(pipefd[1]);close(pipefd[1]);}else{close(pipefd[1]);rdprc(pipefd[0]);close(pipefd[0]);}return 0;
}
運行:
管道的5種特征:
1.匿名管道只能用來進行具有血緣關系的進程之間通信,如父子進程。
2.管道內部,自帶進程之間同步的機制(多執行流執行代碼的時候,具有明顯的順序性),父進程讀取節奏與子進程寫的節奏保持一致,緩沖區可能存在被多個進程同時訪問的情況,導致數據不一致問題(小于PPIPE_BUF字節,寫入是原子的,安全的)。
3.管道文件的生命周期是跟隨進程的生命周期。
4.管道文件在通信的時候,是面向字節流的,寫入和讀取的次數不是一一對應的(讀取會讀取一大堆)。
5.管道的通信模式,是一種特殊的半雙工模式。(不能同時兩個都寫)
管道的4種情況
1.如果管道內部是空的并且寫的fd沒有關閉,讀取條件不具備,讀進程會被阻塞。
2.如果管道被寫滿,并且fd不讀且沒被關閉,寫進程會被阻塞。
3.管道一直再被讀,但是寫被關閉了,讀端會一直讀到0,表示讀到文件結尾。
4.讀fd被關閉,OS會殺掉對應寫的進程(發送13號信號)。
進程池
提前創建一批子進程,有任務將任務交給子進程執行,當管道沒數據,子進程就在阻塞等待,父進程想哪個管道寫入,就是喚醒哪個子進程來處理任務,當然,父進程要進行后端任務的負載均衡(都忙起來)。
//task.hpp
#pragma
#include<iostream>
using namespace std;
#include<stdlib.h>
#include<unistd.h>
void Download()
{cout << "download!" << endl;
}
void Print()
{cout << "print!" << endl;
}
void Flush()
{cout << "flush!" << endl;
}
typedef void (*task)();
#define NUM 3
task tasks[NUM];
void loadTask()
{srand(time(nullptr) ^ getpid());tasks[0] = Download;tasks[1] = Print;tasks[2] = Flush;
}
void exec(int number)
{if(number<0||number>2){return;}tasks[number]();
}
int selectTask()
{return rand() % NUM;
}
//processpool.c
#include<iostream>
#include<string>
#include<unistd.h>
using namespace std;
#include<vector>
#include"task.hpp"
#include<sys/wait.h>
class Channel
{
public:Channel(int w,pid_t i){wfd = w;id = i;}pid_t getid(){return id;}int getwfd(){return wfd;}void closewfd(){close(wfd);}private:int wfd;pid_t id;
};
void work(int rfd)
{while(1){int command = 0;int n = read(rfd, &command, sizeof(command));if (n == sizeof(int)){exec(command);}else{close(rfd);break;}}
}
void createChannel(int num,vector<Channel>* v)
{for (int i = 0; i < num; i++){int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)exit(2);pid_t id = fork();if(id==0){for(auto i:*v){i.closewfd();}close(pipefd[1]);work(pipefd[0]);exit(0);} close(pipefd[0]);v->emplace_back(Channel(pipefd[1], id));}}
void sendTask(Channel ch,int task)
{write(ch.getwfd(),&task,sizeof(task));
}
int main(int argc,char* argv[])
{if(argc!=2){cerr << "number error!" << endl;return 1;}vector<Channel> channels;int now = 0;loadTask();int num = std::stoi(argv[1]);//1.創建子進程和通信createChannel(num, &channels);//2.通過channel發放任務int cnt = 4;while (cnt--){int task = selectTask();cout << channels[now].getid() << ' ' << task << ':' << endl;sendTask(channels[now], task);now++;now %= num;sleep(1);}// 3.回收管道和子進程for(auto &i:channels){i.closewfd();int status;int pid=0;pid = waitpid(i.getid(), &status, 0);if(pid<0)cerr << "error!" << endl;}return 0;
}
這里要注意,子進程創建會繼承父進程的文件描述符表,一定要把不需要寫端關掉。
命名管道
如果兩個進程毫無關系,那么就用命名管道進行通信。
創建命名管道:
mkfifo filename
int mkfifo(const char* filename,mode_t mode);
刪除目錄下文件:
int unlink(const char* path);
server和client進程通信(server讀,client寫):
#namePipe.hpp
#pragma
#include<iostream>
#include<string>
#include<sys/types.h>
#include<sys/stat.h>
#include<unistd.h>
#include<cerrno>
#include<fcntl.h>
using namespace std;
#define Size 256
const string Path = "./myfifo";
#define creater 1
#define user 2
class NamedPipe
{
public:NamedPipe(const string &Path,int id) :path(Path),id(id){if(id==creater){int ret = mkfifo(path.c_str(), 0666);if(ret!=0){perror("mkfifo");}}openNamedPipe();}int removeNamedpipe(){int ret = unlink(path.c_str());if(ret!=0){perror("mkfifo");}return ret;}~NamedPipe(){if(id==creater)removeNamedpipe();}int Read(string* out){char buffer[Size];int n = read(fd, buffer, Size);if(n>0){buffer[Size] = 0;*out = buffer;}return n;}void Write(const string& in){write(fd, in.c_str(), in.size());}private : const string path;int id;int fd;int openNamedPipe(){if(id==creater)fd = open(path.c_str(), O_RDONLY);elsefd = open(path.c_str(), O_WRONLY);return fd;}
};
#server.cpp
#include"namedPipe.hpp"
int main()
{NamedPipe pipe(Path,creater);while(1){string messages;int n=pipe.Read(&messages);if(n==0){cout << "client quit!" << endl;break;}cout << messages << endl;}return 0;
}
#client.cpp
#include"namedPipe.hpp"
int main()
{NamedPipe pipe(Path,user);while(1){cout << "in:";string messages;cin >> messages;pipe.Write(messages);}return 0;
}