標籤(空格分隔): socket編程java
SocketServer內部使用I/O多路複用
,多線程
,多進程
來實現客戶端多併發訪問Socket服務端,while循環時使用I/O多路複用,線程或進程和client端鏈接。
本圖中,while循環就是用I/O多路複用。python
I/O多路複用指:經過一種機制,能夠監視多個描述符
,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知程序進行相應的讀寫操做。linux
在Linux中,有三種I/O多路複用機制:select、poll、epoll
來看下歷史:編程
select select最先於1983年出如今4.2BSD中,它經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。 select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢,事實上從如今看來,這也是它所剩很少的優勢之一。 select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,不過能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制。 另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。 poll poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制。 poll和select一樣存在一個缺點就是,包含大量文件描述符的數組被總體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增長而線性增大。 另外,select()和poll()將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用select()和poll()的時候將再次報告這些文件描述符,因此它們通常不會丟失就緒的消息,這種方式稱爲水平觸發(Level Triggered)。 epoll 直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,它幾乎具有了以前所說的一切優勢,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。 epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。 epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。 另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。
機制名稱 | 適用平臺 | 是否for循環 | 監視描述符的數量 |
---|---|---|---|
select | 跨平臺全兼容 | 使用for循環,效率低 | 1024 |
poll | 跨平臺全兼容 | 使用循環遍歷文件描述符,效率低 | 無限制 |
epoll | 只支持UNIX | 不使用循環,使用邊緣觸發,效率高 | 無限制 |
Python中有一個select模塊,其中提供了:select、poll、epoll三個方法,分別調用系統的 select,poll,epoll 從而實現IO多路複用。
|系統|支持模式|
|---|---|
|windows|select|
|Mac|select|
|Linux|select/poll/epoll|c#
網絡操做、文件操做、終端操做等均屬於IO操做,對於windows只支持Socket操做,其餘系統支持其餘IO操做,可是沒法檢測 普通文件操做 自動上次讀取是否已經變化。windows
在socket中還有兩點咱們須要肯定:數組
咱們先來看下select的用法吧:緩存
句柄列表11, 句柄列表22, 句柄列表33 = select.select(句柄序列1, 句柄序列2, 句柄序列3, 超時時間) 參數: 可接受四個參數(前三個必須) 返回值:三個列表 select方法用來監視文件句柄,若是句柄發生變化,則獲取該句柄。 一、當 參數1 序列中的句柄發生可讀時(accetp和read),則獲取發生變化的句柄並添加到 返回值1 序列中 二、當 參數2 序列中含有句柄時,則將該序列中全部的句柄添加到 返回值2 序列中,可利用此特性,作socket讀寫分離的測試 三、當 參數3 序列中的句柄發生錯誤時,則將該發生錯誤的句柄添加到 返回值3 序列中 四、當 超時時間 未設置,則select會一直阻塞,直到監聽的句柄發生變化;當 超時時間 =1時,那麼若是監聽的句柄均無任何變化,則select會阻塞 1 秒,以後返回三個空列表,若是監聽的句柄有變化,則直接執行。
那麼咱們用python中的select看下怎麼實現I/O多路吧服務器
Client:網絡
import socket sk=socket.socket() sk.connect(('127.0.0.1',9999)) msg=sk.recv(1024).decode() print(msg) while True: inp=input('>>:') sk.sendall(bytes(inp,encoding='utf8')) sk.close()
Server:
import socket import select sk=socket.socket() #用來接收客戶端鏈接, sk.bind(('127.0.0.1',9999,)) sk.listen(5) inputs=[sk,] #暫時先監聽了sk一個對象, while True: #I/O多路複用用來監聽socket對象內部是否變化了 rlist,w,e,=select.select(inputs,[],[],1) #rlist爲監聽到的 發生變化socket對象 列表 print(len(inputs),len(rlist)) for r in rlist: #若是是新客戶端來鏈接了 if r == sk: print(r) conn,address=r.accept() #conn用來接收消息,實際上是一個socket對象,accept建立鏈接 inputs.append(conn) conn.sendall(bytes('HELLO',encoding='utf8')) else: #不然,有人給我發消息了 r.recv(1024)
server端中的select,是來
上面的代碼執行後,能夠看到rlist的變化,咱們看下下一組代碼:
client2:
sk=socket.socket() sk.connect(('127.0.0.1',9998)) msg=sk.recv(1024).decode() print(msg) while True: inp=input('>>:') sk.sendall(bytes(inp,encoding='utf8')) print(sk.recv(1024).decode()) sk.close()
Server2:
import socket import select sk=socket.socket() sk.bind(('127.0.0.1',9998,)) sk.listen(5) inputs=[sk,] outputs=[] msg={} #消息內容爲{'obj':[msg1,msg2]} while True: rlist,wlist,e,=select.select(inputs,outputs,[],1) print(len(inputs),len(rlist),len(wlist),len(outputs)) for r in rlist: if r == sk: print(r) conn,address=r.accept() msg[conn]=[] inputs.append(conn) conn.sendall(bytes('HELLO',encoding='utf8')) else: #接收消息並去除ioputs列表中斷開的鏈接 print('=====') try: res=r.recv(1024) if not res: #空消息斷開 raise Exception('鏈接斷開') else: outputs.append(r) msg[r].append(res) except Exception as e: inputs.remove(r) del msg[r] for w in wlist: #回消息 res_msg=msg[r].pop() #將消息拿出來 res_p=res_msg+'response' w.sendall(bytes(res_p,encoding='utf8')) outputs.remove(w) #處理完成後去除對象,下一次不處理,若是不處理,會有異常,由於socket已關閉了
經過這一組代碼,能看到發送消息和接受到消息時socket數量的變化,仍是消息之間的交互。
銀角在2.7中的解釋,基於select實現socket服務端:
#!/usr/bin/env python #coding:utf8 ''' 服務器的實現 採用select的方式 ''' import select import socket import sys import Queue #建立套接字並設置該套接字爲非阻塞模式 server = socket.socket(socket.AF_INET,socket.SOCK_STREAM) server.setblocking(0) #綁定套接字 server_address = ('localhost',10000) print >>sys.stderr,'starting up on %s port %s'% server_address server.bind(server_address) #將該socket變成服務模式 #backlog等於5,表示內核已經接到了鏈接請求,但服務器尚未調用accept進行處理的鏈接個數最大爲5 #這個值不能無限大,由於要在內核中維護鏈接隊列 server.listen(5) #初始化讀取數據的監聽列表,最開始時但願從server這個套接字上讀取數據 inputs = [server] #初始化寫入數據的監聽列表,最開始並無客戶端鏈接進來,因此列表爲空 outputs = [] #要發往客戶端的數據 message_queues = {} while inputs: print >>sys.stderr,'waiting for the next event' #調用select監聽全部監聽列表中的套接字,並將準備好的套接字加入到對應的列表中 readable,writable,exceptional = select.select(inputs,outputs,inputs)#列表中的socket 套接字 若是是文件呢? #監控文件句柄有某一處發生了變化 可寫 可讀 異常屬於Linux中的網絡編程 #屬於同步I/O操做,屬於I/O複用模型的一種 #rlist--等待到準備好讀 #wlist--等待到準備好寫 #xlist--等待到一種異常 #處理可讀取的套接字 ''' 若是server這個套接字可讀,則說明有新連接到來 此時在server套接字上調用accept,生成一個與客戶端通信的套接字 並將與客戶端通信的套接字加入inputs列表,下一次能夠經過select檢查鏈接是否可讀 而後在發往客戶端的緩衝中加入一項,鍵名爲:與客戶端通信的套接字,鍵值爲空隊列 select系統調用是用來讓咱們的程序監視多個文件句柄(file descrīptor)的狀態變化的。程序會停在select這裏等待, 直到被監視的文件句柄有某一個或多個發生了狀態改變 ''' ''' 若可讀的套接字不是server套接字,有兩種狀況:一種是有數據到來,另外一種是連接斷開 若是有數據到來,先接收數據,而後將收到的數據填入往客戶端的緩存區中的對應位置,最後 將於客戶端通信的套接字加入到寫數據的監聽列表: 若是套接字可讀.但沒有接收到數據,則說明客戶端已經斷開。這時須要關閉與客戶端鏈接的套接字 進行資源清理 ''' for s in readable: if s is server: connection,client_address = s.accept() print >>sys.stderr,'connection from',client_address connection.setblocking(0)#設置非阻塞 inputs.append(connection) message_queues[connection] = Queue.Queue() else: data = s.recv(1024) if data: print >>sys.stderr,'received "%s" from %s'% \ (data,s.getpeername()) message_queues[s].put(data) if s not in outputs: outputs.append(s) else: print >>sys.stderr,'closing',client_address if s in outputs: outputs.remove(s) inputs.remove(s) s.close() del message_queues[s] #處理可寫的套接字 ''' 在發送緩衝區中取出響應的數據,發往客戶端。 若是沒有數據須要寫,則將套接字從發送隊列中移除,select中再也不監視 ''' for s in writable: try: next_msg = message_queues[s].get_nowait() except Queue.Empty: print >>sys.stderr,' ',s,getpeername(),'queue empty' outputs.remove(s) else: print >>sys.stderr,'sending "%s" to %s'% \ (next_msg,s.getpeername()) s.send(next_msg) #處理異常狀況 for s in exceptional: for s in exceptional: print >>sys.stderr,'exception condition on',s.getpeername() inputs.remove(s) if s in outputs: outputs.remove(s) s.close() del message_queues[s]
這個是須要注意的,I/O多路複用適用於除文件操做外的全部I/O操做,支持終端。
先看下ThreadingTCPServer如何使用:
以下代碼:
#!/usr/bin/env python # -*- coding:utf-8 -*- import SocketServer class MyServer(SocketServer.BaseRequestHandler): def handle(self): # print self.request,self.client_address,self.server conn = self.request conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.') Flag = True while Flag: data = conn.recv(1024) if data == 'exit': Flag = False elif data == '0': conn.sendall('經過可能會被錄音.balabala一大推') else: conn.sendall('請從新輸入.') if __name__ == '__main__': server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyServer) server.serve_forever()
依據上面的代碼,咱們能夠來查看下ThreadingTCPServer的源碼
先來看下執行順序:
MyServer
已是RequestHandlerClass
MyServer
會執行構造方法__init__,查找會在BaseRequestHandler找到構造方法self.handle()
一句,執行MyServer
中定義的hanler方法.相關源碼就不符了,直接在源碼裏找吧!最後把源碼精簡一下就是以下(copy自銀角老師blog):
import socket import threading import select def process(request, client_address): print request,client_address conn = request conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.') flag = True while flag: data = conn.recv(1024) if data == 'exit': flag = False elif data == '0': conn.sendall('經過可能會被錄音.balabala一大推') else: conn.sendall('請從新輸入.') sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sk.bind(('127.0.0.1',8002)) sk.listen(5) while True: r, w, e = select.select([sk,],[],[],1) print 'looping' if sk in r: print 'get request' request, client_address = sk.accept() t = threading.Thread(target=process, args=(request, client_address)) t.daemon = False t.start() sk.close()
如精簡代碼能夠看出,SocketServer的ThreadingTCPServer之因此能夠同時處理請求得益於 select 和 Threading 兩個東西,其實本質上就是在服務器端爲每個客戶端建立一個線程,當前線程用來處理對應客戶端的請求,因此,能夠支持同時n個客戶端連接(長鏈接)。
一個應用程序,能夠多進程、也能夠多線程:
一個python腳本,默認是單進程,單線程的。
I/O操做(音頻、視頻、顯卡操做),不佔用CPU,因此:
python中有個全局解釋器鎖,叫GIL(全稱Global Interpreter Lock
),致使一個進程只能由一個線程讓CPU去調度,但在java c#可使用多個線程。
多線程,多進程的目的,是爲了提升併發,I/O密集型用多線程,計算密集型,用多進程。
咱們來看看怎麼建立多線程:
def f1(args): print(args) import threading t=threading.Thread(target=f1,args=(123,)) #建立一個線程,target表示線程執行的目標,args表示參數,是一個元組 t.start() #並不表明當前當即被執行,系統來決定 f1(111)
以上代碼結果print順序會隨機!
更多的方法:
import time def f1(args): time.sleep(5) print(args) import threading t1=threading.Thread(target=f1,args=(123,)) t1.setDaemon(True) #表示主線程不等待子線程 t.start() #並不表明當前被當即被執行,系統來決定 f1(111) t.join(2) #表示主程序執行到此,等待...直到子線程執行完畢 print(222222) print(333333)
待續...