puthon進程開發

進程html

本節目錄python

一 背景知識

  顧名思義,進程即正在執行的一個過程。進程是對正在運行程序的一個抽象。linux

  進程的概念起源於操做系統,是操做系統最核心的概念,也是操做系統提供的最古老也是最重要的抽象概念之一。操做系統的其餘全部內容都是圍繞進程的概念展開的。nginx

  因此想要真正瞭解進程,必須事先了解操做系統,點擊進入    git

  PS:即便能夠利用的cpu只有一個(早期的計算機確實如此),也能保證支持(僞)併發的能力。將一個單獨的cpu變成多個虛擬的cpu(多道技術:時間多路複用和空間多路複用+硬件上支持隔離),沒有進程的抽象,現代計算機將不復存在。github

  必備的理論基礎:web

複製代碼
#一 操做系統的做用:
    1:隱藏醜陋複雜的硬件接口,提供良好的抽象接口
    2:管理、調度進程,而且將多個進程對硬件的競爭變得有序

#二 多道技術:
    1.產生背景:針對單核,實現併發
    ps:
    如今的主機通常是多核,那麼每一個核都會利用多道技術
    有4個cpu,運行於cpu1的某個程序遇到io阻塞,會等到io結束再從新調度,會被調度到4個
    cpu中的任意一個,具體由操做系統調度算法決定。
    
    2.空間上的複用:如內存中同時有多道程序
    3.時間上的複用:複用一個cpu的時間片
       強調:遇到io切,佔用cpu時間過長也切,核心在於切以前將進程的狀態保存下來,這樣
            才能保證下次切換回來時,能基於上次切走的位置繼續運行
複製代碼

 

二 什麼是進程

  進程(Process)是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操做系統結構的基礎。在早期面向進程設計的計算機結構中,進程是程序的基本執行實體;在當代面向線程設計的計算機結構中,進程是線程的容器。程序是指令、數據及其組織形式的描述,進程是程序的實體。咱們本身在python文件中寫了一些代碼,這叫作程序,運行這個python文件的時候,這叫作進程。面試

  狹義定義:進程是正在運行的程序的實例(an instance of a computer program that is being executed)。
  廣義定義:進程是一個具備必定獨立功能的程序關於某個數據集合的一次運行活動。它是操做系統動態執行的基本單元,在傳統的操做系統中,進程既是基本的分配單元,也是基本的執行單元。
  舉例: 好比py1文件中有個變量a=1,py2文件中有個變量a=2,他們兩個會衝突嗎?不會的,是否是,由於兩個文件運行起來後是兩個進程,操做系統讓他們在內存上隔離開,對吧。
  
第一,進程是一個實體。每個進程都有它本身的地址空間,通常狀況下,包括文本區域(text region)(python的文件)、數據區域(data region)(python文件中定義的一些變量數據)和堆棧(stack region)。文本區域存儲處理器執行的代碼;數據區域存儲變量和進程執行期間使用的動態分配的內存;堆棧區域存儲着活動過程調用的指令和本地變量。
第二,進程是一個「執行中的程序」。程序是一個沒有生命的實體,只有處理器賦予程序生命時(操做系統執行之),它才能成爲一個活動的實體,咱們稱其爲進程。[3] 
進程是操做系統中最基本、重要的概念。是多道程序系統出現後,爲了刻畫系統內部出現的動態狀況,描述系統內部各道程序的活動規律引進的一個概念,全部多道程序設計操做系統都創建在進程的基礎上。
 
動態性:進程的實質是程序在多道程序系統中的一次執行過程,進程是動態產生,動態消亡的。
併發性:任何進程均可以同其餘進程一塊兒併發執行
獨立性:進程是一個能獨立運行的基本單位,同時也是系統分配資源和調度的獨立單位;
異步性:因爲進程間的相互制約,使進程具備執行的間斷性,即進程按各自獨立的、不可預知的速度向前推動
結構特徵:進程由程序、數據和進程控制塊三部分組成。
多個不一樣的進程能夠包含相同的程序:一個程序在不一樣的數據集裏就構成不一樣的進程,能獲得不一樣的結果;可是執行過程當中,程序不能發生改變。

 

程序是指令和數據的有序集合,其自己沒有任何運行的含義,是一個靜態的概念。
而進程是程序在處理機上的一次執行過程,它是一個動態的概念。
程序能夠做爲一種軟件資料長期存在,而進程是有必定生命期的。
程序是永久的,進程是暫時的。
舉例:就像qq同樣,qq是咱們安裝在本身電腦上的客戶端程序,其實就是一堆的代碼文件,咱們不運行qq,那麼他就是一堆代碼程序,當咱們運行qq的時候,這些代碼運行起來,就成爲一個進程了。

 

注意:同一個程序執行兩次,就會在操做系統中出現兩個進程,因此咱們能夠同時運行一個軟件,分別作不一樣的事情也不會混亂。好比打開暴風影音,雖然都是同一個軟件,可是一個能夠播放蒼井空,一個能夠播放飯島愛。算法

 

 

三 進程調度

要想多個進程交替運行,操做系統必須對這些進程進行調度,這個調度也不是隨即進行的,而是須要遵循必定的法則,由此就有了進程的調度算法。shell

先來先服務(FCFS)調度算法是一種最簡單的調度算法,該算法既可用於做業調度,也可用於進程調度。FCFS算法比較有利於長做業(進程),而不利於短做業(進程)。由此可知,本算法適合於CPU繁忙型做業,而不利於I/O繁忙型的做業(進程)。

 

短做業(進程)優先調度算法(SJ/PF)是指對短做業或短進程優先調度的算法,該算法既可用於做業調度,也可用於進程調度。但其對長做業不利;不能保證緊迫性做業(進程)被及時處理;做業的長短只是被估算出來的。

 

複製代碼
時間片輪轉(Round Robin,RR)法的基本思路是讓每一個進程在就緒隊列中的等待時間與享受服務的時間成比例。在時間片輪轉法中,須要將CPU的處理時間分紅固定大小的時間片,例如,幾十毫秒至幾百毫秒。若是一個進程在被調度選中以後用完了系統規定的時間片,但又未完成要求的任務,則它自行釋放本身所佔有的CPU而排到就緒隊列的末尾,等待下一次調度。同時,進程調度程序又去調度當前就緒隊列中的第一個進程。
      顯然,輪轉法只能用來調度分配一些能夠搶佔的資源。這些能夠搶佔的資源能夠隨時被剝奪,並且能夠將它們再分配給別的進程。CPU是可搶佔資源的一種。但打印機等資源是不可搶佔的。因爲做業調度是對除了CPU以外的全部系統硬件資源的分配,其中包含有不可搶佔資源,因此做業調度不使用輪轉法。
在輪轉法中,時間片長度的選取很是重要。首先,時間片長度的選擇會直接影響到系統的開銷和響應時間。若是時間片長度太短,則調度程序搶佔處理機的次數增多。這將使進程上下文切換次數也大大增長,從而加劇系統開銷。反過來,若是時間片長度選擇過長,例如,一個時間片能保證就緒隊列中所需執行時間最長的進程能執行完畢,則輪轉法變成了先來先服務法。時間片長度的選擇是根據系統對響應時間的要求和就緒隊列中所容許最大的進程數來肯定的。
      在輪轉法中,加入到就緒隊列的進程有3種狀況:
      一種是分給它的時間片用完,但進程還未完成,回到就緒隊列的末尾等待下次調度去繼續執行。
      另外一種狀況是分給該進程的時間片並未用完,只是由於請求I/O或因爲進程的互斥與同步關係而被阻塞。當阻塞解除以後再回到就緒隊列。
      第三種狀況就是新建立進程進入就緒隊列。
      若是對這些進程區別對待,給予不一樣的優先級和時間片從直觀上看,能夠進一步改善系統服務質量和效率。例如,咱們可把就緒隊列按照進程到達就緒隊列的類型和進程被阻塞時的阻塞緣由分紅不一樣的就緒隊列,每一個隊列按FCFS原則排列,各隊列之間的進程享有不一樣的優先級,但同一隊列內優先級相同。這樣,當一個進程在執行完它的時間片以後,或從睡眠中被喚醒以及被建立以後,將進入不一樣的就緒隊列。  
複製代碼

 

複製代碼
前面介紹的各類用做進程調度的算法都有必定的侷限性。如短進程優先的調度算法,僅照顧了短進程而忽略了長進程,並且若是並未指明進程的長度,則短進程優先和基於進程長度的搶佔式調度算法都將沒法使用。
而多級反饋隊列調度算法則沒必要事先知道各類進程所需的執行時間,並且還能夠知足各類類型進程的須要,於是它是目前被公認的一種較好的進程調度算法。在採用多級反饋隊列調度算法的系統中,調度算法的實施過程以下所述。
(1) 應設置多個就緒隊列,併爲各個隊列賦予不一樣的優先級。第一個隊列的優先級最高,第二個隊列次之,其他各隊列的優先權逐個下降。該算法賦予各個隊列中進程執行時間片的大小也各不相同,在優先權愈高的隊列中,爲每一個進程所規定的執行時間片就愈小。例如,第二個隊列的時間片要比第一個隊列的時間片長一倍,……,第i+1個隊列的時間片要比第i個隊列的時間片長一倍。
(2) 當一個新進程進入內存後,首先將它放入第一隊列的末尾,按FCFS原則排隊等待調度。當輪到該進程執行時,如它能在該時間片內完成,即可準備撤離系統;若是它在一個時間片結束時還沒有完成,調度程序便將該進程轉入第二隊列的末尾,再一樣地按FCFS原則等待調度執行;若是它在第二隊列中運行一個時間片後仍未完成,再依次將它放入第三隊列,……,如此下去,當一個長做業(進程)從第一隊列依次降到第n隊列後,在第n 隊列便採起按時間片輪轉的方式運行。

(3) 僅當第一隊列空閒時,調度程序才調度第二隊列中的進程運行;僅當第1~(i-1)隊列均空時,纔會調度第i隊列中的進程運行。若是處理機正在第i隊列中爲某進程服務時,又有新進程進入優先權較高的隊列(第1~(i-1)中的任何一個隊列),則此時新進程將搶佔正在運行進程的處理機,即由調度程序把正在運行的進程放回到第i隊列的末尾,把處理機分配給新到的高優先權進程。
複製代碼

  對於多級反饋隊列,windows不太清楚,可是在linux裏面能夠設置某個進程的優先級,提升了有限級有可能就會多執行幾個時間片。

 

四 併發與並行

經過進程之間的調度,也就是進程之間的切換,咱們用戶感知到的好像是兩個視頻文件同時在播放,或者音樂和遊戲同時在進行,那就讓咱們來看一下什麼叫作併發和並行

不管是並行仍是併發,在用戶看來都是'同時'運行的,無論是進程仍是線程,都只是一個任務而已,真是幹活的是cpu,cpu來作這些任務,而一個cpu同一時刻只能執行一個任務

  併發:是僞並行,即看起來是同時運行。單個cpu+多道技術就能夠實現併發,(並行也屬於併發)

