python_day8のSocket

Socket

socket一般也稱做"套接字",用於描述IP地址和端口,是一個通訊鏈的句柄,應用程序一般經過"套接字"向網絡發出請求或者應答網絡請求。python

socket起源於Unix,而Unix/Linux基本哲學之一就是「一切皆文件」,對於文件用【打開】【讀寫】【關閉】模式來操做。socket就是該模式的一個實現,socket便是一種特殊的文件,一些socket函數就是對其進行的操做(讀/寫IO、打開、關閉)react

socket和file的區別:程序員

  • file模塊是針對某個指定文件進行【打開】【讀寫】【關閉】web

  • socket模塊是針對 服務器端 和 客戶端Socket 進行【打開】【讀寫】【關閉】數據庫

 

#  socket server編程

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket

ip_port = ('127.0.0.1',9999)

sk = socket.socket()
sk.bind(ip_port)
sk.listen(5)

while True:
    print 'server waiting...'
    conn,addr = sk.accept()

    client_data = conn.recv(1024)
    print client_data
    conn.sendall('不要回答,不要回答,不要回答')

    conn.close()

#  socket clientwindows

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
ip_port = ('127.0.0.1',9999)

sk = socket.socket()
sk.connect(ip_port)

sk.sendall('請求佔領地球')

server_reply = sk.recv(1024)
print server_reply

sk.close()

WEB服務應用:api

#!/usr/bin/env python
#coding:utf-8
import socket
 
def handle_request(client):
    buf = client.recv(1024)
    client.send("HTTP/1.1 200 OK\r\n\r\n")
    client.send("Hello, World")
 
def main():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(('localhost',8080))
    sock.listen(5)
 
    while True:
        connection, address = sock.accept()
        handle_request(connection)
        connection.close()
 
if __name__ == '__main__':
  main()

更多功能數組

sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)緩存

參數一:地址簇

  socket.AF_INET IPv4(默認)
  socket.AF_INET6 IPv6

  socket.AF_UNIX 只可以用於單一的Unix系統進程間通訊

參數二:類型

  socket.SOCK_STREAM  流式socket , for TCP (默認)
  socket.SOCK_DGRAM   數據報式socket , for UDP

  socket.SOCK_RAW 原始套接字,普通的套接字沒法處理ICMP、IGMP等網絡報文,而SOCK_RAW能夠;其次,SOCK_RAW也能夠處理特殊的IPv4報文;此外,利用原始套接字,能夠經過IP_HDRINCL套接字選項由用戶構造IP頭。
  socket.SOCK_RDM 是一種可靠的UDP形式,即保證交付數據報但不保證順序。SOCK_RAM用來提供對原始協議的低級訪問,在須要執行某些特殊操做時使用,如發送ICMP報文。SOCK_RAM一般僅限於高級用戶或管理員運行的程序使用。
  socket.SOCK_SEQPACKET 可靠的連續數據包服務

參數三:協議

  0  (默認)與特定的地址家族相關的協議,若是是 0 ,則系統就會根據地址格式和套接類別,自動選擇一個合適的協議

#  UDP Demo

import socket
ip_port = ('127.0.0.1',9999)
sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)
sk.bind(ip_port)

while True:
    data = sk.recv(1024)
    print data




import socket
ip_port = ('127.0.0.1',9999)

sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)
while True:
    inp = raw_input('數據:').strip()
    if inp == 'exit':
        break
    sk.sendto(inp,ip_port)

sk.close()

sk.bind(address)

  s.bind(address) 將套接字綁定到地址。address地址的格式取決於地址族。在AF_INET下,以元組(host,port)的形式表示地址。

sk.listen(backlog)

  開始監聽傳入鏈接。backlog指定在拒絕鏈接以前,能夠掛起的最大鏈接數量。

      backlog等於5,表示內核已經接到了鏈接請求,但服務器尚未調用accept進行處理的鏈接個數最大爲5
      這個值不能無限大,由於要在內核中維護鏈接隊列

sk.setblocking(bool)

  是否阻塞(默認True),若是設置False,那麼accept和recv時一旦無數據,則報錯。

