目錄html
GIL:Global Interpreter Lock
全局解釋器鎖,它的含義是:在同一時間在某一個進程內,只有一個線程能夠運行。即使是在多CPU下
。GIL並非Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就比如C++是一套語言(語法)標準,可是能夠用不一樣的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也同樣,一樣一段代碼能夠經過CPython,PyPy,Psyco等不一樣的Python執行環境來執行。像其中的JPython就沒有GIL。然而由於CPython是大部分環境下默認的Python執行環境。因此在不少人的概念裏CPython就是Python,也就想固然的把GIL歸結爲Python語言的缺陷。因此這裏要先明確一點:GIL並非Python的特性,Python徹底能夠不依賴於GIL。python
官方是這樣解釋GIL的:編程
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.) # 在CPython中,全局解釋器鎖或者簡稱GIL, # 是一個用來阻止多線程程序在一次運行時屢次執行Python字節碼程序的鎖, # 在CPython中這個鎖是必須的,由於CPython的內存管理不是線程安全的 # (然而,自從GIL存在,其餘特性已經發展到依賴於GIL的強制執行)
PS:爲了防止多線程併發執行機器碼。json
因爲物理上得限制,各CPU廠商在覈心頻率上的比賽已經被多核所取代。爲了更有效的利用多核處理器的性能,就出現了多線程的編程方式,而隨之帶來的就是線程間數據一致性和狀態同步的困難。即便在CPU內部的Cache也不例外,爲了有效解決多份緩存之間的數據同步時各廠商花費了很多心思,也不可避免的帶來了必定的性能損失。
GIL本質就是一把互斥鎖,全部互斥鎖的本質都同樣,都是將併發運行變成串行,以此來控制同一時間內共享數據只能被一個任務所修改,進而保證數據安全。windows
能夠確定的一點是:保護不一樣的數據的安全,就應該加不一樣的鎖。緩存
GIL保護的是解釋器級的數據,保護用戶本身的數據則須要本身加鎖處理,以下圖舉例:
首先:在一個進程內的全部線程數據是共享的,因爲GIL的存在,統一時刻只能一個線程在運行。安全
多個線程先訪問到解釋器的代碼,即拿到執行權限,而後將target的代碼交給解釋器的代碼去執行
在一個python的進程內,不只有應用的主線程或者由該主線程開啓的其餘線程,還有解釋器開啓的垃圾回收等解釋器級別的線程,總之,全部線程都運行在這一個進程內,毫無疑問解釋器的代碼是全部線程共享的,因此垃圾回收線程也可能訪問到解釋器的代碼而去執行,這就致使了一個問題:Python的垃圾回收線程在執行時,會掃描當前進程所在的內存空間中,引用計數爲0的變量等信息,而後進行回收。若是沒有GIL,那麼在執行清理的動做,其餘線程又對該變量進行賦值,那麼當垃圾回收線程獲取CPU執行權限後,會繼續進行清理,那麼就可能形成數據的混亂,因此當GIL鎖存在時,當垃圾回收線程檢測到引用計數爲0的數據後,對數據進行加鎖處理,這樣即使是其餘線程再次訪問也不會形成數據的混亂。
因爲GIL的存在,同一個進程下的線程,沒法進行併發,並行也不行。 可是因爲GIL是基於進程的,因此能夠有多核多個進程併發,而每一個進程下同時只能有一個線程運行。bash
python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(CPU的數量能夠用os.cpu_count()查看),在python中大部分狀況須要使用多進程。Python提供了multiprocessing模塊來開啓多進程,並在某些子進程中執行定製的任務(好比函數)。網絡
該模塊與多線程模塊threading的編程接口相似.用起來也很類似多線程
multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。
須要再次強調的一點是:與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。
注意:在windows中Process()必須放到# if __name__ == '__main__':下
利用Process建立進程的類:
Process([group [, target [, name [, args [, kwargs]]]]]) # 由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)
強調:
參數:
group
:參數未使用,值始終爲Nonetarget
:表示調用對象,即子進程要執行的任務args
:表示調用對象的位置參數元組,args=(1,2,'egon',)kwargs
:表示調用對象的字典,kwargs={'name':'egon','age':18}name
:爲子進程的名稱Process類與Thread類相同,提供了不少操做進程的方法,這裏列舉一些經常使用的。
p.start()
:啓動進程,並調用該子進程中的p.run() --> 和直接調用run方法是不一樣的,由於它會初始化部分其餘參數。p.run()
:進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法p.terminate()
:強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖p.is_alive()
:若是p仍然運行,返回True,不然返回Falsep.join([timeout])
:主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是:p.join只能join住start開啓的進程,而不能join住run開啓的進程p.daemon
:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程;必須在p.start()以前設置p.name
:進程的名稱p.pid
:進程的pidp.exitcode
:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可),就像Linux下的命令執行返回值同樣,0表示正常執行完畢p.authkey
:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功
特別強調:設置 p.daemon=True 是會隨着主進程執行完畢而被回收,無論子進程是否完成任務。
使用Process建立進程的類有兩種方法:
# --------------------------- 方法1 --------------------------- import random import time from multiprocessing import Process def hello(name): print('Welcome to my Home') time.sleep(random.randint(1,3)) print('Bye Bye') if __name__ == '__main__': p = Process(target=hello,args=('daxin',)) # 建立子進程p p.start() # 啓動子進程 print('主進程結束') # --------------------------- 方法2 --------------------------- import random import time from multiprocessing import Process class MyProcess(Process): def __init__(self,name): super(MyProcess, self).__init__() # 必須繼承父類的構造函數 self.name = name def run(self): # 必須叫run方法,和thread同樣,start方法開闢進程空間後執行run方法。 print('Welcome to {0} Home'.format(self.name)) time.sleep(random.randint(1,3)) print('Bye Bye') if __name__ == '__main__': p = MyProcess('daxin') p.start() print('主進程結束')
進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題的,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理。
import multiprocessing def fileinput(filename,str): with open(filename,'a',encoding='UTF-8') as f: f.write(str) if __name__ == '__main__': for i in range(10): p = multiprocessing.Process(target=fileinput,args=('a.txt','進程 %s\n' % i)) p.start()
打印的順序:是誰搶到誰寫,那麼順序可能不是1,2,3...9。鎖的目的就是:當程序1在使用的時候,申請鎖,而且鎖住共享資源,待使用完畢後,釋放鎖資源,其餘程序獲取鎖後,重複這個過程。
Multiprocessing模塊提供了Lock對象用來完成進程同步鎖的功能
from multiprocessing import Lock lock = Lock() # 對象沒有參數 # 經過使用lock對象的acquire/release方法來進行 鎖/釋放 的需求。
利用進程同步鎖模擬搶票軟件的需求:
import random import time import json from multiprocessing import Process,Lock def gettickles(filename,str,lock): lock.acquire() # 對要修改的部分加鎖 try: with open(filename, encoding='utf-8') as f: dic = json.loads(f.read()) except: dic = {'count':100} if dic['count'] > 0 : dic['count'] -= 1 time.sleep(random.random()) with open(filename,'w',encoding='utf-8') as f: f.write(json.dumps(dic)) print('\033[33m{0}搶票成功\033[0m'.format(str)) else: print('\033[35m{0}搶票失敗\033[0m'.format(str)) lock.release() # 修改完畢後解鎖 if __name__ == '__main__': lock = Lock() # 建立一個鎖文件 p_l = [] for i in range(1000): p = Process(target=gettickles,args=('a.txt','用戶%s' % i,lock)) p_l.append(p) p.start()
加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
在利用Python進行系統管理的時候,特別是同時操做多個文件目錄,或者遠程控制多臺主機,並行操做能夠節約大量的時間。多進程是實現併發的手段之一,須要注意的問題是:
例如當被操做對象數目不大時,能夠直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但若是是上百個,上千個。。。手動的去限制進程數量卻又太過繁瑣,此時能夠發揮進程池的功效。咱們就能夠經過維護一個進程池來控制進程數目,好比httpd的進程模式,規定最小進程數和最大進程數...
對於遠程過程調用的高級應用程序而言,應該使用進程池,Pool能夠提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,就重用進程池中的進程。
建立進程池的類:若是指定numprocess爲3,則進程池會從無到有建立三個進程,而後自始至終使用這三個進程去執行全部任務,不會開啓其餘進程
from multiprocessing import Pool pool = Pool(processes=None, initializer=None, initargs=()) # 進程在這一句執行完畢後就建立好了。
參數:
processes
:進程池的最大進程數量initiallizer
:初始化完畢後要執行的函數initargs
:要傳遞給函數的參數p.apply(func [, args [, kwargs]])
:調用進程池中的一個進程執行函數func,args/kwargs爲傳遞的參數,注意apply是阻塞式的,既串行執行。p.apply_async(func [, args [, kwargs]])
:功能同apply,區別是非阻塞的,既異步執行。(經常使用
)p.close()
:關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成p.join()
:等待全部工做進程退出。此方法只能在close()或teminate()以後調用注意:apply_async 會返回AsyncResul對象
obj.get()
:獲取返回結果,默認等待結果到達。timeout爲等待時間,默認爲None(永久阻塞等待)。若是在指定時間內尚未到達,將引起multiprocessing.context.TimeoutError異常。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。obj.ready()
:若是進程執行完成,返回Trueobj.successful()
:若是進程執行完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起AssertionError異常obj.wait([timeout])
:等待進程執行完畢(內部也是基於event.wait()來作的)。obj.terminate()
:當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數import multiprocessing def calc(count=1000): sum = 0 for i in range(count): sum += 1 return sum if __name__ == '__main__': pool = multiprocessing.Pool(3) for i in range(3): res = pool.apply_async(func=calc,args=(100000000,)) print(res.ready()) # False # res.wait(2) # 阻塞2秒等待進程執行完畢,不然跳過等待 # print(res.get(2)) # 獲取執行結果,阻塞等待2秒 print(res.successful()) pool.close() pool.join()
須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數。咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。
apply_async(self, func, args=(), kwds={}, callback=None)
func的結果會交給指定的callback函數處理,callback爲單參函數,參數即爲func的返回值。
callback函數由主進程執行。
import multiprocessing import logging FORMAT = '%(asctime)s %(process)s %(threadName)s %(message)s' logging.basicConfig(level=logging.INFO,format=FORMAT) def worker(count=10000): sum = 0 for i in range(count): sum += 1 logging.info('I am worker') return sum def result(res): logging.info(res) if __name__ == '__main__': pool = multiprocessing.Pool(3) for i in range(3): pool.apply_async(worker,args=(1000000,),callback=result) pool.close() pool.join() logging.info('I am Main') # 2019-03-04 20:10:49,097 24400 MainThread I am worker # 2019-03-04 20:10:49,097 2916 Thread-3 1000000 # 2019-03-04 20:10:49,115 19364 MainThread I am worker # 2019-03-04 20:10:49,115 2916 Thread-3 1000000 # 2019-03-04 20:10:49,117 37836 MainThread I am worker # 2019-03-04 20:10:49,117 2916 Thread-3 1000000 # process ID 爲主進程,因此回調函數,由主進程完成 # 2019-03-04 20:10:49,175 2916 MainThread I am Main # 主進程 ID 2916
一個爬蟲的小例子:
from multiprocessing import Pool import requests import os def geturl(url): print('個人進程號爲: %s' % os.getpid()) print('我處理的url爲: %s ' % url ) response = requests.get(url) # 請求網頁 return response.text # 返回網頁源碼 def urlparser(htmlcode): print('個人進程號是: %s ' % os.getpid()) datalength = len(htmlcode) # 計算源碼的長度 print('解析到的html大小爲: %s' % datalength) if __name__ == '__main__': pool = Pool() url = [ 'http://www.baidu.com', 'http://www.sina.com', 'http://www.qq.com', 'http://www.163.com' ] res_l = [] for i in url: res = pool.apply_async(geturl,args=(i,),callback=urlparser) # res 是 geturl執行的結果,由於已經交給urlparser處理了,因此這裏不用拿 res_l.append(res) pool.close() pool.join() for res in res_l: print(res.get()) # 這裏拿到的就是網頁的源碼
進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊提供的兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的。可是還有一種基於共享數據的方式,如今已經不推薦使用,建議使用隊列的方式進行進程間通信。
展望將來,基於消息傳遞的併發編程是大勢所趨,即使是使用線程,推薦作法也是將程序設計爲大量獨立的線程集合,經過消息隊列交換數據。這樣極大地減小了對使用鎖定和其餘同步手段的需求,還能夠擴展到分佈式系統中。
底層就是以管道和鎖定的方式實現。
multiprocessing.Queue([maxsize])
建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。
參數:
maxsize
: 隊列能承載的最大數量,省略的話則不限制隊列大小from multiprocessing import Queue q = Queue(3) q.put('a') # 數據存入Queue print(q.get()) # 從Queue中取出數據
注意:隊列(Queue)是FIFO模式,既先進先出。
q.put(obj, block=True, timeout=None)
:用於插入數據到隊列中。blocked:若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。
q.put_nowait() 等同於 q.put(block=False)
q.get(block=True,timeout=None)
:用於從隊列中獲取數據。blocked:若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常.
q.get_nowait() 等同於 q.get(block=False)
q.empty()
:調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目。q.full()
:調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走。q.qsize()
:返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的,雖然進程間數據獨立,但也能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此。
multiprocessing.Manager() # 沒有參數
利用Manager建立數據,完成進程共享
使用Manager對象建立共享數據類型
import os from multiprocessing import Manager,Process def worker(d,l): d[os.getpid()]=os.getpid() # 對共享數據進行修改 l.append(os.getpid()) if __name__ == '__main__': m = Manager() d = m.dict() # 建立共享字典 l = m.list() # 建立共享列表 p_l = [] for i in range(10): p= Process(target=worker,args=(d,l)) p_l.append(p) p.start() for p in p_l: p.join() print(d) print(l)