RabbitMQ消息隊列(九)RPC開始應用吧

 

一 簡單應用 

RPC——遠程過程調用,經過網絡調用運行在另外一臺計算機上的程序的函數\方法,是構建分佈式程序的一種方式。RabbitMQ是一個消息隊列系統,能夠在程序之間收發消息。利用RabbitMQ能夠實現RPC。本文全部操做都是在ubuntu16.04.3上進行的,示例代碼語言爲Python2.7。html

yum install rabbitmq-server python-pika -y
/etc/init.d/rabbitmq-server start
update-rc.d rabbitmq-server enable

  

1 RPC的基本實現python

root@ansible:~/workspace/RPC_TEST/RPC01# cat RPC_Server.py 
#!/usr/bin/env python
# coding:utf-8

"""
1.首先與rabbitmq創建連接,而後定義個函數fun(),
fun的功能是傳入一個數返回該數的2倍,這個函數就是咱們要遠程調用的函數
2.on_request()是一個回調函數,他做爲參數傳遞給了basic_consume(),
當basic_consume()在隊列中消費1條消息時,on_request()就會被調用
3.on_request()從消息內容body中獲取數,並傳給fun()進行計算,並將返回值做爲消息內容發給調用方指定的隊列
隊列名稱保存在props.relay_to中
"""
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fun(n):
    return n*2

def on_request(channel,method,props,body):
    print " props.correlation_id: %s"  %props.correlation_id
    print "props.reply_to: %s" %props.reply_to
    n = int(body)
    response = fun(n)
    channel.basic_publish(exchange='',routing_key=props.reply_to,
                          properties=pika.BasicProperties(
                              correlation_id=props.correlation_id),
                          body=str(response))
    channel.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request,queue='rpc_queue')
print "[x] Waiting RPC request..."
channel.start_consuming()

以上代碼中,首先與RabbitMQ服務創建鏈接,而後定義了一個函數fun(),fun()功能很簡單,輸入一個數而後返回該數的兩倍,這個函數就是咱們要遠程調用的函數。on_request()是一個回調函數,它做爲參數傳遞給了basic_consume(),當basic_consume()在隊列中消費1條消息時,on_request()就會被調用,on_request()從消息內容body中獲取數字,並傳給fun()進行計算,並將返回值做爲消息內容發給調用方指定的接收隊列,隊列名稱保存在變量props.reply_to中。json

root@ansible:~/workspace/RPC_TEST/RPC01# cat RPC_Client.py 
#!/usr/bin/env python
# coding:utf-8

"""
1. 連接rabbitmq ,而後開始消費消息隊列callback_queue中的消息,該隊列的名字經過RPC_Server端的Request屬性中的
props.reply_to告訴server端,把返回的消息發送到這裏隊列中
2. basic_consume()的回調函數爲on_response(),這個函數從callback_queue隊列中取出消息的結果
3. 函數call實際的發送請求,把數字n發給服務器端,當response不爲空時,返回response的值

"""


import pika
import uuid

class RpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue


        # 訂閱消息,並觸發回調函數
        self.channel.basic_consume(self.on_response,no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self,channel,method,props,body):
        # 判斷client端此次的請求是server端此次的響應
        if self.corr_id == props.correlation_id:
            print "self.corr_id: %s" %self.corr_id
            print "self.callback_queue: %s" %self.callback_queue
            self.response = body

    def call(self,n):
        self.response = None
        self.corr_id = str(uuid.uuid4())

        # 發佈消息,relay_to表示接收消息的隊列,correlation_id表示攜帶請求的惟一ID
        self.channel.basic_publish(exchange='',routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                   correlation_id=self.corr_id,),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return str(self.response)

rpc = RpcClient()

print "[x] Requesting..."
response = rpc.call(2)

print "[.] Got %r" %response

代碼開始也是鏈接RabbitMQ,而後開始消費消息隊列callback_queue中的消息,該隊列的名字經過Request的屬性reply_to傳遞給服務端,就是在上面介紹服務端代碼時提到過的props.reply_to,做用是告訴服務端把結果發到這個隊列。 basic_consume()的回調函數變成了on_response(),這個函數從callback_queue的消息內容中獲取返回結果。ubuntu

