Linux筆記---進程間通信:匿名管道

1. 管道通信

1.1 管道的概念與分類

管道(Pipe) 是進程間通信(IPC)的一種基礎機制,主要用于在具有親緣關系的進程(如父子進程、兄弟進程)之間傳遞數據,其核心特性是通過內核緩沖區實現單向或半雙工的數據傳輸。

  • 匿名管道:通常用于具有親緣關系的進程之間通信,如父子進程或兄弟進程。它是半雙工的,數據只能在一個方向上流動,有固定的讀端和寫端,且只存在于內存中,不屬于任何文件系統,但可以使用普通的read、write等函數進行讀寫。
  • 命名管道(FIFO):可以在無關的進程之間進行通信,有路徑名與之相關聯,以一種特殊設備文件形式存在于文件系統中。創建后,無關進程可以通過該文件進行通信,通信方式類似于使用文件傳輸數據,遵循先進先出原則。

管道是輕量級且高效的進程間通信方式,適用于簡單的數據流場景,但其單向性和容量限制使其不適合復雜需求。命名管道擴展了應用范圍,但需注意文件系統的依賴。

1.2 管道的原理

在操作系統還不支持進程間通信的時候,人們嘗試使用操作系統已有的功能來實現進程間通信。

要實現進程間通信,就需要兩個進程訪問共享的資源,什么資源是各個進程都可以共享訪問的呢?

答案顯而易見:文件。

父進程打開一個文件并創建子進程,子進程就會繼承父進程的文件描述符表,這樣父子進程就可以訪問同一個文件,通過向文件當中進行讀寫就可以實現進程間通信。

當然,對文件的訪問是需要同步與互斥機制的,這一點由操作系統來實現,我們并不關心。

兩個進程之間的通信一般都是些臨時的小體量的消息,無需將其正真存入到文件當中(而且存入文件當中會造成較大的訪存消耗)。實際上,我們只需要在struct file維護的文件緩沖區當中進行信息交換即可。

于是,在操作系統在這個思路的基礎之上,實現了管道機制。

所謂管道,就是一種特殊的管道文件,其本質上是內核管理的一段環形內存緩沖區,通過文件描述符提供單向或半雙工的數據流傳輸。

2. 匿名管道的使用

2.1 pipe函數

我們說,管道文件是一種特殊的文件,那么其打開的方式(或者說創建的方式)自然也要與一般的文件進行區別。

在Linux當中,我們使用pipe函數來創建一個匿名管道:

#include <unistd.h>
int pipe(int pipefd[2]);

返回值:成功返回 0,失敗返回 -1 并設置 errno。

參數:pipefd 是長度為 2 的整型數組,用于返回兩個文件描述符:

  • pipefd[0]:管道的讀端,只能用于讀取數據。
  • pipefd[1]:管道的寫端,只能用于寫入數據。

注意,管道只能進行單向數據傳輸,這意味著共享管道的父子進程一個只能讀,一個只能寫。

在實踐當中,我們應當關閉當前進程未使用的端口:

