python併發學習總結

1、理解操做系統

操做系統(OS)統管了計算機的全部硬件,並負責爲應用程序分配和回收硬件資源。
硬件資源老是有限的,而應用程序對資源的慾望都是貪婪的。
當多個應用程序發生硬件資源爭奪時,OS負責出面調度,保證多任務的資源分配以保證系統穩定執行。
只有CPU能夠執行代碼,因此應用程序(任務)執行前,必須申請到CPU資源,同一時刻,一個CPU只能執行一個任務代碼。
計算機的CPU數量(資源方)遠遠小於須要執行的任務數(需求方),操做系統將CPU的資源按照時間片劃分,並根據任務類型分配,各任務輪流使用CPU
CPU的執行/切換速度很是快,對於用戶而言,多任務看上去就像同時執行同樣,此稱爲併發。ios

以下是串行和併發的對比:
算法


計算機的內存、硬盤、網卡、屏幕、鍵盤等硬件提供了數據交換的場所。
OS提供了IO接口以實現數據交換,數據交換的過程通常不須要CPU的參與。
IO接口有兩種類型:
一、阻塞型IO
發生IO(數據交換)的時候,調用線程沒法向下執行剩餘代碼,意圖佔用CPU但不執行任何代碼,單線程阻塞型IO自身沒法支持併發
二、非阻塞型IO
發生IO(數據交換)的時候,調用線程能夠向下執行剩餘代碼,單線程非阻塞型IO自身能夠支持併發編程

以下是阻塞型IO和非阻塞型IO的對比:
windows

2、任務類型

根據一個任務執行期間佔用CPU的比例來劃分,有兩種類型:
一、CPU密集型
絕大部分時間都是佔用CPU並執行代碼,好比科學計算任務
二、IO密集型
絕大部分時間都未佔用CPU,而是在發生IO操做,好比網絡服務安全

3、Socket模塊

OS提供了阻塞IO和非阻塞IO兩種類型的接口,應用程序能夠自行選擇。
Socket模塊封裝了兩種接口,Socket模塊提供的函數默認是阻塞IO類型。
用戶能夠選擇手工切換至非阻塞IO類型,使用socketobj.setblocking(False)切換至非阻塞IO模式。
下面將經過一個簡單的例子程序來記錄對併發的學習思考及總結。服務器

4、一個簡單的C/S程序

客戶端:循環接收用戶的輸入,併發送給服務器。從服務器接收反饋並打印至屏幕。
服務器:將接收到的用戶輸入,變成大寫並返回給客戶端。網絡

客戶端代碼固定,主要思考服務器端的代碼。
通常咱們會這樣寫服務端代碼:多線程

# 服務器端
import socket

addr = ('127.0.0.1', 8080)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(addr)
server.listen(5)
print('監聽中...')

while True:  # 連接循環
    conn, client = server.accept()
    print(f'一個客戶端上線 -> {client}')

    while True:  # 消息循環
        try:
            request = conn.recv(1024)
            if not request:
                break
            print(f"request: {request.decode('utf-8')}")
            conn.send(request.upper())

        except ConnectionResetError as why:
            print(f'客戶端丟失,緣由是: {why}')
            break

    conn.close()

客戶端代碼保持不變:併發

# 客戶端
import socket

addr = ('127.0.0.1', 8080)
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(addr)
print(f'服務器{addr}鏈接成功')

while True:  # 消息循環
    inp = input('>>>').strip()
    if not inp: continue

    try:
        client.send(inp.encode('utf-8'))
        response = client.recv(1024)
        print(response.decode('utf-8'))

    except ConnectionResetError as why:
        print(f'服務端丟失,緣由是: {why}')
        break

client.close()

這種形式的編碼我稱爲:單線程+阻塞IO+循環串行,有以下幾個特色:
一、編碼簡單,模型簡潔,可讀性強
二、串行提供服務,用戶使用服務器必須一個一個排隊

單一線程的阻塞IO模型是沒法支持併發的,若是要支持併發,有以下兩類解決方案。

5、使用阻塞IO實現併發

單線程阻塞IO,本質上是沒法實現併發的。由於一旦發生IO阻塞,線程就會阻塞,下方代碼不會繼續執行。若是要使用單線程阻塞IO來實現併發,須要增長線程數目或者進程數目,當某一個線程/進程發生阻塞的時候,由OS調度至另外一個線程/進程執行。


方案一:阻塞IO+多進程

服務器端代碼
import socket
from multiprocessing import Process

