上篇回顧:萬物互聯之~深刻篇html
Code:https://github.com/lotapp/BaseCode/tree/master/python/6.net/6.rpc/python
其餘專欄最新篇:協程增強之~兼容答疑篇 | 聊聊數據庫~SQL環境篇git
RPC
(Remote Procedure Call
):分佈式系統常見的一種通訊方法(遠程過程調用),通俗講:能夠一臺計算機的程序調用另外一臺計算機的子程序(能夠把它當作以前咱們說的進程間通訊,只不過這一次的進程不在同一臺PC上了)github
PS:RPC
的設計思想是力圖使遠程調用中的通信細節對於使用者透明,調用雙方無需關心網絡通信的具體實現shell
引用一張網上的圖:
數據庫
和HTTP
有點類似,你能夠這樣理解:json
HTTP/1.0
是短連接,而RPC
是長鏈接進行通訊
HTTP/1.1
支持了長鏈接(Connection:keep-alive
),基本上和RPC
差很少了
keep-alive
通常都限制有最長時間,或者最多處理的請求數,而RPC
是基於長鏈接的,基本上沒有這個限制HTTP/2.0
創建了gRPC
,它們之間的基本上也就差很少了
HTTP-普通話
和RPC-方言
的區別了RPC
和HTTP
調用不用通過中間件,而是端到端的直接數據交互
Socket
實現的(RPC
、HTTP
都是Socket
的讀寫操做)簡單歸納一下RPC
的優缺點就是:服務器
PS:HTTP更可能是Client
與Server
的通信;RPC
更可能是內部服務器間的通信網絡
上面說這麼多,可能尚未來個案例實在,咱們看個案例:架構
本地調用sum()
:
def sum(a, b): """return a+b""" return a + b def main(): result = sum(1, 2) print(f"1+2={result}") if __name__ == "__main__": main()
輸出:(這個你們都知道)
1+2=3
官方文檔:
https://docs.python.org/3/library/xmlrpc.client.html https://docs.python.org/3/library/xmlrpc.server.html
都說RPC
用起來就像本地調用同樣,那麼用起來啥樣呢?看個案例:
服務端:(CentOS7:192.168.36.123:50051
)
from xmlrpc.server import SimpleXMLRPCServer def sum(a, b): """return a+b""" return a + b # PS:50051是gRPC默認端口 server = SimpleXMLRPCServer(('', 50051)) # 把函數註冊到RPC服務器中 server.register_function(sum) print("Server啓動ing,Port:50051") server.serve_forever()
客戶端:(Win10:192.168.36.144
)
from xmlrpc.client import ServerProxy stub = ServerProxy("http://192.168.36.123:50051") result = stub.sum(1, 2) print(f"1+2={result}")
輸出:(Client
用起來是否是和本地差很少?就是經過代理訪問了下RPCServer
而已)
1+2=3
PS:CentOS
服務器不是你綁定個端口就必定能訪問的,若是不能記讓防火牆開放對應的端口
這個以前在說MariaDB
環境的時候有詳細說:http://www.javashuo.com/article/p-uxkudhsn-dw.html
# 添加 --permanent永久生效(沒有此參數重啓後失效) firewall-cmd --zone=public --add-port=80/tcp --permanent
zeroRPC用起來和這個差很少,也簡單舉個例子吧:
把服務的某個方法註冊到RPCServer
中,供外部服務調用
import zerorpc class Test(object): def say_hi(self, name): return f"Hi,My Name is{name}" # 註冊一個Test的實例 server = zerorpc.Server(Test()) server.bind("tcp://0.0.0.0:50051") server.run()
調用服務端代碼:
import zerorpc client = zerorpc.Client("tcp://192.168.36.123:50051") result = client.say_hi("RPC") print(result)
看了上面的引入案例,是否是感受RPC
不過如此?NoNoNo,要是真這麼簡單也就談不上RPC架構
了,上面兩個是最簡單的RPC服務了,能夠這麼說:生產環境基本上用不到,只能當案例練習罷了,對Python來講,最經常使用的RPC就兩個gRPC
and Thrift
PS:國產最出名的是Dubbo
and Tars
,Net最經常使用的是gRPC
、Thrift
、Surging
要本身實現一個RPC Server
那麼就得了解整個流程了:
Client
(調用者)以本地調用的方式發起調用RPC
服務進行遠程過程調用(RPC的目標就是要把這些步驟都封裝起來,讓使用者感受不到這個過程)
RPC Proxy
組件收到調用後,負責將被調用的方法名、參數
等打包編碼成自定義的協議RPC Proxy
組件在打包完成後經過網絡把數據包發送給RPC Server
RPC Proxy
組件把經過網絡接收到的數據包按照相應格式進行拆包解碼
,獲取方法名和參數RPC Proxy
組件根據方法名和參數進行本地調用RPC Server
(被調用者)本地執行後將結果返回給服務端的RPC Proxy
RPC Proxy
組件將返回值打包編碼成自定義的協議數據包,並經過網絡發送給客戶端的RPC Proxy
組件RPC Proxy
組件收到數據包後,進行拆包解碼,把數據返回給Client
Client
(調用者)獲得本次RPC
調用的返回結果用一張時序圖來描述下整個過程:
PS:RPC Proxy
有時候也叫Stub
(存根):(Client Stub,Server Stub)
爲屏蔽客戶調用遠程主機上的對象,必須提供某種方式來模擬本地對象,這種本地對象稱爲存根(stub),存根負責接收本地方法調用,並將它們委派給各自的具體實現對象
PRC服務實現的過程當中其實就兩核心點:
Protocol Buffers
TCP/UDP/HTTP
)下面咱們就根據上面的流程來手寫一個簡單的RPC:
1.Client調用:
# client.py from client_stub import ClientStub def main(): stub = ClientStub(("192.168.36.144", 50051)) result = stub.get("sum", (1, 2)) print(f"1+2={result}") result = stub.get("sum", (1.1, 2)) print(f"1.1+2={result}") time_str = stub.get("get_time") print(time_str) if __name__ == "__main__": main()
輸出:
1+2=3 1.1+2=3.1 Wed Jan 16 22
2.Client Stub,客戶端存根:(主要有打包
、解包
、和RPC服務器通訊
的方法)
# client_stub.py import socket class ClientStub(object): def __init__(self, address): """address ==> (ip,port)""" self.socket = socket.socket() self.socket.connect(address) def convert(self, obj): """根據類型轉換成對應的類型編號""" if isinstance(obj, int): return 1 if isinstance(obj, float): return 2 if isinstance(obj, str): return 3 def pack(self, func, args): """打包:把方法和參數拼接成自定義的協議 格式:func:函數名@params:類型-參數,類型2-參數2... """ result = f"func:{func}" if args: params = "" # params:類型-參數,類型2-參數2... for item in args: params += f"{self.convert(item)}-{item}," # 去除最後一個, result += f"@params:{params[:-1]}" # print(result) # log 輸出 return result.encode("utf-8") def unpack(self, data): """解包:獲取返回結果""" msg = data.decode("utf-8") # 格式應該是"data:xxxx" params = msg.split(":") if len(params) > 1: return params[1] return None def get(self, func, args=None): """1.客戶端的RPC Proxy組件收到調用後,負責將被調用的方法名、參數等打包編碼成自定義的協議""" data = self.pack(func, args) # 2.客戶端的RPC Proxy組件在打包完成後經過網絡把數據包發送給RPC Server self.socket.send(data) # 等待服務端返回結果 data = self.socket.recv(2048) if data: return self.unpack(data) return None
簡要說明下:(我根據流程在Code裏面標註了,看起來應該很輕鬆)
以前有說到核心其實就是消息協議
and傳輸控制
,我客戶端存根
的消息協議是自定義的格式(後面會說簡化方案):func:函數名@params:類型-參數,類型2-參數2...
,傳輸我是基於TCP進行了簡單的封裝
3.Server端:(實現很簡單)
# server.py import socket from server_stub import ServerStub class RPCServer(object): def __init__(self, address, mycode): self.mycode = mycode # 服務端存根(RPC Proxy) self.server_stub = ServerStub(mycode) # TCP Socket self.socket = socket.socket() # 端口複用 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 綁定端口 self.socket.bind(address) def run(self): self.socket.listen() while True: # 等待客戶端鏈接 client_socket, client_addr = self.socket.accept() print(f"來自{client_addr}的請求:\n") # 交給服務端存根(Server Proxy)處理 self.server_stub.handle(client_socket, client_addr) if __name__ == "__main__": from server_code import MyCode server = RPCServer(('', 50051), MyCode()) print("Server啓動ing,Port:50051") server.run()
爲了簡潔,服務端代碼我單獨放在了server_code.py
中:
# 5.RPC Server(被調用者)本地執行後將結果返回給服務端的RPC Proxy class MyCode(object): def sum(self, a, b): return a + b def get_time(self): import time return time.ctime()
4.而後再看看重頭戲Server Stub
:
# server_stub.py import socket class ServerStub(object): def __init__(self, mycode): self.mycode = mycode def convert(self, num, obj): """根據類型編號轉換類型""" if num == "1": obj = int(obj) if num == "2": obj = float(obj) if num == "3": obj = str(obj) return obj def unpack(self, data): """3.服務端的RPC Proxy組件把經過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數""" msg = data.decode("utf-8") # 格式應該是"格式:func:函數名@params:類型編號-參數,類型編號2-參數2..." array = msg.split("@") func = array[0].split(":")[1] if len(array) > 1: args = list() for item in array[1].split(":")[1].split(","): temps = item.split("-") # 類型轉換 args.append(self.convert(temps[0], temps[1])) return (func, tuple(args)) # (func,args) return (func, ) def pack(self, result): """打包:把方法和參數拼接成自定義的協議""" # 格式:"data:返回值" return f"data:{result}".encode("utf-8") def exec(self, func, args=None): """4.服務端的RPC Proxy組件根據方法名和參數進行本地調用""" # 若是沒有這個方法則返回None func = getattr(self.mycode, func, None) if args: return func(*args) # 解包 else: return func() # 無參函數 def handle(self, client_socket, client_addr): while True: # 獲取客戶端發送的數據包 data = client_socket.recv(2048) if data: try: data = self.unpack(data) # 解包 if len(data) == 1: data = self.exec(data[0]) # 執行無參函數 elif len(data) > 1: data = self.exec(data[0], data[1]) # 執行帶參函數 else: data = "RPC Server Error Code:500" except Exception as ex: data = "RPC Server Function Error" print(ex) # 6.服務端的RPC Proxy組件將返回值打包編碼成自定義的協議數據包,並經過網絡發送給客戶端的RPC Proxy組件 data = self.pack(data) # 把函數執行結果按指定協議打包 # 把處理過的數據發送給客戶端 client_socket.send(data) else: print(f"客戶端:{client_addr}已斷開\n") break
再簡要說明一下:裏面方法其實主要就是解包
、執行函數
、返回值打包
輸出圖示:
再貼一下上面的時序圖:
課外拓展:
HTTP1.0、HTTP1.1 和 HTTP2.0 的區別 https://www.cnblogs.com/heluan/p/8620312.html 簡述分佈式RPC框架 https://blog.csdn.net/jamebing/article/details/79610994 分佈式基礎—RPC http://www.dataguru.cn/article-14244-1.html
上篇回顧:萬物互聯之~RPC專欄 https://www.cnblogs.com/dunitian/p/10279946.html
以前有網友問,不少開源的RPC中都是使用路由表,這個怎麼實現?
其實路由表實現起來也簡單,代碼基本上不變化,就修改一下server_stub.py
的__init__
和exe
兩個方法就能夠了:
class ServerStub(object): def __init__(self, mycode): self.func_dict = dict() # 初始化一個方法名和方法的字典({func_name:func}) for item in mycode.__dir__(): if not item.startswith("_"): self.func_dict[item] = getattr(mycode, item) def exec(self, func, args=None): """4.服務端的RPC Proxy組件根據方法名和參數進行本地調用""" # 若是沒有這個方法則返回None # func = getattr(self.mycode, func, None) func = self.func_dict[func] if args: return func(*args) # 解包 else: return func() # 無參函數
Python比較6的同志對上節課的Code確定嗤之以鼻,上次自定義協議是同的通用方法,這節課咱們先來簡化下代碼:
再貼一下上節課的時序圖:
官方文檔:https://docs.python.org/3/library/json.html
# 把字典對象轉換爲Json字符串 json_str = json.dumps({"func": func, "args": args}) # 把Json字符串從新變成字典對象 data = json.loads(data) func, args = data["func"], data["args"]
須要注意的就是類型轉換了(eg:python tuple
==> json array
)
Python | JSON |
---|---|
dict | object |
list, tuple | array |
str | string |
int, float | number |
True | true |
False | false |
None | null |
PS:序列化:json.dumps(obj)
,反序列化:json.loads(json_str)
在原有基礎上只須要修改下Stub
的pack
和unpack
方法便可
Client_Stub(類型轉換都省掉了)
import json import socket class ClientStub(object): def pack(self, func, args): """打包:把方法和參數拼接成自定義的協議 格式:{"func": "sum", "args": [1, 2]} """ json_str = json.dumps({"func": func, "args": args}) # print(json_str) # log 輸出 return json_str.encode("utf-8") def unpack(self, data): """解包:獲取返回結果""" data = data.decode("utf-8") # 格式應該是"{data:xxxx}" data = json.loads(data) # 獲取不到就返回None return data.get("data", None) # 其餘Code我沒有改變
Server Stub()
import json import socket class ServerStub(object): def unpack(self, data): """3.服務端的RPC Proxy組件把經過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數""" data = data.decode("utf-8") # 格式應該是"格式:{"func": "sum", "args": [1, 2]}" data = json.loads(data) func, args = data["func"], data["args"] if args: return (func, tuple(args)) # (func,args) return (func, ) def pack(self, result): """打包:把方法和參數拼接成自定義的協議""" # 格式:"data:返回值" json_str = json.dumps({"data": result}) return json_str.encode("utf-8") # 其餘Code我沒有改變
輸出圖示:
RPC其實更多的是二進制的序列化方式,這邊簡單介紹下
官方文檔:https://docs.python.org/3/library/pickle.html
用法和Json
相似,PS:序列化:pickle.dumps(obj)
,反序列化:pickle.loads(buffer)
和Json案例相似,也只是改了pack
和unpack
,我這邊就貼一下完整代碼(防止被吐槽)
1.Client
# 和上一節同樣 from client_stub import ClientStub def main(): stub = ClientStub(("192.168.36.144", 50051)) result = stub.get("sum", (1, 2)) print(f"1+2={result}") result = stub.get("sum", (1.1, 2)) print(f"1.1+2={result}") time_str = stub.get("get_time") print(time_str) if __name__ == "__main__": main()
2.ClientStub
import socket import pickle class ClientStub(object): def __init__(self, address): """address ==> (ip,port)""" self.socket = socket.socket() self.socket.connect(address) def pack(self, func, args): """打包:把方法和參數拼接成自定義的協議""" return pickle.dumps((func, args)) def unpack(self, data): """解包:獲取返回結果""" return pickle.loads(data) def get(self, func, args=None): """1.客戶端的RPC Proxy組件收到調用後,負責將被調用的方法名、參數等打包編碼成自定義的協議""" data = self.pack(func, args) # 2.客戶端的RPC Proxy組件在打包完成後經過網絡把數據包發送給RPC Server self.socket.send(data) # 等待服務端返回結果 data = self.socket.recv(2048) if data: return self.unpack(data) return None
3.Server
# 和上一節同樣 import socket from server_stub import ServerStub class RPCServer(object): def __init__(self, address, mycode): self.mycode = mycode # 服務端存根(RPC Proxy) self.server_stub = ServerStub(mycode) # TCP Socket self.socket = socket.socket() # 端口複用 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 綁定端口 self.socket.bind(address) def run(self): self.socket.listen() while True: # 等待客戶端鏈接 client_socket, client_addr = self.socket.accept() print(f"來自{client_addr}的請求:\n") try: # 交給服務端存根(Server Proxy)處理 self.server_stub.handle(client_socket, client_addr) except Exception as ex: print(ex) if __name__ == "__main__": from server_code import MyCode server = RPCServer(('', 50051), MyCode()) print("Server啓動ing,Port:50051") server.run()
4.ServerCode
# 和上一節同樣 # 5.RPC Server(被調用者)本地執行後將結果返回給服務端的RPC Proxy class MyCode(object): def sum(self, a, b): return a + b def get_time(self): import time return time.ctime()
5.ServerStub
import socket import pickle class ServerStub(object): def __init__(self, mycode): self.mycode = mycode def unpack(self, data): """3.服務端的RPC Proxy組件把經過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數""" func, args = pickle.loads(data) if args: return (func, args) # (func,args) return (func, ) def pack(self, result): """打包:把方法和參數拼接成自定義的協議""" return pickle.dumps(result) def exec(self, func, args=None): """4.服務端的RPC Proxy組件根據方法名和參數進行本地調用""" # 若是沒有這個方法則返回None func = getattr(self.mycode, func) if args: return func(*args) # 解包 else: return func() # 無參函數 def handle(self, client_socket, client_addr): while True: # 獲取客戶端發送的數據包 data = client_socket.recv(2048) if data: try: data = self.unpack(data) # 解包 if len(data) == 1: data = self.exec(data[0]) # 執行無參函數 elif len(data) > 1: data = self.exec(data[0], data[1]) # 執行帶參函數 else: data = "RPC Server Error Code:500" except Exception as ex: data = "RPC Server Function Error" print(ex) # 6.服務端的RPC Proxy組件將返回值打包編碼成自定義的協議數據包,並經過網絡發送給客戶端的RPC Proxy組件 data = self.pack(data) # 把函數執行結果按指定協議打包 # 把處理過的數據發送給客戶端 client_socket.send(data) else: print(f"客戶端:{client_addr}已斷開\n") break
輸出圖示:
而後關於RPC高級的內容(會涉及到註冊中心
),我們後面說架構的時候繼續,網絡這邊就說到這