python之socket編程(二)

標籤(空格分隔): socket編程java


SocketServer解析

SocketServer內部使用I/O多路複用多線程多進程來實現客戶端多併發訪問Socket服務端,while循環時使用I/O多路複用,線程或進程和client端鏈接。
socketserver
本圖中,while循環就是用I/O多路複用。python

咱們先來看看I/O多路複用

I/O多路複用指:經過一種機制,能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知程序進行相應的讀寫操做。linux

linux中的I/O多路複用

在Linux中,有三種I/O多路複用機制:select、poll、epoll
來看下歷史:編程

select
 
select最先於1983年出如今4.2BSD中,它經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。
select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢,事實上從如今看來,這也是它所剩很少的優勢之一。
select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,不過能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制。
另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。
 
poll
 
poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制。
poll和select一樣存在一個缺點就是,包含大量文件描述符的數組被總體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增長而線性增大。
另外,select()和poll()將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用select()和poll()的時候將再次報告這些文件描述符,因此它們通常不會丟失就緒的消息,這種方式稱爲水平觸發(Level Triggered)。
 
epoll
 
直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,它幾乎具有了以前所說的一切優勢,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。
epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。
epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。
另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。

三種機制的比較

機制名稱 適用平臺 是否for循環 監視描述符的數量
select 跨平臺全兼容 使用for循環,效率低 1024
poll 跨平臺全兼容 使用循環遍歷文件描述符,效率低 無限制
epoll 只支持UNIX 不使用循環,使用邊緣觸發,效率高 無限制

Python中的I/O多路

Python中有一個select模塊,其中提供了:select、poll、epoll三個方法,分別調用系統的 select,poll,epoll 從而實現IO多路複用。
|系統|支持模式|
|---|---|
|windows|select|
|Mac|select|
|Linux|select/poll/epoll|c#

網絡操做、文件操做、終端操做等均屬於IO操做,對於windows只支持Socket操做,其餘系統支持其餘IO操做,可是沒法檢測 普通文件操做 自動上次讀取是否已經變化。windows

在socket中還有兩點咱們須要肯定:數組

  • I/O操做是不佔用CPU的
  • I/O多路複用用來監聽socket對象內部是否變化了(鏈接或收發消息時候,socket對象內部發生變化)

咱們先來看下select的用法吧:緩存

句柄列表11, 句柄列表22, 句柄列表33 = select.select(句柄序列1, 句柄序列2, 句柄序列3, 超時時間)
 
參數: 可接受四個參數(前三個必須)
返回值:三個列表
 
select方法用來監視文件句柄,若是句柄發生變化,則獲取該句柄。
一、當 參數1 序列中的句柄發生可讀時(accetp和read),則獲取發生變化的句柄並添加到 返回值1 序列中
二、當 參數2 序列中含有句柄時,則將該序列中全部的句柄添加到 返回值2 序列中,可利用此特性,作socket讀寫分離的測試
三、當 參數3 序列中的句柄發生錯誤時,則將該發生錯誤的句柄添加到 返回值3 序列中
四、當 超時時間 未設置,則select會一直阻塞,直到監聽的句柄發生變化;當 超時時間 =1時,那麼若是監聽的句柄均無任何變化,則select會阻塞 1 秒,以後返回三個空列表,若是監聽的句柄有變化,則直接執行。

那麼咱們用python中的select看下怎麼實現I/O多路吧服務器

多路複用實現socket僞併發

Client:網絡

import socket

sk=socket.socket()
sk.connect(('127.0.0.1',9999))

msg=sk.recv(1024).decode()
print(msg)

while True:
    inp=input('>>:')
    sk.sendall(bytes(inp,encoding='utf8'))
sk.close()

Server:

import socket
import select