你是一個cpu,你同時談了三個女友,每個均可以是一個戀愛任務,你被這三個任務共享要玩出併發戀愛的效果,
應該是你先跟女朋友1去看電影,看了一會說:很差,我要拉肚子,而後跑去跟第二個女朋友吃飯,吃了一會說:那啥,我去趟洗手間,而後跑去跟女朋友3開了個房,而後在你的基友眼裏,你就在和三個女朋友同時在一塊兒玩。

 

  並行:並行:同時運行,只有具有多個cpu才能實現並行

將多個cpu必須成高速公路上的多個車道,進程就比如每一個車道上行駛的車輛,並行就是說,你們在本身的車道上行駛,會不影響,同時在開車。這就是並行

 

  單核下,能夠利用多道技術,多個核,每一個核也均可以利用多道技術(多道技術是針對單核而言的

  有四個核,六個任務,這樣同一時間有四個任務被執行,假設分別被分配給了cpu1,cpu2,cpu3,cpu4,

  一旦任務1遇到I/O就被迫中斷執行,此時任務5就拿到cpu1的時間片去執行,這就是單核下的多道技術

  而一旦任務1的I/O結束了,操做系統會從新調用它(需知進程的調度、分配給哪一個cpu運行,由操做系統說了算),可能被分配給四個cpu中的任意一個去執行

  全部現代計算機常常會在同一時間作不少件事,一個用戶的PC(不管是單cpu仍是多cpu),均可以同時運行多個任務(一個任務能夠理解爲一個進程)。

    啓動一個進程來殺毒(360軟件)

    啓動一個進程來看電影(暴風影音)

    啓動一個進程來聊天(騰訊QQ)

  全部的這些進程都需被管理,因而一個支持多進程的多道程序系統是相當重要的

  多道技術概念回顧:內存中同時存入多道(多個)程序,cpu從一個進程快速切換到另一個,使每一個進程各自運行幾十或幾百毫秒,這樣,雖然在某一個瞬間,一個cpu只能執行一個任務,但在1秒內,cpu卻能夠運行多個進程,這就給人產生了並行的錯覺,即僞並行,以此來區分多處理器操做系統的真正硬件並行(多個cpu共享同一個物理內存)

 

五 同步\異步\阻塞\非阻塞(重點)

1.進程狀態介紹

  在瞭解其餘概念以前,咱們首先要了解進程的幾個狀態。在程序運行的過程當中,因爲被操做系統的調度算法控制,程序會進入幾個狀態:就緒,運行和阻塞。

  (1)就緒(Ready)狀態

    當進程已分配到除CPU之外的全部必要的資源,只要得到處理機即可當即執行,這時的進程狀態稱爲就緒狀態。

  (2)執行/運行(Running)狀態當進程已得到處理機,其程序正在處理機上執行,此時的進程狀態稱爲執行狀態。

  (3)阻塞(Blocked)狀態正在執行的進程,因爲等待某個事件發生而沒法執行時,便放棄處理機而處於阻塞狀態。引發進程阻塞的事件可有多種,例如,等待I/O完成、申請緩衝區不能知足、等待信件(信號)等。

    事件請求:input、sleep、文件輸入輸出、recv、accept等

    事件發生:sleep、input等完成了

    時間片到了以後有回到就緒狀態,這三個狀態不斷的在轉換。

 

2.同步異步

    所謂同步就是一個任務的完成須要依賴另一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成,這是一種可靠的任務序列。要麼成功都成功,失敗都失敗,兩個任務的狀態能夠保持一致。其實就是一個程序結束才執行另一個程序,串行的,不必定兩個程序就有依賴關係。

    所謂異步是不須要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工做,依賴的任務也當即執行,只要本身完成了整個任務就算完成了。至於被依賴的任務最終是否真正完成,依賴它的任務沒法肯定,因此它是不可靠的任務序列

好比咱們去樓下的老家肉餅吃飯,飯點好了,取餐的時候發生了一些同步異步的事情。
同步:咱們都站在隊裏等着取餐,前面有我的點了一份肉餅,後廚作了好久,可是因爲同步機制,咱們仍是要站在隊裏等着前面那我的的肉餅作好取走,咱們才往前走一步。
異步:咱們點完餐以後,點餐員給了咱們一個取餐號碼,跟你說,你不用在這裏排隊等着,去找個地方坐着玩手機去吧,等飯作好了,我叫你。這種機制(等待別人通知)就是異步等待消息通知。在異步消息處理中,等待消息通知者(在這個例子中等着取餐的你)每每註冊一個回調機制,在所等待的事件被觸發時由觸發機制(點餐員)經過某種機制(喊號,‘250號你的包子好了‘)找到等待該事件的人。

 

3.阻塞與非阻塞

   阻塞和非阻塞這兩個概念與程序(線程)等待消息通知(無所謂同步或者異步)時的狀態有關。也就是說阻塞與非阻塞主要是程序(線程)等待消息通知時的狀態角度來講的

繼續上面的那個例子,不管是排隊仍是使用號碼等待通知,若是在這個等待的過程當中,等待者除了等待消息通知以外不能作其它的事情,那麼該機制就是阻塞的,表如今程序中,也就是該程序一直阻塞在該函數調用處不能繼續往下執行。
相反,有的人喜歡在等待取餐的時候一邊打遊戲一邊等待,這樣的狀態就是非阻塞的,由於他(等待者)沒有阻塞在這個消息通知上,而是一邊作本身的事情一邊等待。阻塞的方法:input、time.sleep,socket中的recv、accept等等。

 

 4.同步/異步 與 阻塞和非阻塞

  1. 同步阻塞形式

    效率最低。拿上面的例子來講,就是你專心排隊,什麼別的事都不作。

  1. 異步阻塞形式

    若是在排隊取餐的人採用的是異步的方式去等待消息被觸發(通知),也就是領了一張小紙條,假如在這段時間裏他不能作其它的事情,就在那坐着等着,不能玩遊戲等,那麼很顯然,這我的被阻塞在了這個等待的操做上面;

    異步操做是能夠被阻塞住的,只不過它不是在處理消息時阻塞,而是在等待消息通知時被阻塞。

  1. 同步非阻塞形式

    其實是效率低下的。

    想象一下你一邊打着電話一邊還須要擡頭看到底隊伍排到你了沒有,若是把打電話和觀察排隊的位置當作是程序的兩個操做的話,這個程序須要在這兩種不一樣的行爲之間來回的切換,效率可想而知是低下的。

  1. 異步非阻塞形式

    效率更高,

    由於打電話是你(等待者)的事情,而通知你則是櫃檯(消息觸發機制)的事情,程序沒有在兩種不一樣的操做中來回切換

    好比說,這我的忽然發覺本身煙癮犯了,須要出去抽根菸,因而他告訴點餐員說,排到我這個號碼的時候麻煩到外面通知我一下,那麼他就沒有被阻塞在這個等待的操做上面,天然這個就是異步+非阻塞的方式了。

  不少人會把同步和阻塞混淆,是由於不少時候同步操做會以阻塞的形式表現出來,一樣的,不少人也會把異步和非阻塞混淆,由於異步操做通常都不會在真正的IO操做處被阻塞

 

六 進程的建立、結束與併發的實現(瞭解)

1.進程的建立

    但凡是硬件,都須要有操做系統去管理,只要有操做系統,就有進程的概念,就須要有建立進程的方式,一些操做系統只爲一個應用程序設計,好比微波爐中的控制器,一旦啓動微波爐,全部的進程都已經存在。

    而對於通用系統(跑不少應用程序),須要有系統運行過程當中建立或撤銷進程的能力,主要分爲4中形式建立新的進程

      1. 系統初始化(查看進程linux中用ps命令,windows中用任務管理器,前臺進程負責與用戶交互,後臺運行的進程與用戶無關,運行在後臺而且只在須要時才喚醒的進程,稱爲守護進程,如電子郵件、web頁面、新聞、打印)

      2. 一個進程在運行過程當中開啓了子進程(如nginx開啓多進程,os.fork,subprocess.Popen等)

      3. 用戶的交互式請求,而建立一個新進程(如用戶雙擊暴風影音)

      4. 一個批處理做業的初始化(只在大型機的批處理系統中應用)

  

    不管哪種,新進程的建立都是由一個已經存在的進程執行了一個用於建立進程的系統調用而建立的:

      1. 在UNIX中該系統調用是:fork,fork會建立一個與父進程如出一轍的副本,兩者有相同的存儲映像、一樣的環境字符串和一樣的打開文件(在shell解釋器進程中,執行一個命令就會建立一個子進程)

      2. 在windows中該系統調用是:CreateProcess,CreateProcess既處理進程的建立,也負責把正確的程序裝入新進程。

 

    關於建立的子進程,UNIX和windows

      1.相同的是:進程建立後,父進程和子進程有各自不一樣的地址空間(多道技術要求物理層面實現進程之間內存的隔離),任何一個進程的在其地址空間中的修改都不會影響到另一個進程。

      2.不一樣的是:在UNIX中,子進程的初始地址空間是父進程的一個副本,提示:子進程和父進程是能夠有隻讀的共享內存區的。可是對於windows系統來講,從一開始父進程與子進程的地址空間就是不一樣的。

 

2.進程的結束 

    1. 正常退出(自願,如用戶點擊交互式頁面的叉號,或程序執行完畢調用發起系統調用正常退出,在linux中用exit,在windows中用ExitProcess)

    2. 出錯退出(自願,python a.py中a.py不存在)

    3. 嚴重錯誤(非自願,執行非法指令,如引用不存在的內存,1/0等,能夠捕捉異常,try...except...)

    4. 被其餘進程殺死(非自願,如kill -9)

 3.進程併發的實現(瞭解)

    進程併發的實如今於,硬件中斷一個正在運行的進程,把此時進程運行的全部狀態保存下來,爲此,操做系統維護一張表格,即進程表(process table),每一個進程佔用一個進程表項(這些表項也稱爲進程控制塊)

    

    該表存放了進程狀態的重要信息:程序計數器、堆棧指針、內存分配情況、全部打開文件的狀態、賬號和調度信息,以及其餘在進程由運行態轉爲就緒態或阻塞態時,必須保存的信息,從而保證該進程在再次啓動時,就像從未被中斷過同樣。

 

===========================================================

 上面的內容都是進程的一些理論基礎,下面的內容是python中進程的應用實戰

=====================================================================

今天的內容就到這個地方吧,同窗們好好整理一下~~~~~~~~~~~~~~~~

  經過上面內容的學習,咱們已經瞭解了不少進程相關的理論知識,瞭解進程是什麼應該再也不困難了,剛剛咱們已經瞭解了,運行中的程序就是一個進程。全部的進程都是經過它的父進程來建立的。所以,運行起來的python程序也是一個進程,那麼咱們也能夠在程序中再建立進程。多個進程能夠實現併發效果,也就是說,當咱們的程序中存在多個進程的時候,在某些時候,就會讓程序的執行速度變快。以咱們以前所學的知識,並不能實現建立進程這個功能,因此咱們就須要藉助python中強大的模塊。

七 multiprocess模塊

仔細說來,multiprocess不是一個模塊而是python中一個操做、管理進程的包。 之因此叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進程有關的全部子模塊。因爲提供的子模塊很是多,爲了方便你們歸類記憶,我將這部分大體分爲四個部分:建立進程部分,進程同步部分,進程池部分,進程之間數據共享。重點強調:進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內,可是經過一些特殊的方法,能夠實現進程之間數據的共享。

1.process模塊介紹

   process模塊是一個建立進程的模塊,藉助這個模塊,就能夠完成進程的建立。

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)

