python-9-進程

操做系統背景知識

顧名思義,進程即正在執行的一個過程。進程是對正在運行程序的一個抽象。html

進程的概念起源於操做系統,是操做系統最核心的概念,也是操做系統提供的最古老也是最重要的抽象概念之一。python

PS:即便能夠利用的cpu只有一個(早期的計算機確實如此),也能保證支持(僞)併發的能力。將一個單獨的cpu變成多個虛擬的cpu(多道技術:時間多路複用和空間多路複用+硬件上支持隔離),沒有進程的抽象,現代計算機將不復存在。linux

必備的理論基礎:nginx

#一 操做系統的做用:
    1:隱藏醜陋複雜的硬件接口,提供良好的抽象接口
    2:管理、調度進程,而且將多個進程對硬件的競爭變得有序

#二 多道技術:
    1.產生背景:針對單核,實現併發
    ps:
    如今的主機通常是多核,那麼每一個核都會利用多道技術
    有4個cpu,運行於cpu1的某個程序遇到io阻塞,會等到io結束再從新調度,會被調度到4個
    cpu中的任意一個,具體由操做系統調度算法決定。
    
    2.空間上的複用:如內存中同時有多道程序
    3.時間上的複用:複用一個cpu的時間片
       強調:遇到io切,佔用cpu時間過長也切,核心在於切以前將進程的狀態保存下來,這樣
            才能保證下次切換回來時,能基於上次切走的位置繼續運行

什麼是進程

進程(Process)是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操做系統結構的基礎。在早期面向進程設計的計算機結構中,進程是程序的基本執行實體;在當代面向線程設計的計算機結構中,進程是線程的容器。程序是指令、數據及其組織形式的描述,進程是程序的實體。git

狹義定義:進程是正在運行的程序的實例(an instance of a computer program that is being executed)。
廣義定義:進程是一個具備必定獨立功能的程序關於某個數據集合的一次運行活動。它是 操做系統動態執行的 基本單元,在傳統的 操做系統中,進程既是基本的 分配單元,也是基本的執行單元。
第一,進程是一個實體。每個進程都有它本身的地址空間,通常狀況下,包括文本區域(text region)、數據區域(data region)和堆棧(stack region)。文本區域存儲處理器執行的代碼;數據區域存儲變量和進程執行期間使用的動態分配的內存;堆棧區域存儲着活動過程調用的指令和本地變量。
第二,進程是一個「執行中的程序」。程序是一個沒有生命的實體,只有處理器賦予程序生命時(操做系統執行之),它才能成爲一個活動的實體,咱們稱其爲進程。[3] 
進程是操做系統中最基本、重要的概念。是多道程序系統出現後,爲了刻畫系統內部出現的動態狀況,描述系統內部各道程序的活動規律引進的一個概念,全部多道程序設計操做系統都創建在進程的基礎上。

進程的概念
進程的概念
從理論角度看,是對正在運行的程序過程的抽象;
從實現角度看,是一種數據結構,目的在於清晰地刻畫動態系統的內在規律,有效管理和調度進入計算機系統主存儲器運行的程序。
操做系統引入進程的概念的緣由
動態性:進程的實質是程序在多道程序系統中的一次執行過程,進程是動態產生,動態消亡的。
併發性:任何進程均可以同其餘進程一塊兒併發執行
獨立性:進程是一個能獨立運行的基本單位,同時也是系統分配資源和調度的獨立單位;
異步性:因爲進程間的相互制約,使進程具備執行的間斷性,即進程按各自獨立的、不可預知的速度向前推動
結構特徵:進程由程序、數據和進程控制塊三部分組成。
多個不一樣的進程能夠包含相同的程序:一個程序在不一樣的數據集裏就構成不一樣的進程,能獲得不一樣的結果;可是執行過程當中,程序不能發生改變。

進程的特徵
進程的特徵
程序是指令和數據的有序集合,其自己沒有任何運行的含義,是一個靜態的概念。
而進程是程序在處理機上的一次執行過程,它是一個動態的概念。
程序能夠做爲一種軟件資料長期存在,而進程是有必定生命期的。
程序是永久的,進程是暫時的。
進程與程序的區別

注意:同一個程序執行兩次,就會在操做系統中出現兩個進程,因此咱們能夠同時運行一個軟件,分別作不一樣的事情也不會混亂。github

進程調度

要想多個進程交替運行,操做系統必須對這些進程進行調度,這個調度也不是隨即進行的,而是須要遵循必定的法則,由此就有了進程的調度算法。web

先來先服務(FCFS)調度算法是一種最簡單的調度算法,該算法既可用於做業調度,也可用於進程調度。FCFS算法比較有利於長做業(進程),而不利於短做業(進程)。由此可知,本算法適合於CPU繁忙型做業,而不利於I/O繁忙型的做業(進程)。
先來先服務調度算法
短做業(進程)優先調度算法(SJ/PF)是指對短做業或短進程優先調度的算法,該算法既可用於做業調度,也可用於進程調度。但其對長做業不利;不能保證緊迫性做業(進程)被及時處理;做業的長短只是被估算出來的。
短做業優先調度算法
時間片輪轉(Round Robin,RR)法的基本思路是讓每一個進程在就緒隊列中的等待時間與享受服務的時間成比例。在時間片輪轉法中,須要將CPU的處理時間分紅固定大小的時間片,例如,幾十毫秒至幾百毫秒。若是一個進程在被調度選中以後用完了系統規定的時間片,但又未完成要求的任務,則它自行釋放本身所佔有的CPU而排到就緒隊列的末尾,等待下一次調度。同時,進程調度程序又去調度當前就緒隊列中的第一個進程。
      顯然,輪轉法只能用來調度分配一些能夠搶佔的資源。這些能夠搶佔的資源能夠隨時被剝奪,並且能夠將它們再分配給別的進程。CPU是可搶佔資源的一種。但打印機等資源是不可搶佔的。因爲做業調度是對除了CPU以外的全部系統硬件資源的分配,其中包含有不可搶佔資源,因此做業調度不使用輪轉法。
