35 - 併發編程-GIL-多進程

1 GIL

        GIL:Global Interpreter Lock 全局解釋器鎖,它的含義是:在同一時間在某一個進程內,只有一個線程能夠運行。即使是在多CPU下 。GIL並非Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就比如C++是一套語言(語法)標準,可是能夠用不一樣的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也同樣,一樣一段代碼能夠經過CPython,PyPy,Psyco等不一樣的Python執行環境來執行。像其中的JPython就沒有GIL。然而由於CPython是大部分環境下默認的Python執行環境。因此在不少人的概念裏CPython就是Python,也就想固然的把GIL歸結爲Python語言的缺陷。因此這裏要先明確一點:GIL並非Python的特性,Python徹底能夠不依賴於GIL。python

官方是這樣解釋GIL的:編程

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once.
This lock is necessary mainly because CPython’s memory management is not thread-safe.
(However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
 

# 在CPython中,全局解釋器鎖或者簡稱GIL,

# 是一個用來阻止多線程程序在一次運行時屢次執行Python字節碼程序的鎖,

# 在CPython中這個鎖是必須的,由於CPython的內存管理不是線程安全的

# (然而,自從GIL存在,其餘特性已經發展到依賴於GIL的強制執行)

PS:爲了防止多線程併發執行機器碼。json

1.1 爲何會有GIL

        因爲物理上得限制,各CPU廠商在覈心頻率上的比賽已經被多核所取代。爲了更有效的利用多核處理器的性能,就出現了多線程的編程方式,而隨之帶來的就是線程間數據一致性和狀態同步的困難。即便在CPU內部的Cache也不例外,爲了有效解決多份緩存之間的數據同步時各廠商花費了很多心思,也不可避免的帶來了必定的性能損失。
        GIL本質就是一把互斥鎖,全部互斥鎖的本質都同樣,都是將併發運行變成串行,以此來控制同一時間內共享數據只能被一個任務所修改,進而保證數據安全。windows

能夠確定的一點是:保護不一樣的數據的安全,就應該加不一樣的鎖。緩存

1.2 GIL與thread lock

        GIL保護的是解釋器級的數據,保護用戶本身的數據則須要本身加鎖處理,以下圖舉例:
gil
首先:在一個進程內的全部線程數據是共享的,因爲GIL的存在,統一時刻只能一個線程在運行。安全

  1. 線程1拿到GIL鎖,加載count數據,準備修改的時候,被CPU進行調度
  2. 線程2拿到GIL鎖,加載count數據(此時沒有被修改),而後修改,保存,這時count的數據已經被修改了
  3. 線程1從新獲取GIL鎖,繼續修改count的值,這時因爲count的值已經變了,因此,就形成兩個線程同時修改共享數據,並無產生正確得結果。
    因此GIL是解釋器級別的鎖,用戶數據,那麼須要用戶自行加鎖處理。

1.3 我的總結

        多個線程先訪問到解釋器的代碼,即拿到執行權限,而後將target的代碼交給解釋器的代碼去執行
        在一個python的進程內,不只有應用的主線程或者由該主線程開啓的其餘線程,還有解釋器開啓的垃圾回收等解釋器級別的線程,總之,全部線程都運行在這一個進程內,毫無疑問解釋器的代碼是全部線程共享的,因此垃圾回收線程也可能訪問到解釋器的代碼而去執行,這就致使了一個問題:Python的垃圾回收線程在執行時,會掃描當前進程所在的內存空間中,引用計數爲0的變量等信息,而後進行回收。若是沒有GIL,那麼在執行清理的動做,其餘線程又對該變量進行賦值,那麼當垃圾回收線程獲取CPU執行權限後,會繼續進行清理,那麼就可能形成數據的混亂,因此當GIL鎖存在時,當垃圾回收線程檢測到引用計數爲0的數據後,對數據進行加鎖處理,這樣即使是其餘線程再次訪問也不會形成數據的混亂。
        因爲GIL的存在,同一個進程下的線程,沒法進行併發,並行也不行。 可是因爲GIL是基於進程的,因此能夠有多核多個進程併發,而每一個進程下同時只能有一個線程運行。bash

2 multiprocessing模塊

        python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(CPU的數量能夠用os.cpu_count()查看),在python中大部分狀況須要使用多進程。Python提供了multiprocessing模塊來開啓多進程,並在某些子進程中執行定製的任務(好比函數)。網絡

該模塊與多線程模塊threading的編程接口相似.用起來也很類似多線程

        multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。
須要再次強調的一點是:與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。

2.1 Process類

注意:在windows中Process()必須放到# if __name__ == '__main__':下

利用Process建立進程的類:

Process([group [, target [, name [, args [, kwargs]]]]])  # 由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)

強調:

  1. 須要使用關鍵字的方式來指定參數
  2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號

參數:

  • group:參數未使用,值始終爲None
  • target:表示調用對象,即子進程要執行的任務
  • args:表示調用對象的位置參數元組,args=(1,2,'egon',)
  • kwargs:表示調用對象的字典,kwargs={'name':'egon','age':18}
  • name:爲子進程的名稱

2.2 Process類的方法  

Process類與Thread類相同,提供了不少操做進程的方法,這裏列舉一些經常使用的。

  • p.start():啓動進程,並調用該子進程中的p.run() --> 和直接調用run方法是不一樣的,由於它會初始化部分其餘參數。
  • p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法
  • p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
  • p.is_alive():若是p仍然運行,返回True,不然返回False
  • p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是:p.join只能join住start開啓的進程,而不能join住run開啓的進程

2.3 Process的其餘屬性 

  • p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程;必須在p.start()以前設置
  • p.name:進程的名稱
  • p.pid:進程的pid
  • p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可),就像Linux下的命令執行返回值同樣,0表示正常執行完畢
  • p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功

    特別強調:設置 p.daemon=True 是會隨着主進程執行完畢而被回收,無論子進程是否完成任務。