強調:
1. 須要使用關鍵字的方式來指定參數
2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號

    咱們先寫一個程序來看看:

複製代碼
#當前文件名稱爲test.py
# from multiprocessing import Process # # def func(): # print(12345) # # if __name__ == '__main__': #windows 下才須要寫這個,這和系統建立進程的機制有關係,不用深究,記着windows下要寫就好啦 # #首先我運行當前這個test.py文件,運行這個文件的程序,那麼就產生了進程,這個進程咱們稱爲主進程 # # p = Process(target=func,) #將函數註冊到一個進程中,p是一個進程對象,此時尚未啓動進程,只是建立了一個進程對象。而且func是不加括號的,由於加上括號這個函數就直接運行了對吧。 # p.start() #告訴操做系統,給我開啓一個進程,func這個函數就被咱們新開的這個進程執行了,而這個進程是我主進程運行過程當中建立出來的,因此稱這個新建立的進程爲主進程的子進程,而主進程又能夠稱爲這個新進程的父進程。
          #而這個子進程中執行的程序,至關於將如今這個test.py文件中的程序copy到一個你看不到的python文件中去執行了,就至關於當前這個文件,被另一個py文件import過去並執行了。
          #start並非直接就去執行了,咱們知道進程有三個狀態,進程會進入進程的三個狀態,就緒,(被調度,也就是時間片切換到它的時候)執行,阻塞,而且在這個三個狀態之間不斷的轉換,等待cpu執行時間片到了。 # print('*' * 10) #這是主進程的程序,上面開啓的子進程的程序是和主進程的程序同時運行的,咱們稱爲異步
複製代碼

 

    上面說了,咱們經過主進程建立的子進程是異步執行的,那麼咱們就驗證一下,而且看一會兒進程和主進程(也就是父進程)的ID號(講一下pid和ppid,使用pycharm舉例),來看看是不是父子關係。

  子進程與主進程

 

    

    打開windows下的任務管理器,看pycharm的pid進程號,是咱們上面運行的test.py這個文件主進程的父進程號:

    

 

    看一個問題,說明linux和windows兩個不一樣的操做系統建立進程的不一樣機制致使的不一樣結果:  

複製代碼
import time
import os
from multiprocessing import Process

def func():
    print('aaaa')
    time.sleep(1)
    print('子進程>>',os.getpid())
    print('該子進程的父進程>>',os.getppid())
    print(12345)

print('太白老司機~~~~') #若是我在這裏加了一個打印,你會發現運行結果中會出現兩次打印出來的太白老司機,由於咱們在主進程中開了一個子進程,子進程中的程序至關於import的主進程中的程序,那麼import的時候會不會執行你import的那個文件的程序啊,前面學的,是會執行的,因此出現了兩次打印
#實際上是由於windows開起進程的機制決定的,在linux下是不存在這個效果的,由於windows使用的是process方法來開啓進程,他就會拿到主進程中的全部程序,而linux下只是去執行我子進程中註冊的那個函數,不會執行別的程序,這也是爲何在windows下要加上執行程序的時候,
要加上if __name__ == '__main__':,不然會出現子進程中運行的時候還開啓子進程,那就出現無限循環的建立進程了,就報錯了
複製代碼

 

 

    一個進程的生命週期:若是子進程的運行時間長,那麼等到子進程執行結束程序才結束,若是主進程的執行時間長,那麼主進程執行結束程序才結束,實際上咱們在子進程中打印的內容是在主進程的執行結果中看不出來的,可是pycharm幫咱們作了優化,由於它會識別到你這是開的子進程,幫你把子進程中打印的內容打印到了顯示臺上。

    若是說一個主進程運行完了以後,咱們把pycharm關了,可是子進程尚未執行結束,那麼子進程還存在嗎?這要看你的進程是如何配置的,若是說咱們沒有配置說我主進程結束,子進程要跟着結束,那麼主進程結束的時候,子進程是不會跟着結束的,他會本身執行完,若是我設定的是主進程結束,子進程必須跟着結束,那麼就不會出現單獨的子進程(孤兒進程)了,具體如何設置,看下面的守護進程的講解。好比說,咱們未來啓動項目的時候,可能經過cmd來啓動,那麼我cmd關閉了你的項目就會關閉嗎,不會的,由於你的項目不能中止對外的服務,對吧。

    Process類中參數的介紹:

參數介紹:
1 group參數未使用,值始終爲None
2 target表示調用對象,即子進程要執行的任務
3 args表示調用對象的位置參數元組,args=(1,2,'egon',)
4 kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
5 name爲子進程的名稱

    給要執行的函數傳參數:

複製代碼
def func(x,y):
    print(x)
    time.sleep(1)
    print(y)

if __name__ == '__main__':

    p = Process(target=func,args=('姑娘','來玩啊!'))#這是func須要接收的參數的傳送方式。
    p.start()
    print('父進程執行結束!')

#執行結果:
父進程執行結束!
姑娘
來玩啊!
複製代碼

 

    Process類中各方法的介紹:

複製代碼
1 p.start():啓動進程,並調用該子進程中的p.run() 
2 p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法  
3 p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
4 p.is_alive():若是p仍然運行,返回True
5 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程  
複製代碼

     

    join方法的例子:

    讓主進程加上join的地方等待(也就是阻塞住),等待子進程執行完以後,再繼續往下執行個人主進程,好多時候,咱們主進程須要子進程的執行結果,因此必需要等待。join感受就像是將子進程和主進程拼接起來同樣,將異步改成同步執行。

複製代碼
def func(x,y):
    print(x)
    time.sleep(1)
    print(y)

if __name__ == '__main__':

    p = Process(target=func,args=('姑娘','來玩啊!'))
    p.start()
    print('我這裏是異步的啊!')  #這裏相對於子進程仍是異步的
    p.join()  #只有在join的地方纔會阻塞住,將子進程和主進程之間的異步改成同步
    print('父進程執行結束!')

#打印結果:
我這裏是異步的啊!
姑娘
來玩啊!
父進程執行結束!
複製代碼

 

  怎麼樣開啓多個進程呢?for循環。而且我有個需求就是說,全部的子進程異步執行,而後全部的子進程所有執行完以後,我再執行主進程,怎麼搞?看代碼

複製代碼
#下面的註釋按照編號去看,別忘啦!
import time
import os
from multiprocessing import Process

def func(x,y):
    print(x)
    # time.sleep(1) #進程切換:若是沒有這個時間間隔,那麼你會發現func執行結果是打印一個x而後一個y,再打印一個x一個y,不會出現打印多個x而後打印y的狀況,由於兩個打印距離太近了並且執行的也很是快,可是若是你這段程序運行慢的話,你就會發現進程之間的切換了。
    print(y)

if __name__ == '__main__':

    p_list= []
    for i in range(10):
        p = Process(target=func,args=('姑娘%s'%i,'來玩啊!'))
        p_list.append(p)
        p.start()

    [ap.join() for ap in p_list] #四、這是解決辦法,前提是咱們的子進程所有都已經去執行了,那麼我在一次給全部正在執行的子進程加上join,那麼主進程就須要等着全部子進程執行結束纔會繼續執行本身的程序了,而且保障了全部子進程是異步執行的。

        # p.join() #一、若是加到for循環裏面,那麼全部子進程包括父進程就所有變爲同步了,由於for循環也是主進程的,循環第一次的時候,一個進程去執行了,而後這個進程就join住了,那麼for循環就不會繼續執行了,等着第一個子進程執行結束纔會繼續執行for循環去建立第二個子進程。
        #二、若是我不想這樣的,也就是我想全部的子進程是異步的,而後全部的子進程執行完了再執行主進程
    #p.join() #三、若是這樣寫的話,屢次運行以後,你會發現會出現主進程的程序比一些子進程先執行完,由於咱們p.join()是對最後一個子進程進行了join,也就是說若是這最後一個子進程先於其餘子進程執行完,那麼主進程就會去執行,而此時若是還有一些子進程沒有執行完,而主進程執行
         #完了,那麼就會先打印主進程的內容了,這個cpu調度進程的機制有關係,由於咱們的電腦可能只有4個cpu,個人子進程加上住進程有11個,雖然我for循環是按順序起進程的,可是操做系統必定會按照順序給你執行你的進程嗎,答案是不會的,操做系統會按照本身的算法來分配進
              #程給cpu去執行,這裏也解釋了咱們打印出來的子進程中的內容也是沒有固定順序的緣由,由於打印結果也須要調用cpu,能夠理解成進程在爭搶cpu,若是同窗你想問這是什麼算法,這就要去研究操做系統啦。那咱們的想全部子進程異步執行,而後再執行主進程的這個需求怎麼解決啊
    print('不要錢~~~~~~~~~~~~~~~~!')
複製代碼

 

   

  模擬兩個應用場景:一、同時對一個文件進行寫操做  二、同時建立多個文件

複製代碼
import time
import os
import re
from multiprocessing import Process
#多進程同時對一個文件進行寫操做
def func(x,y,i):
    with open(x,'a',encoding='utf-8') as f:
        print('當前進程%s拿到的文件的光標位置>>%s'%(os.getpid(),f.tell()))
        f.write(y)

#多進程同時建立多個文件
# def func(x, y):
#     with open(x, 'w', encoding='utf-8') as f:
#         f.write(y)

if __name__ == '__main__':

    p_list= []
    for i in range(10):
        p = Process(target=func,args=('can_do_girl_lists.txt','姑娘%s'%i,i)) 
        # p = Process(target=func,args=('can_do_girl_info%s.txt'%i,'姑娘電話0000%s'%i))
        p_list.append(p)
        p.start()

    [ap.join() for ap in p_list] #這就是個for循環,只不過用列表生成式的形式寫的
    with open('can_do_girl_lists.txt','r',encoding='utf-8') as f:
        data = f.read()
        all_num = re.findall('\d+',data) #打開文件,統計一下里面有多少個數據,每一個數據都有個數字,因此re匹配一下就好了
        print('>>>>>',all_num,'.....%s'%(len(all_num)))
    #print([i in in os.walk(r'你的文件夾路徑')])
    print('不要錢~~~~~~~~~~~~~~~~!')
 
