Python——併發編程

開始說併發編程以前,最好有必定的底層知識積累,這裏我把須要的知識總結了一下,若是看下面的有不理解的能夠看一下:http://www.javashuo.com/article/p-rquxjwfh-dk.htmlhtml

引子

  • 計算機的核心是CPU,它承擔了全部的計算任務。它就像一座工廠,時刻在運行。
  • 假定工廠的電力有限,一次只能供給一個車間使用。也就是說,一個車間開工的時候,其餘車間都必須停工。背後的含義就是,單個CPU一次只能運行一個任務。
  • 進程就比如工廠的車間,它表明CPU所能處理的單個任務。任一時刻,CPU老是運行一個進程,其餘進程處於非運行狀態。
  • 一個車間裏,能夠有不少工人。他們協同完成一個任務。
  • 線程就比如車間裏的工人。一個進程能夠包括多個線程。
  • 車間的空間是工人們共享的,好比許多房間是每一個工人均可以進出的。這象徵一個進程的內存空間是共享的,每一個線程均可以使用這些共享內存。
  • 但是,每間房間的大小不一樣,有些房間最多隻能容納一我的,好比廁所。裏面有人的時候,其餘人就不能進去了。這表明一個線程使用某些共享內存時,其餘線程必須等它結束,才能使用這一塊內存。
  • 一個防止他人進入的簡單方法,就是門口加一把鎖。先到的人鎖上門,後到的人看到上鎖,就在門口排隊,等鎖打開再進去。這就叫"互斥鎖"(Mutual exclusion,縮寫 Mutex),防止多個線程同時讀寫某一塊內存區域。
  • 還有些房間,能夠同時容納n我的,好比廚房。也就是說,若是人數大於n,多出來的人只能在外面等着。這比如某些內存區域,只能供給固定數目的線程使用。
  • 這時的解決方法,就是在門口掛n把鑰匙。進去的人就取一把鑰匙,出來時再把鑰匙掛回原處。後到的人發現鑰匙架空了,就知道必須在門口排隊等着了。這種作法叫作"信號量"(Semaphore),用來保證多個線程不會互相沖突。

不難看出,互斥鎖是信號量的一種特殊狀況(n=1時)。也就是說,徹底能夠用後者替代前者。可是,由於互斥鎖較爲簡單,且效率高,因此在必須保證資源獨佔的狀況下,仍是採用這種設計。python

 

上面的內容轉載自:http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html編程

看了上面簡單的,再說一下複雜的api

進程

進程(英語:process),是計算機中已運行程序的實體。進程爲曾經是分時系統的基本運做單位。在面向進程設計的系統(如早期的UNIX,Linux 2.4及更早的版本)中,進程是程序的基本執行實體;在面向線程設計的系統(如當代多數操做系統、Linux 2.6及更新的版本)中,進程自己不是基本運行單位,而是線程的容器。程序自己只是指令、數據及其組織形式的描述,進程纔是程序(那些指令和數據)的真正運行實例。若干進程有可能與同一個程序相關係,且每一個進程皆能夠同步(循序)或異步(平行)的方式獨立運行。現代計算機系統可在同一段時間內以進程的形式將多個程序加載到存儲器中,並藉由時間共享(或稱時分複用),以在一個處理器上表現出同時(平行性)運行的感受。一樣的,使用多線程技術(多線程即每個線程都表明一個進程內的一個獨立執行上下文)的操做系統或計算機體系結構,一樣程序的平行線程,可在多CPU主機或網絡上真正同時運行(在不一樣的CPU上)。安全

關於進程在操做系統中的狀態及調度流程以下圖性能優化

簡單圖:網絡

複雜圖:數據結構

線程

線程(英語:thread)是操做系統可以進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務。在Unix System V及SunOS中也被稱爲輕量進程(lightweight processes),但輕量進程更多指內核線程(kernel thread),而把用戶線程(user thread)稱爲線程。
線程是獨立調度和分派的基本單位。線程能夠爲操做系統內核調度的內核線程,如Win32線程;由用戶進程自行調度的用戶線程,如Linux平臺的POSIX Thread;或者由內核與用戶進程,如Windows 7的線程,進行混合調度。
同一進程中的多條線程將共享該進程中的所有系統資源,如虛擬地址空間,文件描述符和信號處理等等。但同一進程中的多個線程有各自的調用棧(call stack),本身的寄存器環境(register context),本身的線程本地存儲(thread-local storage)。
一個進程能夠有不少線程,每條線程並行執行不一樣的任務。
在多核或多CPU,或支持Hyper-threading的CPU上使用多線程程序設計的好處是顯而易見,即提升了程序的執行吞吐率。在單CPU單核的計算機上,使用多線程技術,也能夠把進程中負責I/O處理、人機交互而常被阻塞的部分與密集計算的部分分開來執行,編寫專門的workhorse線程執行密集計算,從而提升了程序的執行效率。多線程

