Python 之 進程

目錄

理論知識

操作系統背景知識

進程

什么是進程

進程調度

進程的并行與并發

同步異步阻塞非阻塞

同步和異步

阻塞與非阻塞

同步/異步與阻塞/非阻塞 組合

進程的創建與結束

進程的創建

進程的結束

在python程序中的進程操作

multiprocess(multiprocessing)模塊

multiprocessing下的Process(P大寫)模塊介紹

使用Process模塊創建進程在一個python進程中開啟子進程,start方法和并發效果。

守護進程

socket聊天并發實例

多進程中的其他方法

進程同步(multiprocess.Lock 鎖、multiprocess.Semaphore 信號量、multiprocess.Event 事件)

鎖 —— multiprocess.Lock

信號量 —— multiprocess.Semaphore(了解)

事件 —— multiprocess.Event(了解)

進程間通信——隊列(multiprocess.Queue)

進程間通信

隊列

生產者消費者模型

JoinableQueue([maxsize])

進程之間的數據共享

進程池和multiprocess.Pool模塊

進程池

multiprocess.Pool模塊


理論知識

操作系統背景知識

顧名思義,進程即正在執行的一個過程。進程是對正在運行程序的一個抽象。

進程的概念起源于操作系統,是操作系統最核心的概念,也是操作系統提供的最古老也是最重要的抽象概念之一。操作系統的其他所有內容都是圍繞進程的概念展開的。

所以想要真正了解進程,必須事先了解操作系統,點擊進入? ??

PS:即使可以利用的cpu只有一個(早期的計算機確實如此),也能保證支持(偽)并發的能力。將一個單獨的cpu變成多個虛擬的cpu(多道技術:時間多路復用和空間多路復用+硬件上支持隔離),沒有進程的抽象,現代計算機將不復存在。

必備的理論基礎:

一 操作系統的作用:
? 1:隱藏丑陋復雜的硬件接口,提供良好的抽象接口
? 2:管理、調度進程,并且將多個進程對硬件的競爭變得有序

二 多道技術:
? 1.產生背景:針對單核,實現并發
? ps:現在的主機一般是多核,那么每個核都會利用多道技術。例如:有4個cpu,運行于cpu1的某個程序遇到io阻塞,會等到io結束再重新調度,且會被調度到4個cpu中的任意一個,具體由操作系統調度算法決定。
? 2.空間上的復用:如內存中同時有多道程序
? 3.時間上的復用:復用一個cpu的時間片
? 強調:遇到io切,占用cpu時間過長也切,核心在于切之前將進程的狀態保存下來,這樣才能保證下次切換回來時,能基于上次切走的位置繼續運行

進程

什么是進程

進程(Process)是計算機中的程序關于某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操作系統結構的基礎。在早期面向進程設計的計算機結構中,進程是程序的基本執行實體;在當代面向線程設計的計算機結構中,進程是線程的容器。程序是指令、數據及其組織形式的描述,進程是程序的實體。

狹義定義:進程是正在運行的程序的實例(an instance of a computer program that is being executed)。

廣義定義:進程是一個具有一定獨立功能的程序關于某個數據集合的一次運行活動。它是操作系統動態執行的基本單元,在傳統的操作系統中,進程既是基本的分配單元,也是基本的執行單元。

第一,進程是一個實體。每一個進程都有它自己的地址空間,一般情況下,包括文本區域(text region)、數據區域(data region)和堆棧(stack region)。文本區域存儲處理器執行的代碼;數據區域存儲變量和進程執行期間使用的動態分配的內存;堆棧區域存儲著活動過程調用的指令和本地變量。
第二,進程是一個“執行中的程序”。程序是一個沒有生命的實體,只有處理器賦予程序生命時(操作系統執行之時),它才能成為一個活動的實體,我們稱其為進程。
第三,進程是操作系統中最基本、重要的概念。是多道程序系統出現后,為了刻畫系統內部出現的動態情況,描述系統內部各道程序的活動規律引進的一個概念,所有多道程序設計操作系統都建立在進程的基礎上。

從理論角度看,是對正在運行的程序過程的抽象;
從實現角度看,是一種數據結構,目的在于清晰地刻畫動態系統的內在規律,有效管理和調度進入計算機系統主存儲器運行的程序。
動態性:進程的實質是程序在多道程序系統中的一次執行過程,進程是動態產生,動態消亡的。
并發性:任何進程都可以同其他進程一起并發執行
獨立性:進程是一個能獨立運行的基本單位,同時也是系統分配資源和調度的獨立單位;
異步性:由于進程間的相互制約,使進程具有執行的間斷性,即進程按各自獨立的、不可預知的速度向前推進
結構特征:進程由程序、數據和進程控制塊三部分組成。
多個不同的進程可以包含相同的程序:一個程序在不同的數據集里就構成不同的進程,能得到不同的結果;但是執行過程中,程序不能發生改變。

程序和進程的區別

程序是指令和數據的有序集合,其本身沒有任何運行的含義,是一個靜態的概念。
而進程是程序在處理機上的一次執行過程,它是一個動態的概念。
程序可以作為一種軟件資料長期存在,而進程是有一定生命期的。
程序是永久的,進程是暫時的。

注意:同一個程序執行兩次,就會在操作系統中出現兩個進程,所以我們可以同時運行一個軟件,分別做不同的事情也不會混亂。

進程調度

要想多個進程交替運行,操作系統必須對這些進程進行調度,這個調度也不是隨即進行的,而是需要遵循一定的法則,由此就有了進程的調度算法。

先來先服務調度算法
? ? ? 先來先服務(FCFS)調度算法是一種最簡單的調度算法,該算法既可用于作業調度,也可用于進程調度。FCFS算法比較有利于長作業(進程),而不利于短作業(進程)。由此可知,本算法適合于CPU繁忙型作業,而不利于I/O繁忙型的作業(進程)。

短作業優先調度算法
? ? ? 短作業(進程)優先調度算法(SJ/PF)是指對短作業或短進程優先調度的算法,該算法既可用于作業調度,也可用于進程調度。但其對長作業不利;不能保證緊迫性作業(進程)被及時處理;作業的長短只是被估算出來的。

時間片輪轉法
? ? ? 時間片輪轉(Round Robin,RR)法的基本思路是讓每個進程在就緒隊列中的等待時間與享受服務的時間成比例。在時間片輪轉法中,需要將CPU的處理時間分成固定大小的時間片,例如,幾十毫秒至幾百毫秒。如果一個進程在被調度選中之后用完了系統規定的時間片,但又未完成要求的任務,則它自行釋放自己所占有的CPU而排到就緒隊列的末尾,等待下一次調度。同時,進程調度程序又去調度當前就緒隊列中的第一個進程。
? ? ? 顯然,輪轉法只能用來調度分配一些可以搶占的資源。這些可以搶占的資源可以隨時被剝奪,而且可以將它們再分配給別的進程。CPU是可搶占資源的一種。但打印機等資源是不可搶占的。由于作業調度是對除了CPU之外的所有系統硬件資源的分配,其中包含有不可搶占資源,所以作業調度不使用輪轉法。
? ? ? 在輪轉法中,時間片長度的選取非常重要。首先,時間片長度的選擇會直接影響到系統的開銷和響應時間。如果時間片長度過短,則調度程序搶占處理機的次數增多。這將使進程上下文切換次數也大大增加,從而加重系統開銷。反過來,如果時間片長度選擇過長,例如,一個時間片能保證就緒隊列中所需執行時間最長的進程能執行完畢,則輪轉法變成了先來先服務法。時間片長度的選擇是根據系統對響應時間的要求和就緒隊列中所允許最大的進程數來確定的。

? ? ? 在輪轉法中,加入到就緒隊列的進程有3種情況:
? ? ? 一種是分給它的時間片用完,但進程還未完成,回到就緒隊列的末尾等待下次調度去繼續執行。
? ? ? 另一種情況是分給該進程的時間片并未用完,只是因為請求I/O或由于進程的互斥與同步關系而被阻塞。當阻塞解除之后再回到就緒隊列。
? ? ? 第三種情況就是新創建進程進入就緒隊列。
? ? ? 如果對這些進程區別對待,給予不同的優先級和時間片從直觀上看,可以進一步改善系統服務質量和效率。例如,我們可把就緒隊列按照進程到達就緒隊列的類型和進程被阻塞時的阻塞原因分成不同的就緒隊列,每個隊列按FCFS原則排列,各隊列之間的進程享有不同的優先級,但同一隊列內優先級相同。這樣,當一個進程在執行完它的時間片之后,或從睡眠中被喚醒以及被創建之后,將進入不同的就緒隊列。 ?

多級反饋隊列
? ? ? 前面介紹的各種用作進程調度的算法都有一定的局限性。如短進程優先的調度算法,僅照顧了短進程而忽略了長進程,而且如果并未指明進程的長度,則短進程優先和基于進程長度的搶占式調度算法都將無法使用。
? ? ? 而多級反饋隊列調度算法則不必事先知道各種進程所需的執行時間,而且還可以滿足各種類型進程的需要,因而它是目前被公認的一種較好的進程調度算法。在采用多級反饋隊列調度算法的系統中,調度算法的實施過程如下所述。
? ? ? (1) 應設置多個就緒隊列,并為各個隊列賦予不同的優先級。第一個隊列的優先級最高,第二個隊列次之,其余各隊列的優先權逐個降低。該算法賦予各個隊列中進程執行時間片的大小也各不相同,在優先權愈高的隊列中,為每個進程所規定的執行時間片就愈小。例如,第二個隊列的時間片要比第一個隊列的時間片長一倍,……,第i+1個隊列的時間片要比第i個隊列的時間片長一倍。
? ? ? (2) 當一個新進程進入內存后,首先將它放入第一隊列的末尾,按FCFS原則排隊等待調度。當輪到該進程執行時,如它能在該時間片內完成,便可準備撤離系統;如果它在一個時間片結束時尚未完成,調度程序便將該進程轉入第二隊列的末尾,再同樣地按FCFS原則等待調度執行;如果它在第二隊列中運行一個時間片后仍未完成,再依次將它放入第三隊列,……,如此下去,當一個長作業(進程)從第一隊列依次降到第n隊列后,在第n 隊列便采取按時間片輪轉的方式運行。
? ? ? (3) 僅當第一隊列空閑時,調度程序才調度第二隊列中的進程運行;僅當第1~(i-1)隊列均空時,才會調度第i隊列中的進程運行。如果處理機正在第i隊列中為某進程服務時,又有新進程進入優先權較高的隊列(第1~(i-1)中的任何一個隊列),則此時新進程將搶占正在運行進程的處理機,即由調度程序把正在運行的進程放回到第i隊列的末尾,把處理機分配給新到的高優先權進程。

進程的并行與并發

并行?:?并行是指兩者同時執行,比如賽跑,兩個人都在不停的往前跑;(資源夠用,比如三個線程,四核的CPU )
并發?:?并發是指資源有限的情況下,兩者交替輪流使用資源,比如一段路(單核CPU資源)同時只能過一個人,A走一段后,讓給B,B用完繼續給A ,交替使用,目的是提高效率。

區別:

并行是從微觀上,也就是在一個精確的時間片刻,有不同的程序在執行,這就要求必須有多個處理器。
并發是從宏觀上,在一個時間段上可以看出是同時執行的,比如一個服務器同時處理多個session。

同步異步阻塞非阻塞

狀態介紹

首先了解進程的幾個狀態。在程序運行的過程中,由于被操作系統的調度算法控制,程序會進入幾個狀態:就緒,運行和阻塞。

就緒(Ready)狀態:當進程已分配到除CPU以外的所有必要的資源,只要獲得處理機便可立即執行,這時的進程狀態稱為就緒狀態。
執行/運行(Running)狀態:當進程已獲得處理機,其程序正在處理機上執行,此時的進程狀態稱為執行狀態。
阻塞(Blocked)狀態:正在執行的進程,由于等待某個事件發生而無法執行時,便放棄處理機而處于阻塞狀態。引起進程阻塞的事件可有多種,例如,等待I/O完成、申請緩沖區不能滿足、等待信件(信號)等。

同步和異步

同步就是等其他人完成,你才完成,異步就是注冊一個回調,對方完成告訴你完成了

所謂同步?就是一個任務的完成需要依賴另外一個任務時,只有等待被依賴的任務完成后,依賴的任務才能算完成,這是一種可靠的任務序列。要么成功都成功,失敗都失敗,兩個任務的狀態可以保持一致。

所謂異步?是不需要等待被依賴的任務完成,只是通知被依賴的任務要完成什么工作,依賴的任務也立即執行,只要自己完成了整個任務就算完成了。至于被依賴的任務最終是否真正完成,依賴它的任務無法確定,所以它是不可靠的任務序列。

比如我去銀行辦理業務,可能會有兩種方式:
第一種:選擇排隊等候;
第二種:選擇取一個小紙條上面有我的號碼,等到排到我這一號時由柜臺的人通知我輪到我去辦理業務了;

第一種:前者(排隊等候)就是同步等待消息通知,也就是我要一直在等待銀行辦理業務情況;
第二種:后者(等待別人通知)就是異步等待消息通知。在異步消息處理中,等待消息通知者(在這個例子中就是等待辦理業務的人)
? ? ? ?往往注冊一個回調機制,在所等待的事件被觸發時由觸發機制(在這里是柜臺的人)通過某種機制(在這里是寫在小紙條上的號碼,
? ? ? ?喊號)找到等待該事件的人。

阻塞與非阻塞

阻塞就是你等的時候不做其他的事,非阻塞就是你等的時候可以做其他的事

阻塞和非阻塞這兩個概念與程序(線程)等待消息通知(無所謂同步或者異步)時的狀態有關。也就是說阻塞與非阻塞主要是程序(線程)等待消息通知時的狀態角度來說的

繼續上面的那個例子,不論是排隊還是使用號碼等待通知,如果在這個等待的過程中,等待者除了等待消息通知之外不能做其它的事情,那么該機制就是阻塞的,表現在程序中,也就是該程序一直阻塞在該函數調用處不能繼續往下執行。
相反,有的人喜歡在銀行辦理這些業務的時候一邊打打電話發發短信一邊等待,這樣的狀態就是非阻塞的,因為他(等待者)沒有阻塞在這個消息通知上,而是一邊做自己的事情一邊等待。

注意:同步非阻塞形式實際上是效率低下的,想象一下你一邊打著電話一邊還需要抬頭看到底隊伍排到你了沒有。如果把打電話和觀察排隊的位置看成是程序的兩個操作的話,這個程序需要在這兩種不同的行為之間來回的切換,效率可想而知是低下的;而異步非阻塞形式卻沒有這樣的問題,因為打電話是你(等待者)的事情,而通知你則是柜臺(消息觸發機制)的事情,程序沒有在兩種不同的操作中來回切換。

同步/異步與阻塞/非阻塞 組合

同步阻塞形式
效率最低。拿上面的例子來說,就是你專心排隊,什么別的事都不做。

異步阻塞形式
如果在銀行等待辦理業務的人采用的是異步的方式去等待消息被觸發(通知),也就是領了一張小紙條,假如在這段時間里他不能離開銀行做其它的事情,那么很顯然,這個人被阻塞在了這個等待的操作上面;
異步操作是可以被阻塞住的,只不過它不是在處理消息時阻塞,而是在等待消息通知時被阻塞。

同步非阻塞形式
實際上是效率低下的。想象一下你一邊打著電話一邊還需要抬頭看到底隊伍排到你了沒有,如果把打電話和觀察排隊的位置看成是程序的兩個操作的話,這個程序需要在這兩種不同的行為之間來回的切換,效率可想而知是低下的。

異步非阻塞形式
效率更高,因為打電話是你(等待者)的事情,而通知你則是柜臺(消息觸發機制)的事情,程序沒有在兩種不同的操作中來回切換
比如說,這個人突然發覺自己煙癮犯了,需要出去抽根煙,于是他告訴大堂經理說,排到我這個號碼的時候麻煩到外面通知我一下,那么他就沒有被阻塞在這個等待的操作上面,自然這個就是異步+非阻塞的方式了。

很多人會把同步和阻塞混淆,是因為很多時候同步操作會以阻塞的形式表現出來,同樣的,很多人也會把異步和非阻塞混淆,因為異步操作一般都不會在真正的IO操作處被阻塞

進程的創建與結束

進程的創建

但凡是硬件,都需要有操作系統去管理,只要有操作系統,就有進程的概念,就需要有創建進程的方式,一些操作系統只為一個應用程序設計,比如微波爐中的控制器,一旦啟動微波爐,所有的進程都已經存在。

而對于通用系統(跑很多應用程序),需要有系統運行過程中創建或撤銷進程的能力,主要分為4中形式創建新的進程:

  • 系統初始化(查看進程linux中用ps命令,windows中用任務管理器,前臺進程負責與用戶交互,后臺運行的進程與用戶無關,運行在后臺并且只在需要時才喚醒的進程,稱為守護進程,如電子郵件、web頁面、新聞、打印)
  • 一個進程在運行過程中開啟了子進程(如nginx開啟多進程,os.fork,subprocess.Popen等)
  • 用戶的交互式請求,而創建一個新進程(如用戶雙擊暴風影音)
  • 一個批處理作業的初始化(只在大型機的批處理系統中應用)

無論哪一種,新進程的創建都是由一個已經存在的進程執行了一個用于創建進程的系統調用而創建的。

1. 在UNIX中該系統調用是fork,fork會創建一個與父進程一模一樣的副本,二者有相同的存儲映像、同樣的環境字符串和同樣的打開文件(在shell解釋器進程中,執行一個命令就會創建一個子進程)
2. 在windows中該系統調用是:CreateProcess,CreateProcess既處理進程的創建,也負責把正確的程序裝入新進程。

關于創建子進程,UNIX和windows
1.相同的是:進程創建后,父進程和子進程有各自不同的地址空間(多道技術要求物理層面實現進程之間內存的隔離),任何一個進程的在其地址空間中的修改都不會影響到另外一個進程。
2.不同的是:在UNIX中,子進程的初始地址空間是父進程的一個副本,提示:子進程和父進程是可以有只讀的共享內存區的。但是對于windows系統來說,從一開始父進程與子進程的地址空間就是不同的。

進程的結束

1. 正常退出(自愿,如用戶點擊交互式頁面的叉號,或程序執行完畢調用發起系統調用正常退出,在linux中用exit,在windows中用ExitProcess)
2. 出錯退出(自愿,python a.py中a.py不存在)
3. 嚴重錯誤(非自愿,執行非法指令,如引用不存在的內存,1/0等,可以捕捉異常,try...except...)
4. 被其他進程殺死(非自愿,如kill -9)

在python程序中的進程操作

剛剛我們已經了解了,運行中的程序就是一個進程。所有的進程都是通過它的父進程來創建的。因此,運行起來的python程序也是一個進程,那么我們也可以在程序中再創建進程。多個進程可以實現并發效果,也就是說,當我們的程序中存在多個進程的時候,在某些時候,就會讓程序的執行速度變快。以我們之前所學的知識,并不能實現創建進程這個功能,所以我們就需要借助python中強大的模塊。

multiprocess(multiprocessing)模塊

仔細說來,multiprocess不是一個模塊而是python中一個操作、管理進程的包。 之所以叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進程有關的所有子模塊。由于提供的子模塊非常多,為了方便大家歸類記憶,將這部分大致分為四個部分:創建進程部分,進程同步部分,進程池部分,進程之間數據共享。

multiprocessing下的Process(P大寫)模塊介紹

Process模塊是一個創建進程的模塊,借助這個模塊,就可以完成進程的創建。

Process([group [, target [, name [, args [, kwargs]]]]])

由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動)

注意

  • 需要使用關鍵字的方式來指定參數
  • args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號

