程序運行時檢測到的錯誤被稱爲異常,程序的運行也會被終止python
錯誤分爲兩種,即語法錯誤和邏輯錯誤mysql
實例:linux
>>> 10 * (1/0) Traceback (most recent call last): File "<stdin>", line 1, in ? ZeroDivisionError: division by zero >>> '2' + 2 Traceback (most recent call last): File "<stdin>", line 1, in ? TypeError: Can't convert 'int' object to str implicitly
異常以不一樣的類型出現,這些類型都做爲信息的一部分打印出來。錯誤信息的前面部分顯示了異常發生的上下文,並以調用棧的形式顯示具體信息nginx
AttributeError 試圖訪問一個對象沒有的樹形,好比foo.x,可是foo沒有屬性x IOError 輸入/輸出異常;基本上是沒法打開文件 ImportError 沒法引入模塊或包;基本上是路徑問題或名稱錯誤 IndentationError 語法錯誤(的子類);代碼沒有正確對齊 IndexError 下標索引超出序列邊界,好比當x只有三個元素,卻試圖訪問x[5] KeyError 試圖訪問字典裏不存在的鍵 KeyboardInterrupt Ctrl+C被按下 NameError 使用一個還未被賦予對象的變量 SyntaxError Python代碼非法,代碼不能編譯(我的認爲這是語法錯誤,寫錯了) TypeError 傳入對象類型與要求的不符合 UnboundLocalError 試圖訪問一個還未被設置的局部變量,基本上是因爲另有一個同名的全局變量,致使你覺得正在訪問它 ValueError 傳入一個調用者不指望的值,即便值的類型是正確的
第一種狀況:若是錯誤發生的條件是可預知的,咱們須要用if進行處理:在錯誤發生以前進行預防git
AGE = 25 while True: age = input('>>>:') if age.isdigit(): age = int(age) if age == AGE: print('You got it') break
第二種狀況:若是錯誤發生的條件是不可預知的,則須要用到try...except:在錯誤發生以後進行處理程序員
#基本語法爲 ''' try: 被檢測的代碼塊 except 異常類型: try中一旦檢測到異常,就執行這個位置的邏輯 ''' #實例: try: x = int(input("Please enter a number: ")) except ValueError: print("That was no valid number. Try again")
try語句按照以下方式工做;web
首先,執行try子句(在關鍵字try和關鍵字except之間的語句)sql
若是沒有異常發生,忽略except子句,try子句執行後結束。shell
若是在執行try子句的過程當中發生了異常,那麼try子句餘下的部分將被忽略。若是異常的類型和 except 以後的名稱相符,那麼對應的except子句將被執行。最後執行 try 語句以後的代碼。編程
若是一個異常沒有與任何的except匹配,那麼這個異常將會傳遞給上層的try中
一個 try 語句可能包含多個except子句,分別來處理不一樣的特定的異常。最多隻有一個分支會被執行
處理程序將只針對對應的try子句中的異常進行處理,而不是其餘的 try 的處理程序中的異常
一個except子句能夠同時處理多個異常,這些異常將被放在一個括號裏成爲一個元組,例如:
except (RuntimeError, TypeError, NameError): pass
萬能異常,可以匹配全部的異常:
s1 = 'hello' try: int(s1) except Exception as e: #把異常賦值給e print(e)
try except 語句還有一個可選的else子句,若是使用這個子句,那麼必須放在全部的except子句以後。這個子句將在try子句沒有發生任何異常的時候執行。例如:
for arg in sys.argv[1:]: try: f = open(arg, 'r') except IOError: print('cannot open', arg) else: print(arg, 'has', len(f.readlines()), 'lines') f.close()
try except 語句還有一個可選的finally子句,若是使用這個子句,那麼必須放在全部的except子句以後。不管try子句有沒有發生異常它都會執行
Python 使用 raise 語句拋出一個指定的異常。例如:
try: raise TypeError('類型錯誤') except Exception as e: print(e)
經過建立一個新的exception類來擁有本身的異常。異常應該繼承自 Exception 類,或者直接繼承,或者間接繼承,例如:
class EgonException(BaseException): def __init__(self,msg): self.msg=msg def __str__(self): return self.msg try: raise EgonException('類型錯誤') except EgonException as e: print(e)
try 語句還有另一個可選的子句,它定義了不管在任何狀況下都會執行的清理行爲。 例如:
s1 = 'hello' try: int(s1) except IndexError as e: print(e) except KeyError as e: print(e) except ValueError as e: print(e) #except Exception as e: # print(e) else: print('try內代碼塊沒有異常則執行我') finally: print('不管異常與否,都會執行該模塊,一般是進行清理工做')
狹義定義:進程是正在運行的程序的實例
廣義定義:進程是一個具備必定獨立功能的程序關於某個數據集合的一次運行活動。它是操做系統動態執行的基本單元,在傳統的操做系統中,進程既是基本的分配單元,也是基本的執行單元
程序是永存的;進程是暫時的,是程序在數據集上的一次執行,有建立有撤銷,存在是暫時的;
程序是靜態的觀念,進程是動態的觀念;
進程具備併發性,而程序沒有;
進程是競爭計算機資源的基本單位,程序不是。
進程和程序不是一一對應的: 一個程序可對應多個進程即多個進程可執行同一程序; 一個進程能夠執行一個或幾個程序
併發與並行是兩個既類似而又不相同的概念:併發性,又稱共行性,是指能處理多個同時性活動的能力;並行是指同時發生的兩個併發事件,具備併發的含義,而併發則不必定並行,也亦是說併發事件之間不必定要同一時刻發生。
打個比方:
你吃飯吃到一半,電話來了,你一直到吃完了之後纔去接,這就說明你不支持併發也不支持並行。
你吃飯吃到一半,電話來了,你停了下來接了電話,接完後繼續吃飯,這說明你支持併發。
你吃飯吃到一半,電話來了,你一邊打電話一邊吃飯,這說明你支持並行。
併發的實質是一個物理CPU(也能夠多個物理CPU) 在若干道程序之間多路複用,併發性是對有限物理資源強制行使多用戶共享以提升效率。
並行性指兩個或兩個以上事件或活動在同一時刻發生。在多道程序環境下,並行性使多個程序同一時刻可在不一樣CPU上同時執行。
併發,是在同一個cpu上同時(不是真正的同時,而是看來是同時,由於cpu要在多個程序間切換)運行多個程序
並行,是每一個cpu運行一個程序
同步:在發出一個功能調用時,在沒有獲得結果以前,該調用就不會返回
異步:與同步相反。當一個異步功能調用發出後,調用者不能馬上獲得結果。當該異步功能完成後,經過狀態、通知或回調來通知調用者
阻塞:阻塞調用是指調用結果返回以前,當前線程會被掛起。調用線程只有在獲得結果以後纔會返回
非阻塞:與阻塞相反。指在不能馬上獲得結果以前也會馬上返回,同時該函數不會阻塞當前線
小結:
1. 同步與異步針對的是函數/任務的調用方式:同步就是當一個進程發起一個函數(任務)調用的時候,一直等到函數(任務)完成,而進程繼續處於激活狀態。而異步狀況下是當一個進程發起一個函數(任務)調用的時候,不會等函數返回,而是繼續往下執行,當函數返回的時候經過狀態、通知、事件等方式通知進程任務完成。
2. 阻塞與非阻塞針對的是進程或線程:阻塞是當請求不能知足的時候就將進程掛起,而非阻塞則不會阻塞當前進程
1) 對於通用系統(跑不少應用程序),須要有系統運行過程當中建立或撤銷進程的能力,主要分爲4中形式建立新的進程:
系統初始化(查看進程linux中用ps命令,windows中用任務管理器,前臺進程負責與用戶交互,後臺運行的進程與用戶無關,運行在後臺而且只在須要時才喚醒的進程,稱爲守護進程,如電子郵件、web頁面、新聞、打印)
一個進程在運行過程當中開啓了子進程(如nginx開啓多進程,os.fork,subprocess.Popen等)
用戶的交互式請求,而建立一個新進程(如用戶雙擊暴風影音)
一個批處理做業的初始化(只在大型機的批處理系統中應用)
2) 不管哪種,新進程的建立都是由一個已經存在的進程執行了一個用於建立進程的系統調用而建立的:
在UNIX中該系統調用是:fork,fork會建立一個與父進程如出一轍的副本,兩者有相同的存儲映像、一樣的環境字符串和一樣的打開文件(在shell解釋器進程中,執行一個命令就會建立一個子進程)
在windows中該系統調用是:CreateProcess,CreateProcess既處理進程的建立,也負責把正確的程序裝入新進程。
3) 關於建立的子進程,UNIX和windows的異同:
相同的是:進程建立後,父進程和子進程有各自不一樣的地址空間(多道技術要求物理層面實現進程之間內存的隔離),任何一個進程的在其地址空間中的修改都不會影響到另一個進程。
不一樣的是:在UNIX中,子進程的初始地址空間是父進程的一個副本,提示:子進程和父進程是能夠有隻讀的共享內存區的。可是對於windows系統來講,從一開始父進程與子進程的地址空間就是不一樣的。
正常退出(自願,如用戶點擊交互式頁面的叉號,或程序執行完畢發起系統調用正常退出,在linux中用exit,在windows中用ExitProcess)
出錯退出(自願,python a.py中a.py不存在)
嚴重錯誤(非自願,執行非法指令,如引用不存在的內存,1/0等,能夠捕捉異常,try...except...)
被其餘進程殺死(非自願,如kill -9)
不管UNIX仍是windows,進程只有一個父進程,不一樣的是:
在UNIX中全部的進程,都是以init進程爲根,組成樹形結構。父子進程共同組成一個進程組,這樣,當從鍵盤發出一個信號時,該信號被送給當前與鍵盤相關的進程組中的全部成員。
在windows中,沒有進程層次的概念,全部的進程都是地位相同的,惟一相似於進程層次的暗示,是在建立進程時,父進程獲得一個特別的令牌(稱爲句柄),該句柄能夠用來控制子進程,可是父進程有權把該句柄傳給其餘子進程,這樣就沒有層次了。
在兩種狀況下會致使一個進程在邏輯上不能運行:
進程掛起是自身緣由,遇到I/O阻塞,便要讓出CPU讓其餘進程去執行,這樣保證CPU一直在工做
與進程無關,是操做系統層面,可能會由於一個進程佔用時間過多,或者優先級等緣由,而調用其餘的進程去使用CPU。
於是一個程序有三種狀態:
進程併發的實如今於,硬件中斷一個正在運行的進程,把此時進程運行的全部狀態保存下來,爲此,操做系統維護一張表格,即進程表(process table),每一個進程佔用一個進程表項(這些表項也稱爲進程控制塊)
該表存放了進程狀態的重要信息:程序計數器、堆棧指針、內存分配情況、全部打開文件的狀態、賬號和調度信息,以及其餘在進程由運行態轉爲就緒態或阻塞態時,必須保存的信息,從而保證該進程在再次啓動時,就像從未被中斷過同樣。
multiprocessing是一個和threading模塊相似,提供API,生成進程的模塊。multiprocessing包提供本地和遠程併發,經過使用子進程而不是線程有效地轉移全局解釋器鎖。所以,multiprocessing模塊容許程序員充分利用給定機器上的多個處理器。它在Unix和Windows上均可以運行。
在multiprocessing中,經過建立Process對象,而後調用其start()方法來生成進程。Process遵循threading.Thread的API
語法:
Process([group [, target [, name [, args [, kwargs]]]]]) #由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)
參數:
group:未使用,值始終未None
target:調用對象,即子進程要執行的任務
args:調用對象的位置參數元組
kwargs:調用對象的字典
name:子進程的名稱
方法:
p.start():啓動進程,並調用該子進程中的p.run()
p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法
p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
p.is_alive():若是p仍然運行,返回True
p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程
屬性:
p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
p.name:進程的名稱
p.pid:進程的pid
p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束
p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功
建立並開啓子進程的第一種方式:
from multiprocessing import Process def foo(name): print('My name is %s' %name) p1 = Process(target=foo,args=('Yim',)) #必須加,號 p2 = Process(target=foo,kwargs={'name':'Jim'}) if __name__ == '__main__': #在windows中Process()必須放到# if __name__ == '__main__'下 ''' 因爲Windows沒有fork,多處理模塊啓動一個新的Python進程並導入調用模塊。 若是在導入時調用Process(),那麼這將啓動無限繼承的新進程(或直到機器耗盡資源)。 這是隱藏對Process()內部調用的源,使用if __name__ == 「__main __」,這個if語句中的語句將不會在導入時被調用 ''' p1.start() p2.start() #執行結果: My name is Yim My name is Jim
建立並開啓子進程的第二種方式:
from multiprocessing import Process class foo(Process): #繼承Process類 def __init__(self,name): super().__init__() self.name = name def run(self): print('My name is %s' %self.name) if __name__ == '__main__': p1 = foo('Yim') p2 = foo('Jim') p1.start() p2.start() #執行結果: My name is Yim My name is Jim
服務器端代碼:
#!/usr/bin/python # -*- coding:utf-8 -*- from socket import * from multiprocessing import Process def talk(conn,client_addr): while True: try: print('客戶端IP:%s,端口:%s'%(client_addr[0],client_addr[1])) data = conn.recv(1024) if not data:break conn.send(data.upper()) except Exception: break conn.close() def server(): s = socket(AF_INET,SOCK_STREAM) s.bind(('127.0.0.1',8000)) s.listen(5) while True: conn, client_addr = s.accept() p = Process(target=talk,args=(conn,client_addr,)) p.start() s.close() if __name__ == '__main__': server()
客戶端代碼:
#!/usr/bin/python # -*- coding:utf-8 -*- from socket import * c = socket(AF_INET,SOCK_STREAM) c.connect(('127.0.0.1',8000)) while True: msg = input('>>>:').strip() if not msg:continue c.send(msg.encode('utf-8')) data = c.recv(1024) print(data.decode('utf-8')) c.close()
修改子進程內的變量,不會影響父進程變量
#!/usr/bin/python # -*- coding:utf-8 -*- from multiprocessing import Process import time,random n = 100 def task(): global n n = 1 if __name__ == '__main__': p = Process(target=task) p.start() print(n) #執行結果 100
join([timeout])
若是可選參數timeout爲None(默認值),則該方法將阻塞,直到調用join()方法的進程終止。若是超時是正數,則它最多阻止超時秒。
一個過程能夠鏈接屢次。
進程不能本身加入,由於這將致使死鎖。嘗試在進程啓動以前加入進程是一個錯誤。
#!/usr/bin/python # -*- coding:utf-8 -*- from multiprocessing import Process import time,random def foo(name): print('My name is %s' %name) time.sleep(random.randint(5,10)) if __name__ == '__main__': p1 = Process(target=foo,args=('Yim',)) p2 = Process(target=foo,args=('Jim',)) p1.start() p1.join() #等待p1執行完畢,這樣就是串行執行 p2.start() # 簡單寫法 # if __name__ == '__main__': # p_l = [p1, p2, p3] # for p in p_l: # p.start() # for p in p_l: # p.join() #執行結果: My name is Yim My name is Jim #隔一段時間才輸出
terminate和is_alive
#!/usr/bin/python # -*- coding:utf-8 -*- from multiprocessing import Process import time,random def foo(name): print('My name is %s' %name) if __name__ == '__main__': p1 = Process(target=foo,args=('Yim',)) p1.start() p1.terminate() #關閉進程。不會當即關閉,因此is_alive馬上查看的結果可能仍是存活的 print(p1.is_alive()) time.sleep(random.randint(2,3)) print(p1.is_alive()) #執行結果: True False
pid屬性
from multiprocessing import Process,Pool import time,random import os def task(): print('%s is running parent[%s]' %(os.getpid(),os.getppid())) if __name__ == '__main__': p=Process(target=task) p.start() print(p.pid) #p這個進程的id print('主',os.getpid()) #查看aaa.py的id號碼 print(os.getppid()) #pycharm的進程id #執行結果: 16812 主 17560 16608 16812 is running parent[17560]
主進程建立守護進程
守護進程會在主進程代碼執行結束後就終止
守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children
from multiprocessing import Process import time,random def foo(name): print('My name is %s' %name) if __name__ == '__main__': p1 = Process(target=foo,args=('Yim',)) p1.daemon = True #必定要在p1.start()前設置,設置p1爲守護進程,禁止p1建立子進程,而且父進程代碼執行結束,p1即終止運行 p1.start()
多進程是實現併發的手段之一,須要注意的問題是:
須要併發執行的任務一般會遠大於CPU核數
一個操做系統不可能無限開啓進程,一般有幾個核就開幾個進程
進程開啓過多,效率反而會降低
咱們能夠經過維護一個進程池來控制進程數量,語法以下:
Pool([numprocess [,initializer [, initargs]]]) #建立進程池
參數:
numprocess:要建立的進程數。若是省略,將默認使用os.cpu_count()的值
initializer:每一個工做進程啓動時要執行的可調用對象,默認爲None
initargs:要傳給initializer的參數組
主要方法:
p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async() p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。 p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成 P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用
其餘方法:
方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法:
obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
obj.ready():若是調用完成,返回True
obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
obj.wait([timeout]):等待結果變爲可用。
obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
同步調用:
import os,time,random from multiprocessing import Pool # Pool進程的用法 # p.apply_async() #等同於concurrent.futures裏的p.submit(),異步 # p.apply() #等同於concurrent.futures裏的p.submit().result(),同步 def task(i): print('%s is running %s' %(os.getpid(),i)) time.sleep(random.randint(1,3)) return i**2 if __name__ == '__main__': pool = Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務 futrues = [] for i in range(10): futrue = pool.apply(task,args=(i,)) #同步調用,直到本次任務執行完畢拿到futrue,等待任務task執行的過程當中可能有阻塞也可能沒有阻塞,但無論該任務是否存在阻塞,同步調用都會在原地等着,只是等的過程當中如果任務發生了阻塞就會被奪走cpu的執行權限 futrues.append(futrue) print(futrues)
異步調用:
import os,time,random from multiprocessing import Pool def task(i): print('%s is running %s' %(os.getpid(),i)) time.sleep(random.randint(1,3)) return i**2 if __name__ == '__main__': pool = Pool() futrues = [] for i in range(10): futrue = pool.apply_async(task,args=(i,)) #同步運行,阻塞、直到本次任務執行完畢拿到futrue futrues.append(futrue) # 異步apply_async用法:若是使用異步提交的任務,主進程須要使用jion,等待進程池內任務都處理完,而後能夠用get收集結果,不然,主進程結束,進程池可能還沒來得及執行,也就跟着一塊兒結束了 pool.close() #關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成 pool.join() #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束 for future in futrues: print(futrue.get()) #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor,Executor import time,os,random def task(i): print('%s is running %s' %(os.getpid(),i)) time.sleep(random.randint(1,3)) return i**2 if __name__ == '__main__': # print(os.cpu_count()) pool=ProcessPoolExecutor() objs=[] for i in range(10): obj=pool.submit(task,i) #異步的方式提交任務 objs.append(obj) # res=pool.submit(task,i).result() #同步方式提交任務 # print(res) pool.shutdown(wait=True) #shutdown表明不容許再往進程池裏提交任務,wait=True就是join的意思:等待任務都執行完畢 print('主') for obj in objs: print(obj.result())
若是多個進程對某個數據修改,則可能出現不可預料的結果,爲了保證數據的正確性,須要加鎖處理
#併發運行,效率高,但競爭同一打印終端,帶來了打印錯亂 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('%s is running' %os.getpid()) if __name__ == '__main__': for i in range(3): p = Process(target=work) p.start() #執行結果: 15608 is running 20112 is running 24332 is running 15608 is running 20112 is running 24332 is running
加鎖處理:
#由併發變成了串行,犧牲了運行效率,但避免了競爭 from multiprocessing import Process,Lock import os,time def work(lock): lock.acquire() print('%s is running' % os.getpid()) time.sleep(2) print('%s is running' % os.getpid()) lock.release() if __name__ == '__main__': lock = Lock() for i in range(3): p = Process(target=work,args=(lock,)) p.start() #執行結果: 18996 is running 18996 is running 24524 is running 24524 is running 10992 is running 10992 is running
總結:
加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改。速度是慢了,但犧牲了速度卻保證了數據安全。
雖然能夠用文件共享數據實現進程間通訊,但問題是:
所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。
咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。
進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
建立隊列的類(底層就是以管道和鎖定的方式實現):
Queue([maxsize]) #建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。 maxsize:隊列中容許最大項數,省略則無大小限制
Queue模塊中的常見方法:
q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。
q.get方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常.
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)
q.empty():調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目。
q.full():調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走。
q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣
q.cancel_join_thread():不會在進程退出時自動鏈接後臺線程。能夠防止join_thread()方法阻塞
q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,若是某個使用者正在被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤
q.join_thread():鏈接隊列的後臺線程。此方法用於在調用q.close()方法以後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread方法能夠禁止這種行爲
應用:
#!/usr/bin/python # -*- coding:utf-8 -*- from multiprocessing import Process,Queue q = Queue(3) q.put('1') q.put('2') q.put('3') print(q.get()) print(q.get()) print(q.get()) #執行結果: 1 2 3
在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題,該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。
爲何要使用生產者和消費者模式?在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發中,若是生產者處理數據很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。
什麼是生產者消費者模式?生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
#!/usr/bin/python # -*- coding:utf-8 -*- from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res = q.get() if res is None:break time.sleep(random.randint(1,3)) print('%s吃了%s' %(os.getpid(),res)) def producer(q): for i in range(2): time.sleep(random.randint(1,3)) res = '包子%s' %i q.put(res) print('%s生產了%s' %(os.getpid(),res)) if __name__ == '__main__': q = Queue() p1 = Process(target=producer,args=(q,)) #生產 c1 = Process(target=consumer,args=(q,)) #消費 p1.start() c1.start() p1.join() q.put(None) #發送結束信號,消費者在接收到結束信號後就能夠break出死循環 print('主') #執行結果: 16896生產了包子0 6540吃了包子0 16896生產了包子1 主 6540吃了包子1
進程間通訊應該儘可能避免使用共享數據的方式
#!/usr/bin/python # -*- coding:utf-8 -*- from multiprocessing import Process,Manager,Lock def work(d,lock): with lock: #不加鎖而操做共享的數據,確定會出現數據錯亂 d['count'] -= 1 if __name__ == '__main__': lock = Lock() with Manager() as m: dic = m.dict({'count':100}) p_l = [] for i in range(10): p = Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic) #執行結果: {'count': 0}
線程是程序中一個單一的順序控制流程。進程內一個相對獨立的、可調度的執行單元,是系統獨立調度和分派CPU的基本單位指運行中的程序的調度單位。在單個程序中同時運行多個線程完成不一樣的工做,稱爲多線程。
1) 一個程序至少有一個進程,一個進程至少有一個線程.
2) 線程的劃分尺度小於進程,使得多線程程序的併發性高。
3) 另外,進程在執行過程當中擁有獨立的內存單元,而多個線程共享內存,從而極大地提升了程序的運行效率。
4) 線程在執行過程當中與進程仍是有區別的。每一個獨立的線程有一個程序運行的入口、順序執行序列和程序的出口。可是線程不可以獨立執行,必須依存在應用程序中,由應用程序提供多個線程執行控制。
5) 從邏輯角度來看,多線程的意義在於一個應用程序中,有多個執行部分能夠同時執行。但操做系統並無將多個線程看作多個獨立的應用,來實現進程的調度和管理以及資源分配。這就是進程和線程的重要區別。
6) 優缺點:線程和進程在使用上各有優缺點:線程執行開銷小,但不利於資源的管理和保護;而進程正相反。同時,線程適合於在SMP機器上運行,而進程則能夠跨機器遷移。
方式一:
from threading import Thread import time def foo(name): time.sleep(2) print('My name is %s' %name) if __name__ == '__main__': t = Thread(target=foo,args=('Yim',)) t.start() #執行結果: My name is Yim
方式二:
from threading import Thread import time class foo(Thread): def __init__(self,name): super().__init__() self.name = name def run(self): print('My name is %s'%self.name) if __name__ == '__main__': t1 = foo('Yim') t2 = foo('Jim') t1.start() t2.start() #執行結果 My name is Jim My name is Yim
#!/usr/bin/python # -*- coding:utf-8 -*- from threading import Thread n = 100 def task(): global n n = 1 if __name__ == '__main__': t = Thread(target=task) t.start() t.join() print(n) #執行結果: 1
鎖的目的是爲了保護共享的數據,同一時間只能有一個線程來修改共享的數據
注意:
#!/usr/bin/python # -*- coding:utf-8 -*- from threading import Thread,Lock import time n = 100 def task(): global n lock.acquire() temp = n time.sleep(0.1) n = temp - 1 lock.release() if __name__ == '__main__': lock = Lock() t_l = [] for i in range(2): t = Thread(target=task) t_l.append(t) t.start() for t in t_l: t.join print(n) #執行結果: 100
主進程在其代碼結束後就算運行完畢了,而後主進程會一直等待非守護的子進程運行完畢,而後回收子進程的資源,進程結束
主線程在其餘非守護線程運行完畢後纔算運行完畢,進程必須保證非守護線程都運行完畢後才能結束
#!/usr/bin/python # -*- coding:utf-8 -*- from threading import Thread import os import time import random def task(): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) print('%s is done' %os.getpid()) if __name__ == '__main__': t=Thread(target=task,) t.daemon=True #必須在t.start()以前設置 t.start() print('主') #執行結果: 12544 is runing 主
死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程,以下就是死鎖:
#!/usr/bin/python # -*- coding:utf-8 -*- from threading import Thread,Lock import time mutexA=Lock() mutexB=Lock() class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print('\033[41m%s 拿到A鎖\033[0m' %self.name) mutexB.acquire() print('\033[42m%s 拿到B鎖\033[0m' %self.name) mutexB.release() mutexA.release() def func2(self): mutexB.acquire() print('\033[43m%s 拿到B鎖\033[0m' %self.name) time.sleep(2) mutexA.acquire() print('\033[44m%s 拿到A鎖\033[0m' %self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(10): t=MyThread() t.start() #執行結果: Thread-1 拿到A鎖 Thread-1 拿到B鎖 Thread-1 拿到B鎖 Thread-2 拿到A鎖 #卡住,死鎖了
解決方法:遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。
這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:
from threading import Thread,Lock,current_thread,RLock import time # mutexA=Lock() # mutexB=Lock() mutexA=mutexB=RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1,這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止 class Mythread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print('%s 拿到A鎖' %self.name) #current_thread().getName() mutexB.acquire() print('%s 拿到B鎖' %self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print('%s 拿到B鎖' % self.name) # current_thread().getName() time.sleep(0.1) mutexA.acquire() print('%s 拿到A鎖' % self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(10): t=Mythread() t.start() #執行結果: Thread-1 拿到A鎖 Thread-1 拿到B鎖 Thread-1 拿到B鎖 Thread-1 拿到A鎖 Thread-2 拿到A鎖 Thread-2 拿到B鎖 Thread-2 拿到B鎖 Thread-2 拿到A鎖 Thread-4 拿到A鎖 Thread-4 拿到B鎖 Thread-4 拿到B鎖 Thread-4 拿到A鎖 Thread-6 拿到A鎖 Thread-6 拿到B鎖 Thread-6 拿到B鎖 Thread-6 拿到A鎖 Thread-8 拿到A鎖 Thread-8 拿到B鎖 Thread-8 拿到B鎖 Thread-8 拿到A鎖 Thread-10 拿到A鎖 Thread-10 拿到B鎖 Thread-10 拿到B鎖 Thread-10 拿到A鎖 Thread-5 拿到A鎖 Thread-5 拿到B鎖 Thread-5 拿到B鎖 Thread-5 拿到A鎖 Thread-9 拿到A鎖 Thread-9 拿到B鎖 Thread-9 拿到B鎖 Thread-9 拿到A鎖 Thread-7 拿到A鎖 Thread-7 拿到B鎖 Thread-7 拿到B鎖 Thread-7 拿到A鎖 Thread-3 拿到A鎖 Thread-3 拿到B鎖 Thread-3 拿到B鎖 Thread-3 拿到A鎖
同進程的同樣
Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。
實例:(同時只有5個線程能夠得到semaphore,便可以限制最大鏈接數爲5):
#!/usr/bin/python # -*- coding:utf-8 -*- from threading import Thread,Semaphore,current_thread import time,random sm = Semaphore(5) def task(): with sm: print('%s is running'%current_thread().getName()) time.sleep(random.randint(1,3)) if __name__ == '__main__': for i in range(10): t = Thread(target=task) t.start() #執行結果: Thread-1 is running #同時只有5個線程 Thread-2 is running Thread-3 is running Thread-4 is running Thread-5 is running Thread-6 is running Thread-7 is running Thread-8 is running Thread-9 is running Thread-10 is running
與進程池是徹底不一樣的概念,進程池Pool(4),最大隻能產生4個進程,並且從頭至尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程
制定n秒後執行某操做
#!/usr/bin/python # -*- coding:utf-8 -*- from threading import Timer def foo(): print('from foo') t = Timer(2,foo) #2秒後執行foo t.start()
queue隊列 :使用import queue,用法與進程Queue同樣
#!/usr/bin/python # -*- coding:utf-8 -*- import queue q = queue.Queue(3) q.put('1') q.put('2') q.put('3') print(q.get()) print(q.get()) print(q.get()) #執行結果: 1 2 3
堆棧:
import queue q = queue.LifoQueue(3) q.put('1') q.put('2') q.put('3') print(q.get()) print(q.get()) print(q.get()) #執行結果: 3 2 1
存儲數據時可設置優先級的隊列:
import queue q = queue.PriorityQueue(3) #元組,第一個元素是優先級,數字越小,優先級越高 q.put((10,'a')) q.put((5,'b')) q.put((-2,'c')) print(q.get()) print(q.get()) print(q.get()) #執行結果: (-2, 'c') (5, 'b') (10, 'a')
線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行。
方法:
event.isSet():返回event的狀態值; event.wait():若是 event.isSet()==False將阻塞線程; event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度; event.clear():恢復event的狀態值爲False
例如,有多個工做線程嘗試連接MySQL,咱們想要在連接前確保MySQL服務正常才讓那些工做線程去鏈接MySQL服務器,若是鏈接不成功,都會去嘗試從新鏈接。那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做
from threading import Thread,Event import threading import time,random def conn_mysql(): count=1 while not event.is_set(): if count > 3: raise TimeoutError('連接超時') print('<%s>第%s次嘗試連接' % (threading.current_thread().getName(), count)) event.wait(0.5) count+=1 print('<%s>連接成功' %threading.current_thread().getName()) def check_mysql(): print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName()) time.sleep(random.randint(2,4)) event.set() if __name__ == '__main__': event=Event() conn1=Thread(target=conn_mysql) conn2=Thread(target=conn_mysql) check=Thread(target=check_mysql conn1.start() conn2.start() check.start()
python中的socketserver模塊,主要是用來提供服務器類,而且提供異步處理的能力
服務器端:
#TCP # import socketserver # # class.txt MyTCPhandler(socketserver.BaseRequestHandler): #通訊 # def handle(self): # while True: # # conn.recv(1024) # data=self.request.recv(1024) # self.request.send(data.upper()) # # # if __name__ == '__main__': # # print(socketserver.ForkingTCPServer) # # s=socketserver.ThreadingTCPServer(('127.0.0.1',8080),MyTCPhandler) # s.serve_forever() #UDP import socketserver class MyTCPhandler(socketserver.BaseRequestHandler): #通訊 def handle(self): print(self.request) client_data=self.request[0] self.request[1].sendto(client_data.upper(),self.client_address) if __name__ == '__main__': s=socketserver.ThreadingUDPServer(('127.0.0.1',8080),MyTCPhandler) s.serve_forever()
客戶端:
#TCP # from socket import * # # # c=socket(AF_INET,SOCK_STREAM) # c.connect(('127.0.0.1',8080)) # # count=1 # while True: # msg=input('>>: ').strip() # if not msg:break # c.send(msg.encode('utf-8')) # data=c.recv(1024) # print(data) # # c.close() #UDP from socket import * c=socket(AF_INET,SOCK_DGRAM) while True: msg=input('>>: ').strip() if not msg:break c.sendto(msg.encode('utf-8'),('127.0.0.1',8080)) data=c.recvfrom(1024) print(data) c.close()