總結:併發

  1. 一個程序至少有一個進程,一個進程至少有一個線程.(進程能夠理解成線程的容器)
  2. 進程在執行過程當中擁有獨立的內存單元,而多個線程共享內存,從而極大地提升了程序的運行效率。
  3. 線程是最小的執行單元,進程是最小的資源單位。
  4. 進程本質上就是一段程序的運行過程。
  5. 線程在執行過程當中與進程仍是有區別的。每一個獨立的線程有一個程序運行的入口、順序執行序列和程序的出口。可是線程不可以獨立執行,必須依存在應用程序中,由應用程序提供多個線程執行控制。

  6. 進程是具備必定獨立功能的程序關於某個數據集合上的一次運行活動,進程是系統進行資源分配和調度的一個獨立單位. 線程是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位.線程本身基本上不擁有系統資源,只擁有一點在運行中必不可少的資源(如程序計數器,一組寄存器和棧)可是它可與同屬一個進程的其餘的線程共享進程所擁有的所有資源. 一個線程能夠建立和撤銷另外一個線程;同一個進程中的多個線程之間能夠併發執行。

  7. 進程間的切換比線程間的切換要耗時的多得多。

python中的GIL

關於python的GIL鎖簡單說就是:不管你啓多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只容許一個線程運行。

因爲GIL的存在,python中多線程其實並非真正的多線程,若是想要充分發揮CPU的資源,在python中大部分狀況須要使用多進程。然後面又通過優化,出現了多線程加協程。

在開發中,有兩種比較常見的處理狀況,一個是IO密集型,一個是計算密集型,因爲GIL鎖的存在,因此python仍是比較適用於IO操做,由於這樣能發揮多線程的能力,而要實現計算密集型就須要開多進程,纔可以真正的發揮計算機多核的能力。

這裏轉一下別人的文章,感受寫的很全面,很少說明:http://python.jobbole.com/87743/

多線程

 在python中使用的是threading模塊,它創建在thread 模塊之上。thread模塊以低級、原始的方式來處理和控制線程,而threading 模塊經過對thread進行二次封裝,提供了更方便的api來處理線程。

調用方式

直接調用

import threading
import time
 
def sayhi(num): #定義每一個線程要運行的函數
 
    print("running on number:%s" %num)
 
    time.sleep(3)
 
if __name__ == '__main__':
 
    t1 = threading.Thread(target=sayhi,args=(1,)) #生成一個線程實例
    t2 = threading.Thread(target=sayhi,args=(2,)) #生成另外一個線程實例
 
    t1.start() #啓動線程,能夠認爲是讓進程進入上面進程圖中的就緒狀態
    t2.start() #啓動另外一個線程
 
    print(t1.getName()) #獲取線程名
    print(t2.getName())
View Code

 注意,函數傳入的時候要傳入函數對象,也就是不加括號的形式。

繼承式調用

import threading
import time


class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):  #繼承式調用必需要重寫這個函數

        print("running on number:%s" %self.num)    # self.name是這個線程的名字

        time.sleep(3)

if __name__ == '__main__':

    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()
View Code

線程經常使用方法

實例對象方法

  • join()
    • 在子線程完成運行以前,這個子線程的父線程將一直被阻塞。
  • setDaemon(Ture)
    •  將線程聲明爲守護線程,必須在start() 方法調用以前設置, 若是不設置爲守護線程程序會被無限掛起。這個方法基本和join是相反的。

      當咱們 在程序運行中,執行一個主線程,若是主線程又建立一個子線程,主線程和子線程 就分兵兩路,分別運行,那麼當主線程完成

      想退出時,會檢驗子線程是否完成。如 果子線程未完成,則主線程會等待子線程完成後再退出。可是有時候咱們須要的是 只要主線程

      完成了,無論子線程是否完成,都要和主線程一塊兒退出,這時就能夠 用setDaemon方法啦

  • run()
    • 線程被CPU調度後自動執行線程對象的run方法
  • start()
    • 啓動線程對象,能夠認爲讓其進入就緒狀態
  • isAlive()
    • 返回線程是否活動的
  • getName()
    • 返回線程名
  • setName()
    • 設置線程名

threading模塊提供的方法

  • threading.currentThread():
    • 返回當前的線程變量。
  • threading.enumerate():
    • 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
  • threading.activeCount():
    • 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