複製代碼

 

 

  Process類中自帶封裝的各屬性的介紹

複製代碼
1 p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
2 p.name:進程的名稱
3 p.pid:進程的pid
4 p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)
5 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)
複製代碼

 

2.Process類的使用

  注意:在windows中Process()必須放到# if __name__ == '__main__':下

複製代碼
Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. 
If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). 
This is the reason for hiding calls to Process() inside

if __name__ == "__main__"
since statements inside this if-statement will not get called upon import.
因爲Windows沒有fork,多處理模塊啓動一個新的Python進程並導入調用模塊。 
若是在導入時調用Process(),那麼這將啓動無限繼承的新進程(或直到機器耗盡資源)。 
這是隱藏對Process()內部調用的原,使用if __name__ == 「__main __」,這個if語句中的語句將不會在導入時被調用。
複製代碼

     

    進程的建立第二種方法(繼承)

複製代碼
class MyProcess(Process): #本身寫一個類,繼承Process類
    #咱們經過init方法能夠傳參數,若是隻寫一個run方法,那麼無法傳參數,由於建立對象的是傳參就是在init方法裏面,面向對象的時候,咱們是否是學過
    def __init__(self,person):
        super().__init__()
        self.person=person
    def run(self):
        print(os.getpid())
        print(self.pid)
        print(self.pid)
        print('%s 正在和女主播聊天' %self.person)
    # def start(self):
    #     #若是你非要寫一個start方法,能夠這樣寫,而且在run方法先後,能夠寫一些其餘的邏輯
    #     self.run()
if __name__ == '__main__':
    p1=MyProcess('Jedan')
    p2=MyProcess('太白')
    p3=MyProcess('alexDSB')

    p1.start() #start內部會自動調用run方法
    p2.start()
    # p2.run()
    p3.start()


    p1.join()
    p2.join()
    p3.join()
複製代碼

      

    進程之間的數據是隔離的:

複製代碼
#咱們說進程之間的數據是隔離的,也就是數據不共享,看下面的驗證
from multiprocessing import Process
n=100 #首先我定義了一個全局變量,在windows系統中應該把全局變量定義在if __name__ == '__main__'之上就能夠了
def work():
    global n
    n=0
    print('子進程內: ',n)

if __name__ == '__main__':
    p=Process(target=work)
    p.start()
    p.join() #等待子進程執行完畢,若是數據共享的話,我子進程是否是經過global將n改成0了,可是你看打印結果,主進程在子進程執行結束以後,仍然是n=100,子進程n=0,說明子進程對n的修改沒有在主進程中生效,說明什麼?說明他們之間的數據是隔離的,互相不影響的
    print('主進程內: ',n)

#看結果:
# 子進程內:  0
# 主進程內:  100
複製代碼

 

     練習:咱們以前學socket的時候,知道tcp協議的socket是不能同時和多個客戶端進行鏈接的,(這裏先不考慮socketserver那個模塊),對不對,那咱們本身經過多進程來實現一下同時和多個客戶端進行鏈接通訊。

     服務端代碼示例:(注意一點:經過這個是不能作qq聊天的,由於qq聊天是qq的客戶端把信息發給另一個qq的客戶端,中間有一個服務端幫你轉發消息,而不是咱們這樣的單純的客戶端和服務端對話,而且子進程開啓以後我們是無法操做的,而且沒有爲子進程input輸入提供控制檯,全部你再在子進程中寫上了input會報錯,EOFError錯誤,這個錯誤的意思就是你的input須要輸入,可是你輸入不了,就會報這個錯誤。而子進程的輸出打印之類的,是pycharm作了優化,將全部子進程中的輸出結果幫你打印出來了,但實質仍是不一樣進程的。)

複製代碼
from socket import *
from multiprocessing import Process

def talk(conn,client_addr):
    while True:
        try:
            msg=conn.recv(1024)
            print('客戶端消息>>',msg)
            if not msg:break
            conn.send(msg.upper())
            #在這裏有同窗可能會想,我能不能在這裏寫input來本身輸入內容和客戶端進行對話?朋友,是這樣的,按說是能夠的,可是須要什麼呢?須要你像咱們用pycharm的是同樣下面有一個輸入內容的控制檯,當咱們的子進程去執行的時候,咱們是沒有地方能夠顯示可以讓你輸入內容的控制檯的,因此你沒辦法輸入,就會給你報錯。
        except Exception:
            break

if __name__ == '__main__': #windows下start進程必定要寫到這下面
    server = socket(AF_INET, SOCK_STREAM)
    # server.setsockopt(SOL_SOCKET, SO_REUSEADDR,1)  # 若是你將若是你將bind這些代碼寫到if __name__ == '__main__'這行代碼的上面,那麼地址重用必需要有,由於咱們知道windows建立的子進程是對整個當前文件的內容進行的copy,前面說了就像import,若是你開啓了子進程,那麼子進程是會執行bind的,那麼你的主進程bind了這個ip和端口,子進程在進行bind的時候就會報錯。
    server.bind(('127.0.0.1', 8080))
    #有同窗可能還會想,我爲何多個進程就能夠鏈接一個server段的一個ip和端口了呢,我記得當時說tcp的socket的時候,我是不能在你這個ip和端口被鏈接的狀況下再鏈接你的啊,這裏是由於當時咱們就是一個進程,一個進程裏面是隻能一個鏈接的,多進程是能夠多鏈接的,這和進程之間是單獨的內存空間有關係,先這樣記住他,好嗎?
    server.listen(5)
    while True:
        conn,client_addr=server.accept()
        p=Process(target=talk,args=(conn,client_addr))
        p.start()
複製代碼

 

     客戶端代碼示例:

複製代碼
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
複製代碼

     

    上面咱們經過多進程實現了併發,可是有個問題

每來一個客戶端,都在服務端開啓一個進程,若是併發來一個萬個客戶端,要開啓一萬個進程嗎,你本身嘗試着在你本身的機器上開啓一萬個,10萬個進程試一試。
解決方法:進程池,本篇博客後面會講到,你們繼續學習呀

 

     Process對象的其餘方法或屬性(簡單瞭解一下就能夠啦)

複製代碼
#進程對象的其餘方法一:terminate,is_alive
from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()

    def run(self):
        print('%s is 打飛機' %self.name)
        # s = input('???') #別忘了再pycharm下子進程中不能input輸入,會報錯EOFError: EOF when reading a line,由於子進程中沒有像咱們主進程這樣的在pycharm下的控制檯能夠輸入東西的地方
        time.sleep(2)
        print('%s is 打飛機結束' %self.name)

if __name__ == '__main__':
    p1=Piao('太白')
    p1.start()
    time.sleep(5)
    p1.terminate()#關閉進程,不會當即關閉,有個等着操做系統去關閉這個進程的時間,因此is_alive馬上查看的結果可能仍是存活,可是稍微等一會,就被關掉了
    print(p1.is_alive()) #結果爲True
    print('等會。。。。')
    time.sleep(1)
    print(p1.is_alive()) #結果爲False
複製代碼

 

複製代碼
from multiprocessing import Process
import time
import random
class Piao(Process):
    def __init__(self,name):
        # self.name=name
        # super().__init__() #Process的__init__方法會執行self.name=Piao-1,
        #                    #因此加到這裏,會覆蓋咱們的self.name=name

        #爲咱們開啓的進程設置名字的作法
        super().__init__()
        self.name=name

    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %self.name)

p=Piao('egon')
p.start()
print('開始')
print(p.pid) #查看pid
複製代碼

 

    殭屍進程與孤兒進程(簡單瞭解 一下就能夠啦)

複製代碼
參考博客:http://www.cnblogs.com/Anker/p/3271773.html

一:殭屍進程(有害)
  殭屍進程:一個進程使用fork建立子進程,若是子進程退出,而父進程並無調用wait或waitpid獲取子進程的狀態信息,那麼子進程的進程描述符仍然保存在系統中。這種進程稱之爲僵死進程。詳解以下

咱們知道在unix/linux中,正常狀況下子進程是經過父進程建立的,子進程在建立新的進程。子進程的結束和父進程的運行是一個異步過程,即父進程永遠沒法預測子進程到底何時結束,若是子進程一結束就馬上回收其所有資源,那麼在父進程內將沒法獲取子進程的狀態信息。

所以,UNⅨ提供了一種機制能夠保證父進程能夠在任意時刻獲取子進程結束時的狀態信息:
一、在每一個進程退出的時候,內核釋放該進程全部的資源,包括打開的文件,佔用的內存等。可是仍然爲其保留必定的信息(包括進程號the process ID,退出狀態the termination status of the process,運行時間the amount of CPU time taken by the process等)
二、直到父進程經過wait / waitpid來取時才釋放. 但這樣就致使了問題,若是進程不調用wait / waitpid的話,那麼保留的那段信息就不會釋放,其進程號就會一直被佔用,可是系統所能使用的進程號是有限的,若是大量的產生僵死進程,將由於沒有可用的進程號而致使系統不能產生新的進程. 此即爲殭屍進程的危害,應當避免。

  任何一個子進程(init除外)在exit()以後,並不是立刻就消失掉,而是留下一個稱爲殭屍進程(Zombie)的數據結構,等待父進程處理。這是每一個子進程在結束時都要通過的階段。若是子進程在exit()以後,父進程沒有來得及處理,這時用ps命令就能看到子進程的狀態是「Z」。若是父進程能及時 處理,可能用ps命令就來不及看到子進程的殭屍狀態,但這並不等於子進程不通過殭屍狀態。  若是父進程在子進程結束以前退出,則子進程將由init接管。init將會以父進程的身份對殭屍狀態的子進程進行處理。

二:孤兒進程(無害)

  孤兒進程:一個父進程退出,而它的一個或多個子進程還在運行,那麼那些子進程將成爲孤兒進程。孤兒進程將被init進程(進程號爲1)所收養,並由init進程對它們完成狀態收集工做。

  孤兒進程是沒有父進程的進程,孤兒進程這個重任就落到了init進程身上,init進程就好像是一個民政局,專門負責處理孤兒進程的善後工做。每當出現一個孤兒進程的時候,內核就把孤 兒進程的父進程設置爲init,而init進程會循環地wait()它的已經退出的子進程。這樣,當一個孤兒進程淒涼地結束了其生命週期的時候,init進程就會表明黨和政府出面處理它的一切善後工做。所以孤兒進程並不會有什麼危害。

咱們來測試一下(建立完子進程後,主進程所在的這個腳本就退出了,當父進程先於子進程結束時,子進程會被init收養,成爲孤兒進程,而非殭屍進程),文件內容

import os
import sys
import time

pid = os.getpid()
ppid = os.getppid()
print 'im father', 'pid', pid, 'ppid', ppid
pid = os.fork()
#執行pid=os.fork()則會生成一個子進程
#返回值pid有兩種值:
#    若是返回的pid值爲0,表示在子進程當中
#    若是返回的pid值>0,表示在父進程當中
if pid > 0:
    print 'father died..'
    sys.exit(0)