sk.accept()

  接受鏈接並返回(conn,address),其中conn是新的套接字對象,能夠用來接收和發送數據。address是鏈接客戶端的地址。

  接收TCP 客戶的鏈接(阻塞式)等待鏈接的到來

sk.connect(address)

  鏈接到address處的套接字。通常,address的格式爲元組(hostname,port),若是鏈接出錯,返回socket.error錯誤。

sk.connect_ex(address)

  同上,只不過會有返回值,鏈接成功時返回 0 ,鏈接失敗時候返回編碼,例如:10061

sk.close()

  關閉套接字

sk.recv(bufsize[,flag])

  接受套接字的數據。數據以字符串形式返回,bufsize指定最多能夠接收的數量。flag提供有關消息的其餘信息,一般能夠忽略。

sk.recvfrom(bufsize[.flag])

  與recv()相似,但返回值是(data,address)。其中data是包含接收數據的字符串,address是發送數據的套接字地址。

sk.send(string[,flag])

  將string中的數據發送到鏈接的套接字。返回值是要發送的字節數量,該數量可能小於string的字節大小。即:可能未將指定內容所有發送。

sk.sendall(string[,flag])

  將string中的數據發送到鏈接的套接字,但在返回以前會嘗試發送全部數據。成功返回None,失敗則拋出異常。

      內部經過遞歸調用send,將全部內容發送出去。

sk.sendto(string[,flag],address)

  將數據發送到套接字,address是形式爲(ipaddr,port)的元組,指定遠程地址。返回值是發送的字節數。該函數主要用於UDP協議。

sk.settimeout(timeout)

  設置套接字操做的超時期,timeout是一個浮點數,單位是秒。值爲None表示沒有超時期。通常,超時期應該在剛建立套接字時設置,由於它們可能用於鏈接的操做(如 client 鏈接最多等待5s )

sk.getpeername()

  返回鏈接套接字的遠程地址。返回值一般是元組(ipaddr,port)。

sk.getsockname()

  返回套接字本身的地址。一般是一個元組(ipaddr,port)

sk.fileno()

  套接字的文件描述符

#  UDP
#  服務端
import socket
ip_port = ('127.0.0.1',9999)
sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)
sk.bind(ip_port)

while True:
    data,(host,port) = sk.recvfrom(1024)
    print(data,host,port)
    sk.sendto(bytes('ok', encoding='utf-8'), (host,port))


#  客戶端
import socket
ip_port = ('127.0.0.1',9999)

sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)
while True:
    inp = input('數據:').strip()
    if inp == 'exit':
        break
    sk.sendto(bytes(inp, encoding='utf-8'),ip_port)
    data = sk.recvfrom(1024)
    print(data)

sk.close()

實例:智能機器人

#  服務端

#!/usr/bin/env python
# -*- coding:utf-8 -*-


import socket

ip_port = ('127.0.0.1',8888)
sk = socket.socket()
sk.bind(ip_port)
sk.listen(5)

while True:
    conn,address =  sk.accept()
    conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.')
    Flag = True
    while Flag:
        data = conn.recv(1024)
        if data == 'exit':
            Flag = False
        elif data == '0':
            conn.sendall('經過可能會被錄音.balabala一大推')
        else:
            conn.sendall('請從新輸入.')
    conn.close()

#  客戶端

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket


ip_port = ('127.0.0.1',8005)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)

while True:
    data = sk.recv(1024)
    print 'receive:',data
    inp = raw_input('please input:')
    sk.sendall(inp)
    if inp == 'exit':
        break

sk.close()

IO多路複用

I/O多路複用指:經過一種機制,能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知程序進行相應的讀寫操做。

Linux

Linux中的 select,poll,epoll 都是IO多路複用的機制。

select
 
select最先於1983年出如今4.2BSD中,它經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。
select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢,事實上從如今看來,這也是它所剩很少的優勢之一。
select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,不過能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制。
另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。
 
poll
 
poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制。
poll和select一樣存在一個缺點就是,包含大量文件描述符的數組被總體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增長而線性增大。
另外,select()和poll()將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用select()和poll()的時候將再次報告這些文件描述符,因此它們通常不會丟失就緒的消息,這種方式稱爲水平觸發(Level Triggered)。
 
epoll
 
直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,它幾乎具有了以前所說的一切優勢,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。
epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。
epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。
另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。