在輪轉法中,時間片長度的選取很是重要。首先,時間片長度的選擇會直接影響到系統的開銷和響應時間。若是時間片長度太短,則調度程序搶佔處理機的次數增多。這將使進程上下文切換次數也大大增長,從而加劇系統開銷。反過來,若是時間片長度選擇過長,例如,一個時間片能保證就緒隊列中所需執行時間最長的進程能執行完畢,則輪轉法變成了先來先服務法。時間片長度的選擇是根據系統對響應時間的要求和就緒隊列中所容許最大的進程數來肯定的。
      在輪轉法中,加入到就緒隊列的進程有3種狀況:
      一種是分給它的時間片用完,但進程還未完成,回到就緒隊列的末尾等待下次調度去繼續執行。
      另外一種狀況是分給該進程的時間片並未用完,只是由於請求I/O或因爲進程的互斥與同步關係而被阻塞。當阻塞解除以後再回到就緒隊列。
      第三種狀況就是新建立進程進入就緒隊列。
      若是對這些進程區別對待,給予不一樣的優先級和時間片從直觀上看,能夠進一步改善系統服務質量和效率。例如,咱們可把就緒隊列按照進程到達就緒隊列的類型和進程被阻塞時的阻塞緣由分紅不一樣的就緒隊列,每一個隊列按FCFS原則排列,各隊列之間的進程享有不一樣的優先級,但同一隊列內優先級相同。這樣,當一個進程在執行完它的時間片以後,或從睡眠中被喚醒以及被建立以後,將進入不一樣的就緒隊列。  

時間片輪轉法
時間片輪調度算法
前面介紹的各類用做進程調度的算法都有必定的侷限性。如短進程優先的調度算法,僅照顧了短進程而忽略了長進程,並且若是並未指明進程的長度,則短進程優先和基於進程長度的搶佔式調度算法都將沒法使用。
而多級反饋隊列調度算法則沒必要事先知道各類進程所需的執行時間,並且還能夠知足各類類型進程的須要,於是它是目前被公認的一種較好的進程調度算法。在採用多級反饋隊列調度算法的系統中,調度算法的實施過程以下所述。
(1) 應設置多個就緒隊列,併爲各個隊列賦予不一樣的優先級。第一個隊列的優先級最高,第二個隊列次之,其他各隊列的優先權逐個下降。該算法賦予各個隊列中進程執行時間片的大小也各不相同,在優先權愈高的隊列中,爲每一個進程所規定的執行時間片就愈小。例如,第二個隊列的時間片要比第一個隊列的時間片長一倍,……,第i+1個隊列的時間片要比第i個隊列的時間片長一倍。
(2) 當一個新進程進入內存後,首先將它放入第一隊列的末尾,按FCFS原則排隊等待調度。當輪到該進程執行時,如它能在該時間片內完成,即可準備撤離系統;若是它在一個時間片結束時還沒有完成,調度程序便將該進程轉入第二隊列的末尾,再一樣地按FCFS原則等待調度執行;若是它在第二隊列中運行一個時間片後仍未完成,再依次將它放入第三隊列,……,如此下去,當一個長做業(進程)從第一隊列依次降到第n隊列後,在第n 隊列便採起按時間片輪轉的方式運行。

(3) 僅當第一隊列空閒時,調度程序才調度第二隊列中的進程運行;僅當第1~(i-1)隊列均空時,纔會調度第i隊列中的進程運行。若是處理機正在第i隊列中爲某進程服務時,又有新進程進入優先權較高的隊列(第1~(i-1)中的任何一個隊列),則此時新進程將搶佔正在運行進程的處理機,即由調度程序把正在運行的進程放回到第i隊列的末尾,把處理機分配給新到的高優先權進程。
多級反饋隊列

進程的並行與併發

並行 : 並行是指二者同時執行,好比賽跑,兩我的都在不停的往前跑;(資源夠用,好比三個線程,四核的CPU )算法

併發 : 併發是指資源有限的狀況下,二者交替輪流使用資源,好比一段路(單核CPU資源)同時只能過一我的,A走一段後,讓給B,B用完繼續給A ,交替使用,目的是提升效率。shell

區別:編程

並行是從微觀上,也就是在一個精確的時間片刻,有不一樣的程序在執行,這就要求必須有多個處理器。
併發是從宏觀上,在一個時間段上能夠看出是同時執行的,好比一個服務器同時處理多個session。

同步異步阻塞非阻塞

狀態介紹

  在瞭解其餘概念以前,咱們首先要了解進程的幾個狀態。在程序運行的過程當中,因爲被操做系統的調度算法控制,程序會進入幾個狀態:就緒,運行和阻塞。

  (1)就緒(Ready)狀態

  當進程已分配到除CPU之外的全部必要的資源,只要得到處理機即可當即執行,這時的進程狀態稱爲就緒狀態。

  (2)執行/運行(Running)狀態當進程已得到處理機,其程序正在處理機上執行,此時的進程狀態稱爲執行狀態。

  (3)阻塞(Blocked)狀態正在執行的進程,因爲等待某個事件發生而沒法執行時,便放棄處理機而處於阻塞狀態。引發進程阻塞的事件可有多種,例如,等待I/O完成、申請緩衝區不能知足、等待信件(信號)等。

      

同步和異步

      所謂同步就是一個任務的完成須要依賴另一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成,這是一種可靠的任務序列。要麼成功都成功,失敗都失敗,兩個任務的狀態能夠保持一致。

  所謂異步是不須要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工做,依賴的任務也當即執行,只要本身完成了整個任務就算完成了。至於被依賴的任務最終是否真正完成,依賴它的任務沒法肯定,因此它是不可靠的任務序列

