上一篇關於 WSGI 的硬核長文,不知道有多少同窗,可以從頭看到尾的,無論大家有沒有看得很過癮,反正我是寫得很爽,總有一種將同樣知識吃透了的錯覺。python
今天我又給本身挖坑了,打算將 rpc 遠程調用的知識,好好地梳理一下,花了週末整整兩天的時間。git
什麼是RPC呢?github
百度百科給出的解釋是這樣的:「RPC(Remote Procedure Call Protocol)——遠程過程調用協議,它是一種經過網絡從遠程計算機程序上請求服務,而不須要了解底層網絡技術的協議」。這個概念聽起來仍是比較抽象,不要緊,繼續日後看,後面概念性的東西,我會講得足夠清楚,讓你徹底掌握 RPC 的基礎內容。在後面的篇章中還會結合其在 OpenStack 中實際應用,一步一步揭開 rpc 的神祕面紗。web
有的讀者,可能會問,爲啥我舉的例子總是 OpenStack 裏的東西呢?正則表達式
由於每一個人的業務中接觸的框架都不同(我主要接觸的就是 OpenStack 框架),我沒法爲每一個人去定製寫一篇文章,但其技術原理都是同樣的。即便如此,我也會盡力將文章寫得通用,不會由於你沒接觸過 OpenStack 而成爲你理解 rpc 的瓶頸。算法
在 OpenStack 裏的進程間通訊方式主要有兩種,一種是基於HTTP協議的RESTFul API方式,另外一種則是RPC調用。shell
那麼這兩種方式在應用場景上有何區別呢?json
有使用經驗的人,就會知道:服務器
關於OpenStack中基於RESTful API的通訊方式主要是應用了WSGI,這個知識點,我在前一篇文章中,有深刻地講解過,你能夠點擊查看。網絡
對於不熟悉 OpenStack 的人,也別擔憂聽不懂,這樣吧,我給你提兩個問題:
第一個問題:RPC 和 REST 區別是什麼?
你必定會以爲這個問題很奇怪,是的,包括我,可是你在網絡上一搜,會發現相似對比的文章比比皆是,我在想可能不少初學者因爲基礎不牢固,纔會將不相干的兩者拿出來對比吧。既然是這樣,那爲了讓你更加了解陌生的RPC,就從你熟悉得不能再熟悉的 REST 入手吧。
0一、所屬類別不一樣
REST,是Representational State Transfer 的簡寫,中文描述表述性狀態傳遞(是指某個瞬間狀態的資源數據的快照,包括資源數據的內容、表述格式(XML、JSON)等信息。)
REST 是一種軟件架構風格。 這種風格的典型應用,就是HTTP。其由於簡單、擴展性強的特色而廣受開發者的青睞。
而RPC 呢,是 Remote Procedure Call Protocol 的簡寫,中文描述是遠程過程調用,它能夠實現客戶端像調用本地服務(方法)同樣調用服務器的服務(方法)。
RPC 是一種基於 TCP 的通訊協議,按理說它和REST不是一個層面上的東西,不該該放在一塊兒討論,可是誰讓REST這麼流行呢,它是目前最流行的一套互聯網應用程序的API設計標準,某種意義下,咱們說 REST 能夠其實就是指代 HTTP 協議。
0二、使用方式不一樣
從使用上來看,HTTP 接口只關注服務提供方,對於客戶端怎麼調用並不關心。接口只要保證有客戶端調用時,返回對應的數據就好了。而RPC則要求客戶端接口保持和服務端的一致。
0三、面向對象不一樣
從設計上來看,RPC,所謂的遠程過程調用 ,是面向方法的 ,REST:所謂的 Representational state transfer ,是面向資源的,除此以外,還有一種叫作 SOA,所謂的面向服務的架構,它是面向消息的,這個接觸很少,就很少說了。
0四、序列化協議不一樣
接口調用一般包含兩個部分,序列化和通訊協議。
通訊協議,上面已經說起了,REST 是 基於 HTTP 協議,而 RPC 能夠基於 TCP/UDP,也能夠基於 HTTP 協議進行傳輸的。
常見的序列化協議,有:json、xml、hession、protobuf、thrift、text、bytes等,REST 一般使用的是 JSON或者XML,而 RPC 使用的是 JSON-RPC,或者 XML-RPC。
經過以上幾點,咱們知道了 REST 和 RPC 之間有很明顯的差別。
第二個問題:爲何要採用RPC呢?
那到底爲什麼要使用 RPC,單純的依靠RESTful API不能夠嗎?爲何要搞這麼多複雜的協議,渣渣表示真的學不過來了。
關於這一點,如下幾點僅是個人我的猜測,僅供交流哈:
說了這麼多,咱們該如何選擇這二者呢?我總結了以下兩點,供你參考:
「遠程調用」意思就是:被調用方法的具體實現不在程序運行本地,而是在別的某個地方(分佈到各個服務器),調用者只想要函數運算的結果,卻不須要實現函數的具體細節。
0一、基於 xml-rpc
Python實現 rpc,可使用標準庫裏的 SimpleXMLRPCServer,它是基於XML-RPC 協議的。
有了這個模塊,開啓一個 rpc server,就變得至關簡單了。執行如下代碼:
import SimpleXMLRPCServer
class calculate:
def add(self, x, y):
return x + y
def multiply(self, x, y):
return x * y
def subtract(self, x, y):
return abs(x-y)
def divide(self, x, y):
return x/y
obj = calculate()
server = SimpleXMLRPCServer.SimpleXMLRPCServer(("localhost", 8088))
# 將實例註冊給rpc server
server.register_instance(obj)
print "Listening on port 8088"
server.serve_forever()
複製代碼
有了 rpc server,接下來就是 rpc client,因爲咱們上面使用的是 XML-RPC,因此 rpc clinet 須要使用xmlrpclib 這個庫。
import xmlrpclib
server = xmlrpclib.ServerProxy("http://localhost:8088")
複製代碼
而後,咱們經過 server_proxy 對象就能夠遠程調用以前的rpc server的函數了。
>> server.add(2, 3)
5
>>> server.multiply(2, 3)
6
>>> server.subtract(2, 3)
1
>>> server.divide(2, 3)
0
複製代碼
SimpleXMLRPCServer是一個單線程的服務器。這意味着,若是幾個客戶端同時發出多個請求,其它的請求就必須等待第一個請求完成之後才能繼續。
若非要使用 SimpleXMLRPCServer 實現多線程併發,其實也不難。只要將代碼改爲以下便可。
from SimpleXMLRPCServer import SimpleXMLRPCServer
from SocketServer import ThreadingMixIn
class ThreadXMLRPCServer(ThreadingMixIn, SimpleXMLRPCServer):pass
class MyObject:
def hello(self):
return "hello xmlprc"
obj = MyObject()
server = ThreadXMLRPCServer(("localhost", 8088), allow_none=True)
server.register_instance(obj)
print "Listening on port 8088"
server.serve_forever()
複製代碼
0二、基於json-rpc
SimpleXMLRPCServer 是基於 xml-rpc 實現的遠程調用,上面咱們也提到 除了 xml-rpc 以外,還有 json-rpc 協議。
那 python 如何實現基於 json-rpc 協議呢?
答案是不少,不少web框架其自身都本身實現了json-rpc,但咱們要獨立這些框架以外,要尋求一種較爲乾淨的解決方案,我查找到的選擇有兩種
第一種是 jsonrpclib
pip install jsonrpclib -i https://pypi.douban.com/simple
複製代碼
第二種是 python-jsonrpc
pip install python-jsonrpc -i https://pypi.douban.com/simple
複製代碼
先來看第一種 jsonrpclib
它與 Python 標準庫的 SimpleXMLRPCServer 很相似(由於它的類名就叫作 SimpleJSONRPCServer ,不明真相的人真覺得它們是親兄弟)。或許能夠說,jsonrpclib 就是仿照 SimpleXMLRPCServer 標準庫來進行編寫的。
它的導入與 SimpleXMLRPCServer 略有不一樣,由於SimpleJSONRPCServer分佈在jsonrpclib庫中。
服務端
from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer
server = SimpleJSONRPCServer(('localhost', 8080))
server.register_function(lambda x,y: x+y, 'add')
server.serve_forever()
複製代碼
客戶端
import jsonrpclib
server = jsonrpclib.Server("http://localhost:8080")
複製代碼
再來看第二種python-jsonrpc,寫起來貌似有些複雜。
服務端
import pyjsonrpc
class RequestHandler(pyjsonrpc.HttpRequestHandler):
@pyjsonrpc.rpcmethod
def add(self, a, b):
"""Test method"""
return a + b
http_server = pyjsonrpc.ThreadingHttpServer(
server_address=('localhost', 8080),
RequestHandlerClass=RequestHandler
)
print "Starting HTTP server ..."
print "URL: http://localhost:8080"
http_server.serve_forever()
複製代碼
客戶端
import pyjsonrpc
http_client = pyjsonrpc.HttpClient(
url="http://localhost:8080/jsonrpc"
)
複製代碼
還記得上面我提到過的 zabbix API,由於我有接觸過,因此也拎出來說講。zabbix API 也是基於 json-rpc 2.0協議實現的。
由於內容較多,這裏只帶你們打個,zabbix 是如何調用的:直接指明要調用 zabbix server 的哪一個方法,要傳給這個方法的參數有哪些。
0三、基於 zerorpc
以上介紹的兩種rpc遠程調用方式,若是你足夠細心,能夠發現他們都是http+rpc 兩種協議結合實現的。
接下來,咱們要介紹的這種(zerorpc),就再也不使用走 http 了。
zerorpc 這個第三方庫,它是基於TCP協議、 ZeroMQ 和 MessagePack的,速度相對快,響應時間短,併發高。zerorpc 和 pyjsonrpc 同樣,須要額外安裝,雖然SimpleXMLRPCServer不須要額外安裝,可是SimpleXMLRPCServer性能相對差一些。
pip install zerorpc -i https://pypi.douban.com/simple
複製代碼
服務端代碼
import zerorpc
class caculate(object):
def hello(self, name):
return 'hello, {}'.format(name)
def add(self, x, y):
return x + y
def multiply(self, x, y):
return x * y
def subtract(self, x, y):
return abs(x-y)
def divide(self, x, y):
return x/y
s = zerorpc.Server(caculate())
s.bind("tcp://0.0.0.0:4242")
s.run()
複製代碼
客戶端
import zerorpc
c = zerorpc.Client()
c.connect("tcp://127.0.0.1:4242")
複製代碼
客戶端除了可使用zerorpc框架實現代碼調用以外,它還支持使用「命令行」的方式調用。
客戶端可使用命令行,那服務端是否是也能夠呢?
是的,經過 Github 上的文檔幾個 demo 能夠體驗到這個第三方庫作真的是優秀。
好比咱們能夠用下面這個命令,建立一個rpc server,後面這個 time
Python 標準庫中的 time 模塊,zerorpc 會將 time 註冊綁定以供client調用。
zerorpc --server --bind tcp://127.0.0.1:1234 time
複製代碼
在客戶端,就能夠用這條命令來遠程調用這個 time 函數。
zerorpc --client --connect tcp://127.0.0.1:1234 strftime %Y/%m/%d
複製代碼
通過了上面的學習,咱們已經學會了如何使用多種方式實現rpc遠程調用。
經過對比,zerorpc 能夠說是脫穎而出,一支獨秀。
但爲什麼在 OpenStack 中,rpc client 不直接 rpc 調用 rpc server ,而是先把 rpc 調用請求發給 RabbitMQ ,再由訂閱者(rpc server)來取消息,最終實現遠程調用呢?
爲此,我也作了一番思考:
OpenStack 組件繁多,在一個較大的集羣內部每一個組件內部經過rpc通訊頻繁,若是都採用rpc直連調用的方式,鏈接數會很是地多,開銷大,如有些 server 是單線程的模式,超時會很是的嚴重。
OpenStack 是複雜的分佈式集羣架構,會有多個 rpc server 同時工做,假設有 server01,server02,server03 三個server,當 rpc client 要發出rpc請求時,發給哪一個好呢?這是問題一。
你可能會說輪循或者隨機,這樣對你們都公平。這樣的話還會引出另外一個問題,假若請求恰好發到server01,而server01恰好不湊巧,可能因爲機器或者其餘由於致使服務沒在工做,那這個rpc消息可就直接失敗了呀。要知道作爲一個集羣,高可用是基本要求,若是出現剛剛那樣的狀況實際上是很尷尬的。這是問題二。
集羣有可能根據實際須要擴充節點數量,若是使用直接調用,耦合度過高,不利於部署和生產。這是問題三。
引入消息中間件,能夠很好的解決這些問題。
解決問題一:消息只有一份,接收者由AMQP的負載算法決定,默認爲在全部Receiver中均勻發送(round robin)。
解決問題二:有了消息中間件作緩衝站,client 能夠任性隨意的發,server 都掛掉了?沒有關係,等 server 正常工做後,本身來消息中間件取就好了。
解決問題三:不管有多少節點,它們只要認識消息中間件這一個中介就足夠了。
因爲後面,我將實例講解 OpenStack 中如何將 rpc 和 mq broker 結合使用。
而在此以前,你必須對消息隊列的一些基本知識有個概念。
首先,RPC只是定義了一個通訊接口,其底層的實現能夠各不相同,能夠是 socket,也能夠是今天要講的 AMQP。
AMQP(Advanced Message Queuing Protocol)是一種基於隊列的可靠消息服務協議,做爲一種通訊協議,AMQP一樣存在多個實現,如Apache Qpid,RabbitMQ等。
如下是 AMQP 中的幾個必知的概念:
Publisher:消息發佈者
Receiver:消息接收者,在RabbitMQ中叫訂閱者:Subscriber。
Queue:用來保存消息的存儲空間,消息沒有被receiver前,保存在隊列中。
Exchange:用來接收Publisher發出的消息,根據Routing key 轉發消息到對應的Message Queue中,至於轉到哪一個隊列裏,這個路由算法又由exchange type決定的。
exchange type:主要四種描述exchange的類型。
direct:消息路由到知足此條件的隊列中(queue,能夠有多個): routing key = binding key
topic:消息路由到知足此條件的隊列中(queue,能夠有多個):routing key 匹配 binding pattern. binding pattern是相似正則表達式的字符串,能夠知足複雜的路由條件。
fanout:消息路由到多有綁定到該exchange的隊列中。
binding :binding是用來描述exchange和queue之間的關係的概念,一個exchang能夠綁定多個隊列,這些關係由binding創建。前面說的binding key /binding pattern也是在binding中給出。
在網上找了個圖,能夠很清晰地描述幾個名詞的關係。
關於AMQP,有幾下幾點值得注意:
前面鋪墊了那麼久,終於到了講真實應用的場景。在生產中RPC是如何應用的呢?
其餘模型我不太清楚,在 OpenStack 中的應用模型是這樣的
至於爲何要如此設計,前面我已經給出了本身的觀點。
接下來,就是源碼解讀 OpenStack ,看看其是如何經過rpc進行遠程調用的。如若你對此沒有興趣(我知道不少人對此都沒有興趣,因此不浪費你們時間),能夠直接跳過這一節,進入下一節。
目前Openstack中有兩種RPC實現,一種是在oslo messaging,一種是在openstack.common.rpc。
openstack.common.rpc是舊的實現,oslo messaging是對openstack.common.rpc的重構。openstack.common.rpc在每一個項目中都存在一份拷貝,oslo messaging即將這些公共代碼抽取出來,造成一個新的項目。oslo messaging也對RPC API 進行了從新設計,對多種 transport 作了進一步封裝,底層也是用到了kombu這個AMQP庫。(注:Kombu 是Python中的messaging庫。Kombu旨在經過爲AMQ協議提供慣用的高級接口,使Python中的消息傳遞儘量簡單,併爲常見的消息傳遞問題提供通過驗證和測試的解決方案。)
關於oslo_messaging庫,主要提供了兩種獨立的API:
由於 notify 實現是太簡單了,因此這裏我就很少說了,若是有人想要看這方面內容,能夠收藏個人博客(python-online.cn) ,我會更新補充 notify 的內容。
OpenStack RPC 模塊提供了 rpc.call,rpc.cast, rpc.fanout_cast 三種 RPC 調用方法,發送和接收 RPC 請求。
rpc.call 和 rpc.rpc.cast 從實現代碼上看,他們的區別很小,就是call調用時候會帶有wait_for_reply=True參數,而cast不帶。
要了解 rpc 的調用機制呢,首先要知道 oslo_messaging 的幾個概念
transport:RPC功能的底層實現方法,這裏是rabbitmq的消息隊列的訪問路徑
transport 就是定義你如何訪鏈接消息中間件,好比你使用的是 Rabbitmq,那在 nova.conf中應該有一行transport_url
的配置,能夠很清楚地看出指定了 rabbitmq 爲消息中間件,並配置了鏈接rabbitmq的user,passwd,主機,端口。
transport_url=rabbit://user:passwd@host:5672
複製代碼
def get_transport(conf, url=None, allowed_remote_exmods=None):
return _get_transport(conf, url, allowed_remote_exmods,
transport_cls=RPCTransport)
複製代碼
target:指定RPC topic交換機的匹配信息和綁定主機。
target用來表述 RPC 服務器監聽topic,server名稱和server監聽的exchange,是否廣播fanout。
class Target(object):
def __init__(self, exchange=None, topic=None, namespace=None, version=None, server=None, fanout=None, legacy_namespaces=None):
self.exchange = exchange
self.topic = topic
self.namespace = namespace
self.version = version
self.server = server
self.fanout = fanout
self.accepted_namespaces = [namespace] + (legacy_namespaces or [])
複製代碼
rpc server 要獲取消息,須要定義target,就像一個門牌號同樣。
rpc client 要發送消息,也須要有target,說明消息要發到哪去。
endpoints:是可供別人遠程調用的對象
RPC服務器暴露出endpoint,每一個 endpoint 包涵一系列的可被遠程客戶端經過 transport 調用的方法。直觀理解,能夠參考nova-conductor建立rpc server的代碼,這邊的endpoints就是 nova/manager.py:ConductorManager()
dispatcher:分發器,這是 rpc server 纔有的概念
在client端,是這樣指定要調用哪一個方法的。
而在server端,是如何知道要執行這個方法的呢?這就是dispatcher 要乾的事,它從 endpoint 裏找到這個方法,而後執行,最後返回。
Serializer:在 python 對象和message(notification) 之間數據作序列化或是反序列化的基類。
主要方法有四個:
executor:服務的運行方式,單線程或者多線程
每一個notification listener都和一個executor綁定,來控制收到的notification如何分配。默認狀況下,使用的是blocking executor(具體特性參加executor一節)
oslo_messaging.get_notification_listener(transport, targets, endpoints, executor=’blocking’, serializer=None, allow_requeue=False, pool=None)
複製代碼
rpc server 和rpc client 的四個重要方法
reset()
:Reset service.start()
:該方法調用後,server開始poll,從transport中接收message,而後轉發給dispatcher.該message處理過程一直進行,直到stop方法被調用。executor決定server的IO處理策略。可能會是用一個新進程、新協程來作poll操做,或是直接簡單的在一個循環中註冊一個回調。一樣,executor也決定分配message的方式,是在一個新線程中dispatch或是..... *stop()
:當調用stop以後,新的message不會被處理。可是,server可能還在處理一些以前沒有處理完的message,而且底層driver資源也還一直沒有釋放。wait()
:在stop調用以後,可能還有message正在被處理,使用wait方法來阻塞當前進程,直到全部的message都處理完成。以後,底層的driver資源會釋放。模仿是一種很高效的學習方法,我這裏根據 OpenStack 的調用方式,抽取出核心內容,寫成一個簡單的 demo,有對 OpenStack 感興趣的能夠了解一下,大部分人也能夠直接跳過這章節。
如下代碼不能直接運行,你還須要配置 rabbitmq 的鏈接方式,你能夠寫在配置文件中,經過 get_transport 從cfg.CONF 中讀取,也能夠直接將其寫成url的格式作成參數,傳給 get_transport 。
簡單的 rpc client
#coding=utf-8
import oslo_messaging
from oslo_config import cfg
# 建立 rpc client
transport = oslo_messaging.get_transport(cfg.CONF, url="")
target = oslo_messaging.Target(topic='test', version='2.0')
client = oslo_messaging.RPCClient(transport, target)
# rpc同步調用
client.call(ctxt, 'test', arg=arg)
複製代碼
簡單的 rpc server
#coding=utf-8
from oslo_config import cfg
import oslo_messaging
import time
# 定義endpoint類
class ServerControlEndpoint(object):
target = oslo_messaging.Target(namespace='control',
version='2.0')
def __init__(self, server):
self.server = server
def stop(self, ctx):
if self.server:
self.server.stop()
class TestEndpoint(object):
def test(self, ctx, arg):
return arg
# 建立rpc server
transport = oslo_messaging.get_transport(cfg.CONF, url="")
target = oslo_messaging.Target(topic='test', server='server1')
endpoints = [
ServerControlEndpoint(None),
TestEndpoint(),
]
server = oslo_messaging.get_rpc_server(transport, target,endpoints,executor='blocking')
try:
server.start()
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Stopping server")
server.stop()
server.wait()
複製代碼
以上,就是本期推送的所有內容,週末兩天沒有出門,都在寫這篇文章。真的快掏空了我本身,不過寫完後真的很暢快。