Python

Python中有一個select模塊,其中提供了:select、poll、epoll三個方法,分別調用系統的 select,poll,epoll 從而實現IO多路複用。

Windows Python:
    提供: select
Mac Python:
    提供: select
Linux Python:
    提供: select、poll、epoll

注意:網絡操做、文件操做、終端操做等均屬於IO操做,對於windows只支持Socket操做,其餘系統支持其餘IO操做,可是沒法檢測 普通文件操做 自動上次讀取是否已經變化。

對於select方法:

句柄列表11, 句柄列表22, 句柄列表33 = select.select(句柄序列1, 句柄序列2, 句柄序列3, 超時時間)
 
參數: 可接受四個參數(前三個必須)
返回值:三個列表
 
select方法用來監視文件句柄,若是句柄發生變化,則獲取該句柄。
一、當 參數1 序列中的句柄發生可讀時(accetp和read),則獲取發生變化的句柄並添加到 返回值1 序列中
二、當 參數2 序列中含有句柄時,則將該序列中全部的句柄添加到 返回值2 序列中
三、當 參數3 序列中的句柄發生錯誤時,則將該發生錯誤的句柄添加到 返回值3 序列中
四、當 超時時間 未設置,則select會一直阻塞,直到監聽的句柄發生變化
   當 超時時間 = 1時,那麼若是監聽的句柄均無任何變化,則select會阻塞 1 秒,以後返回三個空列表,若是監聽的句柄有變化,則直接執行。
#  利用select監聽終端操做實例

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import select
import threading
import sys

while True:
    readable, writeable, error = select.select([sys.stdin,],[],[],1)
    if sys.stdin in readable:
        print 'select get stdin',sys.stdin.readline()
#  利用select實現僞同時處理多個Socket客戶端請求:服務端

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket
import select

sk1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sk1.bind(('127.0.0.1',8002))
sk1.listen(5)
sk1.setblocking(0)

inputs = [sk1,]

while True:
    readable_list, writeable_list, error_list = select.select(inputs, [], inputs, 1)
    for r in readable_list:
        # 當客戶端第一次鏈接服務端時
        if sk1 == r:
            print 'accept'
            request, address = r.accept()
            request.setblocking(0)
            inputs.append(request)
        # 當客戶端鏈接上服務端以後,再次發送數據時
        else:
            received = r.recv(1024)
            # 當正常接收客戶端發送的數據時
            if received:
                print 'received data:', received
            # 當客戶端關閉程序時
            else:
                inputs.remove(r)

sk1.close()
#  利用select實現僞同時處理多個Socket客戶端請求:客戶端

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket

ip_port = ('127.0.0.1',8002)
sk = socket.socket()
sk.connect(ip_port)

while True:
    inp = raw_input('please input:')
    sk.sendall(inp)
sk.close()

此處的Socket服務端相比與原生的Socket,他支持當某一個請求再也不發送數據時,服務器端不會等待而是能夠去處理其餘請求的數據。可是,若是每一個請求的耗時比較長時,select版本的服務器端也沒法完成同時操做。

#  基於select實現socket服務端

#!/usr/bin/env python
#coding:utf8

'''
 服務器的實現 採用select的方式
'''

import select
import socket
import sys
import Queue

#建立套接字並設置該套接字爲非阻塞模式

server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking(0)

#綁定套接字
server_address = ('localhost',10000)
print >>sys.stderr,'starting up on %s port %s'% server_address
server.bind(server_address)

#將該socket變成服務模式
#backlog等於5,表示內核已經接到了鏈接請求,但服務器尚未調用accept進行處理的鏈接個數最大爲5
#這個值不能無限大,由於要在內核中維護鏈接隊列

server.listen(5)

#初始化讀取數據的監聽列表,最開始時但願從server這個套接字上讀取數據
inputs = [server]

#初始化寫入數據的監聽列表,最開始並無客戶端鏈接進來,因此列表爲空

outputs = []

