多線程與多進程

多線程與多進程

  知識預覽

 

一 進程與線程的概念

1.1 進程

考慮一個場景:瀏覽器,網易雲音樂以及notepad++ 三個軟件只能順序執行是怎樣一種場景呢?另外,假若有兩個程序A和B,程序A在執行到一半的過程當中,須要讀取大量的數據輸入(I/O操做),而此時CPU只能靜靜地等待任務A讀取完數據才能繼續執行,這樣就白白浪費了CPU資源。你是否是已經想到在程序A讀取數據的過程當中,讓程序B去執行,當程序A讀取完數據以後,讓程序B暫停。聰明,這固然沒問題,但這裏有一個關鍵詞:切換。html

既然是切換,那麼這就涉及到了狀態的保存,狀態的恢復,加上程序A與程序B所須要的系統資源(內存,硬盤,鍵盤等等)是不同的。天然而然的就須要有一個東西去記錄程序A和程序B分別須要什麼資源,怎樣去識別程序A和程序B等等(好比讀書)。python

進程定義:react

進程就是一個程序在一個數據集上的一次動態執行過程。進程通常由程序、數據集、進程控制塊三部分組成。咱們編寫的程序用來描述進程要完成哪些功能以及如何完成;數據集則是程序在執行過程當中所須要使用的資源;進程控制塊用來記錄進程的外部特徵,描述進程的執行變化過程,系統能夠利用它來控制和管理進程,它是系統感知進程存在的惟一標誌。linux

舉一例說明進程:
想象一位有一手好廚藝的計算機科學家正在爲他的女兒烘製生日蛋糕。他有作生日蛋糕的食譜,廚房裏有所需的原料:麪粉、雞蛋、糖、香草汁等。在這個比喻中,作蛋糕的食譜就是程序(即用適當形式描述的算法)計算機科學家就是處理器(cpu),而作蛋糕的各類原料就是輸入數據。進程就是廚師閱讀食譜、取來各類原料以及烘製蛋糕等一系列動做的總和。如今假設計算機科學家的兒子哭着跑了進來,說他的頭被一隻蜜蜂蟄了。計算機科學家就記錄下他照着食譜作到哪兒了(保存進程的當前狀態),而後拿出一本急救手冊,按照其中的指示處理蟄傷。這裏,咱們看處處理機從一個進程(作蛋糕)切換到另外一個高優先級的進程(實施醫療救治),每一個進程擁有各自的程序(食譜和急救手冊)。當蜜蜂蟄傷處理完以後,這位計算機科學家又回來作蛋糕,從他離開時的那一步繼續作下去。git

1.2 線程

線程的出現是爲了下降上下文切換的消耗,提升系統的併發性,並突破一個進程只能幹同樣事的缺陷,使到進程內併發成爲可能。程序員

假設,一個文本程序,須要接受鍵盤輸入,將內容顯示在屏幕上,還須要保存信息到硬盤中。若只有一個進程,勢必形成同一時間只能幹同樣事的尷尬(當保存時,就不能經過鍵盤輸入內容)。如有多個進程,每一個進程負責一個任務,進程A負責接收鍵盤輸入的任務,進程B負責將內容顯示在屏幕上的任務,進程C負責保存內容到硬盤中的任務。這裏進程A,B,C間的協做涉及到了進程通訊問題,並且有共同都須要擁有的東西——-文本內容,不停的切換形成性能上的損失。如有一種機制,可使任務A,B,C共享資源,這樣上下文切換所須要保存和恢復的內容就少了,同時又能夠減小通訊所帶來的性能損耗,那就行了。是的,這種機制就是線程。
線程也叫輕量級進程,它是一個基本的CPU執行單元,也是程序執行過程當中的最小單元,由線程ID、程序計數器、寄存器集合和堆棧共同組成。線程的引入減少了程序併發執行時的開銷,提升了操做系統的併發性能。線程沒有本身的系統資源。github

1.3 進程與線程的關係

進程是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操做系統結構的基礎。或者說進程是具備必定獨立功能的程序關於某個數據集合上的一次運行活動,進程是系統進行資源分配和調度的一個獨立單位。
線程則是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位。web

進程和線程的關係:redis

(1)一個線程只能屬於一個進程,而一個進程能夠有多個線程,但至少有一個線程。
(2)資源分配給進程,同一進程的全部線程共享該進程的全部資源。
(3)CPU分給線程,即真正在CPU上運行的是線程。算法

1.4 並行和併發

並行處理(Parallel Processing)是計算機系統中能同時執行兩個或更多個處理的一種計算方法。並行處理可同時工做於同一程序的不一樣方面。並行處理的主要目的是節省大型和複雜問題的解決時間。併發處理(concurrency Processing):指一個時間段中有幾個程序都處於已啓動運行到運行完畢之間,且這幾個程序都是在同一個處理機(CPU)上運行,但任一個時刻點上只有一個程序在處理機(CPU)上運行

併發的關鍵是你有處理多個任務的能力,不必定要同時。並行的關鍵是你有同時處理多個任務的能力。因此說,並行是併發的子集

1.5 同步與異步

在計算機領域,同步就是指一個進程在執行某個請求的時候,若該請求須要一段時間才能返回信息,那麼這個進程將會一直等待下去,直到收到返回信息才繼續執行下去;異步是指進程不須要一直等下去,而是繼續執行下面的操做,無論其餘進程的狀態。當有消息返回時系統會通知進程進行處理,這樣能夠提升執行的效率。舉個例子,打電話時就是同步通訊,發短息時就是異步通訊。

二 threading模塊

2.1 線程對象的建立

2.1.1 Thread類直接建立

複製代碼
import threading
import time

def countNum(n): # 定義某個線程要運行的函數

    print("running on number:%s" %n)

    time.sleep(3)

if __name__ == '__main__':

    t1 = threading.Thread(target=countNum,args=(23,)) #生成一個線程實例
    t2 = threading.Thread(target=countNum,args=(34,))

    t1.start() #啓動線程
    t2.start()

    print("ending!")