好比我去銀行辦理業務,可能會有兩種方式:
第一種 :選擇排隊等候;
第二種 :選擇取一個小紙條上面有個人號碼,等到排到我這一號時由櫃檯的人通知我輪到我去辦理業務了;

第一種:前者(排隊等候)就是同步等待消息通知,也就是我要一直在等待銀行辦理業務狀況;

第二種:後者(等待別人通知)就是異步等待消息通知。在異步消息處理中,等待消息通知者(在這個例子中就是等待辦理業務的人)每每註冊一個回調機制,在所等待的事件被觸發時由觸發機制(在這裏是櫃檯的人)經過某種機制(在這裏是寫在小紙條上的號碼,喊號)找到等待該事件的人。
例子

阻塞與非阻塞

      阻塞和非阻塞這兩個概念與程序(線程)等待消息通知(無所謂同步或者異步)時的狀態有關。也就是說阻塞與非阻塞主要是程序(線程)等待消息通知時的狀態角度來講的

繼續上面的那個例子,不管是排隊仍是使用號碼等待通知,若是在這個等待的過程當中,等待者除了等待消息通知以外不能作其它的事情,那麼該機制就是阻塞的,表如今程序中,也就是該程序一直阻塞在該函數調用處不能繼續往下執行。
相反,有的人喜歡在銀行辦理這些業務的時候一邊打打電話發發短信一邊等待,這樣的狀態就是非阻塞的,由於他(等待者)沒有阻塞在這個消息通知上,而是一邊作本身的事情一邊等待。

注意:同步非阻塞形式其實是效率低下的,想象一下你一邊打着電話一邊還須要擡頭看到底隊伍排到你了沒有。若是把打電話和觀察排隊的位置當作是程序的兩個操做的話,這個程序須要在這兩種不一樣的行爲之間來回的切換,效率可想而知是低下的;而異步非阻塞形式卻沒有這樣的問題,由於打電話是你(等待者)的事情,而通知你則是櫃檯(消息觸發機制)的事情,程序沒有在兩種不一樣的操做中來回切換。
例子

同步/異步與阻塞/非阻塞

  1. 同步阻塞形式

  效率最低。拿上面的例子來講,就是你專心排隊,什麼別的事都不作。

  1. 異步阻塞形式

  若是在銀行等待辦理業務的人採用的是異步的方式去等待消息被觸發(通知),也就是領了一張小紙條,假如在這段時間裏他不能離開銀行作其它的事情,那麼很顯然,這我的被阻塞在了這個等待的操做上面;

  異步操做是能夠被阻塞住的,只不過它不是在處理消息時阻塞,而是在等待消息通知時被阻塞。

  1. 同步非阻塞形式

  其實是效率低下的。

  想象一下你一邊打着電話一邊還須要擡頭看到底隊伍排到你了沒有,若是把打電話和觀察排隊的位置當作是程序的兩個操做的話,這個程序須要在這兩種不一樣的行爲之間來回的切換,效率可想而知是低下的。

  1. 異步非阻塞形式

  效率更高,

  由於打電話是你(等待者)的事情,而通知你則是櫃檯(消息觸發機制)的事情,程序沒有在兩種不一樣的操做中來回切換

  好比說,這我的忽然發覺本身煙癮犯了,須要出去抽根菸,因而他告訴大堂經理說,排到我這個號碼的時候麻煩到外面通知我一下,那麼他就沒有被阻塞在這個等待的操做上面,天然這個就是異步+非阻塞的方式了。  

不少人會把同步和阻塞混淆,是由於不少時候同步操做會以阻塞的形式表現出來,一樣的,不少人也會把異步和非阻塞混淆,由於異步操做通常都不會在真正的IO操做處被阻塞

進程的建立與結束

進程的建立

  但凡是硬件,都須要有操做系統去管理,只要有操做系統,就有進程的概念,就須要有建立進程的方式,一些操做系統只爲一個應用程序設計,好比微波爐中的控制器,一旦啓動微波爐,全部的進程都已經存在。

  而對於通用系統(跑不少應用程序),須要有系統運行過程當中建立或撤銷進程的能力,主要分爲4中形式建立新的進程:

  1. 系統初始化(查看進程linux中用ps命令,windows中用任務管理器,前臺進程負責與用戶交互,後臺運行的進程與用戶無關,運行在後臺而且只在須要時才喚醒的進程,稱爲守護進程,如電子郵件、web頁面、新聞、打印)

  2. 一個進程在運行過程當中開啓了子進程(如nginx開啓多進程,os.fork,subprocess.Popen等)

  3. 用戶的交互式請求,而建立一個新進程(如用戶雙擊暴風影音)

  4. 一個批處理做業的初始化(只在大型機的批處理系統中應用)

  不管哪種,新進程的建立都是由一個已經存在的進程執行了一個用於建立進程的系統調用而建立的。

1. 在UNIX中該系統調用是:fork,fork會建立一個與父進程如出一轍的副本,兩者有相同的存儲映像、一樣的環境字符串和一樣的打開文件(在shell解釋器進程中,執行一個命令就會建立一個子進程)

  2. 在windows中該系統調用是:CreateProcess,CreateProcess既處理進程的建立,也負責把正確的程序裝入新進程。

  關於建立子進程,UNIX和windows

  1.相同的是:進程建立後,父進程和子進程有各自不一樣的地址空間(多道技術要求物理層面實現進程之間內存的隔離),任何一個進程的在其地址空間中的修改都不會影響到另一個進程。

  2.不一樣的是:在UNIX中,子進程的初始地址空間是父進程的一個副本,提示:子進程和父進程是能夠有隻讀的共享內存區的。可是對於windows系統來講,從一開始父進程與子進程的地址空間就是不一樣的。