2.3 基本使用

使用Process建立進程的類有兩種方法:

  • 經過實例化Process類完成進程的建立
  • 繼承Process類,定製本身須要的功能後實例化建立進程類
#  --------------------------- 方法1 ---------------------------
import random
import time
from multiprocessing import Process
  
def hello(name):
    print('Welcome to my Home')
    time.sleep(random.randint(1,3))
    print('Bye Bye')

if __name__ == '__main__':
    p = Process(target=hello,args=('daxin',))    # 建立子進程p
    p.start()      # 啓動子進程
    print('主進程結束')
 
 

#  --------------------------- 方法2 ---------------------------
import random
import time
from multiprocessing import Process
 
class MyProcess(Process):
 
    def __init__(self,name):
        super(MyProcess, self).__init__()    # 必須繼承父類的構造函數
        self.name = name
 
    def run(self):     # 必須叫run方法,和thread同樣,start方法開闢進程空間後執行run方法。
        print('Welcome to {0} Home'.format(self.name))
        time.sleep(random.randint(1,3))
        print('Bye Bye')

if __name__ == '__main__':
    p = MyProcess('daxin')
    p.start()
    print('主進程結束')

2.4 進程同步鎖

        進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題的,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理。

爭搶資源形成的順序問題

import multiprocessing

def fileinput(filename,str):

    with open(filename,'a',encoding='UTF-8') as f:
        f.write(str)

if __name__ == '__main__':
    for i in range(10):
        p = multiprocessing.Process(target=fileinput,args=('a.txt','進程 %s\n' % i))
        p.start()

打印的順序:是誰搶到誰寫,那麼順序可能不是1,2,3...9。鎖的目的就是:當程序1在使用的時候,申請鎖,而且鎖住共享資源,待使用完畢後,釋放鎖資源,其餘程序獲取鎖後,重複這個過程。

Multiprocessing模塊提供了Lock對象用來完成進程同步鎖的功能

from multiprocessing import Lock
lock = Lock()
 

# 對象沒有參數

# 經過使用lock對象的acquire/release方法來進行 鎖/釋放 的需求。

利用進程同步鎖模擬搶票軟件的需求:

  1. 建立票文件,內容爲json,設置餘票數量
  2. 併發100個進程搶票
  3. 利用random + time 模塊模擬網絡延遲
import random
import time
import json
from multiprocessing import Process,Lock
 