複製代碼

2.1.2 Thread類繼承式建立

複製代碼
#繼承Thread式建立

import threading
import time

class MyThread(threading.Thread):

    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num=num

    def run(self):
        print("running on number:%s" %self.num)
        time.sleep(3)

t1=MyThread(56)
t2=MyThread(78)

t1.start()
t2.start()
print("ending")
複製代碼

2.2 Thread類的實例方法

2.2.1 join()和setDaemon()

複製代碼
# join():在子線程完成運行以前,這個子線程的父線程將一直被阻塞。

# setDaemon(True):
        '''
         將線程聲明爲守護線程,必須在start() 方法調用以前設置,若是不設置爲守護線程程序會被無限掛起。

         當咱們在程序運行中,執行一個主線程,若是主線程又建立一個子線程,主線程和子線程 就分兵兩路,分別運行,那麼當主線程完成

         想退出時,會檢驗子線程是否完成。若是子線程未完成,則主線程會等待子線程完成後再退出。可是有時候咱們須要的是隻要主線程

         完成了,無論子線程是否完成,都要和主線程一塊兒退出,這時就能夠 用setDaemon方法啦'''


import threading
from time import ctime,sleep
import time

def Music(name):

        print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
        sleep(3)
        print("end listening {time}".format(time=ctime()))

def Blog(title):

        print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
        sleep(5)
        print('end recording {time}'.format(time=ctime()))


threads = []


t1 = threading.Thread(target=Music,args=('FILL ME',))
t2 = threading.Thread(target=Blog,args=('',))

threads.append(t1)
threads.append(t2)

if __name__ == '__main__':

    #t2.setDaemon(True)

    for t in threads:

        #t.setDaemon(True) #注意:必定在start以前設置
        t.start()

        #t.join()

    #t1.join()
    #t2.join()    #  考慮這三種join位置下的結果?

    print ("all over %s" %ctime())
複製代碼

daemon
A boolean value indicating whether this thread is a daemon thread (True) or not (False). This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.

The entire Python program exits when no alive non-daemon threads are left.

當daemon被設置爲True時,若是主線程退出,那麼子線程也將跟着退出,

反之,子線程將繼續運行,直到正常退出。

daemon

 2.2.2 其它方法

複製代碼
Thread實例對象的方法
# isAlive(): 返回線程是否活動的。 # getName(): 返回線程名。 # setName(): 設置線程名。 threading模塊提供的一些方法: # threading.currentThread(): 返回當前的線程變量。 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
複製代碼

2.3 GIL(全局解釋器鎖)

複製代碼
'''
定義: In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.) '''
複製代碼

Python中的線程是操做系統的原生線程,Python虛擬機使用一個全局解釋器鎖(Global Interpreter Lock)來互斥線程對Python虛擬機的使用。爲了支持多線程機制,一個基本的要求就是須要實現不一樣線程對共享資源訪問的互斥,因此引入了GIL。
GIL:在一個線程擁有了解釋器的訪問權以後,其餘的全部線程都必須等待它釋放解釋器的訪問權,即便這些線程的下一條指令並不會互相影響。
在調用任何Python C API以前,要先得到GIL
GIL缺點:多處理器退化爲單處理器;優勢:避免大量的加鎖解鎖操做

2.3.1 GIL的早期設計

Python支持多線程,而解決多線程之間數據完整性和狀態同步的最簡單方法天然就是加鎖。 因而有了GIL這把超級大鎖,而當愈來愈多的代碼庫開發者接受了這種設定後,他們開始大量依賴這種特性(即默認python內部對象是thread-safe的,無需在實現時考慮額外的內存鎖和同步操做)。慢慢的這種實現方式被發現是蛋疼且低效的。但當你們試圖去拆分和去除GIL的時候,發現大量庫代碼開發者已經重度依賴GIL而很是難以去除了。有多難?作個類比,像MySQL這樣的「小項目」爲了把Buffer Pool Mutex這把大鎖拆分紅各個小鎖也花了從5.5到5.6再到5.7多個大版爲期近5年的時間,而且仍在繼續。MySQL這個背後有公司支持且有固定開發團隊的產品走的如此艱難,那又更況且Python這樣核心開發和代碼貢獻者高度社區化的團隊呢?

2.3.2 GIL的影響

不管你啓多少個線程,你有多少個cpu, Python在執行一個進程的時候會淡定的在同一時刻只容許一個線程運行。
因此,python是沒法利用多核CPU實現多線程的。
這樣,python對於計算密集型的任務開多線程的效率甚至不如串行(沒有大量切換),可是,對於IO密集型的任務效率仍是有顯著提高的。

計算密集型:

#coding:utf8
from threading import Thread
import time

def counter():
i = 0
for _ in range(50000000):
i = i + 1

return True


def main():

l=[]
start_time = time.time()

for i in range(2):

t = Thread(target=counter)
t.start()
l.append(t)
t.join()

# for t in l:
# t.join()

end_time = time.time()
print("Total time: {}".format(end_time - start_time))

if __name__ == '__main__':
main()


'''
py2.7:
串行:25.4523348808s
併發:31.4084379673s
py3.5:
串行:8.62115597724914s
併發:8.99609899520874s

'''

2.3.3 解決方案

用multiprocessing替代Thread multiprocessing庫的出現很大程度上是爲了彌補thread庫由於GIL而低效的缺陷。它完整的複製了一套thread所提供的接口方便遷移。惟一的不一樣就是它使用了多進程而不是多線程。每一個進程有本身的獨立的GIL,所以也不會出現進程之間的GIL爭搶。

#coding:utf8
from multiprocessing import Process
import time

def counter():
i = 0
for _ in range(40000000):
i = i + 1

return True

def main():

l=[]
start_time = time.time()

for _ in range(2):
t=Process(target=counter)
t.start()
l.append(t)
#t.join()