建立進程
建立進程

進程的結束

  1. 正常退出(自願,如用戶點擊交互式頁面的叉號,或程序執行完畢調用發起系統調用正常退出,在linux中用exit,在windows中用ExitProcess)

  2. 出錯退出(自願,python a.py中a.py不存在)

  3. 嚴重錯誤(非自願,執行非法指令,如引用不存在的內存,1/0等,能夠捕捉異常,try...except...)

  4. 被其餘進程殺死(非自願,如kill -9)

在python程序中的進程操做

  以前咱們已經瞭解了不少進程相關的理論知識,瞭解進程是什麼應該再也不困難了,剛剛咱們已經瞭解了,運行中的程序就是一個進程。全部的進程都是經過它的父進程來建立的。所以,運行起來的python程序也是一個進程,那麼咱們也能夠在程序中再建立進程。多個進程能夠實現併發效果,也就是說,當咱們的程序中存在多個進程的時候,在某些時候,就會讓程序的執行速度變快。以咱們以前所學的知識,並不能實現建立進程這個功能,因此咱們就須要藉助python中強大的模塊。

multiprocess模塊

      仔細說來,multiprocess不是一個模塊而是python中一個操做、管理進程的包。 之因此叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進程有關的全部子模塊。因爲提供的子模塊很是多,爲了方便你們歸類記憶,我將這部分大體分爲四個部分:建立進程部分,進程同步部分,進程池部分,進程之間數據共享。

multiprocess.process模塊

process模塊介紹

process模塊是一個建立進程的模塊,藉助這個模塊,就能夠完成進程的建立。

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)

強調:
1. 須要使用關鍵字的方式來指定參數
2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號

參數介紹:
1 group參數未使用,值始終爲None
2 target表示調用對象,即子進程要執行的任務
3 args表示調用對象的位置參數元組,args=(1,2,'egon',)
4 kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
5 name爲子進程的名稱
複製代碼
1 p.start():啓動進程,並調用該子進程中的p.run() 
2 p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法  
3 p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
4 p.is_alive():若是p仍然運行,返回True
5 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程  
方法介紹
1 p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
2 p.name:進程的名稱
3 p.pid:進程的pid
4 p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)
5 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)
屬性介紹
在Windows操做系統中因爲沒有fork(linux操做系統中建立進程的機制),在建立子進程的時候會自動 import 啓動它的這個文件,而在 import 的時候又執行了整個文件。所以若是將process()直接寫在文件中就會無限遞歸建立子進程報錯。因此必須把建立子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候  ,就不會遞歸運行了。
在windows中使用process模塊的注意事項

使用process模塊建立進程

在一個python進程中開啓子進程,start方法和併發效果。

import time
from multiprocessing import Process

def f(name):
    print('hello', name)
    print('我是子進程')

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    time.sleep(1)
    print('執行主進程的內容了')
    
在python中啓動的第一個子進程
import time
from multiprocessing import Process

def f(name):
    print('hello', name)
    time.sleep(1)
    print('我是子進程')


if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    #p.join()
    print('我是父進程')
join方法
import os
from multiprocessing import Process

def f(x):
    print('子進程id :',os.getpid(),'父進程id :',os.getppid())
    return x*x

if __name__ == '__main__':
    print('主進程id :', os.getpid())
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=(i,))
        p.start()
查看主進程和子進程的進程號

 

進階,多個進程同時運行(注意,子進程的執行順序不是根據啓動順序決定的)

import time
from multiprocessing import Process


def f(name):
    print('hello', name)
    time.sleep(1)


if __name__ == '__main__':
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
        p_lst.append(p)
多個進程同時運行
import time
from multiprocessing import Process


def f(name):
    print('hello', name)
    time.sleep(1)


if __name__ == '__main__':
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
        p_lst.append(p)
        p.join()
    # [p.join() for p in p_lst]
    print('父進程在執行')
多個進程同時運行,再談join方法(1)
import time
from multiprocessing import Process

def f(name):
    print('hello', name)
    time.sleep(1)

if __name__ == '__main__':
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
        p_lst.append(p)
    # [p.join() for p 
多個進程同時運行,再談join方法(2)

除了上面這些開啓進程的方法,還有一種以繼承Process類的形式開啓進程的方式

import os
from multiprocessing import Process


class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print(os.getpid())
        print('%s 正在和女主播聊天' %self.name)

p1=MyProcess('wupeiqi')
p2=MyProcess('yuanhao')
p3=MyProcess('nezha')

p1.start() #start會自動調用run
p2.start()
# p2.run()
p3.start()


p1.join()
p2.join()
p3.join()

print('主線程')
經過繼承Process類開啓進程

進程之間的數據隔離問題

from multiprocessing import Process

def work():
    global n
    n=0
    print('子進程內: ',n)


if __name__ == '__main__':
    n = 100
    p=Process(target=work)
    p.start()
    print('主進程內: ',n)
進程之間的數據隔離問題

守護進程

會隨着主進程的結束而結束。

主進程建立守護進程

  其一:守護進程會在主進程代碼執行結束後就終止

  其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children

注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

import os
import time
from multiprocessing import Process

class Myprocess(Process):
    def __init__(self,person):
        super().__init__()
        self.person = person
    def run(self):
        print(os.getpid(),self.name)
        print('%s正在和女主播聊天' %self.person)


p=Myprocess('哪吒')
p.daemon=True #必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程代碼執行結束,p即終止運行
p.start()
time.sleep(10) # 在sleep時查看進程id對應的進程ps -ef|grep id
print('')

守護進程的啓動
守護進程的啓動
from multiprocessing import Process

def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


p1=Process(target=foo)
p2=Process(target=bar)