def task(conn):
    """通訊循環處理函數"""

    while True:
        try:
            request = conn.recv(1024)
            if not request:
                break
            print(f"request: {request.decode('utf-8')}")
            conn.send(request.upper())

        except ConnectionResetError as why:
            print(f'客戶端丟失,緣由是: {why}')
            break

if __name__ == '__main__':  # windows下須要把新建進程寫到main中,否則會報錯
    addr = ('127.0.0.1', 8080)
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(addr)
    server.listen(5)
    print('監聽中...')

    while True:
        conn, client = server.accept()
        print(f'一個客戶端上線 -> {client}')

        p = Process(target=task, args=(conn,))  # 開啓子進程處理與用戶的消息循環
        p.start()

將服務器對用戶的消息循環操做封裝到進程中,單進程依然會發生阻塞
進程之間的調度交由OS負責(重要)
進程過重,建立和銷燬進程都須要比較大的開銷,此外,一臺設備所能涵蓋的進程數量很是有限(通常就幾百左右)。
進程之間的切換開銷也不小。
當進程數小於等於CPU核心數的時候,能夠實現真正的並行,當進程數大於CPU核心的時候,依然以併發執行。


方案二:阻塞IO+多線程

服務器端代碼
import socket
from threading import Thread

def task(conn):
    """通訊循環處理函數"""

    while True:
        try:
            request = conn.recv(1024)
            if not request:
                break
            print(f"request: {request.decode('utf-8')}")
            conn.send(request.upper())

        except ConnectionResetError as why:
            print(f'客戶端丟失,緣由是: {why}')
            break

if __name__ == '__main__':
    addr = ('127.0.0.1', 8080)
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(addr)
    server.listen(5)
    print('監聽中...')

    while True:
        conn, client = server.accept()
        print(f'一個客戶端上線 -> {client}')

        t = Thread(target=task, args=(conn,))  # 啓動多線程處理與用戶的消息循環
        t.start()

將服務器對用戶的操做封裝到線程中,單線程中依然會發生IO阻塞。
線程之間的調度交由OS負責(重要)。
線程較輕,建立和銷燬的開銷都比較小,可是線程數量也不會太大,一臺設備通常能容納幾百至上千的線程。
注意:由於CPython的GIL的存在,使用CPython編寫的多線程代碼,只能使用一個CPU核心,換句話說,使用官方的解釋器執行Python多線程代碼,沒法並行(單進程中)。
線程之間的切換開銷比較小。
實際上,多線程的最大問題並非併發數太少,而是數據安全問題。
線程之間共享同一進程的數據,在頻繁發生IO操做的過程當中,不免須要修改共享數據,這就須要增長額外的處理,當線程數量大量增長時,如何妥善處理數據安全的問題就會變成主要困難。


阻塞IO模型的思考和總結

一、多線程和多進程都是基於阻塞IO模式提供的併發,二者編程模型比較簡單,可讀性也很高。
二、若是使用多線程/進程的方案來提供併發,當線程/進程數量不斷增大時,系統穩定性將會降低。雖然可使用線程/進程池來提供必定的優化,但超過必定數量以後,池子發揮的效果也會愈來愈小。因此,二者都沒法支持超大規模的併發(如C10M及以上)。
三、線程/進程切換都交由OS調度,調度策略依據OS的算法,應用程序沒法主動控制,沒法針對任務的特性作一些必要的調度算法調整。
四、編碼思惟直接、易理解,學習曲線平緩。
五、多線程/進程的方案能夠理解爲單純的增長資源,若是要想支持超大規模的併發,單純的增長資源的行爲並不合理(資源不可能無限或者總得考慮成本以及效率,並且數量越大,原有的缺點就會越凸顯)。
六、另外一種解決方案的核心思路是:改變IO模型。

6、使用非阻塞IO實現併發

單線程非阻塞IO模型,自己就直接支持併發,爲啥?請回頭看看阻塞IO和非阻塞IO的流程圖片。
非阻塞IO接口的核心是:調用線程一旦向OS發起IO調用,OS就直接返回結果,所以,調用線程不會被阻塞而能夠執行下方代碼。不過也正由於不會阻塞,調用線程沒法判斷當即返回的結果是否是指望結果,因此調用線程須要增長額外的操做對返回結果進行判斷,正由於這一點,就增長了編程難度(增長的難度可不是一點啊)。

對當即返回的結果進行判斷的方案有兩種:

  1. 輪詢
    線程按期/不按期主動發起查詢和判斷
  2. 回調函數+事件循環
    線程在發起IO時註冊回調函數,而後統一處理事件循環