for t in l:
t.join()

end_time = time.time()
print("Total time: {}".format(end_time - start_time))

if __name__ == '__main__':
main()


'''

py2.7:
串行:6.1565990448 s
並行:3.1639978885 s

py3.5:
串行:6.556925058364868 s
併發:3.5378448963165283 s

'''

固然multiprocessing也不是萬能良藥。它的引入會增長程序實現時線程間數據通信和同步的困難。就拿計數器來舉例子,若是咱們要多個線程累加同一個變量,對於thread來講,申明一個global變量,用thread.Lock的context包裹住三行就搞定了。而multiprocessing因爲進程之間沒法看到對方的數據,只能經過在主線程申明一個Queue,put再get或者用share memory的方法。這個額外的實現成本使得原本就很是痛苦的多線程程序編碼,變得更加痛苦了。

總結:由於GIL的存在,只有IO Bound場景下得多線程會獲得較好的性能 - 若是對並行計算性能較高的程序能夠考慮把核心部分也成C模塊,或者索性用其餘語言實現 - GIL在較長一段時間內將會繼續存在,可是會不斷對其進行改進。

因此對於GIL,既然不能反抗,那就學會去享受它吧!

2.4 同步鎖 (Lock)

複製代碼
import time
import threading

def addNum():
    global num #在每一個線程中都獲取這個全局變量
    #num-=1

    temp=num
    time.sleep(0.1)
    num =temp-1  # 對此公共變量進行-1操做

num = 100  #設定一個共享變量

thread_list = []

for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待全部線程執行完畢
    t.join()

print('Result: ', num)
複製代碼

鎖一般被用來實現對共享資源的同步訪問。爲每個共享資源建立一個Lock對象,當你須要訪問該資源時,調用acquire方法來獲取鎖對象(若是其它線程已經得到了該鎖,則當前線程需等待其被釋放),待資源訪問完後,再調用release方法釋放鎖:

複製代碼
import threading

R=threading.Lock()

R.acquire()
'''
對公共數據的操做
'''
R.release()
複製代碼

擴展思考

'''
一、爲何有了GIL,還須要線程同步?

多線程環境下必須存在資源的競爭,那麼如何才能保證同一時刻只有一個線程對共享資源進行存取?

加鎖, 對, 加鎖能夠保證存取操做的惟一性, 從而保證同一時刻只有一個線程對共享數據存取.

一般加鎖也有2種不一樣的粒度的鎖:

coarse-grained(粗粒度): python解釋器層面維護着一個全局的鎖機制,用來保證線程安全。
內核級經過GIL實現的互斥保護了內核的共享資源。

fine-grained(細粒度): 那麼程序員須要自行地加,解鎖來保證線程安全,
用戶級經過自行加鎖保護的用戶程序的共享資源。

二、GIL爲何限定在一個進程上?

你寫一個py程序,運行起來自己就是一個進程,這個進程是有解釋器來翻譯的,因此GIL限定在當前進程;
若是又建立了一個子進程,那麼兩個進程是徹底獨立的,這個字進程也是有python解釋器來運行的,因此
這個子進程上也是受GIL影響的


'''

2.5 死鎖與遞歸鎖

所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程。

複製代碼
import threading
import time

mutexA = threading.Lock()
mutexB = threading.Lock()

class MyThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.fun1()
        self.fun2()

    def fun1(self):

        mutexA.acquire()  # 若是鎖被佔用,則阻塞在這裏,等待鎖的釋放

        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))

        mutexB.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        mutexB.release()
        mutexA.release()


    def fun2(self):

        mutexB.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        time.sleep(0.2)

        mutexA.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
        mutexA.release()

        mutexB.release()

if __name__ == "__main__":

    print("start---------------------------%s"%time.time())

    for i in range(0, 10):
        my_thread = MyThread()
        my_thread.start()
複製代碼

在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:

1
mutex  =  threading.RLock()

2.6 Event對象

線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就 會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行

複製代碼
event.isSet():返回event的狀態值;

event.wait():若是 event.isSet()==False將阻塞線程;

event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;

event.clear():恢復event的狀態值爲False。
複製代碼

       

能夠考慮一種應用場景(僅僅做爲說明),例如,咱們有多個線程從Redis隊列中讀取數據來處理,這些線程都要嘗試去鏈接Redis的服務,通常狀況下,若是Redis鏈接不成功,在各個線程的代碼中,都會去嘗試從新鏈接。若是咱們想要在啓動時確保Redis服務正常,才讓那些工做線程去鏈接Redis服務器,那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做:主線程中會去嘗試鏈接Redis服務,若是正常的話,觸發事件,各工做線程會嘗試鏈接Redis服務。

複製代碼
import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)

def worker(event):
    logging.debug('Waiting for redis ready...')
    event.wait()
    logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
    time.sleep(1)

def main():
    readis_ready = threading.Event()
    t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
    t1.start()

    t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
    t2.start()

    logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
    time.sleep(3) # simulate the check progress
    readis_ready.set()

if __name__=="__main__":
    main()
複製代碼

threading.Event的wait方法還接受一個超時參數,默認狀況下若是事件一致沒有發生,wait方法會一直阻塞下去,而加入這個超時參數以後,若是阻塞時間超過這個參數設定的值以後,wait方法會返回。對應於上面的應用場景,若是Redis服務器一致沒有啓動,咱們但願子線程可以打印一些日誌來不斷地提醒咱們當前沒有一個能夠鏈接的Redis服務,咱們就能夠經過設置這個超時參數來達成這樣的目的:

複製代碼
def worker(event):
    while not event.is_set():
        logging.debug('Waiting for redis ready...')
        event.wait(2)
    logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
    time.sleep(1)
複製代碼

這樣,咱們就能夠在等待Redis服務啓動的同時,看到工做線程里正在等待的狀況。

2.7 Semaphore(信號量)

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。

實例:(同時只有5個線程能夠得到semaphore,便可以限制最大鏈接數爲5):