p1.daemon=True
p1.start()
p2.start()
time.sleep(0.1)
print("main-------")#打印該行則主進程代碼結束,則守護進程p1應該被終止.#可能會有p1任務執行的打印信息123,由於主進程打印main----時,p1也執行了,可是隨即被終止.

主進程代碼執行結束守護進程當即結束
主進程代碼執行結束守護進程當即結束

socket聊天併發實例

from socket import *
from multiprocessing import Process

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn,client_addr):
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__': #windows下start進程必定要寫到這下面
    while True:
        conn,client_addr=server.accept()
        p=Process(target=talk,args=(conn,client_addr))
        p.start()

使用多進程實現socket聊天併發-server
使用多進程實現socket聊天併發-server
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
client端

多進程中的其餘方法

from multiprocessing import Process
import time
import random

class Myprocess(Process):
    def __init__(self,person):
        self.name=person
        super().__init__()

    def run(self):
        print('%s正在和網紅臉聊天' %self.name)
        time.sleep(random.randrange(1,5))
        print('%s還在和網紅臉聊天' %self.name)


p1=Myprocess('哪吒')
p1.start()

p1.terminate()#關閉進程,不會當即關閉,因此is_alive馬上查看的結果可能仍是存活
print(p1.is_alive()) #結果爲True

print('開始')
print(p1.is_alive()) #結果爲False

進程對象的其餘方法:terminate,is_alive
進程對象的其餘方法:terminate,is_alive
class Myprocess(Process):
    def __init__(self,person):
        self.name=person   # name屬性是Process中的屬性,標示進程的名字
        super().__init__() # 執行父類的初始化方法會覆蓋name屬性
        #self.name = person # 在這裏設置就能夠修改進程名字了
        #self.person = person #若是不想覆蓋進程名,就修改屬性名稱就能夠了
    def run(self):
        print('%s正在和網紅臉聊天' %self.name)
        # print('%s正在和網紅臉聊天' %self.person)
        time.sleep(random.randrange(1,5))
        print('%s正在和網紅臉聊天' %self.name)
        # print('%s正在和網紅臉聊天' %self.person)


p1=Myprocess('哪吒')
p1.start()
print(p1.pid)    #能夠查看子進程的進程id

進程對象的其餘屬性:pid和name
進程對象的其餘屬性:pid和name

進程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event)

鎖 —— multiprocess.Lock

      經過剛剛的學習,咱們想方設法實現了程序的異步,讓多個任務能夠同時在幾個進程中併發處理,他們之間的運行沒有順序,一旦開啓也不受咱們控制。儘管併發編程讓咱們能更加充分的利用IO資源,可是也給咱們帶來了新的問題。

  當多個進程使用同一份數據資源的時候,就會引起數據安全或順序混亂問題。

import os
import time
import random
from multiprocessing import Process

def work(n):
    print('%s: %s is running' %(n,os.getpid()))
    time.sleep(random.random())
    print('%s:%s is done' %(n,os.getpid()))

if __name__ == '__main__':
    for i in range(3):
        p=Process(target=work,args=(i,))
        p.start()

多進程搶佔輸出資源
多進程搶佔輸出資源
# 由併發變成了串行,犧牲了運行效率,但避免了競爭
import os
import time
import random
from multiprocessing import Process,Lock

def work(lock,n):
    lock.acquire()
    print('%s: %s is running' % (n, os.getpid()))
    time.sleep(random.random())
    print('%s: %s is done' % (n, os.getpid()))
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    for i in range(3):
        p=Process(target=work,args=(lock,i))
        p.start()
使用鎖維護執行順序

上面這種狀況雖然使用加鎖的形式實現了順序的執行,可是程序又從新變成串行了,這樣確實會浪費了時間,卻保證了數據的安全。

  接下來,咱們以模擬搶票爲例,來看看數據安全的重要性。 