互斥鎖

 首先看看下面的代碼

import time
import threading

NUM = 100  # 設定一個共享變量
thread_list = []


def lessNum():
    global NUM  # 在每一個線程中都獲取這個全局變量
    temp = NUM
    time.sleep(0.1)
    NUM = temp - 1  # 對此公共變量進行-1操做


if __name__ == '__main__':
    for i in range(100):
        t = threading.Thread(target=lessNum)
        t.start()
        thread_list.append(t)

    for t in thread_list:  # 等待全部線程執行完畢
        t.join()

    print('final num:', NUM)
View Code

觀察:time.sleep(0.1)  /0.001/0.0000001 結果分別是多少?

按照咱們的預想,應該是100個線程每次減1,而後結果是0,可是實際上不是這樣,

當讓函數睡0.1秒後,個人電腦的NUM的結果是99,

當讓函數睡0.001秒後個人電腦的NUM的結果是90左右,

當讓函數睡0.001秒後個人電腦的NUM的結果是也是90左右,

固然了,不一樣電腦由於運算速度不同,可能會產生不一樣的結果。可是不管怎樣也不是咱們預想的結果0,是由於,當線程開啓後,這100個線程操做的是同一個資源,當讓函數睡0.1秒後,按照如今大部分電腦的運行速度,足夠全部的線程得到那個NUM,而那個NUM的值都是100,減一以後,至關於給這個NUM賦值了100個100-1的結果,天然就是99,後面的0.001,和0.0000001結果也大體是這樣的流程,只不過隨着時間的減小,線程被cpu調用時須要必定的時間,因此一些線程處理的是其餘線程處理以後的結果。

由此能夠看出多個線程都在同時操做同一個共享資源,可能形成了資源破壞,怎麼辦呢?(join會形成串行,失去所線程的意義)

咱們能夠經過同步鎖來解決這種問題,咱們將核心的處理共享數據的地方用鎖鎖住,就像引子中寫的那樣,同一時間內只讓一個線程訪問這個資源。

互斥鎖的格式爲:

lock = threading.Lock()
lock.acquire()
要鎖的內容
lock.release()

上面例子中加鎖以後的樣子

import time
import threading

NUM = 100  # 設定一個共享變量
thread_list = []


def lessNum():
    global NUM  # 在每一個線程中都獲取這個全局變量
    lock.acquire()
    temp = NUM
    time.sleep(0.0000001)
    NUM = temp - 1  # 對此公共變量進行-1操做
    lock.release()

if __name__ == '__main__':
    lock = threading.Lock()
    for i in range(100):
        t = threading.Thread(target=lessNum)
        t.start()
        thread_list.append(t)

    for t in thread_list:  # 等待全部線程執行完畢
        t.join()

    print('final num:', NUM)
View Code

 能夠看出來,這樣修改了以後就像是串行的了,而這個和python沒有關係,其餘的編程語言也是這樣解決的。那麼這樣,多線程還有意義嗎?答案是確定的,由於只有這一部分須要加鎖,其餘部分則不須要。

死鎖與遞歸鎖

 在線程間共享多個資源的時候,若是兩個線程分別佔有一部分資源而且同時等待對方的資源,就會形成死鎖,由於系統判斷這部分資源都正在使用,全部這兩個線程在無外力做用下將一直等待下去。下面是一個死鎖的例子:

import threading, time


class myThread(threading.Thread):
    def doA(self):
        lockA.acquire()
        print(self.name, "gotlockA", time.ctime())
        time.sleep(3)
        lockB.acquire()
        print(self.name, "gotlockB", time.ctime())
        lockB.release()
        lockA.release()

    def doB(self):
        lockB.acquire()
        print(self.name, "gotlockB", time.ctime())
        time.sleep(2)
        lockA.acquire()
        print(self.name, "gotlockA", time.ctime())
        lockA.release()
        lockB.release()

    def run(self):
        self.doA()
        self.doB()


if __name__ == "__main__":
    lockA = threading.Lock()
    lockB = threading.Lock()
    threads = []
    for i in range(5):
        threads.append(myThread())
    for t in threads:
        t.start()
    for t in threads:
        t.join()
View Code

結果爲

能夠看到程序陷入了阻塞,實際上是發生了死鎖。而有一種經常使用的解決方式,就是使用遞歸鎖。

遞歸鎖的格式:

r_lock = threading.RLock()
r_lock = r_lock.acquire()
要鎖的內容 r_lock
= r_lock.release()