def gettickles(filename,str,lock):

    lock.acquire()      # 對要修改的部分加鎖
    try:
        with open(filename, encoding='utf-8') as f:
            dic = json.loads(f.read())
    except:
        dic = {'count':100}

    if dic['count'] > 0 :
        dic['count'] -= 1
        time.sleep(random.random())
        with open(filename,'w',encoding='utf-8') as f:
            f.write(json.dumps(dic))
        print('\033[33m{0}搶票成功\033[0m'.format(str))
    else:
        print('\033[35m{0}搶票失敗\033[0m'.format(str))
 
    lock.release()     # 修改完畢後解鎖
 
if __name__ == '__main__':
    lock = Lock()   # 建立一個鎖文件
 
    p_l = []
    for i in range(1000):
        p = Process(target=gettickles,args=('a.txt','用戶%s' % i,lock))
        p_l.append(p)
        p.start()

        加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。

2.5 進程池

        在利用Python進行系統管理的時候,特別是同時操做多個文件目錄,或者遠程控制多臺主機,並行操做能夠節約大量的時間。多進程是實現併發的手段之一,須要注意的問題是:

  1. 很明顯須要併發執行的任務一般要遠大於核數
  2. 一個操做系統不可能無限開啓進程,一般有幾個核就開幾個進程
  3. 進程開啓過多,效率反而會降低(開啓進程是須要佔用系統資源的,並且開啓多餘核數目的進程也沒法作到並行)

        例如當被操做對象數目不大時,能夠直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但若是是上百個,上千個。。。手動的去限制進程數量卻又太過繁瑣,此時能夠發揮進程池的功效。咱們就能夠經過維護一個進程池來控制進程數目,好比httpd的進程模式,規定最小進程數和最大進程數...

對於遠程過程調用的高級應用程序而言,應該使用進程池,Pool能夠提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,就重用進程池中的進程。

        建立進程池的類:若是指定numprocess爲3,則進程池會從無到有建立三個進程,而後自始至終使用這三個進程去執行全部任務,不會開啓其餘進程

from multiprocessing import Pool
pool = Pool(processes=None, initializer=None, initargs=())   # 進程在這一句執行完畢後就建立好了。

參數:

  • processes:進程池的最大進程數量
  • initiallizer:初始化完畢後要執行的函數
  • initargs:要傳遞給函數的參數

2.5.1 經常使用方法

  • p.apply(func [, args [, kwargs]]):調用進程池中的一個進程執行函數func,args/kwargs爲傳遞的參數,注意apply是阻塞式的,既串行執行。
  • p.apply_async(func [, args [, kwargs]]):功能同apply,區別是非阻塞的,既異步執行。(經常使用)
  • p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
  • p.join():等待全部工做進程退出。此方法只能在close()或teminate()以後調用

注意:apply_async 會返回AsyncResul對象

2.5.2 AsyncResul對象經常使用方法

  • obj.get():獲取返回結果,默認等待結果到達。timeout爲等待時間,默認爲None(永久阻塞等待)。若是在指定時間內尚未到達,將引起multiprocessing.context.TimeoutError異常。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
  • obj.ready():若是進程執行完成,返回True
  • obj.successful():若是進程執行完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起AssertionError異常
  • obj.wait([timeout]):等待進程執行完畢(內部也是基於event.wait()來作的)。
  • obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
import multiprocessing

def calc(count=1000):
    sum = 0
    for i in range(count):
        sum += 1
    return sum

if __name__ == '__main__':
    pool = multiprocessing.Pool(3)
    for i in range(3):
        res = pool.apply_async(func=calc,args=(100000000,))
        print(res.ready())     # False
        # res.wait(2)          # 阻塞2秒等待進程執行完畢,不然跳過等待
        # print(res.get(2))    # 獲取執行結果,阻塞等待2秒
        print(res.successful())
    pool.close()
    pool.join()

2.5.3 回調函數

        須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數。咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。

apply_async(self, func, args=(), kwds={}, callback=None)

func的結果會交給指定的callback函數處理,callback爲單參函數,參數即爲func的返回值。

callback函數由主進程執行。

import multiprocessing
import logging

FORMAT = '%(asctime)s %(process)s %(threadName)s %(message)s'
logging.basicConfig(level=logging.INFO,format=FORMAT)

def worker(count=10000):
    sum = 0
    for i in range(count):
        sum += 1
    logging.info('I am worker')
    return sum

