基於Rabbit實現的RPC

 最近在學習項目中的通用技術,其中一個是在項目中會常用的基於RabbitMQ實現的RPC。這裏一共有三個點要學習,分別是:RPC是什麼?RabbitMQ是什麼?如何使用RabbitMQ實現RPC。奔着這三個目標,查閱了資料。作筆記記錄。python

 

RPC

 rpc的全稱叫:遠程過程調用,能夠通俗的理解爲經過網絡調用另外一臺電腦上的函數的業務處理思想。首先,咱們先看看本地的函數調用流程是怎樣。編程

本地調用:json

def fun(a,b):
    sum = a + b
    return sum


if __name__ = __main__
print "i use a function to sum "
sum_main = fun(2,3)
print sum_main

 

本地調用當執行到sum=fun(2,3)時,程序會在內存中查找函數指針fun,而後帶着參數進入fun()函數中運算,最後返回給sum_main。若是是遠程調用,則是從一個電腦A上調用另外一個電腦B上的函數。緩存

RPC思想的好處是:服務器

一、更符合編程思想。想要實現什麼功能直接調用相應的函數,這是編程最直接的思想。網絡

二、減小代碼重複率。A想實現的功能若是B中已經實現了,那麼A就直接調用B的函數,避免本身再重複實現。併發

RPC調用:框架

rpc多使用http傳輸請求,格式有xml,json等,這裏是xml。以下是使用python中自帶的RPC調用框架來實現的一個最簡單的RPC調用。異步

client.py模塊化

 

from xmlrpclib import ServerProxy            #導入xmlrpclib的包
 s = ServerProxy("http://172.171.5.205:8080") #定義xmlrpc客戶端
print s.fun_add(2,3)                            #調用服務器端的函數   

 

server.py

from SimpleXMLRPCServer import SimpleXMLRPCServer   

def fun_add(a,b):
    totle = a + b 
    return totle

if __name__ == '__main__':
    s = SimpleXMLRPCServer(('0.0.0.0', 8080))   #開啓xmlrpcserver
    s.register_function(fun_add)                #註冊函數fun_add
    print "server is online..."
    s.serve_forever()                           #開啓循環等待

 

先啓動服務器端

 後啓動客戶端

這樣就完成了一次RPC調用。RPC的調用流程以下圖所示。調用流程是:

  1. client調用以本地調用方式調用服務;
  2. client stub接收到調用後負責將方法、參數等組裝成可以進行網絡傳輸的消息體;
  3. client stub找到服務地址,並將消息發送到服務端;
  4. server stub收到消息後進行解碼;
  5. server stub根據解碼結果調用本地的服務;
  6. 本地服務執行並將結果返回給server stub;
  7. server stub將返回結果打包成消息併發送至消費方;
  8. client stub接收到消息,並進行解碼;
  9. 服務消費方獲得最終結果。

 

 

RabbitMQ

RabbitMQ是實現了AMQP(Advanced Message Queuing Protocol,高級消息隊列協議)的軟件。主要功能是

  1. 解耦服務。使用rabbitmq能夠將自個服務解耦,實現模塊化
  2. 擴展性高。系統中增長一項功能不須要 從頭開始,自須要增長模塊便可
  3. 解決高併發瓶頸。消息隊列具備緩存消息功能,可以有效解決高併發請求。

 

如下摘錄自知乎:

 

對於初學者,舉一個飯店的例子來解釋這三個分別是什麼吧。不是百分百恰當,可是應該足以解釋這三者的區別。

RPC:假設你是一個飯店裏的服務員,顧客向你點菜,可是你不會作菜,因此你採集了顧客要點什麼以後告訴後廚去作顧客點的菜,
這叫RPC(remote procedure call),由於廚房的廚師相對於服務員而言是另一我的(在計算機的世界裏就是remote的機器上的一個進程)。
廚師作好了的菜就是RPC的返回值。

任務隊列和消息隊列:本質都是隊列,因此就只舉一個任務隊列的例子。假設這個飯店在高峯期顧客不少,而廚師只有不多的幾個,
因此服務員們不得不把單子按下單順序放在廚房的桌子上,供廚師們一個一個作,這一堆單子就是任務隊列(固然,取決於問題的語境,
可能要把放訂單的桌子也算在裏面一塊兒構成所謂的任務隊列平臺),廚師們每作完一個菜,就從桌子上的訂單裏再取出一個單子繼續作菜。

 

