以前的幾篇文章介紹了一下RabbitMQ的概念以及環境的搭建和配置,有了RabbitMQ環境就能夠基於其實現一些特殊的任務場景了。RabbitMQ官方有個很好的Tutorials基本覆蓋了RabbitMQ的各中常見應用場景,現以代碼加註釋的方式以其Python客戶端pika爲例簡單介紹以下。更詳盡的信息可參閱:http://www.rabbitmq.com/getstarted.html 。css
以前的幾篇文章:
RabbitMQ概念及環境搭建(一)單節點安裝與配置
RabbitMQ概念及環境搭建(二)RabbitMQ Broker管理
RabbitMQ概念及環境搭建(三)RabbitMQ cluster
RabbitMQ概念及環境搭建(四)RabbitMQ High Availability
RabbitMQ概念及環境搭建(五)與web的整合
html
RabbitMQ是一個消息代理,從「生產者」接收消息並傳遞消息至「消費者」,期間可根據規則路由、緩存、持久化消息。「生產者」也即message發送者如下簡稱P,相對應的「消費者」乃message接收者如下簡稱C,message經過queue由P到C,queue存在於RabbitMQ,可存儲儘量多的message,多個P可向同一queue發送message,多個C可從同一個queue接收message。
python
應用場景1-「Hello Word」web
一個P向queue發送一個message,一個C從該queue接收message並打印。緩存
send.py
producer,鏈接至RabbitMQ Server,聲明隊列,發送message,關閉鏈接,退出。併發
#!/usr/bin/python27 #encoding:utf8 import pika #與RabbitMQ Server創建鏈接 #鏈接到的broker在本機-localhost上 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #聲明隊列以向其發送消息消息 #向不存在的位置發送消息時RabbitMQ將消息丟棄 #queue='hello'指定隊列名字 channel.queue_declare(queue='hello', durable=True) #message不能直接發送給queue,需經exchange到達queue,此處使用以空字符串標識的默認的exchange #使用默認exchange時容許經過routing_key明確指定message將被髮送給哪一個queue #body參數指定了要發送的message內容 channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print " [x] Sent 'Hello World!'" #關閉與RabbitMq Server間的鏈接 connection.close()
receive.py
consumer,鏈接至RabbitMQ Server,聲明隊列,接收消息並進行處理這裏爲打印出消息,退出。函數
#!/usr/bin/env python #encoding:utf8 import pika #創建到達RabbitMQ Server的connection #此處RabbitMQ Server位於本機-localhost connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #聲明queue,確認要從中接收message的queue #queue_declare函數是冪等的,可運行屢次,但只會建立一次 #若能夠確信queue是已存在的,則此處可省略該聲明,如producer已經生成了該queue #但在producer和consumer中重複聲明queue是一個好的習慣 channel.queue_declare(queue='hello') print ' [*] Waiting for messages. To exit press CTRL+C' #定義回調函數 #一旦從queue中接收到一個message回調函數將被調用 #ch:channel #method: #properties: #body:message def callback(ch, method, properties, body): print " [x] Received %r" % (body,) #從queue接收message的參數設置 #包括從哪一個queue接收message,用於處理message的callback,是否要確認message #默認狀況下是要對消息進行確認的,以防止消息丟失。 #此處將no_ack明確指明爲True,不對消息進行確認。 channel.basic_consume(callback, queue='hello', no_ack=True) #開始循環從queue中接收message並使用callback進行處理 channel.start_consuming()
測試測試
python send.py
python receive.py
應用場景2-work queuesfetch
將耗時的消息處理經過隊列分配給多個consumer來處理,咱們稱此處的consumer爲worker,咱們將此處的queue稱爲Task Queue,其目的是爲了不資源密集型的task的同步處理,也即當即處理task並等待完成。相反,調度task使其稍後被處理。也即把task封裝進message併發送到task queue,worker進程在後臺運行,從task queue取出task並執行job,若運行了多個worker,則task可在多個worker間分配。ui
new_task.py
創建鏈接,聲明隊列,發送能夠模擬耗時任務的message,斷開鏈接、退出。
#!/usr/bin/env python #encoding:utf8 import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #僅僅對message進行確認不能保證message不丟失,好比RabbitMQ崩潰了queue就會丟失 #所以還需使用durable=True聲明queue是持久化的,這樣即使Rabb崩潰了重啓後queue仍然存在 channel.queue_declare(queue='task_queue', durable=True) #從命令行構造將要發送的message message = ' '.join(sys.argv[1:]) or "Hello World!" #除了要聲明queue是持久化的外,還需聲明message是持久化的 #basic_publish的properties參數指定message的屬性 #此處pika.BasicProperties中的delivery_mode=2指明message爲持久的 #這樣一來RabbitMQ崩潰重啓後queue仍然存在其中的message也仍然存在 #需注意的是將message標記爲持久的並不能徹底保證message不丟失,由於 #從RabbitMQ接收到message到將其存儲到disk仍需一段時間,若此時RabbitMQ崩潰則message會丟失 #何況RabbitMQ不會對每條message作fsync動做 #可經過publisher confirms實現更強壯的持久性保證 channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print " [x] Sent %r" % (message,) connection.close()
worker.py
創建鏈接,聲明隊列,不斷的接收message,處理任務,進行確認。
#!/usr/bin/env python #encoding:utf8 import pika import time #默認狀況RabbirMQ將message以round-robin方式發送給下一個consumer #每一個consumer接收到的平均message量是同樣的 #能夠同時運行兩個或三個該程序進行測試 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #僅僅對message進行確認不能保證message不丟失,好比RabbitMQ崩潰了 #還需使用durable=True聲明queue是持久化的,這樣即使Rabb崩潰了重啓後queue仍然存在其中的message不會丟失 #RabbitMQ中不容許使用不一樣的參數定義同名queue channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' #回調函數,函數體模擬耗時的任務處理:以message中'.'的數量表示sleep的秒數 def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" #對message進行確認 ch.basic_ack(delivery_tag = method.delivery_tag) #若存在多個consumer每一個consumer的負載可能不一樣,有些處理的快有些處理的慢 #RabbitMQ並無論這些,只是簡單的以round-robin的方式分配message #這可能形成某些consumer積壓不少任務處理不完而一些consumer長期處於飢餓狀態 #可使用prefetch_count=1的basic_qos方法可告知RabbitMQ只有在consumer處理並確認了上一個message後才分配新的message給他 #不然分給另外一個空閒的consumer channel.basic_qos(prefetch_count=1) #這裏移除了no_ack=True這個參數,也即須要對message進行確認(默認行爲) #不然consumer在偶然down後其正在處理和分配到該consumer還未處理的message可能發生丟失 #由於此時RabbitMQ在發送完message後當即從內存刪除該message #假如沒有設置no_ack=True則consumer在偶然down掉後其正在處理和分配至該consumer但還將來得及處理的message會從新分配到其餘consumer #沒有設置no_ack=True則consumer在收到message後會向RabbitMQ反饋已收到並處理了message告訴RabbitMQ能夠刪除該message #RabbitMQ中沒有超時的概念,只有在consumer down掉後從新分發message channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
測試
python new_task.py "A very hard task which takes two seconds.."
python worker.py
應用場景3-Publish/Subscribe
在應用場景2中一個message(task)僅被傳遞給了一個comsumer(worker)。如今咱們設法將一個message傳遞給多個consumer。這種模式被稱爲publish/subscribe。此處以一個簡單的日誌系統爲例進行說明。該系統包含一個log發送程序和一個log接收並打印的程序。由log發送者發送到queue的消息能夠被全部運行的log接收者接收。所以,咱們能夠運行一個log接收者直接在屏幕上顯示log,同時運行另外一個log接收者將log寫入磁盤文件。
receive_logs.py
日誌消息接收者:創建鏈接,聲明exchange,將exchange與queue進行綁定,開始不停的接收log並打印。
#!/usr/bin/env python #encoding:utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #做爲好的習慣,在producer和consumer中分別聲明一次以保證所要使用的exchange存在 channel.exchange_declare(exchange='logs', type='fanout') #在不一樣的producer和consumer間共享queue時指明queue的name是重要的 #但某些時候,好比日誌系統,須要接收全部的log message而非一個子集 #並且僅對當前的message 流感興趣,對於過期的message不感興趣,那麼 #能夠申請一個臨時隊列這樣,每次鏈接到RabbitMQ時會以一個隨機的名字生成 #一個新的空的queue,將exclusive置爲True,這樣在consumer從RabbitMQ斷開後會刪除該queue result = channel.queue_declare(exclusive=True) #用於獲取臨時queue的name queue_name = result.method.queue #exchange與queue之間的關係成爲binding #binding告訴exchange將message發送該哪些queue channel.queue_bind(exchange='logs', queue=queue_name) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r" % (body,) #從指定地queue中consume message且不確認 channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
emit_log.py
日誌消息發送者:創建鏈接,聲明fanout類型的exchange,經過exchage向queue發送日誌消息,消息被廣播給全部接收者,關閉鏈接,退出。
#!/usr/bin/env python #encoding:utf8 import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #producer只能經過exchange將message發給queue #exchange的類型決定將message路由至哪些queue #可用的exchange類型:direct\topic\headers\fanout #此處定義一個名稱爲'logs'的'fanout'類型的exchange,'fanout'類型的exchange簡單的將message廣播到它所知道的全部queue channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" #將message publish到名爲log的exchange中 #由於是fanout類型的exchange,這裏無需指定routing_key channel.basic_publish(exchange='logs', routing_key='', body=message) print " [x] Sent %r" % (message,) connection.close()
測試
python receive_logs.py
python emit_log.py "info: This is the log message"
應用場景4-Routing
應用場景3中構建了簡單的log系統,能夠將log message廣播至多個receiver。如今咱們將考慮只把指定的message類型發送給其subscriber,好比,只把error message寫到log file而將全部log message顯示在控制檯。
receive_logs_direct.py
log message接收者:創建鏈接,聲明direct類型的exchange,聲明queue,使用提供的參數做爲routing_key將queue綁定到exchange,開始循環接收log message並打印。
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #聲明一個名爲direct_logs類型爲direct的exchange #同時在producer和consumer中聲明exchage或queue是個好習慣,以保證其存在 channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue #從命令行獲取參數:routing_key severities = sys.argv[1:] if not severities: print >> sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],) sys.exit(1) for severity in severities: #exchange和queue之間的binding可接受routing_key參數 #該參數的意義依賴於exchange的類型 #fanout類型的exchange直接忽略該參數 #direct類型的exchange精確匹配該關鍵字進行message路由 #對多個queue使用相同的binding_key是合法的 channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r:%r" % (method.routing_key, body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
emit_log_direct.py
log message發送者:創建鏈接,聲明direct類型的exchange,生成併發送log message到exchange,關閉鏈接,退出。
#!/usr/bin/env python #encoding:utf8 import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #聲明一個名爲direct_logs的direct類型的exchange #direct類型的exchange channel.exchange_declare(exchange='direct_logs', type='direct') #從命令行獲取basic_publish的配置參數 severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' #向名爲direct_logs的exchage按照設置的routing_key發送message channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print " [x] Sent %r:%r" % (severity, message) connection.close()
測試:
python receive_logs_direct.py info
python emit_log_direct.py info "The message"
應用場景5-topic
應用場景4中改進的log系統中用direct類型的exchange替換應用場景3中的fanout類型exchange實現將不一樣的log message發送給不一樣的subscriber(也即分別經過不一樣的routing_key將queue綁定到exchange,這樣exchange即可將不一樣的message根據message內容路由至不一樣的queue)。但仍然存在限制,不能根據多個規則路由消息,好比接收者要麼只能收error類型的log message要麼只能收info類型的message。若是咱們不只想根據log的重要級別如info、warning、error等來進行log message路由還想同時根據log message的來源如auth、cron、kern來進行路由。爲了達到此目的,須要topic類型的exchange。topic類型的exchange中routing_key中能夠包含兩個特殊字符:「*」用於替代一個詞,「#」用於0個或多個詞。
receive_logs_topic.py
log message接收者:創建鏈接,聲明topic類型的exchange,聲明queue,根據程序參數構造routing_key,根據routing_key將queue綁定到exchange,循環接收並處理message。
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #聲明一個名爲direct_logs類型爲direct的exchange #同時在producer和consumer中聲明exchage或queue是個好習慣,以保證其存在 channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue #從命令行獲取參數:routing_key severities = sys.argv[1:] if not severities: print >> sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],) sys.exit(1) for severity in severities: #exchange和queue之間的binding可接受routing_key參數 #該參數的意義依賴於exchange的類型 #fanout類型的exchange直接忽略該參數 #direct類型的exchange精確匹配該關鍵字進行message路由 #對多個queue使用相同的binding_key是合法的 channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r:%r" % (method.routing_key, body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
emit_log_topic.py
log message發送者:創建鏈接、聲明topic類型的exchange、根據程序參數構建routing_key和要發送的message,以構建的routing_key將message發送給topic類型的exchange,關閉鏈接,退出。
#!/usr/bin/env python #encoding:utf8 import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #聲明一個名爲topic_logs的topic類型的exchange #topic類型的exchange可經過通配符對message進行匹配從而路由至不一樣queue channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print " [x] Sent %r:%r" % (routing_key, message) connection.close()
測試:
python receive_logs_topic.py "*.rabbit"
python emit_log_topic.py red.rabbit Hello
應用場景6-PRC
在應用場景2中描述瞭如何使用work queue將耗時的task分配到不一樣的worker中。可是,若是咱們task是想在遠程的計算機上運行一個函數並等待返回結果呢。這根場景2中的描述是一個徹底不一樣的故事。這一模式被稱爲遠程過程調用。如今,咱們將構建一個RPC系統,包含一個client和可擴展的RPC server,經過返回斐波那契數來模擬RPC service。
rpc_server.py
RPC server:創建鏈接,聲明queue,定義了一個返回指定數字的斐波那契數的函數,定義了一個回調函數在接收到包含參數的調用請求後調用本身的返回斐波那契數的函數並將結果發送到與接收到message的queue相關聯的queue,並進行確認。開始接收調用請求並用回調函數進行請求處理。
#!/usr/bin/env python #encoding:utf8 import pika #創建到達RabbitMQ Server的connection connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #聲明一個名爲rpc_queue的queue channel.queue_declare(queue='rpc_queue') #計算指定數字的斐波那契數 def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) #回調函數,從queue接收到message後調用該函數進行處理 def on_request(ch, method, props, body): #由message獲取要計算斐波那契數的數字 n = int(body) print " [.] fib(%s)" % (n,) #調用fib函數得到計算結果 response = fib(n) #exchage爲空字符串則將message發送個到routing_key指定的queue #這裏queue爲回調函數參數props中reply_ro指定的queue #要發送的message爲計算所得的斐波那契數 #properties中correlation_id指定爲回調函數參數props中co的rrelation_id #最後對消息進行確認 ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) #只有consumer已經處理並確認了上一條message時queue才分派新的message給它 channel.basic_qos(prefetch_count=1) #設置consumeer參數,即從哪一個queue獲取消息使用哪一個函數進行處理,是否對消息進行確認 channel.basic_consume(on_request, queue='rpc_queue') print " [x] Awaiting RPC requests" #開始接收並處理消息 channel.start_consuming()
rpc_client.py
RPC client:遠程過程調用發起者:定義了一個類,類中初始化到RabbitMQ Server的鏈接、聲明回調queue、開始在回調queue上等待接收響應、定義了在回調queue上接收到響應後的處理函數on_response根據響應關聯的correlation_id屬性做出響應、定義了調用函數並在其中向調用queue發送包含correlation_id等屬性的調用請求、初始化一個client實例,以30爲參數發起遠程過程調用。
#!/usr/bin/env python #encoding:utf8 import pika import uuid #在一個類中封裝了connection創建、queue聲明、consumer配置、回調函數等 class FibonacciRpcClient(object): def __init__(self): #創建到RabbitMQ Server的connection self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() #聲明一個臨時的回調隊列 result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue #此處client既是producer又是consumer,所以要配置consume參數 #這裏的指明從client本身建立的臨時隊列中接收消息 #並使用on_response函數處理消息 #不對消息進行確認 self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) #定義回調函數 #比較類的corr_id屬性與props中corr_id屬性的值 #若相同則response屬性爲接收到的message def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): #初始化response和corr_id屬性 self.response = None self.corr_id = str(uuid.uuid4()) #使用默認exchange向server中定義的rpc_queue發送消息 #在properties中指定replay_to屬性和correlation_id屬性用於告知遠程server #correlation_id屬性用於匹配request和response self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), #message需爲字符串 body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) #生成類的實例 fibonacci_rpc = FibonacciRpcClient() print " [x] Requesting fib(30)" #調用實例的call方法 response = fibonacci_rpc.call(30) print " [.] Got %r" % (response,)
測試: