1、什麼是進程
進程即正在執行的一個過程。進程是對正在運行程序的一個抽象。html
進程的概念起源於操做系統,是操做系統最核心的概念,也是操做系統提供的最古老也是最重要的抽象概念之一。操做系統的其餘全部內容都是圍繞進程的概念展開的。即便能夠利用的cpu只有一個(早期的計算機確實如此),也能保證支持(僞)併發的能力。將一個單獨的cpu變成多個虛擬的cpu(多道技術:時間多路複用和空間多路複用+硬件上支持隔離),沒有進程的抽象,現代計算機將不復存在。python
必備的理論基礎:linux
#一 操做系統的做用: 1:隱藏醜陋複雜的硬件接口,提供良好的抽象接口 2:管理、調度進程,而且將多個進程對硬件的競爭變得有序 #二 多道技術: 1.產生背景:針對單核,實現併發 ps: 如今的主機通常是多核,那麼每一個核都會利用多道技術 有4個cpu,運行於cpu1的某個程序遇到io阻塞,會等到io結束再從新調度,會被調度到4個 cpu中的任意一個,具體由操做系統調度算法決定。 2.空間上的複用:如內存中同時有多道程序 3.時間上的複用:複用一個cpu的時間片 強調:遇到io切,佔用cpu時間過長也切,核心在於切以前將進程的狀態保存下來,這樣 才能保證下次切換回來時,能基於上次切走的位置繼續運行
進程(Process)是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操做系統結構的基礎。在早期面向進程設計的計算機結構中,進程是程序的基本執行實體;在當代面向線程設計的計算機結構中,進程是線程的容器。程序是指令、數據及其組織形式的描述,進程是程序的實體。nginx
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
第一,進程是一個實體。每個進程都有它本身的地址空間,通常狀況下,包括文本區域(text region)、數據區域(data region)和堆棧(stack region)。文本區域存儲處理器執行的代碼;數據區域存儲變量和進程執行期間使用的動態分配的內存;堆棧區域存儲着活動過程調用的指令和本地變量。 第二,進程是一個「執行中的程序」。程序是一個沒有生命的實體,只有處理器賦予程序生命時(操做系統執行之),它才能成爲一個活動的實體,咱們稱其爲進程。[3] 進程是操做系統中最基本、重要的概念。是多道程序系統出現後,爲了刻畫系統內部出現的動態狀況,描述系統內部各道程序的活動規律引進的一個概念,全部多道程序設計操做系統都創建在進程的基礎上。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
從理論角度看,是對正在運行的程序過程的抽象; 從實現角度看,是一種數據結構,目的在於清晰地刻畫動態系統的內在規律,有效管理和調度進入計算機系統主存儲器運行的程序。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
動態性:進程的實質是程序在多道程序系統中的一次執行過程,進程是動態產生,動態消亡的。 併發性:任何進程均可以同其餘進程一塊兒併發執行 獨立性:進程是一個能獨立運行的基本單位,同時也是系統分配資源和調度的獨立單位; 異步性:因爲進程間的相互制約,使進程具備執行的間斷性,即進程按各自獨立的、不可預知的速度向前推動 結構特徵:進程由程序、數據和進程控制塊三部分組成。 多個不一樣的進程能夠包含相同的程序:一個程序在不一樣的數據集裏就構成不一樣的進程,能獲得不一樣的結果;可是執行過程當中,程序不能發生改變。
進程與程序的區別:web
程序是指令和數據的有序集合,其自己沒有任何運行的含義,是一個靜態的概念。 而進程是程序在處理機上的一次執行過程,它是一個動態的概念。 程序能夠做爲一種軟件資料長期存在,而進程是有必定生命期的。 程序是永久的,進程是暫時的。
注意:同一個程序執行兩次,就會在操做系統中出現兩個進程,因此咱們能夠同時運行一個軟件,分別作不一樣的事情也不會混亂。正則表達式
2、進程的調度
要想多個進程交替運行,操做系統必須對這些進程進行調度,這個調度也不是隨即進行的,而是須要遵循必定的法則,由此就有了進程的調度算法。算法
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
先來先服務(FCFS)調度算法是一種最簡單的調度算法,該算法既可用於做業調度,也可用於進程調度。FCFS算法比較有利於長做業(進程),而不利於短做業(進程)。由此可知,本算法適合於CPU繁忙型做業,而不利於I/O繁忙型的做業(進程)。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
短做業(進程)優先調度算法(SJ/PF)是指對短做業或短進程優先調度的算法,該算法既可用於做業調度,也可用於進程調度。但其對長做業不利;不能保證緊迫性做業(進程)被及時處理;做業的長短只是被估算出來的。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
時間片輪轉(Round Robin,RR)法的基本思路是讓每一個進程在就緒隊列中的等待時間與享受服務的時間成比例。在時間片輪轉法中,須要將CPU的處理時間分紅固定大小的時間片,例如,幾十毫秒至幾百毫秒。若是一個進程在被調度選中以後用完了系統規定的時間片,但又未完成要求的任務,則它自行釋放本身所佔有的CPU而排到就緒隊列的末尾,等待下一次調度。同時,進程調度程序又去調度當前就緒隊列中的第一個進程。 顯然,輪轉法只能用來調度分配一些能夠搶佔的資源。這些能夠搶佔的資源能夠隨時被剝奪,並且能夠將它們再分配給別的進程。CPU是可搶佔資源的一種。但打印機等資源是不可搶佔的。因爲做業調度是對除了CPU以外的全部系統硬件資源的分配,其中包含有不可搶佔資源,因此做業調度不使用輪轉法。 在輪轉法中,時間片長度的選取很是重要。首先,時間片長度的選擇會直接影響到系統的開銷和響應時間。若是時間片長度太短,則調度程序搶佔處理機的次數增多。這將使進程上下文切換次數也大大增長,從而加劇系統開銷。反過來,若是時間片長度選擇過長,例如,一個時間片能保證就緒隊列中所需執行時間最長的進程能執行完畢,則輪轉法變成了先來先服務法。時間片長度的選擇是根據系統對響應時間的要求和就緒隊列中所容許最大的進程數來肯定的。 在輪轉法中,加入到就緒隊列的進程有3種狀況: 一種是分給它的時間片用完,但進程還未完成,回到就緒隊列的末尾等待下次調度去繼續執行。 另外一種狀況是分給該進程的時間片並未用完,只是由於請求I/O或因爲進程的互斥與同步關係而被阻塞。當阻塞解除以後再回到就緒隊列。 第三種狀況就是新建立進程進入就緒隊列。 若是對這些進程區別對待,給予不一樣的優先級和時間片從直觀上看,能夠進一步改善系統服務質量和效率。例如,咱們可把就緒隊列按照進程到達就緒隊列的類型和進程被阻塞時的阻塞緣由分紅不一樣的就緒隊列,每一個隊列按FCFS原則排列,各隊列之間的進程享有不一樣的優先級,但同一隊列內優先級相同。這樣,當一個進程在執行完它的時間片以後,或從睡眠中被喚醒以及被建立以後,將進入不一樣的就緒隊列。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
前面介紹的各類用做進程調度的算法都有必定的侷限性。如短進程優先的調度算法,僅照顧了短進程而忽略了長進程,並且若是並未指明進程的長度,則短進程優先和基於進程長度的搶佔式調度算法都將沒法使用。 而多級反饋隊列調度算法則沒必要事先知道各類進程所需的執行時間,並且還能夠知足各類類型進程的須要,於是它是目前被公認的一種較好的進程調度算法。在採用多級反饋隊列調度算法的系統中,調度算法的實施過程以下所述。 (1) 應設置多個就緒隊列,併爲各個隊列賦予不一樣的優先級。第一個隊列的優先級最高,第二個隊列次之,其他各隊列的優先權逐個下降。該算法賦予各個隊列中進程執行時間片的大小也各不相同,在優先權愈高的隊列中,爲每一個進程所規定的執行時間片就愈小。例如,第二個隊列的時間片要比第一個隊列的時間片長一倍,……,第i+1個隊列的時間片要比第i個隊列的時間片長一倍。 (2) 當一個新進程進入內存後,首先將它放入第一隊列的末尾,按FCFS原則排隊等待調度。當輪到該進程執行時,如它能在該時間片內完成,即可準備撤離系統;若是它在一個時間片結束時還沒有完成,調度程序便將該進程轉入第二隊列的末尾,再一樣地按FCFS原則等待調度執行;若是它在第二隊列中運行一個時間片後仍未完成,再依次將它放入第三隊列,……,如此下去,當一個長做業(進程)從第一隊列依次降到第n隊列後,在第n 隊列便採起按時間片輪轉的方式運行。 (3) 僅當第一隊列空閒時,調度程序才調度第二隊列中的進程運行;僅當第1~(i-1)隊列均空時,纔會調度第i隊列中的進程運行。若是處理機正在第i隊列中爲某進程服務時,又有新進程進入優先權較高的隊列(第1~(i-1)中的任何一個隊列),則此時新進程將搶佔正在運行進程的處理機,即由調度程序把正在運行的進程放回到第i隊列的末尾,把處理機分配給新到的高優先權進程。
3、進程的並行與併發
並行 : 並行是指二者同時執行,好比賽跑,兩我的都在不停的往前跑;(資源夠用,好比三個線程,四核的CPU )shell
併發 : 併發是指資源有限的狀況下,二者交替輪流使用資源,好比一段路(單核CPU資源)同時只能過一我的,A走一段後,讓給B,B用完繼續給A ,交替使用,目的是提升效率。數據庫
區別:編程
並行是從宏觀上,在一個時間段上能夠看出是同時執行的,好比一個服務器同時處理多個session。
併發是從微觀上,也就是在一個精確的時間片刻,有不一樣的程序在執行,這就要求必須有多個處理器。
4、同步異步阻塞非阻塞
狀態介紹
在瞭解其餘概念以前,咱們首先要了解進程的幾個狀態。在程序運行的過程當中,因爲被操做系統的調度算法控制,程序會進入幾個狀態:就緒,運行和阻塞。
(1)就緒(Ready)狀態
當進程已分配到除CPU之外的全部必要的資源,只要得到處理機即可當即執行,這時的進程狀態稱爲就緒狀態。
(2)執行/運行(Running)狀態當進程已得到處理機,其程序正在處理機上執行,此時的進程狀態稱爲執行狀態。
(3)阻塞(Blocked)狀態正在執行的進程,因爲等待某個事件發生而沒法執行時,便放棄處理機而處於阻塞狀態。引發進程阻塞的事件可有多種,例如,等待I/O完成、申請緩衝區不能知足、等待信件(信號)等。
同步和異步
所謂同步就是一個任務的完成須要依賴另一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成,這是一種可靠的任務序列
。要麼成功都成功,失敗都失敗,兩個任務的狀態能夠保持一致。
所謂異步是不須要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工做,依賴的任務也當即執行,只要本身完成了整個任務就算完成了
。至於被依賴的任務最終是否真正完成,依賴它的任務沒法肯定,因此它是不可靠的任務序列
。
阻塞與非阻塞
阻塞和非阻塞這兩個概念與程序(線程)等待消息通知(無所謂同步或者異步)時的狀態有關。也就是說阻塞與非阻塞主要是程序(線程)等待消息通知時的狀態角度來講的
同步/異步與阻塞/非阻塞
- 同步阻塞形式
效率最低。拿上面的例子來講,就是你專心排隊,什麼別的事都不作。
- 異步阻塞形式
若是在銀行等待辦理業務的人採用的是異步的方式去等待消息被觸發(通知)
,也就是領了一張小紙條,假如在這段時間裏他不能離開銀行作其它的事情,那麼很顯然,這我的被阻塞在了這個等待的操做上面;
異步操做是能夠被阻塞住的,只不過它不是在處理消息時阻塞,而是在等待消息通知時被阻塞
。
- 同步非阻塞形式
其實是效率低下的。
想象一下你一邊打着電話一邊還須要擡頭看到底隊伍排到你了沒有,若是把打電話和觀察排隊的位置當作是程序的兩個操做的話,這個程序須要在這兩種不一樣的行爲之間來回的切換
,效率可想而知是低下的。
- 異步非阻塞形式
效率更高,
由於打電話是你(等待者)的事情,而通知你則是櫃檯(消息觸發機制)的事情,程序沒有在兩種不一樣的操做中來回切換
。
好比說,這我的忽然發覺本身煙癮犯了,須要出去抽根菸,因而他告訴大堂經理說,排到我這個號碼的時候麻煩到外面通知我一下,那麼他就沒有被阻塞在這個等待的操做上面,天然這個就是異步+非阻塞的方式了。
不少人會把同步和阻塞混淆,是由於不少時候同步操做會以阻塞的形式表現出來
,一樣的,不少人也會把異步和非阻塞混淆,由於異步操做通常都不會在真正的IO操做處被阻塞
。
5、進程的建立與結束
進程的建立
但凡是硬件,都須要有操做系統去管理,只要有操做系統,就有進程的概念,就須要有建立進程的方式,一些操做系統只爲一個應用程序設計,好比微波爐中的控制器,一旦啓動微波爐,全部的進程都已經存在。
而對於通用系統(跑不少應用程序),須要有系統運行過程當中建立或撤銷進程的能力,主要分爲4中形式建立新的進程:
1. 系統初始化(查看進程linux中用ps命令,windows中用任務管理器,前臺進程負責與用戶交互,後臺運行的進程與用戶無關,運行在後臺而且只在須要時才喚醒的進程,稱爲守護進程,如電子郵件、web頁面、新聞、打印)
2. 一個進程在運行過程當中開啓了子進程(如nginx開啓多進程,os.fork,subprocess.Popen等)
3. 用戶的交互式請求,而建立一個新進程(如用戶雙擊暴風影音)
4. 一個批處理做業的初始化(只在大型機的批處理系統中應用)
不管哪種,新進程的建立都是由一個已經存在的進程執行了一個用於建立進程的系統調用而建立的。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
1. 在UNIX中該系統調用是:fork,fork會建立一個與父進程如出一轍的副本,兩者有相同的存儲映像、一樣的環境字符串和一樣的打開文件(在shell解釋器進程中,執行一個命令就會建立一個子進程) 2. 在windows中該系統調用是:CreateProcess,CreateProcess既處理進程的建立,也負責把正確的程序裝入新進程。 關於建立子進程,UNIX和windows 1.相同的是:進程建立後,父進程和子進程有各自不一樣的地址空間(多道技術要求物理層面實現進程之間內存的隔離),任何一個進程的在其地址空間中的修改都不會影響到另一個進程。 2.不一樣的是:在UNIX中,子進程的初始地址空間是父進程的一個副本,提示:子進程和父進程是能夠有隻讀的共享內存區的。可是對於windows系統來講,從一開始父進程與子進程的地址空間就是不一樣的。
進程的結束
1. 正常退出(自願,如用戶點擊交互式頁面的叉號,或程序執行完畢調用發起系統調用正常退出,在linux中用exit,在windows中用ExitProcess)
2. 出錯退出(自願,python a.py中a.py不存在)
3. 嚴重錯誤(非自願,執行非法指令,如引用不存在的內存,1/0等,能夠捕捉異常,try...except...)
4. 被其餘進程殺死(非自願,如kill -9)
6、在python程序中的進程操做
運行中的程序就是一個進程。全部的進程都是經過它的父進程來建立的。所以,運行起來的python程序也是一個進程,那麼咱們也能夠在程序中再建立進程。多個進程能夠實現併發效果,也就是說,當咱們的程序中存在多個進程的時候,在某些時候,就會讓程序的執行速度變快。以咱們以前所瞭解的知識,並不能實現建立進程這個功能,因此咱們就須要藉助python中強大的模塊。
multiprocessing模塊
仔細說來,multiprocess不是一個模塊而是python中一個操做、管理進程的包。multi是取自multiple多功能的意思,因此在這個包中幾乎包含了和進程有關的全部子模塊。因爲提供的子模塊很是多,大體分爲四個部分:建立進程部分、進程同步部分、進程池部分、進程之間數據共享。
1.建立進程部分
multiprocessing.Process模塊
process模塊的介紹
process模塊是一個建立進程的模塊,藉助這個模塊,就能夠完成進程的建立。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import os from multiprocessing import Process def func(args): print('in func') print(args) print('子進程',os.getpid()) print('子進程的父進程',os.getppid()) if __name__ == '__main__': p = Process(target=func,args=(2,)) #進程的就緒 p.start() #開啓了一個子進程 print('父進程',os.getpid()) print('父進程的父進程',os.getppid()) #主進程的父進程是python解釋器的進程 #參數說明: # Process(group=None, target=None, name=None, args=(), kwargs={}) 建立一個進程對象 #進程對象.start() 啓動子進程 #group 參數未使用,值始終默認爲None #target 表示調用對象,即子進程要執行的任務 調用函數時,必須是函數名,由於內部調用的是內存地址 #args 表示調用對象的位置參數元組 給target中的調用對象傳參,傳位置參數,是一個元組形式,必須有逗號 #kwargs 表示調用對象的字典 跟args一樣,傳關鍵字傳參 是一個字典形式 {'name':'ston'} #name 爲子進程的名稱,能夠給子進程命名 #os.getpid() 獲取當前進程中的進程編號 #os.getppid() 獲取當前進程中的父進程編號
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
1 p.start():啓動進程,並調用該子進程中的p.run() 2 p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法 3 p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖 4 p.is_alive():若是p仍然運行,返回True 5 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
1 p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置 2 p.name:進程的名稱 3 p.pid:進程的pid 4 p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可) 5 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)
在Windows中使用Process模塊的注意事項
在Windows操做系統中因爲沒有fork(linux操做系統中建立進程的機制),在建立子進程的時候會自動 import 啓動它的這個文件,而在 import 的時候又執行了整個文
件。所以若是將process()直接寫在文件中就會無限遞歸建立子進程報錯。因此必須把建立子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候,
就不會遞歸運行了。
注意:開啓了子進程的主進程:通常狀況下,主進程會在本身的代碼執行完畢和本身的子進程代碼執行完畢後才結束。也有特殊狀況,子進程沒有結束,父進程就結束了。具體看子進程與主進程之間的關係。
多個進程的同時使用:(注意,子進程的執行順序不是根據啓動順序決定的,而是由操做系統調度)
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import time from multiprocessing import Process def func(arg1,arg2): print(arg1) time.sleep(2) print(arg2) if __name__ == '__main__': for i in range(10): #開啓10個進程 p = Process(target=func,args=('*'*i,'='*i)) p.start()
開啓進程的方法,還有一種是經過繼承Process類來開啓進程
自定義類實現開啓進程必須:1.繼承Process類 2.必須實現一個run方法,run方法是在子進程中執行的代碼
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import os import time from multiprocessing import Process class MyProcess(Process): def __init__(self,name,age): super().__init__() def run(self): print(self.name) print(self.pid) print('=======') time.sleep(5) print('in run',os.getpid()) print('*********') if __name__ == '__main__': p1 = MyProcess('rain',23) p1.start() print(os.getpid()) print('>>>>>>>>') p2 = MyProcess('snow',21) p2.start() #對象.start()等同於內部調用了run方法
join方法的使用:
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
#沒有使用join方法 import time from multiprocessing import Process def func(): print('>>>>>>>>>') time.sleep(5) print('*************') if __name__ == '__main__': p = Process(target = func) p.start() print('=============') #輸出結果:先打印============ 和 >>>>>>>>> 五秒後打印*************
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
#使用join方法後 import time from multiprocessing import Process def func(): print('>>>>>>>>>>>>') time.sleep(5) print('*************') if __name__ == '__main__': p = Process(target = func) p.start() p.join() #進程對象.join() print('=============') #輸出結果:先打印>>>>>>>>>>>>,五秒後打印***********和=============
由此可知,join方法能夠感知一個子進程的結束,使得主進程與子進程之間從異步轉變爲同步,在一些應用場景下有重要意義。好比,在開啓多個進程時,往不一樣文件中寫入不一樣或相同的內容,全部文件寫完後,查看全部的文件。
多進程join方法實現多個子進程所有執行完畢後在執行主進程中的代碼
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import time from multiprocessing import Process def func(arg1,arg2): print(arg1) time.sleep(2) print(arg2) if __name__ == '__main__': p_lis = [] for i in range(10): p = Process(target=func,args=('*'*i,'='*i)) p_lis.append(p) p.start() [p.join() for p in p_lis] #以前的全部的進程都必須在這裏執行完畢才能執行下面的代碼 print('++++++++')
簡單的實現文件的異步操做,同步查看
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
#藉助os.walk import os from multiprocessing import Process def write_file(filename,content): with open(filename,'a',encoding='utf-8') as f: f.write(content*'=') if __name__ == '__main__': p_lis = [] for i in range(5): p = Process(target=write_file,args=('info%s'%i,i)) p.start() p_lis.append(p) [p.join() for p in p_lis] print([i for i in os.walk(r'目錄')])
注意:不一樣進程之間不經過特殊的手段,是不可能共享一個數據的,這就是進程間的數據隔離問題
使用多進程實現TCP協議中socket服務端的併發功能
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import socket from multiprocessing import Process def server(conn): conn.send(b'hello') msg = conn.recv(1024) print(msg) conn.close() if __name__ == '__main__': sk = socket.socket() sk.bind(('127.0.0.1',12345)) sk.listen() try: while True: conn,addr = sk.accept() p = Process(target=server,args=(conn,)) p.start() finally: sk.close()
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import socket sk = socket.socket() sk.connect(('127.0.0.1',12345)) msg = sk.recv(1024) print(msg) ret = input(">>") sk.send(ret.encode('utf-8')) sk.close()
注意:在子進程中不能使用input()方法,子進程中的輸入沒法在主進程中顯示,就會報錯。
守護進程:會隨着主進程的代碼執行完畢而結束
主進程建立守護進程
其一:守護進程會在主進程代碼執行結束後就終止
其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import time from multiprocessing import Process def func(): while 1: print('運行正常') time.sleep(0.5) def func1(): time.sleep(8) print('執行 in func1') if __name__ == '__main__': p = Process(target=func) p.daemon = True p.start() Process(target=func1).start() count = 1 while count <5: print('server 執行中') time.sleep(1) count += 1 #輸出結果:主進程與兩個子進程同時執行,當主進程的代碼執行完畢,守護進程隨即結束,而子進程func1沒有執行結束, # 因此主進程沒有執行結束,而此時的守護進程結束,所以,守護進程是隨着主進程代碼的執行結束而結束,而不是主進程結束而結束
注意:p.daemon = True 必定要在p.start()以前設置,設置p爲守護進程,禁止p建立子進程,而且父進程代碼執行結束,p即終止運行
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import time from multiprocessing import Process def func(): time.sleep(8) print('in func') if __name__ == '__main__': p = Process(target=func) p.start() print(p.is_alive()) #判斷一個子進程是否在執行 time.sleep(5) print(p.is_alive()) p.terminate() #結束一個子進程 print(p.is_alive()) #True #停止子進程後,子進程不會當即結束,由於操做系統接到響應到執行須要時間,結束進程後當即判斷,這個時間小於操做系統處理的時間 time.sleep(1) print(p.is_alive())
2.進程同步部分
1.鎖——multiprocessing.Lock
程序的異步,讓多個任務能夠同時在幾個進程中併發處理,他們之間的運行沒有順序,一旦開啓也不受咱們控制。儘管併發編程讓咱們能更加充分的利用IO資源,可是也給咱們帶來了新的問題。
當多個進程使用同一份數據資源的時候,就會引起數據安全或順序混亂問題。
咱們以模擬搶票爲例,來看看數據安全的重要性。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
#文件db的內容爲:{"ticket":3} #注意必定要用雙引號,否則json沒法識別 #併發運行,效率高,但競爭寫同一文件,數據寫入錯亂 import time import json from multiprocessing import Process def show(i): with open('ticket')as f: dic = json.load(f) print('%s:餘票%s張'%(i,dic['ticket'])) def buy_ticket(i): with open('ticket')as f: dic = json.load(f) time.sleep(0.1) #模擬讀數據的網絡延遲 if dic['ticket'] > 0: print('%s:購票成功'%i) dic['ticket'] -= 1 else: print('%s:餘票不足'%i) time.sleep(0.1) #模擬寫數據的網絡延遲 with open('ticket','w')as f: json.dump(dic,f) if __name__ == '__main__': for i in range(10): p = Process(target=show,args=(i,)) p.start() for n in range(10): p = Process(target=buy_ticket,args=(n,)) p.start() #輸出結果爲:2:餘票3張 1:餘票3張 3:餘票3張 0:餘票3張 6:餘票3張 4:餘票3張 5:餘票3張 7:餘票3張 # 9:餘票3張 8:餘票3張 0:購票成功 1:購票成功 4:購票成功 5:購票成功 3:購票成功 2:購票成功 # 8:購票成功 9:購票成功 6:購票成功 7:購票成功 # 10個用戶都查看餘票爲3張,可是買票的時候都買到了票。由於因爲網絡延遲的緣由,全部的併發程序都對一份數據操做就會 #形成數據混亂,致使三張餘票賣給了10我的。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
#數據同上 import time import json from multiprocessing import Process,Lock def show(i): with open('ticket')as f: dic = json.load(f) print('%s:餘票%s張'%(i,dic['ticket'])) def buy_ticket(i,lock): lock.acquire() with open('ticket')as f: dic = json.load(f) time.sleep(0.1) if dic['ticket'] > 0: print('%s:購票成功'%i) dic['ticket'] -= 1 else: print('%s:餘票不足'%i) time.sleep(0.1) with open('ticket','w')as f: json.dump(dic,f) lock.release() if __name__ == '__main__': for i in range(10): p = Process(target=show,args=(i,)) p.start() lock = Lock() for n in range(10): p = Process(target=buy_ticket,args=(n,lock)) p.start() #輸出結果爲:0:餘票3張 1:餘票3張 6:餘票3張 2:餘票3張 4:餘票3張 5:餘票3張 3:餘票3張 7:餘票3張 # 9:餘票3張 8:餘票3張 0:購票成功 3:購票成功 1:購票成功 2:餘票不足 4:餘票不足 5:餘票不足 # 8:餘票不足 9:餘票不足 6:餘票不足 7:餘票不足
#加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。 雖然能夠用文件共享數據實現進程間通訊,但問題是: 1.效率低(共享數據基於文件,而文件是硬盤上的數據) 2.須要本身加鎖處理 #所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。 隊列和管道都是將數據存放於內存中 隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來, 咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。
2.信號量——multiprocessing.Semaphore
互斥鎖同時只容許一個進程更改數據,而信號量Semaphore是同時容許必定數量的進程更改數據 。
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。 信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念。
好比:假設在商場中有四個迷你唱吧,每次一個唱吧只能進去一我的,最多進四我的,來第五我的就要等待,模擬實現
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import time import random from multiprocessing import Process,Semaphore def ktv(i,sema): sema.acquire() #方法與鎖同樣 print('%s走進KTV'%i) time.sleep(random.uniform(3,6)) print('%s走出KTV'%i) sema.release() if __name__ == '__main__': sema = Semaphore(4) #與鎖的區別 for i in range(10): p = Process(target=ktv,args=(i,sema)) p.start()
3.事件——multiprocessing.Event
python進程的事件用於主進程控制其餘進程的執行,事件主要提供了三個方法 set、wait、clear。 事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait() 方法時就會阻塞,若是「Flag」值爲True,那麼執行event.wait()方法時便再也不阻塞。 event.clear():將「Flag」設置爲False, event.set():將「Flag」設置爲True。經過event.is_set()方法來判斷一個事件的狀態。在一個事件被建立後(event = Event()),默認爲阻塞狀態。
簡單的模擬紅綠燈
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import time import random from multiprocessing import Process,Event def cars(i,event): if not event.is_set(): print('%s:等待經過'%i) event.wait() #經過wait來實現控制車輛的同行 print('%s經過'%i) def traffic_light(event): while 1: if event.is_set(): #Ture時表示綠燈 event.clear() #改變事件狀態爲False print('\033[31m紅燈亮了\033[0m') #切換狀態後提示 else: event.set() print('\033[32m綠燈亮了\033[0m') time.sleep(5) # 假設每五秒切換一下 if __name__ == '__main__': event = Event() p = Process(target=traffic_light,args=(event,)) p.start() for i in range(1000): #模擬車輛 car = Process(target=cars,args=(i,event)) car.start() time.sleep(random.randint(1,3)) #每隔1-3秒來一輛車
3.進程間通訊(IPC(Inter-Process Communication))
1.隊列
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
Queue([maxsize]) 建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。 Queue的實例q具備如下方法: q.get( [ block [ ,timeout ] ] ) 返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。 q.get_nowait( ) 同q.get(False)方法。 q.put(item [, block [,timeout ] ] ) 將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。 q.qsize() 返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。 q.empty() 若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。 q.full() 若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。 q.close() 關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。 q.cancel_join_thread() 不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。 q.join_thread() 鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread()方法能夠禁止這種行爲。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
from multiprocessing import Queue,Process def put(q): q.put('hello') def get(q): print(q.get()) if __name__ == '__main__': q = Queue() p = Process(target=put,args=(q,)) p.start() p1 = Process(target=get,args=(q,)) p1.start() #結果輸出:hello 進程之間能夠經過隊列自由的傳遞數據
生產者消費者模型
在隊列中有一個特別重要的模型——生產者消費者模型
在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。
爲何要使用生產者和消費者模式
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。
什麼是生產者消費者模式
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
#方式一 import time import random from multiprocessing import Process,Queue def producer(name,food,q): for i in range(10): time.sleep(random.randint(1,2)) #模擬生產一個food的時間 ret = '%s生產了第%s個%s'%(name,i,food) print(ret) q.put(ret) #將生產好的food放進一個容器裏 def consumer(name,q): while 1: time.sleep(random.randint(1, 2)) #模擬消費一個food的時間 food = q.get() #從容器中取出 if food is None: break #收到結束信號則結束 print('%s消費了%s'%(name,food)) if __name__ == '__main__': q = Queue() p = Process(target=producer,args=('rain','麪包',q)) p.start() p1 = Process(target=producer,args=('snow','包子',q)) p1.start() c = Process(target=consumer,args=('sunny',q)) c.start() c1 = Process(target=consumer,args=('wind',q)) c1.start() p.join() p1.join() q.put(None) #p執行完了發送一個None表示結束,消費者收到None,表示沒有food,中止消費 q.put(None) #一樣的,有幾個消費者就須要發送幾個None #方式二 import time import random from multiprocessing import Process, Queue def producer(name, food, q): for i in range(10): time.sleep(random.randint(1, 2)) # 模擬生產一個food的時間 ret = '%s生產了第%s個%s' % (name, i, food) print(ret) q.put(ret) # 將生產好的food放進一個容器裏 q.put(None) #生產結束髮送一個結束信號 def consumer(name, q): while 1: time.sleep(random.randint(1, 2)) # 模擬消費一個food的時間 food = q.get() # 從容器中取出 if food is None: break # 收到結束信號則結束 print('%s消費了%s' % (name, food)) if __name__ == '__main__': q = Queue() Process(target=producer, args=('rain', '麪包', q)).start() Process(target=producer, args=('snow', '包子', q)).start() Process(target=consumer, args=('sunny', q)).start() Process(target=consumer, args=('wind', q)).start()
JoinableQueue()
建立可鏈接的共享進程隊列。這就像是一個Queue對象,但隊列容許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
JoinableQueue的實例p除了與Queue對象相同的方法以外,還具備如下方法: q.task_done() 使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。若是調用此方法的次數大於從隊列中刪除的項目數量,將引起ValueError異常。 q.join() 生產者將使用此方法進行阻塞,直到隊列中全部項目均被處理。阻塞將持續到爲隊列中的每一個項目均調用q.task_done()方法爲止。 下面的例子說明如何創建永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import time import random from multiprocessing import Process,JoinableQueue def producer(name,food,q): for i in range(10): time.sleep(random.randint(1,3)) ret = '%s生產了第%s個%s'%(name,i,food) print(ret) q.put(ret) q.join() #感知整個隊列的結束, 阻塞,直到一個隊列中的全部數據所有被處理完畢 def consumer(name,q): while 1: time.sleep(random.randint(1,3)) food = q.get() print('%s消費了%s'%(name,food)) q.task_done() if __name__ == '__main__': q = JoinableQueue() p1 = Process(target=producer,args=('rain','麪包',q)) p1.start() p2 = Process(target=producer,args=('sunny','包子',q)) p2.start() c1 = Process(target=consumer,args=('snow',q)) c1.daemon = True c1.start() c2 = Process(target=consumer,args=('wind',q)) c2.daemon = True c2.start() p1.join() p2.join() # 在消費者這一端: # 每次獲取一個數據 # 處理一個數據 # 發送一個記號 : 標誌一個數據被處理成功 # 在生產者這一端: # 每一次生產一個數據, # 且每一次生產的數據都放在隊列中 # 在隊列中刻上一個記號 # 當生產者所有生產完畢以後, # join信號 : 已經中止生產數據了 # 且要等待以前被刻上的記號都被消費完 # 當數據都被處理完時,join阻塞結束 # consumer 中把全部的任務消耗完 # producer 端 的 join感知到,中止阻塞 # 全部的producer進程結束 # 主進程中的p.join結束 # 主進程中代碼結束 # 守護進程(消費者的進程)結束
2.管道
Pipe在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn一、conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
#參數介紹: duplex:默認爲True,管道是全雙工的,若是將duplex設置成False,conn1只能用於接收,conn2只能用於發送。 #主要方法: conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。 conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象 #其餘方法: conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法 conn1.fileno():返回鏈接使用的整數文件描述符 conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。 conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
from multiprocessing import Process,Pipe def func1(conn1): conn1.send('你好') def func2(conn2): ret = conn2.recv() print(ret) if __name__ == '__main__': conn1,conn2 = Pipe() Process(target=func1,args=(conn1,)).start() Process(target=func2,args=(conn2,)).start()
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
from multiprocessing import Pipe,Process def func(conn1,conn2): conn1.close() while True: try: msg = conn2.recv() print(msg) except EOFError: #EOFError 爲沒有數據可取時主動拋出的異常,當管道全部的出口關閉時,任然接收,就會拋異常 conn2.close() break if __name__ == '__main__': conn1,conn2 = Pipe() Process(target=func,args=(conn1,conn2)).start() conn2.close() for i in range(10): conn1.send('你好') conn1.close()
特別注意管道端點的正確管理問題:若是是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了爲什麼在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。若是忘記執行這些步驟,程序可能在消費者中的recv()操做上阻塞。管道是由操做系統進行引用計數的,必須在全部進程中關閉管道後才能生成EOFError異常。所以,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import time import random from multiprocessing import Process,Pipe def producer(con,pro,name,food): con.close() for i in range(4): time.sleep(random.randint(1,3)) f = '%s生產的%s%s'%(name,i,food) print(f) pro.send(f) pro.close() def consumer(con,pro,name): pro.close() while 1: try: time.sleep(random.randint(1,3)) food = con.recv() print('%s消費了%s'%(name,food)) except EOFError: con.close() break if __name__ == '__main__': con,pro = Pipe() p1 = Process(target=producer,args=(con,pro,'雨','包子')) p1.start() c1 = Process(target=consumer,args=(con,pro,'時')) c1.start() con.close() pro.close()
值得注意的是多個消費者的競爭會形成數據的不安全問題。咱們能夠經過加鎖來控制管道的行爲,避免進程之間爭搶數據。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import time import random from multiprocessing import Process,Pipe,Lock def producer(con,pro,name,food): con.close() for i in range(4): time.sleep(random.randint(1,3)) f = '%s生產的%s%s'%(name,i,food) print(f) pro.send(f) pro.send(None) pro.send(None) pro.close() def consumer(con,pro,name,lock): pro.close() while 1: time.sleep(random.random()) lock.acquire() food = con.recv() lock.release() if food == None: con.close() break print('%s消費了%s' % (name, food)) if __name__ == '__main__': con,pro = Pipe() lock = Lock() p1 = Process(target=producer,args=(con,pro,'雨','包子')) p1.start() c1 = Process(target=consumer,args=(con,pro,'時',lock)) c1.start() c2 = Process(target=consumer,args=(con,pro,'間',lock)) c2.start() con.close() pro.close()
4.進程之間的數據共享
展望將來,基於消息傳遞的併發編程是大勢所趨,即使是使用線程,推薦作法也是將程序設計爲大量獨立的線程集合,經過消息隊列交換數據。這樣極大地減小了對使用鎖定和其餘同步手段的需求,還能夠擴展到分佈式系統中。但進程間應該儘可能避免通訊,即使須要通訊,也應該選擇進程安全的工具來避免加鎖帶來的問題。
之後咱們會嘗試使用數據庫來解決如今進程之間的數據共享問題。
進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的 。雖然進程間數據獨立,但能夠經過Manager實現數據共享。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
事實上Manager的功能遠不止於此 A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies. A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
from multiprocessing import Process,Manager def func(dic): dic['count'] -= 1 if __name__ == '__main__': m = Manager() dic = m.dict({'count':100}) p_lis = [] for i in range(50): p = Process(target=func,args=(dic,)) p.start() p_lis.append(p) [p.join() for p in p_lis] print(dic) #執行屢次,每次的結果有差別 {'count': 52} {'count': 50} {'count': 51} {'count': 54} #多個進程同時對一個數據更改時會混亂
針對上面的例子,咱們可使用加鎖來保證數據的安全性
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
from multiprocessing import Process,Manager,Lock def func(dic,lock): lock.acquire() dic['count'] -= 1 lock.release() if __name__ == '__main__': m = Manager() lock = Lock() dic = m.dict({'count':100}) p_lis = [] for i in range(50): p = Process(target=func,args=(dic,lock)) p.start() p_lis.append(p) [p.join() for p in p_lis] print(dic) #{'count': 50}
5.進程池和multiprocess.Pool模塊
5.1進程池
爲何要有進程池?進程池的概念。
在程序實際處理問題過程當中,忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。那麼在成千上萬個任務須要被執行的時候,咱們就須要去建立成千上萬個進程麼?首先,建立進程須要消耗時間,銷燬進程也須要消耗時間。第二即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,這樣反而會影響程序的效率。所以咱們不能無限制的根據任務開啓或者結束進程。
因而就有了進程池的概念,定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等處處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增長操做系統的調度難度,還節省了開閉進程的時間,也必定程度上可以實現併發效果。
在python中,進程池中的進程數量是固定的指定的n個進程(通常開的進程數是電腦CPU的數量+1),而在其餘一些語言中,進程池中的進程是有一個下限的n,也有一個上限m(通常20),進程池中的進程會根據必定時間內的任務量來有效合理的增長進程(最多不超過上限),減小進程(很多於下限),來給操做系統減負。
備註:關於進程池和使用Process建立進程的使用:當咱們開啓不多的進程(好比:1個或2個)用來作一些簡單的好比開啓一個守護進程之類的咱們使用Process來建立啓動進程,除此以外,咱們都用進程池去啓動一個進程。
multiprocess.Pool模塊
from multiprocessing import Pool pool = Pool() #進程池的建立
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
1 process:要建立的進程數,若是省略,將默認使用cpu_count()的值 CPU的個數 2 initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None 3 initargs:是要傳給initializer的參數組
進程池和多進程的效率對比
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import time from multiprocessing import Pool def func(n): for i in range(10): print(n+i) if __name__ == '__main__': start = time.time() pool = Pool(5) #使用進程池,每次執行5個進程 pool.map(func,range(100)) t = time.time() - start print(t) #0.17122197151184082 from multiprocessing import Process def func(n): for i in range(10): print(n+i) if __name__ == '__main__': start = time.time() p_lis = [] for i in range(100): p = Process(target=func,args=(i,)) p.start() #啓動100個進程 p_lis.append(p) [p.join() for p in p_lis] t = time.time() - start print(t) #2.8937172889709473 #由此能夠看出,進程池的效率更高
pool.map() def map(self, func, iterable, chunksize=None) map方法中參數爲,要調用的函數名,可迭代對象 ,內置close()和join()方法
pool.apply() def apply(self, func, args=(), kwds={}) apply方法中參數爲,要調用的函數名,args=() 傳參 同步的調用進程
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import os import time from multiprocessing import Pool def func(i): print(i,'start:',os.getpid()) time.sleep(0.5) print(i,'end:',os.getpid()) if __name__ == '__main__': pool = Pool(5) for i in range(10): pool.apply(func,args=(i,)) #輸出結果能夠看出,apply方法是一個同步提交任務的方法
pool.apply_async() def apply_async(self, func, args=(), kwds={}, callback=None) 異步的調用進程
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import os import time from multiprocessing import Pool def func(i): print(i,'start:',os.getpid()) time.sleep(0.5) print(i,'end:',os.getpid()) if __name__ == '__main__': pool = Pool(5) #進程池中從無到有建立五個進程,之後一直是這五個進程在執行任務 for i in range(10): pool.apply_async(func,args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多5個子進程在異步執行 # 須要注意的是,進程池中的五個進程不會同時開啓或者同時結束 # 而是執行完一個就釋放一個進程,這個進程就去接收新的任務 pool.close() #結束進程池接收任務 pool.join() #感知進程池中的任務執行結束 # 異步apply_async用法:若是使用異步提交的任務,主進程須要使用join,等待進程池內任務都處理完 # 不然,主進程結束,進程池可能還沒來得及執行,也就跟着一塊兒結束
進程池版socket併發聊天
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import socket from multiprocessing import Pool def func(conn): conn.send(b'hello') ret = conn.recv(1024) print(ret) conn.close() if __name__ == '__main__': pool = Pool(5) sk = socket.socket() sk.bind(('127.0.0.1',9000)) sk.listen() while 1: conn,addr = sk.accept() pool.apply_async(func,args=(conn,)) sk.close()
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import socket sk = socket.socket() sk.connect(('127.0.0.1',9000)) msg = sk.recv(1024).decode('utf-8') print(msg) content = input('>>>>>').encode('utf-8') sk.send(content) sk.close()
備註:併發開啓多個客戶端,服務端同一時間只有5個不一樣的pid,只能結束一個客戶端,另一個客戶端纔會進來.
5.2進程池中的返回值
備註:將子進程中的值返回,這個特性是進程池特有的。而經過Process建立的進程能夠經過IPC實現將子進程中的值傳回,可是沒法直接return
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import time from multiprocessing import Pool def func(n): time.sleep(1) return n**2 if __name__ == '__main__': pool = Pool(5) for i in range(10): p = pool.apply(func,args=(i,)) print(p) #apply 直接將返回值返回
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import time from multiprocessing import Pool def func(n): time.sleep(1) return n**2 if __name__ == '__main__': pool = Pool(5) for i in range(10): p = pool.apply_async(func,args=(i,)) print(p.get()) #直接get() 會阻塞 等待輸出結果 #解決方法就是將返回值對象添加到列表中,而後在主進程中for循環列表,再get取值 if __name__ == '__main__': p_lis = [] pool = Pool(5) for i in range(10): p = pool.apply_async(func,args=(i,)) p_lis.append(p) #返回結果以後,將結果放入列表,歸還進程,以後再執行新的任務 for ret in p_lis:print(ret.get()) #使用get來獲取apply_aync的結果, # 若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get #輸出結果爲五個五個輸出
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import time from multiprocessing import Pool def func(n): time.sleep(1) return n**2 if __name__ == '__main__': pool = Pool(5) ret = pool.map(func,range(10)) print(ret) #[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] 直接返回一個包含全部返回值的列表 #將結果一次性的返回 也就是說要等待全部進程計算完結果後再一次性返回 由於自帶join和close
總結:由上可知,map和apply_async都提供了異步的提交返回值的方式,map更簡單,可是apply_async性能更強大,能夠自定製不少。map有侷限性,它須要等到全部的進程執行完畢後才一次性的返回全部值,而apply_async根據你設置的進程池數量n個n個的返回結果,可以及時的進行下一步操做。值得注意的是:apply_async須要先close後join來保持多進程和主進程代碼的同步性。(根據須要,若是不須要保持同步性,就能夠不用)
5.3回調函數——callback
須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程。主進程則調用一個函數去處理該結果,該函數即回調函數 咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。主要應用於爬蟲當中。在爬蟲中,最佔用時間的是網絡延遲,因此咱們利用子進程將內容爬取下來,而後利用主進程來對字符串處理。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import os,time from multiprocessing import Pool def func1(n): # print('in func1',os.getpid()) time.sleep(1) return n+1 def func2(m): print(m) # print('in func2',os.getpid()) #回調函數是在主進程中執行的 if __name__ == '__main__': # print(os.getpid()) pool = Pool(5) for i in range(10): pool.apply_async(func1,args=(i,),callback=func2) pool.close() pool.join() #將子進程的值做爲參數傳給主進程中的callback指定函數
注意:回調函數是在主進程中執行的。將子進程中函數的返回值做爲參數傳參給callback指定的主進程中的函數。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
from urllib import request ret = request.urlopen('http://www.baidu.com') print(ret.read().decode('utf-8')) #獲取網址上的全部內容
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import requests #requests.get(url) #括號中加一個要爬取的網址,返回一個對象 response = requests.get('http://www.baidu.com') print(response) #<Response [200]> response 響應,迴應 print(response.status_code) #200 狀態碼 ## status_code 返回404 表示訪問的網頁找不到 返回200 表示能正常返回 其餘502 504 都是錯誤信息 print(response.content) #爬取的內容 bytes類型的 b'<!DOCTYPE html>\r\n<!--STATUS OK-->....' print(response.__dict__) #返回一個包含狀態信息的字典
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
#簡單的爬取數據 import requests from multiprocessing import Pool def get(url): response = requests.get(url) if response.status_code == 200: content = response.content.decode('utf-8') return url,content def dealwith(args): url,content = args print(url,len(content)) #簡單的爬取內容的長度 if __name__ == '__main__': pool = Pool(5) url_list = ['http://www.baidu.com', 'https://www.sogou.com', 'http://www.taobao.com', 'https://www.cnblogs.com'] for url in url_list: pool.apply_async(get,args=(url,),callback=dealwith) pool.close() pool.join() #http://www.baidu.com 2287 # https://www.sogou.com 32014 # https://www.cnblogs.com 39824 # http://www.taobao.com 124012
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import re from urllib.request import urlopen from multiprocessing import Pool def get(url,pattern): content = urlopen(url).read().decode('utf-8') return content,pattern def dealwith(args): content,pattern = args ret = re.findall(pattern,content) for item in ret: dic = {'index':item[0].strip(), 'title':item[1].strip(), 'actor':item[2].strip(), 'time':item[3].strip()} print(dic) if __name__ == '__main__': regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>' #regex 正則表達式 pattern = re.compile(regex,re.S) url_dic = {'http://maoyan.com/board/7': pattern} pool = Pool(5) res_l = [] for url,pattern in url_dic.items(): res = pool.apply_async(get,args=(url,pattern),callback=dealwith) res_l.append(res) for i in res_l:i.get()
備註:若是在主進程中等待進程池中全部任務都執行完畢後,再統一處理結果,則無需回調函數
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
from multiprocessing import Pool import time,random,os def work(n): time.sleep(1) return n**2 if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() #等待進程池中全部進程執行完畢 nums=[] for res in res_l: nums.append(res.get()) #拿到全部結果 print(nums) #主進程拿到全部的處理結果,能夠在主進程中進行統一進行處理
進程池的其餘實現方式:https://docs.python.org/dev/library/concurrent.futures.html