爲了支持在同一線程中屢次請求同一資源,python提供了「可重入鎖」:threading.RLock。RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次acquire。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。也就是說每調用一次acquire方法就加1,每調用一次release方法就減1,沒人用的時候是默認的0。

上面例子中加鎖以後的樣子

import threading, time


class myThread(threading.Thread):
    def doA(self):
        r_lock.acquire()
        print(self.name, "gotlockA", time.ctime())
        time.sleep(3)
        r_lock.acquire()
        print(self.name, "gotlockB", time.ctime())
        r_lock.release()
        r_lock.release()

    def doB(self):
        r_lock.acquire()
        print(self.name, "gotlockB", time.ctime())
        time.sleep(2)
        r_lock.acquire()
        print(self.name, "gotlockA", time.ctime())
        r_lock.release()
        r_lock.release()

    def run(self):
        self.doA()
        self.doB()


if __name__ == "__main__":
    r_lock = threading.RLock()
    threads = []
    for i in range(5):
        threads.append(myThread())
    for t in threads:
        t.start()
    for t in threads:
        t.join()
View Code

同步

同步和異步在開始給的網址中已經說過,這裏爲了方便起見,仍是在進行說明 

同步異步一般用來形容一次方法調用。

  • 同步方法調用一旦開始,調用者必須等到方法調用返回後,才能繼續後續的行爲。
  • 異步方法調用更像一個消息傳遞,一旦開始,方法調用就會當即返回,調用者就能夠繼續後續的操做。而,異步方法一般會在另一個線程中,「真實」地執行着。整個過程,不會阻礙調用者的工做。

這裏實現同步就是對線程進行阻塞,等待另外一個線程將數據處理結束而後解除這個阻塞狀態。

建立同步對象:

event = threading.Event()

同步對象的經常使用方法:

event.wait():等待flag被設定,一旦event被設定,等同於pass
event.set():設定flag 

event.clear():清除flag

event.isSet(): 查看當前flag狀態

注:一個event能夠用在多個線程中。

例子

import threading, time


class Boss(threading.Thread):
    def run(self):
        print("BOSS:今晚你們都要加班到22:00。")
        print(event.isSet())  # False
        event.set()
        time.sleep(5)
        print("BOSS:<22:00>能夠下班了。")
        print(event.isSet())
        event.set()


class Worker(threading.Thread):
    def run(self):
        event.wait()  # 一旦event被設定,等同於pass

        print("Worker:哎……命苦啊!")
        time.sleep(1)
        event.clear()
        event.wait()
        print("Worker:OhYeah!")


if __name__ == "__main__":
    event = threading.Event()

    threads = []
    for i in range(5):
        threads.append(Worker())
    threads.append(Boss())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print("ending.....")
View Code

信號量

信號量用來控制線程併發數的,BoundedSemaphore或Semaphore管理一個內置的計數 器,每當調用acquire()時-1,調用release()時+1。

計數器不能小於0,當計數器爲 0時,acquire()將阻塞線程至同步鎖定狀態,直到其餘線程調用release()。(相似於停車位的概念)

BoundedSemaphore與Semaphore的惟一區別在於前者將在調用release()時檢查計數 器的值是否超過了計數器的初始值,若是超過了將拋出一個異常。

 例子:

import threading, time


class myThread(threading.Thread):
    def run(self):
        if semaphore.acquire():
            print(self.name)
            time.sleep(2)
            semaphore.release()


if __name__ == "__main__":
    semaphore = threading.Semaphore(5) 

    thrs = []
    for i in range(100):
        thrs.append(myThread())
    for t in thrs:
        t.start()
View Code

這個信號量默認是1。

不難看出,互斥鎖是信號量的一種特殊狀況(n=1時)。也就是說,徹底能夠用後者替代前者。可是,由於互斥鎖較爲簡單,且效率高,因此在必須保證資源獨佔的狀況下,仍是採用這種設計。

隊列

Queue是python標準庫中的線程安全的隊列(FIFO)實現,提供了一個適用於多線程編程的先進先出的數據結構,即隊列,用來在生產者和消費者線程之間的信息傳遞 

建立一個隊列對象

import Queue
q
= Queue.Queue(maxsize = 10) Queue.Queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過Queue的構造函數的可選參數maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。

 python Queue模塊有三種隊列及構造函數

  1. Python Queue模塊的FIFO隊列先進先出。 class queue.Queue(maxsize)
  2. LIFO相似於堆,即先進後出。 class queue.LifoQueue(maxsize)
  3. 還有一種是優先級隊列級別越低越先出來。 class queue.PriorityQueue(maxsize)