函數call實際發起請求,把數字n發給服務端程序,當response不爲空時,返回response值。小程序

 

有本事來張圖描述一下:windows

當客戶端啓動時,它將建立一個callback queue用於接收服務端的返回消息Reply,名稱由RabbitMQ自動生成,如上圖中的amq.gen-Xa2..。同一個客戶端可能會發出多個Request,這些Request的Reply都由callback queue接收,爲了互相區分,就引入了correlation_id屬性,每一個請求的correlation_id值惟一。這樣,客戶端發起的Request就帶由2個關鍵屬性:reply_to告訴服務端向哪一個隊列返回結果;correlation_id用來區分是哪一個Request的返回。服務器

 

2 稍微複雜點的RPC網絡

若是服務端定義了多個函數供遠程調用怎麼辦?有兩種思路,一種是利用Request的屬性app_id傳遞函數名,另外一種是把函數名經過消息內容發送給服務端。app

 1)第一種思路dom

root@ansible:~/workspace/RPC_TEST/RPC02# cat RPC_Server.py 
#!/usr/bin/env python
# coding:utf-8

"""
1.首先與rabbitmq創建連接,而後定義個函數fun(),
fun的功能是傳入一個數返回該數的2倍,這個函數就是咱們要遠程調用的函數
2.on_request()是一個回調函數,他做爲參數傳遞給了basic_consume(),
當basic_consume()在隊列中消費1條消息時,on_request()就會被調用
3.on_request()從消息內容body中獲取數,並傳給fun()進行計算,並將返回值做爲消息內容發給調用方指定的隊列
隊列名稱保存在props.relay_to中。

疑問:
1. server端怎麼獲得client的callback_queue的?
是經過過 routing_key=props.reply_to獲得的,props是一個神奇的東西
2. 一個隊列中多個請求,怎麼區分的 ?
是經過props.correlation_id 而後client端作判斷,是否和本身的相等。仍是props
"""
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def a(n):
    return n*2

def b(n):
    return n*4

def on_request(channel,method,props,body):
    print " props.correlation_id: %s"  %props.correlation_id
    print "props.reply_to: %s" %props.reply_to
    #n = int(body)
    n = body
    funname = props.app_id
    print 'funname: %s' %funname
    if funname == 'a':

        response = a(n)
    if funname == 'b':
        response = b(n)

    channel.basic_publish(exchange='',routing_key=props.reply_to,
                          properties=pika.BasicProperties(
                              correlation_id=props.correlation_id),
                          body=str(response))
    channel.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request,queue='rpc_queue')
print "[x] Waiting RPC request..."
channel.start_consuming()

 

上面代碼的一點改進就是若是函數過多怎麼辦?尼瑪的,破電腦,都寫完了,尼瑪忽然關機了,真是操了,有機會立馬換ubuntu系統,windows太尼瑪坑爹了,尤爲是TMD的win10。

移駕http://www.cnblogs.com/wanstack/p/7052874.html

root@ansible:~/workspace/RPC_TEST/RPC02# cat RPC_Client.py 
#!/usr/bin/env python
# coding:utf-8

"""
1. 連接rabbitmq ,而後開始消費消息隊列callback_queue中的消息,該隊列的名字經過RPC_Server端的Request屬性中的
props.reply_to告訴server端,把返回的消息發送到這裏隊列中
2. basic_consume()的回調函數爲on_response(),這個函數從callback_queue隊列中取出消息的結果
3. 函數call實際的發送請求,把數字n發給服務器端,當response不爲空時,返回response的值

"""


import pika
import uuid

class RpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue


        # 訂閱消息,並觸發回調函數
        self.channel.basic_consume(self.on_response,no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self,channel,method,props,body):
        # 判斷client端此次的請求是server端此次的響應
        if self.corr_id == props.correlation_id:
            print "self.corr_id: %s" %self.corr_id
            print "self.callback_queue: %s" %self.callback_queue
            self.response = body

    def call(self,n):
        self.response = None
        self.corr_id = str(uuid.uuid4())

        # 發佈消息,relay_to表示接收消息的隊列,correlation_id表示攜帶請求的惟一ID
        self.channel.basic_publish(exchange='',routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                   correlation_id=self.corr_id,
                                   app_id=str(n)),
                                   body=str('request'))
        while self.response is None:
            self.connection.process_data_events()
        return str(self.response)

rpc = RpcClient()

print "[x] Requesting..."
response = rpc.call('b')

print "[.] Got %r" %response

 

函數call()接收參數name做爲被調用的遠程函數的名字,經過app_id傳給服務端程序,這段代碼裏咱們選擇調用服務端的函數b(),rpc.call(「b」)。

2)第二種方式

root@ansible:~/workspace/RPC_TEST/RPC03# cat RPC_Server.py 
#!/usr/bin/env python
# coding:utf-8

"""
1.首先與rabbitmq創建連接,而後定義個函數fun(),
fun的功能是傳入一個數返回該數的2倍,這個函數就是咱們要遠程調用的函數
2.on_request()是一個回調函數,他做爲參數傳遞給了basic_consume(),
當basic_consume()在隊列中消費1條消息時,on_request()就會被調用
3.on_request()從消息內容body中獲取數,並傳給fun()進行計算,並將返回值做爲消息內容發給調用方指定的隊列
隊列名稱保存在props.relay_to中。

疑問:
1. server端怎麼獲得client的callback_queue的?
是經過過 routing_key=props.reply_to獲得的,props是一個神奇的東西
2. 一個隊列中多個請求,怎麼區分的 ?
是經過props.correlation_id 而後client端作判斷,是否和本身的相等。仍是props
"""
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def a():
    return 2

def b():
    return 4

def on_request(channel,method,props,body):
    print " props.correlation_id: %s"  %props.correlation_id
    print "props.reply_to: %s" %props.reply_to
    #n = int(body)

    funname = body
    # args01 = body.__code__.co_varnames[0]
    print 'funname: %s' %funname
    if funname == 'a':

        response = a()
    if funname == 'b':
        response = b()

    channel.basic_publish(exchange='',routing_key=props.reply_to,
                          properties=pika.BasicProperties(
                              correlation_id=props.correlation_id),
                          body=str(response))
    channel.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request,queue='rpc_queue')
print "[x] Waiting RPC request..."
channel.start_consuming()

  

root@ansible:~/workspace/RPC_TEST/RPC03# cat RPC_Client.py 
#!/usr/bin/env python
# coding:utf-8

"""
1. 連接rabbitmq ,而後開始消費消息隊列callback_queue中的消息,該隊列的名字經過RPC_Server端的Request屬性中的
props.reply_to告訴server端,把返回的消息發送到這裏隊列中
2. basic_consume()的回調函數爲on_response(),這個函數從callback_queue隊列中取出消息的結果
3. 函數call實際的發送請求,把數字n發給服務器端,當response不爲空時,返回response的值

"""


import pika
import uuid

class RpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue


        # 訂閱消息,並觸發回調函數
        self.channel.basic_consume(self.on_response,no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self,channel,method,props,body):
        # 判斷client端此次的請求是server端此次的響應
        if self.corr_id == props.correlation_id:
            print "self.corr_id: %s" %self.corr_id
            print "self.callback_queue: %s" %self.callback_queue
            self.response = body

    def call(self,name):
        self.response = None
        self.corr_id = str(uuid.uuid4())

        # 發佈消息,relay_to表示接收消息的隊列,correlation_id表示攜帶請求的惟一ID
        self.channel.basic_publish(exchange='',routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                   correlation_id=self.corr_id,
                                   ),
                                   body=str(name))
        while self.response is None:
            self.connection.process_data_events()
        return str(self.response)

rpc = RpcClient()

print "[x] Requesting..."
response = rpc.call('b')

