?進程通信
管道:基于文件級別的單向通信
創建父子進程,使得進程的struct file*fd_array[]的文件描述符指向同一個struct file文件,這個文件是內存級文件。
父進程關寫端,子進程再關閉讀端。實現單向通信
子進程寫入,父進程讀取。
如果進程不是父子關系,則無法利用管道,因此管道應用于父子或者兄弟進程
以上的管道叫做匿名管道。
創建管道:pipe
輸出型參數
pipefd[0]讀下標
pipefd[1]寫下標
代碼示例
#include <iostream>
#include<string>
#include<cstdlib>
#include<unistd.h>
#include<string.h>
#include<sys/types.h>
#include<sys/wait.h>using namespace std;
#define N 2
#define NUM 1024
void Writer(int wfd)
{string s="hello I am child";pid_t self=getpid();int number=0;char buffer[NUM];while(true){buffer[0]=0; //字符串清空//將字符串的內容放入buffer緩沖區中snprintf(buffer,sizeof(buffer),"%s-%d-%d",s.c_str(),self,number++);cout<<buffer<<endl;// 發送給父進程write(wfd,buffer,strlen(buffer));sleep(1);}
}
void Reader(int rfd)
{char buffer[NUM];while(true){buffer[0]=0;// n表示實際讀到字節的大小ssize_t n=read(rfd,buffer,sizeof(buffer));if(n>0){buffer[n]=0; //當成字符串加入"\0"cout<<"father get a message["<<getpid()<<"]#"<<buffer<<endl;}}
}
int main()
{int pipefd[]={0};int n=pipe(pipefd);if(n<0) return 1;cout<<"pipefd[0]:"<<pipefd[0]<<", pipefd[1]: "<<pipefd[1]<<endl;pid_t id=fork();if(id<0) return 2;if(id==0) {close(pipefd[0]);//IPC codeWriter(pipefd[1]);close(pipefd[1]);exit(0);}close(pipefd[1]);Reader(pipefd[0]);pid_t rid=waitpid(id,nullptr,0);if(rid<0) return 3;close(pipefd[0]);return 0;
}
然而多執行流會會出現訪問沖突的問題--父進程訪問的數據到一半時,舊數據被寫端覆蓋。
父子進程協同,保護管道文件數據安全
讀寫端正常,如果管道為空,讀端阻塞
管道文件有大小,寫滿寫端阻塞
讀端正常讀,寫端關閉,寫進程變成僵尸進程,讀端就會讀到0,表明讀到文件結尾,而且不會阻塞
寫端正常寫,讀端關閉,操作系統通過信號殺掉寫入的進程。
ulimit查看pipe size大小,但是不同內核可能有差別
管道面向字節流,一次性讀完,有多少讀多少,且將分割符看成一個普通字符,管道規范可以解決這個問題
管道是基于文件的,文件的生命周期是隨進程的
管道的應用場景
使用管道實現簡易版本的進程池
?Task.hpp
#pragma once
#include<cstdlib>
#include<iostream>
#include<vector>
#include<unistd.h>
#include<assert.h>
#include<sys/types.h>
#include<sys/wait.h>
#include<string>
typedef void (*task_t)();
std::vector<task_t> tasks;//任務是四個函數
void task1()
{std::cout<<"task 1 call"<<std::endl;
}
void task2()
{std::cout<<"task 2 call"<<std::endl;
}
void task3()
{std::cout<<"task 3 call"<<std::endl;
}
void task4()
{std::cout<<"task 4 call"<<std::endl;
}
// const & 輸入 ->向函數內部輸入
// * 輸出
// & 輸入 輸出
void LoadTask(std::vector<task_t> *p_tasks)
{p_tasks->push_back(&task1);p_tasks->push_back(&task2);p_tasks->push_back(&task3);p_tasks->push_back(&task4);
}
?ProcessPool.cc
#include "Task.hpp"
//對管道進行描述#define processnum 10
class channel
{
public:channel(int task_id,int pid,std::string processname):_cmdfd(task_id),_pid(pid),_processname(processname){}
public:int _cmdfd;int _pid;std::string _processname;
};
std::vector<channel> channels;
void slaver()
{while(true){int tasknum=0;ssize_t num=read(0,&tasknum,sizeof(int));//read block,等待輸入// tasknum=tasknum%tasks.size();// (*tasks[tasknum])();//std::cout<<"i and pid"<<i<<pid<<std::endl;if (!num)break;else{std::cout<<"child process "<<getpid()<<"pid: "<<"receive task_id: "<<tasknum<<std::endl;(*tasks[tasknum])();}}
}
void InitProcessPool(std::vector<channel>* pchannels)
{std::vector<int> oldfds;for (int i=0;i<processnum;i++){//create pipeint pipefd[2];int n=pipe(pipefd);assert(!n); // n=0 successpid_t pid=fork();assert(pid!=-1); //pid =-1 fail//child//std::cout<<"i = "<<i;if(pid==0){ std::cout<<"child process:"<<getpid()<<"have otherfds: ";//only one write fdfor(auto oldfd:oldfds){std::cout<<oldfd<<" ";close(oldfd);}std::cout<<std::endl;//build relationshipclose(pipefd[1]);// pipe read from fd=0 not fd=3;dup2(pipefd[0],0);close(pipefd[0]);// slaver();exit(0);}close(pipefd[0]);int status=0;// ensure one by one ,block until child process finish// pid_t result=waitpid(pid,&status,0);// assert(result!=-1);pchannels->push_back(channel(pipefd[1],pid,"process "+std::to_string(i)));oldfds.push_back(pipefd[1]);sleep(1);}}
void menu()
{std::cout<<"*********************"<<std::endl;std::cout<<"******1.task one*****"<<std::endl;std::cout<<"******2.task two*****"<<std::endl;std::cout<<"******3.task three***"<<std::endl;std::cout<<"******4.task four****"<<std::endl;std::cout<<"******0.quit*********"<<std::endl;std::cout<<"*********************"<<std::endl;
}
void ctrlSlaver()
{int which=0;while(true){menu();int enter=0;std::cout<<"enter number:";std::cin>>enter;std::cout<<std::endl;if (enter==0){std::cout<<"quit software"<<std::endl;//ssize_t n=write(0,&enter,0); 不用寫入,直接退出就好了//assert(n!=-1);break;}ssize_t n=write(channels[which]._cmdfd,&enter,sizeof(int));assert(n!=-1);std::cout<<"parent send a task_num "<<enter<<" to process "<<channels[which]._processname<<std::endl;which++;which=which%processnum;}
}
void quitProcess(std::vector<channel>& pchannels)
{for(auto channel:channels){std::cout<<"close process"<<channel._pid<<std::endl;//關閉讀端,進程關閉close(channel._cmdfd);wait(NULL);}}
void PrintTask(const std::vector<task_t> tasks)
{for(auto task:tasks){(*task)();}}
int main()
{// load the taskLoadTask(&tasks);//PrintTask(tasks);InitProcessPool(&channels);ctrlSlaver();quitProcess(channels);return 0;
}