參數

  • group? # 參數未使用,值始終為None
  • target? #??表示調用對象,即子進程要執行的任務
  • args? ? ?#??表示調用對象的位置參數元組,args=(1,2,'egon',)
  • kwargs #?表示調用對象的字典,kwargs={'name':'egon','age':18}
  • name? ?# 為子進程的名稱

方法

  • p.start()? #?啟動進程,并調用該子進程中的p.run()
  • p.run()? ? #?進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法
  • p.terminate()? ? ?# 強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵尸進程,
    使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖
  • p.is_alive() # 如果p仍然運行,返回True
  • p.join([timeout]) #?主線程等待p終止(強調:是主線程處于等的狀態,而p是處于運行的狀態)。
    timeout # 是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程

屬性:

  • p.daemon? ? #?默認值為False,如果設為True,代表p為后臺運行的守護進程,當p的父進程終止時,p也隨之終止,
    并且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置
  • p.name? ? ? ? #?進程的名稱
  • p.pid? ? ? ? ? ? #?進程的pid
  • p.exitcode? ?# 進程在運行時為None、如果為–N,表示被信號N結束(了解即可)
  • p.authkey? ? #進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。
    這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)

在windows中使用process模塊的注意事項:
? ? ? ? 在Windows操作系統中由于沒有fork(linux操作系統中創建進程的機制),在創建子進程的時候會自動 import 啟動它的這個文件,
而在 import 的時候又執行了整個文件。因此如果將process()直接寫在文件中就會無限遞歸創建子進程報錯。
所以必須把創建子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候 ,就不會遞歸運行了。

使用Process模塊創建進程在一個python進程中開啟子進程,start方法和并發效果。

import time
from multiprocessing import Processdef f(name):print('hello', name)time.sleep(1)print('我是子進程')if __name__ == '__main__':p = Process(target=f, args=('bob',))p.start()# p.join()  使用join 會 hang 住,等待子進程運行完成,才繼續執行print('我是父進程')# 我是父進程
# 我是子進程

子進程id?

import os
from multiprocessing import Processdef f(x):print('子進程id :',os.getpid(),'父進程id :',os.getppid())return x*xif __name__ == '__main__':print('主進程id :', os.getpid())p_lst = []for i in range(5):p = Process(target=f, args=(i,))p.start()
# 進階,多個進程同時運行(注意,子進程的執行順序不是根據啟動順序決定的)
import time
from multiprocessing import Processdef f(name):print('hello', name)time.sleep(1)if __name__ == '__main__':p_lst = []for i in range(5):p = Process(target=f, args=('bob',))p.start()p_lst.append(p)p.join()                # 這時會等待p運行完成之后再運行后續代碼# [p.join() for p in p_lst] # 如果使用這個,則不會再循環中等待代碼運行完,而在這個時候運行完成,比上一行代碼好print('父進程在執行')

除了上面這些開啟進程的方法,還有一種以繼承Process類的形式開啟進程的方式

import os
from multiprocessing import Processclass MyProcess(Process):def __init__(self,name):super().__init__()self.name=namedef run(self):print(os.getpid())print('%s 正在和女主播聊天' %self.name)p1=MyProcess('wupeiqi')
p2=MyProcess('yuanhao')p1.start() #start會自動調用run
p2.start()
# p2.run()p1.join()
p2.join()print('主線程')

進程之間的數據隔離

在Windows中,各進程內的數據獨立不共享,所以函數變量也不共享

from multiprocessing import Processdef work():global n                # 這行代碼其實沒什么用n = 0                   # 必須有,不然回報未定義的錯print('子進程內: ',n)    # 子進程內:  0if __name__ == '__main__':n = 100p=Process(target=work)p.start()print('主進程內: ',n)    # 子進程內:  100

守護進程

主進程創建守護進程,需要注意:
1.守護進程會在主進程代碼執行結束后就終止,進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止,無論子進程是否結束
2.守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children

from multiprocessing import Process
import timedef foo():while True:print(123)time.sleep(1)print("end123")def bar():print(456)time.sleep(3)print("end456")if __name__ == '__main__':p1 = Process(target=foo)p2 = Process(target=bar)p1.daemon = Truep1.start()p2.start()time.sleep(0.1)print("main-------")  # 打印該行則主進程代碼結束,守護進程p1結束,雖然p2還在執行

socket聊天并發實例

# server端
from socket import *
from multiprocessing import Processserver = socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)def talk(conn,client_addr):while True:try:msg = conn.recv(1024)if not msg:breakconn.send(msg.upper())except Exception:breakif __name__ == '__main__':        # windows下start進程一定要寫到這下面while True:conn,client_addr=server.accept()p = Process(target=talk,args=(conn,client_addr))p.start()
# client端
from socket import *
client = socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))while True:msg = input('>>: ').strip()if not msg:continueclient.send(msg.encode('utf-8'))msg = client.recv(1024)print(msg.decode('utf-8'))

多進程中的其他方法

from multiprocessing import Process
import time
import randomclass Myprocess(Process):def __init__(self,person):self.name = person      # name屬性是Process中的屬性,表示進程的名字super().__init__()      # 執行父類的初始化方法會覆蓋name屬性# self.name = person    # 在這里設置就可以修改進程名字了# self.person = person  # 如果不想覆蓋進程名,就修改屬性名稱就可以了__()def run(self):print('%s正在和網紅臉聊天' %self.name)time.sleep(random.randrange(1,5))print('%s還在和網紅臉聊天' %self.name)p1 = Myprocess('哪吒')
p1.start()
print(p1.pid)                   # 可以查看子進程的進程id
p1.terminate()                  # 關閉進程,不會立即關閉,所以is_alive立刻查看的結果可能還是存活
print(p1.is_alive())            # 結果為Trueprint('開始')
print(p1.is_alive())            # 結果為False

進程同步(multiprocess.Lock 鎖、multiprocess.Semaphore 信號量、multiprocess.Event 事件)

鎖 —— multiprocess.Lock

from multiprocessing import Lock
lock = Lock()
lock.acquire()  # 加鎖
lock.release()  # 釋放鎖

通過剛剛的學習,我們千方百計實現了程序的異步,讓多個任務可以同時在幾個進程中并發處理,他們之間的運行沒有順序,一旦開啟也不受我們控制。盡管并發編程讓我們能更加充分的利用IO資源,但是也給我們帶來了新的問題。當多個進程使用同一份數據資源的時候,就會引發數據安全或順序混亂問題。