# 保證主線程退出完畢
time.sleep(1)
print 'im child', os.getpid(), os.getppid()

執行文件,輸出結果:
im father pid 32515 ppid 32015
father died..
im child 32516 1

看,子進程已經被pid爲1的init進程接收了,因此殭屍進程在這種狀況下是不存在的,存在只有孤兒進程而已,孤兒進程聲明週期結束天然會被init來銷燬。


三:殭屍進程危害場景:

  例若有個進程,它按期的產 生一個子進程,這個子進程須要作的事情不多,作完它該作的事情以後就退出了,所以這個子進程的生命週期很短,可是,父進程只管生成新的子進程,至於子進程 退出以後的事情,則一律漠不關心,這樣,系統運行上一段時間以後,系統中就會存在不少的僵死進程,假若用ps命令查看的話,就會看到不少狀態爲Z的進程。 嚴格地來講,僵死進程並非問題的根源,罪魁禍首是產生出大量僵死進程的那個父進程。所以,當咱們尋求如何消滅系統中大量的僵死進程時,答案就是把產生大 量僵死進程的那個元兇槍斃掉(也就是經過kill發送SIGTERM或者SIGKILL信號啦)。槍斃了元兇進程以後,它產生的僵死進程就變成了孤兒進 程,這些孤兒進程會被init進程接管,init進程會wait()這些孤兒進程,釋放它們佔用的系統進程表中的資源,這樣,這些已經僵死的孤兒進程 就能瞑目而去了。

四:測試
#一、產生殭屍進程的程序test.py內容以下

#coding:utf-8
from multiprocessing import Process
import time,os

def run():
    print('子',os.getpid())

if __name__ == '__main__':
    p=Process(target=run)
    p.start()
    
    print('主',os.getpid())
    time.sleep(1000)


#二、在unix或linux系統上執行
[root@vm172-31-0-19 ~]# python3  test.py &
[1] 18652
[root@vm172-31-0-19 ~]# 主 18652
子 18653

[root@vm172-31-0-19 ~]# ps aux |grep Z
USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
root     18653  0.0  0.0      0     0 pts/0    Z    20:02   0:00 [python3] <defunct> #出現殭屍進程
root     18656  0.0  0.0 112648   952 pts/0    S+   20:02   0:00 grep --color=auto Z

[root@vm172-31-0-19 ~]# top #執行top命令發現1zombie
top - 20:03:42 up 31 min,  3 users,  load average: 0.01, 0.06, 0.12
Tasks:  93 total,   2 running,  90 sleeping,   0 stopped,   1 zombie
%Cpu(s):  0.0 us,  0.3 sy,  0.0 ni, 99.7 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem :  1016884 total,    97184 free,    70848 used,   848852 buff/cache
KiB Swap:        0 total,        0 free,        0 used.   782540 avail Mem 

  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND                                                                                                                                        
root      20   0   29788   1256    988 S  0.3  0.1   0:01.50 elfin                                                                                                                      


#三、
等待父進程正常結束後會調用wait/waitpid去回收殭屍進程
但若是父進程是一個死循環,永遠不會結束,那麼該殭屍進程就會一直存在,殭屍進程過多,就是有害的
解決方法一:殺死父進程
解決方法二:對開啓的子進程應該記得使用join,join會回收殭屍進程
參考python2源碼註釋
class Process(object):
    def join(self, timeout=None):
        '''
        Wait until child process terminates
        '''
        assert self._parent_pid == os.getpid(), 'can only join a child process'
        assert self._popen is not None, 'can only join a started process'
        res = self._popen.wait(timeout)
        if res is not None:
            _current_process._children.discard(self)

join方法中調用了wait,告訴系統釋放殭屍進程。discard爲從本身的children中剔除

解決方法三:http://blog.csdn.net/u010571844/article/details/50419798
複製代碼

 

 

3.守護進程

    以前咱們講的子進程是不會隨着主進程的結束而結束,子進程所有執行完以後,程序才結束,那麼若是有一天咱們的需求是個人主進程結束了,由我主進程建立的那些子進程必須跟着結束,怎麼辦?守護進程就來了!

    主進程建立守護進程

      其一:守護進程會在主進程代碼執行結束後就終止

      其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children

    注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

複製代碼
import os
import time
from multiprocessing import Process

class Myprocess(Process):
    def __init__(self,person):
        super().__init__()
        self.person = person
    def run(self):
        print(os.getpid(),self.name)
        print('%s正在和女主播聊天' %self.person)
        time.sleep(3)
if __name__ == '__main__':
    p=Myprocess('太白')
    p.daemon=True #必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程代碼執行結束,p即終止運行
    p.start()
    # time.sleep(1) # 在sleep時linux下查看進程id對應的進程ps -ef|grep id
    print('主')
複製代碼

 

4.進程同步(鎖)

     經過剛剛的學習,咱們想方設法實現了程序的異步,讓多個任務能夠同時在幾個進程中併發處理,他們之間的運行沒有順序,一旦開啓也不受咱們控制。儘管併發編程讓咱們能更加充分的利用IO資源,可是也給咱們帶來了新的問題:進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題的,而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理。

複製代碼
import os
import time
import random
from multiprocessing import Process

def work(n):
    print('%s: %s is running' %(n,os.getpid()))
    time.sleep(random.random())
    print('%s:%s is done' %(n,os.getpid()))

if __name__ == '__main__':
    for i in range(5):
        p=Process(target=work,args=(i,))
        p.start()

# 看結果:經過結果能夠看出兩個問題:問題一:每一個進程中work函數的第一個打印就不是按照咱們for循環的0-4的順序來打印的
#問題二:咱們發現,每一個work進程中有兩個打印,可是咱們看到全部進程中第一個打印的順序爲0-2-1-4-3,可是第二個打印沒有按照這個順序,變成了2-1-0-3-4,說明咱們一個進程中的程序的執行順序都混亂了。
#問題的解決方法,第二個問題加鎖來解決,第一個問題是沒有辦法解決的,由於進程開到了內核,有操做系統來決定進程的調度,咱們本身控制不了
# 0: 9560 is running
# 2: 13824 is running
# 1: 7476 is running
# 4: 11296 is running
# 3: 14364 is running

# 2:13824 is done
# 1:7476 is done
# 0:9560 is done
# 3:14364 is done
# 4:11296 is done
複製代碼

 

複製代碼
#由併發變成了串行,犧牲了運行效率,但避免了競爭
from multiprocessing import Process,Lock
import os,time
def work(n,lock):
    #加鎖,保證每次只有一個進程在執行鎖裏面的程序,這一段程序對於全部寫上這個鎖的進程,你們都變成了串行
    lock.acquire()
    print('%s: %s is running' %(n,os.getpid()))
    time.sleep(1)
    print('%s:%s is done' %(n,os.getpid()))
    #解鎖,解鎖以後其餘進程才能去執行本身的程序
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    for i in range(5):
        p=Process(target=work,args=(i,lock))
        p.start()

#打印結果:
# 2: 10968 is running
# 2:10968 is done
# 0: 7932 is running
# 0:7932 is done
# 4: 4404 is running
# 4:4404 is done
# 1: 12852 is running
# 1:12852 is done
# 3: 980 is running
# 3:980 is done

#結果分析:(本身去屢次運行一下,看看結果,我拿出其中一個結果來看)經過結果咱們能夠看出,多進程剛開始去執行的時候,每次運行,首先打印出來哪一個進程的程序是不固定的,可是咱們解決了上面打印混亂示例代碼的第二個問題,那就是同一個進程中的兩次打印都是先完成的,而後才切換到下一個進程去,打印下一個進程中的兩個打印結果,說明咱們控制住了同一進程中的代碼執行順序,若是涉及到多個進程去操做同一個數據或者文件的時候,就不擔憂數據算錯或者文件中的內容寫入混亂了。
複製代碼

 

    上面這種狀況雖然使用加鎖的形式實現了順序的執行,可是程序又從新變成串行了,這樣確實會浪費了時間,卻保證了數據的安全。

    接下來,咱們以模擬搶票爲例,來看看數據安全的重要性。 

複製代碼
#注意:首先在當前文件目錄下建立一個名爲db的文件
#文件db的內容爲:{"count":1},只有這一行數據,而且注意,每次運行完了以後,文件中的1變成了0,你須要手動將0改成1,而後在去運行代碼。
#注意必定要用雙引號,否則json沒法識別
#併發運行,效率高,但競爭寫同一文件,數據寫入錯亂
from multiprocessing import Process,Lock
import time,json,random

#查看剩餘票數
def search():
    dic=json.load(open('db')) #打開文件,直接load文件中的內容,拿到文件中的包含剩餘票數的字典
    print('\033[43m剩餘票數%s\033[0m' %dic['count'])

#搶票
def get():
    dic=json.load(open('db'))
    time.sleep(0.1)       #模擬讀數據的網絡延遲,那麼進程之間的切換,致使全部人拿到的字典都是{"count": 1},也就是每一個人都拿到了這一票。
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(0.2)   #模擬寫數據的網絡延遲
        json.dump(dic,open('db','w'))
        #最終結果致使,每一個人顯示都搶到了票,這就出現了問題~
        print('\033[43m購票成功\033[0m')

def task():
    search()
    get()

if __name__ == '__main__':
    for i in range(3): #模擬併發100個客戶端搶票
        p=Process(target=task)
        p.start()

#看結果分析:因爲網絡延遲等緣由使得進程切換,致使每一個人都搶到了這最後一張票
# 剩餘票數1
# 剩餘票數1
# 剩餘票數1
# 購票成功
# 購票成功
# 購票成功
複製代碼

 

  加鎖:購票行爲由併發變成了串行,犧牲了效率,可是保證了數據安全

 

     進程鎖總結: 

複製代碼
#加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然能夠用文件共享數據實現進程間通訊,但問題是:
1.效率低(共享數據基於文件,而文件是硬盤上的數據)
2.須要本身加鎖處理

#所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。
隊列和管道都是將數據存放於內存中
隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,
咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。

IPC通訊機制(瞭解):IPC是intent-Process Communication的縮寫,含義爲進程間通訊或者跨進程通訊,是指兩個進程之間進行數據交換的過程。IPC不是某個系統所獨有的,任何一個操做系統都須要有相應的IPC機制,
好比Windows上能夠經過剪貼板、管道和郵槽等來進行進程間通訊,而Linux上能夠經過命名共享內容、信號量等來進行進程間通訊。Android它也有本身的進程間通訊方式,Android建構在Linux基礎上,繼承了一
部分Linux的通訊方式。
複製代碼

 

次日進程的學習就到這裏啦~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

5.隊列(推薦使用)

    進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的。隊列就像一個特殊的列表,可是能夠設置固定長度,而且從前面插入數據,從後面取出數據,先進先出。

Queue([maxsize]) 建立共享的進程隊列。
參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。
底層隊列使用管道和鎖實現。

    先看下面的代碼示例,而後再看方法介紹。

    queue的方法介紹

