Socket

socket

 

socket

socket一般也稱做"套接字",用於描述IP地址和端口,是一個通訊鏈的句柄,應用程序一般經過"套接字"向網絡發出請求或者應答網絡請求。html

socket起源於Unix,而Unix/Linux基本哲學之一就是「一切皆文件」python

  對於文件用【打開】【讀寫】【關閉】模式來操做。react

  socket就是該模式的一個實現,socket便是一種特殊的文件,一些socket函數就是對其進行的操做(讀/寫IO、打開、關閉)程序員

基本的python socket模塊web

Python 提供了兩個基本的 socket 模塊。第一個是 Socket,它提供了標準的 BSD Sockets API。第二個是 SocketServer,它提供了服務器中心類,能夠簡化網絡服務器的開發。Python 使用一種異步的方式來實現這種功能,您能夠提供一些插件類來處理服務器中應用程序特有的任務。數據庫

                                                                          類/模塊                                                             說明                                                                 
socket 低層網絡接口(每一個 BSD API)
socketserv 提供簡化網絡服務器開發的類

 

 

 

socket 模塊

Socket 模塊提供了 UNIX® 程序員所熟悉的基本網絡服務(也稱爲 BSD API)。這個模塊中提供了在構建 socket 服務器和客戶機時所須要的全部功能。編程

這個 API 與標準的 C API 之間的區別在於它是面向對象的。在 C 中,socket 描述符是從 socket 調用中得到的,而後會做爲一個參數傳遞給 BSD API 函數。在 Python 中,socket 方法會嚮應用 socket 方法的對象返回一個 socket 對象。數組

類方法實例方法 之間的區別在於,實例方法須要有一個 socket 實例(從 socket 返回)才能執行,而類方法則不須要。緩存

socket編程思路:

一、tcp服務端服務器

  • 建立套接字,綁定套接字到本地ip與端口
  • 開始監聽鏈接
  • 進入循環體,不斷接收客戶端發來的鏈接請求
  • 接收到客戶端發來的數據,並給客戶端發送一個數據證實服務端已經收到了客戶端的請求
  • 傳輸完畢後,關閉套接字

二、tcp客戶端

  • 建立套接字,鏈接遠程地址
  • 鏈接後發送數據和接收數據
  • 數據傳輸完畢,關閉套接字

建立一個socket鏈接

一、服務端

二、客戶端

socket 聊天機器人

服務端與客戶端之間的相互交流

#!usr/bin/env/python
# -*- coding:utf-8 -*-
import socket

# 建立一個socket對象
sk = socket.socket()

# 綁定容許鏈接的IP地址和端口
sk.bind(('127.0.0.1', 8000, ))

# 服務端容許起來以後,限制客戶端鏈接的數量,若是超過五個鏈接,第六個鏈接來的時候直接斷開第六個。
sk.listen(5)

while True:
    # 會一直阻塞,等待接收客戶端的請求,若是有客戶端鏈接會獲取兩個值,conn=建立的鏈接,address=客戶端的IP和端口
    conn, address = sk.accept()

    # 當用戶鏈接過來的時候就給用戶發送一條信息,在Python3裏面須要把發送的內容轉換爲字節
    conn.sendall(bytes("你好,歡迎登錄!", encoding="utf-8"))

    while True:
        # 輸出等待客戶端發送內容
        print("正在等待Client輸入內容......")
        # 接收客戶端發送過來的內容
        ret_bytes = conn.recv(1024)
        # 轉換成字符串類型
        ret_str = str(ret_bytes, encoding="utf-8")
        # 輸出用戶發送過來的內容
        print(ret_str)

        # 若是用戶輸入的是q
        if ret_str == "q":
            # 則退出循環,等待下個用戶輸入
            break
        # 給客戶端發送內容
        inp = input("Service請輸入要發送的內容>>> ")
        conn.sendall(bytes(inp, encoding="utf-8"))
服務端
#!usr/bin/env/python
# -*- coding:utf-8 -*-
import socket

# 建立一個socket對象
obj = socket.socket()

# 制定服務端的IP地址和端口
obj.connect(('127.0.0.1', 8000, ))

# 阻塞,等待服務端發送內容,接受服務端發送過來的內容,最大接受1024字節
ret_bytes = obj.recv(1024)

# 由於服務端發送過來的是字節,因此咱們須要把字節轉換爲字符串進行輸出
ret_str = str(ret_bytes, encoding="utf-8")

# 輸出內容
print(ret_str)

