🔥 本文專欄:Linux Linux實踐項目
🌸作者主頁:努力努力再努力wz
💪 今日博客勵志語錄:
人生沒有標準答案,你的錯題本也能寫成傳奇。
★★★ 本文前置知識:
匿名管道
1.前置知識回顧(對此十分熟悉的讀者可以跳過)
那么在上一篇博客中,我們知道了進程之間具有通信的需求,因為進程之間需要合作協同完成某項任務,那么就需要各個進程之間進行分工合作,那么進程就需要知道對方完成的進度以及完成的結果,所以進程之間需要通信,但是進程無法直接訪問對方的數據,因為進程之間具有獨立性,所以為了達到進程的通信的需求又保證進程之間的獨立性,那么操作系統采取的策略就是在內存中創建一塊公共區域,那么一個進程向這個公共區域中寫入,另一個進程從該公共區域讀取,就能完成進程的通信
而對于父子進程或者說有血緣關系的進程,那么我們知道創建子進程的過程會拷貝父進程的task_struct結構體,并且修改其中的部分屬性得到子進程自己獨立的一份task_struct結構體,那么其中就會涉及到文件描述表的拷貝,那么意味著子進程會繼承父進程打開的文件,而進程之間通信的核心思想便是創建一個公共區域,而由于子進程和父進程會共享被打開的文件,那么意味著文件就可以作為這個公共區域,所以父子進程通信的方式就是通過文件,所以在創建子進程之前,那么父進程會先創建一份用來通信的文件,而該文件不需要刷新寫入到磁盤當中,因為該文件的內容只是臨時用來保存父子之間寫入的內容,不需要刷新到磁盤長時間來保存,所以需要創建一份內存級別也就是不需要刷新到磁盤的文件,那么其中就要調用pipe接口,那么它會創建兩個分別以只讀權限打開以及只寫權限打開同一個管道文件的file結構體對象,并返回這兩個結構體的文件描述符,然后再調用fork接口創建子進程,那么子進程會繼承父進程創建的兩個以不同權限打開的file結構體對象,而該文件只能單向通信,也就是只能一個進程往該文件中寫入,另一個進程從該文件中讀取,不能雙方同時寫入,不然會造成內容混亂,而正是由于一個進程只能往該文件寫,另一個文件只能從該文件讀,那么這個特點和我們生活中的自來水管道是十分相似的,因為自來水管道只能從一端流入,然后從一端流出,所以該文件又稱之為管道文件,那么為了實現單向通信,就需要父子進程關閉各自其中的一個讀寫端
那么這就是對上文的內容大致回顧,如果你對此感到陌生,那么可以去看我上一期文章
進程池項目介紹
1.進程池的意義
那么這里我們用之前所學的內容來實現一個進程池,其中就包括匿名管道,那么首先在講進程池具體實現之前,那么我們得知道進程池是用來干什么的,它有什么用,也就是做這個進程池有什么意義,那么想必這些問題是讀者對于進程池首先的一個疑惑,所以這里我們就先來認識做進程池的意義
上文前置知識回顧的開篇我就說道過,進程之間需要共同來完成某項任務,那么此時進程就需要分工合作,來完成各自分配的任務,那么假設有這么一個場景,那么你現在有100個task要完成,然后你把這些task都準備交給子進程來完成,那么此時你是如何去分配這些任務給子進程呢?
那么有的小伙伴采取的是這種方式,也就是他先調用fork接口,然后創建一個子進程,然后給該子進程分配一個任務,接著父進程則是等待子進程退出,通過退出碼來查看子進程完成的情況,如果子進程正常退出并且結果正確,那么接著它便繼續調用fork接口重復上面的步驟,也就是循環創建子進程,然后給其分配任務讓其執行,而對于父進程則是等待其子進程退出,獲取其子進程的退出碼,而現在有100個任務,那么意味著這個小伙伴要調用100次fork接口
而還有的小伙伴采取的是另一種方式,那么他不是創建出一個子進程,然后就直接給創建出的該子進程分配任務去完成,他則是先創建出一批的進程,比如20個進程,那么此時創建完這20個進程之后,那么此后他不會再調用fork接口去創建其他新的子進程,而就是利用手頭上持有的這20個進程來完成這100個task,那么就需要父進程依次給這20個子進程分配各自的任務,然后分配完之后,等待這20個進程退出,獲取其退出碼,看進程是否正常退出,然后再依次給執行完任務結束的子進程繼續分配新的任務
那么我們就來比較并且評價一下上面的這兩個小伙伴各自的實現方式,首先明確的是,這兩個小伙伴的實現方式肯定都是正確沒有問題的,也就是說上面的這兩種方式都能夠成功的完成這100個task,但是這兩種方式完成的效率就會有所差別,那么第二個小伙伴的實現的方式的效率要比第一個小伙伴的實現方式的效率要高很多,那么為什么呢?
那么首先我們一定要記住并且理解的一個道理那就是,系統接口的調用是具有代價的,雖然你在代碼中for循環連續100次調用fork接口創建了一批子進程,然后一運行你的代碼,發現程序還是正常運行并且結果正確,但是你要知道的是,fork系統調用接口底層所涉及到的工作,其中就包括會拷貝父進程的task_struct結構體然后修改其中的部分屬性得到子進程自己獨立的一份task_struct結構體,然后創建完子進程的task_struct結構體之后還涉及到寫時拷貝以及頁表的重新映射,并且操作系統還要將創建出來的子進程的task_struct結構體放到相應的隊列中來維護管理比如放到就緒隊列中,那么當子進程運行結束之后,那么又會涉及到子進程的task_struct結構體等各種資源的釋放,那么從子進程的創建以及銷毀所涉及到的工作就可以看出來,那么調用一個fork接口其實是有成本的,就如同以前你看到初中班上學習成績十分優秀的同學,那么他上課的時候總是趴在桌子上睡覺,結果人家考試還次次考全班第一,你看著人家學習很輕松,但其實人家在你看不到的地方其實在偷偷努力,比如晚上學習到凌晨幾點
所以對于第一種實現方式,那么它的缺點就是十分的明顯,那么要多次調用系統接口,那么效率必然不會優秀,而第二種方式相比于第一種方式,那么它則是先創建一批子進程,俗話說磨刀不誤砍柴功,那么這里我們先創建一批進程,但是不讓其執行特定的任務,然后創建完之后,那么我們就只需要讓這創建出的進程輪流去執行這100個任務
那么對于第一種方式,那么假設要交給子進程完成100個task,那么意味著要調用100次fork接口,然后這100個task就分別交給每一個fork創建出來的進程,最終完成這100個task,而對于第二種方式,假設我們預先創建20個進程,然后讓這20個進程輪流執行完成這100個task任務,那么我們來對比一下這兩種方式的效率
那么對于第一種,那么假設完成一個task的代價是k,那么調用fork接口的代價是m,那么第一種實現的方式的總代價就是100m+100K,而對于第二種方式來說,那么它預先創建了20個進程,來執行這100個task,那么對于第二種方式的總代價就是20m+100k,所以粗略估計下來,那么第二種方式明顯比第一種更加優秀
而第二種方式正是我們進程池的實現的核心思想,那么為什么稱其為進程池,我們就可以用和尚下山去取水的故事例子來理解:那么有一個和尚住在一個高山山頂上的一座寺廟,那么它如果要喝水或者洗澡只能到山腳下的小溪中去取水然后再將水運回山頂,那么一旦和尚口渴了或者想洗澡,那么意味著他就要跑到山腳下去取水,那么這樣做明顯代價就太大了并且十分的不劃算,那么為了提升效率,減少上山下山的時間的浪費,那么和尚采取的做法就是在半山腰上建立一個蓄水池,先存儲一大部分水,那么一旦有用水的需求就到這個池子中去即可,而不需要跑到山腳下去運水
所以我們為什么叫起進程“池”,那么這個池字就很形象,那么我們預先創建一批進程的這個過程和上文那個例子中建立一個蓄水池是一個道理,那么我們就不需要在去調用fork來去創建一個進程,直接從創建好的進程池中選取子進程去完成任務即可,那么這就是進程池的意義,目的就是為了提高效率,減少系統調用的開銷
2.進程池的大體框架
那么知道了進程池的意義之后,那么我們再來說一下進程池的實現,那么首先我們腦海中得先有一個大體的實現框架以及思路,也就是說我們得先分析出進程池涉及到的各個模塊,然后再來談這各個模塊具體的代碼的實現
1.進程池的創建
那么根據上文,我們知道,那么我們在執行任務之前,首先得創建一批子進程,那么假設要創建的子進程的數量是n,那么意味著我們會涉及到一個循環,其中在循環內部調用n次fork接口來創建n個子進程,那么其次我們子進程是來完成某項任務的,那么這個任務的發送就得交給父進程,由父進程來分配給子進程要執行的任務,那么這個任務可以通過一個任務碼來傳遞,也就是一個int類型的變量,那么既然父進程要給子進程發送任務碼,那么必然就要涉及到進程之間的通信,而父子進程如何通信,我們也很熟悉了,那么便是通過匿名管道進行通信,所以這里就注意,在調用fork之前,那么我們得先調用pipe接口,所以剛才說的這一系列內容,比如管道以及子進程的創建,我們都可以把它封裝到一個函數模塊中,具體的實現細節下文會提到
2.任務列表的制作
那么我們知道子進程到時候是會通過管道讀取父進程交給它的任務碼,那么任務碼的本質其實就是一個編號,因為到時候我們所有要執行的函數都會有一個函數指針指向它,那么最終會定義一個全局的函數指針數組,那么所謂的任務碼就是對應著這個函數指針數組的一個下標,那么由于定義成了全局的指針數組,那么到時候fork創建子進程,那么子進程也能訪問到這個函數指針數組,那么就可以讀取管道中的任務碼然后根據函數指針數組來執行相應的函數,那么我們要執行的各個任務的邏輯都是封裝在函數當中,而我們函數指針數組就可以理解為任務列表,到時候我們就要完成函數指針數組的初始化,那么這個初始化工作就會交給一個函數來完成
3.子進程執行任務&&父進程傳遞任務
而我們知道我們會通過fork接口來創建子進程,然后利用fork的返回值使得父子進程有著不同的執行流,那么我們知道在創建子進程之前,會首先創建管道文件,那么接著調用fork創建子進程,那么意味著子進程會繼承并且會和父進程共享者打開的管道文件,所以到時候在子進程的執行流中就需要關閉管道文件的寫端,關閉完之后,下一步便是讀取管道文件傳來的任務碼,獲取到任務碼然后執行任務,那么這就是子進程執行任務的大致思路,至于具體的細節,我們下文在進行補充
而父進程對應的代碼段則是想管道文件中寫入子進程要執行的任務碼
4.資源的清理
那么資源的清理便是進程池的最后階段了,那么這個階段的工作就是父進程會關閉之前打開的管道文件,并且等待子進程退出,看子進程是否正常退出,那么具體的實現細節下文會說到
進程池的各個模塊的具體實現
1.進程池的創建
那么這里我們進程池的創建專門放到process_init模塊當中,那么其中會涉及到一個for循環的邏輯,然后在循環調用pipe接口,然后創建管道文件,得到管道文件的讀寫端的文件描述,那么接著再調用fork接口創建子進程,然后利用fork的返回值,讓父子進程有著各自的執行流,那么在子進程的執行流中,那么它會調用close接口來關閉管道文件的寫端,而對于父進程則是關閉管道文件的讀端
那么到時候父進程得要向管道文件中寫入任務碼,那么意味著父進程的知道管道文件的文件描述符,因為到時候向管道文件寫入需要調用write接口,而write接口會接收一個文件描述符作為參數,向該文件描述符所指向的文件中寫入一定字節數,并且我們還得知道該管道文件相連接的是哪個子進程,所以我們得記錄子進程的PID,那么我們可以定義一個channel類,然后內部封裝了兩個成員變量,分別是管道文件的文件描述符以及其連接的子進程的PID,那么父進程在關閉玩對應的管道文件的讀端之后,還要初始化channel對象,將其插入到一個vector數組中,那么vector數組中就維護了創建出來的各個管道的屬性
std::vector<channel> channelarray;
class channel
{public:int _processid;int _write_fd;channel(int processid,int write_fd):_processid(processid),_write_fd(write_fd){}
};
而對于子進程來說,那么它關閉玩管道的寫端之后,接著的任務就是去獲取父進程在管道文件中寫入的任務碼以及執行任務,那么這個內容我們可以封裝到一個start_mission函數模塊中,那么我下文會詳細解析這個函數
其次這里有一個小細節,那么到時候子進程要去管道文件讀取任務碼,那么這里我進行了一個重定向,也就是將子進程的管道文件重定向到標準輸入文件,那么這里就會調用dup2接口,那么其會關閉標準輸入文件,將標準輸入文件的下標的指針指向管道文件,這樣做的好處就是我們子進程在讀取管道文件的輸入的時候,不需要知道管道文件的文件描述符,統一的去標準輸入的文件描述符中讀取即可
dup2(pipefd[0],0);close(pipefd[0]);
void processpool_init()
{for(int i=0;i<processnum;i++){int pipefd[2];int n=pipe(pipefd);if(n<0){perror("pipe fail");exit(EXIT_FAILURE);}int id=fork();if(id<0){perror("fork");close(pipefd[0]);close(pipefd[1]);exit(EXIT_FAILURE);}if(id==0){close(pipefd[1]);dup2(pipefd[0],0);close(pipefd[0]);start_mission();exit(0);}close(pipefd[0]);channelarray.push_back(channel(id,pipefd[1]));}
}
2.任務列表的制作
那么任務列表的制作就非常輕松,那么到時會我們會定義一個全局的函數指針數組,那么其中函數指針數組的每一個元素是一個函數指針指向一個函數,那么我們會將這個數組中的每一個元素給初始化指向對應的函數,那么這個函數就是子進程要執行的任務,那么函數指針數組的下標就是任務碼,那么剛才所說的這些工作都交給mission_load來完成
#define missionnum 4
typedef void (*mission)() ;
std::vector<mission> missionarray;
void task1()
{std::cout<<"I am childprocess: "<<getpid()<<" running task1"<<std::endl;
}
void task2()
{std::cout<<"I am childprocess: "<<getpid()<<" running task2"<<std::endl;
}
void task3()
{std::cout<<"I am childprocess: "<<getpid()<<" running task3"<<std::endl;
}
void task4()
{std::cout<<"I am childprocess: "<<getpid()<<" running task4"<<std::endl;
}
void mission_load()
{missionarray.push_back(task1);missionarray.push_back(task2);missionarray.push_back(task3);missionarray.push_back(task4);
}
3.子進程執行任務&&父進程傳遞任務
那么子進程執行任務我們專門設置了一個start_mission函數模塊來實現,那么其中在start_mission模塊中,就會涉及到一個死循環,因為子進程不可能執行完一個任務就退出了,因為它還要繼續被父進程分配執行下一個任務,就和之前實現shell外殼程序一樣,那么整體的大框架也是一個死循環,那么你獲取以及執行完用戶輸入的一個指令之后你的bash進程不可能就退出結束了吧,同理這里你子進程在獲取父進程向管道文件中寫入的任務碼以及執行對應的函數之后,那么就循環繼續讀取下一次父進程向管道文件中的寫入的任務碼,所以涉及到一個死循環的邏輯
那么讀取任務碼就涉及到調用read接口,那么從上文可知我們已經將管道文件重定向到標準輸入文件,那么這里我們就從標準輸入文件中讀取任務碼,由于函數指針數組是全局變量,那么獲取到任務碼之后,直接根據函數指針數組執行相應的函數即可,而注意還要判斷read的返回值,如果read返回0,說明了此時管道文件的寫端已經被關閉,那么父進程已經關閉了該管道文件的寫端,所以子進程沒必要在進行讀取,所以直接break,然后子進程退出
void start_mission()
{while(true){int staues;int n=read(0,&staues,sizeof(int));if(n==sizeof(int)){if(staues>=0&&staues<missionnum){std::cout<<"我是子進程"<<getpid()<<" 成功獲取到任務碼"<<staues<<std::endl;missionarray[staues]();}}else if(n==0){break;}else if(n<0){perror("read");exit(EXIT_FAILURE);}}
}
而父進程要做的則是傳遞任務,我們同樣也是定義一個process_control函數來實現,那么其中就要注意的就是負載均衡,所謂的負載均衡指的就是我們給創建出來的所有子進程分配任務的時候,希望讓所有子進程都盡可能的分配執行到任務,也就是大家有事干,都別閑著,和操作系統調度進程是一個道理,那么做到負載均衡的方式有兩種,第一種就是隨機分配,那么由于之前我們用數組記錄了每一個管道文件對應的channel對象,其中channel對象保存了子進程的編號,那么假設有n個管道,那么我們可以產生一個0到n-1的隨機數,然后調用對應的子進程,由于產生0到n-1這每一個數的概率肯定是相等,所以可以做到負載均衡
其次第二種方式則是輪詢,那么所謂的輪詢就更加直觀,就是我們先分配給任務按照子進程被創建的順序依次分配,從第一個依次分配到最后一個,最后再回到第一個,那么其中就會涉及到取模運算
void process_control()
{srand((unsigned int)time(NULL));int which=0;for(int i=0;i<100;i++){int cmd=rand()%missionnum;int n=write(channelarray[which]._write_fd,&cmd,sizeof(int));if(n<0){perror("write");exit(EXIT_FAILURE);}std::cout<<"father process send a message to"<<channelarray[which]._processid<<" cmd :"<<cmd<<std::endl;which=(which+1)%processnum; }
}
4.資源的清理
那么最后的資源清理任務則放到process_clean函數模塊,那么這個模塊就是關閉回收管道以及等待子進程,那么這里要注意的一點就是,我們每創建一個子進程,那么該子進程會繼承之前創建出的所有管道文件,這會讓管道文件的引用計數加一,那么子進程以及父進程會關閉各自的讀寫端,會讓其引用計數減一,那么對于最后一個管道文件來說,那么它只被最后一個創建的子進程以及父進程所共享,那么由于子進程與父進程再關閉各自的讀寫端,那么最后一個管道文件的讀寫端的引用計數是1,那么以此往前類推,那么前面的管道文件的讀寫端就是從2開始遞增,所以我們關閉管道文件得從最后一個管道文件往前關閉,不然你從前往后關閉的話,那么管道的引用計數不會為0,那么會導致資源泄漏并且子進程一直陷入阻塞狀態,因為管道的寫端未被關閉并且父進程一直沒有寫入
void process_clean()
{for(int i=l1.size()-1;i>=0;i--){close(channelarray[i]._write_fd);int statues;int n=waitpid(channelarray[i]._processid,&statues,0);if(n<0){perror("waitpid");}else{std::cout<<"子進程"<<channelarray[i]._processid<<"等待成功"<<std::endl;}}
}
完整實現
processpool.cpp
#include"processpool.hpp"
int main()
{mission_load();processpool_init();process_control();process_clean();return 0;
}
processpool.hpp
include<iostream>
#include<unistd.h>
#include<vector>
#include<sys/wait.h>
#include<sys/types.h>
#include<cstdlib>
#include<time.h>
#include"task.hpp"
#define EXIT_FAILURE 1
#define missionnum 4
const int processnum=10;
std::vector<channel> channelarray;
class channel
{public:int _processid;int _write_fd;channel(int processid,int write_fd):_processid(processid),_write_fd(write_fd){}
};
void mission_load()
{missionarray.push_back(task1);missionarray.push_back(task2);missionarray.push_back(task3);missionarray.push_back(task4);
}
void start_mission()
{while(true){int staues;int n=read(0,&staues,sizeof(int));if(n==sizeof(int)){if(staues>=0&&staues<missionnum){std::cout<<"我是子進程"<<getpid()<<" 成功獲取到任務碼"<<staues<<std::endl;missionarray[staues]();}}else if(n==0){break;}else if(n<0){perror("read");exit(EXIT_FAILURE);}}
}
void process_control()
{srand((unsigned int)time(NULL));int which=0;for(int i=0;i<100;i++){int cmd=rand()%missionnum;int n=write(channelarray[which]._write_fd,&cmd,sizeof(int));if(n<0){perror("write");exit(EXIT_FAILURE);}std::cout<<"father process send a message to"<<channelarray[which]._processid<<" cmd :"<<cmd<<std::endl;which=(which+1)%processnum; }
}
void process_clean()
{for(int i=l1.size()-1;i>=0;i--){close(channelarray[i]._write_fd);int statues;int n=waitpid(channelarray[i]._processid,&statues,0);if(n<0){perror("waitpid");}else{std::cout<<"子進程"<<channelarray[i]._processid<<"等待成功"<<std::endl;}}
}
void processpool_init()
{for(int i=0;i<processnum;i++){int pipefd[2];int n=pipe(pipefd);if(n<0){perror("pipe fail");exit(EXIT_FAILURE);}int id=fork();if(id<0){perror("fork");close(pipefd[0]);close(pipefd[1]);exit(EXIT_FAILURE);}if(id==0){close(pipefd[1]);dup2(pipefd[0],0);start_mission();exit(0);}close(pipefd[0]);channelarray.push_back(channel(id,pipefd[1]));}
}
task.hpp
typedef void (*mission)() ;
std::vector<mission> missionarray;
void task1()
{std::cout<<"I am childprocess: "<<getpid()<<" running task1"<<std::endl;
}
void task2()
{std::cout<<"I am childprocess: "<<getpid()<<" running task2"<<std::endl;
}
void task3()
{std::cout<<"I am childprocess: "<<getpid()<<" running task3"<<std::endl;
}
void task4()
{std::cout<<"I am childprocess: "<<getpid()<<" running task4"<<std::endl;
}
運行截圖:
結語
那么這就是本期博客關于進程池的詳細介紹了,那么從進程池的意義以及進程池的實現大體框架到具體細節這幾個維度帶你全面解析進程池,其次注意就是進程池的應用場景一定是要執行任務數量要大于子進程的數量,如果你要執行30個任務,創建27個子進程其實意義不大,那么讀者下來也可以自己實現一個屬于你自己的進程池,那么我的下一期博客會介紹命名管道,那么我會持續更新,希望您能夠多多關注哦,如果本篇文章有幫組到你,還請三連加關注哦,你的支持就是我創作的最大動力!