注意:非阻塞IO實現併發有多種解決方案,編程模型的可讀性都不高,有些方案的編程思惟甚至晦澀、難以理解、且編碼困難。


方案一:非阻塞IO+Try+輪詢

服務器端代碼
import socket

addr = ('127.0.0.1', 8080)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(addr)
server.setblocking(False)
server.listen(5)
print('監聽中...')

# 須要執行接收的conn對象放入此列表
recv_list = []

# 須要發送數據的conn對象和數據放入此列表
send_list = []

# 執行連接循環
while True:
    try:
        conn, client = server.accept()
        # 執行成功,說明返回值是conn,client
        print(f'一個客戶端上線 -> {client}')
        # 將成功連接的conn放入列表,當accept發生錯誤的時候執行conn的消息接收操做
        recv_list.append(conn)

    except BlockingIOError:
        # 執行accept不成功,意味着當前未有任何鏈接
        # 在下一次執行accept以前,能夠執行其餘的任務(消息接收操做)

        # 沒法對處於遍歷期間的接收列表執行remove操做,使用臨時列表存儲須要刪除的conn對象
        del_recv_list = []

        # 對已經成功連接的conn列表執行接收操做
        for conn in recv_list:
            # 對每個conn對象,執行recv獲取request
            try:
                # recv也是非阻塞
                request = conn.recv(1024)
                # 執行成功,就要處理request
                if not request:
                    # 當前conn連接已經失效
                    conn.close()
                    # 再也不接收此conn連接的消息,將失效conn加入刪除列表
                    del_recv_list.append(conn)
                    # 當前conn處理完畢,切換下一個
                    continue
                # request有消息,處理,而後須要加入發送列表中
                response = request.upper()
                # 發送列表須要存放元組,發送conn和發送的數據
                send_list.append((conn, response))

            except BlockingIOError:
                # 當前conn的數據尚未準備好,處理下一個conn
                continue
            except ConnectionResetError:
                # 當前conn失效,再也不接收此conn消息
                conn.close()
                del_recv_list.append(conn)

        # 沒法處理髮送列表遍歷期間的remove,使用臨時列表
        del_send_list = []

        # 接收列表所有處理完畢,準備處理髮送列表
        for item in send_list:
            conn = item[0]
            response = item[1]

            # 執行發送
            try:
                conn.send(response)
                # 發送成功,就應該從發送列表中移除此項目
                del_send_list.append(item)

            except BlockingIOError:
                # 發送緩衝區有可能已經滿了,留待下次發送處理
                continue
            except ConnectionResetError:
                # 連接失效
                conn.close()
                del_recv_list.append(conn)
                del_send_list.append(item)

        # 刪除接收列表中已經失效的conn對象
        for conn in del_recv_list:
            recv_list.remove(conn)

        # 刪除發送列表中已經發送或者不須要發送的對象
        for item in del_send_list:
            send_list.remove(item)

服務器使用單線程實現了併發。
對於accept接收到的多個conn對象,加入列表,並經過遍歷讀取列表、發送列表來提供多用戶訪問。

單線程中的Socket模塊提供的IO函數都被設置成:非阻塞IO類型。
增長了額外操做:對非阻塞調用當即返回的結果,使用了Try來判斷是否爲指望值。
由於不知道什麼時候返回的結果是指望值,因此須要不停的發起調用,並經過Try來判斷,即,輪詢。
兩次輪詢期間,線程能夠執行其餘任務。可是模型中也只是不停的發起輪詢,並無利用好這些時間。

編碼模型複雜,難理解。

優化:此模型中的主動輪詢的工做由程序負責,其實能夠交由OS代爲操做。這樣的話,應用程序就不須要編寫輪詢的部分,能夠更聚焦於業務邏輯(upper()的部分),Python提供了Select模塊以處理應用程序的輪詢工做。


方案二:非阻塞IO+Select代理輪詢

服務器端代碼
import socket
import select

addr = ('127.0.0.1', 8080)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(addr)
server.setblocking(False)
server.listen(5)
print('監聽中...')

# 最開始的server對象須要被監聽,一旦可讀,說明能夠執行accept
read_list = [server,]

# 須要監聽的寫列表,一旦wl中可寫對象處理完send,應該將它也今後列表中刪除
write_list = []

# 用於臨時存放某一個sock對象須要發送的數據
data_dic = {}

