1. 管道通信
1.1 管道的概念與分類
管道(Pipe) 是進程間通信(IPC)的一種基礎機制,主要用于在具有親緣關系的進程(如父子進程、兄弟進程)之間傳遞數據,其核心特性是通過內核緩沖區實現單向或半雙工的數據傳輸。
- 匿名管道:通常用于具有親緣關系的進程之間通信,如父子進程或兄弟進程。它是半雙工的,數據只能在一個方向上流動,有固定的讀端和寫端,且只存在于內存中,不屬于任何文件系統,但可以使用普通的read、write等函數進行讀寫。
- 命名管道(FIFO):可以在無關的進程之間進行通信,有路徑名與之相關聯,以一種特殊設備文件形式存在于文件系統中。創建后,無關進程可以通過該文件進行通信,通信方式類似于使用文件傳輸數據,遵循先進先出原則。
管道是輕量級且高效的進程間通信方式,適用于簡單的數據流場景,但其單向性和容量限制使其不適合復雜需求。命名管道擴展了應用范圍,但需注意文件系統的依賴。
1.2 管道的原理
在操作系統還不支持進程間通信的時候,人們嘗試使用操作系統已有的功能來實現進程間通信。
要實現進程間通信,就需要兩個進程訪問共享的資源,什么資源是各個進程都可以共享訪問的呢?
答案顯而易見:文件。
父進程打開一個文件并創建子進程,子進程就會繼承父進程的文件描述符表,這樣父子進程就可以訪問同一個文件,通過向文件當中進行讀寫就可以實現進程間通信。
當然,對文件的訪問是需要同步與互斥機制的,這一點由操作系統來實現,我們并不關心。
兩個進程之間的通信一般都是些臨時的小體量的消息,無需將其正真存入到文件當中(而且存入文件當中會造成較大的訪存消耗)。實際上,我們只需要在struct file維護的文件緩沖區當中進行信息交換即可。
于是,在操作系統在這個思路的基礎之上,實現了管道機制。
所謂管道,就是一種特殊的管道文件,其本質上是內核管理的一段環形內存緩沖區,通過文件描述符提供單向或半雙工的數據流傳輸。
2. 匿名管道的使用
2.1 pipe函數
我們說,管道文件是一種特殊的文件,那么其打開的方式(或者說創建的方式)自然也要與一般的文件進行區別。
在Linux當中,我們使用pipe函數來創建一個匿名管道:
#include <unistd.h>
int pipe(int pipefd[2]);
返回值:成功返回 0,失敗返回 -1 并設置 errno。
參數:pipefd 是長度為 2 的整型數組,用于返回兩個文件描述符:
- pipefd[0]:管道的讀端,只能用于讀取數據。
- pipefd[1]:管道的寫端,只能用于寫入數據。
注意,管道只能進行單向數據傳輸,這意味著共享管道的父子進程一個只能讀,一個只能寫。
在實踐當中,我們應當關閉當前進程未使用的端口:
#include <unistd.h>int main()
{int pipefd[2] = {0};int n = pipe(pipefd);if(n == -1){perror("pipe:");return 1;}int id = fork();if(id == 0){// 子進程寫close(fd[0]);// ...}else{// 父進程讀close(fd[1]);// ...}
}
2.2 管道讀寫規則
當沒有數據可讀時:
- O_NONBLOCK disable:read調用阻塞,即進程暫停執行,一直等到有數據來到為止。
- O_NONBLOCK enable:read調用返回 -1,errno值為EAGAIN。
當管道滿的時:
- O_NONBLOCK disable: write調用阻塞,直到有進程讀走數據
- O_NONBLOCK enable:調用返回-1,errno值為EAGAIN
文件描述符關閉:
- 如果所有管道寫端對應的文件描述符被關閉:read不再阻塞而是返回0。
- 如果所有管道讀端對應的文件描述符被關閉:write操作會產生信號SIGPIPE,進而可能導致write進程退出。
原子性規則:?
- 小數據寫入(≤ PIPE_BUF,通常 4KB): 內核保證寫入的原子性,即數據要么完整寫入,要么完全不寫入。
- 大數據寫入(PIPE_BUF): 不保證原子性,數據可能被其他進程的寫入操作穿插,且可能部分寫入。
注:O_NONBLOCK為pipe2的選項(比pipe多一個選項參數)。?
?3. 進程池
學習完匿名管道的基本使用,我們可以動手嘗試編寫一個基于匿名管道的進程池。
平時,各個子進程就阻塞在read處等待,當父進程通過管道對其下達任務時就會將其喚醒。
.hpp后綴的文件其實就是.cpp和.h文件的結合體,類似于java的包。
3.1 Channel.hpp
首先,我們定義一個Channel類用于管理父子進程之間的通信管道(信道):
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include <cassert>
#include <iostream>class Channel
{
public:Channel(int wfd, int pid):_wfd(wfd), _process(pid){}~Channel(){}void CloseAndWait(){close(_wfd);std::cout << _process << "的信道關閉成功" << std::endl;waitpid(_process, nullptr, 0);std::cout << "進程" << _process << "已被成功回收" << std::endl;}// 通過信道將任務提交給子進程執行void ExecuteTask(int code){std::cout << "將任務" << code << "派遣給" << _process << std::endl;write(_wfd, &code, sizeof(code));}int GetPid(){return _process;}
private:int _wfd;pid_t _process;
};
由于進程池中進程的數量可能很多,信道也相對變多,我們應當定義一個類來管理這些信道:
class ChannelManager
{
public:void Insert(Channel&& channel){_Channels.push_back(channel);}int Size(){return _Channels.size();}// 選擇進程并將任務分派出去void GiveTask(int code){int channel = SelectChannel();std::cout << "選擇進程: " << _Channels[channel].GetPid() << std::endl;_Channels[channel].ExecuteTask(code);}void CloseAndWait(){for(auto& channel : _Channels){channel.CloseAndWait();}}private:// 選擇進程int SelectChannel(){// 輪詢分派任務static int next = 0;assert(_Channels.size());int tmp = next;next = (next + 1) % _Channels.size();return tmp;}std::vector<Channel> _Channels;
};
3.2 Task.hpp
任務實際上就是一個個的函數,同樣地,由于任務可能有很多,我們也使用一個類來進行管理:
#include <functional>
#include <vector>
#include <iostream>
#include <unistd.h>
#include <cassert>
using Task = std::function<void()>;class TaskManager
{
public:// 注冊,即將任務插入數組并管理起來void RegisterTask(Task&& task){_Tasks.push_back(task);}int Size(){return _Tasks.size();}// 根據任務碼(數組下標)返回相應的任務對象Task& GetTask(int code){assert(code >= 0 && code < _Tasks.size());return _Tasks[code];}
private:std::vector<Task> _Tasks;
};
3.3 ProcessPool.hpp
完成上面的準備工作,我們就可以開始著手構建我們ProcessPool類了,TODO:
- 對ChannelManager和TaskManager進行封裝。
- 提供給用戶插入任務,發布任務等的接口。
- 開啟(Start):創建子進程并使其開始等待任務到達、創建信道并插入ChannelManager。
- 終止(Stop):銷毀信道并回收子進程。
#include "Channel.hpp"
#include "Task.hpp"class ProcessPool
{
public:ProcessPool(int size = 5):_size(size){std::cout << "ProcessPool已創建" << std::endl;}~ProcessPool(){// 假如用戶忘記終止并回收進程if(_activate){_CM.CloseAndWait();}}// 子進程轉入此函數并循環等待任務到達后執行void Work(int rfd){int code = 0;std::cout << "子進程" << getpid() << "開始工作" << std::endl;while(true){ssize_t n = read(rfd, &code, sizeof(code));if(n == 0) {std::cout << "進程" << getpid() << "退出" << std::endl;break;}else if(n < 0){std::cout << "進程" << getpid() << "獲取任務時發生錯誤" << std::endl;break;}else _TM.GetTask(code)();}}void Start(){for(int i = 0; i < _size; i++){int fds[2] = {0};int n = pipe(fds);if(n == -1){perror("pipe:");}int id = fork();if(id < 0){perror("fork:");exit(1);}else if(id == 0){// 子進程close(fds[1]);Work(fds[0]);close(fds[0]);exit(0);}// 父進程close(fds[0]);_CM.Insert(Channel(fds[1], id));}_activate = true;}// 用戶發布任務的接口,交由ChannelManager處理void LaunchTask(int code){assert(code >= 0 && code <= _TM.Size());std::cout << "發布任務: " << code << std::endl;_CM.GiveTask(code);}void Stop(){_CM.CloseAndWait();_activate = false;}// 封裝TaskManager的接口,使用戶自定義任務void RegisterTask(Task&& task){_TM.RegisterTask(std::forward<Task>(task));}
private:int _size;bool _activate = false;ChannelManager _CM;TaskManager _TM;
};
3.4 Main.cpp
#include "ProcessPool.hpp"
#include <ctime>int main()
{std::cout << "程序啟動" << std::endl;srand((unsigned int)time(nullptr));ProcessPool processpool;// 生成n個測試任務int n = 10;for(int i = 0; i < 10; i++){processpool.RegisterTask(([i](){std::cout << "進程" << getpid() << "正在執行任務" << i << std::endl;}));}processpool.Start();// 隨機發布10個任務while(n--){int code = rand() % 10;processpool.LaunchTask(code);sleep(2);}processpool.Stop();return 0;
}
?3.5?匿名管道的死鎖問題
上面的代碼實際上存在一個嚴重的問題,那就是在10個任務執行結束之后進行信道的銷毀時:
在第一個信道提示關閉之后,并沒有顯式子進程退出的消息,而是直接卡住不動了。查看源代碼會發現問題就是出在這一行,說明在信道被關閉之后,子進程并沒有退出。
這是由于后創建的子進程繼承了父進程對之前創建的子進程的寫端口:?
所以,在父進程的視角上關閉信道之后,管道1的寫端依然沒有完全關閉,子進程就會繼續在read處阻塞等待。子進程因等待父進程下達指令或關閉信道而阻塞;父進程因等待子進程退出而阻塞。?此時就形成了死鎖,導致程序卡住。
解決方案
- 方案1:先關閉所有的信道再等待子進程退出。
- 方案2:逆向關閉信道并退出。
- 方案3:關閉子進程從父進程那里繼承下來的寫入端。
4. 最終代碼
代碼最終采用的是第三種方案,因為該方案的安全性更高,當然前兩種方案被部分注釋了,讀者可以自己嘗試修改死鎖的解決方案。
4.1 Channel.hpp
#include <vector>
#include <string>
#include <sys/types.h>
#include <sys/wait.h>
#include <cassert>
#include <iostream>class Channel
{
public:Channel(int wfd, int pid):_wfd(wfd), _process(pid){}~Channel(){}void SubProcessCloseBrother(){close(_wfd);}void Close(){std::cout << "關閉" << _process << "的信道" << std::endl;close(_wfd);std::cout << _process << "的信道關閉成功" << std::endl;}void Wait(){waitpid(_process, nullptr, 0);std::cout << "進程" << _process << "已被成功回收" << std::endl;}// 確保調用該函數的信道為當前最后啟動的 或者 事先關閉所有子進程的寫入端,否則會造成死鎖void CloseAndWait(){close(_wfd);std::cout << _process << "的信道關閉成功" << std::endl;waitpid(_process, nullptr, 0);std::cout << "進程" << _process << "已被成功回收" << std::endl;}void ExecuteTask(int code){std::cout << "將任務" << code << "派遣給" << _process << std::endl;write(_wfd, &code, sizeof(code));}int GetPid(){return _process;}
private:int _wfd;pid_t _process;
};class ChannelManager
{
public:void Insert(Channel&& channel){_Channels.push_back(channel);}int Size(){return _Channels.size();}void GiveTask(int code){int channel = SelectChannel();std::cout << "選擇進程: " << _Channels[channel].GetPid() << std::endl;_Channels[channel].ExecuteTask(code);}// 方案1:先關閉后回收void CloseChannels(){for(auto& channel : _Channels){channel.Close();}}void WaitProcesses(){for(auto& channel : _Channels){std::cout << "回收進程" << channel.GetPid() << std::endl;channel.Wait();}}// 方案2:反向關閉回收// void CloseAndWait()// {// for(int i = _Channels.size() - 1; i >= 0; i--)// {// _Channels[i].CloseAndWait();// }// }// 方案3:關閉所有子進程的寫入端,可以任意方式關閉回收void CloseAndWait(){for(auto& channel : _Channels){channel.CloseAndWait();}}void SubProcessCloseBrothers(){for(auto& channel : _Channels){channel.SubProcessCloseBrother();}}private:int SelectChannel(){// 輪詢分派任務static int next = 0;assert(_Channels.size());int tmp = next;next = (next + 1) % _Channels.size();return tmp;}std::vector<Channel> _Channels;
};
4.2 Task.hpp
#include <functional>
#include <vector>
#include <iostream>
#include <unistd.h>
#include <cassert>
using Task = std::function<void()>;class TaskManager
{
public:void RegisterTask(Task&& task){_Tasks.push_back(task);}int Size(){return _Tasks.size();}Task& GetTask(int code){assert(code >= 0 && code < _Tasks.size());return _Tasks[code];}
private:std::vector<Task> _Tasks;
};
4.3 ProcessPool.hpp
#include "Channel.hpp"
#include "Task.hpp"class ProcessPool
{
public:ProcessPool(int size = 5):_size(size){std::cout << "ProcessPool已創建" << std::endl;}~ProcessPool(){if(_activate){_CM.CloseChannels();_CM.WaitProcesses();}}void Work(int rfd){int code = 0;std::cout << "子進程" << getpid() << "開始工作" << std::endl;while(true){ssize_t n = read(rfd, &code, sizeof(code));if(n == 0) {std::cout << "進程" << getpid() << "退出" << std::endl;break;}else if(n < 0){std::cout << "進程" << getpid() << "獲取任務時發生錯誤" << std::endl;break;}else _TM.GetTask(code)();}}void Start(){for(int i = 0; i < _size; i++){int fds[2] = {0};int n = pipe(fds);if(n == -1){perror("pipe:");}int id = fork();if(id < 0){perror("fork:");exit(1);}else if(id == 0){// 子進程close(fds[1]);// 將子進程的寫入端全部關閉_CM.SubProcessCloseBrothers();Work(fds[0]);close(fds[0]);exit(0);}// 父進程close(fds[0]);_CM.Insert(Channel(fds[1], id));}_activate = true;}void LaunchTask(int code){assert(code >= 0 && code <= _TM.Size());std::cout << "發布任務: " << code << std::endl;_CM.GiveTask(code);}void Stop(){// _CM.CloseChannels();// _CM.WaitProcesses();_CM.CloseAndWait();_activate = false;}void RegisterTask(Task&& task){_TM.RegisterTask(std::forward<Task>(task));}
private:int _size;bool _activate = false;ChannelManager _CM;TaskManager _TM;
};
4.4 Makefile
ProcessPool:Main.cppg++ -o $@ $^ -std=c++11.PHONY:clean
clean:rm ProcessPool