上節回顧:5種IO模型 | IO多路複用 and 萬物互聯之~網絡編程增強篇php
官方文檔:https://docs.python.org/3/library/internet.htmlhtml
總結篇:【總結篇】一文搞定網絡編程python
畫一張圖來通俗化講講TCP三次握手: git
用代碼來講,大概過程就是: github
畫圖通俗講下TCP四次揮手: web
用代碼來講,大概過程就是: 數據庫
其實這個也很好的解釋了以前的端口占用問題,若是是服務端先斷開鏈接,那麼服務器就是四次揮手的發送方,
最後一次消息是得不到回覆的,端口就會保留一段時間
(服務端的端口固定)也就會出現端口占用的狀況。若是是客戶端先斷開,那下次鏈接會自動換個端口,不影響(客戶端的端口是隨機分配的)編程
PS:以前咱們講端口就是send
一個空消息,不少人不是很清楚,這邊簡單驗證下就懂了: json
以前其實已經寫了個簡版的Web服務器了,簡單回顧下流程:flask
TCP三次握手
send
一個http的請求報文,服務器接recv
以後進行相應的處理並返回對應的頁面client close
),進行了TCP四次揮手
而後簡單說下HTTP狀態碼:
baidu.com
=302=> www.baidu.com
404 not found
500 Server Error
WSGI
)¶咱們先本身定義一個動態服務器:
import re
import socket
class HTTPServer(object):
def __init__(self):
with socket.socket() as tcp_server:
tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tcp_server.bind(('', 8080))
tcp_server.listen()
while True:
self.client_socket, self.client_address = tcp_server.accept()
self.handle()
def response(self, status, body=None):
print(status)
header = f"HTTP/1.1 {status}\r\n\r\n"
with self.client_socket:
self.client_socket.send(header.encode("utf-8"))
if body:
self.client_socket.send(body)
def __static_handler(self, name):
try:
with open(f"./www{name}", "rb") as fs:
return fs.read()
except Exception as ex:
print(ex)
return None
def __dynamic_handler(self, name):
try:
m = __import__(name)
return m.application().encode("utf-8")
except Exception as ex:
print(ex)
return None
def handle(self):
with self.client_socket:
print(f"[來自{self.client_address}的消息:]\n")
data = self.client_socket.recv(2048)
if data:
header, _ = data.decode("utf-8").split("\r\n", 1)
print(header)
# GET /xxx HTTP/1.1
ret = re.match("^\w+? (/[^ ]*) .+$", header)
if ret:
url = ret.groups(1)[0]
# Python三元表達式(以前好像忘說了)
url = "/index.html" if url == "/" else url
print("請求url:", url)
body = str()
# 動態頁面
if ".py" in url:
# 提取模塊名(把開頭的/和.py排除)
body = self.__dynamic_handler(url[1:-3])
else: # 靜態服務器
body = self.__static_handler(url)
# 根據返回的body內容,返回對應的響應碼
if body:
self.response("200 ok", body)
else:
self.response("404 Not Found")
else: # 匹配不到url(基本上不會發生,不排除惡意修改)
self.response((404, "404 Not Found"))
if __name__ == "__main__":
import sys
# 防止 __import__ 導入模塊的時候找不到,忘了能夠查看:
# https://www.cnblogs.com/dotnetcrazy/p/9253087.html#5.本身添加模塊路徑
sys.path.insert(1, "./www/bin")
HTTPServer()
效果:
代碼不難其中有個技術點說下:模塊名爲字符串怎麼導入
?
# test.py
# 若是模塊名是字符串,須要使用__import__
s = "time"
time = __import__(s)
def application():
return time.ctime() # 返回字符串
if __name__ == "__main__":
time_str = application()
print(type(time_str))
print(time_str)
輸出:
<class 'str'> Thu Dec 20 22:48:07 2018
和上面基本同樣,多了個路由表(self.router_urls
)而已
import re
import socket
class HttpServer(object):
def __init__(self):
# 路由表
self.router_urls = {"/test": "/test.py", "/user": "/test2.py"}
def run(self):
with socket.socket() as server:
# 端口複用
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("", 8080))
server.listen()
while True:
self.client_socket, self.client_address = server.accept()
print(f"[{self.client_address}已上線]")
self.handler()
def response(self, status, body=None):
with self.client_socket as socket:
header = f"HTTP/1.1 {status}\r\n\r\n"
socket.send(header.encode("utf-8"))
if body:
socket.send(body)
def __static_handler(self, name):
try:
with open(f"./www{name}", "rb") as fs:
return fs.read()
except Exception as ex:
print(ex)
return None
def __dynamic_handler(self, name):
try:
m = __import__(name)
return m.application().encode("utf-8")
except Exception as ex:
print(ex)
return None
def handler(self):
data = self.client_socket.recv(2048)
if data:
header, _ = data.decode("utf-8").split("\r\n", 1)
# GET /xxx HTTP/1.1
ret = re.match("^\w+? (/[^ ]*) .+$", header)
if ret:
url = ret.group(1)
print(url) # print url log
body = None
# 路由有記錄:動態頁面
if url in self.router_urls.keys():
url = self.router_urls[url]
# 切片提取模塊名
body = self.__dynamic_handler(url[1:-3])
else: # 靜態服務器
if url == "/":
url = "/index.html"
body = self.__static_handler(url)
# 沒有這個頁面或者出錯
if body:
self.response("200 ok", body)
else:
self.response("404 Not Found")
else:
# 404
self.response("404 Not Found")
else:
print(f"{self.client_address}已下線")
self.client_socket.close()
if __name__ == "__main__":
import sys
# 臨時添加模塊所在路徑
sys.path.insert(1, "./www/bin")
HttpServer().run()
輸出:
看一眼test2.py
:
# test2.py
def application():
return "My Name Is XiaoMing"
if __name__ == "__main__":
print(application())
官方文檔:https://docs.python.org/3/library/wsgiref.html
其實Python官方提供了一個WSGI:Web Server Gateway Interface
的約定:
它只要求Web開發者實現一個
application
函數,就能夠響應HTTP請求
eg:(只要對應的python文件提供了 application(env,start_response)
方法就好了)
# hello.py
# env 是一個字典類型
def application(env, start_response):
# 設置動態頁面的響應頭(回頭服務器會再加上本身的響應頭)
# 列表裏面的 item 是 tuple
start_response("200 OK", [("Content-Type", "text/html")])
# 返回一個列表
return ["<h1>This is Test!</h1>".encode("utf-8")]
先使用官方的簡單服務器看看:
from wsgiref.simple_server import make_server
# 導入咱們本身編寫的application函數:
from hello import application
# 建立一個服務器,端口是8080,處理函數是application:
httpd = make_server('', 8080, application)
print('Serving HTTP on port 8080...')
# 開始監聽HTTP請求:
httpd.serve_forever()
運行後效果:127.0.0.1:8080
This is Test!
若是把hello.py
改爲下面代碼(服務端不變),那麼就能夠獲取一些請求信息了:
def application(env, start_response):
print(env["PATH_INFO"])
start_response("200 OK", [("Content-Type", "text/html")])
return [f'<h1>Hello, {env["PATH_INFO"][1:] or "web"}!</h1>'.encode("utf-8")]
輸出:
上面的application()
函數就是符合WSGI
標準的一個HTTP處理函數
,它接收兩個參數:
environ
:一個包含全部HTTP請求信息的dict
對象;start_response
:一個發送HTTP響應的函數(調用服務器定義的方法)
Header
只能發送一次 ==> 只能調用一次start_response()
函數有了WSGI
,咱們關心的就是如何從env
這個dict
對象拿到HTTP請求信息
,而後構造HTML
,經過start_response()
發送Header
,最後返回Body內容
Python內置了一個WSGI
服務器,這個模塊叫wsgiref
,它是用純Python
編寫的WSGI
服務器的參考實現(徹底符合WSGI
標準,可是不考慮任何運行效率,僅供開發和測試使用)
PS:這樣的好處就是,只要符合WSGI
規範的服務器,咱們均可以直接使用了
其實經過源碼就能夠知道這個WSGIServer
究竟是何方神聖了:
class WSGIServer(HTTPServer):
pass
# HTTPServer其實就是基於TCPServer
class HTTPServer(socketserver.TCPServer):
pass
# 這個就是咱們開頭說的Python封裝的簡單WebServer了
class TCPServer(BaseServer):
pass
若是仍是記不得能夠回顧下上次說的內容,提示:
__all__ = ["BaseServer", "TCPServer", "UDPServer",
"ThreadingUDPServer", "ThreadingTCPServer",
"BaseRequestHandler", "StreamRequestHandler",
"DatagramRequestHandler", "ThreadingMixIn"]
若是你想要在這個基礎上進行處理,能夠和上面說的同樣,定義一個繼承class WSGIRequestHandler(BaseHTTPRequestHandler)
的類,而後再處理
在本小節結束前咱們模仿一下示例,定義一個符合WSGI
規範的簡單服務器:
import re
import socket
from index import WebFrame
class WSGIServer(object):
def __init__(self):
# 請求頭
self.env = dict()
# 存放處理後的響應頭
self.response_headers = str()
def run(self):
with socket.socket() as server:
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("", 8080))
server.listen()
while True:
self.client_socket, self.client_address = server.accept()
self.handler()
# 轉換瀏覽器請求頭格式
def request_headers_handler(self, headers):
# 過濾一下空字符串(不能過濾空列表)
headers = list(filter(None, headers.split("\r\n")))
# 提取 Method 和 Url
ret = re.match("^([\w]+?) (/[^ ]*?) .+$", headers[0])
if ret:
self.env["method"] = ret.group(1)
url = ret.group(2)
print(url)
self.env["path"] = "/index.html" if url == "/" else url
else:
return None
# [['Host', ' localhost:8080'], ['Connection', ' keep-alive']...]
array = map(lambda item: item.split(":", 1), headers[1:])
for item in array:
self.env[item[0].lower()] = item[1]
# print(self.env)
return "ok"
# 響應客戶端(吐槽一下,request和response的headers爲毛格式不同,這設計真不合理!)
def start_response(self, status, header_list=[]):
# 響應頭
self.response_headers = f"HTTP/1.1 {status}\r\n"
for item in header_list:
self.response_headers += f"{item[0]}:{item[1]}\r\n"
# print(self.response_headers)
# 響應瀏覽器
def response(self, body):
with self.client_socket as client:
# 省略一系列服務器響應的headers
self.response_headers += "server:WSGIServer\r\n\r\n"
client.send(self.response_headers.encode("utf-8"))
if body:
client.send(body)
def handler(self):
with self.client_socket as client:
data = client.recv(2048)
if data:
# 瀏覽器請求頭
headers = data.decode("utf-8")
if self.request_headers_handler(headers):
# 模仿php全部請求都一個文件處理
body = WebFrame().application(self.env,
self.start_response)
# 響應瀏覽器
self.response(body)
else:
self.start_response("404 Not Found")
else:
client.close()
if __name__ == "__main__":
WSGIServer().run()
本身定義的框架:
class WebFrame(object):
def __init__(self):
# 路由表
self.router_urls = {"/time": "get_time", "/user": "get_name"}
def get_time(self):
import time
return time.ctime().encode("utf-8")
def get_name(self):
return "<h2>My Name Is XiaoMing</h2>".encode("utf-8")
def application(self, env, start_response):
body = b""
url = env["path"]
# 請求的頁面都映射到路由對應的方法中
if url in self.router_urls.keys():
func = self.router_urls[url]
body = getattr(self, func)()
else:
# 不然就請求對應的靜態資源
try:
with open(f"./www{url}", "rb") as fs:
body = fs.read()
except Exception as ex:
start_response("404 Not Found")
print(ex)
return b"404 Not Found" # 出錯就直接返回了
# 返回對應的頁面響應頭
start_response("200 ok", [("Content-Type", "text/html"),
("Scripts", "Python")])
return body
輸出:
知識擴展:
從wsgiref模塊導入 https://docs.python.org/3/library/wsgiref.html Python服務器網關接口 https://www.python.org/dev/peps/pep-3333/ Python原始套接字和流量嗅探 https://blog.csdn.net/cj1112/article/details/51303021 https://blog.csdn.net/peng314899581/article/details/78082244 【源碼閱讀】輕量級Web框架:bottle https://github.com/bottlepy/bottle
上篇回顧:萬物互聯之~深刻篇
其餘專欄最新篇:協程增強之~兼容答疑篇 | 聊聊數據庫~SQL環境篇
Code:https://github.com/lotapp/BaseCode/tree/master/python/6.net/6.rpc/
RPC
(Remote Procedure Call
):分佈式系統常見的一種通訊方法(遠程過程調用),通俗講:能夠一臺計算機的程序調用另外一臺計算機的子程序(能夠把它當作以前咱們說的進程間通訊,只不過這一次的進程不在同一臺PC上了)
PS:RPC
的設計思想是力圖使遠程調用中的通信細節對於使用者透明,調用雙方無需關心網絡通信的具體實現
引用一張網上的圖:
和HTTP
有點類似,你能夠這樣理解:
HTTP/1.0
是短連接,而RPC
是長鏈接進行通訊
HTTP/1.1
支持了長鏈接(Connection:keep-alive
),基本上和RPC
差很少了
keep-alive
通常都限制有最長時間,或者最多處理的請求數,而RPC
是基於長鏈接的,基本上沒有這個限制HTTP/2.0
創建了gRPC
,它們之間的基本上也就差很少了
HTTP-普通話
和RPC-方言
的區別了RPC
和HTTP
調用不用通過中間件,而是端到端的直接數據交互
Socket
實現的(RPC
、HTTP
都是Socket
的讀寫操做)簡單歸納一下RPC
的優缺點就是:
PS:HTTP更可能是Client
與Server
的通信;RPC
更可能是內部服務器間的通信
上面說這麼多,可能尚未來個案例實在,咱們看個案例:
本地調用sum()
:
def sum(a, b):
"""return a+b"""
return a + b
def main():
result = sum(1, 2)
print(f"1+2={result}")
if __name__ == "__main__":
main()
輸出:(這個你們都知道)
1+2=3
官方文檔:
https://docs.python.org/3/library/xmlrpc.client.html https://docs.python.org/3/library/xmlrpc.server.html
都說RPC
用起來就像本地調用同樣,那麼用起來啥樣呢?看個案例:
服務端:(CentOS7:192.168.36.123:50051
)
from xmlrpc.server import SimpleXMLRPCServer
def sum(a, b):
"""return a+b"""
return a + b
# PS:50051是gRPC默認端口
server = SimpleXMLRPCServer(('', 50051))
# 把函數註冊到RPC服務器中
server.register_function(sum)
print("Server啓動ing,Port:50051")
server.serve_forever()
客戶端:(Win10:192.168.36.144
)
from xmlrpc.client import ServerProxy
stub = ServerProxy("http://192.168.36.123:50051")
result = stub.sum(1, 2)
print(f"1+2={result}")
輸出:(Client
用起來是否是和本地差很少?就是經過代理訪問了下RPCServer
而已)
1+2=3
PS:CentOS
服務器不是你綁定個端口就必定能訪問的,若是不能記讓防火牆開放對應的端口
這個以前在說MariaDB
環境的時候有詳細說:http://www.javashuo.com/article/p-uxkudhsn-dw.html
# 添加 --permanent永久生效(沒有此參數重啓後失效)
firewall-cmd --zone=public --add-port=80/tcp --permanent
zeroRPC用起來和這個差很少,也簡單舉個例子吧:
把服務的某個方法註冊到RPCServer
中,供外部服務調用
import zerorpc
class Test(object):
def say_hi(self, name):
return f"Hi,My Name is{name}"
# 註冊一個Test的實例
server = zerorpc.Server(Test())
server.bind("tcp://0.0.0.0:50051")
server.run()
調用服務端代碼:
import zerorpc
client = zerorpc.Client("tcp://192.168.36.123:50051")
result = client.say_hi("RPC")
print(result)
看了上面的引入案例,是否是感受RPC
不過如此?NoNoNo,要是真這麼簡單也就談不上RPC架構
了,上面兩個是最簡單的RPC服務了,能夠這麼說:生產環境基本上用不到,只能當案例練習罷了,對Python來講,最經常使用的RPC就兩個gRPC
and Thrift
PS:國產最出名的是Dubbo
and Tars
,Net最經常使用的是gRPC
、Thrift
、Surging
要本身實現一個RPC Server
那麼就得了解整個流程了:
Client
(調用者)以本地調用的方式發起調用RPC
服務進行遠程過程調用(RPC的目標就是要把這些步驟都封裝起來,讓使用者感受不到這個過程)
RPC Proxy
組件收到調用後,負責將被調用的方法名、參數
等打包編碼成自定義的協議RPC Proxy
組件在打包完成後經過網絡把數據包發送給RPC Server
RPC Proxy
組件把經過網絡接收到的數據包按照相應格式進行拆包解碼
,獲取方法名和參數RPC Proxy
組件根據方法名和參數進行本地調用RPC Server
(被調用者)本地執行後將結果返回給服務端的RPC Proxy
RPC Proxy
組件將返回值打包編碼成自定義的協議數據包,並經過網絡發送給客戶端的RPC Proxy
組件RPC Proxy
組件收到數據包後,進行拆包解碼,把數據返回給Client
Client
(調用者)獲得本次RPC
調用的返回結果用一張時序圖來描述下整個過程:
PS:RPC Proxy
有時候也叫Stub
(存根):(Client Stub,Server Stub)
爲屏蔽客戶調用遠程主機上的對象,必須提供某種方式來模擬本地對象,這種本地對象稱爲存根(stub),存根負責接收本地方法調用,並將它們委派給各自的具體實現對象
PRC服務實現的過程當中其實就兩核心點:
Protocol Buffers
TCP/UDP/HTTP
)下面咱們就根據上面的流程來手寫一個簡單的RPC:
1.Client調用:
# client.py
from client_stub import ClientStub
def main():
stub = ClientStub(("192.168.36.144", 50051))
result = stub.get("sum", (1, 2))
print(f"1+2={result}")
result = stub.get("sum", (1.1, 2))
print(f"1.1+2={result}")
time_str = stub.get("get_time")
print(time_str)
if __name__ == "__main__":
main()
輸出:
1+2=3 1.1+2=3.1 Wed Jan 16 22
2.Client Stub,客戶端存根:(主要有打包
、解包
、和RPC服務器通訊
的方法)
# client_stub.py
import socket
class ClientStub(object):
def __init__(self, address):
"""address ==> (ip,port)"""
self.socket = socket.socket()
self.socket.connect(address)
def convert(self, obj):
"""根據類型轉換成對應的類型編號"""
if isinstance(obj, int):
return 1
if isinstance(obj, float):
return 2
if isinstance(obj, str):
return 3
def pack(self, func, args):
"""打包:把方法和參數拼接成自定義的協議
格式:func:函數名@params:類型-參數,類型2-參數2...
"""
result = f"func:{func}"
if args:
params = ""
# params:類型-參數,類型2-參數2...
for item in args:
params += f"{self.convert(item)}-{item},"
# 去除最後一個,
result += f"@params:{params[:-1]}"
# print(result) # log 輸出
return result.encode("utf-8")
def unpack(self, data):
"""解包:獲取返回結果"""
msg = data.decode("utf-8")
# 格式應該是"data:xxxx"
params = msg.split(":")
if len(params) > 1:
return params[1]
return None
def get(self, func, args=None):
"""1.客戶端的RPC Proxy組件收到調用後,負責將被調用的方法名、參數等打包編碼成自定義的協議"""
data = self.pack(func, args)
# 2.客戶端的RPC Proxy組件在打包完成後經過網絡把數據包發送給RPC Server
self.socket.send(data)
# 等待服務端返回結果
data = self.socket.recv(2048)
if data:
return self.unpack(data)
return None
簡要說明下:(我根據流程在Code裏面標註了,看起來應該很輕鬆)
以前有說到核心其實就是消息協議
and傳輸控制
,我客戶端存根
的消息協議是自定義的格式(後面會說簡化方案):func:函數名@params:類型-參數,類型2-參數2...
,傳輸我是基於TCP進行了簡單的封裝
3.Server端:(實現很簡單)
# server.py
import socket
from server_stub import ServerStub
class RPCServer(object):
def __init__(self, address, mycode):
self.mycode = mycode
# 服務端存根(RPC Proxy)
self.server_stub = ServerStub(mycode)
# TCP Socket
self.socket = socket.socket()
# 端口複用
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 綁定端口
self.socket.bind(address)
def run(self):
self.socket.listen()
while True:
# 等待客戶端鏈接
client_socket, client_addr = self.socket.accept()
print(f"來自{client_addr}的請求:\n")
try:
# 交給服務端存根(Server Proxy)處理
self.server_stub.handle(client_socket, client_addr)
except Exception as ex:
print(ex)
if __name__ == "__main__":
from server_code import MyCode
server = RPCServer(('', 50051), MyCode())
print("Server啓動ing,Port:50051")
server.run()
爲了簡潔,服務端代碼我單獨放在了server_code.py
中:
# 5.RPC Server(被調用者)本地執行後將結果返回給服務端的RPC Proxy
class MyCode(object):
def sum(self, a, b):
return a + b
def get_time(self):
import time
return time.ctime()
4.而後再看看重頭戲Server Stub
:
# server_stub.py
import socket
class ServerStub(object):
def __init__(self, mycode):
self.mycode = mycode
def convert(self, num, obj):
"""根據類型編號轉換類型"""
if num == "1":
obj = int(obj)
if num == "2":
obj = float(obj)
if num == "3":
obj = str(obj)
return obj
def unpack(self, data):
"""3.服務端的RPC Proxy組件把經過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數"""
msg = data.decode("utf-8")
# 格式應該是"格式:func:函數名@params:類型編號-參數,類型編號2-參數2..."
array = msg.split("@")
func = array[0].split(":")[1]
if len(array) > 1:
args = list()
for item in array[1].split(":")[1].split(","):
temps = item.split("-")
# 類型轉換
args.append(self.convert(temps[0], temps[1]))
return (func, tuple(args)) # (func,args)
return (func, )
def pack(self, result):
"""打包:把方法和參數拼接成自定義的協議"""
# 格式:"data:返回值"
return f"data:{result}".encode("utf-8")
def exec(self, func, args=None):
"""4.服務端的RPC Proxy組件根據方法名和參數進行本地調用"""
# 若是沒有這個方法則返回None
func = getattr(self.mycode, func, None)
if args:
return func(*args) # 解包
else:
return func() # 無參函數
def handle(self, client_socket, client_addr):
while True:
# 獲取客戶端發送的數據包
data = client_socket.recv(2048)
if data:
try:
data = self.unpack(data) # 解包
if len(data) == 1:
data = self.exec(data[0]) # 執行無參函數
elif len(data) > 1:
data = self.exec(data[0], data[1]) # 執行帶參函數
else:
data = "RPC Server Error Code:500"
except Exception as ex:
data = "RPC Server Function Error"
print(ex)
# 6.服務端的RPC Proxy組件將返回值打包編碼成自定義的協議數據包,並經過網絡發送給客戶端的RPC Proxy組件
data = self.pack(data) # 把函數執行結果按指定協議打包
# 把處理過的數據發送給客戶端
client_socket.send(data)
else:
print(f"客戶端:{client_addr}已斷開\n")
break
再簡要說明一下:裏面方法其實主要就是解包
、執行函數
、返回值打包
輸出圖示:
再貼一下上面的時序圖:
課外拓展:
HTTP1.0、HTTP1.1 和 HTTP2.0 的區別 https://www.cnblogs.com/heluan/p/8620312.html 簡述分佈式RPC框架 https://blog.csdn.net/jamebing/article/details/79610994 分佈式基礎—RPC http://www.dataguru.cn/article-14244-1.html
下節預估:RPC服務進一步簡化與演變、手寫一個簡單的REST接口
上篇回顧:萬物互聯之~RPC專欄 http://www.javashuo.com/article/p-tupqpuii-c.html
以前有網友問,不少開源的RPC中都是使用路由表,這個怎麼實現?
其實路由表實現起來也簡單,代碼基本上不變化,就修改一下server_stub.py
的__init__
和exe
兩個方法就能夠了:
class ServerStub(object):
def __init__(self, mycode):
self.func_dict = dict()
# 初始化一個方法名和方法的字典({func_name:func})
for item in mycode.__dir__():
if not item.startswith("_"):
self.func_dict[item] = getattr(mycode, item)
def exec(self, func, args=None):
"""4.服務端的RPC Proxy組件根據方法名和參數進行本地調用"""
# 若是沒有這個方法則返回None
# func = getattr(self.mycode, func, None)
func = self.func_dict[func]
if args:
return func(*args) # 解包
else:
return func() # 無參函數
Python比較6的同志對上節課的Code確定嗤之以鼻,上次自定義協議是同的通用方法,這節課咱們先來簡化下代碼:
再貼一下上節課的時序圖:
官方文檔:https://docs.python.org/3/library/json.html
# 把字典對象轉換爲Json字符串
json_str = json.dumps({"func": func, "args": args})
# 把Json字符串從新變成字典對象
data = json.loads(data)
func, args = data["func"], data["args"]
須要注意的就是類型轉換了(eg:python tuple
==> json array
)
Python | JSON |
---|---|
dict | object |
list, tuple | array |
str | string |
int, float | number |
True | true |
False | false |
None | null |
PS:序列化:json.dumps(obj)
,反序列化:json.loads(json_str)
在原有基礎上只須要修改下Stub
的pack
和unpack
方法便可
Client_Stub(類型轉換都省掉了)
import json
import socket
class ClientStub(object):
def pack(self, func, args):
"""打包:把方法和參數拼接成自定義的協議
格式:{"func": "sum", "args": [1, 2]}
"""
json_str = json.dumps({"func": func, "args": args})
# print(json_str) # log 輸出
return json_str.encode("utf-8")
def unpack(self, data):
"""解包:獲取返回結果"""
data = data.decode("utf-8")
# 格式應該是"{data:xxxx}"
data = json.loads(data)
# 獲取不到就返回None
return data.get("data", None)
# 其餘Code我沒有改變
Server Stub()
import json
import socket
class ServerStub(object):
def unpack(self, data):
"""3.服務端的RPC Proxy組件把經過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數"""
data = data.decode("utf-8")
# 格式應該是"格式:{"func": "sum", "args": [1, 2]}"
data = json.loads(data)
func, args = data["func"], data["args"]
if args:
return (func, tuple(args)) # (func,args)
return (func, )
def pack(self, result):
"""打包:把方法和參數拼接成自定義的協議"""
# 格式:"data:返回值"
json_str = json.dumps({"data": result})
return json_str.encode("utf-8")
# 其餘Code我沒有改變
輸出圖示:
RPC其實更多的是二進制的序列化方式,這邊簡單介紹下
官方文檔:https://docs.python.org/3/library/pickle.html
用法和Json
相似,PS:序列化:pickle.dumps(obj)
,反序列化:pickle.loads(buffer)
和Json案例相似,也只是改了pack
和unpack
,我這邊就貼一下完整代碼(防止被吐槽)
1.Client
# 和上一節同樣
from client_stub import ClientStub
def main():
stub = ClientStub(("192.168.36.144", 50051))
result = stub.get("sum", (1, 2))
print(f"1+2={result}")
result = stub.get("sum", (1.1, 2))
print(f"1.1+2={result}")
time_str = stub.get("get_time")
print(time_str)
if __name__ == "__main__":
main()
2.ClientStub
import socket
import pickle
class ClientStub(object):
def __init__(self, address):
"""address ==> (ip,port)"""
self.socket = socket.socket()
self.socket.connect(address)
def pack(self, func, args):
"""打包:把方法和參數拼接成自定義的協議"""
return pickle.dumps((func, args))
def unpack(self, data):
"""解包:獲取返回結果"""
return pickle.loads(data)
def get(self, func, args=None):
"""1.客戶端的RPC Proxy組件收到調用後,負責將被調用的方法名、參數等打包編碼成自定義的協議"""
data = self.pack(func, args)
# 2.客戶端的RPC Proxy組件在打包完成後經過網絡把數據包發送給RPC Server
self.socket.send(data)
# 等待服務端返回結果
data = self.socket.recv(2048)
if data:
return self.unpack(data)
return None
3.Server
# 和上一節同樣
import socket
from server_stub import ServerStub
class RPCServer(object):
def __init__(self, address, mycode):
self.mycode = mycode
# 服務端存根(RPC Proxy)
self.server_stub = ServerStub(mycode)
# TCP Socket
self.socket = socket.socket()
# 端口複用
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 綁定端口
self.socket.bind(address)
def run(self):
self.socket.listen()
while True:
# 等待客戶端鏈接
client_socket, client_addr = self.socket.accept()
print(f"來自{client_addr}的請求:\n")
try:
# 交給服務端存根(Server Proxy)處理
self.server_stub.handle(client_socket, client_addr)
except Exception as ex:
print(ex)
if __name__ == "__main__":
from server_code import MyCode
server = RPCServer(('', 50051), MyCode())
print("Server啓動ing,Port:50051")
server.run()
4.ServerCode
# 和上一節同樣
# 5.RPC Server(被調用者)本地執行後將結果返回給服務端的RPC Proxy
class MyCode(object):
def sum(self, a, b):
return a + b
def get_time(self):
import time
return time.ctime()
5.ServerStub
import socket
import pickle
class ServerStub(object):
def __init__(self, mycode):
self.mycode = mycode
def unpack(self, data):
"""3.服務端的RPC Proxy組件把經過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數"""
func, args = pickle.loads(data)
if args:
return (func, args) # (func,args)
return (func, )
def pack(self, result):
"""打包:把方法和參數拼接成自定義的協議"""
return pickle.dumps(result)
def exec(self, func, args=None):
"""4.服務端的RPC Proxy組件根據方法名和參數進行本地調用"""
# 若是沒有這個方法則返回None
func = getattr(self.mycode, func)
if args:
return func(*args) # 解包
else:
return func() # 無參函數
def handle(self, client_socket, client_addr):
while True:
# 獲取客戶端發送的數據包
data = client_socket.recv(2048)
if data:
try:
data = self.unpack(data) # 解包
if len(data) == 1:
data = self.exec(data[0]) # 執行無參函數
elif len(data) > 1:
data = self.exec(data[0], data[1]) # 執行帶參函數
else:
data = "RPC Server Error Code:500"
except Exception as ex:
data = "RPC Server Function Error"
print(ex)
# 6.服務端的RPC Proxy組件將返回值打包編碼成自定義的協議數據包,並經過網絡發送給客戶端的RPC Proxy組件
data = self.pack(data) # 把函數執行結果按指定協議打包
# 把處理過的數據發送給客戶端
client_socket.send(data)
else:
print(f"客戶端:{client_addr}已斷開\n")
break
輸出圖示:
而後關於RPC高級的內容(會涉及到註冊中心
),我們後面說架構的時候繼續,網絡這邊就說到這
RESTful只是接口協議規範,它是創建在http基礎上的,咱們在網絡增強篇的末尾簡單帶一下,後面講爬蟲應該會再給你們說的
在編寫REST接口時,通常都是爲HTTP服務的。爲了實現一個簡單的REST接口,你只需讓代碼知足Python的WSGI
標準便可
這邊我就不本身實現了(上面手寫服務器的時候其實已經展現了Restful接口是啥樣),用Flask
快速過一遍:
看個引入案例:
import flask
app = flask.Flask(__name__)
@app.route("/")
def index():
return "This is Restful API Test"
if __name__ == "__main__":
app.run()
圖示輸出:
Server Log:
* Serving Flask app "1.test" (lazy loading) * Environment: production WARNING: Do not use the development server in a production environment. Use a production WSGI server instead. * Debug mode: off * Running on http://127.0.0.1:8080/ (Press CTRL+C to quit) 127.0.0.1 - - [17/Jan/2019 17:24:02] "GET / HTTP/1.1" 200 -
舉個查詢服務器節點
信息的例子:/api/servers/
import flask
from infos import info_list
app = flask.Flask(__name__)
# Json的404自定義處理(不加自定義處理會返回默認404頁面)
@app.errorhandler(404)
def not_found(error):
return flask.make_response(
flask.jsonify({
"data": "Not Found",
"status": 404
}), 404)
# 運行Get和Post請求
@app.route("/api/v1.0/servers/<name>", methods=["GET", "POST"])
def get_info(name):
infos = list(filter(lambda item: item["name"] == name, info_list))
if len(infos) == 0:
flask.abort(404) # 404
# 基於json.dumps的封裝版
return flask.jsonify({"infos": infos}) # 返回Json字符串
if __name__ == "__main__":
app.run(port=8080)
圖示輸出:(不深刻說,後面爬蟲會再提的)
課後拓展:
RESTful API 設計指南 http://www.ruanyifeng.com/blog/2014/05/restful_api.html RESTful API 最佳實踐 http://www.ruanyifeng.com/blog/2018/10/restful-api-best-practices.html 異步 API 的設計 http://www.ruanyifeng.com/blog/2018/12/async-api-design.html 使用python的Flask實現一個RESTful API服務器端[翻譯] https://www.cnblogs.com/vovlie/p/4178077.html