while True:
    # 當進入鏈接的時候,提示讓用戶輸入內容
    inp = input("Client請輸入要發送的內容>>> ")
    # 若是輸出q則退出
    if inp == "q":
        # 把q發送給服務端
        obj.sendall(bytes(inp, encoding="utf-8"))
        # 退出當前while
        break
    else:
        # 不然就把用戶輸入的內容發送給用戶
        obj.sendall(bytes(inp, encoding="utf-8"))
        # 等待服務端回答
        print("正在等待Server輸入內容......")
        # 獲取服務端發送過來的結果
        ret = str(obj.recv(1024), encoding="utf-8")
        # 輸出結果
        print(ret)

# 鏈接完成以後關閉連接
obj.close()
客戶端

 有進度條的文件傳輸

import socket
# 建立一個socket對象
sk = socket.socket()
# 容許鏈接的IP和端口
sk.bind(('127.0.0.1', 8000))
# 最大鏈接數
sk.listen(5)
while True:
    # 會一直阻塞,等待接收客戶端的請求,若是有客戶端鏈接會獲取兩個值,conn=建立的鏈接,address=客戶端的IP和端口
    conn, address = sk.accept()

    # 客戶端發送過來的文件大小
    file_size = str(conn.recv(1024),encoding="utf-8")

    # 給客戶端發送已經收到文件大小
    conn.sendall(bytes("ack", encoding="utf-8"))
    # 文件大小轉換成int類型
    total_size = int(file_size)
    # 建立一個默認的值
    has_recv = 0
    # 打開一個新文件,以wb模式打開
    f = open('new_my.jpg', 'wb')
    # 進入循環
    while True:
        # 若是傳送過來的大小等於文件總大小,那麼就退出
        if total_size == has_recv:
            break
        # 接受客戶端發送過來的內容
        data = conn.recv(1024)
        # 寫入到文件當中
        f.write(data)
        # 如今的大小加上客戶端發送過來的大小
        has_recv += len(data)

    # 關閉
    f.close()
服務端
#!usr/bin/env/python
# -*- coding:utf-8 -*-
import socket
import os
import time
import sys
# 建立一個socket對象
obj = socket.socket()
# 服務端的IP和端口
obj.connect(('127.0.0.1', 8000))
# 用os模塊獲取要傳送的文件總大小
size = os.stat("my.jpg").st_size
# 把文件總大小發送給服務端
obj.sendall(bytes(str(size), encoding="utf-8"))
# 接受服務端返回的信息
obj.recv(1024)
#文件傳輸以前has_size = 0
has_size = 0

# 以rb的模式打開一個要發送的文件d
with open("my.jpg", "rb") as f:
    for line in f:# 循環文件的全部內容
        has_size += len(line)#has_size的值等於文件中每一行的值加起來
        time.sleep(0.01)#因爲文件較小,要看到進度條的效果加一個執行的時間間隔
        sys.stdout.write("\r")#每一次都清空上次的寫入內容
        sys.stdout.write("%s%% | %s"%(int(has_size/size*100), int(has_size/size*50)*""))
        sys.stdout.flush()#強制刷新緩衝區
        # 發送給服務端
        obj.sendall(line)#將文件的內容一行行的發送到服務端
    sys.stdout.write("傳輸完成\n")

# 關閉退出
obj.close()
客戶端

結果展現

多用戶登陸

利用socketserver實現多併發嗎,socketserver內部會調用socket模塊進行功能上的實現

import socketserver

class MyServer(socketserver.BaseRequestHandler):
    def handle(self):
        conn = self.request#創建鏈接
        conn.sendall(bytes("你好,歡迎登錄!", encoding="utf-8"))#只要鏈接一創建服務端直接給客戶端一個迴應
        while True:
            # 輸出等待客戶端發送內容
            print("正在等待Client輸入內容......")
            # 接收客戶端發送過來的內容
            ret_bytes = conn.recv(1024)
            # 轉換成字符串類型
            ret_str = str(ret_bytes, encoding="utf-8")
            # 輸出用戶發送過來的內容
            print(ret_str)
            # 若是用戶輸入的是q
            if ret_str == "q":
                # 則退出循環,等待下個用戶輸入
                break
            # 給客戶端發送內容
            inp = input("Service請輸入要發送的內容>>> ")
            conn.sendall(bytes(inp, encoding="utf-8"))

if __name__ == "__main__":
    server = socketserver.ThreadingTCPServer(('127.0.0.1', 999, ), MyServer)
    server.serve_forever()
服務端
import socket

# 建立一個socket對象
obj = socket.socket()

# 制定服務端的IP地址和端口
obj.connect(('127.0.0.1', 999, ))

