select函數操做集合的時候有個要求,要麼集合自己是描述符,要麼他提供一個fileno()接口,返回一個描述符。python
I/O多路複用是在單線程模式下實現多線程的效果,實現一個多I/O併發的效果。看一個簡單socket例子:數據庫
import socket SOCKET_FAMILY = socket.AF_INET SOCKET_TYPE = socket.SOCK_STREAM sockServer = socket.socket(SOCKET_FAMILY, SOCKET_TYPE) sockServer.bind(('0.0.0.0', 8888)) sockServer.listen(5) while True: cliobj, addr = sockServer.accept() while True: recvdata = cliobj.recv(1024) if recvdata: print(recvdata.decode()) else: cliobj.close() break
客戶端:編程
import socket socCli = socket.socket() socCli.connect(('127.0.0.1', 8888)) while True: data = input("input str:") socCli.send(data.encode())
以上爲一個簡單的客戶端發送一個輸入信息給服務端的socket通訊的實例,在以上的例子中,服務端是一個單線程、阻塞模式的。如何實現多客戶端鏈接呢,咱們可使用多線程模式,這個固然沒有問題。 使用多線程、阻塞socket來處理的話,代碼會很直觀,可是也會有很多缺陷。它很難確保線程共享資源沒有問題。並且這種編程風格的程序在只有一個CPU的電腦上面效率更低。但若是一個用戶開啓的線程有限的狀況下,好比1024個。當第1025個客戶端鏈接是仍然會阻塞。
有沒有一種比較好的方式呢,固然有,其一是使用異步socket。 數組
這種socket只有在一些event觸發時纔會阻塞。相反,程序在異步socket上面執行一個動做,會當即被告知這個動做是否成功。程序會根據這個信 息決定怎麼繼續下面的操做因爲異步socket是非阻塞的,就沒有必要再來使用多線程。全部的工做均可以在一個線程中完成。這種單線程模式有它本身的挑 戰,但能夠成爲不少方案不錯的選擇。它也能夠結合多線程一塊兒使用:單線程使用異步socket用於處理服務器的網絡部分,多線程能夠用來訪問其餘阻塞資 源,好比數據庫。Linux的2.6內核有一系列機制來管理異 步socket,其中3個有對應的Python的API:select、poll和epoll。epoll和pool比select更好,由於 Python程序不須要檢查每個socket感興趣的event。相反,它能夠依賴操做系統來告訴它哪些socket可能有這些event。epoll 比pool更好,由於它不要求操做系統每次都去檢查python程序須要的全部socket感興趣的event。而是Linux在event發生的時候會 跟蹤到,並在Python須要的時候返回一個列表。所以epoll對於大量(成千上萬)併發socket鏈接,是更有效率和可擴展的機制
異步I/O處理模型服務器
select最先於1983年出如今4.2BSD中,它經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。
select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢,事實上從如今看來,這也是它所剩很少的優勢之一。
select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,不過能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制。
另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。網絡
select 原理數據結構
1.從用戶空間拷貝fd_set到內核空間(fd_set 過大致使佔用空間且慢);
2.註冊回調函數__pollwait;
3.遍歷全部fd,對所有指定設備作一次poll(這裏的poll是一個文件操做,它有兩個參數,一個是文件fd自己,一個是當設備還沒有就緒時調用的回調函數__pollwait,這個函數把設備本身特有的等待隊列傳給內核,讓內核把當前的進程掛載到其中)(遍歷數組中全部 fd);
4.當設備就緒時,設備就會喚醒在本身特有等待隊列中的【全部】節點,因而當前進程就獲取到了完成的信號。poll文件操做返回的是一組標準的掩碼,其中的各個位指示當前的不一樣的就緒狀態(全0爲沒有任何事件觸發),根據mask可對fd_set賦值;
5.若是全部設備返回的掩碼都沒有顯示任何的事件觸發,就去掉回調函數的函數指針,進入有限時的睡眠狀態,再恢復和不斷作poll,再做有限時的睡眠,直到其中一個設備有事件觸發爲止。
只要有事件觸發,系統調用返回,將fd_set從內核空間拷貝到用戶空間,回到用戶態,用戶就能夠對相關的fd做進一步的讀或者寫操做了。多線程
epoll 原理併發
調用epoll_create時,作了如下事情:app
內核幫咱們在epoll文件系統裏建了個file結點;
在內核cache裏建了個紅黑樹用於存儲之後epoll_ctl傳來的socket;
創建一個list鏈表,用於存儲準備就緒的事件。
調用epoll_ctl時,作了如下事情:
把socket放到epoll文件系統裏file對象對應的紅黑樹上;
給內核中斷處理程序註冊一個回調函數,告訴內核,若是這個句柄的中斷到了,就把它放到準備就緒list鏈表裏。
調用epoll_wait時,作了如下事情:
觀察list鏈表裏有沒有數據。有數據就返回,沒有數據就sleep,等到timeout時間到後即便鏈表沒數據也返回。並且,一般狀況下即便咱們要監控百萬計的句柄,大多一次也只返回不多量的準備就緒句柄而已,因此,epoll_wait僅須要從內核態copy少許的句柄到用戶態而已。
1.優缺點
select本質上是經過設置或者檢查存放fd標誌位的數據結構來進行下一步處理。這樣所帶來的缺點是(從聲明、到系統調用、到掃描、到返回後掃描):
1).單個進程可監視的fd數量被限制
2).須要維護一個用來存放大量fd的數據結構,這樣會使得用戶空間和內核空間在傳遞該結構時複製開銷大
3).對socket進行掃描時是線性掃描
4).用戶也須要對返回的 fd_set 進行遍歷
poll本質上和select沒有區別,它將用戶傳入的數組拷貝到內核空間,而後查詢每一個fd對應的設備狀態,若是設備就緒則在設備等待隊列中加入一項並繼續遍歷,若是遍歷完全部fd後沒有發現就緒設備,則掛起當前進程,直到設備就緒或者主動超時,被喚醒後它又要再次遍歷fd。這個過程經歷了屢次無謂的遍歷。
poll沒有最大鏈接數的限制,緣由是poll是基於鏈表來存儲的,可是一樣有一個缺點:
大量的fd的數組被總體複製於用戶態和內核地址空間之間,而無論這樣的複製是否是有意義。
poll還有一個特色是「水平觸發」,若是報告了fd後,沒有被處理,那麼下次poll時會再次報告該fd。
epoll支持水平觸發和邊緣觸發,最大的特色在於邊緣觸發,它只告訴進程哪些fd剛剛變爲就需態,而且只會通知一次。在前面說到的複製問題上,epoll使用mmap減小複製開銷。還有一個特色是,epoll使用「事件」的就緒通知方式,經過epoll_ctl註冊fd,一旦該fd就緒,內核就會採用相似callback的回調機制來激活該fd,epoll_wait即可以收到通知
2.支持一個進程所能打開的最大鏈接數
select 單個進程所能打開的最大鏈接數有FD_SETSIZE宏定義,其大小是32個整數的大小(在32位的機器上,大小就是32*32,同理64位機器上FD_SETSIZE爲32*64),固然咱們能夠對進行修改,而後從新編譯內核,可是性能可能會受到影響,這須要進一步的測試。
poll poll本質上和select沒有區別,可是它沒有最大鏈接數的限制,緣由是它是基於鏈表來存儲的
epoll 雖然鏈接數有上限,可是很大,1G內存的機器上能夠打開10萬左右的鏈接,2G內存的機器能夠打開20萬左右的鏈接
3 FD劇增後帶來的IO效率問題
select 由於每次調用時都會對鏈接進行線性遍歷,因此隨着FD的增長會形成遍歷速度慢的「線性降低性能問題」。
poll 同上
epoll 由於epoll內核中實現是根據每一個fd上的callback函數來實現的,只有活躍的socket纔會主動調用callback,因此在活躍socket較少的狀況下,使用epoll沒有前面二者的線性降低的性能問題,可是全部socket都很活躍的狀況下,可能會有性能問題。
4 消息傳遞方式
select 內核須要將消息傳遞到用戶空間,都須要內核拷貝動做。
poll 同上
epoll epoll經過內核和用戶空間共享一塊內存來實現的。
下面咱們對上面的socket例子進行改造,看一下select的例子:
elect 詳細解釋,用線程的IO多路複用實現一個讀寫分離的、支持多客戶端的鏈接請求
import socket import queue from select import select SERVER_IP = ('127.0.0.1', 9999) # 保存客戶端發送過來的消息,將消息放入隊列中 message_queue = {} input_list = [] output_list = [] if __name__ == "__main__": server = socket.socket() server.bind(SERVER_IP) server.listen(10) # 設置爲非阻塞 server.setblocking(False) # 初始化將服務端加入監聽列表 input_list.append(server) while True: # 開始 select 監聽,對input_list中的服務端server進行監聽 stdinput, stdoutput, stderr = select(input_list, output_list, input_list) # 循環判斷是否有客戶端鏈接進來,當有客戶端鏈接進來時select將觸發 for obj in stdinput: # 判斷當前觸發的是否是服務端對象, 當觸發的對象是服務端對象時,說明有新客戶端鏈接進來了 if obj == server: # 接收客戶端的鏈接, 獲取客戶端對象和客戶端地址信息 conn, addr = server.accept() print("Client {0} connected! ".format(addr)) # 將客戶端對象也加入到監聽的列表中, 當客戶端發送消息時 select 將觸發 input_list.append(conn) # 爲鏈接的客戶端單首創建一個消息隊列,用來保存客戶端發送的消息 message_queue[conn] = queue.Queue() else: # 因爲客戶端鏈接進來時服務端接收客戶端鏈接請求,將客戶端加入到了監聽列表中(input_list),客戶端發送消息將觸發 # 因此判斷是不是客戶端對象觸發 try: recv_data = obj.recv(1024) # 客戶端未斷開 if recv_data: print("received {0} from client {1}".format(recv_data.decode(), addr)) # 將收到的消息放入到各客戶端的消息隊列中 message_queue[obj].put(recv_data) # 將回復操做放到output列表中,讓select監聽 if obj not in output_list: output_list.append(obj) except ConnectionResetError: # 客戶端斷開鏈接了,將客戶端的監遵從input列表中移除 input_list.remove(obj) # 移除客戶端對象的消息隊列 del message_queue[obj] print("\n[input] Client {0} disconnected".format(addr)) # 若是如今沒有客戶端請求,也沒有客戶端發送消息時,開始對發送消息列表進行處理,是否須要發送消息 for sendobj in output_list: try: # 若是消息隊列中有消息,從消息隊列中獲取要發送的消息 if not message_queue[sendobj].empty(): # 從該客戶端對象的消息隊列中獲取要發送的消息 send_data = message_queue[sendobj].get() sendobj.sendall(send_data) else: # 將監聽移除等待下一次客戶端發送消息 output_list.remove(sendobj) except ConnectionResetError: # 客戶端鏈接斷開了 del message_queue[sendobj] output_list.remove(sendobj) print("\n[output] Client {0} disconnected".format(addr))
epoll實現實例
#!/usr/bin/env python import select import socket response = b'' serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serversocket.bind(('0.0.0.0', 8080)) serversocket.listen(1) # 由於socket默認是阻塞的,因此須要使用非阻塞(異步)模式。 serversocket.setblocking(0) # 建立一個epoll對象 epoll = select.epoll() # 在服務端socket上面註冊對讀event的關注。一個讀event隨時會觸發服務端socket去接收一個socket鏈接 epoll.register(serversocket.fileno(), select.EPOLLIN) try: # 字典connections映射文件描述符(整數)到其相應的網絡鏈接對象 connections = {} requests = {} responses = {} while True: # 查詢epoll對象,看是否有任何關注的event被觸發。參數「1」表示,咱們會等待1秒來看是否有event發生。 # 若是有任何咱們感興趣的event發生在此次查詢以前,這個查詢就會帶着這些event的列表當即返回 events = epoll.poll(1) # event做爲一個序列(fileno,event code)的元組返回。fileno是文件描述符的代名詞,始終是一個整數。 for fileno, event in events: # 若是是服務端產生event,表示有一個新的鏈接進來 if fileno == serversocket.fileno(): connection, address = serversocket.accept() print('client connected:', address) # 設置新的socket爲非阻塞模式 connection.setblocking(0) # 爲新的socket註冊對讀(EPOLLIN)event的關注 epoll.register(connection.fileno(), select.EPOLLIN) connections[connection.fileno()] = connection # 初始化接收的數據 requests[connection.fileno()] = b'' # 若是發生一個讀event,就讀取從客戶端發送過來的新數據 elif event & select.EPOLLIN: print("------recvdata---------") # 接收客戶端發送過來的數據 requests[fileno] += connections[fileno].recv(1024) # 若是客戶端退出,關閉客戶端鏈接,取消全部的讀和寫監聽 if not requests[fileno]: connections[fileno].close() # 刪除connections字典中的監聽對象 del connections[fileno] # 刪除接收數據字典對應的句柄對象 del requests[connections[fileno]] print(connections, requests) epoll.modify(fileno, 0) else: # 一旦完成請求已收到,就註銷對讀event的關注,註冊對寫(EPOLLOUT)event的關注。寫event發生的時候,會回覆數據給客戶端 epoll.modify(fileno, select.EPOLLOUT) # 打印完整的請求,證實雖然與客戶端的通訊是交錯進行的,但數據能夠做爲一個總體來組裝和處理 print('-' * 40 + '\n' + requests[fileno].decode()) # 若是一個寫event在一個客戶端socket上面發生,它會接受新的數據以便發送到客戶端 elif event & select.EPOLLOUT: print("-------send data---------") # 每次發送一部分響應數據,直到完整的響應數據都已經發送給操做系統等待傳輸給客戶端 byteswritten = connections[fileno].send(requests[fileno]) requests[fileno] = requests[fileno][byteswritten:] if len(requests[fileno]) == 0: # 一旦完整的響應數據發送完成,就再也不關注寫event epoll.modify(fileno, select.EPOLLIN) # HUP(掛起)event代表客戶端socket已經斷開(即關閉),因此服務端也須要關閉。 # 沒有必要註冊對HUP event的關注。在socket上面,它們老是會被epoll對象註冊 elif event & select.EPOLLHUP: print("end hup------") # 註銷對此socket鏈接的關注 epoll.unregister(fileno) # 關閉socket鏈接 connections[fileno].close() del connections[fileno] finally: # 打開的socket鏈接不須要關閉,由於Python會在程序結束的時候關閉。這裏顯式關閉是一個好的代碼習慣 epoll.unregister(serversocket.fileno()) epoll.close() serversocket.close()
轉自:http://blog.csdn.net/songfreeman/article/details/51179213