複製代碼
q = Queue([maxsize]) 
建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。 
Queue的實例q具備如下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。

q.get_nowait( ) 
同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) 
將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。

q.qsize() 
返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。


q.empty() 
若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

q.full() 
若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。
複製代碼

 

     queue的其餘方法(瞭解)

複製代碼
q.close() 
關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。

q.cancel_join_thread() 
不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。

q.join_thread() 
鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread()方法能夠禁止這種行爲。
複製代碼

 

     咱們看一些代碼示例:

複製代碼
from multiprocessing import Queue
q=Queue(3) #建立一個隊列對象,隊列長度爲3

#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)   #往隊列中添加數據
q.put(2)
q.put(1)
# q.put(4)   # 若是隊列已經滿了,程序就會停在這裏,等待數據被別人取走,再將數據放入隊列。
           # 若是隊列中的數據一直不被取走,程序就會永遠停在這裏。
try:
    q.put_nowait(4) # 能夠使用put_nowait,若是隊列滿了不會阻塞,可是會由於隊列滿了而報錯。
except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,可是會丟掉這個消息。
    print('隊列已經滿了')

# 所以,咱們再放入數據以前,能夠先看一下隊列的狀態,若是已經滿了,就不繼續put了。
print(q.full()) #查看是否滿了,滿了返回True,不滿返回False

print(q.get())  #取出數據
print(q.get())
print(q.get())
# print(q.get()) # 同put方法同樣,若是隊列已經空了,那麼繼續取就會出現阻塞。
try:
    q.get_nowait(3) # 能夠使用get_nowait,若是隊列滿了不會阻塞,可是會由於沒取到值而報錯。
except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。
    print('隊列已經空了')

print(q.empty()) #空了
複製代碼

 

複製代碼
#看下面的隊列的時候,按照編號看註釋
import time
from multiprocessing import Process, Queue

# 8. q = Queue(2) #建立一個Queue對象,若是寫在這裏,那麼在windows還子進程去執行的時候,咱們知道子進程中還會執行這個代碼,可是子進程中不可以再次建立了,也就是這個q就是你主進程中建立的那個q,經過咱們下面在主進程中先添加了一個字符串以後,在去開啓子進程,你會發現,小鬼這個字符串還在隊列中,也就是說,咱們使用的仍是主進程中建立的這個隊列。
def f(q):
    # q = Queue() #9. 咱們在主進程中開啓了一個q,若是咱們在子進程中的函數裏面再開一個q,那麼你下面q.put('姑娘,多少錢~')添加到了新建立的這q裏裏面了
    q.put('姑娘,多少錢~')  #4.調用主函數中p進程傳遞過來的進程參數 put函數爲向隊列中添加一條數據。
    # print(q.qsize()) #6.查看隊列中有多少條數據了

def f2(q):
    print('》》》》》》》》')
    print(q.get())  #5.取數據
if __name__ == '__main__':
    q = Queue() #1.建立一個Queue對象
    q.put('小鬼')

    p = Process(target=f, args=(q,)) #2.建立一個進程
    p2 = Process(target=f2, args=(q,)) #3.建立一個進程
    p.start()
    p2.start()
    time.sleep(1) #7.若是阻塞一點時間,就會出現主進程運行太快,致使咱們在子進程中查看qsize爲1個。
    # print(q.get()) #結果:小鬼
    print(q.get()) #結果:姑娘,多少錢~
    p.join()
複製代碼

 

    接下來看一個稍微複雜一些的例子:

複製代碼
import os
import time
import multiprocessing

# 向queue中輸入數據的函數
def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.asctime())
    queue.put(info)

# 向queue中輸出數據的函數
def outputQ(queue):
    info = queue.get()
    print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info))

# Main
if __name__ == '__main__':
    #windows下,若是開啓的進程比較多的話,程序會崩潰,爲了防止這個問題,使用freeze_support()方法來解決。知道就行啦
    multiprocessing.freeze_support()
    record1 = []   # store input processes
    record2 = []   # store output processes
    queue = multiprocessing.Queue(3)

    # 輸入進程
    for i in range(10):
        process = multiprocessing.Process(target=inputQ,args=(queue,))
        process.start()
        record1.append(process)

    # 輸出進程
    for i in range(10):
        process = multiprocessing.Process(target=outputQ,args=(queue,))
        process.start()
        record2.append(process)

    for p in record1:
        p.join()

    for p in record2:
        p.join()
複製代碼

     隊列是進程安全的:同一時間只能一個進程拿到隊列中的一個數據,你拿到了一個數據,這個數據別人就拿不到了。

    下面咱們來看一個叫作生產者消費者模型的東西:

      在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。

    爲何要使用生產者和消費者模式

      在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

    什麼是生產者消費者模式

      生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力,而且我能夠根據生產速度和消費速度來均衡一下多少個生產者能夠爲多少個消費者提供足夠的服務,就能夠開多進程等等,而這些進程都是到阻塞隊列或者說是緩衝區中去獲取或者添加數據。

    通俗的解釋:看圖說話。。背景有點亂,等我更新~~

    

 

    那麼咱們基於隊列來實現一個生產者消費者模型,代碼示例:

複製代碼
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()
    print('主')
複製代碼

 

複製代碼
#生產者消費者模型總結

    #程序中有兩類角色
        一類負責生產數據(生產者)
        一類負責處理數據(消費者)
        
    #引入生產者消費者模型爲了解決的問題是:
        平衡生產者與消費者之間的工做能力,從而提升程序總體處理數據的速度
        
    #如何實現:
        生產者<-->隊列<——>消費者
    #生產者消費者模型實現類程序的解耦和
複製代碼

 

    經過上面基於隊列的生產者消費者代碼示例,咱們發現一個問題:主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。

    解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環

 

複製代碼
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(5):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
    q.put(None) #在本身的子進程的最後加入一個結束信號
if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()

    print('主')
複製代碼

 

   注意:結束信號None,不必定要由生產者發,主進程裏一樣能夠發,但主進程須要等生產者結束後才應該發送該信號

複製代碼
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()

    p1.join() #等待生產者進程結束
    q.put(None) #發送結束信號
    print('主')
複製代碼

     但上述解決方式,在有多個生產者和多個消費者時,因爲隊列咱們說了是進程安全的,我一個進程拿走告終束信號,另一個進程就拿不到了,還須要多發送一個結束信號,有幾個取數據的進程就要發送幾個結束信號,咱們則須要用一個很low的方式去解決

複製代碼
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(name,q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))



if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨頭',q))
    p3=Process(target=producer,args=('泔水',q))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    p2.start()
    p3.start()
    c1.start()

    p1.join() #必須保證生產者所有生產完畢,才應該發送結束信號
    p2.join()
    p3.join()
    q.put(None) #有幾個消費者就應該發送幾回結束信號None
    q.put(None) #發送結束信號
    print('主')
複製代碼

     其實咱們的思路無非是發送結束信號而已,有另一種隊列提供了這種機制

    JoinableQueue([maxsize]) 

複製代碼
#JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

   #參數介紹:
    maxsize是隊列中容許最大項數,省略則無大小限制。    
  #方法介紹:
    JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:
    q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常
    q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止,也就是隊列中的數據所有被get拿走了。
複製代碼

 

複製代碼
from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        # time.sleep(random.randint(1,3))
        time.sleep(random.random())
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
        q.task_done() #向q.join()發送一次信號,證實一個數據已經被取走並執行完了

def producer(name,q):
    for i in range(10):
        # time.sleep(random.randint(1,3))
        time.sleep(random.random())
        res='%s%s' %(name,i)
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
    print('%s生產結束'%name)
    q.join() #生產完畢,使用此方法進行阻塞,直到隊列中全部項目均被處理。
    print('%s生產結束~~~~~~'%name)

if __name__ == '__main__':
    q=JoinableQueue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨頭',q))
    p3=Process(target=producer,args=('泔水',q))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))
    c1.daemon=True #若是不加守護,那麼主進程結束不了,可是加了守護以後,必須確保生產者的內容生產完而且被處理完了,全部必須還要在主進程給生產者設置join,才能確保生產者生產的任務被執行完了,而且可以確保守護進程在全部任務執行完成以後才隨着主進程的結束而結束。
    c2.daemon=True

    #開始
    p_l=[p1,p2,p3,c1,c2]
    for p in p_l:
        p.start()

    p1.join() #我要確保你的生產者進程結束了,生產者進程的結束標誌着你生產的全部的人任務都已經被處理完了
    p2.join()
    p3.join()
    print('主')
    
    # 主進程等--->p1,p2,p3等---->c1,c2
    # p1,p2,p3結束了,證實c1,c2確定全都收完了p1,p2,p3發到隊列的數據
    # 於是c1,c2也沒有存在的價值了,不須要繼續阻塞在進程中影響主進程了。應該隨着主進程的結束而結束,因此設置成守護進程就能夠了。
複製代碼

 

6.管道(瞭解)

    進程間通訊(IPC)方式二:管道(不推薦使用,瞭解便可),會致使數據不安全的狀況出現,後面咱們會說到爲何會帶來數據 不安全的問題。

複製代碼
#建立管道的類:
Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道
#參數介紹:
dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。
#主要方法:
    conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。
    conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
 #其餘方法:
conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
conn1.fileno():返回鏈接使用的整數文件描述符
conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
複製代碼

 

複製代碼
from multiprocessing import Process, Pipe

def f(conn):
    conn.send("Hello 妹妹") #子進程發送了消息
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe() #創建管道,拿到管道的兩端,雙工通訊方式,兩端均可以收發消息
    p = Process(target=f, args=(child_conn,)) #將管道的一段給子進程
    p.start() #開啓子進程
    print(parent_conn.recv()) #主進程接受了消息
    p.join()
複製代碼

     

    應該特別注意管道端點的正確管理問題。若是是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了爲什麼在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。若是忘記執行這些步驟,程序可能在消費者中的recv()操做上掛起(就是阻塞)。管道是由操做系統進行引用計數的,必須在全部進程中關閉管道的相同一端就會能生成EOFError異常。所以,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。   

複製代碼
from multiprocessing import Process, Pipe

def f(parent_conn,child_conn):
    #parent_conn.close() #不寫close將不會引起EOFError
    while True:
        try:
            print(child_conn.recv())
        except EOFError:
            child_conn.close()
            break

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(parent_conn,child_conn,))
    p.start()
    child_conn.close()
    parent_conn.send('hello')
    parent_conn.close()
    p.join()            
