目錄python
操做系統(OS
)統管了計算機的全部硬件,並負責爲應用程序分配和回收硬件資源。
硬件資源老是有限的,而應用程序對資源的慾望都是貪婪的。
當多個應用程序發生硬件資源爭奪時,OS
負責出面調度,保證多任務的資源分配以保證系統穩定執行。
只有CPU
能夠執行代碼,因此應用程序(任務)執行前,必須申請到CPU
資源,同一時刻,一個CPU
只能執行一個任務代碼。
計算機的CPU
數量(資源方)遠遠小於須要執行的任務數(需求方),操做系統將CPU
的資源按照時間片劃分,並根據任務類型分配,各任務輪流使用CPU
。
CPU
的執行/切換速度很是快,對於用戶而言,多任務看上去就像同時執行同樣,此稱爲併發。ios
以下是串行和併發的對比:
算法
計算機的內存、硬盤、網卡、屏幕、鍵盤等硬件提供了數據交換的場所。
OS
提供了IO
接口以實現數據交換,數據交換的過程通常不須要CPU
的參與。
IO
接口有兩種類型:
一、阻塞型IO
發生IO
(數據交換)的時候,調用線程沒法向下執行剩餘代碼,意圖佔用CPU
但不執行任何代碼,單線程阻塞型IO自身沒法支持併發
二、非阻塞型IO
發生IO
(數據交換)的時候,調用線程能夠向下執行剩餘代碼,單線程非阻塞型IO自身能夠支持併發編程
以下是阻塞型IO和非阻塞型IO的對比:
windows
根據一個任務執行期間佔用CPU
的比例來劃分,有兩種類型:
一、CPU密集型
絕大部分時間都是佔用CPU
並執行代碼,好比科學計算任務
二、IO密集型
絕大部分時間都未佔用CPU
,而是在發生IO
操做,好比網絡服務安全
OS
提供了阻塞IO和非阻塞IO兩種類型的接口,應用程序能夠自行選擇。
Socket
模塊封裝了兩種接口,Socket
模塊提供的函數默認是阻塞IO類型。
用戶能夠選擇手工切換至非阻塞IO類型,使用socketobj.setblocking(False)
切換至非阻塞IO模式。
下面將經過一個簡單的例子程序來記錄對併發的學習思考及總結。服務器
客戶端:循環接收用戶的輸入,併發送給服務器。從服務器接收反饋並打印至屏幕。
服務器:將接收到的用戶輸入,變成大寫並返回給客戶端。網絡
客戶端代碼固定,主要思考服務器端的代碼。
通常咱們會這樣寫服務端代碼:多線程
# 服務器端 import socket addr = ('127.0.0.1', 8080) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(addr) server.listen(5) print('監聽中...') while True: # 連接循環 conn, client = server.accept() print(f'一個客戶端上線 -> {client}') while True: # 消息循環 try: request = conn.recv(1024) if not request: break print(f"request: {request.decode('utf-8')}") conn.send(request.upper()) except ConnectionResetError as why: print(f'客戶端丟失,緣由是: {why}') break conn.close()
客戶端代碼保持不變:併發
# 客戶端 import socket addr = ('127.0.0.1', 8080) client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(addr) print(f'服務器{addr}鏈接成功') while True: # 消息循環 inp = input('>>>').strip() if not inp: continue try: client.send(inp.encode('utf-8')) response = client.recv(1024) print(response.decode('utf-8')) except ConnectionResetError as why: print(f'服務端丟失,緣由是: {why}') break client.close()
這種形式的編碼我稱爲:單線程+阻塞IO+循環串行,有以下幾個特色:
一、編碼簡單,模型簡潔,可讀性強
二、串行提供服務,用戶使用服務器必須一個一個排隊
單一線程的阻塞IO模型是沒法支持併發的,若是要支持併發,有以下兩類解決方案。
單線程阻塞IO,本質上是沒法實現併發的。由於一旦發生IO阻塞,線程就會阻塞,下方代碼不會繼續執行。若是要使用單線程阻塞IO來實現併發,須要增長線程數目或者進程數目,當某一個線程/進程發生阻塞的時候,由OS
調度至另外一個線程/進程執行。
服務器端代碼 import socket from multiprocessing import Process def task(conn): """通訊循環處理函數""" while True: try: request = conn.recv(1024) if not request: break print(f"request: {request.decode('utf-8')}") conn.send(request.upper()) except ConnectionResetError as why: print(f'客戶端丟失,緣由是: {why}') break if __name__ == '__main__': # windows下須要把新建進程寫到main中,否則會報錯 addr = ('127.0.0.1', 8080) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(addr) server.listen(5) print('監聽中...') while True: conn, client = server.accept() print(f'一個客戶端上線 -> {client}') p = Process(target=task, args=(conn,)) # 開啓子進程處理與用戶的消息循環 p.start()
將服務器對用戶的消息循環操做封裝到進程中,單進程依然會發生阻塞。
進程之間的調度交由OS
負責(重要)。
進程過重,建立和銷燬進程都須要比較大的開銷,此外,一臺設備所能涵蓋的進程數量很是有限(通常就幾百左右)。
進程之間的切換開銷也不小。
當進程數小於等於CPU
核心數的時候,能夠實現真正的並行,當進程數大於CPU
核心的時候,依然以併發執行。
服務器端代碼 import socket from threading import Thread def task(conn): """通訊循環處理函數""" while True: try: request = conn.recv(1024) if not request: break print(f"request: {request.decode('utf-8')}") conn.send(request.upper()) except ConnectionResetError as why: print(f'客戶端丟失,緣由是: {why}') break if __name__ == '__main__': addr = ('127.0.0.1', 8080) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(addr) server.listen(5) print('監聽中...') while True: conn, client = server.accept() print(f'一個客戶端上線 -> {client}') t = Thread(target=task, args=(conn,)) # 啓動多線程處理與用戶的消息循環 t.start()
將服務器對用戶的操做封裝到線程中,單線程中依然會發生IO阻塞。
線程之間的調度交由OS負責(重要)。
線程較輕,建立和銷燬的開銷都比較小,可是線程數量也不會太大,一臺設備通常能容納幾百至上千的線程。
注意:由於CPython的GIL的存在,使用CPython編寫的多線程代碼,只能使用一個CPU核心,換句話說,使用官方的解釋器執行Python多線程代碼,沒法並行(單進程中)。
線程之間的切換開銷比較小。
實際上,多線程的最大問題並非併發數太少,而是數據安全問題。
線程之間共享同一進程的數據,在頻繁發生IO操做的過程當中,不免須要修改共享數據,這就須要增長額外的處理,當線程數量大量增長時,如何妥善處理數據安全的問題就會變成主要困難。
一、多線程和多進程都是基於阻塞IO模式提供的併發,二者編程模型比較簡單,可讀性也很高。
二、若是使用多線程/進程的方案來提供併發,當線程/進程數量不斷增大時,系統穩定性將會降低。雖然可使用線程/進程池來提供必定的優化,但超過必定數量以後,池子發揮的效果也會愈來愈小。因此,二者都沒法支持超大規模的併發(如C10M及以上)。
三、線程/進程切換都交由OS
調度,調度策略依據OS
的算法,應用程序沒法主動控制,沒法針對任務的特性作一些必要的調度算法調整。
四、編碼思惟直接、易理解,學習曲線平緩。
五、多線程/進程的方案能夠理解爲單純的增長資源,若是要想支持超大規模的併發,單純的增長資源的行爲並不合理(資源不可能無限或者總得考慮成本以及效率,並且數量越大,原有的缺點就會越凸顯)。
六、另外一種解決方案的核心思路是:改變IO模型。
單線程非阻塞IO模型,自己就直接支持併發,爲啥?請回頭看看阻塞IO和非阻塞IO的流程圖片。
非阻塞IO接口的核心是:調用線程一旦向OS
發起IO調用,OS
就直接返回結果,所以,調用線程不會被阻塞而能夠執行下方代碼。不過也正由於不會阻塞,調用線程沒法判斷當即返回的結果是否是指望結果,因此調用線程須要增長額外的操做對返回結果進行判斷,正由於這一點,就增長了編程難度(增長的難度可不是一點啊)。
對當即返回的結果進行判斷的方案有兩種:
注意:非阻塞IO實現併發有多種解決方案,編程模型的可讀性都不高,有些方案的編程思惟甚至晦澀、難以理解、且編碼困難。
服務器端代碼 import socket addr = ('127.0.0.1', 8080) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(addr) server.setblocking(False) server.listen(5) print('監聽中...') # 須要執行接收的conn對象放入此列表 recv_list = [] # 須要發送數據的conn對象和數據放入此列表 send_list = [] # 執行連接循環 while True: try: conn, client = server.accept() # 執行成功,說明返回值是conn,client print(f'一個客戶端上線 -> {client}') # 將成功連接的conn放入列表,當accept發生錯誤的時候執行conn的消息接收操做 recv_list.append(conn) except BlockingIOError: # 執行accept不成功,意味着當前未有任何鏈接 # 在下一次執行accept以前,能夠執行其餘的任務(消息接收操做) # 沒法對處於遍歷期間的接收列表執行remove操做,使用臨時列表存儲須要刪除的conn對象 del_recv_list = [] # 對已經成功連接的conn列表執行接收操做 for conn in recv_list: # 對每個conn對象,執行recv獲取request try: # recv也是非阻塞 request = conn.recv(1024) # 執行成功,就要處理request if not request: # 當前conn連接已經失效 conn.close() # 再也不接收此conn連接的消息,將失效conn加入刪除列表 del_recv_list.append(conn) # 當前conn處理完畢,切換下一個 continue # request有消息,處理,而後須要加入發送列表中 response = request.upper() # 發送列表須要存放元組,發送conn和發送的數據 send_list.append((conn, response)) except BlockingIOError: # 當前conn的數據尚未準備好,處理下一個conn continue except ConnectionResetError: # 當前conn失效,再也不接收此conn消息 conn.close() del_recv_list.append(conn) # 沒法處理髮送列表遍歷期間的remove,使用臨時列表 del_send_list = [] # 接收列表所有處理完畢,準備處理髮送列表 for item in send_list: conn = item[0] response = item[1] # 執行發送 try: conn.send(response) # 發送成功,就應該從發送列表中移除此項目 del_send_list.append(item) except BlockingIOError: # 發送緩衝區有可能已經滿了,留待下次發送處理 continue except ConnectionResetError: # 連接失效 conn.close() del_recv_list.append(conn) del_send_list.append(item) # 刪除接收列表中已經失效的conn對象 for conn in del_recv_list: recv_list.remove(conn) # 刪除發送列表中已經發送或者不須要發送的對象 for item in del_send_list: send_list.remove(item)
服務器使用單線程實現了併發。
對於accept
接收到的多個conn
對象,加入列表,並經過遍歷讀取列表、發送列表來提供多用戶訪問。
單線程中的Socket
模塊提供的IO
函數都被設置成:非阻塞IO類型。
增長了額外操做:對非阻塞調用當即返回的結果,使用了Try
來判斷是否爲指望值。
由於不知道什麼時候返回的結果是指望值,因此須要不停的發起調用,並經過Try
來判斷,即,輪詢。
兩次輪詢期間,線程能夠執行其餘任務。可是模型中也只是不停的發起輪詢,並無利用好這些時間。
編碼模型複雜,難理解。
優化:此模型中的主動輪詢的工做由程序負責,其實能夠交由OS
代爲操做。這樣的話,應用程序就不須要編寫輪詢的部分,能夠更聚焦於業務邏輯(upper()
的部分),Python
提供了Select
模塊以處理應用程序的輪詢工做。
服務器端代碼 import socket import select addr = ('127.0.0.1', 8080) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(addr) server.setblocking(False) server.listen(5) print('監聽中...') # 最開始的server對象須要被監聽,一旦可讀,說明能夠執行accept read_list = [server,] # 須要監聽的寫列表,一旦wl中可寫對象處理完send,應該將它也今後列表中刪除 write_list = [] # 用於臨時存放某一個sock對象須要發送的數據 data_dic = {} # 不停的發起select查詢 while True: # 發起select查詢,嘗試獲得能夠操做的socket對象 rl, wl, xl = select.select(read_list, write_list, [], 1) # 操做可讀列表 for sock in rl: # 若是可讀列表中的對象是server,意味着有連接,則server可執行accept if sock is server: # 執行accept必定不會報錯,因此不須要try conn, client = sock.accept() # 一旦得到conn,就須要將此conn加入可讀列表 read_list.append(conn) else: # 說明可讀的對象是普通的conn對象,執行recv時要處理連接失效問題 try: request = sock.recv(1024) except (ConnectionResetError, ConnectionAbortedError): # 此連接失效 sock.close() read_list.remove(sock) else: # 還須要繼續判斷request的內容 if not request: # 說明此conn連接失效 sock.close() # 再也不監控此conn read_list.remove(sock) continue # 處理請求 response = request.upper() # 加入發送列表 write_list.append(sock) # 保存發送的數據 data_dic[sock] = response # 操做可寫列表 for sock in wl: # 執行發送操做,send也會出錯 try: sock.send(data_dic[sock]) # 發送完畢後,須要移除發送列表 write_list.remove(sock) # 須要移除發送數據 data_dic.pop(sock) except (ConnectionResetError, ConnectionAbortedError): # 此連接失效 sock.close() read_list.remove(sock) write_list.remove(sock)
服務器使用單線程實現了併發。
使用了Select
模塊以後,應用程序再也不須要編寫主動輪詢的代碼,而是將此部分工做交由Select
模塊的select
函數代爲處理。
應用程序只須要遍歷select
函數返回的可操做socket
列表,並處理相關業務邏輯便可。
雖然應用程序將輪詢工做甩給了select
,本身不用編寫代碼。不過select
函數的底層接口效率不高,使用epoll
接口能夠提高效率,此接口被封裝在Selectors
模塊中。
此外,select
函數是一個阻塞IO,在併發數不多的時候,線程大部分時間會阻塞在select
函數上。因此select
函數應該適用於隨時隨刻都有socket
準備好、大規模併發的場景。
編碼困難,模型難理解。
def select(rlist, wlist, xlist, timeout=None): # real signature unknown; restored from __doc__ """ select(rlist, wlist, xlist[, timeout]) -> (rlist, wlist, xlist) Wait until one or more file descriptors are ready for some kind of I/O. The first three arguments are sequences of file descriptors to be waited for: rlist -- wait until ready for reading wlist -- wait until ready for writing xlist -- wait for an ``exceptional condition'' If only one kind of condition is required, pass [] for the other lists. A file descriptor is either a socket or file object, or a small integer gotten from a fileno() method call on one of those. The optional 4th argument specifies a timeout in seconds; it may be a floating point number to specify fractions of seconds. If it is absent or None, the call will never time out. The return value is a tuple of three lists corresponding to the first three arguments; each contains the subset of the corresponding file descriptors that are ready. *** IMPORTANT NOTICE *** On Windows, only sockets are supported; on Unix, all file descriptors can be used. """ pass
rlist/wlist/xlist
分爲是:須要監控的讀列表/寫列表/例外列表(第3參數暫不理解)windows
下,列表中只能放socket對
象,unix
下,能夠聽任何文件描述符None
(默認),則會永久阻塞,不然按照給定的值(單位是秒)發生超時,可使用小數如0.5秒輪詢操做,效率不高。
輪詢的工做視角是:發起者按期/不按期主動發起詢問,若是數據沒有準備好,就繼續發起詢問。若是數據準備好了,發起者就處理這些數據。
假設,調用者在第35次主動輪詢的時候發現數據準備好了,那麼意味着前34次主動輪詢的操做是沒有任何收益的。
調用者要想知道數據是否就緒,就要主動詢問,而主動詢問的效率又比較低。
這個矛盾的核心關鍵在於:如何得知數據準備就緒這件事呢?
使用回調函數+事件循環。
此種方案中,調用者不會主動發起輪詢,而是被動的等待IO操做完成,並由OS
向調用者發起準備就緒的事件通知。
# 服務器端代碼 import socket from selectors import DefaultSelector, EVENT_READ def recv_read(conn, mask): # recv回調函數 try: request = conn.recv(1024) if not request: # 意味着連接失效,再也不監控此socket conn.close() selector.unregister(conn) # 結束此回調的執行 return None # 連接正常,處理數據 conn.send(request.upper()) except (ConnectionResetError, ConnectionAbortedError): # 連接失效 conn.close() selector.unregister(conn) def accept_read(server, mask): # accept回調函數 conn, client = server.accept() print(f'一個客戶端上線{client}') # 監聽conn對象的可讀事件的發生,並註冊回調函數 selector.register(conn, EVENT_READ, recv_read) if __name__ == '__main__': addr = ('127.0.0.1', 8080) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(addr) server.setblocking(False) server.listen(5) print('監聽中...') # 獲取對象 selector = DefaultSelector() # 第一個註冊,監聽server對象的可讀事件的發生,並註冊回調函數 selector.register(server, EVENT_READ, accept_read) # 執行事件循環 while True: # 循環調用select,select是阻塞調用,返回就緒事件 events = selector.select() for key, mask in events: # 獲取此事件預先註冊的回調函數 callback = key.data # 對此事件中準備就緒的socket對象執行回調 callback(key.fileobj, mask)
服務器使用單線程實現了併發。
OS
使用了Selectors
自行選擇最優的底層接口監聽socket
對象。
程序再也不須要主動發起查詢,而是註冊回調函數。
增長事件循環,用於處理準備就緒的socket
對象,調用預先註冊的回調函數。
應用程序不用再關注如何判斷非阻塞IO的返回值,而將精力聚焦於回調函數的編寫。
pass
OS
提供的兩種IO接口,區別在於調用時是否當即返回。以下是我根據網上的各類解釋,結合本身的思考給出的一個關於同步/異步簡單的例子:
同步
第一天,晚飯時間到了,你餓了,你走到你老婆面前說:老婆,我餓了,快點作飯!你老婆回答:好的,我去作飯。
你跟着老婆走到廚房,你老婆花了30分鐘的時間給你作飯。這期間,你就站在身邊,啥也不幹,就這樣注視着她,你老婆問你:你站這幹嗎?你說:我要等你作完飯再走。30分鐘後,你吃到了晚飯。
異步+輪詢
次日,晚飯時間到了,你餓了,你大喊:老婆,我餓了,快點作飯!你老婆回答:好的,我去作飯。
你老婆花了30分鐘的時間給你作飯,可是你再也不跟着你老婆走到廚房。這期間,你在客廳看電視,不過你實在餓得不行了,因而你每過5分鐘,就跑到廚房詢問:老婆,飯作好了沒?你老婆回答:還要一會。30分鐘後,你吃到了晚飯。
異步+事件通知 第三天,晚飯時間到了,你餓了,你大喊:老婆,我餓了,快點作飯!你老婆回答:好的,我去作飯。 你老婆花了30分鐘的時間給你作飯,你也再也不跟着你老婆走到廚房。這期間,你在客廳看電視,你知道你老婆在作飯,你也不會去催她,專心看電視。30分鐘後,你老婆喊你:飯作好了。最後你吃到了晚飯。