文章目錄
- 池化技術
- 進程池
- 框架及基本思路
- 進程的描述
- 組織
- 管道通信建立的潛在問題
- 任務的描述與組織
- 子進程讀取管道信息
- 控制子進程
- 進程退出及資源回收
池化技術
池化技術是一種編程技巧,一般用于優化資源的分配與復用;
當一種資源需要被使用時這意味著這個資源可能會被進行多次使用或者需要同時使用多個該資源,當出現這種情況時內核將會頻繁的對該資源進行申請并釋放,大大降低整體的效率;
池化技術旨在預先分配一組資源,當用戶層需要使用這些資源時將直接對預先分配資源進行使用;
若是預先分配資源不足以當前使用情況時將再次申請一批,動態增長的資源在使用過后將被釋放以保證不出現資源浪費情況;
所用資源的數量始終>=
預先分配的資源數量;
實現以下幾點:
- 減少資源申請及釋放
- 提高資源使用效率
- 資源數量控制
- 資源動態拓展
常見的池化技術有如下:
-
進程池
適用于大量進程執行短期任務的情況;
-
內存池
預先分配一大塊內存,然后在這塊內存當中劃分出多個小塊內存用于動態分配與回收;
-
線程池
適用于任務處理不需要大量資源單需要大量并發執行情況;
進程池
進程池是一種用于并發執行的資源池技術;
預先創建一定數量的進程用于執行提交給進程池的任務;
這些進程在池中保持活躍狀態并可以快速響應并執行任務而不需要每次任務到來時再創建新的進程從而提高整體工作效率;
進程池通常用于以下幾種情況:
-
性能提高
進程的創建與銷毀具有開銷,尤其在高負載或多任務并發的場景中使用進程池可以避免頻繁的 創建/銷毀 進程從而提高系統性能;
-
資源限制
限制進程池的大小可以避免系統資源(CPU,內存等)被過度消耗;
-
負載均衡
進程池可以通過系統負載情況動態分配任務,使各個進程的工作量保持均衡;
本文模擬實現的進程池通過多個匿名管道實現進程間通信使得一個進程與多個其對應的血緣關系進程進行協同從而形成一個進程池[父寫子讀];
框架及基本思路
-
創建文件
-
ProcessPool.cc
本程序不采用聲明與定義分離的思路,該文件用于
main
函數以及對應的接口函數的聲明及其實現定義; -
Task.hpp
用于設計需要投喂給進程池的任務列表;
-
整體構造采用先描述后組織的方式對進程池進行設置,并以自頂向下的方式進行設計,即先將所需接口以聲明的形式標出而后再對接口進行具體實現;
總體為:
#define PROCESSNUM 5 // 控制子進程創建個數
/* 設計為默認預分配5個進程 *//*...描述
*/int main() {
// 初始化任務列表LoadTask(&tasks);// 組織 // 0.以數據結構的形式將進程進行組織std::vector<channel> channels;// 1.初始化進程池InitProcessPool(&channels);// 2.控制子進程ControlSlaver(channels);// 3.清理收尾QuitProcess(channels);return 0;
}
基本思路為父進程調用pipe()
系統調用接口創建管道文件;
再根據進程池的進程數量通過循環調用fork()
系統調用接口創建子進程,再根據數據流向調用close()
系統調用接口關閉不需要的文件描述符使得父進程能與每個創建的子進程利用管道文件相連接從而構成單向管道通信信道;
當子進程與對應的通信信道被建立后父進程根據描述將子進程以數據結構的方式進行管理;
父進程繼續執行后面的代碼用于對子進程發送任務,子進程通過循環使其保持活躍狀態并等待父進程向管道發送任務并對任務進行處理;
當識別到對應的退出指令后父進程對進程池進行清理同時調用wait()
系統調用接口等待并回收子進程;
進程的描述
一個進程再被組織與管理前需要被進行描述;
基本的信息為:
- 發送任務用的文件描述符
- 進程的
PID
- 進程名
- …
#include "Task.hpp"#include <unistd.h>#include <cassert>#include <string>#include <vector>#include <cstdlib>#include <ctime>#include <sys/wait.h>#include <sys/stat.h>#define PROCESSNUM 5 // 控制子進程創建個數// 描述
class channel {public:// 構造函數channel(int cmdfd, pid_t slaveryid, const std::string& processname): _cmdfd(cmdfd), _slaveryid(slaveryid), _processname(processname) {}public:int _cmdfd; // 發送任務用文件描述符pid_t _slaveryid; // 進程的pidstd::string _processname; // 進程名
};int main() {// 初始化任務列表LoadTask(&tasks);// 組織std::vector<channel> channels;InitProcessPool(&channels);// 2.控制子進程ControlSlaver(channels);// 3.清理收尾// sleep(1000);QuitProcess(channels);return 0;
}
組織
組織的方式為現將所需的進程調用fork()
系統調用接口再將用數據結構將其進行組織以方便后期控制子進程以及管理子進程;
void InitProcessPool(std::vector<channel>* channels) { // 1.初始化pid_t id = fork();if (id < 0) {// 進程創建失敗std::cerr << "fork errno" << std::endl;assert(id >= 0); // 差錯處理 子進程創建失敗}if (id == 0) {// 子進程close(pipefd[1]);// 子進程讀 關閉[1]/*slaver(pipefd[0]);這是一種做法 為從這個描述符當中讀取任務數據并執行任務*/dup2(pipefd[0], 0);// 該種做法為 使用dup2接口進行重定向// 使得子進程的默認輸入從鍵盤改為pipefd[0]中讀取slaver();//默認從文件描述符0 獲取任務信息即可std::cout << "Process : " << getpid() << " quit sucess" << std::endl;exit(0);}// 父進程close(pipefd[0]); // 父進程關閉讀端// 添加字段std::string name = "Process" + std::to_string(i);channels->push_back(channel(pipefd[1], id, name)); // 調用構造函數進行初始化}
}int main(){//...std::vector<channel> channels;InitProcessPool(&channels);//...return 0;
}
子進程創建后采用數據結構進行管理;
本文中實現的進程池對于管道數據流向為 父寫子讀 ;
使用if()
條件判斷區別父子進程,父進程在執行完對應的代碼后將自己部分的該函數的棧幀進行銷毀;
而子進程將調用slaver()
函數從對應的文件描述符中讀取父進程寫進管道中的數據及任務;
對于父進程而言其管理著channel
數組,數組中存放著所有當前子進程的所有信息;
可debug
來嘗試查看對應的channel
數組中所存儲的信息;
/* 在main函數中進行調用 */
void Debug(const std::vector<channel>& channels) {for (auto& c : channels) {std::cout << "cmdfd : " << c._cmdfd << std::endl<< "slaveryid : " << c._slaveryid << std::endl<< "processname : " << c._processname << std::endl;std::cout << "---------------------" << std::endl;}
}
debug
后的結果為:
cmdfd : 4
slaveryid : 9619
processname : Process0
---------------------
cmdfd : 5
slaveryid : 9620
processname : Process1
---------------------
cmdfd : 6
slaveryid : 9621
processname : Process2
---------------------
cmdfd : 7
slaveryid : 9622
processname : Process3
---------------------
cmdfd : 8
slaveryid : 9623
processname : Process4
---------------------
結果中父進程將通過文件描述符 4 ~ 8
向各個子進程發送數據,這些文件描述符是管道的寫端;
子進程將從文件描述符 3
讀取數據;
slaver()
為子進程讀取對應文件描述符,其需要傳入一個參數為文件描述符fd
;
子進程讀取管道數據的方式有兩種:
-
從文件描述符
3
中讀取這個數組是一個臨時的空間,存放著管道文件的讀寫文件描述符,需要父子進程根據數據流向來關閉另外一個不需要的文件描述符;
本文中的文件描述符中父進程為寫方,需要關閉文件描述符
pipefd[0]
即讀端,而子進程需要關閉寫端,但其可以從文件描述符3
也就是pipefd[1]
中讀取父進程寫入管道的數據,但需在調用slaver()
函數時傳入對應的文件描述符即調用
slaver(pipefd[1])
即可; -
從文件描述符
0
中讀取文件描述符
0
一般作為標準輸入,默認從鍵盤等文件進行讀取;即調用
dup2()
系統調用接口將文件描述符3
重定向至文件描述符0
,以減少調用slaver()
函數的傳參步驟,變相減少程序的可維護成本;
管道通信建立的潛在問題
在組織的初始化中存在一個問題,即子進程將冗余存在多個寫端;
子進程為父進程的一份拷貝,當父子進程中其中一個進程被修改時(即對物理內存進行修改);
為了避免一個進程的寫入操作影響到另一個進程,將會發生寫時拷貝操作(不是本節重點);
管道文件是一種內存級文件,沒有實質的Inode
與對應的 數據塊 ;
但是其寫入與讀取的操作是根據文件描述符進行的,在進行fork()
系統調用接口創建子進程時文件描述符也會被拷貝一份,這造成了子進程中存在著冗余的寫端(繼承其父進程的寫端);
本文設計的進程池為退出時父進程關閉寫端從而使子進程讀端讀取失敗即讀取到文件末尾并返回0
后逐步回收子進程;
但出現文件描述符冗余情況時父進程關閉寫端時依舊可能有其他子進程指向該位置的寫端從而導致無法正常將程序退出;
解決辦法有兩種:
-
倒序關閉文件描述符
int main(){// 初始化任務列表...// 組織...// 2.控制子進程...// 3.清理收尾...int last = channels.size()-1;for (; last >= 0;--last){close(channels[last]._cmdfd);sleep(3);waitpid(channels[last]._slaveryid, nullptr, 0);sleep(3);}/* 采用倒序關閉文件描述符的方法確實可以確保在結束時逐一關閉每個子進程對應的寫端這會導致子進程讀端在讀到文件結尾(EOF)時退出循環并且子進程會調用 exit(0)進入僵尸狀態等待父進程回收*/for(const auto &c:channels ){close(c._cmdfd);}sssleep(5);for (const auto& c : channels) {waitpid(c._slaveryid, nullptr, 0);}sleep(5);return 0; }
在父進程發出退出指令時倒序遍歷
channel
數組,父進程依次關閉對應的寫端,當關閉最后一個進程的寫端時其讀端將會讀取到0
,而后根據程序設計對資源進行回收;當該子進程退出時其指向上一個管道文件的寫端指向將消失,而后依次進行回收;
該方法可以使得程序正常退出,但無法解決實質性問題即父子進程中建立的為單向通信信道,在程序運行中不符合管道的單向信道規范;
-
在初始化中關閉多余的文件描述符
在初始化中關閉文件描述符可以保證程序在運行當中可以存在正確的單向通信規范;
具體操作為父進程采用數據結構保留對應的寫端文件描述符,并在下次
fork()
創建子進程時在子進程中遍歷數組從而能夠關閉對應的文件描述符;void InitProcessPool(std::vector<channel>* channels) { // 1.初始化//version 2 -- 確保每個子進程只有一個寫端std::vector<int> oldfds;for (int i = 0; i < PROCESSNUM; ++i) {// 子進程創建時先創建管道int pipefd[2];int n = pipe(pipefd);assert(!n); // 差錯處理 pipe創建管道失敗(void)n;pid_t id = fork();if (id < 0) {// 進程創建失敗std::cerr << "fork errno" << std::endl;assert(id >= 0); // 差錯處理 子進程創建失敗}if (id == 0) {// 子進程close(pipefd[1]);for(const auto& fd:oldfds){close(fd);/*這里的close不會調用失敗在下文中的oldfds數組將會記錄父進程對上一個子進程的寫端 所以會導致子進程會對其他子進程存在寫端所以這里的close并不會調用失敗 因為對于每個子進程而言 數組中的所有文件描述符都是有效的文件描述符*/}// 子進程讀 關閉[1]/*slaver(pipefd[0]);這是一種做法 為從這個描述符當中讀取任務數據并執行任務*/dup2(pipefd[0], 0);// 該種做法為 使用dup2接口進行重定向// 使得子進程的默認輸入從鍵盤改為pipefd[0]中讀取slaver();//默認從文件描述符0 獲取任務信息即可std::cout << "Process : " << getpid() << " quit sucess" << std::endl;exit(0);}// 父進程close(pipefd[0]); // 父進程關閉讀端// 添加字段std::string name = "Process" + std::to_string(i);channels->push_back(channel(pipefd[1], id, name)); // 進行初初始化oldfds.push_back(pipefd[1]);// Debug// std::cout << "==============================" << std::endl;// std::cout << "對oldfds數組進行打印 [" << debugn <<"]@ "<< std::endl;// debugn++;// for (const auto& fd : oldfds) {// std::cout << fd << std::endl;// }// std::cout << "==============================" << std::endl;} }
任務的描述與組織
任務的描述與組織在Task.hpp
文件當中;
思路為以數據結構和函數指針相配合的方式將函數(任務)以加載的方式進行管理(在main
函數所在文件中需要注意使用extern
聲明);
/* ######## *//* Task.hpp *//* ######## */#pragma once#include <iostream>#include <vector>typedef void (*task_t)(); // 定義函數指針void task1() { std::cout << "任務1 : 數據初始化(Data Initialization)" << std::endl;
}
void task2(){std::cout << "任務2 : 更新數據(Update Data)" << std::endl;
}
void task3(){std::cout << "任務3 : 獲取數據(Retrieve Data)" << std::endl;
}
void task4(){std::cout << "任務4 : 數據驗證(Data Validation)" << std::endl;
}
void task5(){std::cout << "任務5 : 生成報告(Generate Report)" << std::endl;
}
void task6(){std::cout << "任務6 : 備份數據(Backup Data)" << std::endl;
}void LoadTask(std::vector<task_t> *tasks){tasks->push_back(task1);tasks->push_back(task2);tasks->push_back(task3);tasks->push_back(task4);tasks->push_back(task5);tasks->push_back(task6);
}
該處task
數組中的下標即為任務碼;
通過數組的形式訪問函數指針從而調用對應函數;
子進程讀取管道信息
void slaver() {// 子進程讀取父進程向管道內寫入的信息while (true) {/*debug//每個子進程都將從文件描述符rfd中進行讀取在當前程序中 rfd為3重定向后為0// std::cout << getpid() << " - read fd is : " << rfd << std::endl;*/int cmdcode = 0;int n = read(0,&cmdcode,sizeof(int));if(n == sizeof(int)){//執行cmdcode對應的任務列表std::cout <<"slaver get a cmdcode @["<< getpid() << "] : cmdcode : " << cmdcode << std :: endl;if (cmdcode < 0 || cmdcode > (int)tasks.size()) continue;tasks[cmdcode]();//指針數組std::cout << "--------------------------" << std::endl;}if(n == 0)//說明寫端被關閉 0表示文件結尾break;// sleep(100);}
}
以循環的方式通過調用read()
系統調用接口分別向各自管道內讀取數據,若是管道不存在數據則進行等待(默認行為),意味著子進程不需要調用sleep()
等接口來進行時間的延長;
子進程讀取到對應信息時將對數據進行分析并進行處理(根據需求,當前簡易進程池中不需要對數據進行分析);
當讀取到0
時說明讀取到文件末尾,即寫端被關閉,此時子進程退出并進入僵尸狀態等待被父進程回收;
控制子進程
子進程可以讀取父進程寫入管道文件的數據時父進程可以根據需求將對應的任務碼通過write()
系統調用接口寫入管道文件當中并等待子進程讀取處理;
該處為三步操作:
-
選擇任務
任務即為
tasks
函數指針數組中所存儲的各個函數指針;通過下標的方式進行選擇,其中下標即為任務對應的任務碼;
-
選擇進程
為避免多個任務在同一個進程下進行等待而降低效率,進程的選擇需要依靠負載均衡;
常見的負載均衡包括:
-
隨機
通過偽隨機或是真隨機的方式為各個子進程分配任務;
-
輪詢/輪轉
通過輪詢/輪轉的方式時多個子進程為一個周期輪流分配任務;
-
最少連接
選擇當前連接數(需處理數據數量)最少的進程優先為其分配任務;
-
…
-
-
發送任務(任務碼)
調用
write()
系統調用接口將任務碼和所需數據寫入至管道文件中;
void ControlSlaver(const std::vector<channel> &channels){
//向子進程派發任務//需要隨機Menu();for (int i = 1; i <= 100; ++i) {//(1) 選擇任務// int cmdcode = rand() % tasks.size();std::cout << "Please Enter your choic :";int choice = 0;std::cin >> choice;if(choice==0){std::cout << "正在退出" << std::endl;return;}choice -= 1;//(2) 選擇進程 -- 需要負載均衡 (隨機數或是輪詢 此處使用隨機)int fd = rand() % PROCESSNUM;//(3) 發送任務 (任務碼)std::cout << "Parent Process say : " << std::endl<< "cmdcode = " << choice << " alread sendto "<< channels[fd]._processname << channels[fd]._slaveryid<< std::endl;write(channels[fd]._cmdfd, &choice, sizeof(choice));sleep(1);}
}
需要時可在控制接口中打印菜單:
void Menu(){std::cout << "#############################################\n";std::cout << "# 主菜單 #\n";std::cout << "#############################################\n";std::cout << "# 1. 數據初始化 #\n";std::cout << "# 2. 更新數據 #\n";std::cout << "# 3. 獲取數據 #\n";std::cout << "# 4. 數據驗證 #\n";std::cout << "# 5. 生成報告 #\n";std::cout << "# 6. 備份數據 #\n";std::cout << "# 0. 退出 #\n";std::cout << "#############################################\n";
}
進程退出及資源回收
兩種回收方式:
-
倒序關閉文件描述符
參考上文,此處不贅述;
-
正常退出回收
正常回收的情況下父子進程間的匿名管道必須是單向信道;
即當進程池的一批子進程被創建完畢后應及時對冗余的寫端(子進程的)進行關閉;
即可正常回收;
退出與回收即為父進程遍歷對應的channels
數組,將對應的寫端(文件描述符)進行關閉;
當關閉文件描述符時讀端(子進程)的read()
系統調用接口將默認讀取到0
表示讀取到了文件結束符號,在子進程的slaver()
接口中根據依次判斷是否讀取到0
來依次退出子進程的循環,使得子進程正常退出進入僵尸狀態;
父進程則調用waitpid()
系統調用接口輪流回收已經進入僵尸狀態的子進程;
void QuitProcess(const std::vector<channel> channels)
{// 正常回收 - 對應的需要在一批子進程被創建完畢后應及時對冗余的寫端(子進程的)進行關閉for(const auto &c:channels ){close(c._cmdfd);waitpid(c._slaveryid, nullptr, 0);}// // version 1 - 倒序關閉文件描述符回收法// int last = channels.size()-1;// for (; last >= 0;--last){// close(channels[last]._cmdfd);// sleep(3);// waitpid(channels[last]._slaveryid, nullptr, 0);// sleep(3);// }// /* // 采用倒序關閉文件描述符的方法確實可以確保在結束時逐一關閉每個子進程對應的寫端// 這會導致子進程讀端在讀到文件結尾(EOF)時退出循環// 并且子進程會調用 exit(0)// 進入僵尸狀態等待父進程回收// */// for(const auto &c:channels ){// close(c._cmdfd);// }ss// sleep(5);// for (const auto& c : channels) {// waitpid(c._slaveryid, nullptr, 0);// }// sleep(5);
}
- 完整代碼(供參考):
[參考代碼(gitee) - DIo夾心小面包 (半介莽夫)]