# 多進程搶占輸出資源
import os
import time
import random
from multiprocessing import Processdef work(n):print('%s: %s is running' %(n,os.getpid()))time.sleep(random.random())print('%s:%s is done' %(n,os.getpid()))if __name__ == '__main__':for i in range(3):p=Process(target=work,args=(i,))p.start()
# 由并發變成了串行,犧牲了運行效率,但避免了競爭
import os
import time
import random
from multiprocessing import Process,Lockdef work(lock,n):lock.acquire()print('%s: %s is running' % (n, os.getpid()))time.sleep(random.random())print('%s: %s is done' % (n, os.getpid()))lock.release()
if __name__ == '__main__':lock=Lock()for i in range(3):p=Process(target=work,args=(lock,i))p.start()

上面這種情況雖然使用加鎖的形式實現了順序的執行,但是程序又重新變成串行了,這樣確實會浪費了時間,卻保證了數據的安全。
接下來,我們以模擬搶票為例,來看看數據安全的重要性。

# 多進程同時搶購余票
# 文件db的內容為:{"count":1}
# 注意一定要用雙引號,不然json無法識別
# 并發運行,效率高,但競爭寫同一文件,數據寫入錯亂
from multiprocessing import Process,Lock
import time,json,random
def search():dic=json.load(open('db'))print('\033[43m剩余票數%s\033[0m' %dic['count'])def get():dic=json.load(open('db'))time.sleep(0.1)     # 模擬讀數據的網絡延遲if dic['count'] >0:dic['count']-=1time.sleep(0.2) # 模擬寫數據的網絡延遲json.dump(dic,open('db','w'))print('\033[43m購票成功\033[0m')def task():search()get()if __name__ == '__main__':for i in range(100): # 模擬并發100個客戶端搶票p=Process(target=task)p.start()
# 使用鎖來保證數據安全
# 文件db的內容為:{"count":5}
# 注意一定要用雙引號,不然json無法識別
# 并發運行,效率高,但競爭寫同一文件,數據寫入錯亂
from multiprocessing import Process,Lock
import time,json,random
def search():dic=json.load(open('db'))print('\033[43m剩余票數%s\033[0m' %dic['count'])def get():dic=json.load(open('db'))time.sleep(random.random())     # 模擬讀數據的網絡延遲if dic['count'] >0:dic['count']-=1time.sleep(random.random()) # 模擬寫數據的網絡延遲json.dump(dic,open('db','w'))print('\033[32m購票成功\033[0m')else:print('\033[31m購票失敗\033[0m')def task(lock):search()lock.acquire()get()lock.release()if __name__ == '__main__':lock = Lock()for i in range(100):           # 模擬并發100個客戶端搶票p=Process(target=task,args=(lock,))p.start()

加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,速度是慢了,但犧牲了速度卻保證了數據安全。雖然可以用文件共享數據實現進程間通信,但問題是:
1.效率低(共享數據基于文件,而文件是硬盤上的數據)
2.需要自己加鎖處理

因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個進程共享一塊內存的數據)2、幫我們處理好鎖問題。
這就是mutiprocessing模塊為我們提供的基于消息的IPC通信機制:隊列和管道。隊列和管道都是將數據存放于內存中,隊列又是基于(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。

信號量 —— multiprocess.Semaphore(了解)

互斥鎖同時只允許一個線程更改數據,而信號量Semaphore是同時允許一定數量的線程更改數據 。

假設商場里有4個迷你唱吧,所以同時可以進去4個人,如果來了第五個人就要在外面等待,等到有人出來才能再進去玩。

信號量同步基于內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用于訪問像服務器這樣的有限資源。信號量與進程池的概念很像,但是要區分開,信號量涉及到加鎖的概念

from multiprocessing import Process,Semaphore
import time,randomdef go_ktv(sem,user):sem.acquire()print('%s 占到一間ktv小屋' %user)time.sleep(random.randint(0,3)) # 模擬每個人在ktv中待的時間不同sem.release()if __name__ == '__main__':sem=Semaphore(4)p_l=[]for i in range(13):p=Process(target=go_ktv,args=(sem,'user%s' %i,))p.start()p_l.append(p)for i in p_l:i.join()print('============》')

事件 —— multiprocess.Event(了解)

python線程的事件用于主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞,
如果“Flag”值為 True,那么當程序執行 event.wait 方法時便不再阻塞。
clear? ?# 將“Flag”設置為False
set? ? ? # 將“Flag”設置為True
is_set # 判斷當前的狀態是否為T

import time
import random
from multiprocessing import Process,Eventdef car(i,e):                    # 感知狀態的變化if not e.is_set():           # 當前這個事件的狀態如果是Falseprint('car%s正在等待'%i)  # 這輛車正在等待通過路口e.wait()                     # 阻塞 直到有一個e.set行為  # 等紅燈print('car%s通過路口'%i)def traffic_light(e):   # 修改事件的狀態print('\033[1;31m紅燈亮\033[0m') # 事件在創立之初的狀態是False,相當于我程序中的紅燈time.sleep(2)  # 紅燈亮2swhile True:if not e.is_set():  # Falseprint('\033[1;32m綠燈亮\033[0m')e.set()elif e.is_set():print('\033[1;31m紅燈亮\033[0m')e.clear()time.sleep(2)if __name__ == '__main__':e = Event()Process(target=traffic_light,args=[e,]).start()for i in range(50):time.sleep(random.randrange(0,5,2))Process(target=car,args=[i,e]).start()

進程間通信——隊列(multiprocess.Queue)

進程間通信

IPC(Inter-Process Communication)

隊列

創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。

Queue([maxsize]) ? # 創建共享的進程隊列。
參數 :maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。
另外,還需要運行支持線程以便隊列中的數據傳輸到底層管道中。

Queue的實例q具有以下方法:
q.get( [ block [ ,timeout ] ] )?
返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。
block用于控制阻塞行為,默認為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模塊中)。
timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。

q.get_nowait( )?
同q.get(False)方法。

q.put(item [, block [,timeout ] ] )?
將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。
block控制阻塞行為,默認為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。
timeout指定在阻塞模式中等待可用空間的時間長短。超時后將引發Queue.Full異常。

q.qsize()?
返回隊列中目前項目的正確數量。此函數的結果并不可靠,因為在返回結果和在稍后程序中使用結果之間,隊列中可能添加或刪除了項目,在某些系統上,此方法可能引發NotImplementedError異常。

q.empty()?
如果調用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。
也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

q.full()?
如果q已滿,返回為True. 由于線程的存在,結果也可能是不可靠的(參考q.empty()方法)。。

q.close()?
關閉隊列,防止隊列中加入更多數據。調用此方法時,后臺線程將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉,也就是說會繼續處理隊列中存在的數據。
如果q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。
例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。

q.cancel_join_thread()?
不會再進程退出時自動連接后臺線程。這可以防止join_thread()方法阻塞。

q.join_thread()?
連接隊列的后臺線程。此方法用于在調用q.close()方法后,等待所有隊列項被消耗。
默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread()方法可以禁止這種行為。

代碼實例