複製代碼

    主進程將管道的兩端都傳送給子進程,子進程和主進程共用管道的兩種報錯狀況,都是在recv接收的時候報錯的:

    1.主進程和子進程中的管道的相同一端都關閉了,出現EOFError;

    2.若是你管道的一端在主進程和子進程中都關閉了,可是你還用這個關閉的一端去接收消息,那麼就會出現OSError;

 

    因此你關閉管道的時候,就容易出現問題,須要將全部只用這個管道的進程中的兩端所有關閉才行。固然也能夠經過異常捕獲(try:except EOFerror)來處理。

    雖然咱們在主進程和子進程中都打印了一下conn1一端的對象,發現兩個再也不同一個地址,可是子進程中的管道和主進程中的管道仍是能夠通訊的,由於管道是同一套,系統可以記錄。    

 

    咱們的目的就是關閉全部的管道,那麼主進程和子進程進行通訊的時候,能夠給子進程傳管道的一端就夠了,而且用咱們以前學到的,信息發送完以後,再發送一個結束信號None,那麼你收到的消息爲None的時候直接結束接收或者說結束循環,就不用每次都關閉各個進程中的管道了。

複製代碼
from multiprocessing import Pipe,Process

def func(conn):
    while True:
        msg = conn.recv()
        if msg is None:break
        print(msg)

if __name__ == '__main__':
    conn1,conn2 = Pipe()
    p = Process(target=func,args=(conn1,))
    p.start()
    for i in range(10):
        conn2.send('約吧')
    conn2.send(None)
複製代碼

 

複製代碼
from multiprocessing import Process,Pipe

def consumer(p,name):
    produce, consume=p
    produce.close()
    while True:
        try:
            baozi=consume.recv()
            print('%s 收到包子:%s' %(name,baozi))
        except EOFError:
            break

def producer(seq,p):
    produce, consume=p
    consume.close()
    for i in seq:
        produce.send(i)

if __name__ == '__main__':
    produce,consume=Pipe()

    c1=Process(target=consumer,args=((produce,consume),'c1'))
    c1.start()


    seq=(i for i in range(10))
    producer(seq,(produce,consume))

    produce.close()
    consume.close()

    c1.join()
    print('主進程')
複製代碼

     

   

複製代碼
關於管道會形成數據不安全問題的官方解釋:
    The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
    
由Pipe方法返回的兩個鏈接對象表示管道的兩端。每一個鏈接對象都有send和recv方法(除其餘以外)。注意,若是兩個進程(或線程)試圖同時從管道的同一端讀取或寫入數據,那麼管道中的數據可能會損壞。固然,在使用管道的不一樣端部的過程當中不存在損壞風險。
複製代碼

 

複製代碼
from multiprocessing import Process,Pipe,Lock

def consumer(p,name,lock):
    produce, consume=p
    produce.close()
    while True:
        lock.acquire()
        baozi=consume.recv()
        lock.release()
        if baozi:
            print('%s 收到包子:%s' %(name,baozi))
        else:
            consume.close()
            break


def producer(p,n):
    produce, consume=p
    consume.close()
    for i in range(n):
        produce.send(i)
    produce.send(None)
    produce.send(None)
    produce.close()

if __name__ == '__main__':
    produce,consume=Pipe()
    lock = Lock()
    c1=Process(target=consumer,args=((produce,consume),'c1',lock))
    c2=Process(target=consumer,args=((produce,consume),'c2',lock))
    p1=Process(target=producer,args=((produce,consume),10))
    c1.start()
    c2.start()
    p1.start()

    produce.close()
    consume.close()

    c1.join()
    c2.join()
    p1.join()
    print('主進程')
複製代碼

     管道能夠用於雙工通訊,一般利用在客戶端/服務端中使用的請求/響應模型,或者遠程過程調用,就能夠使用管道編寫與進程交互的程序,像前面將網絡通訊的時候,咱們使用了一個叫subprocess的模塊,裏面有個參數是pipe管道,執行系統指令,並經過管道獲取結果。

 

7.數據共享(瞭解)

    展望將來,基於消息傳遞的併發編程是大勢所趨

    即使是使用線程,推薦作法也是將程序設計爲大量獨立的線程集合

    經過消息隊列交換數據。這樣極大地減小了對使用鎖定和其餘同步手段的需求,還能夠擴展到分佈式系統中

    進程間應該儘可能避免通訊,即使須要通訊,也應該選擇進程安全的工具來避免加鎖帶來的問題,應該儘可能避免使用本節所講的共享數據的方式,之後咱們會嘗試使用數據庫來解決進程之間的數據共享問題。

    進程之間數據共享的模塊之一Manager模塊:

複製代碼
進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的
雖然進程間數據獨立,但能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
複製代碼

   多進程共同去處理共享數據的時候,就和咱們多進程同時去操做一個文件中的數據是同樣的,不加鎖就會出現錯誤的結果,進程不安全的,因此也須要加鎖

複製代碼
from multiprocessing import Manager,Process,Lock
def work(d,lock):
    with lock: #不加鎖而操做共享的數據,確定會出現數據錯亂
        d['count']-=1

if __name__ == '__main__':
    lock=Lock()
    with Manager() as m:
        dic=m.dict({'count':100})
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)
複製代碼

 

總結一下,進程之間的通訊:隊列、管道、數據共享也算

下面要講的信號量和事件也至關於鎖,也是全局的,全部進程都能拿到這些鎖的狀態,進程之間這些鎖啊信號量啊事件啊等等的通訊,其實底層仍是socekt,只不過是基於文件的socket通訊,而不是跟上面的數據共享啊空間共享啊之類的機制,咱們以前學的是基於網絡的socket通訊,還記得socket的兩個家族嗎,一個文件的一個網絡的,因此未來若是說這些鎖之類的報錯,可能你看到的就是相似於socket的錯誤,簡單知道一下就能夠啦~~~

工做中經常使用的是鎖,信號量和事件不經常使用,可是信號量和事件面試的時候會問到,你能知道就行啦~~~

8.信號量(瞭解)

互斥鎖同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據 。
假設商場裏有4個迷你唱吧,因此同時能夠進去4我的,若是來了第五我的就要在外面等待,等到有人出來才能再進去玩。
實現:
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。
信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念

   好比大保健:提早設定好,一個房間只有4個牀(計數器如今爲4),那麼同時只能四我的進來,誰先來的誰先佔一個牀(acquire,計數器減1),4個牀滿了以後(計數器爲0了),第五我的就要等着,等其中一我的出來(release,計數器加1),他就去佔用那個牀了。

複製代碼
from multiprocessing import Process,Semaphore
import time,random