# 阻塞,等待服務端發送內容,接受服務端發送過來的內容,最大接受1024字節
ret_bytes = obj.recv(1024)

# 由於服務端發送過來的是字節,因此咱們須要把字節轉換爲字符串進行輸出
ret_str = str(ret_bytes, encoding="utf-8")

# 輸出內容
print(ret_str)

while True:
    # 當進入鏈接的時候,提示讓用戶輸入內容
    inp = input("Client請輸入要發送的內容>>> ")
    # 若是輸出q則退出
    if inp == "q":
        # 把q發送給服務端
        obj.sendall(bytes(inp, encoding="utf-8"))
        # 退出當前while
        break
    else:
        # 不然就把用戶輸入的內容發送給用戶
        obj.sendall(bytes(inp, encoding="utf-8"))
        # 等待服務端回答
        print("正在等待Server輸入內容......")
        # 獲取服務端發送過來的結果
        ret = str(obj.recv(1024), encoding="utf-8")
        # 輸出結果
        print(ret)

# 鏈接完成以後關閉連接
obj.close()
客戶端

文件上傳以及斷點續傳的功能:

import socket 
  
server_socket = socket.socket() 
server_socket.bind(('127.0.0.1', 8000,)) 
server_socket.listen(4) 
  
conn, address = server_socket.accept() 
toatl_size =int(str(conn.recv(1024), encoding='utf-8')) 
conn.sendall(bytes(str(toatl_size), encoding='utf-8'))  # 解決粘包的問題 
  
have_recv = 0
f = open('22.txt', 'wb') 
while True: 
    if have_recv == toatl_size: 
        break
    else: 
        ret = conn.recv(1024) 
        f.write(ret) 
        have_recv += len(ret) 
  
f.close() 
服務端
import socket 
import os 
client_socket = socket.socket() 
client_socket.connect(('127.0.0.1', 8000,)) 
  
file_size = os.stat('1.txt').st_size       # 由客戶端傳文件的時候,客戶端告訴服務端文件大小 
client_socket.sendall(bytes(str(file_size), encoding='utf-8'))   # 發的時候先保存在緩衝區,可能會出現粘包 
  
data = client_socket.recv(1024)                                  # 接受下服務端發來的數據,確認收到了發過去的文件大小 
have_rcv = 0
  
if int(str(data, encoding='utf-8')) == file_size: 
    with open('1.txt', 'rb') as f: 
        for line in f: 
            client_socket.sendall(line) 
            have_rcv += len(line) 
            print('中場休息,等下再傳') 
            break                               # 傳送一行後中斷,模擬斷點 
  
    with open('1.txt', 'rb') as f:              # 模擬斷點續傳 
        f.seek(have_rcv)                         # 文件指針指到已經上傳完成的部分 
        for line in f: 
            client_socket.sendall(line) 
  
  
client_socket.close() 
客戶端

I/O多路複用
I/O多路複用:經過一種機制,能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知程序進行相應的讀寫操做。

Linux

Linux中的 select,poll,epoll 都是IO多路複用的機制。

複製代碼
select
  
select最先於1983年出如今4.2BSD中,它經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。 
select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢,事實上從如今看來,這也是它所剩很少的優勢之一。 
select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,不過能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制。 
另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。 
  
poll  
  
poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制。 
poll和select一樣存在一個缺點就是,包含大量文件描述符的數組被總體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增長而線性增大。 
另外,select()和poll()將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用select()和poll()的時候將再次報告這些文件描述符,因此它們通常不會丟失就緒的消息,這種方式稱爲水平觸發(Level Triggered)。 
  
epoll  
  
直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,它幾乎具有了以前所說的一切優勢,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。 
epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。 
epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。 
另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。 
複製代碼

Python

Python中有一個select模塊,其中提供了:select、poll、epoll三個方法,分別調用系統的 select,poll,epoll 從而實現IO多路複用。

?
1
2
3
4
5
6
Windows Python:
     提供: select
Mac Python:
     提供: select
Linux Python:
     提供: select、poll、epoll

 對於select方法:

?
1
2
3
4
5
6
7
8
9
10
11
句柄列表 11 , 句柄列表 22 , 句柄列表 33 = select.select(句柄序列 1 , 句柄序列 2 , 句柄序列 3 , 超時時間)
   
參數: 可接受四個參數(前三個必須)
返回值:三個列表
   