複製代碼
import threading
import time

semaphore = threading.Semaphore(5)

def func():
    if semaphore.acquire():
        print (threading.currentThread().getName() + ' get semaphore')
        time.sleep(2)
        semaphore.release()

for i in range(20):
  t1 = threading.Thread(target=func)
  t1.start()
複製代碼

應用:鏈接池

思考:與Rlock的區別?

2.8 隊列(queue)

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

2.8.1 get與put方法

複製代碼
'''

建立一個「隊列」對象

import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過Queue的構造函數的可選參數
maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。

將一個值放入隊列中
q.put(10)
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;
第二個block爲可選參數,默認爲
1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,
put方法將引起Full異常。

將一個值從隊列中取出
q.get()
調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲True。若是隊列爲空且
block爲True,get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常。

'''
複製代碼

2.8.2  join與task_done方法

複製代碼
'''
join() 阻塞進程,直到全部任務完成,須要配合另外一個方法task_done。

    def join(self):
     with self.all_tasks_done:
      while self.unfinished_tasks:
       self.all_tasks_done.wait()

task_done() 表示某個任務完成。每一條get語句後須要一條task_done。


import queue
q = queue.Queue(5)
q.put(10)
q.put(20)
print(q.get())
q.task_done()
print(q.get())
q.task_done()

q.join()

print("ending!")
'''
複製代碼

2.8.3 其餘經常使用方法

複製代碼
'''

此包中的經常使用方法(q = Queue.Queue()):
q.qsize() 返回隊列的大小 q.empty() 若是隊列爲空,返回True,反之False q.full() 若是隊列滿了,返回True,反之False q.full 與 maxsize 大小對應 q.get([block[, timeout]]) 獲取隊列,timeout等待時間 q.get_nowait() 至關q.get(False)非阻塞
q.put(item) 寫入隊列,timeout等待時間 q.put_nowait(item) 至關q.put(item, False) q.task_done() 在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號 q.join() 實際上意味着等到隊列爲空,再執行別的操做 '''
複製代碼

2.8.4 其餘模式

複製代碼
'''

Python Queue模塊有三種隊列及構造函數: 

一、Python Queue模塊的FIFO隊列先進先出。  class queue.Queue(maxsize) 
二、LIFO相似於堆,即先進後出。           class queue.LifoQueue(maxsize) 
三、還有一種是優先級隊列級別越低越先出來。 class queue.PriorityQueue(maxsize) 


import queue

#先進後出

q=queue.LifoQueue()

q.put(34)
q.put(56)
q.put(12)

#優先級
q=queue.PriorityQueue()
q.put([5,100])
q.put([7,200])
q.put([3,"hello"])
q.put([4,{"name":"alex"}])

while 1:
  data=q.get()
  print(data)

'''
複製代碼

2.8.5 生產者消費者模型

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

這就像,在餐廳,廚師作好菜,不須要直接和客戶交流,而是交給前臺,而客戶去飯菜也不須要不找廚師,直接去前臺領取便可,這也是一個結耦的過程。

複製代碼
import time,random
import queue,threading

q = queue.Queue()

def Producer(name):
  count = 0
  while count <10:
    print("making........")
    time.sleep(random.randrange(3))
    q.put(count)
    print('Producer %s has produced %s baozi..' %(name, count))
    count +=1
    #q.task_done()
    #q.join()
    print("ok......")
def Consumer(name):
  count = 0
  while count <10:
    time.sleep(random.randrange(4))
    if not q.empty():
        data = q.get()
        #q.task_done()
        #q.join()
        print(data)
        print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
    else:
        print("-----no baozi anymore----")
    count +=1

p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
# c2 = threading.Thread(target=Consumer, args=('C',))
# c3 = threading.Thread(target=Consumer, args=('D',))
p1.start()
c1.start()
# c2.start()
# c3.start()
複製代碼

三 multiprocessing模塊

Multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

因爲GIL的存在,python中的多線程其實並非真正的多線程,若是想要充分地使用多核CPU的資源,在python中大部分狀況須要使用多進程。

multiprocessing包是Python中的多進程管理包。與threading.Thread相似,它能夠利用multiprocessing.Process對象來建立一個進程。該進程能夠運行在Python程序內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象能夠像多線程那樣,經過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。因此,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。

3.1 python的進程調用

複製代碼
# Process類調用

from multiprocessing import Process
import time
def f(name):

    print('hello', name,time.ctime())
    time.sleep(1)

if __name__ == '__main__':
    p_list=[]
    for i in range(3):
        p = Process(target=f, args=('alvin:%s'%i,))
        p_list.append(p)
        p.start()
    for i in p_list:
        p.join()
    print('end')

# 繼承Process類調用
from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self):
        super(MyProcess, self).__init__()
        # self.name = name

    def run(self):

        print ('hello', self.name,time.ctime())
        time.sleep(1)


if __name__ == '__main__':
    p_list=[]
    for i in range(3):
        p = MyProcess()
        p.start()
        p_list.append(p)

    for p in p_list:
        p.join()

    print('end')
複製代碼

3.2 process類

構造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 線程組,目前尚未實現,庫引用中提示必須是None; 
  target: 要執行的方法; 
  name: 進程名; 
  args/kwargs: 要傳入方法的參數。

實例方法:

  is_alive():返回進程是否在運行。

  join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。

  start():進程準備就緒,等待CPU調度

  run():strat()調用run方法,若是實例進程時未制定傳入target,這star執行t默認run()方法。

  terminate():無論任務是否完成,當即中止工做進程

屬性:

  daemon:和線程的setDeamon功能同樣

  name:進程名字。

  pid:進程號。

複製代碼
from multiprocessing import Process
import os
import time
def info(name):


    print("name:",name)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("------------------")
    time.sleep(1)

def foo(name):

    info(name)

if __name__ == '__main__':

    info('main process line')


    p1 = Process(target=info, args=('alvin',))
    p2 = Process(target=foo, args=('egon',))
    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print("ending")