#文件db的內容爲:{"count":1}
#注意必定要用雙引號,否則json沒法識別
#併發運行,效率高,但競爭寫同一文件,數據寫入錯亂
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('\033[43m剩餘票數%s\033[0m' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(0.1) #模擬讀數據的網絡延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(0.2) #模擬寫數據的網絡延遲
        json.dump(dic,open('db','w'))
        print('\033[43m購票成功\033[0m')

def task():
    search()
    get()

if __name__ == '__main__':
    for i in range(100): #模擬併發100個客戶端搶票
        p=Process(target=task)
        p.start()
多進程同時搶購餘票
#文件db的內容爲:{"count":5}
#注意必定要用雙引號,否則json沒法識別
#併發運行,效率高,但競爭寫同一文件,數據寫入錯亂
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('\033[43m剩餘票數%s\033[0m' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(random.random()) #模擬讀數據的網絡延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(random.random()) #模擬寫數據的網絡延遲
        json.dump(dic,open('db','w'))
        print('\033[32m購票成功\033[0m')
    else:
        print('\033[31m購票失敗\033[0m')

def task(lock):
    search()
    lock.acquire()
    get()
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(100): #模擬併發100個客戶端搶票
        p=Process(target=task,args=(lock,))
        p.start()

使用鎖來保證數據安全
使用鎖來保證數據安全
#加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然能夠用文件共享數據實現進程間通訊,但問題是:
1.效率低(共享數據基於文件,而文件是硬盤上的數據)
2.須要本身加鎖處理

#所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。
隊列和管道都是將數據存放於內存中
隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,
咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。

信號量 —— multiprocess.Semaphore(瞭解)

互斥鎖同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據 。
假設商場裏有4個迷你唱吧,因此同時能夠進去4我的,若是來了第五我的就要在外面等待,等到有人出來才能再進去玩。
實現:
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。
信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念
信號量介紹Semaphore
from multiprocessing import Process,Semaphore
import time,random

def go_ktv(sem,user):
    sem.acquire()
    print('%s 佔到一間ktv小屋' %user)
    time.sleep(random.randint(0,3)) #模擬每一個人在ktv中待的時間不一樣
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(4)
    p_l=[]
    for i in range(13):
        p=Process(target=go_ktv,args=(sem,'user%s' %i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')

例子
例子

事件 —— multiprocess.Event(瞭解)

python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。

    事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。

clear:將「Flag」設置爲False
set:將「Flag」設置爲True
事件介紹
from multiprocessing import Process, Event
import time, random


def car(e, n):
    while True:
        if not e.is_set():  # 進程剛開啓,is_set()的值是Flase,模擬信號燈爲紅色
            print('\033[31m紅燈亮\033[0m,car%s等着' % n)
            e.wait()    # 阻塞,等待is_set()的值變成True,模擬信號燈爲綠色
            print('\033[32m車%s 看見綠燈亮了\033[0m' % n)
            time.sleep(random.randint(3, 6))
            if not e.is_set():   #若是is_set()的值是Flase,也就是紅燈,仍然回到while語句開始
                continue
            print('車開遠了,car', n)
            break


def police_car(e, n):
    while True:
        if not e.is_set():# 進程剛開啓,is_set()的值是Flase,模擬信號燈爲紅色
            print('\033[31m紅燈亮\033[0m,car%s等着' % n)
            e.wait(0.1) # 阻塞,等待設置等待時間,等待0.1s以後沒有等到綠燈就闖紅燈走了
            if not e.is_set():
                print('\033[33m紅燈,警車先走\033[0m,car %s' % n)
            else:
                print('\033[33;46m綠燈,警車走\033[0m,car %s' % n)
        break



def traffic_lights(e, inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            print('######', e.is_set())
            e.clear()  # ---->將is_set()的值設置爲False
        else:
            e.set()    # ---->將is_set()的值設置爲True
            print('***********',e.is_set())


if __name__ == '__main__':
    e = Event()
    for i in range(10):
        p=Process(target=car,args=(e,i,))  # 建立是個進程控制10輛車
        p.start()

    for i in range(5):
        p = Process(target=police_car, args=(e, i,))  # 建立5個進程控制5輛警車
        p.start()
    t = Process(target=traffic_lights, args=(e, 10))  # 建立一個進程控制紅綠燈
    t.start()

    print('============》')

紅綠燈實例
紅綠燈實例

進程間通訊——隊列和管道(multiprocess.Queue、multiprocess.Pipe)

進程間通訊

IPC(Inter-Process Communication)

隊列 

概念介紹

建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。 

Queue([maxsize]) 
建立共享的進程隊列。
參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。
Queue([maxsize]) 
建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。 
Queue的實例q具備如下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。

q.get_nowait( ) 
同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) 
將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。

q.qsize() 
返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。


q.empty() 
若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

q.full() 
若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。

方法介紹
方法介紹
q.close() 
關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。

q.cancel_join_thread() 
不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。

q.join_thread() 
鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread()方法能夠禁止這種行爲。
其餘方法(瞭解)

代碼實例

'''
multiprocessing模塊支持進程間通訊的兩種主要形式:管道和隊列
都是基於消息傳遞實現的,可是隊列接口
'''

from multiprocessing import Queue
q=Queue(3)

#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3)   # 若是隊列已經滿了,程序就會停在這裏,等待數據被別人取走,再將數據放入隊列。
           # 若是隊列中的數據一直不被取走,程序就會永遠停在這裏。
try:
    q.put_nowait(3) # 可使用put_nowait,若是隊列滿了不會阻塞,可是會由於隊列滿了而報錯。
except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,可是會丟掉這個消息。
    print('隊列已經滿了')

# 所以,咱們再放入數據以前,能夠先看一下隊列的狀態,若是已經滿了,就不繼續put了。
print(q.full()) #滿了

print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # 同put方法同樣,若是隊列已經空了,那麼繼續取就會出現阻塞。
try:
    q.get_nowait(3) # 可使用get_nowait,若是隊列滿了不會阻塞,可是會由於沒取到值而報錯。
except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。
    print('隊列已經空了')

print(q.empty()) #空了

單看隊列用法
單看隊列用法

上面這個例子尚未加入進程通訊,只是先來看看隊列爲咱們提供的方法,以及這些方法的使用和現象。

import time
from multiprocessing import Process, Queue

def f(q):
    q.put([time.asctime(), 'from Eva', 'hello'])  #調用主函數中p進程傳遞過來的進程參數 put函數爲向隊列中添加一條數據。

if __name__ == '__main__':
    q = Queue() #建立一個Queue對象
    p = Process(target=f, args=(q,)) #建立一個進程
    p.start()
    print(q.get())
    p.join()
子進程發送數據給父進程

上面是一個queue的簡單應用,使用隊列q對象調用get函數來取得隊列中最早進入的數據。 接下來看一個稍微複雜一些的例子:

import os
import time
import multiprocessing

# 向queue中輸入數據的函數
def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.asctime())
    queue.put(info)

# 向queue中輸出數據的函數
def outputQ(queue):
    info = queue.get()
    print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info))

# Main
if __name__ == '__main__':
    multiprocessing.freeze_support()
    record1 = []   # store input processes
    record2 = []   # store output processes
    queue = multiprocessing.Queue(3)

    # 輸入進程
    for i in range(10):
        process = multiprocessing.Process(target=inputQ,args=(queue,))
        process.start()
        record1.append(process)

    # 輸出進程
    for i in range(10):
        process = multiprocessing.Process(target=outputQ,args=(queue,))
        process.start()
        record2.append(process)

    for p in record1:
        p.join()

    for p in record2:
        p.join()

批量生產數據放入隊列再批量獲取結果 x
批量生產數據放入隊列再批量獲取結果 x
生產者消費者模型

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。

爲何要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

什麼是生產者消費者模式

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

基於隊列實現生產者消費者模型
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()
    print('')

基於隊列實現生產者消費者模型
基於隊列實現生產者消費者模型

此時的問題是主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。

解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環。

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
    q.put(None) #發送結束信號
if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()
    print('')

改良版——生產者消費者模型
改良版——生產者消費者模型

注意:結束信號None,不必定要由生產者發,主進程裏一樣能夠發,但主進程須要等生產者結束後才應該發送該信號

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()

    p1.join()
    q.put(None) #發送結束信號
    print('')

主進程在生產者生產完畢後發送結束信號None
主進程在生產者生產完畢後發送結束信號None

但上述解決方式,在有多個生產者和多個消費者時,咱們則須要用一個很low的方式去解決

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(name,q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨頭',q))
    p3=Process(target=producer,args=('泔水',q))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    p2.start()
    p3.start()
    c1.start()

    p1.join() #必須保證生產者所有生產完畢,才應該發送結束信號
    p2.join()
    p3.join()
    q.put(None) #有幾個消費者就應該發送幾回結束信號None
    q.put(None) #發送結束信號
    print('')

多個消費者的例子:有幾個消費者就須要發送幾回結束信號
多個消費者的例子:有幾個消費者就須要發送幾回結束信號

JoinableQueue([maxsize]) 
建立可鏈接的共享進程隊列。這就像是一個Queue對象,但隊列容許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

JoinableQueue的實例p除了與Queue對象相同的方法以外,還具備如下方法:

q.task_done() 
使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。若是調用此方法的次數大於從隊列中刪除的項目數量,將引起ValueError異常。

q.join() 
生產者將使用此方法進行阻塞,直到隊列中全部項目均被處理。阻塞將持續到爲隊列中的每一個項目均調用q.task_done()方法爲止。 
下面的例子說明如何創建永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。

方法介紹
方法介紹
from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
        q.task_done() #向q.join()發送一次信號,證實一個數據已經被取走了

def producer(name,q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
    q.join() #生產完畢,使用此方法進行阻塞,直到隊列中全部項目均被處理。


if __name__ == '__main__':
    q=JoinableQueue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨頭',q))
    p3=Process(target=producer,args=('泔水',q))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))
    c1.daemon=True
    c2.daemon=True

    #開始
    p_l=[p1,p2,p3,c1,c2]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()
    print('') 
    
    #主進程等--->p1,p2,p3等---->c1,c2
    #p1,p2,p3結束了,證實c1,c2確定全都收完了p1,p2,p3發到隊列的數據
    #於是c1,c2也沒有存在的價值了,不須要繼續阻塞在進程中影響主進程了。應該隨着主進程的結束而結束,因此設置成守護進程就能夠了。

JoinableQueue隊列實現消費之生產者模型
JoinableQueue隊列實現消費之生產者模型

管道(瞭解)

#建立管道的類:
Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道
#參數介紹:
dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。
#主要方法:
    conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。
    conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
 #其餘方法:
conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
conn1.fileno():返回鏈接使用的整數文件描述符
conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。

介紹
介紹
from multiprocessing import Process, Pipe


def f(conn):
    conn.send("Hello The_Third_Wave")
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()
pipe初使用

應該特別注意管道端點的正確管理問題。若是是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了爲什麼在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。若是忘記執行這些步驟,程序可能在消費者中的recv()操做上掛起。管道是由操做系統進行引用計數的,必須在全部進程中關閉管道後才能生成EOFError異常。所以,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。 

from multiprocessing import Process, Pipe

def f(parent_conn,child_conn):
    #parent_conn.close() #不寫close將不會引起EOFError
    while True:
        try:
            print(child_conn.recv())
        except EOFError:
            child_conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(parent_conn,child_conn,))
    p.start()
    child_conn.close()
    parent_conn.send('hello')
    parent_conn.close()
    p.join()