select方法用來監視文件句柄,若是句柄發生變化,則獲取該句柄。
1 、當 參數 1 序列中的句柄發生可讀時(accetp和read),則獲取發生變化的句柄並添加到 返回值 1 序列中
2 、當 參數 2 序列中含有句柄時,則將該序列中全部的句柄添加到 返回值 2 序列中
3 、當 參數 3 序列中的句柄發生錯誤時,則將該發生錯誤的句柄添加到 返回值 3 序列中
4 、當 超時時間 未設置,則select會一直阻塞,直到監聽的句柄發生變化
    當 超時時間 = 1 時,那麼若是監聽的句柄均無任何變化,則select會阻塞 1 秒,以後返回三個空列表,若是監聽的句柄有變化,則直接執行。
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import select
import threading
import sys

while True:
    readable, writeable, error = select.select([sys.stdin,],[],[],1)
    if sys.stdin in readable:
        print 'select get stdin',sys.stdin.readline()
利用select監聽終端操做實例
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket
import select

sk1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sk1.bind(('127.0.0.1',8002))
sk1.listen(5)
sk1.setblocking(0)

inputs = [sk1,]

while True:
    readable_list, writeable_list, error_list = select.select(inputs, [], inputs, 1)
    for r in readable_list:
        # 當客戶端第一次鏈接服務端時
        if sk1 == r:
            print 'accept'
            request, address = r.accept()
            request.setblocking(0)
            inputs.append(request)
        # 當客戶端鏈接上服務端以後,再次發送數據時
        else:
            received = r.recv(1024)
            # 當正常接收客戶端發送的數據時
            if received:
                print 'received data:', received
            # 當客戶端關閉程序時
            else:
                inputs.remove(r)

sk1.close()
利用select實現僞同時處理多個Socket客戶端請求:服務端
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket

ip_port = ('127.0.0.1',8002)
sk = socket.socket()
sk.connect(ip_port)

while True:
    inp = raw_input('please input:')
    sk.sendall(inp)
sk.close()
利用select實現僞同時處理多個Socket客戶端請求:客戶端

此處的Socket服務端相比與原生的Socket,他支持當某一個請求再也不發送數據時,服務器端不會等待而是能夠去處理其餘請求的數據。可是,若是每一個請求的耗時比較長時,select版本的服務器端也沒法完成同時操做。

#!/usr/bin/env python
#coding:utf8

'''
 服務器的實現 採用select的方式
'''

import select
import socket
import sys
import Queue

#建立套接字並設置該套接字爲非阻塞模式

server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking(0)

#綁定套接字
server_address = ('localhost',10000)
print >>sys.stderr,'starting up on %s port %s'% server_address
server.bind(server_address)

#將該socket變成服務模式
#backlog等於5,表示內核已經接到了鏈接請求,但服務器尚未調用accept進行處理的鏈接個數最大爲5
#這個值不能無限大,由於要在內核中維護鏈接隊列

server.listen(5)

#初始化讀取數據的監聽列表,最開始時但願從server這個套接字上讀取數據
inputs = [server]

#初始化寫入數據的監聽列表,最開始並無客戶端鏈接進來,因此列表爲空

outputs = []

#要發往客戶端的數據
message_queues = {}
while inputs:
    print >>sys.stderr,'waiting for the next event'
    #調用select監聽全部監聽列表中的套接字,並將準備好的套接字加入到對應的列表中
    readable,writable,exceptional = select.select(inputs,outputs,inputs)#列表中的socket 套接字  若是是文件呢? 
    #監控文件句柄有某一處發生了變化 可寫 可讀  異常屬於Linux中的網絡編程 
    #屬於同步I/O操做,屬於I/O複用模型的一種
    #rlist--等待到準備好讀
    #wlist--等待到準備好寫
    #xlist--等待到一種異常
    #處理可讀取的套接字

    '''
        若是server這個套接字可讀,則說明有新連接到來
        此時在server套接字上調用accept,生成一個與客戶端通信的套接字
        並將與客戶端通信的套接字加入inputs列表,下一次能夠經過select檢查鏈接是否可讀
        而後在發往客戶端的緩衝中加入一項,鍵名爲:與客戶端通信的套接字,鍵值爲空隊列
        select系統調用是用來讓咱們的程序監視多個文件句柄(file descrīptor)的狀態變化的。程序會停在select這裏等待,
        直到被監視的文件句柄有某一個或多個發生了狀態改變
        '''

    '''
        若可讀的套接字不是server套接字,有兩種狀況:一種是有數據到來,另外一種是連接斷開
        若是有數據到來,先接收數據,而後將收到的數據填入往客戶端的緩存區中的對應位置,最後
        將於客戶端通信的套接字加入到寫數據的監聽列表:
        若是套接字可讀.但沒有接收到數據,則說明客戶端已經斷開。這時須要關閉與客戶端鏈接的套接字
        進行資源清理
        '''
        
    for s in readable: 
        if s is server:
            connection,client_address = s.accept()
            print >>sys.stderr,'connection from',client_address
            connection.setblocking(0)#設置非阻塞
            inputs.append(connection)
            message_queues[connection] = Queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                print >>sys.stderr,'received "%s" from %s'% \
                (data,s.getpeername())
                message_queues[s].put(data)
                if s not in outputs:
                    outputs.append(s)
            else:
                print >>sys.stderr,'closing',client_address
                if s in outputs:
                    outputs.remove(s)
                inputs.remove(s)
                s.close()
                del message_queues[s]
                    
    #處理可寫的套接字
    '''
        在發送緩衝區中取出響應的數據,發往客戶端。
        若是沒有數據須要寫,則將套接字從發送隊列中移除,select中再也不監視
        '''

    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()

        except Queue.Empty:
            print >>sys.stderr,'  ',s,getpeername(),'queue empty'
            outputs.remove(s)
        else:
            print >>sys.stderr,'sending "%s" to %s'% \
            (next_msg,s.getpeername())
            s.send(next_msg)



    #處理異常狀況

    for s in exceptional:
        for s in exceptional:
            print >>sys.stderr,'exception condition on',s.getpeername()
            inputs.remove(s)
            if s in outputs:
                outputs.remove(s)
            s.close()
            del message_queues[s]