sk=socket.socket()      #用來接收客戶端鏈接,
sk.bind(('127.0.0.1',9999,))
sk.listen(5)
inputs=[sk,]  #暫時先監聽了sk一個對象,
while True:
    #I/O多路複用用來監聽socket對象內部是否變化了
    rlist,w,e,=select.select(inputs,[],[],1)     #rlist爲監聽到的 發生變化socket對象 列表
    print(len(inputs),len(rlist))
    for r in rlist:
        #若是是新客戶端來鏈接了
        if r == sk:
            print(r)
            conn,address=r.accept()     #conn用來接收消息,實際上是一個socket對象,accept建立鏈接
            inputs.append(conn)
            conn.sendall(bytes('HELLO',encoding='utf8'))
        else:
            #不然,有人給我發消息了
            r.recv(1024)

server端中的select,是來

  • 監聽(服務端)對象,若是sk發生變化,表示有客戶端來了,此時rlist值爲[sk,]
  • 監聽conn對象,若是有消息發送過來,表示客戶端有消息發送過來了,此時rlist值爲[客戶端,]

上面的代碼執行後,能夠看到rlist的變化,咱們看下下一組代碼:
client2:

sk=socket.socket()
sk.connect(('127.0.0.1',9998))

msg=sk.recv(1024).decode()
print(msg)

while True:
    inp=input('>>:')
    sk.sendall(bytes(inp,encoding='utf8'))
    print(sk.recv(1024).decode())
sk.close()

Server2:

import socket
import select

sk=socket.socket()
sk.bind(('127.0.0.1',9998,))
sk.listen(5)
inputs=[sk,]
outputs=[]
msg={}  #消息內容爲{'obj':[msg1,msg2]}

while True:
    rlist,wlist,e,=select.select(inputs,outputs,[],1)
    print(len(inputs),len(rlist),len(wlist),len(outputs))
    for r in rlist:
        if r == sk:
            print(r)
            conn,address=r.accept()
            msg[conn]=[]
            inputs.append(conn)
            conn.sendall(bytes('HELLO',encoding='utf8'))
        else:   #接收消息並去除ioputs列表中斷開的鏈接
            print('=====')
            try:
                res=r.recv(1024)
                if not res: #空消息斷開
                    raise Exception('鏈接斷開')
                else:
                    outputs.append(r)
                    msg[r].append(res)
            except Exception as e:
                inputs.remove(r)
                del msg[r]
    for w in wlist: #回消息
        res_msg=msg[r].pop()    #將消息拿出來
        res_p=res_msg+'response'
        w.sendall(bytes(res_p,encoding='utf8'))
        outputs.remove(w) #處理完成後去除對象,下一次不處理,若是不處理,會有異常,由於socket已關閉了

經過這一組代碼,能看到發送消息和接受到消息時socket數量的變化,仍是消息之間的交互。

銀角在2.7中的解釋,基於select實現socket服務端:

#!/usr/bin/env python
#coding:utf8

'''
 服務器的實現 採用select的方式
'''

import select
import socket
import sys
import Queue

#建立套接字並設置該套接字爲非阻塞模式

server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking(0)

#綁定套接字
server_address = ('localhost',10000)
print >>sys.stderr,'starting up on %s port %s'% server_address
server.bind(server_address)

#將該socket變成服務模式
#backlog等於5,表示內核已經接到了鏈接請求,但服務器尚未調用accept進行處理的鏈接個數最大爲5
#這個值不能無限大,由於要在內核中維護鏈接隊列

server.listen(5)

#初始化讀取數據的監聽列表,最開始時但願從server這個套接字上讀取數據
inputs = [server]

#初始化寫入數據的監聽列表,最開始並無客戶端鏈接進來,因此列表爲空

outputs = []