print "[.] Got %r" %response

  

與第一種實現方法的區別就是沒有使用屬性app_id,而是把要調用的函數名放在消息內容body中,執行結果跟第一種方法同樣。

 

一個簡單的實際應用案例

下面咱們將編寫一個小程序,用於收集多臺KVM宿主機上的虛擬機數量和剩餘可以使用的資源。程序由兩部分組成,運行在每臺宿主機上的腳本agent.py和管理機上收集信息的腳本collect.py。從RPC的角度,agent.py是服務端,collect.py是客戶端。

root@ansible:~/workspace/RPC_TEST/RPC04# cat agent.py 
#!/usr/bin/env python
# coding:utf-8

"""
相似於RPC中的Server端
"""


import pika
import libvirt
import psutil
import json
import socket
import os
import sys
# 用於解析XML文件
from xml.dom import minidom

RabbitmqHost = '172.20.6.184'
RabbitmqUser = 'admin'
RabbitmqPwd = 'admin'

credentials = pika.PlainCredentials(RabbitmqUser,RabbitmqPwd)

# 連接libvirt,libvirt是一個虛擬機、容器管理程序

def get_conn():
    conn = libvirt.open('qemu:///system')
    if conn == None:
        print "Failed to open connection to QEMU/KVM"

        sys.exit(2)

    else:
        return conn

# 獲取宿主機虛擬機running的數量
def getVMcount():
    conn = get_conn()
    domainIDs = conn.listDomainsID()
    return len(domainIDs)

# 獲取分配給全部虛擬機的內存之和
def getMemoryused():
    conn = get_conn()
    domainIDs = conn.listDomainsID()
    used_mem = 0
    for id in domainIDs:
        dom = conn.lookupByID(id)
        used_mem += dom.maxMemory()/(1024*1024)
    # used_mem = ''.join((str(used_mem),'G'))
    return used_mem

# 獲取分配給全部虛擬機的內存之和
def getCPUused():
    conn = get_conn()
    domainIDs = conn.listDomainsID()
    used_cpu = 0
    for id in domainIDs:
        dom = conn.lookupByID(id)
        used_cpu += dom.maxVcpus()
    return used_cpu

# 獲取全部虛擬機磁盤文件大小之和
def getDiskused():

    conn = get_conn()
    domainIDs = conn.listDomainsID()
    diskused = 0

    for id in domainIDs:
        # 獲取libvirt對象
        dom = conn.lookupByID(id)
        # 獲取虛擬機xml描述配置文件
        xml = dom.XMLDesc(0)
        doc = minidom.parseString(xml)
        disks = doc.getElementsByTagName('disk')
        for disk in disks:
            if disk.getAttribute('device') == 'disk':
                diskfile = disk.getElementsByTagName('source')[0].getAttribute('file')
                diskused += dom.blockInfo(diskfile,0)[0]/(1024**3)
    return diskused

# 使agent.py進入守護進程模式
def daemonize(stdin='/dev/null',stdout='/dev/null',stderr='/dev/null'):
    try:
        pid = os.fork()
        if pid > 0:
            sys.exit(0)
    except OSError,e:
        sys.stderr.write("fork #1 failed: (%d) %s\n" % (e.errno,e.strerror))
        sys.exit(1)
    os.chdir("/")
    os.umask(0)
    os.setsid()
    try:
        pid = os.fork()
        if pid > 0:
            sys.exit(0)
    except OSError,e:
        sys.stderr.write("fork #2 failed: (%d) %s\n" % (e.errno,e.strerror))
        sys.exit(1)
    for f in sys.stdout,sys.stderr,: f.flush()
    si = file(stdin,'r')
    so = file(stdout,'a+',0)
    se = file(stderr,'a+',0)
    os.dup2(si.fileno(),sys.stdin.fileno())
    os.dup2(so.fileno(),sys.stdout.fileno())
    os.dup2(se.fileno(),sys.stderr.fileno())
 
daemonize('/dev/null','/root/kvm/agent.log','/root/kvm/agent.log')