基於select實現socket服務端

 

python 實現多個端口監聽,並實現讀寫分離:

import  select 
import socket 
  
sk = socket.socket() 
sk.bind(('127.0.0.1', 8000,)) 
sk.listen(5) 
  
inputs = [sk] 
outputs = [] 
message_dict = {}    # 存儲每一個客戶端接受到的信息 
  
while True: 
    r_list, w_list, e_list = select.select(inputs, outputs, [], 1) 
    print(len(inputs)) 
    for sk_or_conn in r_list: 
        if sk_or_conn == sk:          # 一旦有客戶端鏈接,sk發生變化 
            conn, addr = sk.accept() 
            inputs.append(conn)         # 將客戶端的conn添加到監聽列表 
            message_dict[conn] = []      # 以客戶端的conn爲key,生成一個新列表來存儲接受到的信息 
            # conn.sendall(bytes('hello', encoding='utf-8')) 
        else: 
            try: 
                ret = sk_or_conn.recv(1024) # 監聽的列表裏面若是有客戶端發送信息, 
  
            except Exception as ex: 
                inputs.remove(sk_or_conn)   # 若是客戶端斷開的話,從監聽的列表裏面移除 
            else: 
                data = str(ret, encoding='utf-8') 
                message_dict[sk_or_conn].append(data)  # 將監聽的信息放到字典裏 
                outputs.append(sk_or_conn)         # 放到outputs裏面 
                # sk_or_conn.sendall(bytes(data+'hello', encoding='utf-8')) 
  
    for conn in w_list:                    # 單獨實現寫的操做 
        message = message_dict[conn][0] 
        conn.sendall(bytes(message+'hello', encoding='utf-8')) 
        del message_dict[conn][0] 
        outputs.remove(conn) 
服務端
import socket 
client_socket = socket.socket() 
client_socket.connect(('127.0.0.1', 8000,)) 
while True: 
    inp = input('>>>') 
    client_socket.sendall(bytes(inp, encoding='utf-8')) 
    data = str(client_socket.recv(1024), encoding='utf-8') 
    print(data) 
client_socket.close() 
  
  
#client.py2 
  
import socket 
client_socket = socket.socket() 
client_socket.connect(('127.0.0.1', 8000,)) 
while True: 
    inp = input('>>>') 
    client_socket.sendall(bytes(inp, encoding='utf-8')) 
    data = str(client_socket.recv(1024), encoding='utf-8') 
    print(data) 
client_socket.close() 
客戶端

SocketServer模塊

SocketServer內部使用 IO多路複用 以及 「多線程」 和 「多進程」 ,從而實現併發處理多個客戶端請求的Socket服務端。即:每一個客戶端請求鏈接到服務器時,Socket服務端都會在服務器是建立一個「線程」或者「進程」 專門負責處理當前客戶端的全部請求。

ThreadingTCPServer

ThreadingTCPServer實現的Soket服務器內部會爲每一個client建立一個 「線程」,該線程用來和客戶端進行交互。

一、ThreadingTCPServer基礎