from multiprocessing import Queue
q=Queue(3)# put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3)   # 如果隊列已經滿了,程序就會停在這里,等待數據被別人取走,再將數據放入隊列。# 如果隊列中的數據一直不被取走,程序就會永遠停在這里。
try:q.put_nowait(3) # 可以使用put_nowait,如果隊列滿了不會阻塞,但是會因為隊列滿了而報錯。
except:             # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,但是會丟掉這個消息。print('隊列已經滿了')print(q.full())     # 我們可以在放入數據之前,可以先看一下隊列的狀態,如果已經滿了,就不繼續put了。print(q.get())
print(q.get())
print(q.get())
# print(q.get())    # 同put方法一樣,如果隊列已經空了,那么繼續取就會出現阻塞。
try:q.get_nowait(3) # 可以使用get_nowait,如果隊列滿了不會阻塞,但是會因為沒取到值而報錯。
except:             # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。print('隊列已經空了')print(q.empty())   # 空了

上面這個例子還沒有加入進程通信,只是先來看看隊列為我們提供的方法,以及這些方法的使用和現象。

import time
from multiprocessing import Process, Queuedef f(q):q.put([time.asctime(), 'from Eva', 'hello'])# 調用主函數中p進程傳遞過來的進程參數 put函數為向隊列中添加一條數據。if __name__ == '__main__':q = Queue()                      # 創建一個Queue對象p = Process(target=f, args=(q,)) # 創建一個進程p.start()print(q.get())p.join()

上面是一個queue的簡單應用,使用隊列q對象調用get函數來取得隊列中最先進入的數據。 接下來看一個稍微復雜一些的例子:

import os
import time
import multiprocessing# 向queue中輸入數據的函數
def inputQ(queue):info = str(os.getpid()) + '(put):' + str(time.asctime())queue.put(info)# 向queue中輸出數據的函數
def outputQ(queue):info = queue.get()print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info))# Main
if __name__ == '__main__':multiprocessing.freeze_support()record1 = []   # store input processesrecord2 = []   # store output processesqueue = multiprocessing.Queue(3)# 輸入進程for i in range(10):process = multiprocessing.Process(target=inputQ,args=(queue,))process.start()record1.append(process)# 輸出進程for i in range(10):process = multiprocessing.Process(target=outputQ,args=(queue,))process.start()record2.append(process)for p in record1:p.join()for p in record2:p.join()

生產者消費者模型

在并發編程中使用生產者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。

為什么要使用生產者和消費者模式

在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者。為了解決這個問題于是引入了生產者和消費者模式。

什么是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。

基于隊列實現生產者消費者模型

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):while True:res = q.get()time.sleep(random.randint(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):for i in range(10):time.sleep(random.randint(1,3))res = '包子%s' %iq.put(res)print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':q = Queue()p1 = Process(target=producer,args=(q,))   # 生產者們:即廚師們c1 = Process(target=consumer,args=(q,))   # 消費者們:即吃貨們p1.start()c1.start()print('主')

此時的問題是主進程永遠不會結束,原因是:生產者p在生產完后就結束了,但是消費者c在取空了q之后,則一直處于死循環中且卡在q.get()這一步。

解決方式無非是讓生產者在生產完畢后,往隊列中再發一個結束信號,這樣消費者在接收到結束信號后就可以break出死循環。

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):while True:res = q.get()if res is None:break               # 收到結束信號則結束time.sleep(random.randint(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):for i in range(10):time.sleep(random.randint(1,3))res = '包子%s' %iq.put(res)print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))q.put(None)                             # 發送結束信號if __name__ == '__main__':q = Queue()p1 = Process(target=producer,args=(q,)) # 生產者們:即廚師們c1 = Process(target=consumer,args=(q,)) # 消費者們:即吃貨們p1.start()c1.start()print('主')

注意:結束信號None,不一定要由生產者發,主進程里同樣可以發,但主進程需要等生產者結束后才應該發送該信號,但上述解決方式,在有多個生產者和多個消費者時,我們則需要用一個很low的方式去解決,即多個消費者的時候:有幾個消費者就需要發送幾次結束信號

JoinableQueue([maxsize])

創建可連接的共享進程隊列。這就像是一個Queue對象,但隊列允許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法:

q.task_done()?
使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。
如果調用此方法的次數大于從隊列中刪除的項目數量,將引發ValueError異常。

q.join()?
生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。?
下面的例子說明如何建立永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,并等待它們被處理。

from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q):while True:res = q.get()time.sleep(random.randint(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))q.task_done()  # 向q.join()發送一次信號,證明一個數據已經被取走了def producer(name,q):for i in range(10):time.sleep(random.randint(1,3))res = '%s%s' %(name,i)q.put(res)print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))q.join()       # 生產完畢,使用此方法進行阻塞,直到隊列中所有項目均被處理。if __name__ == '__main__':q = JoinableQueue()# 生產者們:即廚師們p1 = Process(target=producer,args = ('包子',q))p2 = Process(target=producer,args = ('骨頭',q))p3 = Process(target=producer,args = ('泔水',q))# 消費者們:即吃貨們c1 = Process(target=consumer,args=(q,))c2  =Process(target=consumer,args=(q,))c1.daemon = Truec2.daemon = True# 開始p_l = [p1,p2,p3,c1,c2]for p in p_l:p.start()p1.join()p2.join()p3.join()print('主') # 主進程等join ---> p1,p2,p3 等 ----> c1,c2# p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到隊列的數據# 因而c1,c2也沒有存在的價值了,不需要繼續阻塞在進程中影響主進程了。應該隨著主進程的結束而結束,所以設置成守護進程就可以了。

進程之間的數據共享

展望未來,基于消息傳遞的并發編程是大勢所趨,即便是使用線程,推薦做法也是將程序設計為大量獨立的線程集合,通過消息隊列交換數據。這樣極大地減少了對使用鎖定和其他同步手段的需求,還可以擴展到分布式系統中。

但進程間應該盡量避免通信,即便需要通信,也應該選擇進程安全的工具來避免加鎖帶來的問題。以后我們會嘗試使用數據庫來解決現在進程之間的數據共享問題。

進程間數據是獨立的,可以借助于隊列或管道實現通信,二者都是基于消息傳遞的。雖然進程間數據獨立,但可以通過Manager實現數據共享,事實上Manager的功能遠不止于此

from multiprocessing import Manager,Process,Lock
def work(d,lock):with lock:       # 不加鎖而操作共享的數據,肯定會出現數據錯亂d['count'] -= 1if __name__ == '__main__':lock = Lock()with Manager() as m:dic = m.dict({'count':100})p_l=[]for i in range(100):p = Process(target=work,args=(dic,lock))p_l.append(p)p.start()for p in p_l:p.join()print(dic)