def go_ktv(sem,user):
    sem.acquire()
    print('%s 佔到一間ktv小屋' %user)
    time.sleep(random.randint(0,3)) #模擬每一個人在ktv中待的時間不一樣
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(4)
    p_l=[]
    for i in range(13):
        p=Process(target=go_ktv,args=(sem,'user%s' %i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')
複製代碼

 

9.事件(瞭解)

複製代碼
python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。

    事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。

clear:將「Flag」設置爲False
set:將「Flag」設置爲True
複製代碼

 

複製代碼
from multiprocessing import Process,Semaphore,Event
import time,random

e = Event() #建立一個事件對象
print(e.is_set())  #is_set()查看一個事件的狀態,默認爲False,可經過set方法改成True
print('look here!')
# e.set()          #將is_set()的狀態改成True。
# print(e.is_set())#is_set()查看一個事件的狀態,默認爲False,可經過set方法改成Tr
# e.clear()        #將is_set()的狀態改成False
# print(e.is_set())#is_set()查看一個事件的狀態,默認爲False,可經過set方法改成Tr
e.wait()           #根據is_set()的狀態結果來決定是否在這阻塞住,is_set()=False那麼就阻塞,is_set()=True就不阻塞
print('give me!!')

#set和clear  修改事件的狀態 set-->True   clear-->False
#is_set     用來查看一個事件的狀態
#wait       依據事件的狀態來決定是否阻塞 False-->阻塞  True-->不阻塞
複製代碼

 

複製代碼
from multiprocessing import Process, Event
import time, random

def car(e, n):
    while True:
        if not e.is_set():  # 進程剛開啓,is_set()的值是Flase,模擬信號燈爲紅色
            print('\033[31m紅燈亮\033[0m,car%s等着' % n)
            e.wait()    # 阻塞,等待is_set()的值變成True,模擬信號燈爲綠色
            print('\033[32m車%s 看見綠燈亮了\033[0m' % n)
            time.sleep(random.randint(2,4))
            if not e.is_set():   #若是is_set()的值是Flase,也就是紅燈,仍然回到while語句開始
                continue
            print('車開遠了,car', n)
            break

# def police_car(e, n):
#     while True:
#         if not e.is_set():# 進程剛開啓,is_set()的值是Flase,模擬信號燈爲紅色
#             print('\033[31m紅燈亮\033[0m,car%s等着' % n)
#             e.wait(0.1) # 阻塞,等待設置等待時間,等待0.1s以後沒有等到綠燈就闖紅燈走了
#             if not e.is_set():
#                 print('\033[33m紅燈,警車先走\033[0m,car %s' % n)
#             else:
#                 print('\033[33;46m綠燈,警車走\033[0m,car %s' % n)
#         break

def traffic_lights(e, inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            print('######', e.is_set())
            e.clear()  # ---->將is_set()的值設置爲False
        else:
            e.set()    # ---->將is_set()的值設置爲True
            print('***********',e.is_set())


if __name__ == '__main__':
    e = Event()
    for i in range(10):
        p=Process(target=car,args=(e,i,))  # 建立10個進程控制10輛車
        time.sleep(random.random(1, 3))    #車不是一會兒全過來
        p.start()

    # for i in range(5):
    #     p = Process(target=police_car, args=(e, i,))  # 建立5個進程控制5輛警車
    #     p.start()

    #信號燈必須是單獨的進程,由於它無論你車開到哪了,我就按照我紅綠燈的規律來閃爍變換,對吧
    t = Process(target=traffic_lights, args=(e, 5))  # 建立一個進程控制紅綠燈
    t.start()

    print('預備~~~~開始!!!')
複製代碼

 

八 進程池和mutiprocess.Poll

  爲何要有進程池?進程池的概念。

  在程序實際處理問題過程當中,忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。那麼在成千上萬個任務須要被執行的時候,咱們就須要去建立成千上萬個進程麼?首先,建立進程須要消耗時間,銷燬進程(空間,變量,文件信息等等的內容)也須要消耗時間。第二即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,維護一個很大的進程列表的同時,調度的時候,還須要進行切換而且記錄每一個進程的執行節點,也就是記錄上下文(各類變量等等亂七八糟的東西,雖然你看不到,可是操做系統都要作),這樣反而會影響程序的效率。所以咱們不能無限制的根據任務開啓或者結束進程。就看咱們上面的一些代碼例子,你會發現有些程序是否是執行的時候比較慢纔出結果,就是這個緣由,那麼咱們要怎麼作呢?

  在這裏,要給你們介紹一個進程池的概念,定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等處處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增長操做系統的調度難度,還節省了開閉進程的時間,也必定程度上可以實現併發效果

multiprocess.Poll模塊

   建立進程池的類:若是指定numprocess爲3,則進程池會從無到有建立三個進程,而後自始至終使用這三個進程去執行全部任務(高級一些的進程池能夠根據你的併發量,搞成動態增長或減小進程池中的進程數量的操做),不會開啓其餘進程,提升操做系統效率,減小空間的佔用等。

   概念介紹:

Pool([numprocess  [,initializer [, initargs]]]):建立進程池

 

numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值
initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
initargs:是要傳給initializer的參數組

 

複製代碼
p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
'''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()'''

p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
'''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。'''
    
p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成

P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用
複製代碼

 

方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法
obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
obj.ready():若是調用完成,返回True
obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
obj.wait([timeout]):等待結果變爲可用。
obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數

 

複製代碼
import time
from multiprocessing import Pool,Process

#針對range(100)這種參數的
# def func(n):
#     for i in range(3):
#         print(n + 1)

def func(n):
    print(n)
    # 結果:
    #     (1, 2)
    #     alex
def func2(n):
    for i in range(3):
        print(n - 1)
if __name__ == '__main__':
    #1.進程池的模式
    s1 = time.time()  #咱們計算一下開多進程和進程池的執行效率
    poll = Pool(5) #建立含有5個進程的進程池
    # poll.map(func,range(100)) #異步調用進程,開啓100個任務,map自帶join的功能
    poll.map(func,[(1,2),'alex']) #異步調用進程,開啓100個任務,map自帶join的功能
    # poll.map(func2,range(100))  #若是想讓進程池完成不一樣的任務,能夠直接這樣搞
    #map只限於接收一個可迭代的數據類型參數,列表啊,元祖啊等等,若是想作其餘的參數之類的操做,須要用後面咱們要學的方法。
    # t1 = time.time() - s1
    #
    # #2.多進程的模式
    # s2 = time.time()
    # p_list = []
    # for i in range(100):
    #     p = Process(target=func,args=(i,))
    #     p_list.append(p)
    #     p.start()
    # [pp.join() for pp in p_list]
    # t2 = time.time() - s2
    #
    # print('t1>>',t1) #結果:0.5146853923797607s 進程池的效率高
    # print('t2>>',t2) #結果:12.092015027999878s
複製代碼

 

  有一點,map是異步執行的,而且自帶close和join

  通常約定俗成的是進程池中的進程數量爲CPU的數量,工做中要看具體狀況來考量。

 

  實際應用代碼示例:

  同步與異步兩種執行方式:

複製代碼
import os,time
from multiprocessing import Pool

def work(n):
    print('%s run' %os.getpid())
    time.sleep(1)
    return n**2

if __name__ == '__main__':
    p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務
    res_l=[]
    for i in range(10):
        res=p.apply(work,args=(i,)) # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程當中可能有阻塞也可能沒有阻塞
                                    # 但無論該任務是否存在阻塞,同步調用都會在原地等着
        res_l.append(res)
    print(res_l)
複製代碼

 

複製代碼
import os
import time
import random
from multiprocessing import Pool

def work(n):
    print('%s run' %os.getpid())
    time.sleep(random.random())
    return n**2

if __name__ == '__main__':
    p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務
    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行,而且能夠執行不一樣的任務,傳送任意的參數了。
                                          # 返回結果以後,將結果放入列表,歸還進程,以後再執行新的任務
                                          # 須要注意的是,進程池中的三個進程不會同時開啓或者同時結束
                                          # 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。  
        res_l.append(res)

    # 異步apply_async用法:若是使用異步提交的任務,主進程須要使用join,等待進程池內任務都處理完,而後能夠用get收集結果
    # 不然,主進程結束,進程池可能還沒來得及執行,也就跟着一塊兒結束了
    p.close() #不是關閉進程池,而是結束進程池接收任務,確保沒有新任務再提交過來。
    p.join()   #感知進程池中的任務已經執行結束,只有當沒有新的任務添加進來的時候,才能感知到任務結束了,因此在join以前必須加上close方法
    for res in res_l:
        print(res.get()) #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get
複製代碼

 

複製代碼
#一:使用進程池(異步調用,apply_async)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply_async(func, (msg, ))   #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去
        res_l.append(res)
        # s = res.get() #若是直接用res這個結果對象調用get方法獲取結果的話,這個程序就變成了同步,由於get方法直接就在這裏等着你建立的進程的結果,第一個進程建立了,而且去執行了,那麼get就會等着第一個進程的結果,沒有結果就一直等着,那麼主進程的for循環是沒法繼續的,因此你會發現變成了同步的效果
    print("==============================>") #沒有後面的join,或get,則程序總體結束,進程池中的任務還沒來得及所有執行完也都跟着主進程一塊兒結束了

    pool.close() #關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
    pool.join()   #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束

    print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的列表,而非最終的結果,但這一步是在join後執行的,證實結果已經計算完畢,剩下的事情就是調用每一個對象下的get方法去獲取結果
    for i in res_l:
        print(i.get()) #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get

#二:使用進程池(同步調用,apply)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(0.1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply(func, (msg, ))   #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去
        res_l.append(res) #同步執行,即執行完一個拿到結果,再去執行另一個
    print("==============================>")
    pool.close()
    pool.join()   #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束

    print(res_l) #看到的就是最終的結果組成的列表
    for i in res_l: #apply是同步的,因此直接獲得結果,沒有get()方法
        print(i)
複製代碼

 

 

  進程池版的socket併發聊天代碼示例:

複製代碼
#Pool內的進程數默認是cpu核數,假設爲4(查看方法os.cpu_count())
#開啓6個客戶端,會發現2個客戶端處於等待狀態
#在每一個進程內查看pid,會發現pid使用爲4個,即多個客戶端公用4個進程
from socket import *
from multiprocessing import Pool
import os

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn):
    print('進程pid: %s' %os.getpid())
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    p=Pool(4)
    while True:
        conn,*_=server.accept()
        p.apply_async(talk,args=(conn,))
        # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問
複製代碼

 

複製代碼
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
複製代碼

   

   發現:併發開啓多個客戶端,服務端同一時間只有4個不一樣的pid,只能結束一個客戶端,另一個客戶端纔會進來.

   同時最多和4我的進行聊天,由於進程池中只有4個進程可供調用,那有同窗會問,咱們這麼多人想同時聊天怎麼辦,又不讓用多進程,進程池也不能開太多的進程,那咋整啊,後面咱們會學到多線程,到時候你們就知道了,如今大家先這樣記住就好啦

 

   而後咱們再提一個回調函數

須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數,這是進程池特有的,普通進程沒有這個機制,可是咱們也能夠經過進程通訊來拿到返回值,進程池的這個回調也是進程通訊的機制完成的。

咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果

  

  回調函數的簡單使用

  回調函數在寫的時候注意一點,回調函數的形參執行有一個,若是你的執行函數有多個返回值,那麼也能夠被回調函數的這一個形參接收,接收的是一個元祖,包含着你執行函數的全部返回值。

  

  使用進程池來搞爬蟲的時候,最耗時間的是請求地址的網絡請求延遲,那麼若是咱們在將處理數據的操做加到每一個子進程中,那麼全部在進程池後面排隊的進程就須要等更長的時間才能獲取進程池裏面的執行進程來執行本身,因此通常咱們就將請求做成一個執行函數,經過進程池去異步執行,剩下的數據處理的內容放到另一個進程或者主進程中去執行,將網絡延遲的時間也利用起來,效率更高。

  requests這個模塊的get方法請求頁面,就和咱們在瀏覽器上輸入一個網址而後回車去請求別人的網站的效果是同樣的。安裝requests模塊的指令:在cmd窗口執行pip install requests。

import requests
response = requests.get('http://www.baidu.com')
print(response)
print(response.status_code) #200正常,404找不到網頁,503等5開頭的是人家網站內部錯誤
print(response.content.decode('utf-8'))

 

  

複製代碼
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<進程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def pasrse_page(res):
    print('<進程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    p=Pool(3)
    res_l=[]
    for url in urls:
        res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
        res_l.append(res)

    p.close()
    p.join()
    print([res.get() for res in res_l]) #拿到的是get_page的結果,其實徹底不必拿該結果,該結果已經傳給回調函數處理了

'''
打印結果:
<進程3388> get https://www.baidu.com
<進程3389> get https://www.python.org
<進程3390> get https://www.openstack.org
<進程3388> get https://help.github.com/
<進程3387> parse https://www.baidu.com
<進程3389> get http://www.sina.com.cn/
<進程3387> parse https://www.python.org
<進程3387> parse https://help.github.com/
<進程3387> parse http://www.sina.com.cn/
<進程3387> parse https://www.openstack.org
[{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}]
'''
複製代碼

 

複製代碼
from multiprocessing import Pool
import time,random
import requests
import re

def get_page(url,pattern):
    response=requests.get(url)
    if response.status_code == 200:
        return (response.text,pattern)

def parse_page(info):
    page_content,pattern=info
    res=re.findall(pattern,page_content)
    for item in res:
        dic={
            'index':item[0],
            'title':item[1],
            'actor':item[2].strip()[3:],
            'time':item[3][5:],
            'score':item[4]+item[5]

        }
        print(dic)
if __name__ == '__main__':
    pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)

    url_dic={
        'http://maoyan.com/board/7':pattern1,
    }

    p=Pool()
    res_l=[]
    for url,pattern in url_dic.items():
        res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
        res_l.append(res)

    for i in res_l:
        i.get()

    # res=requests.get('http://maoyan.com/board/7')
    # print(re.findall(pattern,res.text))
複製代碼

 

  若是在主進程中等待進程池中全部任務都執行完畢後,再統一處理結果,則無需回調函數

複製代碼
from multiprocessing import Pool
import time,random,os

def work(n):
    time.sleep(1)
    return n**2
if __name__ == '__main__':
    p=Pool()

    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,))
        res_l.append(res)

    p.close()
    p.join() #等待進程池中全部進程執行完畢

    nums=[]
    for res in res_l:
        nums.append(res.get()) #拿到全部結果
    print(nums) #主進程拿到全部的處理結果,能夠在主進程中進行統一進行處理
複製代碼

 

  進程池和信號量的區別:

  進程池是多個須要被執行的任務在進程池外面排隊等待獲取進程對象去執行本身,而信號量是一堆進程等待着去執行一段邏輯代碼。

  信號量不能控制建立多少個進程,可是能夠控制同時多少個進程可以執行,可是進程池能控制你能夠建立多少個進程。

  舉例:就像那些開大車拉煤的,信號量是什麼呢,就比如我只有五個車道,你每次只能過5輛車,可是不影響你建立100輛車,可是進程池至關於什麼呢?至關於你只有5輛車,每次5個車拉東西,拉完你再把車放回來,給別的人拉煤用。

  其餘語言裏面有更高級的進程池,在設置的時候,能夠將進程池中的進程動態的建立出來,當需求增大的時候,就會自動在進程池中添加進程,需求小的時候,自動減小進程,而且能夠設置進程數量的上線,最多爲多,python裏面沒有。

  進程池的其餘實現方式:https://docs.python.org/dev/library/concurrent.futures.html

相關文章
相關標籤/搜索