使用ThreadingTCPServer:

  • 建立一個繼承自 SocketServer.BaseRequestHandler 的類
  • 類中必須定義一個名稱爲 handle 的方法
  • 啓動ThreadingTCPServer
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import SocketServer

class MyServer(SocketServer.BaseRequestHandler):

    def handle(self):
        # print self.request,self.client_address,self.server
        conn = self.request
        conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.')
        Flag = True
        while Flag:
            data = conn.recv(1024)
            if data == 'exit':
                Flag = False
            elif data == '0':
                conn.sendall('經過可能會被錄音.balabala一大推')
            else:
                conn.sendall('請從新輸入.')


if __name__ == '__main__':
    server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyServer)
    server.serve_forever()
SocketServer實現服務器
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket


ip_port = ('127.0.0.1',8009)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)

while True:
    data = sk.recv(1024)
    print 'receive:',data
    inp = raw_input('please input:')
    sk.sendall(inp)
    if inp == 'exit':
        break

sk.close()
客戶端

二、ThreadingTCPServer源碼剖析

ThreadingTCPServer的類圖關係以下:

 

內部調用流程爲:

  • 啓動服務端程序
  • 執行 TCPServer.__init__ 方法,建立服務端Socket對象並綁定 IP 和 端口
  • 執行 BaseServer.__init__ 方法,將自定義的繼承自SocketServer.BaseRequestHandler 的類 MyRequestHandle賦值給 self.RequestHandlerClass
  • 執行 BaseServer.server_forever 方法,While 循環一直監聽是否有客戶端請求到達 ...
  • 當客戶端鏈接到達服務器
  • 執行 ThreadingMixIn.process_request 方法,建立一個 「線程」 用來處理請求
  • 執行 ThreadingMixIn.process_request_thread 方法
  • 執行 BaseServer.finish_request 方法,執行 self.RequestHandlerClass()  即:執行 自定義 MyRequestHandler 的構造方法(自動調用基類BaseRequestHandler的構造方法,在該構造方法中又會調用 MyRequestHandler的handle方法)

ThreadingTCPServer相關源碼

