上篇回顧:靜態服務器+壓測html
同步是指一個任務的完成須要依賴另一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成。linux
異步是指不須要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工做。而後繼續執行下面代碼邏輯,只要本身完成了整個任務就算完成了(異步通常使用狀態、通知和回調)web
PS:項目裏面通常是這樣的:(我的經驗)數據庫
await xxx()
阻塞是指調用結果返回以前,當前線程會被掛起,一直處於等待消息通知,不可以執行其餘業務(大部分代碼都是這樣的)編程
非阻塞是指在不能馬上獲得結果以前,該函數不會阻塞當前線程,而會馬上返回(繼續執行下面代碼,或者重試機制走起)緩存
PS:項目裏面重試機制爲啥通常都是3次?服務器
對於一次IO訪問,數據會先被拷貝到內核的緩衝區中,而後纔會從內核的緩衝區拷貝到應用程序的地址空間。須要經歷兩個階段:網絡
因爲存在這兩個階段,Linux產生了下面五種IO模型(以socket爲例
)架構
recvfrom
等阻塞方法時,內核進入IO的第1個階段:準備數據(內核須要等待足夠的數據再拷貝)這個過程須要等待,用戶進程會被阻塞,等內核將數據準備好,而後拷貝到用戶地址空間,內核返回結果,用戶進程才從阻塞態進入就緒態kernel
中的數據尚未準備好,那麼它並不會block
用戶進程,而是馬上返回一個error
。error
時,它就知道數據尚未準備好,因而它能夠再次發送read操做kernel
中的數據準備好了,而且又再次收到了用戶進程的system call
,那麼它立刻就將數據拷貝到了用戶內存,而後返回select
、poll
和epoll
POSIX
的aio_
系列函數)
IO read
以後,會馬上返回,不會阻塞用戶進程。signal
告訴它read操做完成了貼一下Unix編程裏面的圖:併發
開始以前我們經過非阻塞IO引入一下:(來個簡單例子socket.setblocking(False)
)
import time import socket def select(socket_addr_list): for client_socket, client_addr in socket_addr_list: try: data = client_socket.recv(2048) if data: print(f"[來自{client_addr}的消息:]\n") print(data.decode("utf-8")) client_socket.send( b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>" ) else: # 沒有消息是觸發異常,空消息是斷開鏈接 client_socket.close() # 關閉客戶端鏈接 socket_addr_list.remove((client_socket, client_addr)) print(f"[客戶端{client_addr}已斷開鏈接,當前鏈接數:{len(socket_addr_list)}]") except Exception: pass def main(): # 存放客戶端集合 socket_addr_list = list() with socket.socket() as tcp_server: # 防止端口綁定的設置 tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) tcp_server.bind(('', 8080)) tcp_server.listen() tcp_server.setblocking(False) # 服務端非阻塞 while True: try: client_socket, client_addr = tcp_server.accept() client_socket.setblocking(False) # 客戶端非阻塞 socket_addr_list.append((client_socket, client_addr)) except Exception: pass else: print(f"[來自{client_addr}的鏈接,當前鏈接數:{len(socket_addr_list)}]") # 防止客戶端斷開後出錯 if socket_addr_list: # 輪詢查看客戶端有沒有消息 select(socket_addr_list) # 引用傳參 time.sleep(0.01) if __name__ == "__main__": main()
輸出:
能夠思考下:
select和上面的有點相似,就是輪詢的過程交給了操做系統:
kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程
來個和上面等同的案例:
import select import socket def main(): with socket.socket() as tcp_server: tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) tcp_server.bind(('', 8080)) tcp_server.listen() socket_info_dict = dict() socket_list = [tcp_server] # 監測列表 while True: # 劣勢:select列表數量有限制 read_list, write_list, error_list = select.select( socket_list, [], []) for item in read_list: # 服務端迎接新的鏈接 if item == tcp_server: client_socket, client_address = item.accept() socket_list.append(client_socket) socket_info_dict[client_socket] = client_address print(f"[{client_address}已鏈接,當前鏈接數:{len(socket_list)-1}]") # 客戶端發來 else: data = item.recv(2048) if data: print(data.decode("utf-8")) item.send( b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>" ) else: item.close() socket_list.remove(item) info = socket_info_dict[item] print(f"[{info}已斷開,當前鏈接數:{len(socket_list)-1}]") if __name__ == "__main__": main()
輸出和上面同樣
擴展說明:
select 函數監視的文件描述符分3類,分別是
writefds
、readfds
、和exceptfds
。調用後select函數會阻塞,直到有描述符就緒函數返回(有數據可讀、可寫、或者有except)或者超時(timeout指定等待時間,若是當即返回設爲null便可)
select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024(64位=>2048)
而後Poll就出現了,就是把上限給去掉了,本質並沒變,仍是使用的輪詢
epoll在內核2.6中提出(Linux獨有),使用一個文件描述符管理多個描述符,將用戶關心的文件描述符的事件存放到內核的一個事件表中,採用監聽回調的機制,這樣在用戶空間和內核空間的copy只需一次,避免再次遍歷就緒的文件描述符列表
先來看個案例吧:(輸出和上面同樣)
import socket import select def main(): with socket.socket() as tcp_server: tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) tcp_server.bind(('', 8080)) tcp_server.listen() # epoll是linux獨有的 epoll = select.epoll() # tcp_server註冊到epoll中 epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET) # key-value fd_socket_dict = dict() # 回調須要本身處理 while True: # 返回可讀寫的socket fd 集合 poll_list = epoll.poll() for fd, event in poll_list: # 服務器的socket if fd == tcp_server.fileno(): client_socket, client_addr = tcp_server.accept() fd = client_socket.fileno() fd_socket_dict[fd] = (client_socket, client_addr) # 把客戶端註冊進epoll中 epoll.register(fd, select.EPOLLIN | select.EPOLLET) else: # 客戶端 client_socket, client_addr = fd_socket_dict[fd] data = client_socket.recv(2048) print( f"[來自{client_addr}的消息,當前鏈接數:{len(fd_socket_dict)}]\n") if data: print(data.decode("utf-8")) client_socket.send( b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>" ) else: del fd_socket_dict[fd] print( f"[{client_addr}已離線,當前鏈接數:{len(fd_socket_dict)}]\n" ) # 從epoll中註銷 epoll.unregister(fd) client_socket.close() if __name__ == "__main__": main()
擴展:epoll的兩種工做模式
LT(level trigger,水平觸發)模式:當epoll_wait檢測到描述符就緒,將此事件通知應用程序,應用程序能夠不當即處理該事件。下次調用epoll_wait時,會再次響應應用程序並通知此事件。LT模式是默認的工做模式。
LT模式同時支持阻塞和非阻塞socket。
ET(edge trigger,邊緣觸發)模式:當epoll_wait檢測到描述符就緒,將此事件通知應用程序,應用程序必須當即處理該事件。若是不處理,下次調用epoll_wait時,不會再次響應應用程序並通知此事件。
ET是高速工做方式,只支持非阻塞socket(ET模式減小了epoll事件被重複觸發的次數,所以效率要比LT模式高)
Code提煉一下:
epoll = select.epoll()
epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)
epoll.unregister(fd)
PS:epoll
不必定比Select
性能高,通常都是分場景的:
其實IO多路複用還有一個kqueue
,和epoll
相似,下面的通用寫法中有包含
Selector
)通常來講:Linux下使用epoll,Win下使用select(IO多路複用會這個通用的便可)
先看看Python源代碼:
# 選擇級別:epoll|kqueue|devpoll > poll > select if 'KqueueSelector' in globals(): DefaultSelector = KqueueSelector elif 'EpollSelector' in globals(): DefaultSelector = EpollSelector elif 'DevpollSelector' in globals(): DefaultSelector = DevpollSelector elif 'PollSelector' in globals(): DefaultSelector = PollSelector else: DefaultSelector = SelectSelector
實戰案例:(可讀和可寫能夠不分開)
import socket import selectors # Linux下使用epoll,Win下使用select Selector = selectors.DefaultSelector() class Task(object): def __init__(self): # 存放客戶端fd和socket鍵值對 self.fd_socket_dict = dict() def run(self): self.server = socket.socket() self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server.bind(('', 8080)) self.server.listen() # 把Server註冊到epoll Selector.register(self.server.fileno(), selectors.EVENT_READ, self.connected) def connected(self, key): """客戶端鏈接時處理""" client_socket, client_address = self.server.accept() fd = client_socket.fileno() self.fd_socket_dict[fd] = (client_socket, client_address) # 註冊一個客戶端讀的事件(服務端去讀消息) Selector.register(fd, selectors.EVENT_READ, self.call_back_reads) print(f"{client_address}已鏈接,當前鏈接數:{len(self.fd_socket_dict)}") def call_back_reads(self, key): """客戶端可讀時處理""" # 一個fd只能註冊一次,監測可寫的時候須要把可讀給註銷 Selector.unregister(key.fd) client_socket, client_address = self.fd_socket_dict[key.fd] print(f"[來自{client_address}的消息:]\n") data = client_socket.recv(2048) if data: print(data.decode("utf-8")) # 註冊一個客戶端寫的事件(服務端去發消息) Selector.register(key.fd, selectors.EVENT_WRITE, self.call_back_writes) else: client_socket.close() del self.fd_socket_dict[key.fd] print(f"{client_address}已斷開,當前鏈接數:{len(self.fd_socket_dict)}") def call_back_writes(self, key): """客戶端可寫時處理""" Selector.unregister(key.fd) client_socket, client_address = self.fd_socket_dict[key.fd] client_socket.send(b"ok") Selector.register(key.fd, selectors.EVENT_READ, self.call_back_reads) def main(): t = Task() t.run() while True: ready = Selector.select() for key, obj in ready: # 須要本身回調 call_back = key.data call_back(key) if __name__ == "__main__": main()
Code提煉一下:
Selector = selectors.DefaultSelector()
Selector.register(server.fileno(), selectors.EVENT_READ, call_back)
Selector.register(server.fileno(), selectors.EVENT_WRITE, call_back)
Selector.unregister(key.fd)
業餘拓展:
select, iocp, epoll,kqueue及各類I/O複用機制 https://blog.csdn.net/shallwake/article/details/5265287 kqueue用法簡介 http://www.cnblogs.com/luminocean/p/5631336.html
下級預估:協程篇 or 網絡深刻篇