RPC——遠程過程調用,經過網絡調用運行在另外一臺計算機上的程序的函數\方法,是構建分佈式程序的一種方式。RabbitMQ是一個消息隊列系統,能夠在程序之間收發消息。利用RabbitMQ能夠實現RPC。本文全部操做都是在ubuntu16.04.3上進行的,示例代碼語言爲Python2.7。html
yum install rabbitmq-server python-pika -y /etc/init.d/rabbitmq-server start update-rc.d rabbitmq-server enable
1 RPC的基本實現python
root@ansible:~/workspace/RPC_TEST/RPC01# cat RPC_Server.py #!/usr/bin/env python # coding:utf-8 """ 1.首先與rabbitmq創建連接,而後定義個函數fun(), fun的功能是傳入一個數返回該數的2倍,這個函數就是咱們要遠程調用的函數 2.on_request()是一個回調函數,他做爲參數傳遞給了basic_consume(), 當basic_consume()在隊列中消費1條消息時,on_request()就會被調用 3.on_request()從消息內容body中獲取數,並傳給fun()進行計算,並將返回值做爲消息內容發給調用方指定的隊列 隊列名稱保存在props.relay_to中 """ import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fun(n): return n*2 def on_request(channel,method,props,body): print " props.correlation_id: %s" %props.correlation_id print "props.reply_to: %s" %props.reply_to n = int(body) response = fun(n) channel.basic_publish(exchange='',routing_key=props.reply_to, properties=pika.BasicProperties( correlation_id=props.correlation_id), body=str(response)) channel.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request,queue='rpc_queue') print "[x] Waiting RPC request..." channel.start_consuming()
以上代碼中,首先與RabbitMQ服務創建鏈接,而後定義了一個函數fun(),fun()功能很簡單,輸入一個數而後返回該數的兩倍,這個函數就是咱們要遠程調用的函數。on_request()是一個回調函數,它做爲參數傳遞給了basic_consume(),當basic_consume()在隊列中消費1條消息時,on_request()就會被調用,on_request()從消息內容body中獲取數字,並傳給fun()進行計算,並將返回值做爲消息內容發給調用方指定的接收隊列,隊列名稱保存在變量props.reply_to中。json
root@ansible:~/workspace/RPC_TEST/RPC01# cat RPC_Client.py #!/usr/bin/env python # coding:utf-8 """ 1. 連接rabbitmq ,而後開始消費消息隊列callback_queue中的消息,該隊列的名字經過RPC_Server端的Request屬性中的 props.reply_to告訴server端,把返回的消息發送到這裏隊列中 2. basic_consume()的回調函數爲on_response(),這個函數從callback_queue隊列中取出消息的結果 3. 函數call實際的發送請求,把數字n發給服務器端,當response不爲空時,返回response的值 """ import pika import uuid class RpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue # 訂閱消息,並觸發回調函數 self.channel.basic_consume(self.on_response,no_ack=True, queue=self.callback_queue) def on_response(self,channel,method,props,body): # 判斷client端此次的請求是server端此次的響應 if self.corr_id == props.correlation_id: print "self.corr_id: %s" %self.corr_id print "self.callback_queue: %s" %self.callback_queue self.response = body def call(self,n): self.response = None self.corr_id = str(uuid.uuid4()) # 發佈消息,relay_to表示接收消息的隊列,correlation_id表示攜帶請求的惟一ID self.channel.basic_publish(exchange='',routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id,), body=str(n)) while self.response is None: self.connection.process_data_events() return str(self.response) rpc = RpcClient() print "[x] Requesting..." response = rpc.call(2) print "[.] Got %r" %response
代碼開始也是鏈接RabbitMQ,而後開始消費消息隊列callback_queue中的消息,該隊列的名字經過Request的屬性reply_to傳遞給服務端,就是在上面介紹服務端代碼時提到過的props.reply_to,做用是告訴服務端把結果發到這個隊列。 basic_consume()的回調函數變成了on_response(),這個函數從callback_queue的消息內容中獲取返回結果。ubuntu
函數call實際發起請求,把數字n發給服務端程序,當response不爲空時,返回response值。小程序
有本事來張圖描述一下:windows
當客戶端啓動時,它將建立一個callback queue用於接收服務端的返回消息Reply,名稱由RabbitMQ自動生成,如上圖中的amq.gen-Xa2..。同一個客戶端可能會發出多個Request,這些Request的Reply都由callback queue接收,爲了互相區分,就引入了correlation_id屬性,每一個請求的correlation_id值惟一。這樣,客戶端發起的Request就帶由2個關鍵屬性:reply_to告訴服務端向哪一個隊列返回結果;correlation_id用來區分是哪一個Request的返回。服務器
2 稍微複雜點的RPC網絡
若是服務端定義了多個函數供遠程調用怎麼辦?有兩種思路,一種是利用Request的屬性app_id傳遞函數名,另外一種是把函數名經過消息內容發送給服務端。app
1)第一種思路dom
root@ansible:~/workspace/RPC_TEST/RPC02# cat RPC_Server.py #!/usr/bin/env python # coding:utf-8 """ 1.首先與rabbitmq創建連接,而後定義個函數fun(), fun的功能是傳入一個數返回該數的2倍,這個函數就是咱們要遠程調用的函數 2.on_request()是一個回調函數,他做爲參數傳遞給了basic_consume(), 當basic_consume()在隊列中消費1條消息時,on_request()就會被調用 3.on_request()從消息內容body中獲取數,並傳給fun()進行計算,並將返回值做爲消息內容發給調用方指定的隊列 隊列名稱保存在props.relay_to中。 疑問: 1. server端怎麼獲得client的callback_queue的? 是經過過 routing_key=props.reply_to獲得的,props是一個神奇的東西 2. 一個隊列中多個請求,怎麼區分的 ? 是經過props.correlation_id 而後client端作判斷,是否和本身的相等。仍是props """ import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def a(n): return n*2 def b(n): return n*4 def on_request(channel,method,props,body): print " props.correlation_id: %s" %props.correlation_id print "props.reply_to: %s" %props.reply_to #n = int(body) n = body funname = props.app_id print 'funname: %s' %funname if funname == 'a': response = a(n) if funname == 'b': response = b(n) channel.basic_publish(exchange='',routing_key=props.reply_to, properties=pika.BasicProperties( correlation_id=props.correlation_id), body=str(response)) channel.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request,queue='rpc_queue') print "[x] Waiting RPC request..." channel.start_consuming()
上面代碼的一點改進就是若是函數過多怎麼辦?尼瑪的,破電腦,都寫完了,尼瑪忽然關機了,真是操了,有機會立馬換ubuntu系統,windows太尼瑪坑爹了,尤爲是TMD的win10。
移駕http://www.cnblogs.com/wanstack/p/7052874.html
root@ansible:~/workspace/RPC_TEST/RPC02# cat RPC_Client.py #!/usr/bin/env python # coding:utf-8 """ 1. 連接rabbitmq ,而後開始消費消息隊列callback_queue中的消息,該隊列的名字經過RPC_Server端的Request屬性中的 props.reply_to告訴server端,把返回的消息發送到這裏隊列中 2. basic_consume()的回調函數爲on_response(),這個函數從callback_queue隊列中取出消息的結果 3. 函數call實際的發送請求,把數字n發給服務器端,當response不爲空時,返回response的值 """ import pika import uuid class RpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue # 訂閱消息,並觸發回調函數 self.channel.basic_consume(self.on_response,no_ack=True, queue=self.callback_queue) def on_response(self,channel,method,props,body): # 判斷client端此次的請求是server端此次的響應 if self.corr_id == props.correlation_id: print "self.corr_id: %s" %self.corr_id print "self.callback_queue: %s" %self.callback_queue self.response = body def call(self,n): self.response = None self.corr_id = str(uuid.uuid4()) # 發佈消息,relay_to表示接收消息的隊列,correlation_id表示攜帶請求的惟一ID self.channel.basic_publish(exchange='',routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, app_id=str(n)), body=str('request')) while self.response is None: self.connection.process_data_events() return str(self.response) rpc = RpcClient() print "[x] Requesting..." response = rpc.call('b') print "[.] Got %r" %response
函數call()接收參數name做爲被調用的遠程函數的名字,經過app_id傳給服務端程序,這段代碼裏咱們選擇調用服務端的函數b(),rpc.call(「b」)。
2)第二種方式
root@ansible:~/workspace/RPC_TEST/RPC03# cat RPC_Server.py #!/usr/bin/env python # coding:utf-8 """ 1.首先與rabbitmq創建連接,而後定義個函數fun(), fun的功能是傳入一個數返回該數的2倍,這個函數就是咱們要遠程調用的函數 2.on_request()是一個回調函數,他做爲參數傳遞給了basic_consume(), 當basic_consume()在隊列中消費1條消息時,on_request()就會被調用 3.on_request()從消息內容body中獲取數,並傳給fun()進行計算,並將返回值做爲消息內容發給調用方指定的隊列 隊列名稱保存在props.relay_to中。 疑問: 1. server端怎麼獲得client的callback_queue的? 是經過過 routing_key=props.reply_to獲得的,props是一個神奇的東西 2. 一個隊列中多個請求,怎麼區分的 ? 是經過props.correlation_id 而後client端作判斷,是否和本身的相等。仍是props """ import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def a(): return 2 def b(): return 4 def on_request(channel,method,props,body): print " props.correlation_id: %s" %props.correlation_id print "props.reply_to: %s" %props.reply_to #n = int(body) funname = body # args01 = body.__code__.co_varnames[0] print 'funname: %s' %funname if funname == 'a': response = a() if funname == 'b': response = b() channel.basic_publish(exchange='',routing_key=props.reply_to, properties=pika.BasicProperties( correlation_id=props.correlation_id), body=str(response)) channel.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request,queue='rpc_queue') print "[x] Waiting RPC request..." channel.start_consuming()
root@ansible:~/workspace/RPC_TEST/RPC03# cat RPC_Client.py #!/usr/bin/env python # coding:utf-8 """ 1. 連接rabbitmq ,而後開始消費消息隊列callback_queue中的消息,該隊列的名字經過RPC_Server端的Request屬性中的 props.reply_to告訴server端,把返回的消息發送到這裏隊列中 2. basic_consume()的回調函數爲on_response(),這個函數從callback_queue隊列中取出消息的結果 3. 函數call實際的發送請求,把數字n發給服務器端,當response不爲空時,返回response的值 """ import pika import uuid class RpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue # 訂閱消息,並觸發回調函數 self.channel.basic_consume(self.on_response,no_ack=True, queue=self.callback_queue) def on_response(self,channel,method,props,body): # 判斷client端此次的請求是server端此次的響應 if self.corr_id == props.correlation_id: print "self.corr_id: %s" %self.corr_id print "self.callback_queue: %s" %self.callback_queue self.response = body def call(self,name): self.response = None self.corr_id = str(uuid.uuid4()) # 發佈消息,relay_to表示接收消息的隊列,correlation_id表示攜帶請求的惟一ID self.channel.basic_publish(exchange='',routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, ), body=str(name)) while self.response is None: self.connection.process_data_events() return str(self.response) rpc = RpcClient() print "[x] Requesting..." response = rpc.call('b') print "[.] Got %r" %response
與第一種實現方法的區別就是沒有使用屬性app_id,而是把要調用的函數名放在消息內容body中,執行結果跟第一種方法同樣。
一個簡單的實際應用案例
下面咱們將編寫一個小程序,用於收集多臺KVM宿主機上的虛擬機數量和剩餘可以使用的資源。程序由兩部分組成,運行在每臺宿主機上的腳本agent.py和管理機上收集信息的腳本collect.py。從RPC的角度,agent.py是服務端,collect.py是客戶端。
root@ansible:~/workspace/RPC_TEST/RPC04# cat agent.py #!/usr/bin/env python # coding:utf-8 """ 相似於RPC中的Server端 """ import pika import libvirt import psutil import json import socket import os import sys # 用於解析XML文件 from xml.dom import minidom RabbitmqHost = '172.20.6.184' RabbitmqUser = 'admin' RabbitmqPwd = 'admin' credentials = pika.PlainCredentials(RabbitmqUser,RabbitmqPwd) # 連接libvirt,libvirt是一個虛擬機、容器管理程序 def get_conn(): conn = libvirt.open('qemu:///system') if conn == None: print "Failed to open connection to QEMU/KVM" sys.exit(2) else: return conn # 獲取宿主機虛擬機running的數量 def getVMcount(): conn = get_conn() domainIDs = conn.listDomainsID() return len(domainIDs) # 獲取分配給全部虛擬機的內存之和 def getMemoryused(): conn = get_conn() domainIDs = conn.listDomainsID() used_mem = 0 for id in domainIDs: dom = conn.lookupByID(id) used_mem += dom.maxMemory()/(1024*1024) # used_mem = ''.join((str(used_mem),'G')) return used_mem # 獲取分配給全部虛擬機的內存之和 def getCPUused(): conn = get_conn() domainIDs = conn.listDomainsID() used_cpu = 0 for id in domainIDs: dom = conn.lookupByID(id) used_cpu += dom.maxVcpus() return used_cpu # 獲取全部虛擬機磁盤文件大小之和 def getDiskused(): conn = get_conn() domainIDs = conn.listDomainsID() diskused = 0 for id in domainIDs: # 獲取libvirt對象 dom = conn.lookupByID(id) # 獲取虛擬機xml描述配置文件 xml = dom.XMLDesc(0) doc = minidom.parseString(xml) disks = doc.getElementsByTagName('disk') for disk in disks: if disk.getAttribute('device') == 'disk': diskfile = disk.getElementsByTagName('source')[0].getAttribute('file') diskused += dom.blockInfo(diskfile,0)[0]/(1024**3) return diskused # 使agent.py進入守護進程模式 def daemonize(stdin='/dev/null',stdout='/dev/null',stderr='/dev/null'): try: pid = os.fork() if pid > 0: sys.exit(0) except OSError,e: sys.stderr.write("fork #1 failed: (%d) %s\n" % (e.errno,e.strerror)) sys.exit(1) os.chdir("/") os.umask(0) os.setsid() try: pid = os.fork() if pid > 0: sys.exit(0) except OSError,e: sys.stderr.write("fork #2 failed: (%d) %s\n" % (e.errno,e.strerror)) sys.exit(1) for f in sys.stdout,sys.stderr,: f.flush() si = file(stdin,'r') so = file(stdout,'a+',0) se = file(stderr,'a+',0) os.dup2(si.fileno(),sys.stdin.fileno()) os.dup2(so.fileno(),sys.stdout.fileno()) os.dup2(se.fileno(),sys.stderr.fileno()) daemonize('/dev/null','/root/kvm/agent.log','/root/kvm/agent.log') connection = pika.BlockingConnection(pika.ConnectionParameters(host=RabbitmqHost, credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='kvm',type='fanout') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # 把隨機queue綁定到exchange上 channel.queue_bind(exchange='kvm',queue=queue_name) # 定義回調函數 def on_request(channle,method,props,body): sys.stdout.write(body+'\n') sys.stdout.write("callback_queue : %s" %props.reply_to) sys.stdout.flush() mem_total = psutil.virtual_memory()[0]/(1024*1024*1024) cpu_total = psutil.cpu_count() statvfs = os.statvfs('/root') disk_total = (statvfs.f_frsize * statvfs.f_blocks)/(1024**3) print(type(mem_total)) print(type(getMemoryused())) mem_unused = mem_total - getMemoryused() cpu_unused = cpu_total - getCPUused() disk_unused = disk_total - getDiskused() data = { 'hostname': socket.gethostname(), 'vm' : getVMcount(), 'available_memory' : mem_unused, 'available_cpu' : cpu_unused, 'available_disk' : disk_unused, } json_str = json.dumps(data) # 把dict轉換成str類型 # 服務器端回覆client消息到callback_queue中 channel.basic_publish(exchange='',routing_key=props.reply_to, properties=pika.BasicProperties( correlation_id=props.correlation_id, ), body=json_str, ) channel.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request,queue=queue_name) sys.stdout.write('[x] Waiting PRC request\n') sys.stdout.flush() channel.start_consuming()
root@ansible:~/workspace/RPC_TEST/RPC04# cat collent.py #!/usr/bin/env python # coding:utf-8 import pika import uuid import json import datetime RabbitmqHost = '172.20.6.184' RabbitmqUser = 'admin' RabbitmqPwd = 'admin' credentials = pika.PlainCredentials(RabbitmqUser,RabbitmqPwd) class RpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=RabbitmqHost, credentials=credentials)) self.channel = self.connection.channel() self.channel.exchange_declare(exchange='kvm', type='fanout') result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_responses,no_ack=True,queue=self.callback_queue) self.responses = [] def on_responses(self,channel,method,props,body): if self.corr_id == props.correlation_id: self.responses.append(body) def call(self): timestamp = datetime.datetime.strftime(datetime.datetime.now(),'%Y-%m-%dT%H:%M:%SZ') self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange='kvm',routing_key='', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, ), body='%s: receive a request.' %timestamp) # 定義超時回調函數 def outoftime(): self.channel.stop_consuming() self.connection.add_timeout(30,outoftime) self.channel.start_consuming() print "callback_queue : %s" %self.callback_queue return self.responses rpc = RpcClient() responses = rpc.call() for i in responses: response = json.loads(i) print '[.] Got %r' %response
本文在前面演示的RPC都是隻有一個服務端的狀況,客戶端發起請求後是用一個while循環來阻塞程序以等待返回結果的,當self.response不爲None,就退出循環。
若是在多服務端的狀況下照搬過來就會出問題,實際狀況中咱們可能有幾十臺宿主機,每臺上面都運行了一個agent.py,當collect.py向幾十個agent.py發起請求時,收到第一個宿主機的返回結果後就會退出上述while循環,致使後續其餘宿主機的返回結果被丟棄。這裏我選擇定義了一個超時回調函數outoftime()來替代以前的while循環,超時時間設爲30秒。collect.py發起請求後阻塞30秒來等待全部宿主機的迴應。若是宿主機數量特別多,能夠再調大超時時間。真是怕了,先這樣結束吧。還有一個例子下篇寫
地址原文:http://blog.51cto.com/3646344/2097020