#要發往客戶端的數據
message_queues = {}
while inputs:
    print >>sys.stderr,'waiting for the next event'
    #調用select監聽全部監聽列表中的套接字,並將準備好的套接字加入到對應的列表中
    readable,writable,exceptional = select.select(inputs,outputs,inputs)#列表中的socket 套接字  若是是文件呢? 
    #監控文件句柄有某一處發生了變化 可寫 可讀  異常屬於Linux中的網絡編程 
    #屬於同步I/O操做,屬於I/O複用模型的一種
    #rlist--等待到準備好讀
    #wlist--等待到準備好寫
    #xlist--等待到一種異常
    #處理可讀取的套接字

    '''
        若是server這個套接字可讀,則說明有新連接到來
        此時在server套接字上調用accept,生成一個與客戶端通信的套接字
        並將與客戶端通信的套接字加入inputs列表,下一次能夠經過select檢查鏈接是否可讀
        而後在發往客戶端的緩衝中加入一項,鍵名爲:與客戶端通信的套接字,鍵值爲空隊列
        select系統調用是用來讓咱們的程序監視多個文件句柄(file descrīptor)的狀態變化的。程序會停在select這裏等待,
        直到被監視的文件句柄有某一個或多個發生了狀態改變
        '''

    '''
        若可讀的套接字不是server套接字,有兩種狀況:一種是有數據到來,另外一種是連接斷開
        若是有數據到來,先接收數據,而後將收到的數據填入往客戶端的緩存區中的對應位置,最後
        將於客戶端通信的套接字加入到寫數據的監聽列表:
        若是套接字可讀.但沒有接收到數據,則說明客戶端已經斷開。這時須要關閉與客戶端鏈接的套接字
        進行資源清理
        '''
        
    for s in readable: 
        if s is server:
            connection,client_address = s.accept()
            print >>sys.stderr,'connection from',client_address
            connection.setblocking(0)#設置非阻塞
            inputs.append(connection)
            message_queues[connection] = Queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                print >>sys.stderr,'received "%s" from %s'% \
                (data,s.getpeername())
                message_queues[s].put(data)
                if s not in outputs:
                    outputs.append(s)
            else:
                print >>sys.stderr,'closing',client_address
                if s in outputs:
                    outputs.remove(s)
                inputs.remove(s)
                s.close()
                del message_queues[s]
                    
    #處理可寫的套接字
    '''
        在發送緩衝區中取出響應的數據,發往客戶端。
        若是沒有數據須要寫,則將套接字從發送隊列中移除,select中再也不監視
        '''

    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()

        except Queue.Empty:
            print >>sys.stderr,'  ',s,getpeername(),'queue empty'
            outputs.remove(s)
        else:
            print >>sys.stderr,'sending "%s" to %s'% \
            (next_msg,s.getpeername())
            s.send(next_msg)



    #處理異常狀況

    for s in exceptional:
        for s in exceptional:
            print >>sys.stderr,'exception condition on',s.getpeername()
            inputs.remove(s)
            if s in outputs:
                outputs.remove(s)
            s.close()
            del message_queues[s]

SocketServer模塊

SocketServer內部使用 IO多路複用 以及 「多線程」 和 「多進程」 ,從而實現併發處理多個客戶端請求的Socket服務端。即:每一個客戶端請求鏈接到服務器時,Socket服務端都會在服務器是建立一個「線程」或者「進程」 專門負責處理當前客戶端的全部請求。

ThreadingTCPServer

ThreadingTCPServer實現的Soket服務器內部會爲每一個client建立一個 「線程」,該線程用來和客戶端進行交互。

一、ThreadingTCPServer基礎

使用ThreadingTCPServer:

  • 建立一個繼承自 SocketServer.BaseRequestHandler 的類

  • 類中必須定義一個名稱爲 handle 的方法

  • 啓動ThreadingTCPServer

#  SocketServer實現服務器

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import SocketServer

class MyServer(SocketServer.BaseRequestHandler):

    def handle(self):
        # print self.request,self.client_address,self.server
        conn = self.request
        conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.')
        Flag = True
        while Flag:
            data = conn.recv(1024)
            if data == 'exit':
                Flag = False
            elif data == '0':
                conn.sendall('經過可能會被錄音.balabala一大推')
            else:
                conn.sendall('請從新輸入.')


if __name__ == '__main__':
    server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyServer)
    server.serve_forever()
#  客戶端

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket


ip_port = ('127.0.0.1',8009)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)