#要發往客戶端的數據
message_queues = {}
while inputs:
    print >>sys.stderr,'waiting for the next event'
    #調用select監聽全部監聽列表中的套接字,並將準備好的套接字加入到對應的列表中
    readable,writable,exceptional = select.select(inputs,outputs,inputs)#列表中的socket 套接字  若是是文件呢? 
    #監控文件句柄有某一處發生了變化 可寫 可讀  異常屬於Linux中的網絡編程 
    #屬於同步I/O操做,屬於I/O複用模型的一種
    #rlist--等待到準備好讀
    #wlist--等待到準備好寫
    #xlist--等待到一種異常
    #處理可讀取的套接字

    '''
        若是server這個套接字可讀,則說明有新連接到來
        此時在server套接字上調用accept,生成一個與客戶端通信的套接字
        並將與客戶端通信的套接字加入inputs列表,下一次能夠經過select檢查鏈接是否可讀
        而後在發往客戶端的緩衝中加入一項,鍵名爲:與客戶端通信的套接字,鍵值爲空隊列
        select系統調用是用來讓咱們的程序監視多個文件句柄(file descrīptor)的狀態變化的。程序會停在select這裏等待,
        直到被監視的文件句柄有某一個或多個發生了狀態改變
        '''

    '''
        若可讀的套接字不是server套接字,有兩種狀況:一種是有數據到來,另外一種是連接斷開
        若是有數據到來,先接收數據,而後將收到的數據填入往客戶端的緩存區中的對應位置,最後
        將於客戶端通信的套接字加入到寫數據的監聽列表:
        若是套接字可讀.但沒有接收到數據,則說明客戶端已經斷開。這時須要關閉與客戶端鏈接的套接字
        進行資源清理
        '''
        
    for s in readable: 
        if s is server:
            connection,client_address = s.accept()
            print >>sys.stderr,'connection from',client_address
            connection.setblocking(0)#設置非阻塞
            inputs.append(connection)
            message_queues[connection] = Queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                print >>sys.stderr,'received "%s" from %s'% \
                (data,s.getpeername())
                message_queues[s].put(data)
                if s not in outputs:
                    outputs.append(s)
            else:
                print >>sys.stderr,'closing',client_address
                if s in outputs:
                    outputs.remove(s)
                inputs.remove(s)
                s.close()
                del message_queues[s]
                    
    #處理可寫的套接字
    '''
        在發送緩衝區中取出響應的數據,發往客戶端。
        若是沒有數據須要寫,則將套接字從發送隊列中移除,select中再也不監視
        '''

    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()

        except Queue.Empty:
            print >>sys.stderr,'  ',s,getpeername(),'queue empty'
            outputs.remove(s)
        else:
            print >>sys.stderr,'sending "%s" to %s'% \
            (next_msg,s.getpeername())
            s.send(next_msg)



    #處理異常狀況

    for s in exceptional:
        for s in exceptional:
            print >>sys.stderr,'exception condition on',s.getpeername()
            inputs.remove(s)
            if s in outputs:
                outputs.remove(s)
            s.close()
            del message_queues[s]

I/O多路複用,適用於全部的I/O操做

這個是須要注意的,I/O多路複用適用於除文件操做外的全部I/O操做,支持終端。

ThreadingTCPServer

ThreadingTCPServer基礎:

先看下ThreadingTCPServer如何使用:

  • 建立一個繼承自 SocketServer.BaseRequestHandler 的類
  • 類中必須定義一個名稱爲 handle 的方法
  • 啓動ThreadingTCPServer

以下代碼:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import SocketServer

class MyServer(SocketServer.BaseRequestHandler):

    def handle(self):
        # print self.request,self.client_address,self.server
        conn = self.request
        conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.')
        Flag = True
        while Flag:
            data = conn.recv(1024)
            if data == 'exit':
                Flag = False
            elif data == '0':
                conn.sendall('經過可能會被錄音.balabala一大推')
            else:
                conn.sendall('請從新輸入.')


if __name__ == '__main__':
    server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyServer)
    server.serve_forever()

依據上面的代碼,咱們能夠來查看下ThreadingTCPServer的源碼

ThreadingTCPServer的源碼