class BaseServer:

    """Base class for server classes.

    Methods for the caller:

    - __init__(server_address, RequestHandlerClass)
    - serve_forever(poll_interval=0.5)
    - shutdown()
    - handle_request()  # if you do not use serve_forever()
    - fileno() -> int   # for select()

    Methods that may be overridden:

    - server_bind()
    - server_activate()
    - get_request() -> request, client_address
    - handle_timeout()
    - verify_request(request, client_address)
    - server_close()
    - process_request(request, client_address)
    - shutdown_request(request)
    - close_request(request)
    - handle_error()

    Methods for derived classes:

    - finish_request(request, client_address)

    Class variables that may be overridden by derived classes or
    instances:

    - timeout
    - address_family
    - socket_type
    - allow_reuse_address

    Instance variables:

    - RequestHandlerClass
    - socket

    """

    timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        pass

    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            while not self.__shutdown_request:
                # XXX: Consider using another file descriptor or
                # connecting to the socket to wake this up instead of
                # polling. Polling reduces our responsiveness to a
                # shutdown request and wastes cpu at all other times.
                r, w, e = _eintr_retry(select.select, [self], [], [],
                                       poll_interval)
                if self in r:
                    self._handle_request_noblock()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

    def shutdown(self):
        """Stops the serve_forever loop.

        Blocks until the loop has finished. This must be called while
        serve_forever() is running in another thread, or it will
        deadlock.
        """
        self.__shutdown_request = True
        self.__is_shut_down.wait()

    # The distinction between handling, getting, processing and
    # finishing a request is fairly arbitrary.  Remember:
    #
    # - handle_request() is the top-level call.  It calls
    #   select, get_request(), verify_request() and process_request()
    # - get_request() is different for stream or datagram sockets
    # - process_request() is the place that may fork a new process
    #   or create a new thread to finish the request
    # - finish_request() instantiates the request handler class;
    #   this constructor will handle the request all by itself

    def handle_request(self):
        """Handle one request, possibly blocking.

        Respects self.timeout.
        """
        # Support people who used socket.settimeout() to escape
        # handle_request before self.timeout was available.
        timeout = self.socket.gettimeout()
        if timeout is None:
            timeout = self.timeout
        elif self.timeout is not None:
            timeout = min(timeout, self.timeout)
        fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
        if not fd_sets[0]:
            self.handle_timeout()
            return
        self._handle_request_noblock()

    def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that select.select has returned that the socket is
        readable before this function was called, so there should be
        no risk of blocking in get_request().
        """
        try:
            request, client_address = self.get_request()
        except socket.error:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except:
                self.handle_error(request, client_address)
                self.shutdown_request(request)

    def handle_timeout(self):
        """Called if no new request arrives within self.timeout.

        Overridden by ForkingMixIn.
        """
        pass

    def verify_request(self, request, client_address):
        """Verify the request.  May be overridden.

        Return True if we should proceed with this request.

        """
        return True

    def process_request(self, request, client_address):
        """Call finish_request.

        Overridden by ForkingMixIn and ThreadingMixIn.

        """
        self.finish_request(request, client_address)
        self.shutdown_request(request)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        pass

    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self)

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        pass

    def handle_error(self, request, client_address):
        """Handle an error gracefully.  May be overridden.

        The default is to print a traceback and continue.

        """
        print '-'*40
        print 'Exception happened during processing of request from',
        print client_address
        import traceback
        traceback.print_exc() # XXX But this goes to stderr!
        print '-'*40
BaseServer
class TCPServer(BaseServer):

    """Base class for various socket-based server classes.

    Defaults to synchronous IP stream (i.e., TCP).

    Methods for the caller:

    - __init__(server_address, RequestHandlerClass, bind_and_activate=True)
    - serve_forever(poll_interval=0.5)
    - shutdown()
    - handle_request()  # if you don't use serve_forever()
    - fileno() -> int   # for select()

    Methods that may be overridden:

    - server_bind()
    - server_activate()
    - get_request() -> request, client_address
    - handle_timeout()
    - verify_request(request, client_address)
    - process_request(request, client_address)
    - shutdown_request(request)
    - close_request(request)
    - handle_error()

    Methods for derived classes:

    - finish_request(request, client_address)

    Class variables that may be overridden by derived classes or
    instances:

    - timeout
    - address_family
    - socket_type
    - request_queue_size (only for stream sockets)
    - allow_reuse_address

    Instance variables:

    - server_address
    - RequestHandlerClass
    - socket

    """

    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM

    request_queue_size = 5

    allow_reuse_address = False

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                self.server_close()
                raise

    def server_bind(self):
        """Called by constructor to bind the socket.

        May be overridden.

        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        self.socket.close()

    def fileno(self):
        """Return socket file number.

        Interface required by select().

        """
        return self.socket.fileno()

    def get_request(self):
        """Get the request and client address from the socket.

        May be overridden.

        """
        return self.socket.accept()

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        try:
            #explicitly shutdown.  socket.close() merely releases
            #the socket and waits for GC to perform the actual close.
            request.shutdown(socket.SHUT_WR)
        except socket.error:
            pass #some platforms may raise ENOTCONN here
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        request.close()
TCPServer
class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            self.handle_error(request, client_address)
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()
ThreadingMixIn
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
ThreadingTCPServer

RequestHandler相關源碼

實例:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import SocketServer

class MyServer(SocketServer.BaseRequestHandler):

    def handle(self):
        # print self.request,self.client_address,self.server
        conn = self.request
        conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.')
        Flag = True
        while Flag:
            data = conn.recv(1024)
            if data == 'exit':
                Flag = False
            elif data == '0':
                conn.sendall('經過可能會被錄音.balabala一大推')
            else:
                conn.sendall('請從新輸入.')


if __name__ == '__main__':
    server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyServer)
    server.serve_forever()
服務端
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket


ip_port = ('127.0.0.1',8009)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)

while True:
    data = sk.recv(1024)
    print 'receive:',data
    inp = raw_input('please input:')
    sk.sendall(inp)
    if inp == 'exit':
        break

sk.close()
客戶端

ForkingTCPServer

ForkingTCPServer和ThreadingTCPServer的使用和執行流程基本一致,只不過在內部分別爲請求者創建 「線程」  和 「進程」。

基本使用:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import SocketServer

class MyServer(SocketServer.BaseRequestHandler):

    def handle(self):
        # print self.request,self.client_address,self.server
        conn = self.request
        conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.')
        Flag = True
        while Flag:
            data = conn.recv(1024)
            if data == 'exit':
                Flag = False
            elif data == '0':
                conn.sendall('經過可能會被錄音.balabala一大推')
            else:
                conn.sendall('請從新輸入.')


if __name__ == '__main__':
    server = SocketServer.ForkingTCPServer(('127.0.0.1',8009),MyServer)
    server.serve_forever()
服務端

 

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket


ip_port = ('127.0.0.1',8009)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)