while True:
    data = sk.recv(1024)
    print 'receive:',data
    inp = raw_input('please input:')
    sk.sendall(inp)
    if inp == 'exit':
        break

sk.close()

二、ThreadingTCPServer源碼剖析

ThreadingTCPServer的類圖關係以下:

 

內部調用流程爲:

  • 啓動服務端程序

  • 執行 TCPServer.__init__ 方法,建立服務端Socket對象並綁定 IP 和 端口

  • 執行 BaseServer.__init__ 方法,將自定義的繼承自SocketServer.BaseRequestHandler 的類 MyRequestHandle賦值給 self.RequestHandlerClass

  • 執行 BaseServer.server_forever 方法,While 循環一直監聽是否有客戶端請求到達 ...

  • 當客戶端鏈接到達服務器

  • 執行 ThreadingMixIn.process_request 方法,建立一個 「線程」 用來處理請求

  • 執行 ThreadingMixIn.process_request_thread 方法

  • 執行 BaseServer.finish_request 方法,執行 self.RequestHandlerClass()  即:執行 自定義 MyRequestHandler 的構造方法(自動調用基類BaseRequestHandler的構造方法,在該構造方法中又會調用 MyRequestHandler的handle方法)

實例:

#  服務端

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import SocketServer

class MyServer(SocketServer.BaseRequestHandler):

    def handle(self):
        # print self.request,self.client_address,self.server
        conn = self.request
        conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.')
        Flag = True
        while Flag:
            data = conn.recv(1024)
            if data == 'exit':
                Flag = False
            elif data == '0':
                conn.sendall('經過可能會被錄音.balabala一大推')
            else:
                conn.sendall('請從新輸入.')


if __name__ == '__main__':
    server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyServer)
    server.serve_forever()
#  客戶端

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket


ip_port = ('127.0.0.1',8009)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)

while True:
    data = sk.recv(1024)
    print 'receive:',data
    inp = raw_input('please input:')
    sk.sendall(inp)
    if inp == 'exit':
        break

sk.close()

源碼精簡:

import socket
import threading
import select


def process(request, client_address):
    print request,client_address
    conn = request
    conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.')
    flag = True
    while flag:
        data = conn.recv(1024)
        if data == 'exit':
            flag = False
        elif data == '0':
            conn.sendall('經過可能會被錄音.balabala一大推')
        else:
            conn.sendall('請從新輸入.')

sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk.bind(('127.0.0.1',8002))
sk.listen(5)

while True:
    r, w, e = select.select([sk,],[],[],1)
    print 'looping'
    if sk in r:
        print 'get request'
        request, client_address = sk.accept()
        t = threading.Thread(target=process, args=(request, client_address))
        t.daemon = False
        t.start()

sk.close()

如精簡代碼能夠看出,SocketServer的ThreadingTCPServer之因此能夠同時處理請求得益於 select 和 Threading 兩個東西,其實本質上就是在服務器端爲每個客戶端建立一個線程,當前線程用來處理對應客戶端的請求,因此,能夠支持同時n個客戶端連接(長鏈接)。

ForkingTCPServer

ForkingTCPServer和ThreadingTCPServer的使用和執行流程基本一致,只不過在內部分別爲請求者創建 「線程」  和 「進程」。

基本使用:

#  服務端

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import SocketServer

class MyServer(SocketServer.BaseRequestHandler):

    def handle(self):
        # print self.request,self.client_address,self.server
        conn = self.request
        conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.')
        Flag = True
        while Flag:
            data = conn.recv(1024)
            if data == 'exit':
                Flag = False
            elif data == '0':
                conn.sendall('經過可能會被錄音.balabala一大推')
            else:
                conn.sendall('請從新輸入.')


if __name__ == '__main__':
    server = SocketServer.ForkingTCPServer(('127.0.0.1',8009),MyServer)
    server.serve_forever()
#  客戶端

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket


ip_port = ('127.0.0.1',8009)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)

while True:
    data = sk.recv(1024)
    print 'receive:',data
    inp = raw_input('please input:')
    sk.sendall(inp)
    if inp == 'exit':
        break

sk.close()

以上ForkingTCPServer只是將 ThreadingTCPServer 實例中的代碼:

server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyRequestHandler)
變動爲:
server = SocketServer.ForkingTCPServer(('127.0.0.1',8009),MyRequestHandler)

SocketServer的ThreadingTCPServer之因此能夠同時處理請求得益於 select 和 os.fork 兩個東西,其實本質上就是在服務器端爲每個客戶端建立一個進程,當前新建立的進程用來處理對應客戶端的請求,因此,能夠支持同時n個客戶端連接(長鏈接)。

Twisted

Twisted是一個事件驅動的網絡框架,其中包含了諸多功能,例如:網絡協議、線程、數據庫管理、網絡操做、電子郵件等。

事件驅動

簡而言之,事件驅動分爲二個部分:第一,註冊事件;第二,觸發事件。

自定義事件驅動框架,命名爲:「弒君者」:

#  最牛逼的事件驅動框架

#!/usr/bin/env python
# -*- coding:utf-8 -*-

# event_drive.py

event_list = []


def run():
    for event in event_list:
        obj = event()
        obj.execute()


class BaseHandler(object):
    """
    用戶必須繼承該類,從而規範全部類的方法(相似於接口的功能)
    """
    def execute(self):
        raise Exception('you must overwrite execute')

程序員使用「弒君者框架」:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from source import event_drive


class MyHandler(event_drive.BaseHandler):

    def execute(self):
        print 'event-drive execute MyHandler'


event_drive.event_list.append(MyHandler)
event_drive.run()

如上述代碼,事件驅動只不過是框架規定了執行順序,程序員在使用框架時,能夠向原執行順序中註冊「事件」,從而在框架執行時能夠出發已註冊的「事件」。

基於事件驅動Socket

#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
from twisted.internet import protocol
from twisted.internet import reactor
 
class Echo(protocol.Protocol):
    def dataReceived(self, data):
        self.transport.write(data)
 
def main():
    factory = protocol.ServerFactory()
    factory.protocol = Echo
 
    reactor.listenTCP(8000,factory)
    reactor.run()
 
if __name__ == '__main__':
    main()

程序執行流程:

  • 運行服務端程序

  • 建立Protocol的派生類Echo

  • 建立ServerFactory對象,並將Echo類封裝到其protocol字段中

  • 執行reactor的 listenTCP 方法,內部使用 tcp.Port 建立socket server對象,並將該對象添加到了 reactor的set類型的字段 _read 中

  • 執行reactor的 run 方法,內部執行 while 循環,並經過 select 來監視 _read 中文件描述符是否有變化,循環中...

  • 客戶端請求到達

  • 執行reactor的 _doReadOrWrite 方法,其內部經過反射調用 tcp.Port 類的 doRead 方法,內部 accept 客戶端鏈接並建立Server對象實例(用於封裝客戶端socket信息)和 建立 Echo 對象實例(用於處理請求) ,而後調用 Echo 對象實例的 makeConnection 方法,建立鏈接。

  • 執行 tcp.Server 類的 doRead 方法,讀取數據,

  • 執行 tcp.Server 類的 _dataReceived 方法,若是讀取數據內容爲空(關閉連接),不然,出發 Echo 的 dataReceived 方法

  • 執行 Echo 的 dataReceived 方法

從源碼能夠看出,上述實例本質上使用了事件驅動的方法 和 IO多路複用的機制來進行Socket的處理。

#  異步IO操做

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from twisted.internet import reactor, protocol
from twisted.web.client import getPage
from twisted.internet import reactor
import time

class Echo(protocol.Protocol):

    def dataReceived(self, data):
        deferred1 = getPage('http://cnblogs.com')
        deferred1.addCallback(self.printContents)

        deferred2 = getPage('http://baidu.com')
        deferred2.addCallback(self.printContents)

        for i in range(2):
            time.sleep(1)
            print 'execute ',i


    def execute(self,data):
        self.transport.write(data)

    def printContents(self,content):
        print len(content),content[0:100],time.time()

def main():

    factory = protocol.ServerFactory()
    factory.protocol = Echo

    reactor.listenTCP(8000,factory)
    reactor.run()

if __name__ == '__main__':
    main()

更多請見:

  https://twistedmatrix.com/trac  http://twistedmatrix.com/documents/current/api/

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息