# 不停的發起select查詢
while True:

    # 發起select查詢,嘗試獲得能夠操做的socket對象
    rl, wl, xl = select.select(read_list, write_list, [], 1)

    # 操做可讀列表
    for sock in rl:
        # 若是可讀列表中的對象是server,意味着有連接,則server可執行accept
        if sock is server:
            # 執行accept必定不會報錯,因此不須要try
            conn, client = sock.accept()
            # 一旦得到conn,就須要將此conn加入可讀列表
            read_list.append(conn)
        else:
            # 說明可讀的對象是普通的conn對象,執行recv時要處理連接失效問題
            try:
                request = sock.recv(1024)

            except (ConnectionResetError, ConnectionAbortedError):
                # 此連接失效
                sock.close()
                read_list.remove(sock)
            else:
                # 還須要繼續判斷request的內容
                if not request:
                    # 說明此conn連接失效
                    sock.close()
                    # 再也不監控此conn
                    read_list.remove(sock)
                    continue
                # 處理請求
                response = request.upper()
                # 加入發送列表
                write_list.append(sock)
                # 保存發送的數據
                data_dic[sock] = response

    # 操做可寫列表
    for sock in wl:
        # 執行發送操做,send也會出錯
        try:
            sock.send(data_dic[sock])
            # 發送完畢後,須要移除發送列表
            write_list.remove(sock)
            # 須要移除發送數據
            data_dic.pop(sock)

        except (ConnectionResetError, ConnectionAbortedError):
            # 此連接失效
            sock.close()
            read_list.remove(sock)
            write_list.remove(sock)

服務器使用單線程實現了併發。
使用了Select模塊以後,應用程序再也不須要編寫主動輪詢的代碼,而是將此部分工做交由Select模塊的select函數代爲處理。
應用程序只須要遍歷select函數返回的可操做socket列表,並處理相關業務邏輯便可。
雖然應用程序將輪詢工做甩給了select,本身不用編寫代碼。不過select函數的底層接口效率不高,使用epoll接口能夠提高效率,此接口被封裝在Selectors模塊中。
此外,select函數是一個阻塞IO,在併發數不多的時候,線程大部分時間會阻塞在select函數上。因此select函數應該適用於隨時隨刻都有socket準備好、大規模併發的場景。
編碼困難,模型難理解。


select函數接口說明

def select(rlist, wlist, xlist, timeout=None): # real signature unknown; restored from __doc__
    """
    select(rlist, wlist, xlist[, timeout]) -> (rlist, wlist, xlist)
    
    Wait until one or more file descriptors are ready for some kind of I/O.
    The first three arguments are sequences of file descriptors to be waited for:
    rlist -- wait until ready for reading
    wlist -- wait until ready for writing
    xlist -- wait for an ``exceptional condition''
    If only one kind of condition is required, pass [] for the other lists.
    A file descriptor is either a socket or file object, or a small integer
    gotten from a fileno() method call on one of those.
    
    The optional 4th argument specifies a timeout in seconds; it may be
    a floating point number to specify fractions of seconds.  If it is absent
    or None, the call will never time out.
    
    The return value is a tuple of three lists corresponding to the first three
    arguments; each contains the subset of the corresponding file descriptors
    that are ready.
    
    *** IMPORTANT NOTICE ***
    On Windows, only sockets are supported; on Unix, all file
    descriptors can be used.
    """
    pass
  1. 輸入4個參數(3位置,1默認),返回3個值
  2. select函數是阻塞IO,函數的返回必須等到至少1個文件描述符準備就緒
  3. 位置參數rlist/wlist/xlist分爲是:須要監控的讀列表/寫列表/例外列表(第3參數暫不理解)
  4. windows下,列表中只能放socket對象,unix下,能夠聽任何文件描述符
  5. 第4參數若是是None(默認),則會永久阻塞,不然按照給定的值(單位是秒)發生超時,可使用小數如0.5秒
  6. 返回值是3個列表,裏面涵蓋的是能夠操做的文件描述符對象

關於輪詢效率的思考

輪詢操做,效率不高。
輪詢的工做視角是:發起者按期/不按期主動發起詢問,若是數據沒有準備好,就繼續發起詢問。若是數據準備好了,發起者就處理這些數據。
假設,調用者在第35次主動輪詢的時候發現數據準備好了,那麼意味着前34次主動輪詢的操做是沒有任何收益的。
調用者要想知道數據是否就緒,就要主動詢問,而主動詢問的效率又比較低。
這個矛盾的核心關鍵在於:如何得知數據準備就緒這件事呢?

使用回調函數+事件循環
此種方案中,調用者不會主動發起輪詢,而是被動的等待IO操做完成,並由OS向調用者發起準備就緒的事件通知。

方案三:非阻塞IO+Selectors+回調函數+事件循環

# 服務器端代碼
import socket
from selectors import DefaultSelector, EVENT_READ