複製代碼

經過tasklist(Win)或者ps -elf |grep(linux)命令檢測每個進程號(PID)對應的進程名

3.3 進程間通信 

3.3.1 進程對列Queue

複製代碼
from multiprocessing import Process, Queue
import queue

def f(q,n):
    #q.put([123, 456, 'hello'])
    q.put(n*n+1)
    print("son process",id(q))

if __name__ == '__main__':
    q = Queue()  #try: q=queue.Queue()
    print("main process",id(q))

    for i in range(3):
        p = Process(target=f, args=(q,i))
        p.start()

    print(q.get())
    print(q.get())
    print(q.get())
複製代碼

3.3.2 管道(pipe)

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

複製代碼
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([12, {"name":"yuan"}, 'hello'])
    response=conn.recv()
    print("response",response)
    conn.close()
    if __name__ == '__main__':

    parent_conn, child_conn = Pipe()
   
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    parent_conn.send("兒子你好!")
    p.join()
複製代碼

Pipe()返回的兩個鏈接對象表明管道的兩端。 每一個鏈接對象都有send()和recv()方法(等等)。 請注意,若是兩個進程(或線程)嘗試同時讀取或寫入管道的同一端,管道中的數據可能會損壞。

3.3.3 manager

Queue和pipe只是實現了數據交互,並沒實現數據共享,即一個進程去更改另外一個進程的數據

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

複製代碼
from multiprocessing import Process, Manager

def f(d, l,n):

    d[n] = n
    d["name"] ="alvin"
    l.append(n)

    #print("l",l)

if __name__ == '__main__':

    with Manager() as manager:

        d = manager.dict()

        l = manager.list(range(5))
        p_list = []

        for i in range(10):
            p = Process(target=f, args=(d,l,i))
            p.start()
            p_list.append(p)

        for res in p_list:
            res.join()

        print(d)
        print(l)
複製代碼

3.4 進程池

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。

複製代碼
from multiprocessing import Pool
import time

def foo(args):
 time.sleep(1)
 print(args)

if __name__ == '__main__':
 p = Pool(5)
 for i in range(30):
     p.apply_async(func=foo, args= (i,))

 p.close()   # 等子進程執行完畢後關閉線程池
 # time.sleep(2)
 # p.terminate()  # 馬上關閉線程池
 p.join()
複製代碼

進程池內部維護一個進程序列,當使用時,去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進程,那麼程序就會等待,直到進程池中有可用進程爲止。

進程池中有如下幾個主要方法:

  1. apply:從進程池裏取一個進程並執行
  2. apply_async:apply的異步版本
  3. terminate:馬上關閉線程池
  4. join:主進程等待全部子進程執行完畢,必須在close或terminate以後
  5. close:等待全部進程結束後,才關閉線程池

四 協程

協程,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程。

協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:

協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。

4.1 yield與協程

複製代碼
import time

"""
傳統的生產者-消費者模型是一個線程寫消息,一個線程取消息,經過鎖機制控制隊列和等待,但一不當心就可能死鎖。
若是改用協程,生產者生產消息後,直接經過yield跳轉到消費者開始執行,待消費者執行完畢後,切換回生產者繼續生產,效率極高。
"""
# 注意到consumer函數是一個generator(生成器):
# 任何包含yield關鍵字的函數都會自動成爲生成器(generator)對象

def consumer():
    r = ''
    while True:
        # 三、consumer經過yield拿到消息,處理,又經過yield把結果傳回;
        #    yield指令具備return關鍵字的做用。而後函數的堆棧會自動凍結(freeze)在這一行。
        #    當函數調用者的下一次利用next()或generator.send()或for-in來再次調用該函數時,
        #    就會從yield代碼的下一行開始,繼續執行,再返回下一次迭代結果。經過這種方式,迭代器能夠實現無限序列和惰性求值。
        n = yield r
        if not n:
            return
        print('[CONSUMER] ←← Consuming %s...' % n)
        time.sleep(1)
        r = '200 OK'
def produce(c):
    # 一、首先調用c.next()啓動生成器
    next(c)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] →→ Producing %s...' % n)
        # 二、而後,一旦生產了東西,經過c.send(n)切換到consumer執行;
        cr = c.send(n)
        # 四、produce拿到consumer處理的結果,繼續生產下一條消息;
        print('[PRODUCER] Consumer return: %s' % cr)
    # 五、produce決定不生產了,經過c.close()關閉consumer,整個過程結束。
    c.close()
if __name__=='__main__':
    # 六、整個流程無鎖,由一個線程執行,produce和consumer協做完成任務,因此稱爲「協程」,而非線程的搶佔式多任務。
    c = consumer()
    produce(c)
    
    
'''
result:

[PRODUCER] →→ Producing 1...
[CONSUMER] ←← Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 2...
[CONSUMER] ←← Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 3...
[CONSUMER] ←← Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 4...
[CONSUMER] ←← Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 5...
[CONSUMER] ←← Consuming 5...
[PRODUCER] Consumer return: 200 OK
'''
複製代碼

4.2 greenlet

greenlet機制的主要思想是:生成器函數或者協程函數中的yield語句掛起函數的執行,直到稍後使用next()或send()操做進行恢復爲止。可使用一個調度器循環在一組生成器函數之間協做多個任務。greentlet是python中實現咱們所謂的"Coroutine(協程)"的一個基礎庫.

複製代碼
from greenlet import greenlet
 
def test1():
    print (12)
    gr2.switch()
    print (34)
    gr2.switch()
 
def test2():
    print (56)
    gr1.switch()
    print (78)
 
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
複製代碼

4.2 基於greenlet的框架

4.2.1 gevent模塊實現協程

Python經過yield提供了對協程的基本支持,可是不徹底。而第三方的gevent爲Python提供了比較完善的協程支持。

gevent是第三方庫,經過greenlet實現協程,其基本思想是:

