基於oslo_messaging的RPC通訊

oslo_messaging源於Openstack的一個經典的模塊,用以實現服務間的RPC通訊。Client端將數據放入rabbitmq中,server端從消息隊列中獲取傳送數據。python

oslo.messaging庫就是把rabbitmq的python庫作了封裝,考慮到了編程友好、性能、可靠性、異常的捕獲等諸多因素。讓各個項目的開發者聚焦於業務代碼的編寫,而不用考慮消息如何發送和接收。編程

一張比較經典的圖見下:後端

 

Target:做爲消息發送者,須要在target中指定消息要發送到的topic,exchange, binding-key, consumer等信息。多線程

Transport(傳輸層)主要實現RPC底層的通訊(好比socket)以及事件循環,多線程等其餘功能.能夠經過URL來得到不一樣transport的句柄.URL的格式爲:socket

 transport://user:password@host:port[,hostN:portN]/virtual_host函數

 目前支持的Transport有rabbit,qpid與zmq,分別對應不一樣的後端消息總線.用戶能夠使用oslo.messaging.get_transport函數來得到transport對象實例的句柄.性能

Notifier:消息的發送端,能夠在不一樣的優先級別上發送通知,這些優先級包括sample,critical,error,warn,info,debug,audit等spa

Notification Listener和Server相似,一個Notification Listener對象能夠暴露多個endpoint,每一個endpoint包含一組方法.可是與Server對象中的endpoint不一樣的是,這裏的endpoint中的方法對應通知消息的不一樣優先級。在發送消息時,指定方法info,warn等,在notifer listener監聽消息隊列,使用dispatcher對象根據消息的publish_id, event_type將消息路由到不一樣的endpoint方法上。線程

舉個例子,在notifier listener端程序見下:debug

 1 from  oslo_config import cfg  2 import oslo_messaging  3 
 4 class NotificationEndpoint(object):  5 # filter_rule = oslo_messaging.NotificationFilter(
 6 # publish_id='^compute.*')
 7     def warn(self, ctxt, publish_id, event_type, payload, metadata):  8         print "caesar==> %s"  % payload  9 
10 class ErrorEndpoint(object): 11 # filter_rule = oslo_messaging.NotificationFilter(
12 # event_type='^instance\..*\.start',
13 # context={'ctxt_key':'regexp'})
14 
15     def error(self, ctxt, publish_id, event_type, payload, metadata): 16         print "caesar==> %s"  % payload 17 
18 transport = oslo_messaging.get_notification_transport(cfg.CONF) 19 endpoints = [ 20  NotificationEndpoint(), 21  ErrorEndpoint() 22 ] 23 targets = [ 24     oslo_messaging.Target(topic='notification'), 25     oslo_messaging.Target(topic='notification_bis') 26 ] 27 
28 server = oslo_messaging.get_notification_listener(transport, targets, 29  endpoints) 30 server.start() 31 server.wait()

 

程序中,兩個endpoint中分別有error和warn方法,當開啓服務時,會建立四個topic消息 隊列,見下:

在客戶端,經過notifier中topic和方法,好比topic=notification 方法爲error,便可以向notification.error隊列中傳入數據。

 1 from oslo_config import cfg  2 import oslo_messaging as messaging  3 
 4 transport = messaging.get_transport(cfg.CONF)  5 notifier = messaging.Notifier(transport, driver='messaging', topics=['notification'])  6 project_id = 'b23a5e41d1af4c20974bf58b4dff8e5a'
 7 user_id = 'ceb61464a3d341ebabdf97d1d4b97099'
 8 notifier.error(ctxt={},  9                 event_type='my_type', 10                 payload={ 11             'tenant_id': project_id, 12             'user_id': user_id, 13             'instance_id': '123', 14             'instance_type_id': 1, 15             'instance_type': 'm1.flavor', 16             'state': 'active'
17 
18 })

 執行notifier程序,查詢消息隊列爲空,即已經被notification listnener消費,消息無阻塞。:

在notification listnener 路由到ErrorEndpoint的error方法,打印結果見下:

 

相關文章
相關標籤/搜索