connection = pika.BlockingConnection(pika.ConnectionParameters(host=RabbitmqHost,
                                                               credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='kvm',type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 把隨機queue綁定到exchange上
channel.queue_bind(exchange='kvm',queue=queue_name)

# 定義回調函數
def on_request(channle,method,props,body):
    sys.stdout.write(body+'\n')
    sys.stdout.write("callback_queue : %s" %props.reply_to)
    sys.stdout.flush()
    mem_total = psutil.virtual_memory()[0]/(1024*1024*1024)
    cpu_total = psutil.cpu_count()
    statvfs = os.statvfs('/root')
    disk_total = (statvfs.f_frsize * statvfs.f_blocks)/(1024**3)
    print(type(mem_total))
    print(type(getMemoryused()))
    mem_unused = mem_total - getMemoryused()
    cpu_unused = cpu_total - getCPUused()
    disk_unused = disk_total - getDiskused()
    data = {
        'hostname': socket.gethostname(),
        'vm' : getVMcount(),
        'available_memory' : mem_unused,
        'available_cpu' : cpu_unused,
        'available_disk' : disk_unused,
    }
    json_str = json.dumps(data) # 把dict轉換成str類型

    # 服務器端回覆client消息到callback_queue中
    channel.basic_publish(exchange='',routing_key=props.reply_to,
                          properties=pika.BasicProperties(
                              correlation_id=props.correlation_id,
                          ),
                          body=json_str,
                          )
    channel.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(on_request,queue=queue_name)

sys.stdout.write('[x] Waiting PRC request\n')
sys.stdout.flush()

channel.start_consuming()

  

root@ansible:~/workspace/RPC_TEST/RPC04# cat collent.py 
#!/usr/bin/env python
# coding:utf-8

import pika
import uuid
import json
import datetime

RabbitmqHost = '172.20.6.184'
RabbitmqUser = 'admin'
RabbitmqPwd = 'admin'

credentials = pika.PlainCredentials(RabbitmqUser,RabbitmqPwd)

class RpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=RabbitmqHost,
                                                                            credentials=credentials))
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange='kvm', type='fanout')
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_responses,no_ack=True,queue=self.callback_queue)

        self.responses = []

    def on_responses(self,channel,method,props,body):
        if self.corr_id == props.correlation_id:
            self.responses.append(body)


    def call(self):
        timestamp = datetime.datetime.strftime(datetime.datetime.now(),'%Y-%m-%dT%H:%M:%SZ')
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='kvm',routing_key='',
                           properties=pika.BasicProperties(
                               reply_to=self.callback_queue,
                               correlation_id=self.corr_id,
                           ),
                           body='%s: receive a request.' %timestamp)

        # 定義超時回調函數
        def outoftime():
            self.channel.stop_consuming()

        self.connection.add_timeout(30,outoftime)
        self.channel.start_consuming()
        print "callback_queue : %s" %self.callback_queue
        return self.responses

rpc = RpcClient()
responses = rpc.call()
for i in responses:
    response = json.loads(i)
    print '[.] Got %r' %response

本文在前面演示的RPC都是隻有一個服務端的狀況,客戶端發起請求後是用一個while循環來阻塞程序以等待返回結果的,當self.response不爲None,就退出循環。

  若是在多服務端的狀況下照搬過來就會出問題,實際狀況中咱們可能有幾十臺宿主機,每臺上面都運行了一個agent.py,當collect.py向幾十個agent.py發起請求時,收到第一個宿主機的返回結果後就會退出上述while循環,致使後續其餘宿主機的返回結果被丟棄。這裏我選擇定義了一個超時回調函數outoftime()來替代以前的while循環,超時時間設爲30秒。collect.py發起請求後阻塞30秒來等待全部宿主機的迴應。若是宿主機數量特別多,能夠再調大超時時間。真是怕了,先這樣結束吧。還有一個例子下篇寫

 

 

 

地址原文:http://blog.51cto.com/3646344/2097020

相關文章
相關標籤/搜索