def recv_read(conn, mask):
    # recv回調函數
    try:
        request = conn.recv(1024)
        if not request:
            # 意味着連接失效,再也不監控此socket
            conn.close()
            selector.unregister(conn)
            # 結束此回調的執行
            return None
        # 連接正常,處理數據
        conn.send(request.upper())

    except (ConnectionResetError, ConnectionAbortedError):
        # 連接失效
        conn.close()
        selector.unregister(conn)

def accept_read(server, mask):
    # accept回調函數
    conn, client = server.accept()
    print(f'一個客戶端上線{client}')

    # 監聽conn對象的可讀事件的發生,並註冊回調函數
    selector.register(conn, EVENT_READ, recv_read)


if __name__ == '__main__':
    addr = ('127.0.0.1', 8080)
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(addr)
    server.setblocking(False)
    server.listen(5)
    print('監聽中...')

    # 獲取對象
    selector = DefaultSelector()
    # 第一個註冊,監聽server對象的可讀事件的發生,並註冊回調函數
    selector.register(server, EVENT_READ, accept_read)

    # 執行事件循環
    while True:
        # 循環調用select,select是阻塞調用,返回就緒事件
        events = selector.select()
        for key, mask in events:
            # 獲取此事件預先註冊的回調函數
            callback = key.data
            # 對此事件中準備就緒的socket對象執行回調
            callback(key.fileobj, mask)

服務器使用單線程實現了併發。
OS使用了Selectors自行選擇最優的底層接口監聽socket對象。
程序再也不須要主動發起查詢,而是註冊回調函數。
增長事件循環,用於處理準備就緒的socket對象,調用預先註冊的回調函數。
應用程序不用再關注如何判斷非阻塞IO的返回值,而將精力聚焦於回調函數的編寫。

方案四:非阻塞IO+協程+回調函數+事件循環(待後續補充)

pass


非阻塞IO的思考和總結(待後續補充)

  1. 若是將一個IO密集型任務的IO模型設置爲非阻塞,則此任務類型將會從IO密集型逐漸轉變爲CPU密集型。
  2. 非阻塞IO的編程模型比較困難,可讀性較差,模型理解困難
  3. 我認爲,含有非阻塞IO+回調+事件循環的編程模型,就是異步編程。
    pass

7、關於同步/異步,阻塞IO/非阻塞IO的區別和思考

  1. 阻塞IO和非阻塞IO指的是OS提供的兩種IO接口,區別在於調用時是否當即返回。
  2. 同步和異步指的是兩個任務之間的執行模型
    同步:兩個任務關聯性大,任務相互依賴,對任務執行的先後順序有必定要求
    異步:兩個任務關聯性小,任務能夠相互獨立,任務執行順序沒有要求
  3. 網上有不少關於同步阻塞、同步非阻塞、異步阻塞、異步非阻塞的各類理解,站在不一樣的角度,理解都不同。我以爲應該把同步/異步劃爲一類,用於描述任務執行模型,而把阻塞/非阻塞IO劃爲一類,用於描述IO調用模型。

以下是我根據網上的各類解釋,結合本身的思考給出的一個關於同步/異步簡單的例子:

  1. 同步
    第一天,晚飯時間到了,你餓了,你走到你老婆面前說:老婆,我餓了,快點作飯!你老婆回答:好的,我去作飯。
    你跟着老婆走到廚房,你老婆花了30分鐘的時間給你作飯。這期間,你就站在身邊,啥也不幹,就這樣注視着她,你老婆問你:你站這幹嗎?你說:我要等你作完飯再走。30分鐘後,你吃到了晚飯。

  2. 異步+輪詢
    次日,晚飯時間到了,你餓了,你大喊:老婆,我餓了,快點作飯!你老婆回答:好的,我去作飯。
    你老婆花了30分鐘的時間給你作飯,可是你再也不跟着你老婆走到廚房。這期間,你在客廳看電視,不過你實在餓得不行了,因而你每過5分鐘,就跑到廚房詢問:老婆,飯作好了沒?你老婆回答:還要一會。30分鐘後,你吃到了晚飯。

  3. 異步+事件通知 第三天,晚飯時間到了,你餓了,你大喊:老婆,我餓了,快點作飯!你老婆回答:好的,我去作飯。 你老婆花了30分鐘的時間給你作飯,你也再也不跟着你老婆走到廚房。這期間,你在客廳看電視,你知道你老婆在作飯,你也不會去催她,專心看電視。30分鐘後,你老婆喊你:飯作好了。最後你吃到了晚飯。

相關文章
相關標籤/搜索