官方提供了socketserver包去方便咱們快速的搭建一個服務器框架。算法
server類
緩存
socketserver包提供5個Server類,這些單獨使用這些Server類都只能完成同步的操做,他是一個單線程的,不能同時處理各個客戶端的請求,只能按照順序依次處理。安全
1
2
3
4
5
6
7
8
9
10
11
12
13
|
+------------+
| BaseServer |
+------------+
|
v
+-----------+ +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+ +------------------+
|
v
+-----------+ +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+ +--------------------+
|
兩個Mixin類
bash
1
2
3
|
+--------------+ +----------------+
| ForkingMixIn | | ThreadingMixIn |
+--------------+ +----------------+
|
各自實現了多進程和多線程的功能(ForkingMixIn在Windows不支持)服務器
因而將這些同步類和Mixin類組合就實現了異步服務類的效果。多線程
class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass框架class ForkingUDPServer(ForkingMixIn, UDPServer): pass
class ForkingTCPServer(ForkingMixIn, TCPServer): pass
異步
基本使用
socket
因爲server須要同時處理來自多個客戶端的請求,須要提供異步的支持,因此一般使用上面的異步類建立服務器。在Windows系統中沒有提供os.fork()接口,Windows沒法使用多進程的ForkingUDPServer和ForkingTCPServer,只能使用ThreadingTCPServer或者ThreadingUDPServer;而Linux和Unix多線程和多進程版本均可以使用。tcp
服務器主要負責接受客戶端的鏈接請求,當一個新的客戶端請求到來後,將分配一個新的線程去處理這個請求(異步服務器ThreadingTCPServer),而與客戶端信息的交互則交給了專門的請求處理類(RequestHandlerClass)處理。
1
2
3
4
|
import
socketserver
# 建立一個基於TCP的server對象,並使用BaseRequestHandler處理客戶端發送的消息
server
=
socketserver.ThreadingTCPServer((
"127.0.0.1"
,
8000
), BaseRequestHandler)
server.serve_forever()
# 啓動服務器,
|
只須要上面兩行代碼就能夠建立開啓一個服務,運行上面代碼後常看本機8000端口,發現有程序正在監聽。
C:\Users\user>netstat -anp tcp | findstr 8000
TCP 127.0.0.1:8000 0.0.0.0:0 LISTENING
ThreadingTCPServer能夠對咱們的請求進行接受,可是並不會進行處理請求,處理請求的類是上面指定BaseRequestHandler類,該類能夠定義handle方法來處理接受的請求。
BaseRequestHandler的源碼
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
class
BaseRequestHandler:
def
__init__(
self
, request, client_address, server):
self
.request
=
request
self
.client_address
=
client_address
self
.server
=
server
self
.setup()
try
:
self
.handle()
finally
:
self
.finish()
def
setup(
self
):
pass
def
handle(
self
):
pass
def
finish(
self
):
pass
|
在server = socketserver.ThreadingTCPServer(("127.0.0.1", 8000), BaseRequestHandler)中,BaseRequestHandler將做爲參數綁定到服務器的實例上,服務器啓動後,每當有一個新的客戶端接接入服務器,將會實例化一個請求處理對象,並傳入三個參數,request(鏈接客戶端的socket)、client_address(遠程客戶端的地址)、server(服務器對象),執行init方法,將這三個參數保存到對應屬性上。這個請求處理對象即可以與客戶端交互了。
簡單示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
import
socketserver
import
threading
class
MyRequestHandler(socketserver.BaseRequestHandler):
""" BaseRequestHandler的實例化方法中,得到了三個屬性
self.request = request # 該線程中與客戶端交互的 socket 對象。
self.client_address # 該線程處理的客戶端地址
self.server = server # 服務器對象
"""
def
handle(
self
):
while
True
:
msg
=
self
.request.recv()
# 接受客戶端的數據
if
msg
=
=
b
"quit"
or
msg
=
=
"":
# 退出
break
print
(msg.decode())
self
.request.send(msg)
# 將消息發送回客戶端
def
finish(
self
):
self
.request.close()
# 關閉套接字
if
__name__
=
=
"__main__"
:
# 建立一個基於TCP的server對象,並使用BaseRequestHandler處理客戶端發送的消息
server
=
socketserver.ThreadingTCPServer((
"127.0.0.1"
,
8000
), MyRequestHandler)
server.serve_forever()
# 啓動服務器
|
咱們建立了一個ThreadingTCPServer服務器,而後在傳入的處理類MyRequestHandler,並在handle方法中提供與客戶端消息交互的業務邏輯,此處只是將客戶端的消息返回客戶端。最後咱們在finish方法中關閉資源,finish方法使用了finally機制,保證了這些代碼必定會執行。
上一篇使用socket實現了一個羣聊服務器,這個裏使用socketServer將更加方便的實現
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
class
MyRequestHandle(BaseRequestHandler):
clients
=
{}
# 在類屬性中記錄全部與客戶端鏈接socket。
lock
=
threading.Lock()
# 互斥鎖,各個線程共用
def
setup(
self
):
# 新的用戶鏈接時,預處理,將這個新的鏈接加入到clients中,考慮線程安全,須要加鎖
with
self
.lock:
self
.clients[
self
.client_address]
=
self
.request
def
handle(
self
):
# 處理客戶端的請求主邏輯
while
True
:
data
=
self
.request.recv(
1024
).strip()
# 接受數據
if
data
=
=
b
"quit"
or
data
=
=
b"":
# 客戶端退出
with
self
.lock:
self
.server.clients.pop(
self
.client_address)
self
.request.close()
break
print
(
"{}-{}: {}"
.
format
(
*
self
.client_address, data.decode()))
with
self
.lock:
for
_, c
in
self
.server.clients.items():
# 羣發
c.send(data)
def
finish(
self
):
with server.lock:
for
_, c
in
server.clients.items():
c.close()
server.server_close()
def
main():
server
=
ThreadingTCPServer((
"127.0.0.1"
,
8000
), MyRequestHandle)
# 將建立的全部線程設置爲daemon線程,這樣控臺主程序退出時,這個服務器的全部線程將會被結束
server.daemon_threads
=
True
if
__name__
=
=
"__main__"
:
main()
|
上面requestHandlerclass中的handle方法和finish方式對應了上一篇中TCP服務器的recv方法和stop方法,他們處理請求的邏輯是相同的。只是上面使用了socketserver的代碼變少了,處理的邏輯也變少了,TCPserver幫咱們完成了大量的工做,這利於軟件的快速開發。
內置的兩個RequestHandlerClass
StreamHandlerRequest
StreamHandlerRequest顧名思義是一種流式的求情處理類,對應TCP協議的面向字節流的傳輸形式。咱們從源代碼分析。(去除了一些次要代碼)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
class
StreamRequestHandler(BaseRequestHandler):
rbufsize
=
-
1
# 讀緩存
wbufsize
=
0
# 寫緩存
timeout
=
None
# 超時時間
# IP/TCP擁塞控制的Nagle算法算法。
disable_nagle_algorithm
=
False
def
setup(
self
):
# 實現了setup,
self
.connection
=
self
.request
if
self
.timeout
is
not
None
:
self
.connection.settimeout(
self
.timeout)
if
self
.disable_nagle_algorithm:
self
.connection.setsockopt(socket.IPPROTO_TCP,
socket.TCP_NODELAY,
True
)
# 使用 makefile方法得到了一個只讀文件對象 rfile
self
.rfile
=
self
.connection.makefile(
'rb'
,
self
.rbufsize)
# 得到一個只寫的文件對象 wfile
if
self
.wbufsize
=
=
0
:
self
.wfile
=
_SocketWriter(
self
.connection)
else
:
self
.wfile
=
self
.connection.makefile(
'wb'
,
self
.wbufsize)
def
finish(
self
):
# 負責將這個 wfile 和 rfile方法關閉。
if
not
self
.wfile.closed:
try
:
self
.wfile.flush()
except
socket.error:
pass
self
.wfile.close()
self
.rfile.close()
|
使用StreamRequestHandler方法能夠將這個socket包裝成一個類文件對象,方便咱們使用一套文件對象的方法處理這個socket,它沒有實現handle方法,我仍然須要咱們實現。咱們能夠這樣使用它
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
class
MyHandle(StreamRequestHandler):
# 若是須要使用setup和finish方法,須要調用父類方法,不然該方法將會被覆蓋。
def
setup(
self
):
super
().setup()
# 添加本身的需求
def
handle(
self
):
# 這裏咱們可使用wfile和rfile來處理socket消息了,例如以前使用self.request.recv()方法等同於self.rfile.read()
# 而 self.wfile.write 等同於 self.request.send(),在handle方法中完成業務邏輯便可
def
finish(
self
):
super
().finish()
server
=
ThreadingTCPServer(
"127.0.0.1"
, MyHandle)
server.serve_forever()
|
StreamRequestHandler主要定義了兩個新的 wfile對象和rfile對象,來分別對這個socket進行讀寫操做,當咱們業務須要時,好比須要使用文件接口方法時,選擇繼承於StreamRequestHandler構建咱們本身處理請求類來完成業務邏輯將會更加的方便。
DatagramRequestHandler
DatagramRequestHandler字面意思是數據報請求處理,也就是基於UDPServer的服務器才能使用該請求處理類
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
class
DatagramRequestHandler(BaseRequestHandler):
def
setup(
self
):
from
io
import
BytesIO
# udp的self.request包含兩部分(data,socket)它來自於
# data, client_addr = self.socket.recvfrom(self.max_packet_size)
# return (data, self.socket), client_addr
# (data, self.socket)就是這個self.request,在這裏將其解構,data爲recvfrom接收的數據
self
.packet,
self
.socket
=
self
.request
# 該數據包封裝爲 BytesIO,一樣爲一個類文件對象。
self
.rfile
=
BytesIO(
self
.packet)
self
.wfile
=
BytesIO()
def
finish(
self
):
self
.socket.sendto(
self
.wfile.getvalue(),
self
.client_address)
|
從源碼能夠看出,DatagramRequestHandler將數據包封裝爲一個rfile,並實例化一個ByteIO對象用於寫入數據,寫入的數據能夠經過self.socket這個套接字發送。這樣可使用rfile和wfile這兩個類文件對象的read或者write接口來進行一些IO方面的操做。
本文出自https://www.jb51.net/article/188551.htm