引起EOFError
from multiprocessing import Process,Pipe

def consumer(p,name):
    produce, consume=p
    produce.close()
    while True:
        try:
            baozi=consume.recv()
            print('%s 收到包子:%s' %(name,baozi))
        except EOFError:
            break

def producer(seq,p):
    produce, consume=p
    consume.close()
    for i in seq:
        produce.send(i)

if __name__ == '__main__':
    produce,consume=Pipe()

    c1=Process(target=consumer,args=((produce,consume),'c1'))
    c1.start()


    seq=(i for i in range(10))
    producer(seq,(produce,consume))

    produce.close()
    consume.close()

    c1.join()
    print('主進程')
pipe實現生產者消費者模型
from multiprocessing import Process,Pipe,Lock

def consumer(p,name,lock):
    produce, consume=p
    produce.close()
    while True:
        lock.acquire()
        baozi=consume.recv()
        lock.release()
        if baozi:
            print('%s 收到包子:%s' %(name,baozi))
        else:
            consume.close()
            break


def producer(p,n):
    produce, consume=p
    consume.close()
    for i in range(n):
        produce.send(i)
    produce.send(None)
    produce.send(None)
    produce.close()

if __name__ == '__main__':
    produce,consume=Pipe()
    lock = Lock()
    c1=Process(target=consumer,args=((produce,consume),'c1',lock))
    c2=Process(target=consumer,args=((produce,consume),'c2',lock))
    p1=Process(target=producer,args=((produce,consume),10))
    c1.start()
    c2.start()
    p1.start()

    produce.close()
    consume.close()

    c1.join()
    c2.join()
    p1.join()
    print('主進程')

多個消費之之間的競爭問題帶來的數據不安全問題
多個消費之之間的競爭問題帶來的數據不安全問題

程池和multiprocess.Pool模塊

進程池

爲何要有進程池?進程池的概念。