當一個greenlet遇到IO操做時,好比訪問網絡,就自動切換到其餘的greenlet,等到IO操做完成,再在適當的時候切換回來繼續執行。因爲IO操做很是耗時,常常使程序處於等待狀態,有了gevent爲咱們自動切換協程,就保證總有greenlet在運行,而不是等待IO。

因爲切換是在IO操做時自動完成,因此gevent須要修改Python自帶的一些標準庫,這一過程在啓動時經過monkey patch完成:

複製代碼
import gevent
import time

def foo():
    print("running in foo")
    gevent.sleep(2)
    print("switch to foo again")

def bar():
    print("switch to bar")
    gevent.sleep(5)
    print("switch to bar again")

start=time.time()

gevent.joinall(
    [gevent.spawn(foo),
    gevent.spawn(bar)]
)

print(time.time()-start)
複製代碼

固然,實際代碼裏,咱們不會用gevent.sleep()去切換協程,而是在執行到IO操做時,gevent自動切換,代碼以下:

複製代碼
from gevent import monkey
monkey.patch_all() import gevent from urllib import request import time def f(url): print('GET: %s' % url) resp = request.urlopen(url) data = resp.read() print('%d bytes received from %s.' % (len(data), url)) start=time.time() gevent.joinall([ gevent.spawn(f, 'https://itk.org/'), gevent.spawn(f, 'https://www.github.com/'), gevent.spawn(f, 'https://zhihu.com/'), ]) # f('https://itk.org/') # f('https://www.github.com/') # f('https://zhihu.com/') print(time.time()-start)
複製代碼

擴展

===========================一


gevent是一個基於協程(coroutine)的Python網絡函數庫,經過使用greenlet提供了一個在libev事件循環頂部的高級別併發API。

主要特性有如下幾點:

<1> 基於libev的快速事件循環,Linux上面的是epoll機制

<2> 基於greenlet的輕量級執行單元

<3> API複用了Python標準庫裏的內容

<4> 支持SSL的協做式sockets

<5> 可經過線程池或c-ares實現DNS查詢

<6> 經過monkey patching功能來使得第三方模塊變成協做式

gevent.spawn()方法spawn一些jobs,而後經過gevent.joinall將jobs加入到微線程執行隊列中等待其完成,設置超時爲2秒。執行後的結果經過檢查gevent.Greenlet.value值來收集。


===========================二
一、關於Linux的epoll機制:

epoll是Linux內核爲處理大批量文件描述符而做了改進的poll,是Linux下多路複用IO接口select/poll的
加強版本,它能顯著提升程序在大量併發鏈接中只有少許活躍的狀況下的系統CPU利用率。epoll的優勢:

(1)支持一個進程打開大數目的socket描述符。select的一個進程所打開的FD由FD_SETSIZE的設置來限定,而epoll沒有這個限制,它所支持的FD上限是
最大可打開文件的數目,遠大於2048。

(2)IO效率不隨FD數目增長而線性降低:因爲epoll只會對「活躍」的socket進行操做,因而,只有」活躍」的socket纔會主動去調用 callback函數,其餘
idle狀態的socket則不會。

(3)使用mmap加速內核與用戶空間的消息傳遞。epoll是經過內核於用戶空間mmap同一塊內存實現的。

(4)內核微調。

二、libev機制

提供了指定文件描述符事件發生時調用回調函數的機制。libev是一個事件循環器:向libev註冊感興趣的事件,好比socket可讀事件,libev會對所註冊的事件
的源進行管理,並在事件發生時觸發相應的程序。

===========================三


‘’‘

import gevent

from gevent import socket

urls = [‘www.google.com.hk’,’www.example.com’, ‘www.python.org’ ]

jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]

gevent.joinall(jobs, timeout=2)

[job.value for job in jobs]


[‘74.125.128.199’, ‘208.77.188.166’, ‘82.94.164.162’]

’‘’

gevent.spawn()方法spawn一些jobs,而後經過gevent.joinall將jobs加入到微線程執行隊列中等待其完成,設置超時爲2秒。執行後的結果經過檢查gevent.Greenlet.value值來收集。gevent.socket.gethostbyname()函數與標準的socket.gethotbyname()有相同的接口,但它不會阻塞整個解釋器,所以會使得其餘的greenlets跟隨着無阻的請求而執行。

Monket patching

Python的運行環境容許咱們在運行時修改大部分的對象,包括模塊、類甚至函數。雖然這樣作會產生「隱式的反作用」,並且出現問題很難調試,但在須要修改Python自己的基礎行爲時,Monkey patching就派上用場了。Monkey patching可以使得gevent修改標準庫裏面大部分的阻塞式系統調用,包括socket,ssl,threading和select等模塊,而變成協做式運行。

 

from gevent import monkey ;

monkey . patch_socket ()

import urllib2

 

經過monkey.patch_socket()方法,urllib2模塊可使用在多微線程環境,達到與gevent共同工做的目的。

事件循環

不像其餘網絡庫,gevent和eventlet相似, 在一個greenlet中隱式開始事件循環。沒有必須調用run()或dispatch()的反應器(reactor),在twisted中是有 reactor的。當gevent的API函數想阻塞時,它得到Hub實例(執行時間循環的greenlet),並切換過去。若是沒有集線器實例則會動態 建立。

libev提供的事件循環默認使用系統最快輪詢機制,設置LIBEV_FLAGS環境變量可指定輪詢機制。LIBEV_FLAGS=1爲select, LIBEV_FLAGS = 2爲poll, LIBEV_FLAGS = 4爲epoll,LIBEV_FLAGS = 8爲kqueue。

Libev的API位於gevent.core下。注意libev API的回調在Hub的greenlet運行,所以使用同步greenlet的API。可使用spawn()和Event.set()等異步API。

eventlet實現協程(瞭解)