經常使用的方法

  • q.put(item[, block[, timeout]])
    • 調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數,默認爲True。若是隊列當前爲空且block爲True,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲False,put方法將引起Full異常。
    • timeout爲等待時間
  • q.get([block[, timeout]])
    • 調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲True。若是隊列爲空且block爲True,get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常。
    • timeout爲等待時間
  • q.get_nowait()方法

    • 至關Queue.get(False),這種方法在向一個空隊列取值的時候會拋一個Empty異常,因此更經常使用的方法是先判斷一個隊列是否爲空,若是不爲空則取值

  • q.put_nowait(item)
    • 至關Queue.put(item, False)
  • q.qsize()
    •  返回隊列的大小
  • q.empty()
    •  若是隊列爲空,返回True,反之False
  • q.full()
    • 若是隊列滿了,返回True,反之False
    • q.full 與 maxsize 大小對應
  • q.task_done() 
    • 消費者線程從隊列中get到任務後,任務處理完成,當全部的隊列中的任務處理完成後,會使調用queue.join()的線程返回,表示隊列中任務以處理完畢。
    • 若是當前一個join()正在阻塞,它將在隊列中的全部任務都處理完時恢復執行(即每個由put()調用入隊的任務都有一個對應的task_done()調用)。
  • q.join()
    • 阻塞調用線程,直到隊列中的全部任務被處理掉。
    • 只要有數據被加入隊列,未完成的任務數就會增長。當消費者線程調用task_done()(意味着有消費者取得任務並完成任務),未完成的任務數就會減小。當未完成的任務數降到0,join()解除阻塞。
    • 實際上意味着等到隊列爲空,再執行別的操做 

除了按照先進先出,還有一個按照優先級的處理順序

q=queue.PriorityQueue()
q.put([5,100])      # 優先級爲5,放入參數爲100
q.put([7,200])      # 優先級爲7,放入參數爲200
q.put([3,"hello"])
q.put([4,{"name":"alex"}])

生產者和消費者模型

 關於隊列,經常跟這個生產者和消費者的模型關聯起來,由於二者的契合度很高。因此這裏也提一下。

爲何要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

什麼是生產者消費者模式

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

這就像,在餐廳,廚師作好菜,不須要直接和客戶交流,而是交給前臺,而客戶去飯菜也不須要不找廚師,直接去前臺領取便可,這也是一個結耦的過程。

下面用隊列的內容寫一個生產者和消費者的例子

import time, random
import queue, threading

q = queue.Queue()


def Producer(name):
    count = 0
    while count < 10:
        print("making........")
        time.sleep(random.randrange(3))
        q.put(count)
        print('Producer %s has produced %s baozi..' % (name, count))
        count += 1
        # q.task_done()
        # q.join()
        print("ok......")


def Consumer(name):
    count = 0
    while count < 10:
        time.sleep(random.randrange(4))
        if not q.empty():
            data = q.get()
            # q.task_done()
            # q.join()
            print(data)
            print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' % (name, data))
        else:
            print("-----no baozi anymore----")
        count += 1


p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
# c2 = threading.Thread(target=Consumer, args=('C',))
# c3 = threading.Thread(target=Consumer, args=('D',))
p1.start()
c1.start()
# c2.start()
# c3.start()
View Code

多進程

 multiprocessing包是Python中的多進程管理包。與threading.Thread相似,它能夠利用multiprocessing.Process對象來建立一個進程。該進程能夠運行在Python程序內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象能夠像多線程那樣,經過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。因此,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。

調用方式

直接調用

from multiprocessing import Process
import time
def f(name):
    time.sleep(1)
    print('hello', name,time.ctime())

if __name__ == '__main__':
    p_list=[]
    for i in range(3):
        p = Process(target=f, args=('老王',))
        p_list.append(p)
        p.start()
    for i in p_list:
        p.join()
    print('end')
View Code

繼承式調用

from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self):
        super(MyProcess, self).__init__()
        #self.name = name

    def run(self):
        time.sleep(1)
        print ('hello', self.name,time.ctime())


if __name__ == '__main__':
    p_list=[]
    for i in range(3):
        p = MyProcess()
        p.start()
        p_list.append(p)

    for p in p_list:
        p.join()

    print('end')
View Code

Process類

構造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 線程組,目前尚未實現,庫引用中提示必須是None; 
  target: 要執行的方法; 
  name: 進程名; 
  args/kwargs: 要傳入方法的參數。

實例方法:

  is_alive():返回進程是否在運行。

  join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。

  start():進程準備就緒,等待CPU調度

  run():strat()調用run方法,若是實例進程時未制定傳入target,這star執行t默認run()方法。

  terminate():無論任務是否完成,當即中止工做進程