在程序實際處理問題過程當中,忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。那麼在成千上萬個任務須要被執行的時候,咱們就須要去建立成千上萬個進程麼?首先,建立進程須要消耗時間,銷燬進程也須要消耗時間。第二即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,這樣反而會影響程序的效率。所以咱們不能無限制的根據任務開啓或者結束進程。那麼咱們要怎麼作呢?

在這裏,要給你們介紹一個進程池的概念,定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等處處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增長操做系統的調度難度,還節省了開閉進程的時間,也必定程度上可以實現併發效果。

multiprocess.Pool模塊

概念介紹

Pool([numprocess  [,initializer [, initargs]]]):建立進程池
1 numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值
2 initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
3 initargs:是要傳給initializer的參數組
參數介紹
p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
'''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()'''

p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
'''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。'''
   
p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成

P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用

主要方法
主要方法
方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法
obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
obj.ready():若是調用完成,返回True
obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
obj.wait([timeout]):等待結果變爲可用。
obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數

其餘方法(瞭解)
其餘方法(瞭解)

代碼實例

同步和異步
import os,time
from multiprocessing import Pool

def work(n):
    print('%s run' %os.getpid())
    time.sleep(3)
    return n**2

if __name__ == '__main__':
    p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務
    res_l=[]
    for i in range(10):
        res=p.apply(work,args=(i,)) # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程當中可能有阻塞也可能沒有阻塞
                                    # 但無論該任務是否存在阻塞,同步調用都會在原地等着
    print(res_l)

進程池的同步調用
進程池的同步調用
import os
import time
import random
from multiprocessing import Pool

def work(n):
    print('%s run' %os.getpid())
    time.sleep(random.random())
    return n**2

if __name__ == '__main__':
    p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務
    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行
                                          # 返回結果以後,將結果放入列表,歸還進程,以後再執行新的任務
                                          # 須要注意的是,進程池中的三個進程不會同時開啓或者同時結束
                                          # 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。  
        res_l.append(res)

    # 異步apply_async用法:若是使用異步提交的任務,主進程須要使用jion,等待進程池內任務都處理完,而後能夠用get收集結果
    # 不然,主進程結束,進程池可能還沒來得及執行,也就跟着一塊兒結束了
    p.close()
    p.join()
    for res in res_l:
        print(res.get()) #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get

進程池的異步調用
進程池的異步調用

練習

#Pool內的進程數默認是cpu核數,假設爲4(查看方法os.cpu_count())
#開啓6個客戶端,會發現2個客戶端處於等待狀態
#在每一個進程內查看pid,會發現pid使用爲4個,即多個客戶端公用4個進程
from socket import *
from multiprocessing import Pool
import os

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn):
    print('進程pid: %s' %os.getpid())
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    p=Pool(4)
    while True:
        conn,*_=server.accept()
        p.apply_async(talk,args=(conn,))
        # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問

server:進程池版socket併發聊天
server:進程池版socket併發聊天
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

client
client

發現:併發開啓多個客戶端,服務端同一時間只有4個不一樣的pid,只能結束一個客戶端,另一個客戶端纔會進來.

回調函數
須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數

咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<進程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def pasrse_page(res):
    print('<進程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    p=Pool(3)
    res_l=[]
    for url in urls:
        res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
        res_l.append(res)

    p.close()
    p.join()
    print([res.get() for res in res_l]) #拿到的是get_page的結果,其實徹底不必拿該結果,該結果已經傳給回調函數處理了

'''
打印結果:
<進程3388> get https://www.baidu.com
<進程3389> get https://www.python.org
<進程3390> get https://www.openstack.org
<進程3388> get https://help.github.com/
<進程3387> parse https://www.baidu.com
<進程3389> get http://www.sina.com.cn/
<進程3387> parse https://www.python.org
<進程3387> parse https://help.github.com/
<進程3387> parse http://www.sina.com.cn/
<進程3387> parse https://www.openstack.org
[{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}]
'''

使用多進程請求多個url來減小網絡等待浪費的時間
使用多進程請求多個url來減小網絡等待浪費的時間
import re
from urllib.request import urlopen
from multiprocessing import Pool

def get_page(url,pattern):
    response=urlopen(url).read().decode('utf-8')
    return pattern,response

def parse_page(info):
    pattern,page_content=info
    res=re.findall(pattern,page_content)
    for item in res:
        dic={
            'index':item[0].strip(),
            'title':item[1].strip(),
            'actor':item[2].strip(),
            'time':item[3].strip(),
        }
        print(dic)
if __name__ == '__main__':
    regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'
    pattern1=re.compile(regex,re.S)

    url_dic={
        'http://maoyan.com/board/7':pattern1,
    }

    p=Pool()
    res_l=[]
    for url,pattern in url_dic.items():
        res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
        res_l.append(res)

    for i in res_l:
        i.get()

爬蟲實例
爬蟲實例

若是在主進程中等待進程池中全部任務都執行完畢後,再統一處理結果,則無需回調函數

from multiprocessing import Pool
import time,random,os

def work(n):
    time.sleep(1)
    return n**2
if __name__ == '__main__':
    p=Pool()

    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,))
        res_l.append(res)

    p.close()
    p.join() #等待進程池中全部進程執行完畢

    nums=[]
    for res in res_l:
        nums.append(res.get()) #拿到全部結果
    print(nums) #主進程拿到全部的處理結果,能夠在主進程中進行統一進行處理

無需回調函數
無需回調函數

進程池的其餘實現方式:https://docs.python.org/dev/library/concurrent.futures.html

參考資料http://www.cnblogs.com/linhaifeng/articles/6817679.htmlhttps://www.jianshu.com/p/1200fd49b583https://www.jianshu.com/p/aed6067eeac9
相關文章
相關標籤/搜索