eventlet 是基於 greenlet 實現的面向網絡應用的併發處理框架,提供「線程」池、隊列等與其餘 Python 線程、進程模型很是類似的 api,而且提供了對 Python 發行版自帶庫及其餘模塊的超輕量併發適應性調整方法,比直接使用 greenlet 要方便得多。

其基本原理是調整 Python 的 socket 調用,當發生阻塞時則切換到其餘 greenlet 執行,這樣來保證資源的有效利用。須要注意的是:
eventlet 提供的函數只能對 Python 代碼中的 socket 調用進行處理,而不能對模塊的 C 語言部分的 socket 調用進行修改。對後者這類模塊,仍然須要把調用模塊的代碼封裝在 Python 標準線程調用中,以後利用 eventlet 提供的適配器實現 eventlet 與標準線程之間的協做。
雖然 eventlet 把 api 封裝成了很是相似標準線程庫的形式,但二者的實際併發執行流程仍然有明顯區別。在沒有出現 I/O 阻塞時,除非顯式聲明,不然當前正在執行的 eventlet 永遠不會把 cpu 交給其餘的 eventlet,而標準線程則是不管是否出現阻塞,老是由全部線程一塊兒爭奪運行資源。全部 eventlet 對 I/O 阻塞無關的大運算量耗時操做基本沒有什麼幫助。

總結

協程的好處:

無需線程上下文切換的開銷
無需原子操做鎖定及同步的開銷
方便切換控制流,簡化編程模型
高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。因此很適合用於高併發處理。
缺點:

沒法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程須要和進程配合才能運行在多CPU上.固然咱們平常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
進行阻塞(Blocking)操做(如IO時)會阻塞掉整個程序

五 IO模型

同步(synchronous) IO和異步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分別是什麼,到底有什麼區別?這個問題其實不一樣的人給出的答案均可能不一樣,好比wiki,就認爲asynchronous IO和non-blocking IO是一個東西。這實際上是由於不一樣的人的知識背景不一樣,而且在討論這個問題的時候上下文(context)也不相同。因此,爲了更好的回答這個問題,先限定一下本文的上下文。
本文討論的背景是Linux環境下的network IO。 

Stevens在文章中一共比較了五種IO Model:

      •     blocking IO
      •     nonblocking IO
      •     IO multiplexing
      •     signal driven IO
      •     asynchronous IO

因爲signal driven IO在實際中並不經常使用,因此我這隻說起剩下的四種IO Model。
再說一下IO發生時涉及的對象和步驟。
對於一個network IO (這裏咱們以read舉例),它會涉及到兩個系統對象,一個是調用這個IO的process (or thread),另外一個就是系統內核(kernel)。當一個read操做發生時,它會經歷兩個階段:

  •  等待數據準備 (Waiting for the data to be ready)
  •  將數據從內核拷貝到進程中 (Copying the data from the kernel to the process)

記住這兩點很重要,由於這些IO Model的區別就是在兩個階段上各有不一樣的狀況。

5.1 blocking IO (阻塞IO)

在linux中,默認狀況下全部的socket都是blocking,一個典型的讀操做流程大概是這樣:

當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:準備數據。對於network io來講,不少時候數據在一開始尚未到達(好比,尚未收到一個完整的UDP包),這個時候kernel就要等待足夠的數據到來。而在用戶進程這邊,整個進程會被阻塞。當kernel一直等到數據準備好了,它就會將數據從kernel中拷貝到用戶內存,而後kernel返回結果,用戶進程才解除block的狀態,從新運行起來。
因此,blocking IO的特色就是在IO執行的兩個階段都被block了。

5.2 non-blocking IO(非阻塞IO)

linux下,能夠經過設置socket使其變爲non-blocking。當對一個non-blocking socket執行讀操做時,流程是這個樣子:

從圖中能夠看出,當用戶進程發出read操做時,若是kernel中的數據尚未準備好,那麼它並不會block用戶進程,而是馬上返回一個error。從用戶進程角度講 ,它發起一個read操做後,並不須要等待,而是立刻就獲得了一個結果。用戶進程判斷結果是一個error時,它就知道數據尚未準備好,因而它能夠再次發送read操做。一旦kernel中的數據準備好了,而且又再次收到了用戶進程的system call,那麼它立刻就將數據拷貝到了用戶內存,而後返回。因此,用戶進程實際上是須要不斷的主動詢問kernel數據好了沒有。

 注意:

      在網絡IO時候,非阻塞IO也會進行recvform系統調用,檢查數據是否準備好,與阻塞IO不同,」非阻塞將大的整片時間的阻塞分紅N多的小的阻塞, 因此進程不斷地有機會 ‘被’ CPU光顧」。即每次recvform系統調用之間,cpu的權限還在進程手中,這段時間是能夠作其餘事情的,

      也就是說非阻塞的recvform系統調用調用以後,進程並無被阻塞,內核立刻返回給進程,若是數據還沒準備好,此時會返回一個error。進程在返回以後,能夠乾點別的事情,而後再發起recvform系統調用。重複上面的過程,循環往復的進行recvform系統調用。這個過程一般被稱之爲輪詢。輪詢檢查內核數據,直到數據準備好,再拷貝數據到進程,進行數據處理。須要注意,拷貝數據整個過程,進程仍然是屬於阻塞的狀態。

import time
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.setsockopt
sk.bind(('127.0.0.1',6667))
sk.listen(5)
sk.setblocking(False)
while True:
try:
print ('waiting client connection .......')
connection,address = sk.accept() # 進程主動輪詢
print("+++",address)
client_messge = connection.recv(1024)
print(str(client_messge,'utf8'))
connection.close()
except Exception as e:
print (e)
time.sleep(4)

#############################client

import time
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

while True:
sk.connect(('127.0.0.1',6667))
print("hello")
sk.sendall(bytes("hello","utf8"))
time.sleep(2)
break

優勢:可以在等待任務完成的時間裏幹其餘活了(包括提交其餘任務,也就是 「後臺」 能夠有多個任務在同時執行)。