def result(res):
    logging.info(res)

if __name__ == '__main__':
    pool = multiprocessing.Pool(3)
    for i in range(3):
        pool.apply_async(worker,args=(1000000,),callback=result)

    pool.close()
    pool.join()
    logging.info('I am Main')


# 2019-03-04 20:10:49,097 24400 MainThread I am worker

# 2019-03-04 20:10:49,097 2916 Thread-3 1000000

# 2019-03-04 20:10:49,115 19364 MainThread I am worker

# 2019-03-04 20:10:49,115 2916 Thread-3 1000000

# 2019-03-04 20:10:49,117 37836 MainThread I am worker

# 2019-03-04 20:10:49,117 2916 Thread-3 1000000        # process ID 爲主進程,因此回調函數,由主進程完成

# 2019-03-04 20:10:49,175 2916 MainThread I am Main    # 主進程 ID 2916

一個爬蟲的小例子:

from multiprocessing import Pool
import requests
import os
 
def geturl(url):
    print('個人進程號爲: %s' % os.getpid())
    print('我處理的url爲: %s ' % url )
    response = requests.get(url)    # 請求網頁
    return response.text     # 返回網頁源碼
 
def urlparser(htmlcode):
    print('個人進程號是: %s ' % os.getpid())
    datalength = len(htmlcode)      # 計算源碼的長度
    print('解析到的html大小爲: %s' % datalength)
 
if __name__ == '__main__':
    pool = Pool()
    url = [
        'http://www.baidu.com',
        'http://www.sina.com',
        'http://www.qq.com',
        'http://www.163.com'
    ]
 
    res_l = []
    for i in url:
        res = pool.apply_async(geturl,args=(i,),callback=urlparser)    # res 是 geturl執行的結果,由於已經交給urlparser處理了,因此這裏不用拿
        res_l.append(res)
 
    pool.close()
    pool.join()
    for res in res_l:
        print(res.get())    # 這裏拿到的就是網頁的源碼

2.6 進程間通信

        進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊提供的兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的。可是還有一種基於共享數據的方式,如今已經不推薦使用,建議使用隊列的方式進行進程間通信。

展望將來,基於消息傳遞的併發編程是大勢所趨,即使是使用線程,推薦作法也是將程序設計爲大量獨立的線程集合,經過消息隊列交換數據。這樣極大地減小了對使用鎖定和其餘同步手段的需求,還能夠擴展到分佈式系統中。

2.6.1 隊列

        底層就是以管道和鎖定的方式實現。

multiprocessing.Queue([maxsize])

建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。
參數:

  • maxsize: 隊列能承載的最大數量,省略的話則不限制隊列大小

基本使用

from multiprocessing import Queue
 
q = Queue(3)
q.put('a')          # 數據存入Queue
print(q.get())      # 從Queue中取出數據

注意:隊列(Queue)是FIFO模式,既先進先出。

隊列的方法

  • q.put(obj, block=True, timeout=None):用於插入數據到隊列中。
    參數:
    • timeout:等待時間。
    • blocked:若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。

      q.put_nowait() 等同於 q.put(block=False)

  • q.get(block=True,timeout=None):用於從隊列中獲取數據。
    參數:
    • timeout:等待時間。
    • blocked:若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常.

      q.get_nowait() 等同於 q.get(block=False)

  • q.empty():調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目。
  • q.full():調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走。
  • q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣

2.6.2 共享數據

        進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的,雖然進程間數據獨立,但也能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此。

multiprocessing.Manager() 

# 沒有參數

利用Manager建立數據,完成進程共享

使用Manager對象建立共享數據類型

import os
from multiprocessing import Manager,Process
 
def worker(d,l):
 
    d[os.getpid()]=os.getpid()   # 對共享數據進行修改
    l.append(os.getpid())

if __name__ == '__main__':
    m = Manager()
    d = m.dict()    # 建立共享字典
    l = m.list()    # 建立共享列表
 
    p_l = []
    for i in range(10):
        p= Process(target=worker,args=(d,l))
        p_l.append(p)
        p.start()
 
    for p in p_l:
        p.join()
 
    print(d)
    print(l)
相關文章
相關標籤/搜索