RPC原理及其調用過程

遠程過程調用,簡稱爲RPC,是一個計算機通訊協議,它容許運行於一臺計算機的程序調用另外一臺計算機的子程序,而無需額外地爲這個交互做用編程。編程

RPC與傳統的HTTP對比服務器

優勢:網絡

  1. 傳輸效率高(二進制傳輸)多線程

  2. 發起調用的一方無需知道RPC的具體實現,如同調用本地函數般調用socket

缺點:tcp

  1. 通用性不如HTTP好(HTTP是標準協議)函數

總結:RPC適合內部服務間的通訊調用;HTTP適合面向用戶與服務間的通訊調用性能

 

RPC調用過程以下:編碼

 

  1. 調用者(客戶端Client)以本地調用的方式發起調用;
  2. Client stub(客戶端存根)收到調用後,負責將被調用的方法名、參數等打包編碼成特定格式的能進行網絡傳輸的消息體;
  3. Client stub將消息體經過網絡發送給服務端;
  4. Server stub(服務端存根)收到經過網絡接收到消息後按照相應格式進行拆包解碼,獲取方法名和參數;
  5. Server stub根據方法名和參數進行本地調用;
  6. 被調用者(Server)本地調用執行後將結果返回給server stub;
  7. Server stub將返回值打包編碼成消息,並經過網絡發送給客戶端;
  8. Client stub收到消息後,進行拆包解碼,返回給Client;
  9. Client獲得本次RPC調用的最終結果。spa

 

對於RPC調用流程的實現,拋開調用方與被調用方,其核心主要是:消息協議傳輸控制的實現 

(1). RPC消息協議:客戶端調用的參數和服務端的返回值這些在網絡上傳輸的數據以何種方式打包編碼和拆包解碼

 RPC的消息協議在設計時主要要考慮,消息轉換及傳輸的效率,爲解決消息轉換及傳輸的效率,能夠以二進制的方式傳輸消息,使用原始的二進制傳輸能夠省去中間轉換的環節並減小傳輸的數據量。

在Python中可使用struct模塊對二進制進行編碼和解碼:

struct.pact將其它類型轉換爲二進制,一般用於消息長度的轉換:

import struct # 將整數2轉換成適用網絡傳輸的無符號的4個字節整數
>>> struct.pack('!I', 2) '\x00\x00\x00\x02'

 struct.unpack將二進制類型轉換爲其它類型:

byte_data = '\x00\x00\x00\x02'
# 將2對應的二進制 轉換爲十進制整數 返回結果是個元組
>>> struct.unpack('!I',byte_data) (2,)

 

 (2). RPC傳輸控制

 對於消息數據的傳輸,主要有HTTP傳輸和TCP傳輸,鑑於TCP傳輸的可靠性,RPC的傳輸通常使用TCP做爲傳輸協議

在TCP傳輸中客戶端與服務端經過socket進行通訊,通訊流程:

 

RPC使用TCP進行傳輸控制的實現

 1 import socket
 2 
 3 class Channel(object):
 4     """
 5     與客戶端創建網絡鏈接
 6     """
 7 
 8     def __init__(self, host, port):
 9         self.host = host  # 服務器地址
10         self.port = port  # 服務器端口
11 
12     def get_connection(self):
13         """
14         獲取一個tcp鏈接
15         """
16         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
17         sock.connect((self.host, self.port))
18         return sock
19 
20 
21 class Server(object):
22     def __init__(self, host, port, handlers):
23         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
24         self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
25         self.host = host
26         self.port = port
27         self.sock.bind((host, port))
28         self.handlers = handlers
29 
30     def serve(self):
31         """
32         開啓服務器運行,提供RPC服務
33         """
34         # 開啓服務監聽,等待客戶端鏈接
35         self.sock.listen(128)
36         print("開始監聽")
37         while True:
38             # 接收客戶端的鏈接請求
39             conn, addr = self.sock.accept()
40             print("創建鏈接{}".format(str(addr)))
41 
42             # 建立ServerStub對象,完成客戶端具體的RPC調用
43             stub = ServerStub(conn, self.handlers)
44             try:
45                 while True:
46                     stub.process()
47             except EOFError:
48                 # 表示客戶端關閉了鏈接
49                 print("客戶端關閉鏈接")
50             # 關閉服務端鏈接
51             conn.close()

 

經過Socket使得RPC客戶端與服務器進行通訊

 1 TCP服務端
 2 sock = socket.socket()  # 建立一個套接字
 3 sock.bind()  # 綁定端口
 4 sock.listen()  # 監聽鏈接
 5 sock.accept()  # 接受新鏈接
 6 sock.close()  # 關閉服務器套接字
 7 
 8 
 9 TCP客戶端
10 sock = socket.socket()  # 建立一個套接字
11 sock.connect()  # 鏈接遠程服務器
12 sock.recv() #
13 sock.send()  #
14 sock.sendall()  # 徹底寫
15 sock.close()  # 關閉

 

RPC服務的實現:爲了能讓RPC服務器同時處理多個客戶端的請求,提高性能,能夠採用多線程、多進程方式實現,下面分別介紹這兩種方式實現的RPC服務

多線程RPC服務:

 1 class ThreadServer(object):
 2     """
 3    多線程RPC服務器
 4     """
 5 
 6     def __init__(self, host, port, handlers):
 7         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 8         self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 9         self.host = host
10         self.port = port
11         self.sock.bind((host, port))
12         self.handlers = handlers
13 
14     def serve(self):
15         """
16         開始服務
17         """
18         self.sock.listen(128)
19         print("開始監聽")
20         while True:
21             conn, addr = self.sock.accept()
22             print("創建連接{}".format(str(addr)))
23             t = threading.Thread(target=self.handle, args=(conn,))
24             t.start()
25 
26     def handle(self, client):
27         stub = ServerStub(client, self.handlers)
28         try:
29             while True:
30                 stub.process()
31         except EOFError:
32             print("客戶端關閉鏈接")
33 
34         client.close()

多進程RPC服務:

 1 class MultiProcessServer(object):
 2     """
 3    多進程服務器
 4     """
 5 
 6     def __init__(self, host, port, handlers):
 7         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 8         self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 9         self.host = host
10         self.port = port
11         self.sock.bind((host, port))
12         self.handlers = handlers
13 
14     def serve(self):
15         """
16         開始服務
17         """
18         self.sock.listen(128)
19         print("開始監聽")
20         while True:
21             conn, addr = self.sock.accept()
22             print("創建連接{}".format(str(addr)))
23             t = Process(target=self.handle, args=(conn,))
24             t.start()
25 
26     def handle(self, client):
27         stub = ServerStub(client, self.handlers)
28         try:
29             while True:
30                 stub.process()
31         except EOFError:
32             print("客戶端關閉鏈接")
33 
34         client.close()
相關文章
相關標籤/搜索