缺點:任務完成的響應延遲增大了,由於每過一段時間纔去輪詢一次read操做,而任務可能在兩次輪詢之間的任意時間完成。這會致使總體數據吞吐量的下降。

5.3 IO multiplexing(IO多路複用)

      IO multiplexing這個詞可能有點陌生,可是若是我說select,epoll,大概就都能明白了。有些地方也稱這種IO方式爲event driven IO。咱們都知道,select/epoll的好處就在於單個process就能夠同時處理多個網絡鏈接的IO。它的基本原理就是select/epoll這個function會不斷的輪詢所負責的全部socket,當某個socket有數據到達了,就通知用戶進程。它的流程如圖:

      當用戶進程調用了select,那麼整個進程會被block,而同時,kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。
這個圖和blocking IO的圖其實並無太大的不一樣,事實上,還更差一些。由於這裏須要使用兩個system call (select 和 recvfrom),而blocking IO只調用了一個system call (recvfrom)。可是,用select的優點在於它能夠同時處理多個connection。(多說一句。因此,若是處理的鏈接數不是很高的話,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優點並非對於單個鏈接能處理得更快,而是在於能處理更多的鏈接。)
在IO multiplexing Model中,實際中,對於每個socket,通常都設置成爲non-blocking,可是,如上圖所示,整個用戶的process實際上是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。

結論: select的優點在於能夠處理多個鏈接,不適用於單個鏈接 

#***********************server.py

import socket
import select
sk=socket.socket()
sk.bind(("127.0.0.1",8800))
sk.listen(5)
sk.setblocking(False)
inputs=[sk,]

while True:
r,w,e=select.select(inputs,[],[],5)
print(len(r))

for obj in r:
if obj==sk:
conn,add=obj.accept()
print("conn:",conn)
inputs.append(conn)
else:

data_byte=obj.recv(1024)
print(str(data_byte,'utf8'))
if not data_byte:
inputs.remove(obj)
continue
inp=input('回答%s: >>>'%inputs.index(obj))
obj.sendall(bytes(inp,'utf8'))

print('>>',r)


#***********************client.py

import socket
sk=socket.socket()
sk.connect(('127.0.0.1',8802))

while True:
inp=input(">>>>") # how much one night?
sk.sendall(bytes(inp,"utf8"))
data=sk.recv(1024)
print(str(data,'utf8'))

 

思考1:select監聽fd變化的過程

用戶進程建立socket對象,拷貝監聽的fd到內核空間,每個fd會對應一張系統文件表,內核空間的fd響應到數據後,就會發送信號給用戶進程數據已到;用戶進程再發送系統調用,好比(accept)將內核空間的數據copy到用戶空間,同時做爲接受數據端內核空間的數據清除,這樣從新監聽時fd再有新的數據又能夠響應到了(發送端由於基於TCP協議因此須要收到應答後纔會清除)。

思考2: 上面的示例中,開啓三個客戶端,分別連續向server端發送一個內容(中間server端不迴應),結果會怎樣,爲何?

 5.4 Asynchronous I/O(異步IO)

linux下的asynchronous IO其實用得不多。先看一下它的流程:

 

用戶進程發起read操做以後,馬上就能夠開始去作其它的事。而另外一方面,從kernel的角度,當它受到一個asynchronous read以後,首先它會馬上返回,因此不會對用戶進程產生任何block。而後,kernel會等待數據準備完成,而後將數據拷貝到用戶內存,當這一切都完成以後,kernel會給用戶進程發送一個signal,告訴它read操做完成了。

5.5 IO模型比較分析

      到目前爲止,已經將四個IO Model都介紹完了。如今回過頭來回答最初的那幾個問題:blocking和non-blocking的區別在哪,synchronous IO和asynchronous IO的區別在哪。
先回答最簡單的這個:blocking vs non-blocking。前面的介紹中其實已經很明確的說明了這二者的區別。調用blocking IO會一直block住對應的進程直到操做完成,而non-blocking IO在kernel還準備數據的狀況下會馬上返回。

在說明synchronous IO和asynchronous IO的區別以前,須要先給出二者的定義。Stevens給出的定義(實際上是POSIX的定義)是這樣子的:
    A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;
    An asynchronous I/O operation does not cause the requesting process to be blocked; 
      二者的區別就在於synchronous IO作」IO operation」的時候會將process阻塞。按照這個定義,以前所述的blocking IO,non-blocking IO,IO multiplexing都屬於synchronous IO。有人可能會說,non-blocking IO並無被block啊。這裏有個很是「狡猾」的地方,定義中所指的」IO operation」是指真實的IO操做,就是例子中的recvfrom這個system call。non-blocking IO在執行recvfrom這個system call的時候,若是kernel的數據沒有準備好,這時候不會block進程。可是,當kernel中數據準備好的時候,recvfrom會將數據從kernel拷貝到用戶內存中,這個時候進程是被block了,在這段時間內,進程是被block的。而asynchronous IO則不同,當進程發起IO 操做以後,就直接返回不再理睬了,直到kernel發送一個信號,告訴進程說IO完成。在這整個過程當中,進程徹底沒有被block。

各個IO Model的比較如圖所示:

      

通過上面的介紹,會發現non-blocking IO和asynchronous IO的區別仍是很明顯的。在non-blocking IO中,雖然進程大部分時間都不會被block,可是它仍然要求進程去主動的check,而且當數據準備完成之後,也須要進程主動的再次調用recvfrom來將數據拷貝到用戶內存。而asynchronous IO則徹底不一樣。它就像是用戶進程將整個IO操做交給了他人(kernel)完成,而後他人作完後發信號通知。在此期間,用戶進程不須要去檢查IO操做的狀態,也不須要主動的去拷貝數據。

 5.6 selectors模塊

複製代碼
import selectors
import socket

sel = selectors.DefaultSelector()

def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()

sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)
複製代碼

 

 

 

 

 

 

 
 
 
好文要頂  已關注  收藏該文   
相關文章
相關標籤/搜索