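A socket (套接字) describes an IP address and a port and acts as the handle of a communication link. Applications usually send requests to the network, or answer network requests, through a socket.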
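Sockets originated in Unix, and one of the basic philosophies of Unix/Linux is "everything is a file": files are handled with an open, read/write, close pattern. A socket is one implementation of that pattern. It is a special kind of file, and the socket functions are operations on it (read/write I/O, open, close).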
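To make the "everything is a file" idea concrete, here is a small Python 3 sketch that is not part of the original text. It uses only the standard library. socket.socketpair() creates two sockets that are already connected, fileno() exposes the integer file descriptor, and makefile() wraps a socket in a file object so that the ordinary read/write/close calls apply.

```python
import socket

# Two sockets that are already connected to each other.
left, right = socket.socketpair()

# Like an open file, a socket is backed by an integer file descriptor.
fd = left.fileno()
print(fd >= 0)            # True

# makefile() returns file objects, so ordinary file-style calls work.
writer = left.makefile('wb')
reader = right.makefile('rb')
writer.write(b'hello\n')
writer.flush()            # push the buffered bytes into the socket
line = reader.readline()
print(line)               # b'hello\n'

# Close the file wrappers and the sockets, just as you would close files.
for obj in (writer, reader, left, right):
    obj.close()
```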
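Difference between socket and file: file operations (open, read/write, close) act on one specific file, while socket operations (open, read/write, close) act on the connection between a server socket and a client socket.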
Socket Families (address families)

socket.AF_UNIX: Unix domain sockets, for inter-process communication on the local machine
socket.AF_INET: IPv4
socket.AF_INET6: IPv6

These constants represent the address (and protocol) families, used for the first argument to socket(). If the AF_UNIX constant is not defined then this protocol is unsupported. More constants may be available depending on the system.
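Python 2 can send str objects directly. Python 3 can only send bytes, so text must be encoded before it is sent and decoded after it is received.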
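Here is a minimal Python 3 sketch of that rule. socket.socketpair() stands in for a real client/server pair. Text is encoded to bytes before sending and decoded after receiving, and passing a str to sendall() raises TypeError.

```python
import socket

a, b = socket.socketpair()

text = '世界,你好'                  # str: Unicode text
a.sendall(text.encode('utf-8'))     # str -> bytes before sending

received = b.recv(1024)             # recv() always returns bytes
decoded = received.decode('utf-8')  # bytes -> str after receiving
print(type(received).__name__, decoded)

# In Python 3, passing a str instead of bytes is rejected.
try:
    a.sendall(text)
    raised_type_error = False
except TypeError:
    raised_type_error = True
print(raised_type_error)            # True

a.close()
b.close()
```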
```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# socket server
import socket

ip_port = ('127.0.0.1', 9999)
sk = socket.socket()
sk.bind(ip_port)
sk.listen(5)

while True:
    print('server waiting...')
    conn, addr = sk.accept()
    client_data = conn.recv(1024)
    print(client_data.decode('utf-8'))
    conn.sendall('Do not answer! Do not answer! Do not answer!'.encode('utf-8'))
    conn.close()
```
```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# socket client
import socket

ip_port = ('127.0.0.1', 9999)
sk = socket.socket()
sk.connect(ip_port)
sk.sendall('Request to occupy Earth'.encode('utf-8'))
server_reply = sk.recv(1024)
print(server_reply.decode('utf-8'))
sk.close()
```
Error:

```
Traceback (most recent call last):
  File "E:/python34foexam/lessons/test_socket_cli.py", line 11, in <module>
    data = cli.recv(1024)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host.
```
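The cause is that the server's listening socket only accepts connections and is not connected to any client. Data must therefore be received and sent through the per-connection socket object that accept() returns, i.e. with conn.recv() / conn.send() as below.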
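```python
import socket

server = socket.socket()
server.bind(('localhost', 6969))  # bind the address and port to listen on
server.listen()                   # start listening
print('listening....')
conn, addr = server.accept()      # wait for a "call"; returns the connection object and the client address
print(conn, addr)
print('call is coming')
data = conn.recv(1024)            # receive through conn, not through server
print('recv:', data)
conn.send(data.upper())
conn.close()
server.close()
```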
```python
import socket

cli = socket.socket()
cli.connect(('localhost', 6969))
cli.send(b'Hello world')
data = cli.recv(1024)
print('recv:', data)
cli.close()
```
However, this cannot send Chinese text:
```
  File "E:/python34foexam/lessons/test_socket_cli.py", line 9
    cli.send(b'世界,你好')
            ^
SyntaxError: bytes can only contain ASCII literal characters.
```
```python
import socket

cli = socket.socket()
cli.connect(('localhost', 6969))
cli.send('世界,你好'.encode('utf-8'))
data = cli.recv(1024)
print('recv:', data.decode())
cli.close()
```

Output:

```
recv: 世界,你好
```
Next improvement: keep exchanging data in a loop, and exit only when the client is done.
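```python
import socket

server = socket.socket()
server.bind(('localhost', 6969))  # bind the address and port to listen on
server.listen()                   # start listening
print('listening....')
conn, addr = server.accept()      # wait for a "call"; returns the connection object and the client address
print(conn, addr)
print('call is coming')
while True:
    data = conn.recv(1024)
    if not data:                  # empty bytes: the client closed the connection
        break
    print('recv:', data)
    conn.send(data.upper())
conn.close()
server.close()
```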
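```python
import socket

cli = socket.socket()
cli.connect(('localhost', 6969))
while True:
    msg = input('>>:').strip()
    if not msg:            # sending b'' would leave recv() below waiting forever
        continue
    if msg == 'exit':
        break
    cli.send(msg.encode('utf-8'))
    data = cli.recv(1024)
    print('recv:', data.decode())
cli.close()
```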
However, the server still cannot serve a second client: once the first connection ends, the server stops as well.
sendall() simply calls send() in a loop until all of the data has been sent.
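The loop that sendall() performs can be sketched in Python 3 as follows. send_all is a hypothetical helper written for illustration and is not part of the socket module. The demo sends a 1 MB payload over socket.socketpair() while a background thread reads, so the sender never blocks forever on a full buffer.

```python
import socket
import threading


def send_all(sock, data):
    """Hypothetical helper: call send() until every byte is sent,
    which is essentially what sock.sendall(data) does."""
    view = memoryview(data)
    total = 0
    while total < len(data):
        sent = sock.send(view[total:])   # may send fewer bytes than requested
        if sent == 0:
            raise RuntimeError('socket connection broken')
        total += sent
    return total


a, b = socket.socketpair()
payload = b'x' * 1_000_000
chunks = []


def reader():
    # Read until the sender shuts down its side (recv() returns b'').
    while True:
        chunk = b.recv(65536)
        if not chunk:
            break
        chunks.append(chunk)


t = threading.Thread(target=reader)
t.start()
sent = send_all(a, payload)
a.shutdown(socket.SHUT_WR)   # tell the reader that no more data is coming
t.join()
received = b''.join(chunks)
print(sent, len(received))   # 1000000 1000000
a.close()
b.close()
```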
```python
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
```
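Argument 1: address family
socket.AF_INET: IPv4 (default)
socket.AF_INET6: IPv6
socket.AF_UNIX: only for inter-process communication on a single Unix system

Argument 2: type
socket.SOCK_STREAM: stream socket, for TCP (default)
socket.SOCK_DGRAM: datagram socket, for UDP
socket.SOCK_RAW: raw socket. Ordinary sockets cannot handle network messages such as ICMP or IGMP, but SOCK_RAW can. SOCK_RAW can also handle special IPv4 packets, and with the IP_HDRINCL socket option the user can construct the IP header. Raw sockets give low-level access to raw protocols and are used for special operations such as sending ICMP messages; they are usually restricted to programs run by advanced users or administrators.
socket.SOCK_RDM: a reliable form of datagram: delivery is guaranteed, but ordering is not.
socket.SOCK_SEQPACKET: reliable, sequenced packet service

Argument 3: protocol
0 (default): the protocol associated with the address family. With 0, the system automatically chooses a suitable protocol based on the address format and the socket type.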
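A quick Python 3 check of the defaults listed above, assuming a platform with IPv4 support (all mainstream ones have it). socket.socket() with no arguments is a TCP/IPv4 socket, and passing SOCK_DGRAM gives a UDP socket.

```python
import socket

default = socket.socket()                                   # no arguments
tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)  # explicit TCP
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)   # UDP

for name, s in (('default', default), ('tcp', tcp), ('udp', udp)):
    print(name, s.family.name, s.type.name)

is_tcp_default = (default.family, default.type) == (socket.AF_INET, socket.SOCK_STREAM)
udp_type = udp.type

for s in (default, tcp, udp):
    s.close()
```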
UDP demo (server, then client):

```python
import socket

ip_port = ('127.0.0.1', 9999)
sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
sk.bind(ip_port)
while True:
    data = sk.recv(1024)
    print(data.decode('utf-8'))
```

```python
import socket

ip_port = ('127.0.0.1', 9999)
sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
while True:
    inp = input('data: ').strip()
    if inp == 'exit':
        break
    sk.sendto(inp.encode('utf-8'), ip_port)
sk.close()
```
sk.bind(address)
Bind the socket to address. The format of address depends on the address family; under AF_INET it is a tuple (host, port).
sk.listen(backlog)
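Start listening for incoming connections. backlog specifies the maximum number of connections that may be left pending before new ones are refused.
With backlog=5, at most 5 connections can be waiting that the kernel has already received but the server has not yet handled with accept().
The value cannot be arbitrarily large, because the kernel has to maintain the connection queue.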
sk.setblocking(bool)
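Whether the socket blocks (default True). If set to False, accept() and recv() raise an error as soon as no data is available (BlockingIOError in Python 3).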
sk.accept()
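Typically used as conn, addr = sk.accept(), which assigns the new connection object and the client's address.
Accept a connection and return (conn, address), where conn is a new socket object that can be used to send and receive data, and address is the address of the connecting client.
Accepts a TCP client connection, blocking until one arrives.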
sk.connect(address)
Connect to the socket at address. The address is usually a tuple (hostname, port). If the connection fails, socket.error is raised (an alias of OSError in Python 3).
sk.connect_ex(address)
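Same as above, except that it returns a value instead of raising: 0 when the connection succeeds, an error code when it fails, for example 10061 (connection refused on Windows).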
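A small Python 3 sketch contrasting the two calls. It assumes that a loopback port obtained by binding to port 0 and then closing the socket has no listener, which is true in practice but not guaranteed. connect_ex() returns a non-zero code, while connect() raises.

```python
import socket

# Find a local port that (almost certainly) has no listener: bind to
# port 0 so the OS picks a free port, then release it again.
probe = socket.socket()
probe.bind(('127.0.0.1', 0))
free_port = probe.getsockname()[1]
probe.close()

# connect_ex() reports failure as an error code instead of raising.
client = socket.socket()
code = client.connect_ex(('127.0.0.1', free_port))
client.close()
print(code)  # non-zero, e.g. errno.ECONNREFUSED (10061 on Windows)

# connect() to the same address raises an exception instead.
client = socket.socket()
try:
    client.connect(('127.0.0.1', free_port))
    connect_raised = False
except OSError:                # ConnectionRefusedError is a subclass
    connect_raised = True
finally:
    client.close()
print(connect_raised)          # True
```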
sk.close()
Close the socket.
sk.recv(bufsize[,flag])
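Receive data from the socket. The data is returned as a string (bytes in Python 3); bufsize is the maximum amount of data to receive at once. flag provides additional information about the message and can usually be ignored.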
sk.recvfrom(bufsize[,flag])
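Like recv(), but returns (data, address), where data is a string (bytes in Python 3) containing the received data and address is the address of the socket that sent it.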
sk.send(string[,flag])
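Send the data in string to the connected socket. Returns the number of bytes sent, which may be smaller than the size of string; in other words, not all of the data may have been sent.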
sk.sendall(string[,flag])
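Send the data in string to the connected socket, trying to send all of it before returning. Returns None on success and raises an exception on failure.
Internally it calls send() repeatedly until everything has been sent.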
sk.sendto(string[,flag],address)
Send data to the socket; address is a tuple (ipaddr, port) giving the remote address. Returns the number of bytes sent. This function is mainly used with UDP.
sk.settimeout(timeout)
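Set a timeout for socket operations; timeout is a floating-point number of seconds, and None means no timeout. The timeout should generally be set right after the socket is created, because it also applies to connection operations (for example, a client waits at most 5 s to connect).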
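A minimal Python 3 illustration, using socket.socketpair() for convenience. With a 0.2-second timeout, recv() on an empty socket raises socket.timeout instead of blocking forever. Once data is waiting, it returns immediately.

```python
import socket

a, b = socket.socketpair()
b.settimeout(0.2)               # operations on b wait at most 0.2 s

try:
    b.recv(1024)                # nothing has been sent yet
    timed_out = False
except socket.timeout:          # alias of TimeoutError since Python 3.10
    timed_out = True
print('timed out:', timed_out)  # timed out: True

a.sendall(b'ping')
data = b.recv(1024)             # data is waiting, so this returns at once
timeout_value = b.gettimeout()
print(data, timeout_value)      # b'ping' 0.2

a.close()
b.close()
```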
sk.getpeername()
Return the remote address of the connected socket, usually a tuple (ipaddr, port).
sk.getsockname()
Return the socket's own address, usually a tuple (ipaddr, port).
sk.fileno()
Return the socket's file descriptor.
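A short Python 3 sketch tying the address methods together. It assumes the loopback interface and binds to port 0 so the OS chooses a free port. The client's peer is the server's address, and the address returned by accept() is the client's own local address.

```python
import socket

server = socket.socket()
server.bind(('127.0.0.1', 0))       # port 0: let the OS choose a free port
server.listen(1)
server_addr = server.getsockname()  # ('127.0.0.1', <chosen port>)

client = socket.socket()
client.connect(server_addr)
conn, addr = server.accept()

# The client's remote address is the server's address ...
peer_ok = client.getpeername() == server_addr
# ... and accept() reported the client's own local address.
local_ok = client.getsockname() == addr == conn.getpeername()
print(peer_ok, local_ok)                # True True
print(client.fileno(), conn.fileno())   # integer file descriptors

for s in (conn, client, server):
    s.close()
```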
UDP:

```python
# server
import socket

ip_port = ('127.0.0.1', 9999)
sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
sk.bind(ip_port)
while True:
    data, (host, port) = sk.recvfrom(1024)
    print(data, host, port)
    sk.sendto(bytes('ok', encoding='utf-8'), (host, port))
```

```python
# client
import socket

ip_port = ('127.0.0.1', 9999)
sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
while True:
    inp = input('data: ').strip()
    if inp == 'exit':
        break
    sk.sendto(bytes(inp, encoding='utf-8'), ip_port)
    data = sk.recvfrom(1024)
    print(data)
sk.close()
```
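The same request/reply exchange can be run in a single process as a quick test. This Python 3 sketch binds the server to an OS-chosen loopback port instead of 9999 and sets timeouts so that nothing can hang. No connect() or accept() is involved: recvfrom() tells the server who sent the datagram, and sendto() replies to that address.

```python
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(('127.0.0.1', 0))          # OS-chosen free port
server.settimeout(2)
server_addr = server.getsockname()

client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client.settimeout(2)

# No connect()/accept(): every datagram carries its destination address.
client.sendto('hello'.encode('utf-8'), server_addr)
data, client_addr = server.recvfrom(1024)   # who sent it?
server.sendto(b'ok', client_addr)           # reply to that address
reply, from_addr = client.recvfrom(1024)

from_server = from_addr == server_addr
print(data, reply, from_server)             # b'hello' b'ok' True

client.close()
server.close()
```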
Example: a "smart" robot (an automated phone menu)
```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# server
import socket

ip_port = ('127.0.0.1', 8888)
sk = socket.socket()
sk.bind(ip_port)
sk.listen(5)

while True:
    conn, address = sk.accept()
    conn.sendall('Welcome to 10086. Enter 1xxx, or 0 for a human operator.'.encode('utf-8'))
    flag = True
    while flag:
        data = conn.recv(1024).decode('utf-8')
        if data == 'exit' or not data:   # 'exit' or the client disconnected
            flag = False
        elif data == '0':
            conn.sendall('Your call may be recorded. Blah blah blah...'.encode('utf-8'))
        else:
            conn.sendall('Please try again.'.encode('utf-8'))
    conn.close()
```
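```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# client
import socket

ip_port = ('127.0.0.1', 8888)   # must match the server's port
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)
while True:
    data = sk.recv(1024)
    print('receive:', data.decode('utf-8'))
    inp = ''
    while not inp:              # an empty message would never get a reply
        inp = input('please input:').strip()
    sk.sendall(inp.encode('utf-8'))
    if inp == 'exit':
        break
sk.close()
```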
accept() and recv() are blocking calls (assuming the connection is healthy).
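listen(n): n is the number of connections that may be kept waiting. With n=1, one connection is served and one is kept waiting, and a third is refused. The exact handling of the backlog is OS-dependent.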
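I/O multiplexing is a mechanism that lets a program monitor many descriptors at once. As soon as one of them becomes ready (usually ready for reading or ready for writing), the program is notified and can perform the corresponding read or write.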
Linux
On Linux, select, poll and epoll are all I/O multiplexing mechanisms.
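select
select first appeared in 4.2BSD in 1983. Through the select() system call it monitors arrays of file descriptors. When select() returns, the kernel has marked the ready descriptors in those arrays, so the process can find them and perform the subsequent reads and writes.
select is supported on almost every platform. Good cross-platform support is one of its advantages, and today it is one of the few it has left.
One drawback of select is that a single process can monitor only a limited number of file descriptors, typically 1024 on Linux. The limit can be raised by changing the macro definition or even by recompiling the kernel.
In addition, the data structure that select() uses to hold the file descriptors is copied on every call, so the copying cost grows linearly with the number of descriptors. Network latency also leaves many TCP connections idle, yet every select() call scans all of the sockets linearly, which wastes further work.

poll
poll was born in 1986 in System V Release 3. It is not fundamentally different from select, but it has no maximum number of file descriptors.
poll shares a drawback with select: the whole array of descriptors is copied between user space and the kernel address space on every call, whether or not the descriptors are ready. Its cost therefore grows linearly with the number of descriptors.
Furthermore, after select() or poll() reports ready descriptors, a process that performs no I/O on them is told about them again on the next select() or poll() call. Ready notifications are therefore generally not lost. This is called level-triggered notification.

epoll
Only in Linux 2.6 did a method directly supported by the kernel appear: epoll. It has nearly all of the advantages mentioned above and is widely regarded as the best-performing I/O readiness notification mechanism on Linux 2.6.
epoll supports both level-triggered and edge-triggered notification. Edge-triggered notification only reports descriptors that have just become ready, and it reports each one only once; if the program takes no action, it is not told again. In theory edge triggering performs better, but the code is considerably more complex.
epoll also reports only the descriptors that are ready. epoll_wait() returns the number of ready descriptors and fills an array supplied by the caller with that many events, so you only need to read that many entries. Descriptors are registered once with epoll_ctl() instead of being passed in on every call, so the full descriptor set is not copied during each system call.
Another essential improvement is that epoll uses event-based readiness notification. With select and poll, the kernel scans all monitored descriptors only when the process calls the function. epoll instead registers each descriptor in advance through epoll_ctl(). As soon as a descriptor becomes ready, the kernel uses a callback-like mechanism to activate it quickly, and the process is notified when it calls epoll_wait().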
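Level-triggered behaviour is easy to observe from Python. In this small sketch, which uses socket.socketpair() for convenience, select() keeps reporting a socket as readable for as long as unread data remains, and stops once the data has been consumed.

```python
import select
import socket

a, b = socket.socketpair()
a.sendall(b'data')

# b is readable. We deliberately do NOT read the data.
r1, _, _ = select.select([b], [], [], 1)
# Level-triggered: the unread data makes select report b again.
r2, _, _ = select.select([b], [], [], 1)
first_ready, second_ready = b in r1, b in r2

b.recv(1024)                              # consume the pending bytes
r3, _, _ = select.select([b], [], [], 0.1)
after_read = b in r3
print(first_ready, second_ready, after_read)  # True True False

a.close()
b.close()
```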
Python
Python has a select module that provides three methods, select, poll and epoll, which call the system's select, poll and epoll respectively to implement I/O multiplexing.
Windows Python: select
Mac Python: select (BSD/macOS builds also provide kqueue)
Linux Python: select, poll, epoll
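To check what the current interpreter actually offers, you can test the select module's attributes. The standard-library selectors module also exposes DefaultSelector, which is the most efficient implementation available on the platform. The exact results depend on the OS.

```python
import select
import selectors

available = {name: hasattr(select, name)
             for name in ('select', 'poll', 'epoll', 'kqueue')}
print(available)

# DefaultSelector is an alias for the most efficient selector class
# on this platform (e.g. EpollSelector on Linux).
print(selectors.DefaultSelector.__name__)
```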
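Note: network operations, file operations and terminal operations are all I/O. On Windows only sockets are supported. Other systems support other kinds of I/O as well, but they cannot detect whether an ordinary file has changed since it was last read.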
The select method:
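list1, list2, list3 = select.select(seq1, seq2, seq3, timeout)
Parameters: up to four (the first three are required).
Return value: three lists.
select monitors file handles and returns the ones whose state has changed:
1. When a handle in seq1 becomes readable (a pending accept or data to read), it is added to list1.
2. Handles in seq2 that are ready for writing are added to list2. A socket is normally writable whenever its send buffer has room, so this is usually every handle in seq2.
3. When a handle in seq3 has an exceptional condition, it is added to list3.
4. If no timeout is given, select blocks until one of the monitored handles changes. With timeout = 1, if none of the handles changes, select blocks for 1 second and then returns three empty lists; if a handle changes, it returns immediately.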
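The rules above can be checked in Python 3 with a connected socket.socketpair(), chosen here only for convenience. The sketch shows the timeout case returning three empty lists, a socket with free buffer space reported as writable, and a socket reported as readable once the peer has sent data.

```python
import select
import socket

a, b = socket.socketpair()

# Nothing to read: select waits about 0.2 s, then returns three empty lists.
empty = select.select([b], [], [], 0.2)
print(empty)                     # ([], [], [])

# A socket with free send-buffer space is reported writable at once.
_, w, _ = select.select([], [a], [], 0.2)
writable_ok = a in w
print(writable_ok)               # True

# Once the peer sends data, b appears in the readable list.
a.sendall(b'hi')
r, _, _ = select.select([b], [], [], 0.2)
readable_ok = b in r
print(readable_ok)               # True

a.close()
b.close()
```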
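Example: using select to monitor terminal input

```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# select() on sys.stdin works on Unix-like systems only; on Windows,
# select() accepts sockets only.
import select
import sys

while True:
    readable, writeable, error = select.select([sys.stdin, ], [], [], 1)
    if sys.stdin in readable:
        print('select get stdin', sys.stdin.readline())
```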
Using select to handle several socket clients "pseudo-simultaneously": server

```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
import select

sk1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sk1.bind(('127.0.0.1', 8002))
sk1.listen(5)
sk1.setblocking(0)

inputs = [sk1, ]

while True:
    readable_list, writeable_list, error_list = select.select(inputs, [], inputs, 1)
    for r in readable_list:
        # a client is connecting to the server for the first time
        if sk1 == r:
            print('accept')
            request, address = r.accept()
            request.setblocking(0)
            inputs.append(request)
        # an already-connected client sent more data
        else:
            received = r.recv(1024)
            # data received normally from the client
            if received:
                print('received data:', received)
            # the client closed its end of the connection
            else:
                inputs.remove(r)
                r.close()

sk1.close()
```
Using select to handle several socket clients "pseudo-simultaneously": client

```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket

ip_port = ('127.0.0.1', 8002)
sk = socket.socket()
sk.connect(ip_port)

while True:
    inp = input('please input:')
    sk.sendall(inp.encode('utf-8'))

sk.close()
```
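Compared with the plain socket server, this server does not wait on a client that has stopped sending data; it moves on to handle data from other clients. However, if each request takes a long time to process, the select-based server still cannot handle them truly at the same time.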
Socket server based on select

```python
#!/usr/bin/env python
# coding:utf8
# Server implemented with select
import select
import socket
import sys
import queue   # "Queue" in Python 2

# Create a TCP socket and put it in non-blocking mode
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)

# Bind the socket
server_address = ('localhost', 10000)
print('starting up on %s port %s' % server_address, file=sys.stderr)
server.bind(server_address)

# Put the socket into listening (server) mode.
# backlog=5: at most 5 connections that the kernel has already received but the
# server has not yet accept()ed may wait. The value cannot be arbitrarily large,
# because the kernel has to maintain the connection queue.
server.listen(5)

# Sockets to watch for reading; initially only the listening socket
inputs = [server]
# Sockets to watch for writing; empty, since no client has connected yet
outputs = []
# Data waiting to be sent to each client: {client socket: queue.Queue()}
message_queues = {}

while inputs:
    print('waiting for the next event', file=sys.stderr)
    # select() blocks until at least one watched socket changes state and returns
    # three lists: rlist (ready for reading), wlist (ready for writing) and
    # xlist (exceptional condition). This is synchronous I/O, one form of the
    # I/O multiplexing model.
    readable, writable, exceptional = select.select(inputs, outputs, inputs)

    # Readable sockets. If the listening socket is readable, a new connection has
    # arrived: accept() it, add the client socket to inputs so that select() checks
    # it next time, and give it an empty outgoing queue. Otherwise it is a client
    # socket: either data has arrived (queue it and watch the socket for
    # writability), or recv() returned nothing, which means the client
    # disconnected and the socket must be cleaned up.
    for s in readable:
        if s is server:
            connection, client_address = s.accept()
            print('connection from', client_address, file=sys.stderr)
            connection.setblocking(0)  # non-blocking
            inputs.append(connection)
            message_queues[connection] = queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                print('received "%s" from %s' % (data, s.getpeername()), file=sys.stderr)
                message_queues[s].put(data)
                if s not in outputs:
                    outputs.append(s)
            else:
                print('closing', s, file=sys.stderr)
                if s in outputs:
                    outputs.remove(s)
                inputs.remove(s)
                s.close()
                del message_queues[s]

    # Writable sockets: take the next queued message and send it to the client.
    # If there is nothing to send, stop watching the socket for writing.
    for s in writable:
        if s not in message_queues:  # already closed in the readable loop above
            continue
        try:
            next_msg = message_queues[s].get_nowait()
        except queue.Empty:
            print(' ', s.getpeername(), 'queue empty', file=sys.stderr)
            outputs.remove(s)
        else:
            print('sending "%s" to %s' % (next_msg, s.getpeername()), file=sys.stderr)
            s.send(next_msg)

    # Exceptional conditions
    for s in exceptional:
        if s not in inputs:  # already cleaned up
            continue
        print('exception condition on', s, file=sys.stderr)
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
        message_queues.pop(s, None)
```
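To watch the accept/read/write cycle of the server above without running separate processes, here is a condensed Python 3 sketch that drives it in-process. It makes several assumptions that are not in the original. It binds to an OS-chosen port instead of 10000 and uppercases each message so the processing is visible. Two clients connect and send one message each before the loop runs, and the loop is bounded so that the demo always terminates.

```python
import queue
import select
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 0))      # OS-chosen port instead of 10000
server.listen(5)
server.setblocking(False)

inputs, outputs, message_queues = [server], [], {}

# Two clients connect and each send one message up front; the kernel
# queues the connections (backlog) and the data until select() sees them.
clients = []
for i in range(2):
    c = socket.create_connection(server.getsockname())
    c.sendall(f'msg{i}'.encode())
    clients.append(c)

replies_sent = 0
for _round in range(100):          # bounded loop for this demo
    if replies_sent == 2:
        break
    readable, writable, errored = select.select(inputs, outputs, inputs, 1)
    for s in readable:
        if s is server:            # new connection
            conn, addr = s.accept()
            conn.setblocking(False)
            inputs.append(conn)
            message_queues[conn] = queue.Queue()
        else:
            data = s.recv(1024)
            if data:               # queue a reply, watch for writability
                message_queues[s].put(data.upper())
                if s not in outputs:
                    outputs.append(s)
            else:                  # peer closed: clean up
                inputs.remove(s)
                if s in outputs:
                    outputs.remove(s)
                del message_queues[s]
                s.close()
    for s in writable:
        if s not in message_queues:
            continue
        try:
            msg = message_queues[s].get_nowait()
        except queue.Empty:
            outputs.remove(s)      # nothing left to send
        else:
            s.sendall(msg)
            replies_sent += 1

answers = [c.recv(1024) for c in clients]
print(answers)                     # [b'MSG0', b'MSG1']
for s in clients + inputs:
    s.close()
```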
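SocketServer (named socketserver in Python 3) internally uses I/O multiplexing together with "multithreading" and "multiprocessing" to build a socket server that handles many client requests concurrently. Whenever a client connects, the server creates a "thread" or "process" dedicated to handling all of that client's requests.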
The socketserver module simplifies the task of writing network servers.
There are four basic concrete server classes:
socketserver.TCPServer(server_address, RequestHandlerClass, bind_and_activate=True)
This uses the Internet TCP protocol, which provides for continuous streams of data between the client and server. If bind_and_activate is true, the constructor automatically attempts to invoke server_bind() and server_activate(). The other parameters are passed to the BaseServer base class.
socketserver.UDPServer(server_address, RequestHandlerClass, bind_and_activate=True)
This uses datagrams, which are discrete packets of information that may arrive out of order or be lost while in transit. The parameters are the same as for TCPServer.
socketserver.UnixStreamServer(server_address, RequestHandlerClass, bind_and_activate=True)
socketserver.UnixDatagramServer(server_address, RequestHandlerClass, bind_and_activate=True)
These more infrequently used classes are similar to the TCP and UDP classes, but use Unix domain sockets; they’re not available on non-Unix platforms. The parameters are the same as for TCPServer.
These four classes process requests synchronously; each request must be completed before the next request can be started. This isn’t suitable if each request takes a long time to complete, because it requires a lot of computation, or because it returns a lot of data which the client is slow to process. The solution is to create a separate process or thread to handle each request; the ForkingMixIn and ThreadingMixIn mix-in classes can be used to support asynchronous behaviour.
There are five classes in an inheritance diagram, four of which represent synchronous servers of four types:
```
+------------+
| BaseServer |
+------------+
      |
      v
+-----------+        +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+        +------------------+
      |
      v
+-----------+        +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+        +--------------------+
```
Note that UnixDatagramServer derives from UDPServer, not from UnixStreamServer: the only difference between an IP and a Unix stream server is the address family, which is simply repeated in both Unix server classes.
socketserver.ForkingMixIn
socketserver.ThreadingMixIn
Forking and threading versions of each type of server can be created using these mix-in classes. For instance, ThreadingUDPServer is created as follows:
```python
class ThreadingUDPServer(ThreadingMixIn, UDPServer):
    pass
```
The mix-in class comes first, since it overrides a method defined in UDPServer. Setting the various attributes also changes the behavior of the underlying server mechanism.
socketserver.ForkingTCPServer
socketserver.ForkingUDPServer
socketserver.ThreadingTCPServer
socketserver.ThreadingUDPServer
These classes are pre-defined using the mix-in classes.
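A minimal Python 3 sketch of a pre-defined threading server in action. It assumes the following choices, none of which come from the source: the hypothetical UpperHandler class, an OS-chosen port, daemon handler threads, and running serve_forever() in a background thread so that the demo can call shutdown(). Two clients are connected at the same time, and each is served by its own thread.

```python
import socket
import socketserver
import threading


class UpperHandler(socketserver.BaseRequestHandler):
    # Hypothetical handler: echoes each chunk back in upper case.
    def handle(self):
        while True:
            data = self.request.recv(1024)
            if not data:        # client closed the connection
                break
            self.request.sendall(data.upper())


class Server(socketserver.ThreadingTCPServer):
    allow_reuse_address = True
    daemon_threads = True       # handler threads don't keep the process alive


server = Server(('127.0.0.1', 0), UpperHandler)   # port 0: OS chooses
threading.Thread(target=server.serve_forever, daemon=True).start()

# Two clients connected at the same time; each gets its own thread.
c1 = socket.create_connection(server.server_address)
c2 = socket.create_connection(server.server_address)
c1.sendall(b'one')
c2.sendall(b'two')
replies = [c1.recv(1024), c2.recv(1024)]
print(replies)                  # [b'ONE', b'TWO']

c1.close()
c2.close()
server.shutdown()               # stops serve_forever() in the other thread
server.server_close()
```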
socketserver.BaseRequestHandler
This is the superclass of all request handler objects. It defines the interface, given below. A concrete request handler subclass must define a new handle() method, and can override any of the other methods. A new instance of the subclass is created for each request.
setup()
Called before the handle() method to perform any initialization actions required. The default implementation does nothing.
handle()
This function must do all the work required to service a request. The default implementation does nothing. Several instance attributes are available to it; the request is available as self.request; the client address as self.client_address; and the server instance as self.server, in case it needs access to per-server information.
The type of self.request is different for datagram or stream services. For stream services, self.request is a socket object; for datagram services, self.request is a pair of string and socket.
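For the datagram case, here is a Python 3 sketch with a hypothetical handler, EchoUDPHandler, and an OS-chosen port. It unpacks self.request as a (data, socket) pair and replies through that socket to self.client_address.

```python
import socket
import socketserver
import threading


class EchoUDPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # For datagram servers, self.request is a (data, socket) pair.
        data, sock = self.request
        sock.sendto(data.upper(), self.client_address)


server = socketserver.UDPServer(('127.0.0.1', 0), EchoUDPHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client.settimeout(2)
client.sendto(b'ping', server.server_address)
reply, _ = client.recvfrom(1024)
print(reply)  # b'PING'

client.close()
server.shutdown()
server.server_close()
```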
ThreadingTCPServer
A socket server built with ThreadingTCPServer internally creates a "thread" for each client, and that thread interacts with the client.
1. ThreadingTCPServer basics
Using ThreadingTCPServer:
```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socketserver   # named SocketServer in Python 2


class MyServer(socketserver.BaseRequestHandler):
    def handle(self):
        # print(self.request, self.client_address, self.server)
        conn = self.request
        conn.sendall('Welcome to 10086. Enter 1xxx, or 0 for a human operator.'.encode('utf-8'))
        flag = True
        while flag:
            data = conn.recv(1024).decode('utf-8')
            if data == 'exit' or not data:   # 'exit' or the client disconnected
                flag = False
            elif data == '0':
                conn.sendall('Your call may be recorded. Blah blah blah...'.encode('utf-8'))
            else:
                conn.sendall('Please try again.'.encode('utf-8'))


if __name__ == '__main__':
    server = socketserver.ThreadingTCPServer(('127.0.0.1', 8009), MyServer)
    server.serve_forever()
```
```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket

ip_port = ('127.0.0.1', 8009)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)
while True:
    data = sk.recv(1024)
    print('receive:', data.decode('utf-8'))
    inp = ''
    while not inp:              # an empty message would never get a reply
        inp = input('please input:').strip()
    sk.sendall(inp.encode('utf-8'))
    if inp == 'exit':
        break
sk.close()
```
2. ThreadingTCPServer source code analysis
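Class relationships of ThreadingTCPServer: ThreadingTCPServer inherits from ThreadingMixIn and TCPServer, and TCPServer inherits from BaseServer.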
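Internal call flow, as it can be read from the source below:
1. ThreadingTCPServer(address, MyServer): TCPServer.__init__ creates the socket, then calls server_bind() (bind) and server_activate() (listen).
2. serve_forever() loops on select.select([self], [], [], poll_interval). When the listening socket is readable, it calls _handle_request_noblock().
3. _handle_request_noblock() calls get_request() (socket.accept()), then verify_request(), then process_request().
4. ThreadingMixIn.process_request() starts a new thread that runs process_request_thread().
5. process_request_thread() calls finish_request(), which instantiates MyServer(request, client_address, server). BaseRequestHandler.__init__ then calls setup(), handle() and finish(). Finally shutdown_request() closes the connection.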
Relevant ThreadingTCPServer source code (from the Python 2 SocketServer module):
```python
class BaseServer:

    """Base class for server classes.

    Methods for the caller:

    - __init__(server_address, RequestHandlerClass)
    - serve_forever(poll_interval=0.5)
    - shutdown()
    - handle_request()  # if you do not use serve_forever()
    - fileno() -> int   # for select()

    Methods that may be overridden:

    - server_bind()
    - server_activate()
    - get_request() -> request, client_address
    - handle_timeout()
    - verify_request(request, client_address)
    - server_close()
    - process_request(request, client_address)
    - shutdown_request(request)
    - close_request(request)
    - handle_error()

    Methods for derived classes:

    - finish_request(request, client_address)

    Class variables that may be overridden by derived classes or
    instances:

    - timeout
    - address_family
    - socket_type
    - allow_reuse_address

    Instance variables:

    - RequestHandlerClass
    - socket

    """

    timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        pass

    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            while not self.__shutdown_request:
                # XXX: Consider using another file descriptor or
                # connecting to the socket to wake this up instead of
                # polling. Polling reduces our responsiveness to a
                # shutdown request and wastes cpu at all other times.
                r, w, e = _eintr_retry(select.select, [self], [], [],
                                       poll_interval)
                if self in r:
                    self._handle_request_noblock()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

    def shutdown(self):
        """Stops the serve_forever loop.

        Blocks until the loop has finished. This must be called while
        serve_forever() is running in another thread, or it will
        deadlock.
        """
        self.__shutdown_request = True
        self.__is_shut_down.wait()

    # The distinction between handling, getting, processing and
    # finishing a request is fairly arbitrary.  Remember:
    #
    # - handle_request() is the top-level call.  It calls
    #   select, get_request(), verify_request() and process_request()
    # - get_request() is different for stream or datagram sockets
    # - process_request() is the place that may fork a new process
    #   or create a new thread to finish the request
    # - finish_request() instantiates the request handler class;
    #   this constructor will handle the request all by itself

    def handle_request(self):
        """Handle one request, possibly blocking.

        Respects self.timeout.
        """
        # Support people who used socket.settimeout() to escape
        # handle_request before self.timeout was available.
        timeout = self.socket.gettimeout()
        if timeout is None:
            timeout = self.timeout
        elif self.timeout is not None:
            timeout = min(timeout, self.timeout)
        fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
        if not fd_sets[0]:
            self.handle_timeout()
            return
        self._handle_request_noblock()

    def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that select.select has returned that the socket is
        readable before this function was called, so there should be
        no risk of blocking in get_request().
        """
        try:
            request, client_address = self.get_request()
        except socket.error:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except:
                self.handle_error(request, client_address)
                self.shutdown_request(request)

    def handle_timeout(self):
        """Called if no new request arrives within self.timeout.

        Overridden by ForkingMixIn.
        """
        pass

    def verify_request(self, request, client_address):
        """Verify the request.  May be overridden.

        Return True if we should proceed with this request.

        """
        return True

    def process_request(self, request, client_address):
        """Call finish_request.

        Overridden by ForkingMixIn and ThreadingMixIn.

        """
        self.finish_request(request, client_address)
        self.shutdown_request(request)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        pass

    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self)

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        pass

    def handle_error(self, request, client_address):
        """Handle an error gracefully.  May be overridden.

        The default is to print a traceback and continue.

        """
        print '-'*40
        print 'Exception happened during processing of request from',
        print client_address
        import traceback
        traceback.print_exc() # XXX But this goes to stderr!
        print '-'*40
```
```python
class TCPServer(BaseServer):

    """Base class for various socket-based server classes.

    Defaults to synchronous IP stream (i.e., TCP).

    Methods for the caller:

    - __init__(server_address, RequestHandlerClass, bind_and_activate=True)
    - serve_forever(poll_interval=0.5)
    - shutdown()
    - handle_request()  # if you don't use serve_forever()
    - fileno() -> int   # for select()

    Methods that may be overridden:

    - server_bind()
    - server_activate()
    - get_request() -> request, client_address
    - handle_timeout()
    - verify_request(request, client_address)
    - process_request(request, client_address)
    - shutdown_request(request)
    - close_request(request)
    - handle_error()

    Methods for derived classes:

    - finish_request(request, client_address)

    Class variables that may be overridden by derived classes or
    instances:

    - timeout
    - address_family
    - socket_type
    - request_queue_size (only for stream sockets)
    - allow_reuse_address

    Instance variables:

    - server_address
    - RequestHandlerClass
    - socket

    """

    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM

    request_queue_size = 5

    allow_reuse_address = False

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                self.server_close()
                raise

    def server_bind(self):
        """Called by constructor to bind the socket.

        May be overridden.

        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        self.socket.close()

    def fileno(self):
        """Return socket file number.

        Interface required by select().

        """
        return self.socket.fileno()

    def get_request(self):
        """Get the request and client address from the socket.

        May be overridden.

        """
        return self.socket.accept()

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        try:
            #explicitly shutdown.  socket.close() merely releases
            #the socket and waits for GC to perform the actual close.
            request.shutdown(socket.SHUT_WR)
        except socket.error:
            pass #some platforms may raise ENOTCONN here
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        request.close()
```
```python
class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            self.handle_error(request, client_address)
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()
```
```python
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
```
Relevant RequestHandler source code:
```python
class BaseRequestHandler:

    """Base class for request handler classes.

    This class is instantiated for each request to be handled.  The
    constructor sets the instance variables request, client_address
    and server, and then calls the handle() method.  To implement a
    specific service, all you need to do is to derive a class which
    defines a handle() method.

    The handle() method can find the request as self.request, the
    client address as self.client_address, and the server (in case it
    needs access to per-server information) as self.server.  Since a
    separate instance is created for each request, the handle() method
    can define arbitrary other instance variariables.

    """

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass
```
Simplified version of the source:
```python
import socket
import threading
import select


def process(request, client_address):
    print(request, client_address)
    conn = request
    conn.sendall('Welcome to 10086. Enter 1xxx, or 0 for a human operator.'.encode('utf-8'))
    flag = True
    while flag:
        data = conn.recv(1024).decode('utf-8')
        if data == 'exit' or not data:   # 'exit' or the client disconnected
            flag = False
        elif data == '0':
            conn.sendall('Your call may be recorded. Blah blah blah...'.encode('utf-8'))
        else:
            conn.sendall('Please try again.'.encode('utf-8'))
    conn.close()


sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk.bind(('127.0.0.1', 8002))
sk.listen(5)

while True:
    r, w, e = select.select([sk, ], [], [], 1)
    print('looping')
    if sk in r:
        print('get request')
        request, client_address = sk.accept()
        t = threading.Thread(target=process, args=(request, client_address))
        t.daemon = False
        t.start()

sk.close()
```
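As the simplified code shows, SocketServer's ThreadingTCPServer can handle requests concurrently thanks to two things, select and threading. In essence, the server creates one thread per client, and that thread handles the client's requests, so n clients can be connected at the same time (long-lived connections).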
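ForkingTCPServer is used, and executes, in essentially the same way as ThreadingTCPServer. The only difference is that it creates a "process" for each client instead of a "thread". Because it relies on os.fork(), it is available only on POSIX platforms.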
Basic usage:
```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socketserver   # named SocketServer in Python 2


class MyServer(socketserver.BaseRequestHandler):
    def handle(self):
        # print(self.request, self.client_address, self.server)
        conn = self.request
        conn.sendall('Welcome to 10086. Enter 1xxx, or 0 for a human operator.'.encode('utf-8'))
        flag = True
        while flag:
            data = conn.recv(1024).decode('utf-8')
            if data == 'exit' or not data:   # 'exit' or the client disconnected
                flag = False
            elif data == '0':
                conn.sendall('Your call may be recorded. Blah blah blah...'.encode('utf-8'))
            else:
                conn.sendall('Please try again.'.encode('utf-8'))


if __name__ == '__main__':
    server = socketserver.ForkingTCPServer(('127.0.0.1', 8009), MyServer)
    server.serve_forever()
```
```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket

ip_port = ('127.0.0.1', 8009)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)
while True:
    data = sk.recv(1024)
    print('receive:', data.decode('utf-8'))
    inp = ''
    while not inp:              # an empty message would never get a reply
        inp = input('please input:').strip()
    sk.sendall(inp.encode('utf-8'))
    if inp == 'exit':
        break
sk.close()
```
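The ForkingTCPServer example above differs from the ThreadingTCPServer example only in the line that creates the server:

```python
server = socketserver.ThreadingTCPServer(('127.0.0.1', 8009), MyServer)
```

which is changed to:

```python
server = socketserver.ForkingTCPServer(('127.0.0.1', 8009), MyServer)
```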
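SocketServer's ForkingTCPServer can handle requests concurrently thanks to two things, select and os.fork. In essence, the server creates one process per client, and the newly created process handles that client's requests, so n clients can be connected at the same time (long-lived connections).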
For the source code analysis, see ThreadingTCPServer above.
Twisted is an event-driven networking framework with many features, such as network protocols, threading, database management, network operations and e-mail.
Event-driven programming
In short, event-driven programming has two parts: first, registering events; second, triggering them.
A home-made event-driven framework, named "Kingslayer":
The greatest event-driven framework ever:

```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# event_drive.py

event_list = []


def run():
    for event in event_list:
        obj = event()
        obj.execute()


class BaseHandler(object):
    """
    Users must inherit from this class; it standardizes the methods that
    every handler class provides (similar to an interface).
    """

    def execute(self):
        raise Exception('you must overwrite execute')
```
A programmer using the "Kingslayer" framework:
```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from source import event_drive


class MyHandler(event_drive.BaseHandler):

    def execute(self):
        print('event-drive execute MyHandler')


event_drive.event_list.append(MyHandler)
event_drive.run()
```
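As the code above shows, "event-driven" simply means that the framework fixes the order of execution. Programmers using the framework register "events" into that order, and the registered events are triggered when the framework runs.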
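The same register-then-trigger idea can also be keyed by event name, which is closer to how callbacks are usually wired up. EventBus, register and trigger below are hypothetical names for this Python 3 sketch; they are not part of the "Kingslayer" framework or of any library.

```python
from collections import defaultdict


class EventBus:
    """Hypothetical minimal event registry: register callbacks under an
    event name, then trigger every callback registered for that name."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def register(self, event, handler):
        # Step 1: register the event.
        self._handlers[event].append(handler)

    def trigger(self, event, *args):
        # Step 2: trigger it; handlers run in registration order.
        return [handler(*args) for handler in self._handlers[event]]


bus = EventBus()
bus.register('greet', lambda name: f'hello {name}')
bus.register('greet', lambda name: f'hi {name}')
results = bus.trigger('greet', 'world')
print(results)                 # ['hello world', 'hi world']
print(bus.trigger('unknown'))  # [] because nothing is registered
```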
Event-driven sockets
```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from twisted.internet import protocol
from twisted.internet import reactor


class Echo(protocol.Protocol):
    def dataReceived(self, data):
        self.transport.write(data)


def main():
    factory = protocol.ServerFactory()
    factory.protocol = Echo

    reactor.listenTCP(8000, factory)
    reactor.run()


if __name__ == '__main__':
    main()
```
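Program execution flow: reactor.listenTCP(8000, factory) registers a listening socket with the reactor, and reactor.run() starts the event loop, which waits on the sockets through I/O multiplexing. When a client connects, the factory creates an Echo protocol instance for that connection. Whenever data arrives, the reactor calls that instance's dataReceived(), which writes the data back through self.transport.write().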
The source shows that the example above relies on event-driven programming combined with the I/O multiplexing mechanism to handle sockets.
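To show how "event-driven + I/O multiplexing" fit together without depending on Twisted, here is a heavily simplified Python 3 sketch built on the standard-library selectors module. MiniLoop, add_reader, run_once and on_readable are hypothetical names, and this is not Twisted's API or implementation. Callbacks are registered per socket, which is the registering step. The loop waits on the OS multiplexer and dispatches them, which is the triggering step. on_readable plays the role of Echo.dataReceived().

```python
import selectors
import socket


class MiniLoop:
    """Hypothetical, heavily simplified reactor-style loop (not Twisted's API)."""

    def __init__(self):
        # DefaultSelector picks epoll/kqueue/select depending on the OS.
        self.sel = selectors.DefaultSelector()

    def add_reader(self, sock, callback):
        # Register the event: "call callback(sock) when sock is readable".
        self.sel.register(sock, selectors.EVENT_READ, callback)

    def remove_reader(self, sock):
        self.sel.unregister(sock)

    def run_once(self, timeout=1.0):
        # Trigger: wait for readiness, then dispatch each registered callback.
        for key, _events in self.sel.select(timeout):
            key.data(key.fileobj)

    def close(self):
        self.sel.close()


loop = MiniLoop()
a, b = socket.socketpair()
received = []


def on_readable(sock):
    # Plays the role of Echo.dataReceived(): echo whatever arrives.
    data = sock.recv(1024)
    if data:
        received.append(data)
        sock.sendall(data)
    else:
        loop.remove_reader(sock)


loop.add_reader(b, on_readable)
a.sendall(b'hello')
loop.run_once()
echoed = a.recv(1024)
print(received, echoed)   # [b'hello'] b'hello'

loop.close()
a.close()
b.close()
```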
Asynchronous I/O operations:

```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Python 2 code. twisted.web.client.getPage() is deprecated in newer Twisted releases.
from twisted.internet import reactor, protocol
from twisted.web.client import getPage
import time


class Echo(protocol.Protocol):

    def dataReceived(self, data):
        deferred1 = getPage('http://cnblogs.com')
        deferred1.addCallback(self.printContents)

        deferred2 = getPage('http://baidu.com')
        deferred2.addCallback(self.printContents)

        for i in range(2):
            time.sleep(1)
            print 'execute ', i

    def execute(self, data):
        self.transport.write(data)

    def printContents(self, content):
        print len(content), content[0:100], time.time()


def main():
    factory = protocol.ServerFactory()
    factory.protocol = Echo

    reactor.listenTCP(8000, factory)
    reactor.run()


if __name__ == '__main__':
    main()
```
For more, see:
https://twistedmatrix.com/trac
http://twistedmatrix.com/documents/current/api/