屬性:

  daemon:和線程的setDeamon功能同樣

  name:進程名字。

  pid:進程號。

from multiprocessing import Process
import os
import time


def info(title):
    print("title:", title)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())


def f(name):
    info('function f')
    print('hello', name)


if __name__ == '__main__':

    info('main process line')

    time.sleep(1)
    print("------------------")
    p = Process(target=info, args=('老王',))
    p.start()
    p.join()
View Code

結果以下:

能夠從進程號看到父子進程之間的關係,而主進程的父進程不是解釋器,我用的工具是pycharm,因此主進程的父進程是pycharm的端口號。

進程間的通訊

進程隊列Queue

from multiprocessing import Process, Queue


def f(q, n):
    q.put([123, 456, 'hello'])
    q.put(n * n + 1)
    print("son process", id(q))


if __name__ == '__main__':
    q = Queue()
    print("main process", id(q))

    for i in range(3):
        p = Process(target=f, args=(q, i))
        p.start()

    print(q.get())
    print(q.get())
    print(q.get())
View Code

管道Pipe

pipe()返回兩個鏈接對象表明pipe的兩端。每一個鏈接對象都有send()方法和recv()方法。

可是若是兩個進程或線程對象同時讀取或寫入管道兩端的數據時,管道中的數據有可能會損壞。

當進程使用的是管道兩端的不一樣的數據則不會有數據損壞的風險。

 Pipe()函數返回一個由管道鏈接的鏈接對象,默認狀況下爲雙工(雙向)。

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([12, {"name": "laowang"}, 'hello'])
    response = conn.recv()
    print("response", response)
    conn.close()
    print("q_ID2:", id(conn))


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # 雙向管道

    print("q_ID1:", id(child_conn))
    p = Process(target=f, args=(child_conn,))
    p.start()

    print(parent_conn.recv())
    parent_conn.send("hello")
    p.join()
View Code

Managers

上面的Queue和Pipe只實現了數據交互,沒有實現數據共享(即一個進程去改變另外一個進程的數據)。

Manager()返回的manager對象控制了一個server進程,此進程包含的python對象能夠被其餘的進程經過proxies來訪問。從而達到多進程間數據通訊且安全。

Manager支持的類型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。

from multiprocessing import Process, Manager
 
 
def f(d, l, n):
    d[n] = '1'  # {0:"1"}
    d['2'] = 2  # {0:"1","2":2}

    l.append(n)  # [0,1,2,3,4,   0,1,2,3,4,5,6,7,8,9]


if __name__ == '__main__':

    with Manager() as manager:

        d = manager.dict()  # {}
        l = manager.list(range(5))  # [0,1,2,3,4]

        p_list = []

        for i in range(10):
            p = Process(target=f, args=(d, l, i))
            p.start()
            p_list.append(p)

        for res in p_list:
            res.join()

        print(d)    # {0: '1', 1: '1', 2: '1', 3: '1', 4: '1', '2': 2, 6: '1', 7: '1', 8: '1', 9: '1', 5: '1'}
        print(l)    # [0, 1, 2, 3, 4, 0, 2, 3, 1, 4, 5, 6, 7, 9, 8]
View Code

 進程互斥鎖

from multiprocessing import Process, Lock
import time


def f(l, i):
    l.acquire()
    time.sleep(1)
    print('hello world %s' % i)
    l.release()


if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()
View Code

 進程池

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。

進程池中有兩個方法:

  • pool.apply_async(func=Foo, args(i,), callback=demo)

    • 這個是異步的接口,進程併發執行。
    • 第三個爲回調函數,每執行完一次函數就調用一次回調函數
  • pool.apply(func=Foo, args(i,), callback=demo)

    • 這個是同步的接口,不管線程池設爲多少,都是一個進程一個進程的執行。

    • 第三個爲回調函數,每執行完一次函數就調用一次回調函數

注:以前感受回調函數徹底沒有做用,讓子進程去調用那個函數不就能夠了嗎,爲何要加一個回調函數呢,回調函數的區別就是他是由主進程調用的。這樣有什麼好處呢。好比開十個進程,咱們須要把這個行爲記錄下來,也就是咱們常說的日誌就能夠用回調函數來弄。咱們能夠把邏輯之外,並且公用的操做放到這個回調函數中,不用每次都要進程去作與邏輯無關的事情。並且這個回調函數必需要加一個值,就是用來接收進程函數中的return返回的值。

from  multiprocessing import Process, Pool
import time, os