while True:
    data = sk.recv(1024)
    print 'receive:',data
    inp = raw_input('please input:')
    sk.sendall(inp)
    if inp == 'exit':
        break

sk.close()
客戶端

以上ForkingTCPServer只是將 ThreadingTCPServer 實例中的代碼:

?
1
2
3
server = SocketServer.ThreadingTCPServer(( '127.0.0.1' , 8009 ),MyRequestHandler)
變動爲:
server = SocketServer.ForkingTCPServer(( '127.0.0.1' , 8009 ),MyRequestHandler)

 SocketServer的ThreadingTCPServer之因此能夠同時處理請求得益於 select 和 os.fork 兩個東西,其實本質上就是在服務器端爲每個客戶端建立一個進程,當前新建立的進程用來處理對應客戶端的請求,因此,能夠支持同時n個客戶端連接(長鏈接)。

源碼剖析參考 ThreadingTCPServer

Twisted

Twisted是一個事件驅動的網絡框架,其中包含了諸多功能,例如:網絡協議、線程、數據庫管理、網絡操做、電子郵件等。  

 

事件驅動

簡而言之,事件驅動分爲二個部分:第一,註冊事件;第二,觸發事件。

自定義事件驅動框架,命名爲:「弒君者」:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

# event_drive.py

event_list = []


def run():
    for event in event_list:
        obj = event()
        obj.execute()


class BaseHandler(object):
    """
    用戶必須繼承該類,從而規範全部類的方法(相似於接口的功能)
    """
    def execute(self):
        raise Exception('you must overwrite execute')
最牛逼的事件驅動框架

程序員使用「弒君者框架」:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from source import event_drive


class MyHandler(event_drive.BaseHandler):

    def execute(self):
        print 'event-drive execute MyHandler'


event_drive.event_list.append(MyHandler)
event_drive.run()
View Code

如上述代碼,事件驅動只不過是框架規定了執行順序,程序員在使用框架時,能夠向原執行順序中註冊「事件」,從而在框架執行時能夠出發已註冊的「事件」。

基於事件驅動Socket

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#!/usr/bin/env python
# -*- coding:utf-8 -*-
   
from twisted.internet import protocol
from twisted.internet import reactor
   
class Echo(protocol.Protocol):
     def dataReceived( self , data):
         self .transport.write(data)
   
def main():
     factory = protocol.ServerFactory()
     factory.protocol = Echo
   
     reactor.listenTCP( 8000 ,factory)
     reactor.run()
   
if __name__ = = '__main__' :
     main()

程序執行流程:

  • 運行服務端程序
  • 建立Protocol的派生類Echo
  • 建立ServerFactory對象,並將Echo類封裝到其protocol字段中
  • 執行reactor的 listenTCP 方法,內部使用 tcp.Port 建立socket server對象,並將該對象添加到了 reactor的set類型的字段 _read 中
  • 執行reactor的 run 方法,內部執行 while 循環,並經過 select 來監視 _read 中文件描述符是否有變化,循環中...
  • 客戶端請求到達
  • 執行reactor的 _doReadOrWrite 方法,其內部經過反射調用 tcp.Port 類的 doRead 方法,內部 accept 客戶端鏈接並建立Server對象實例(用於封裝客戶端socket信息)和 建立 Echo 對象實例(用於處理請求) ,而後調用 Echo 對象實例的 makeConnection 方法,建立鏈接。
  • 執行 tcp.Server 類的 doRead 方法,讀取數據,
  • 執行 tcp.Server 類的 _dataReceived 方法,若是讀取數據內容爲空(關閉連接),不然,出發 Echo 的 dataReceived 方法
  • 執行 Echo 的 dataReceived 方法

從源碼能夠看出,上述實例本質上使用了事件驅動的方法 和 IO多路複用的機制來進行Socket的處理。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from twisted.internet import reactor, protocol
from twisted.web.client import getPage
from twisted.internet import reactor
import time

class Echo(protocol.Protocol):

    def dataReceived(self, data):
        deferred1 = getPage('http://cnblogs.com')
        deferred1.addCallback(self.printContents)

        deferred2 = getPage('http://baidu.com')
        deferred2.addCallback(self.printContents)

        for i in range(2):
            time.sleep(1)
            print 'execute ',i


    def execute(self,data):
        self.transport.write(data)

    def printContents(self,content):
        print len(content),content[0:100],time.time()

def main():

    factory = protocol.ServerFactory()
    factory.protocol = Echo

    reactor.listenTCP(8000,factory)
    reactor.run()

if __name__ == '__main__':
    main()
異步IO操做

 

預習的內容
相關文章
相關標籤/搜索