#include <unistd.h>int main()
{int pipefd[2] = {0};int n = pipe(pipefd);if(n == -1){perror("pipe:");return 1;}int id = fork();if(id == 0){// 子進程寫close(fd[0]);// ...}else{// 父進程讀close(fd[1]);// ...}
}

2.2 管道讀寫規則

當沒有數據可讀時:

  • O_NONBLOCK disable:read調用阻塞,即進程暫停執行,一直等到有數據來到為止。
  • O_NONBLOCK enable:read調用返回 -1,errno值為EAGAIN。

當管道滿的時:

  • O_NONBLOCK disable: write調用阻塞,直到有進程讀走數據
  • O_NONBLOCK enable:調用返回-1,errno值為EAGAIN

文件描述符關閉:

  • 如果所有管道寫端對應的文件描述符被關閉:read不再阻塞而是返回0。
  • 如果所有管道讀端對應的文件描述符被關閉:write操作會產生信號SIGPIPE,進而可能導致write進程退出。

原子性規則:?

  • 小數據寫入(≤ PIPE_BUF,通常 4KB): 內核保證寫入的原子性,即數據要么完整寫入,要么完全不寫入。
  • 大數據寫入(PIPE_BUF): 不保證原子性,數據可能被其他進程的寫入操作穿插,且可能部分寫入。

注:O_NONBLOCK為pipe2的選項(比pipe多一個選項參數)。?

?3. 進程池

學習完匿名管道的基本使用,我們可以動手嘗試編寫一個基于匿名管道的進程池。

平時,各個子進程就阻塞在read處等待,當父進程通過管道對其下達任務時就會將其喚醒。

.hpp后綴的文件其實就是.cpp和.h文件的結合體,類似于java的包。

3.1 Channel.hpp

首先,我們定義一個Channel類用于管理父子進程之間的通信管道(信道):

#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include <cassert>
#include <iostream>class Channel
{
public:Channel(int wfd, int pid):_wfd(wfd), _process(pid){}~Channel(){}void CloseAndWait(){close(_wfd);std::cout << _process << "的信道關閉成功" << std::endl;waitpid(_process, nullptr, 0);std::cout << "進程" << _process << "已被成功回收" << std::endl;}// 通過信道將任務提交給子進程執行void ExecuteTask(int code){std::cout << "將任務" << code << "派遣給" << _process << std::endl;write(_wfd, &code, sizeof(code));}int GetPid(){return _process;}
private:int _wfd;pid_t _process;
};

由于進程池中進程的數量可能很多,信道也相對變多,我們應當定義一個類來管理這些信道:

class ChannelManager
{
public:void Insert(Channel&& channel){_Channels.push_back(channel);}int Size(){return _Channels.size();}// 選擇進程并將任務分派出去void GiveTask(int code){int channel = SelectChannel();std::cout << "選擇進程: " << _Channels[channel].GetPid() << std::endl;_Channels[channel].ExecuteTask(code);}void CloseAndWait(){for(auto& channel : _Channels){channel.CloseAndWait();}}private:// 選擇進程int SelectChannel(){// 輪詢分派任務static int next = 0;assert(_Channels.size());int tmp = next;next = (next + 1) % _Channels.size();return tmp;}std::vector<Channel> _Channels;
};

3.2 Task.hpp

任務實際上就是一個個的函數,同樣地,由于任務可能有很多,我們也使用一個類來進行管理:

#include <functional>
#include <vector>
#include <iostream>
#include <unistd.h>
#include <cassert>
using Task = std::function<void()>;class TaskManager
{
public:// 注冊,即將任務插入數組并管理起來void RegisterTask(Task&& task){_Tasks.push_back(task);}int Size(){return _Tasks.size();}// 根據任務碼(數組下標)返回相應的任務對象Task& GetTask(int code){assert(code >= 0 && code < _Tasks.size());return _Tasks[code];}
private:std::vector<Task> _Tasks;
};

3.3 ProcessPool.hpp

完成上面的準備工作,我們就可以開始著手構建我們ProcessPool類了,TODO:

  • 對ChannelManager和TaskManager進行封裝。
  • 提供給用戶插入任務,發布任務等的接口。
  • 開啟(Start):創建子進程并使其開始等待任務到達、創建信道并插入ChannelManager。
  • 終止(Stop):銷毀信道并回收子進程。
#include "Channel.hpp"
#include "Task.hpp"class ProcessPool
{
public:ProcessPool(int size = 5):_size(size){std::cout << "ProcessPool已創建" << std::endl;}~ProcessPool(){// 假如用戶忘記終止并回收進程if(_activate){_CM.CloseAndWait();}}// 子進程轉入此函數并循環等待任務到達后執行void Work(int rfd){int code = 0;std::cout << "子進程" << getpid() << "開始工作" << std::endl;while(true){ssize_t n = read(rfd, &code, sizeof(code));if(n == 0) {std::cout << "進程" << getpid() << "退出" << std::endl;break;}else if(n < 0){std::cout << "進程" << getpid() << "獲取任務時發生錯誤" << std::endl;break;}else _TM.GetTask(code)();}}void Start(){for(int i = 0; i < _size; i++){int fds[2] = {0};int n = pipe(fds);if(n == -1){perror("pipe:");}int id = fork();if(id < 0){perror("fork:");exit(1);}else if(id == 0){// 子進程close(fds[1]);Work(fds[0]);close(fds[0]);exit(0);}// 父進程close(fds[0]);_CM.Insert(Channel(fds[1], id));}_activate = true;}// 用戶發布任務的接口,交由ChannelManager處理void LaunchTask(int code){assert(code >= 0 && code <= _TM.Size());std::cout << "發布任務: " << code << std::endl;_CM.GiveTask(code);}void Stop(){_CM.CloseAndWait();_activate = false;}// 封裝TaskManager的接口,使用戶自定義任務void RegisterTask(Task&& task){_TM.RegisterTask(std::forward<Task>(task));}
private:int _size;bool _activate = false;ChannelManager _CM;TaskManager _TM;
};

3.4 Main.cpp

#include "ProcessPool.hpp"
#include <ctime>int main()
{std::cout << "程序啟動" << std::endl;srand((unsigned int)time(nullptr));ProcessPool processpool;// 生成n個測試任務int n = 10;for(int i = 0; i < 10; i++){processpool.RegisterTask(([i](){std::cout << "進程" << getpid() << "正在執行任務" << i << std::endl;}));}processpool.Start();// 隨機發布10個任務while(n--){int code = rand() % 10;processpool.LaunchTask(code);sleep(2);}processpool.Stop();return 0;
}

?3.5?匿名管道的死鎖問題

上面的代碼實際上存在一個嚴重的問題,那就是在10個任務執行結束之后進行信道的銷毀時:

在第一個信道提示關閉之后,并沒有顯式子進程退出的消息,而是直接卡住不動了。查看源代碼會發現問題就是出在這一行,說明在信道被關閉之后,子進程并沒有退出。

這是由于后創建的子進程繼承了父進程對之前創建的子進程的寫端口:?

所以,在父進程的視角上關閉信道之后,管道1的寫端依然沒有完全關閉,子進程就會繼續在read處阻塞等待。子進程因等待父進程下達指令或關閉信道而阻塞;父進程因等待子進程退出而阻塞。?此時就形成了死鎖,導致程序卡住。

解決方案
  • 方案1:先關閉所有的信道再等待子進程退出。
  • 方案2:逆向關閉信道并退出。
  • 方案3:關閉子進程從父進程那里繼承下來的寫入端。

4. 最終代碼

代碼最終采用的是第三種方案,因為該方案的安全性更高,當然前兩種方案被部分注釋了,讀者可以自己嘗試修改死鎖的解決方案。

4.1 Channel.hpp

#include <vector>
#include <string>
#include <sys/types.h>
#include <sys/wait.h>
#include <cassert>
#include <iostream>class Channel
{
public:Channel(int wfd, int pid):_wfd(wfd), _process(pid){}~Channel(){}void SubProcessCloseBrother(){close(_wfd);}void Close(){std::cout << "關閉" << _process << "的信道" << std::endl;close(_wfd);std::cout << _process << "的信道關閉成功" << std::endl;}void Wait(){waitpid(_process, nullptr, 0);std::cout << "進程" << _process << "已被成功回收" << std::endl;}// 確保調用該函數的信道為當前最后啟動的 或者 事先關閉所有子進程的寫入端,否則會造成死鎖void CloseAndWait(){close(_wfd);std::cout << _process << "的信道關閉成功" << std::endl;waitpid(_process, nullptr, 0);std::cout << "進程" << _process << "已被成功回收" << std::endl;}void ExecuteTask(int code){std::cout << "將任務" << code << "派遣給" << _process << std::endl;write(_wfd, &code, sizeof(code));}int GetPid(){return _process;}
private:int _wfd;pid_t _process;
};class ChannelManager
{
public:void Insert(Channel&& channel){_Channels.push_back(channel);}int Size(){return _Channels.size();}void GiveTask(int code){int channel = SelectChannel();std::cout << "選擇進程: " << _Channels[channel].GetPid() << std::endl;_Channels[channel].ExecuteTask(code);}// 方案1:先關閉后回收void CloseChannels(){for(auto& channel : _Channels){channel.Close();}}void WaitProcesses(){for(auto& channel : _Channels){std::cout << "回收進程" << channel.GetPid() << std::endl;channel.Wait();}}// 方案2:反向關閉回收// void CloseAndWait()// {//     for(int i = _Channels.size() - 1; i >= 0; i--)//     {//         _Channels[i].CloseAndWait();//     }// }// 方案3:關閉所有子進程的寫入端,可以任意方式關閉回收void CloseAndWait(){for(auto& channel : _Channels){channel.CloseAndWait();}}void SubProcessCloseBrothers(){for(auto& channel : _Channels){channel.SubProcessCloseBrother();}}private:int SelectChannel(){// 輪詢分派任務static int next = 0;assert(_Channels.size());int tmp = next;next = (next + 1) % _Channels.size();return tmp;}std::vector<Channel> _Channels;
};

4.2 Task.hpp

#include <functional>
#include <vector>
#include <iostream>
#include <unistd.h>
#include <cassert>
using Task = std::function<void()>;class TaskManager
{
public:void RegisterTask(Task&& task){_Tasks.push_back(task);}int Size(){return _Tasks.size();}Task& GetTask(int code){assert(code >= 0 && code < _Tasks.size());return _Tasks[code];}
private:std::vector<Task> _Tasks;
};

4.3 ProcessPool.hpp

#include "Channel.hpp"
#include "Task.hpp"class ProcessPool
{
public:ProcessPool(int size = 5):_size(size){std::cout << "ProcessPool已創建" << std::endl;}~ProcessPool(){if(_activate){_CM.CloseChannels();_CM.WaitProcesses();}}void Work(int rfd){int code = 0;std::cout << "子進程" << getpid() << "開始工作" << std::endl;while(true){ssize_t n = read(rfd, &code, sizeof(code));if(n == 0) {std::cout << "進程" << getpid() << "退出" << std::endl;break;}else if(n < 0){std::cout << "進程" << getpid() << "獲取任務時發生錯誤" << std::endl;break;}else _TM.GetTask(code)();}}void Start(){for(int i = 0; i < _size; i++){int fds[2] = {0};int n = pipe(fds);if(n == -1){perror("pipe:");}int id = fork();if(id < 0){perror("fork:");exit(1);}else if(id == 0){// 子進程close(fds[1]);// 將子進程的寫入端全部關閉_CM.SubProcessCloseBrothers();Work(fds[0]);close(fds[0]);exit(0);}// 父進程close(fds[0]);_CM.Insert(Channel(fds[1], id));}_activate = true;}void LaunchTask(int code){assert(code >= 0 && code <= _TM.Size());std::cout << "發布任務: " << code << std::endl;_CM.GiveTask(code);}void Stop(){// _CM.CloseChannels();// _CM.WaitProcesses();_CM.CloseAndWait();_activate = false;}void RegisterTask(Task&& task){_TM.RegisterTask(std::forward<Task>(task));}
private:int _size;bool _activate = false;ChannelManager _CM;TaskManager _TM;
};

4.4 Makefile

ProcessPool:Main.cppg++ -o $@ $^ -std=c++11.PHONY:clean
clean:rm ProcessPool

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/80210.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/80210.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/80210.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Ollama API 應用指南

1. 基礎信息 默認地址: http://localhost:11434/api數據格式: application/json支持方法: POST&#xff08;主要&#xff09;、GET&#xff08;部分接口&#xff09; 2. 模型管理 API (1) 列出本地模型 端點: GET /api/tags功能: 獲取已下載的模型列表。示例:curl http://lo…

【OSCP-vulnhub】Raven-2

目錄 端口掃描 本地/etc/hosts文件解析 目錄掃描&#xff1a; 第一個flag 利用msf下載exp flag2 flag3 Mysql登錄 查看mysql的運行權限 MySql提權&#xff1a;UDF 查看數據庫寫入條件 查看插件目錄 查看是否可以遠程登錄 gcc編譯.o文件 創建so文件 創建臨時監聽…

Podman Desktop:現代輕量容器管理利器(Podman與Docker)

前言 什么是 Podman Desktop&#xff1f; Podman Desktop 是基于 Podman CLI 的圖形化開源容器管理工具&#xff0c;運行在 Windows&#xff08;或 macOS&#xff09;上&#xff0c;默認集成 Fedora Linux&#xff08;WSL 2 環境&#xff09;。它提供與 Docker 類似的使用體驗…

極狐GitLab 權限和角色如何設置?

極狐GitLab 是 GitLab 在中國的發行版&#xff0c;關于中文參考文檔和資料有&#xff1a; 極狐GitLab 中文文檔極狐GitLab 中文論壇極狐GitLab 官網 權限和角色 (BASIC ALL) 將用戶添加到項目或群組時&#xff0c;您可以為他們分配角色。該角色決定他們在極狐GitLab 中可以執…

解鎖現代生活健康密碼,開啟養生新方式

在科技飛速發展的當下&#xff0c;我們享受著便捷生活&#xff0c;卻也面臨諸多健康隱患。想要維持良好狀態&#xff0c;不妨從這些細節入手&#xff0c;解鎖科學養生之道。? 腸道是人體重要的消化器官&#xff0c;也是最大的免疫器官&#xff0c;養護腸道至關重要。日常可多…

Kafka 主題設計與數據接入機制

一、前言&#xff1a;萬物皆流&#xff0c;Kafka 是入口 在構建實時數倉時&#xff0c;Kafka 既是 數據流動的起點&#xff0c;也是后續流處理系統&#xff08;如 Flink&#xff09;賴以為生的數據源。 但“消息進來了” ≠ “你就能處理好了”——不合理的 Topic 設計、接入方…

【繪制圖像輪廓|凸包特征檢測】圖像處理(OpenCV) -part7

15 繪制圖像輪廓 15.1 什么是輪廓 輪廓是一系列相連的點組成的曲線&#xff0c;代表了物體的基本外形。相對于邊緣&#xff0c;輪廓是連續的&#xff0c;邊緣不一定連續&#xff0c;如下圖所示。輪廓是一個閉合的、封閉的形狀。 輪廓的作用&#xff1a; 形狀分析 目標識別 …

uniapp中使用<cover-view>標簽

文章背景&#xff1a; uniapp中遇到了原生組件(canvas)優先級過高覆蓋vant組件 解決辦法&#xff1a; 使用<cover-view>標簽 踩坑&#xff1a; 我想實現的是一個vant組件庫中動作面板的效果&#xff0c;能夠從底部彈出框&#xff0c;讓用戶進行選擇&#xff0c;我直…

Kafka常見問題及解決方案

Kafka 是一個強大的分布式流處理平臺&#xff0c;廣泛用于高吞吐量的數據流處理&#xff0c;但在實際使用過程中&#xff0c;也會遇到一些常見問題。以下是一些常見的 Kafka 問題及其對應的解決辦法的詳細解答&#xff1a; 消息丟失 一、原因 1.生產端 網絡故障、生產者超時…

leetcode 二分查找應用

34. Find First and Last Position of Element in Sorted Array 代碼&#xff1a; class Solution { public:vector<int> searchRange(vector<int>& nums, int target) {int low lowwer_bound(nums,target);int high upper_bound(nums,target);if(low high…

【Docker】在容器中使用 NVIDIA GPU

解決容器 GPU 設備映射問題&#xff0c;實現 AI 應用加速 &#x1f517; 官方文檔&#xff1a;NVIDIA Container Toolkit GitHub 常見錯誤排查 若在運行測試容器時遇到以下錯誤&#xff1a; docker: Error response from daemon: could not select device driver ""…

通過Quartus II實現Nios II編程

目錄 一、認識Nios II二、使用Quartus II 18.0Lite搭建Nios II硬件部分三、軟件部分四、運行項目 一、認識Nios II Nios II軟核處理器簡介 Nios II是Altera公司推出的一款32位RISC嵌入式處理器&#xff0c;專門設計用于在FPGA上運行。作為軟核處理器&#xff0c;Nios II可以通…

JAVA設計模式——(三)橋接模式

JAVA設計模式——&#xff08;三&#xff09;橋接模式&#xff08;Bridge Pattern&#xff09; 介紹理解實現武器抽象類武器實現類涂裝顏色的行為接口具體顏色的行為實現讓行為影響武器修改武器抽象類修改實現類 測試 適用性 介紹 將抽象和實現解耦&#xff0c;使兩者可以獨立…

k8s 證書相關問題

1.重新生成新證書 kubeadm init phase certs apiserver-etcd-client --config ~/kubeadm.yaml這個命令表示生成 kube-apiserver 連接 etcd 使用的證書,生成后如下 -rw------- 1 root root 1.7K Apr 23 16:35 apiserver-etcd-client.key -rw-r--r-- 1 root root 1.2K Apr 23 …

比較:AWS VPC peering與 AWS Transit Gateway

簡述: VPC 對等連接和 Transit Gateway 用于連接多個 VPC。VPC 對等連接提供全網狀架構,而 Transit Gateway 提供中心輻射型架構。Transit Gateway 提供大規模 VPC 連接,并簡化了 VPC 間通信管理,相比 VPC 對等連接,支持大量 VPC 的 VPC 間通信管理。 VPC 對等連接 AWS V…

制造企業PLM深度應用:2025年基于PDCA循環的7項持續改進指標

制造企業的產品生命周期管理&#xff08;PLM&#xff09;在數字化轉型的浪潮中扮演著至關重要的角色。PLM深度應用不僅能夠提升產品研發效率、保證產品質量&#xff0c;還能增強企業在市場中的競爭力。隨著2025年智能制造目標的推進&#xff0c;基于PDCA循環的持續改進對于PLM的…

極狐GitLab 的壓縮和合并是什么?

極狐GitLab 是 GitLab 在中國的發行版&#xff0c;關于中文參考文檔和資料有&#xff1a; 極狐GitLab 中文文檔極狐GitLab 中文論壇極狐GitLab 官網 壓縮和合并 (BASIC ALL) 在你處理一個特性分支時&#xff0c;通常會創建一些小的、獨立的提交。這些小提交幫助描述構建特性…

解耦舊系統的利器:Java 中的適配器模式(Adapter Pattern)實戰解析

在現代軟件開發中&#xff0c;我們經常需要與舊系統、第三方庫或不一致接口打交道。這時候&#xff0c;如果能優雅地整合這些不兼容組件&#xff0c;又不破壞原有結構&#xff0c;就需要一位“翻譯官” —— 適配器模式。本文將通過 Java 實例&#xff0c;詳細講解適配器模式的…

03-谷粒商城筆記

一個插件的install和生命周期的報錯是不一樣的 Maven找不到ojdbc6和sqljdbc4依賴包 這時候我找到了jar包&#xff0c;然后我就先找到一個jar安裝到了本地倉庫。 在終端上進行命令了&#xff1a; mvn install:install-file -DfileD:\ojdbc6-11.2.0.4.jar -DgroupIdcom.oracle …

黑馬點評redis改 part 5

達人探店 發布探店筆記 那第一張表block表它里邊的結構呢是這個 首先呢第一個字段是i d&#xff0c;就是主鍵&#xff0c;第二個呢是shop id&#xff0c;就是商戶你發的這個比例啊&#xff0c;它是跟哪個商戶有關系的。第三個呢用戶id就是誰發的這篇筆記&#xff0c;第四個呢標…