def processPool(i):
    time.sleep(1)
    print(i)
    print("son", os.getpid())

    return "HELLO %s" % i


def Back(arg):
    print(arg)


if __name__ == '__main__':

    pool = Pool(5)  # 參數爲進程池中維護的進程數量,不寫默認爲當前計算機的核心數。
    print("main pid", os.getpid())
    for i in range(100):
        pool.apply_async(func=processPool, args=(i,), callback=Back)

    pool.close()
    pool.join()  # join與close調用順序是固定的

    print('end')
View Code

 注:pool.close()必須放在pool.join()的前面。

協程

協程,又稱微線程,纖程。英文名Coroutine。

協程是一種用戶級的輕量級線程。協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:

協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。

在併發編程中,協程與線程相似,每一個協程表示一個執行單元,有本身的本地數據,與其它協程共享全局數據和其它資源。

目前主流語言基本上都選擇了多線程做爲併發設施,與線程相關的概念是搶佔式多任務(Preemptive multitasking),而與協程相關的是協做式多任務。

無論是進程仍是線程,每次阻塞、切換都須要陷入系統調用(system call),先讓CPU跑操做系統的調度程序,而後再由調度程序決定該跑哪個進程(線程)。
並且因爲搶佔式調度執行順序沒法肯定的特色,使用線程時須要很是當心地處理同步問題,而協程徹底不存在這個問題(事件驅動和異步程序也有一樣的優勢)。

由於協程是用戶本身來編寫調度邏輯的,對CPU來講,協程實際上是單線程,因此CPU不用去考慮怎麼調度、切換上下文,這就省去了CPU的切換開銷,因此協程在必定程度上又好於多線程。


優勢1: 協程有極高的執行效率。由於子程序切換不是線程切換,而是由程序自身控制,所以,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優點就越明顯。
優勢2: 不須要多線程的鎖機制,協程是非搶佔式的,由於只有一個線程,也不存在同時寫變量衝突,在協程中控制共享資源不加鎖,只須要判斷狀態就行了,因此執行效率比多線程高不少。
由於協程是一個線程執行,那怎麼利用多核CPU呢?最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可得到極高的性能。

協程的原理就是python中的生成器

首先用yield完成簡單的生產者和消費者模型

import time
import queue


def consumer(name):
    print("--->ready to eat baozi...")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name, new_baozi))
        # time.sleep(1)


def producer():
    r = con.__next__()
    r = con2.__next__()
    n = 0
    while 1:
        time.sleep(1)
        print("\033[32;1m[producer]\033[0m is making baozi %s and %s" % (n, n + 1))
        con.send(n)
        con2.send(n + 1)

        n += 2


if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()
View Code

Greenlet

greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator

from greenlet import greenlet


def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()


def test2():
    print(56)
    gr1.switch()
    print(78)


gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()


# 結果
# 12
# 56
# 34
# 78
例子

Gevent

咱們能夠看到能夠用greenlet進行了切換,可是還不夠,有個更進一步的封裝gevent

greenlet咱們還有手動切換,而gevent能夠自動的切換,

import gevent
import time


def f(num):
    for i in num:
        time.sleep(0.1)
        print(i)


if __name__ == '__main__':
    gevent.joinall([
        gevent.spawn(f, [1, 2, 3, 4, 5]),
        gevent.spawn(f, "helloworld")
    ])
View Code

注:進程池multiprocessing.Pool和gevent有衝突不能同時使用,有興趣的能夠研究gevent.pool協程池。

猴子補丁

使用Gevent的性能確實要比用傳統的線程高,甚至高不少。但這裏不得不說它的一個坑那就是猴子補丁

使用方法:

from gevent import monkey
monkey.patch_all()

(1)猴子補丁的由來

         猴子補丁的這個叫法起源於Zope框架,你們在修正Zope的Bug的時候常常在程序後面追加更新部分,這些被稱做是「雜牌軍補丁(guerillapatch)」,後來guerilla就漸漸的寫成了gorllia(猩猩),再後來就寫了monkey(猴子),因此猴子補丁的叫法是這麼莫名其妙的得來的。

         後來在動態語言中,不改變源代碼而對功能進行追加和變動,統稱爲「猴子補丁」。因此猴子補丁並非Python中專有的。猴子補丁這種東西充分利用了動態語言的靈活性,能夠對現有的語言Api進行追加,替換,修改Bug,甚至性能優化等等。

  使用猴子補丁的方式,gevent可以修改標準庫裏面大部分的阻塞式系統調用,包括socket、ssl、threading和 select等模塊,而變爲協做式運行。也就是經過猴子補丁的monkey.patch_xxx()來將python標準庫中模塊或函數改爲gevent中的響應的具備協程的協做式對象(通常寫爲gevent.monkey.patch_all())。這樣在不改變原有代碼的狀況下,將應用的阻塞式方法,變成協程式的。

