進程(Process)是計算機中的程序關於數據集合上的一次運行活動,是操做系統結構的基礎。python
在早期面向進程設計的計算機結構中,進程是程序的基本執行實體;linux
在當代面向線程設計的計算機結構中,進程是線程的容器。程序是指令、數據及其組織形式的描述,進程是程序的實體。nginx
狹義定義:進程是正在運行的程序的實例。web
廣義定義:進程是一個具備必定獨立功能的程序關於某個數據集合的一次運行活動。它是操做系統動態執行的基本單元,在傳統的操做系統中,進程便是基本的分配單元,也是基本的執行單元。算法
必備的理論基礎:shell
#一 操做系統的做用: 1:隱藏醜陋複雜的硬件接口,提供良好的抽象接口 2:管理、調度進程,而且將多個進程對硬件的競爭變得有序 #二 多道技術: 1.產生背景:針對單核,實現併發 ps: 如今的主機通常是多核,那麼每一個核都會利用多道技術 有4個cpu,運行於cpu1的某個程序遇到io阻塞,會等到io結束再從新調度,會被調度到4個 cpu中的任意一個,具體由操做系統調度算法決定。 2.空間上的複用:如內存中同時有多道程序 3.時間上的複用:複用一個cpu的時間片 強調:遇到io切,佔用cpu時間過長也切,核心在於切以前將進程的狀態保存下來,這樣 才能保證下次切換回來時,能基於上次切走的位置繼續運行
第一,進程是一個實體。每個進程都有它本身的地址空間,通常狀況下,包括文本區域(text region)、數據區域(data region)和堆棧(stack region)。文本區域存儲處理器執行的代碼;數據區域存儲變量和進程執行期間使用的動態分配的內存;堆棧區域存儲着活動過程調用的指令和本地變量。 第二,進程是一個「執行中的程序」。程序是一個沒有生命的實體,只有處理器賦予程序生命時(操做系統執行之),它才能成爲一個活動的實體,咱們稱其爲進程。[3] 進程是操做系統中最基本、重要的概念。是多道程序系統出現後,爲了刻畫系統內部出現的動態狀況,描述系統內部各道程序的活動規律引進的一個概念,全部多道程序設計操做系統都創建在進程的基礎上。
從理論角度看,是對正在運行的程序過程的抽象
從實現角度看,是一種數據結構,目前在於清晰地刻畫動態系統的內在規律,有效管理和調度進入計算機系統主存儲器運行的程序。
動態性:進程的實質是程序在多道程序系統中的一次執行過程,進程是動態產生,動態消亡的。
併發性:任何進程均可以同其餘進程一塊兒併發執行。
獨立性:進程是一個能獨立運行的基本單位,同時也是系統分配資源和調度的獨立單位;
異步性:因爲進程間的相互制約,使進程具備執行的間斷性,即進程按各自獨立的、不可預知的速度向前推動。
結構特徵:進程由程序、數據和進程控制塊三部分組成。
多個不一樣的進程能夠包含相同的程序:一個程序在不一樣的數據集裏就構成不一樣的進程,能看到不一樣的結果;可是執行過程當中,程序不能發生改變。
程序是指令和數據的有序集合,其自己沒有任何運行的含義,是一個靜態的概念。
而進程是程序在處理機上的一次執行過程,它是一個動態的概念。
程序能夠做爲一種軟件資料長期存在,而進程是有必定生命期的。
程序是永久的,進程是暫時的
注意:同一個程序執行兩次,就會在操做系統中出現兩個進程,因此咱們能夠同時運行一個軟件,分別作不一樣的事情也不會混亂。編程
要想多個進程交替運行,操做系統必須對這些進程進行調度,這個調度也不是隨機進行的,而是須要遵循必定的法則,由此就有了進程的調度方法。json
先來先服務(FCFS)調度算法是一種最簡單的調度算法,該算法既可用於做業調度,也可用於進程調度。FCFS算法比較有利於長做業(進程),而不利於短做業(進程)。由此可知,本算法適合於CPU繁忙型做業,而不利於I/O繁忙型的做業(進程)。
短做業(進程)優先調度算法(SJ/PF)是指對短做業或短進程優先調度的算法,該算法既可用於做業調度,也可用於進程調度。但其對長做業不利;不能保證緊迫性做業(進程)被及時處理;做業的長短只是被估算出來的。
時間片輪轉(Round Robin,RR)法的基本思路是讓每一個進程在就緒隊列中的等待時間與享受服務的時間成比例。在時間片輪轉法中,須要將CPU的處理時間分紅固定大小的時間片,例如,幾十毫秒至幾百毫秒。若是一個進程在被調度選中以後用完了系統規定的時間片,但又未完成要求的任務,則它自行釋放本身所佔有的CPU而排到就緒隊列的末尾,等待下一次調度。同時,進程調度程序又去調度當前就緒隊列中的第一個進程。 顯然,輪轉法只能用來調度分配一些能夠搶佔的資源。這些能夠搶佔的資源能夠隨時被剝奪,並且能夠將它們再分配給別的進程。CPU是可搶佔資源的一種。但打印機等資源是不可搶佔的。因爲做業調度是對除了CPU以外的全部系統硬件資源的分配,其中包含有不可搶佔資源,因此做業調度不使用輪轉法。 在輪轉法中,時間片長度的選取很是重要。首先,時間片長度的選擇會直接影響到系統的開銷和響應時間。若是時間片長度太短,則調度程序搶佔處理機的次數增多。這將使進程上下文切換次數也大大增長,從而加劇系統開銷。反過來,若是時間片長度選擇過長,例如,一個時間片能保證就緒隊列中所需執行時間最長的進程能執行完畢,則輪轉法變成了先來先服務法。時間片長度的選擇是根據系統對響應時間的要求和就緒隊列中所容許最大的進程數來肯定的。 在輪轉法中,加入到就緒隊列的進程有3種狀況: 一種是分給它的時間片用完,但進程還未完成,回到就緒隊列的末尾等待下次調度去繼續執行。 另外一種狀況是分給該進程的時間片並未用完,只是由於請求I/O或因爲進程的互斥與同步關係而被阻塞。當阻塞解除以後再回到就緒隊列。 第三種狀況就是新建立進程進入就緒隊列。 若是對這些進程區別對待,給予不一樣的優先級和時間片從直觀上看,能夠進一步改善系統服務質量和效率。例如,咱們可把就緒隊列按照進程到達就緒隊列的類型和進程被阻塞時的阻塞緣由分紅不一樣的就緒隊列,每一個隊列按FCFS原則排列,各隊列之間的進程享有不一樣的優先級,但同一隊列內優先級相同。這樣,當一個進程在執行完它的時間片以後,或從睡眠中被喚醒以及被建立以後,將進入不一樣的就緒隊列。 時間片輪轉法
前面介紹的各類用做進程調度的算法都有必定的侷限性。如短進程優先的調度算法,僅照顧了短進程而忽略了長進程,並且若是並未指明進程的長度,則短進程優先和基於進程長度的搶佔式調度算法都將沒法使用。 而多級反饋隊列調度算法則沒必要事先知道各類進程所需的執行時間,並且還能夠知足各類類型進程的須要,於是它是目前被公認的一種較好的進程調度算法。在採用多級反饋隊列調度算法的系統中,調度算法的實施過程以下所述。 (1) 應設置多個就緒隊列,併爲各個隊列賦予不一樣的優先級。第一個隊列的優先級最高,第二個隊列次之,其他各隊列的優先權逐個下降。該算法賦予各個隊列中進程執行時間片的大小也各不相同,在優先權愈高的隊列中,爲每一個進程所規定的執行時間片就愈小。例如,第二個隊列的時間片要比第一個隊列的時間片長一倍,……,第i+1個隊列的時間片要比第i個隊列的時間片長一倍。 (2) 當一個新進程進入內存後,首先將它放入第一隊列的末尾,按FCFS原則排隊等待調度。當輪到該進程執行時,如它能在該時間片內完成,即可準備撤離系統;若是它在一個時間片結束時還沒有完成,調度程序便將該進程轉入第二隊列的末尾,再一樣地按FCFS原則等待調度執行;若是它在第二隊列中運行一個時間片後仍未完成,再依次將它放入第三隊列,……,如此下去,當一個長做業(進程)從第一隊列依次降到第n隊列後,在第n 隊列便採起按時間片輪轉的方式運行。 (3) 僅當第一隊列空閒時,調度程序才調度第二隊列中的進程運行;僅當第1~(i-1)隊列均空時,纔會調度第i隊列中的進程運行。若是處理機正在第i隊列中爲某進程服務時,又有新進程進入優先權較高的隊列(第1~(i-1)中的任何一個隊列),則此時新進程將搶佔正在運行進程的處理機,即由調度程序把正在運行的進程放回到第i隊列的末尾,把處理機分配給新到的高優先權進程。 多級反饋隊列
並行 : 並行是指二者同時執行,好比賽跑,兩我的都在不停的往前跑;(資源夠用,好比三個線程,四核的CPU )windows
併發 : 併發是指資源有限的狀況下,二者交替輪流使用資源,好比一段路(單核CPU資源)同時只能過一我的,A走一段後,讓給B,B用完繼續給A ,交替使用,目的是提升效率。安全
區別:
並行是從微觀上,也就是在一個精確的時間片刻,有不一樣的程序在執行,這就要求必須有多個處理器。
併發是從宏觀上,在一個時間段上能夠看出是同時執行的,好比一個服務器同時處理多個session。
但凡是硬件,都須要有操做系統去管理,只要有操做系統,就有進程的概念,就須要有建立進程的方式,一些操做系統只爲一個應用程序設計,好比微波爐中的控制器,一旦啓動微波爐,全部的進程都已經存在。
而對於通用系統(跑不少應用程序),須要有系統運行過程當中建立或撤銷進程的能力,主要分爲4中形式建立新的進程:
1. 系統初始化(查看進程linux中用ps命令,windows中用任務管理器,前臺進程負責與用戶交互,後臺運行的進程與用戶無關,運行在後臺而且只在須要時才喚醒的進程,稱爲守護進程,如電子郵件、web頁面、新聞、打印)
2. 一個進程在運行過程當中開啓了子進程(如nginx開啓多進程,os.fork,subprocess.Popen等)
3. 用戶的交互式請求,而建立一個新進程(如用戶雙擊暴風影音)
4. 一個批處理做業的初始化(只在大型機的批處理系統中應用)
不管哪種,新進程的建立都是由一個已經存在的進程執行了一個用於建立進程的系統調用而建立的。
1. 在UNIX中該系統調用是:fork,fork會建立一個與父進程如出一轍的副本,兩者有相同的存儲映像、一樣的環境字符串和一樣的打開文件(在shell解釋器進程中,執行一個命令就會建立一個子進程) 2. 在windows中該系統調用是:CreateProcess,CreateProcess既處理進程的建立,也負責把正確的程序裝入新進程。 關於建立子進程,UNIX和windows 1.相同的是:進程建立後,父進程和子進程有各自不一樣的地址空間(多道技術要求物理層面實現進程之間內存的隔離),任何一個進程的在其地址空間中的修改都不會影響到另一個進程。 2.不一樣的是:在UNIX中,子進程的初始地址空間是父進程的一個副本,提示:子進程和父進程是能夠有隻讀的共享內存區的。可是對於windows系統來講,從一開始父進程與子進程的地址空間就是不一樣的。 建立進程
1. 正常退出(自願,如用戶點擊交互式頁面的叉號,或程序執行完畢調用發起系統調用正常退出,在linux中用exit,在windows中用ExitProcess)
2. 出錯退出(自願,python a.py中a.py不存在)
3. 嚴重錯誤(非自願,執行非法指令,如引用不存在的內存,1/0等,能夠捕捉異常,try...except...)
4. 被其餘進程殺死(非自願,如kill -9)
剛剛咱們已經瞭解了,運行中的程序就是一個進程。全部的進程都是經過它的父進程來建立的。所以,運行起來的python程序也是一個進程,那麼咱們也能夠在程序中再建立進程。多個進程能夠實現併發效果,也就是說,當咱們的程序中存在多個進程的時候,在某些時候,就會讓程序的執行速度變快。以咱們以前所學的知識,並不能實現建立進程這個功能,因此咱們就須要藉助python中強大的模塊。
仔細說來,multiprocess不是一個模塊而是python中一個操做、管理進程的包。 之因此叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進程有關的全部子模塊。因爲提供的子模塊很是多,爲了方便你們歸類記憶,我將這部分大體分爲四個部分:
建立進程部分
進程同步部分
進程池部分
進程之間數據共享
process模塊是一個建立進程的模塊,藉助這個模塊,就能夠完成進程的建立。
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動) 強調: 1. 須要使用關鍵字的方式來指定參數 2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號 參數介紹: group參數未使用,值始終爲None target表示調用對象,即子進程要執行的任務 args表示調用對象的位置參數元組,args=(1,2,'egon',) kwargs表示調用對象的字典,kwargs={'name':'egon','age':18} name爲子進程的名稱
p.start():啓動進程,並調用該子進程中的p.run()
p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法
p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
p.is_alive():若是p仍然運行,返回True
p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程
方法介紹
p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
p.name:進程的名稱
p.pid:進程的pid
p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)
p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)
屬性介紹
在Windows操做系統中因爲沒有fork(linux操做系統中建立進程的機制),在建立子進程的時候會自動 import 啓動它的這個文件,而在 import 的時候又執行了整個文件。所以若是將process()直接寫在文件中就會無限遞歸建立子進程報錯。因此必須把建立子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候 ,就不會遞歸運行了。
在一個python進程中開啓子進程,start方法和併發效果。
import time from multiprocessing import Process def f(name): print('hello', name) print('我是子進程') if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() time.sleep(1) print('執行主進程的內容了')
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) print('我是子進程') if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() #p.join() print('我是父進程')
import os from multiprocessing import Process def f(x): print('子進程id :',os.getpid(),'父進程id :',os.getppid()) return x*x if __name__ == '__main__': print('主進程id :', os.getpid()) p_lst = [] for i in range(5): p = Process(target=f, args=(i,)) p.start()
進階,多個進程同時運行(注意,子進程的執行順序不是根據啓動順序決定的)
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=('bob',)) p.start() p_lst.append(p)
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=('bob',)) p.start() p_lst.append(p) p.join() # 串行 # [p.join() for p in p_lst] print('父進程在執行')
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=('bob',)) p.start() p_lst.append(p) for p in p_lst: p.join() # 併發,並確保全部子進程運行完 print('父進程在執行')
除了上面這些開啓進程的方法,還有一種以繼承Process類的形式開啓進程的方式
import os from multiprocessing import Process class MyProcess(Process): def __init__(self,name): super().__init__() self.name=name def run(self): print(os.getpid()) print('%s 正在和女主播聊天' %self.name) p1=MyProcess('wupeiqi') p2=MyProcess('yuanhao') p3=MyProcess('nezha') p1.start() #start會自動調用run p2.start() # p2.run() p3.start() p1.join() p2.join() p3.join() print('主線程')
進程之間的數據隔離問題
from multiprocessing import Process def work(): global n n=0 print('子進程內: ',n) if __name__ == '__main__': n = 100 p=Process(target=work) p.start() print('主進程內: ',n)
會隨着主進程的結束而結束
主進程建立守護進程
其一:守護進程會在主進程代碼執行結束後終止(非守護進程在主進程結束以後仍然會繼續運行)
其二:其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止
import os import time from multiprocessing import Process class Myprocess(Process): def __init__(self,person): super().__init__() self.person = person def run(self): print(os.getpid(),self.name) print('%s正在和女主播聊天' %self.person) p=Myprocess('哪吒') p.daemon=True #必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程代碼執行結束,p即終止運行 p.start() time.sleep(10) # 在sleep時查看進程id對應的進程ps -ef|grep id print('主')
from multiprocessing import Process def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() time.sleep(0.1) print("main-------")#打印該行則主進程代碼結束,則守護進程p1應該被終止.#可能會有p1任務執行的打印信息123,由於主進程打印main----時,p1也執行了,可是隨即被終止.
from socket import * from multiprocessing import Process server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': #windows下start進程必定要寫到這下面 while True: conn,client_addr=server.accept() p=Process(target=talk,args=(conn,client_addr)) p.start() 使用多進程實現socket聊天併發-server
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
from multiprocessing import Process import time import random class Myprocess(Process): def __init__(self,person): self.name=person super().__init__() def run(self): print('%s正在和網紅臉聊天' %self.name) time.sleep(random.randrange(1,5)) print('%s還在和網紅臉聊天' %self.name) p1=Myprocess('哪吒') p1.start() p1.terminate()#關閉進程,不會當即關閉,因此is_alive馬上查看的結果可能仍是存活 print(p1.is_alive()) #結果爲True print('開始') print(p1.is_alive()) #結果爲False
class Myprocess(Process): def __init__(self,person): self.name=person # name屬性是Process中的屬性,標示進程的名字 super().__init__() # 執行父類的初始化方法會覆蓋name屬性 #self.name = person # 在這裏設置就能夠修改進程名字了 #self.person = person #若是不想覆蓋進程名,就修改屬性名稱就能夠了 def run(self): print('%s正在和網紅臉聊天' %self.name) # print('%s正在和網紅臉聊天' %self.person) time.sleep(random.randrange(1,5)) print('%s正在和網紅臉聊天' %self.name) # print('%s正在和網紅臉聊天' %self.person) p1=Myprocess('哪吒') p1.start() print(p1.pid) #能夠查看子進程的進程id 進程對象的其餘屬性:pid和name
經過剛剛的學習,咱們想方設法實現了程序的異步,讓多個任務能夠同時在幾個進程中併發處理,他們之間的運行沒有順序,一旦開啓也不受咱們控制。儘管併發編程讓咱們能更加充分的利用IO資源,可是也給咱們帶來了新的問題。
當多個進程使用同一份數據資源的時候,就會引起數據安全或順序混亂問題。
import os import time import random from multiprocessing import Process def work(n): print('%s: %s is running' %(n,os.getpid())) time.sleep(random.random()) print('%s:%s is done' %(n,os.getpid())) if __name__ == '__main__': for i in range(3): p=Process(target=work,args=(i,)) p.start()
# 由併發變成了串行,犧牲了運行效率,但避免了競爭 import os import time import random from multiprocessing import Process,Lock def work(lock,n): lock.acquire() print('%s: %s is running' % (n, os.getpid())) time.sleep(random.random()) print('%s: %s is done' % (n, os.getpid())) lock.release() if __name__ == '__main__': lock=Lock() for i in range(3): p=Process(target=work,args=(lock,i)) p.start()
上面這種狀況雖然使用加鎖的形式實現了順序的執行,可是程序又從新變成串行了,這樣確實會浪費了時間,卻保證了數據的安全。
join也能夠實現進程串行,可是lock能夠局部加鎖, 而join直接是讓整個進程串行
接下來,咱們以模擬搶票爲例,來看看數據安全的重要性。
#文件db的內容爲:{"count":1} #注意必定要用雙引號,否則json沒法識別 #併發運行,效率高,但競爭寫同一文件,數據寫入錯亂 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db')) print('\033[43m剩餘票數%s\033[0m' %dic['count']) def get(): dic=json.load(open('db')) time.sleep(0.1) #模擬讀數據的網絡延遲 if dic['count'] >0: dic['count']-=1 time.sleep(0.2) #模擬寫數據的網絡延遲 json.dump(dic,open('db','w')) print('\033[43m購票成功\033[0m') def task(): search() get() if __name__ == '__main__': for i in range(100): #模擬併發100個客戶端搶票 p=Process(target=task) p.start()
#文件db的內容爲:{"count":5} #注意必定要用雙引號,否則json沒法識別 #併發運行,效率高,但競爭寫同一文件,數據寫入錯亂 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db')) print('\033[43m剩餘票數%s\033[0m' %dic['count']) def get(): dic=json.load(open('db')) time.sleep(random.random()) #模擬讀數據的網絡延遲 if dic['count'] >0: dic['count']-=1 time.sleep(random.random()) #模擬寫數據的網絡延遲 json.dump(dic,open('db','w')) print('\033[32m購票成功\033[0m') else: print('\033[31m購票失敗\033[0m') def task(lock): search() lock.acquire() get() lock.release() if __name__ == '__main__': lock = Lock() for i in range(100): #模擬併發100個客戶端搶票 p=Process(target=task,args=(lock,)) p.start() 使用鎖來保證數據安全
#加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。 雖然能夠用文件共享數據實現進程間通訊,但問題是: 1.效率低(共享數據基於文件,而文件是硬盤上的數據) 2.須要本身加鎖處理 #所以咱們最好找尋一種解決方案可以兼顧:1、效率高(多個進程共享一塊內存的數據)2、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。 隊列和管道都是將數據存放於內存中 隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來, 咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。
IPC(Inter-Process Communication)
建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。
Queue([maxsize])
建立共享的進程隊列。
參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。
Queue([maxsize]) 建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。 Queue的實例q具備如下方法: q.get( [ block [ ,timeout ] ] ) 返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。 q.get_nowait( ) 同q.get(False)方法。 q.put(item [, block [,timeout ] ] ) 將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。 q.qsize() 返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。 q.empty() 若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。 q.full() 若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。 方法介紹
''' multiprocessing模塊支持進程間通訊的兩種主要形式:管道和隊列 都是基於消息傳遞實現的,可是隊列接口 ''' from multiprocessing import Queue q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) # q.put(3) # 若是隊列已經滿了,程序就會停在這裏,等待數據被別人取走,再將數據放入隊列。 # 若是隊列中的數據一直不被取走,程序就會永遠停在這裏。 try: q.put_nowait(3) # 可使用put_nowait,若是隊列滿了不會阻塞,可是會由於隊列滿了而報錯。 except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,可是會丟掉這個消息。 print('隊列已經滿了') # 所以,咱們再放入數據以前,能夠先看一下隊列的狀態,若是已經滿了,就不繼續put了。 print(q.full()) #滿了 print(q.get()) print(q.get()) print(q.get()) # print(q.get()) # 同put方法同樣,若是隊列已經空了,那麼繼續取就會出現阻塞。 try: q.get_nowait(3) # 可使用get_nowait,若是隊列滿了不會阻塞,可是會由於沒取到值而報錯。 except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。 print('隊列已經空了') print(q.empty()) #空了 單看隊列用法
上面這個例子尚未加入進程通訊,只是先來看看隊列爲咱們提供的方法,以及這些方法的使用和現象。
import time from multiprocessing import Process, Queue def f(q): q.put([time.asctime(), 'from Eva', 'hello']) #調用主函數中p進程傳遞過來的進程參數 put函數爲向隊列中添加一條數據。 if __name__ == '__main__': q = Queue() #建立一個Queue對象 p = Process(target=f, args=(q,)) #建立一個進程 p.start() print(q.get()) p.join() 子進程發送數據給父進程
上面是一個queue的簡單應用,使用隊列q對象調用get函數來取得隊列中最早進入的數據。 接下來看一個稍微複雜一些的例子:
import os import time import multiprocessing # 向queue中輸入數據的函數 def inputQ(queue): info = str(os.getpid()) + '(put):' + str(time.asctime()) queue.put(info) # 向queue中輸出數據的函數 def outputQ(queue): info = queue.get() print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info)) # Main if __name__ == '__main__': multiprocessing.freeze_support() record1 = [] # store input processes record2 = [] # store output processes queue = multiprocessing.Queue(3) # 輸入進程 for i in range(10): process = multiprocessing.Process(target=inputQ,args=(queue,)) process.start() record1.append(process) # 輸出進程 for i in range(10): process = multiprocessing.Process(target=outputQ,args=(queue,)) process.start() record2.append(process) for p in record1: p.join() for p in record2: p.join()
在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。
爲何要使用生產者和消費者模式?
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發中,若是生產者處理渡很快,而消費者處理速度很慢,那麼生產者就必要等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者生產數據。爲了解決這個問題因而引入了生產者和消費者模式
什麼是生產者消費者模式?
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,二十直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) #開始 p1.start() c1.start() print('主') 基於隊列實現生產者消費者模型
(例子應該加上 p1.join, c1.join)
此時的問題是主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。
解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環。
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) q.put(None) #發送結束信號 if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) #開始 p1.start() c1.start() print('主') 改良版——生產者消費者模型
注意:結束信號None,不必定要由生產者發,主進程裏一樣能夠發,但主進程須要等生產者結束後才應該發送該信號
#有若是有多個生產者在生產的話,A生產者生產完了發送'None',B生產者還在生產中,消費者拿到了A生產者的'None',就不會繼續消費了
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(2): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) #開始 p1.start() c1.start() p1.join() q.put(None) #發送結束信號 取決於有多少個消費者 print('主')
JoinableQueue([maxsize])
建立可鏈接的共享進程隊列。這就像是一個Queue對象,但隊列容許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
JoinableQueue的實例p除了與Queue對象相同的方法以外,還具備如下方法: q.task_done() 使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。若是調用此方法的次數大於從隊列中刪除的項目數量,將引起ValueError異常。 q.join() 生產者將使用此方法進行阻塞,直到隊列中全部項目均被處理。阻塞將持續到爲隊列中的每一個項目均調用q.task_done()方法爲止。 下面的例子說明如何創建永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。
from multiprocessing import Process,JoinableQueue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) q.task_done() #向q.join()發送一次信號,證實一個數據已經被取走了 def producer(name,q): for i in range(10): time.sleep(random.randint(1,3)) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) q.join() #生產完畢,使用此方法進行阻塞,直到隊列中全部項目均被處理。 if __name__ == '__main__': q=JoinableQueue() #生產者們:即廚師們 p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨頭',q)) p3=Process(target=producer,args=('泔水',q)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) c1.daemon=True c2.daemon=True #開始 p_l=[p1,p2,p3,c1,c2] for p in p_l: p.start() p1.join() p2.join() p3.join() print('主') #主進程等--->p1,p2,p3等---->c1,c2 #p1,p2,p3結束了,證實c1,c2確定全都收完了p1,p2,p3發到隊列的數據 #於是c1,c2也沒有存在的價值了,不須要繼續阻塞在進程中影響主進程了。應該隨着主進程的結束而結束,因此設置成守護進程就能夠了。 JoinableQueue隊列實現消費之生產者模型