先來看下執行順序:

  1. 建立一個SocketServer.ThreadingTCPServer對象(綁定IP和端口,和一個MyServer的類)
  2. 建立對象後先執行ThreadingTCPServer的__init__構造方法,一直查找到BaseServer的__init__構造方法
  3. 對象建立完成後,對象執行serve_forever的方法,查找serve_forever,查找到BaseServer的serve_forever方法
  4. 查看serve_forever的的方法,執行_handle_request_noblock方法
  5. _handle_request_noblock方法中調用process_request方法,查找對象的process_request,在ThreadingMixIn的方法中,也是建立多線程的時候
  6. process_request方法中調用了ThreadingMixIn中的process_request_thread方法
  7. process_request_thread方法中又調用了BaseServer中的finish_request方法.
  8. finish_request方法中調用了RequestHandlerClass,進行查找,在剛開始進行查找的時候,咱們會發現,建立對象時,參數MyServer已是RequestHandlerClass
  9. MyServer會執行構造方法__init__,查找會在BaseRequestHandler找到構造方法
  10. 在BaseRequestHandler的構造方法中,會有self.handle()一句,執行MyServer中定義的hanler方法.

相關源碼就不符了,直接在源碼裏找吧!最後把源碼精簡一下就是以下(copy自銀角老師blog):

import socket
import threading
import select


def process(request, client_address):
    print request,client_address
    conn = request
    conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.')
    flag = True
    while flag:
        data = conn.recv(1024)
        if data == 'exit':
            flag = False
        elif data == '0':
            conn.sendall('經過可能會被錄音.balabala一大推')
        else:
            conn.sendall('請從新輸入.')

sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk.bind(('127.0.0.1',8002))
sk.listen(5)

while True:
    r, w, e = select.select([sk,],[],[],1)
    print 'looping'
    if sk in r:
        print 'get request'
        request, client_address = sk.accept()
        t = threading.Thread(target=process, args=(request, client_address))
        t.daemon = False
        t.start()

sk.close()

如精簡代碼能夠看出,SocketServer的ThreadingTCPServer之因此能夠同時處理請求得益於 select 和 Threading 兩個東西,其實本質上就是在服務器端爲每個客戶端建立一個線程,當前線程用來處理對應客戶端的請求,因此,能夠支持同時n個客戶端連接(長鏈接)。

多進程、多線程

一個應用程序,能夠多進程、也能夠多線程:
一個python腳本,默認是單進程,單線程的。
I/O操做(音頻、視頻、顯卡操做),不佔用CPU,因此:

  • 對於I/O密集型操做,不會佔用CPU,使用多線程操做,能提升效率
  • 對於計算密集型操做,因爲佔用CPU,使用多進程操做,能提升效率

python中有個全局解釋器鎖,叫GIL(全稱Global Interpreter Lock),致使一個進程只能由一個線程讓CPU去調度,但在java c#可使用多個線程。
多線程,多進程的目的,是爲了提升併發,I/O密集型用多線程,計算密集型,用多進程。

咱們來看看怎麼建立多線程:

def f1(args):
     print(args)
import threading
t=threading.Thread(target=f1,args=(123,))    #建立一個線程,target表示線程執行的目標,args表示參數,是一個元組
t.start()     #並不表明當前當即被執行,系統來決定
f1(111)

以上代碼結果print順序會隨機!

更多的方法:

  • start 不表明當前線程並不會當即被執行,而是等待CPU調度
  • setName 爲線程設置名稱
  • setDaemon(True) True表示主線程不等待子線程,執行完本身的任務後,自動關閉,子線程有可能未執行完畢。(默認狀況下,主線程要等待子線程執行完畢後再關閉主線程),True:後臺線程,主線程執行過程當中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均中止;False:前臺線程,主線程執行過程當中,前臺線程也在進行,主線程執行完畢後,等待前臺線程也執行完成後,程序中止
  • join(2) 若是不想讓線程併發的操做,表示主線程到此等待,等待直到子線程執行完畢。若是加上參數,表示主線程在此最多等幾秒。該方法使得多線程變得無心義
  • run 線程被cpu調度後自動執行線程對象的run方法
import time

def f1(args):
    time.sleep(5)
    print(args)

import threading
t1=threading.Thread(target=f1,args=(123,))
t1.setDaemon(True)  #表示主線程不等待子線程
t.start()     #並不表明當前被當即被執行,系統來決定
f1(111)

t.join(2) #表示主程序執行到此,等待...直到子線程執行完畢
print(222222)
print(333333)

待續...

相關文章
相關標籤/搜索