(2)猴子補丁使用時的注意事項

猴子補丁的功能很強大,可是也帶來了不少的風險,尤爲是像gevent這種直接進行API替換的補丁,整個Python進程所使用的模塊都會被替換,可能本身的代碼能hold住,可是其它第三方庫,有時候問題並很差排查,即便排查出來也是很棘手,因此,就像松本建議的那樣,若是要使用猴子補丁,那麼只是作功能追加,儘可能避免大規模的API覆蓋。或者得確保項目中用到其餘用到的網絡庫也必須使用純Python或者明確說明支持Gevent

補坑

在前面說了爲了減小GIL鎖對高併發的程序產生的影響,不少人想了不少辦法。

舉個例子:給你200W條url,須要你把每一個url對應的頁面抓取保存起來,這種時候,單單使用多進程,效果確定是不好的。爲何呢?

例如每次請求的等待時間是2秒,那麼以下(忽略cpu計算時間):

一、單進程+單線程:須要2秒*200W=400W秒==1111.11個小時==46.3天,這個速度明顯是不能接受的

二、單進程+多線程:例如咱們在這個進程中開了10個多線程,比1中可以提高10倍速度,也就是大約4.63天可以完成200W條抓取,請注意,這裏的實際執行是:線程1碰見了阻塞,CPU切換到線程2去執行,碰見阻塞又切換到線程3等等,10個線程都阻塞後,這個進程就阻塞了,而直到某個線程阻塞完成後,這個進程才能繼續執行,因此速度上提高大約能到10倍(這裏忽略了線程切換帶來的開銷,實際上的提高應該是不能達到10倍的),可是須要考慮的是線程的切換也是有開銷的,因此不能無限的啓動多線程(開200W個線程確定是不靠譜的)

三、多進程+多線程:這裏就厲害了,通常來講也有不少人用這個方法,多進程下,每一個進程都能佔一個cpu,而多線程從必定程度上繞過了阻塞的等待,因此比單進程下的多線程又更好使了,例如咱們開10個進程,每一個進程裏開20W個線程,執行的速度理論上是比單進程開200W個線程快10倍以上的(爲何是10倍以上而不是10倍,主要是cpu切換200W個線程的消耗確定比切換20W個進程大得多,考慮到這部分開銷,因此是10倍以上)。

而根據前面對協程的解釋,它是不須要沒有切換線程的開銷的。這個時候使用多進程+協程(能夠看做是每一個進程裏都是單線程,而這個單線程是協程化的)

多進程+協程下,避開了CPU切換的開銷,又能把多個CPU充分利用起來,這種方式對於數據量較大的爬蟲還有文件讀寫之類的效率提高是巨大的。

可是上面的內容也只是針對這種IO密集型,計算密集型仍是多進程+單線程跑吧,沒轍,這樣還要快一些。

#-*- coding=utf-8 -*-
import requests
from multiprocessing import Process
import gevent
from gevent import monkey; monkey.patch_all()
 
import sys
reload(sys)
sys.setdefaultencoding('utf8')
def fetch(url):
    try:
        s = requests.Session()
        r = s.get(url,timeout=1)#在這裏抓取頁面
    except Exception,e:
        print e 
    return ''
 
def process_start(url_list):
    tasks = []
    for url in url_list:
        tasks.append(gevent.spawn(fetch,url))
    gevent.joinall(tasks)#使用協程來執行
 
def task_start(filepath,flag = 100000):#每10W條url啓動一個進程
    with open(filepath,'r') as reader:#從給定的文件中讀取url
        url = reader.readline().strip()
        url_list = []#這個list用於存放協程任務
        i = 0 #計數器,記錄添加了多少個url到協程隊列
        while url!='':
            i += 1
            url_list.append(url)#每次讀取出url,將url添加到隊列
            if i == flag:#必定數量的url就啓動一個進程並執行
                p = Process(target=process_start,args=(url_list,))
                p.start()
                url_list = [] #重置url隊列
                i = 0 #重置計數器
            url = reader.readline().strip()
        if url_list not []:#若退出循環後任務隊列裏還有url剩餘
            p = Process(target=process_start,args=(url_list,))#把剩餘的url全都放到最後這個進程來執行
            p.start()
  
if __name__ == '__main__':
    task_start('./testData.txt')#讀取指定文件
一個例子
相關文章
相關標籤/搜索