簡單消息隊列:

 

最簡單的消息隊列,生產者-消費者模式。一端產生消息,發送到隊列,另外一端消費者收取消息。

consume_simple.py

 1 #coding:UTF-8
 2 
 3 import pika  4 import time  5 
 6 # 創建實例
 7 connection = pika.BlockingConnection(pika.ConnectionParameters(  8                'localhost'))  9 # 聲明管道
10 channel = connection.channel() 11 
14 channel.queue_declare(queue='hello') 15 
16 def callback(ch, method, properties, body):  
17     
18     print "ch",ch 19     print "method",method 20     print "properties",properties 21     print "body",body
25     print(" [x] Received %r" % body) 27 # 消費消息
28 channel.basic_consume(  
29     callback,  # 若是收到消息,就調用callback函數來處理消息
30     queue='hello',  # 你要從那個隊列裏收消息
33  ) 34 
35 print(' [*] Waiting for messages. To exit press CTRL+C') 36 channel.start_consuming()  # 開始消費消息

 

 productor_simple.py

 1 #coding:UTF-8
 2 import pika  3 
 4 # 創建一個實例
 5 connection = pika.BlockingConnection(  6     pika.ConnectionParameters('localhost')  
 7  )  8 # 聲明一個管道,在管道里發消息
 9 channel = connection.channel() 10 # 在管道里聲明queue
11 channel.queue_declare(queue='hello')
13 channel.basic_publish(exchange='', 14                       routing_key='hello',  # queue名字
15                       body='Hello World!')  # 消息內容
16 print(" [x] Sent 'Hello World!'") 17 connection.close()  # 隊列關閉

 

 先運行消費者

 在運行生產者

觀察消費者獲取的消息

 

RabbitMQ實現RPC

 RPC的要求是等待得到返回值,而RabbitMQ常出現的場景是異步等待。這就要求RabbitMQ能夠當即返回結果。使用了兩種技術:

1、爲調用指明id,要求id和結果一塊兒返回,使用id來判斷是哪個函數的調用返回;

2、指明返回的隊列名,返回結果時指明返回隊列的名字確保會正確返回到調用者。

 

工做流程:

  1. 客戶端建立message時指定reply_to隊列名、correlation_id標記調用者。
  2. 經過隊列,服務端收到消息。調用函數處理,而後返回。
  3. 返回的隊列是reply_to指定的隊列,並攜帶correlation_id。
  4. 返回消息到達客戶端,客戶端根據correlation_id判斷是哪個函數的調用返回。

 

#coding:UTF-8

import
pika import uuid import time class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, # 只要一收到消息就調用on_response no_ack=True, queue=self.callback_queue) # 收這個queue的消息 def on_response(self, ch, method, props, body): # 必須四個參數 # 若是收到的ID和本機生成的相同,則返回的結果就是我想要的指令返回的結果 if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None # 初始self.response爲None self.corr_id = str(uuid.uuid4()) # 隨機惟一字符串 self.channel.basic_publish( exchange='', routing_key='rpc_queue', # 發消息到rpc_queue properties=pika.BasicProperties( # 消息持久化 reply_to = self.callback_queue, # 讓服務端命令結果返回到callback_queue correlation_id = self.corr_id, # 把隨機uuid同時發給服務器 ), body=str(n) ) while self.response is None: # 當沒有數據,就一直循環 # 啓動後,on_response函數接到消息,self.response 值就不爲空了 self.connection.process_data_events() # 非阻塞版的start_consuming() # print("no msg……") # time.sleep(0.5) # 收到消息就調用on_response return int(self.response) if __name__ == '__main__': fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(7)") response = fibonacci_rpc.call(7) print(" [.] Got %r" % response)

 

 

#coding:UTF-8

import
pika import time def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish( exchange='', # 把執行結果發回給客戶端 routing_key=props.reply_to, # 客戶端要求返回想用的queue # 返回客戶端發過來的correction_id 爲了讓客戶端驗證消息一致性 properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response) ) ch.basic_ack(delivery_tag = method.delivery_tag) # 任務完成,告訴客戶端 if __name__ == '__main__': connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') # 聲明一個rpc_queue , channel.basic_qos(prefetch_count=1) # 在rpc_queue裏收消息,收到消息就調用on_request channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming()
相關文章
相關標籤/搜索