進程池和multiprocess.Pool模塊

進程池

為什么要有進程池?進程池的概念。

在程序實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閑時可能只有零星任務。那么在成千上萬個任務需要被執行的時候,我們就需要去創建成千上萬個進程么?首先,創建進程需要消耗時間,銷毀進程也需要消耗時間。第二即便開啟了成千上萬的進程,操作系統也不能讓他們同時執行,這樣反而會影響程序的效率。因此我們不能無限制的根據任務開啟或者結束進程。那么我們要怎么做呢?

在這里,要介紹一個進程池的概念,定義一個池子,在里面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等到處理完畢,進程并不關閉,而是將進程再放回進程池中繼續等待任務。如果有很多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,拿到空閑進程才能繼續執行。也就是說,池中進程的數量是固定的,那么同一時間最多有固定數量的進程在運行。這樣不會增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現并發效果。

multiprocess.Pool模塊

Pool([numprocess ?[,initializer [, initargs]]]) ? ?# 創建進程池
參數
1 numprocess:要創建的進程數,如果省略,將默認使用cpu_count()的值
2 initializer:是每個工作進程啟動時要執行的可調用對象,默認為None
3 initargs:是要傳給initializer的參數組

主要方法
p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。
# 需要強調的是:此操作并不會在所有池工作進程中并執行func函數。
# 如果要通過不同參數并發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async()

p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。
# 此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。
# 當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。

p.close():關閉進程池,防止新任務再加進來,已有的任務會執行完

P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之后調用

其他方法
方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法
? obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠程操作中引發了異常,它將在調用此方法時再次被引發。
? obj.ready():如果調用完成,返回True
? obj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常
? obj.wait([timeout]):等待結果變為可用。
? obj.terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數

代碼實例

同步異步

import os,time
from multiprocessing import Pooldef work(n):print('%s run' %os.getpid())time.sleep(3)return n**2if __name__ == '__main__':p = Pool(3)    # 進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務res_l = []for i in range(10):res=p.apply(work,args=(i,)) # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞# 但不管該任務是否存在阻塞,同步調用都會在原地等著print(res_l)
import os
import time
import random
from multiprocessing import Pooldef work(n):print('%s run' %os.getpid())time.sleep(random.random())return n**2if __name__ == '__main__':p = Pool(3)   # 進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務res_l=[]for i in range(10):res = p.apply_async(work,args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行# 返回結果之后,將結果放入列表,歸還進程,之后再執行新的任務# 需要注意的是,進程池中的三個進程不會同時開啟或者同時結束# 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。  res_l.append(res)# 異步apply_async用法:如果使用異步提交的任務,主進程需要使用jion,等待進程池內任務都處理完,然后可以用get收集結果# 否則,主進程結束,進程池可能還沒來得及執行,也就跟著一起結束了p.close()p.join()for res in res_l:print(res.get()) # 使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get

練習

# server:進程池版socket并發聊天
# Pool內的進程數默認是cpu核數,假設為4(查看方法os.cpu_count())
# 開啟6個客戶端,會發現2個客戶端處于等待狀態
# 在每個進程內查看pid,會發現pid使用為4個,即多個客戶端公用4個進程
from socket import *
from multiprocessing import Pool
import osserver=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)def talk(conn):print('進程pid: %s' %os.getpid())while True:try:msg=conn.recv(1024)if not msg:breakconn.send(msg.upper())except Exception:breakif __name__ == '__main__':p=Pool(4)while True:conn,*_ = server.accept()p.apply_async(talk,args=(conn,))# p.apply(talk,args=(conn,client_addr)) # 同步的話,則同一時間只有一個客戶端能訪問
# client端
from socket import *client = socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))while True:msg=input('>>: ').strip()if not msg:continueclient.send(msg.encode('utf-8'))msg=client.recv(1024)print(msg.decode('utf-8'))

回調函數

需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。 主進程則調用一個函數去處理該結果,該函數即回調函數 我們可以把耗時間(阻塞)的任務放到進程池中,然后指定回調函數(主進程負責執行), 這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。主要用于爬蟲的時候

# 示例
import re
from urllib.request import urlopen
from multiprocessing import Pooldef get_page(url,pattern):response = urlopen(url).read().decode('utf-8')return pattern,responsedef parse_page(info):pattern,page_content=infores = re.findall(pattern,page_content)for item in res:dic = {'index':item[0].strip(),'title':item[1].strip(),'actor':item[2].strip(),'time':item[3].strip(),}print(dic)
if __name__ == '__main__':regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'pattern1 = re.compile(regex,re.S)url_dic={'http://maoyan.com/board/7':pattern1,}p=Pool()res_l = []for url,pattern in url_dic.items():res = p.apply_async(get_page,args = (url,pattern),callback = parse_page)res_l.append(res)for i in res_l:i.get()

如果在主進程中等待進程池中所有任務都執行完畢后,再統一處理結果,則無需回調函數
進程池的其他實現方式:https://docs.python.org/dev/library/concurrent.futures.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/454568.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/454568.shtml
英文地址,請注明出處:http://en.pswp.cn/news/454568.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

從全息投影到全息平臺,必須克服7個障礙

“每個科幻愛好者和癡迷技術的人兒都希望能擁有一個全息成像臺。不幸的是&#xff0c;制造全息平臺的技術還尚未被人類掌握。據說我們離這項技術可用之時還有大約10到15年的時間——這是 AMD 的專業人士 Phil Rogers 說的&#xff0c;他專攻 3D 技術工作已超過20年。在《今日宇…

android fragment fragmenttransaction,Android FragmentTransaction 常用方法總結

FragmentManage&#xff1a;FragmentManager能夠實現管理activity中fragment. 通過調用activity的getFragmentManager()取得它的實例.FragmentManager可以做如下一些事情:1、使用findFragmentById() (用于在activity ...public class Toolkit { /** * * Role:Telec…

IBM總架構師寇衛東:話說程序員的職業生涯-IT程序人生-職業生涯規劃

初級程序員和高級程序員時期&#xff0c;都屬于職業生涯發展的第一階段&#xff0c;我們可以稱之為黃金時期。這階段程序員的年齡在20~35歲之間&#xff0c;因為年輕&#xff0c;所以更善于學習&#xff0c;而且體力充沛&#xff0c;很多走過這個階段的程序員有過通宵工作的經歷…

metric learning -- 馬氏距離與歐氏距離

一 基本概念 方差&#xff1a;&#xff08;variance&#xff09;是在概率論和統計方差衡量隨機變量或一組數據時離散程度的度量。概率論中方差用來度量隨機變量和其數學期望&#xff08;即均值&#xff09;之間的偏離程度。統計中的方差&#xff08;樣本方差&#xff09;是每個…

深入理解 C# 協變和逆變

msdn 解釋如下&#xff1a; “協變”是指能夠使用與原始指定的派生類型相比&#xff0c;派生程度更大的類型。 “逆變”則是指能夠使用派生程度更小的類型。 解釋的很正確&#xff0c;大致就是這樣&#xff0c;不過不夠直白。 直白的理解&#xff1a; “協變”->”和諧的變”…

華為mate20能用鴻蒙嗎,華為mate20可以用5g網絡嗎

華為mate20不可以用5g網絡&#xff0c;它是4g手機在2018年上市&#xff0c;當時5g并沒有開始流行&#xff0c;因此mate20是不支持5G的。不過在后來的2019年秋季&#xff0c;華為發布了mate20 x的5g版本&#xff0c;這也是mate20系列里唯一支持5G的&#xff0c;除此之外mate20、…

基本農田衛星地圖查詢_#重慶朝天門#谷歌百度騰訊高德“衛星地圖”PK,谷歌更勝一籌...

截圖自便民查詢網&#xff0c;各家衛星地圖PK&#xff0c;各有千秋~谷歌精確度最高&#xff1a;5m&#xff0c;來福士修建中&#xff0c;嘉陵江是綠的&#xff0c;長江是黃的。兩江交匯處有一條分明的界線。谷歌 5m:20ft谷歌 50m:100ft谷歌 200m:500ft谷歌 300m:1000ft谷歌 500…

軟件開發者面試百問答案,老紫竹研究室出品(已經有64個)

當然&#xff0c;全部是我個人的答案&#xff0c;不代表別人。地址 www.laozizhu.com/program.jsp?typeId104 老紫竹研究室&#xff0c;分享軟件開發的快樂與收獲 ‘ 我這里貼上已經寫好的答案連接。 軟件開發者面試百問答案 - 你需要哪些東西幫助你判斷項目是否符合時間要求…

Python 第三方庫之 Celery 分布式任務隊列

一、Celery介紹和使用&#xff1a; Celery 是一個 基于python開發的分布式異步消息任務隊列&#xff0c;通過它可以輕松的實現任務的異步處理&#xff0c; 如果你的業務場景中需要用到異步任務&#xff0c;就可以考慮使用celery&#xff0c; 舉幾個實例場景中可用的例子: 你想…

windows server 2008 (五)web服務器的搭建和部署

Windows server 2008 web服務器的搭建和部署相對于windows server 2003的IIS6來說&#xff0c;windows server 2008推出的IIS7.0為管理員提供了統一的web平臺&#xff0c;為管理員和開發人員提供了一個一致的web解決方案。并針對安全方面做了改進&#xff0c;可以減少利用自定義…

改裝摩托車

摩托車發動機就是將進入氣缸中的燃料混合氣點燃使其燃燒所產生的熱能變為機械能&#xff0c;并由曲軸將動力通過傳動機構傳給摩托車后輪而變為車輛行駛動力的機械。發動機的進排氣量和氣流速是影響高轉速&#xff08;功率&#xff09;輸出的關鍵因素之一。 發動機工作時氣流的路…

華為鴻蒙os logo,華為鴻蒙OS Logo曝光:Powered by HarmonyOS

IT之家 9 月 13 日消息 9 月 10 日&#xff0c;鴻蒙 OS 2.0 亮相華為開發者大會的主舞臺上&#xff0c;華為常務董事、消費者業務 CEO 余承東表示&#xff0c;鴻蒙 OS 是首個真正為全場景時代打造的分布式操作系統&#xff0c;鴻蒙 OS 2.0 全面使能全場景生態。現在博主 勇氣數…

python判斷語句_詳解Python判斷語句的使用方法

本篇介紹Python判斷語句的使用&#xff0c;主要討論簡單條件語句、多重條件語句和嵌套條件語句&#xff0c;在講解的每個案例中都配有流程圖和代碼說明。通過本篇的學習&#xff0c;可以達成如下目標。 ● 掌握判斷語句的使用規則 ● 判斷語句流程圖的畫法 前面我們學習了Pytho…

迫在眉睫的職業規劃

對于大多數程序員來說&#xff0c;微軟是一家值得崇敬的公司&#xff0c;能夠加入微軟&#xff0c;也是很多程序員的愿望。在付出足夠的努力后&#xff0c;一旦進入了微軟&#xff0c;也就意味著可以和最先進的技術終日為伍&#xff0c;一直沿著技術這條路線走下去了。對嗎&…

js setTimeout 使用方法

在項目過程中遇到一些異步加載和其他js方法沖突的問題&#xff1a; 如圖初始化的時候會加載“商戶基本信息”,修改商戶名稱字段第二個頁面也需要修改&#xff1a; function setSeqAndName(){var pritab2 $("#allTabs").tabs("getTab", 1).find("ifra…

python中分支結構包括哪些_python中的分支結構

python不提供switch語句&#xff0c;但是python可以通過字典實現switch語句的功能 實現方法分兩步&#xff1a; 首先&#xff1a;定義一個地點 其次&#xff1a;調用字典的get()獲取相應的表達式 原始方法&#xff1a; from __future__ import division #內置函數&#xff0c;解…

機器學習算法之 logistic、Softmax 回歸

邏輯回歸本質是分類問題&#xff0c;而且是二分類問題&#xff0c;不屬于回歸&#xff0c;但是為什么又叫回歸呢。我們可以這樣理解&#xff0c;邏輯回歸就是用回歸的辦法來做分類。它是在線性回歸的基礎上&#xff0c;通過Sigmoid函數進行了非線性轉換&#xff0c;從而具有更強…

html上傳預覽圖片原理,關于html中圖片上傳預覽的實現

functionchange() {varpicdocument.getElementById("preview"),filedocument.getElementById("f");//得到后綴名varextfile.value.substring(file.value.lastIndexOf(".")1).toLowerCase();//gif在IE瀏覽器暫時無法顯示if(ext!png&&ext!…

程序員成功之路

程序員成功之路 ——The road ahead for programmer&#xff08;演講稿&#xff09; 一、我很羨慕在座的各位同學&#xff0c;因為你們是中國未來的程序員&#xff0c;而我不是&#xff0c;我一直很遺憾。 比爾蓋茨曾經寫過一本書叫做《未來之路》The road ahead, 那么今天我選…

【溫故知新】——原生js中常用的四種循環方式

一、引言 本文主要是利用一個例子&#xff0c;講一下原生js中常用的四種循環方式的使用與區別&#xff1a; 實現效果&#xff1a; 在網頁中彈出框輸入0 網頁輸出“歡迎下次光臨”在網頁中彈出框輸入1 網頁輸出“查詢中……”在網頁中彈出框輸入2 網頁輸出“取款中……”在…