RabbitMQ是一個在AMQP基礎上完整的,可複用的企業消息系統。他遵循Mozilla Public License開源協議。html
MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。消 息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過 隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求。python
RabbitMQ安裝算法
安裝配置epel源 # 64位源 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm # 32位源 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 安裝erlang(依賴包) $ yum -y install erlang 安裝RabbitMQ $ yum -y install rabbitmq-server
默認端口爲:5672
啓動rabbitmq: service rabbitmq-server start/stopjson
安裝API緩存
pip3 install pika or easy_install pika or 源碼 https://pypi.python.org/pypi/pika
使用API操做RabbitMQ服務器
MQ是消費-生產者模型的一個典型的表明,一端往消息隊列中不斷寫入消息,而另外一端則能夠讀取或者訂閱隊列中的消息。下面以消費-生產者模型爲例:網絡
RabbitMQ 的結構圖:app
生產者-消費者模型的簡單實例異步
對於RabbitMQ來講,生產和消費再也不針對內存裏的一個Queue對象,而是某臺服務器上的RabbitMQ Server實現的消息隊列。ide
-----------------------------生產者----------------------------------- # /usr/bin/env python # -*- coding:utf8 -*- # auth rain import pika # 建立鏈接,鏈接到消息隊列服務器 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) # 建立通道 channel = connection.channel() # 建立任務隊列 channel.queue_declare(queue='task_queue') # 發佈消息 # exchange -- 它使咱們可以確切地指定消息應該到哪一個隊列去。 # 向隊列插入數值 routing_key是隊列名 body 是要插入的內容 channel.basic_publish( exchange='', routing_key='task_queue', body='test rabbitMQ' ) print("[X] sent 'test rabbitMq'") # 緩衝區已經flush並且消息已經確認發送到了RabbitMQ中,關閉連接 connection.close() # [X] sent 'test rabbitMq'
----------------------------消費者----------------------------------- # /usr/bin/env python # -*- coding:utf8 -*- # auth rain import pika # 建立鏈接,鏈接到消息隊列服務器 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) # 建立通道 channel = connection.channel() # 若是生產者沒有運行建立隊列,那麼消費者也許就找不到隊列了。爲了不這個問題因此消費者也建立這個隊列 channel.queue_declare(queue='task_queue') # 接收消息須要使用callback這個函數來接收,他會被pika庫來調用 def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 從隊列取數據 callback是回調函數 若是拿到數據 那麼將執行callback函數 channel.basic_consume(callback, queue='task_queue', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') # 永遠循環等待數據處理和callback處理的數據 channel.start_consuming() ''' [*] Waiting for messages. To exit press CTRL+C [x] Received b'test rabbitMQ' [x] Received b'test rabbitMQ' '''
import pika import sys import time # 建立鏈接,鏈接到消息隊列服務器 connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.10.36.101')) # 建立通道 channel = connection.channel() # 建立任務隊列 channel.queue_declare(queue='task_queue') # 發佈消息 # exchange -- 它使咱們可以確切地指定消息應該到哪一個隊列去。 # 向隊列插入數值 routing_key是隊列名 body 是要插入的內容 message = ' '.join(sys.argv[1:]) or "Hello World!" # 循環發送數據 for i in range(20): channel.basic_publish( exchange='', routing_key='task_queue', body=message ) time.sleep(0.5) print("[X] sent %s " % message) # 緩衝區已經flush並且消息已經確認發送到了RabbitMQ中,關閉連接 connection.close()
import pika import time # 建立鏈接,鏈接到消息隊列服務器 connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101')) # 建立通道 channel = connection.channel() # 若是生產者沒有運行建立隊列,那麼消費者也許就找不到隊列了。爲了不這個問題因此消費者也建立這個隊列 channel.queue_declare(queue='task_queue') # 接收消息須要使用callback這個函數來接收,他會被pika庫來調用 def callback(ch, method, properties, body): print(" [x] Received %r" % body) # time.sleep(body.count('.')) # print('[x] Done') # 從隊列取數據 callback是回調函數 若是拿到數據 那麼將執行callback函數 channel.basic_consume(callback, queue='task_queue', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') # 永遠循環等待數據處理和callback處理的數據 channel.start_consuming()
一、acknowledgment 消息不丟失
消息確認
當處理一個比較耗時得任務的時候,你也許想知道消費者(consumers)是否運行到一半就掛掉。當前的代碼中,當消息被RabbitMQ發送給消費者(consumers)以後,立刻就會在內存中移除。這種狀況,你只要把一個工做者(worker)中止,正在處理的消息就會丟失。同時,全部發送到這個工做者的尚未處理的消息都會丟失。
咱們不想丟失任何任務消息。若是一個工做者(worker)掛掉了,咱們但願任務會從新發送給其餘的工做者(worker)。
爲了防止消息丟失,RabbitMQ提供了消息響應(acknowledgments)。消費者會經過一個ack(響應),告訴RabbitMQ已經收到並處理了某條消息,而後RabbitMQ就會釋放並刪除這條消息。
若是消費者(consumer)掛掉了,沒有發送響應,RabbitMQ就會認爲消息沒有被徹底處理,而後從新發送給其餘消費者(consumer)。這樣,及時工做者(workers)偶爾的掛掉,也不會丟失消息。
消息是沒有超時這個概念的;當工做者與它斷開連的時候,RabbitMQ會從新發送消息。這樣在處理一個耗時很是長的消息任務的時候就不會出問題了。
消息響應默認是開啓的。以前的例子中咱們可使用no_ack=True標識把它關閉。是時候移除這個標識了,當工做者(worker)完成了任務,就發送一個響應。
no-ack = False,若是消費者遇到狀況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那麼,RabbitMQ會從新將該任務添加到隊列中。
消息響應默認是開啓的。以前的例子中咱們可使用no_ack=True標識把它關閉。是時候移除這個標識了,當工做者(worker)完成了任務,就發送一個響應。
ch.basic_ack(delivery_tag=method.delivery_tag)
一個很容易犯的錯誤就是忘了basic_ack,後果很嚴重。消息在你的程序退出以後就會從新發送,若是它不可以釋放沒響應的消息,RabbitMQ就會佔用愈來愈多的內存。
爲了排除這種錯誤,你可使用rabbitmqctl命令,輸出messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
import pika import sys import time # 建立鏈接,鏈接到消息隊列服務器 connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.10.36.101')) # 建立通道 channel = connection.channel() # 建立任務隊列 channel.queue_declare(queue='task_queue') # 發佈消息 # exchange -- 它使咱們可以確切地指定消息應該到哪一個隊列去。 # 向隊列插入數值 routing_key是隊列名 body 是要插入的內容 message = ' '.join(sys.argv[1:]) or "Hello World!" # 循環發送數據 for i in range(20): channel.basic_publish( exchange='', routing_key='task_queue', body=message ) time.sleep(0.5) print("[X] sent %s " % message) # 緩衝區已經flush並且消息已經確認發送到了RabbitMQ中,關閉連接 connection.close()
import pika import time # 建立鏈接,鏈接到消息隊列服務器 connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101')) # 建立通道 channel = connection.channel() # 若是生產者沒有運行建立隊列,那麼消費者也許就找不到隊列了。爲了不這個問題因此消費者也建立這個隊列 channel.queue_declare(queue='task_queue') # 接收消息須要使用callback這個函數來接收,他會被pika庫來調用 def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.decode().count('...')) print(" [x] Done") # 消息不丟失 ch.basic_ack(delivery_tag=method.delivery_tag) # 從隊列取數據 callback是回調函數 若是拿到數據 那麼將執行callback函數 channel.basic_consume(callback, queue='task_queue', ) print(' [*] Waiting for messages. To exit press CTRL+C') # 永遠循環等待數據處理和callback處理的數據 channel.start_consuming()
二、消息持久化
若是你沒有特地告訴RabbitMQ,那麼在它退出或者崩潰的時候,將會丟失全部隊列和消息。爲了確保信息不會丟失,有兩個事情是須要注意的:咱們必須把「隊列」和「消息」設爲持久化。
首先,爲了避免讓隊列消失,須要把隊列聲明爲持久化(durable):
channel.queue_declare(queue='hello', durable=True)
這個queue_declare必須在生產者(producer)和消費者(consumer)對應的代碼中修改。
這時候,咱們就能夠確保在RabbitMq重啓以後queue_declare隊列不會丟失。另外,咱們須要把咱們的消息也要設爲持久化——將delivery_mode的屬性設爲2。
# 生產者端 channel.basic_publish(exchange='', routing_key='task_queue', properties=pika.BasicProperties( delivery_mode=2, # 即便服務端掛了, 消息也能持久化 )) # 消費者端 def callback(ch, method, properties, body): print(body) time.sleep(body.count()) ch.basic_ack(delivery_tag=method.delivery_tag)
注意:消息持久化
將消息設爲持久化並不能徹底保證不會丟失。以上代碼只是告訴了RabbitMq要把消息存到硬盤,但從RabbitMq收到消息到保存之間仍是有一個很小的間隔時間。由於RabbitMq並非全部的消息都使用fsync(2)——它有可能只是保存到緩存中,並不必定會寫到硬盤中。並不能保證真正的持久化,但已經足夠應付咱們的簡單工做隊列。若是你必定要保證持久化,你須要改寫你的代碼來支持事務(transaction)。
三、公平調度
你應該已經發現,它仍舊沒有按照咱們指望的那樣進行分發。好比有兩個工做者(workers),處理奇數消息的比較繁忙,處理偶數消息的比較輕鬆。然而RabbitMQ並不知道這些,它仍然一如既往的派發消息。
這時由於RabbitMQ只管分發進入隊列的消息,不會關心有多少消費者(consumer)沒有做出響應。它盲目的把第n-th條消息發給第n-th個消費者
咱們可使用basic.qos方法,並設置prefetch_count=1。這樣是告訴RabbitMQ,再同一時刻,不要發送超過1條消息給一個工做者(worker),直到它已經處理了上一條消息而且做出了響應。這樣,RabbitMQ就會把消息分發給下一個空閒的工做者(worker)。
channel.basic_qos(prefetch_count=1)
注意:關於隊列大小
若是全部的工做者都處理繁忙狀態,你的隊列就會被填滿。你須要留意這個問題,要麼添加更多的工做者(workers),要麼使用其餘策略。
----------------------------------生產者-------------------------------------------- #!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # 隊列持久化 durable=True channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # 消息持久化 )) print " [x] Sent %r" % (message,) connection.close()
----------------------------------消費者--------------------------------------------
#!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # 隊列持久化 durable=True channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
四、發佈/訂閱
分發一個消息給多個消費者(consumers)。這種模式被稱爲「發佈/訂閱」。
爲了描述這種模式,咱們將會構建一個簡單的日誌系統。它包括兩個程序——第一個程序負責發送日誌消息,第二個程序負責獲取消息並輸出內容。
在咱們的這個日誌系統中,全部正在運行的接收方程序都會接受消息。咱們用其中一個接收者(receiver)把日誌寫入硬盤中,另一個接受者(receiver)把日誌輸出到屏幕上。
最終,日誌消息被廣播給全部的接受者(receivers)。
交換機(Exchanges)
RabbitMQ消息模型的核心理念是:發佈者(producer)不會直接發送任何消息給隊列。事實上,發佈者(producer)甚至不知道消息是否已經被投遞到隊列。
發佈者(producer)只須要把消息發送給一個交換機(exchange)。交換機很是簡單,它一邊從發佈者方接收消息,一邊把消息推送到隊列。交換機必須知道如何處理它接收到的消息,是應該推送到指定的隊列仍是是多個隊列,或者是直接忽略消息。這些規則是經過交換機類型(exchange type)來定義的。
交換機類型:
直連交換機(direct),
主題交換機(topic),
頭交換機 (headers),
扇型交換機(fanout)。
channel.exchange_declare(exchange='logs', type='fanout')
扇型交換機(fanout)很簡單,你可能從名字上就能猜想出來,它把消息發送給它所知道的全部隊列。這正是咱們的日誌系統所須要的。
匿名的交換器
前面的教程中咱們對交換機一無所知,但仍然可以發送消息到隊列中。由於咱們使用了命名爲空字符串("")默認的交換機。
回想咱們以前是如何發佈一則消息:
channel.basic_publish(exchange='', routing_key='hello', body=message)exchange參數就是交換機的名稱。空字符串表明默認或者匿名交換機:消息將會根據指定的routing_key分發到指定的隊列。
如今,咱們就能夠發送消息到一個具名交換機了:
channel.basic_publish(exchange='logs', routing_key='', body=message)
臨時隊列
第一步, 當咱們鏈接上RabbitMQ的時候,咱們須要一個全新的、空的隊列。咱們能夠手動建立一個隨機的隊列名,或者讓服務器爲咱們選擇一個隨機的隊列名(推薦)。咱們只須要在調用queue_declare方法的時候,不提供queue參數就能夠了:
result = channel.queue_declare()
這時候咱們能夠經過result.method.queue得到已經生成的隨機隊列名。它多是這樣子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。
第二步,當與消費者(consumer)斷開鏈接的時候,這個隊列應當被當即刪除。exclusive標識符便可達到此目的。
result = channel.queue_declare(exclusive=True)
綁定(Bindings)
咱們已經建立了一個扇型交換機(fanout)和一個隊列。如今咱們須要告訴交換機如何發送消息給咱們的隊列。交換器和隊列之間的聯繫咱們稱之爲綁定(binding)。
channel.queue_bind(exchange='logs', queue=result.method.queue)
如今,logs交換機將會把消息添加到咱們的隊列中。
綁定(binding)列表
你可使用
rabbitmqctl list_bindings
列出全部現存的綁定。
一個發送日誌的實例
#!/bin/bin/env python # -*-coding:utf-8 -*- # Author : rain import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101')) channel = connection.channel() # 指定exchange,其類型爲fanout channel.exchange_declare(exchange='logs', type='fanout') message = ''.join(sys.argv[1:] or 'hello world') # 發送給指定的exchange channel.publish( exchange='logs', routing_key='', body='message' ) print(" [x] Sent %r" % (message,)) connection.close()
#!/bin/bin/env python # -*-coding:utf-8 -*- # Author : rain import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101')) channel = connection.channel() # 指定exchange,其類型爲fanout channel.exchange_declare(exchange='logs', type='fanout') # 建立臨時(queue)隊列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # 將queue綁定到指定的exchange上 channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % (body,)) channel.basic_consume(callback, queue=queue_name, no_ack=True ) channel.start_consuming()
綁定(binding)是指交換機(exchange)和隊列(queue)的關係。能夠簡單理解爲:這個隊列(queue)對這個交換機(exchange)的消息感興趣。
綁定的時候能夠帶上一個額外的routing_key參數。爲了不與basic_publish的參數混淆,咱們把它叫作綁定鍵(binding key)。如下是如何建立一個帶綁定鍵的綁定。
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')
綁定鍵的意義取決於交換機(exchange)的類型。咱們以前使用過的扇型交換機(fanout exchanges)會忽略這個值。
應用場景:
咱們的日誌系統廣播全部的消息給全部的消費者(consumers)。咱們打算擴展它,使其基於日誌的嚴重程度進行消息過濾。例如咱們也許只是但願將比較嚴重的錯誤(error)日誌寫入磁盤,以避免在警告(warning)或者信息(info)日誌上浪費磁盤空間。
咱們使用的扇型交換機(fanout exchange)沒有足夠的靈活性 —— 它能作的僅僅是廣播。
咱們將會使用直連交換機(direct exchange)來代替。路由的算法很簡單 —— 交換機將會對綁定鍵(binding key)和路由鍵(routing key)進行精確匹配,從而肯定消息該分發到哪一個隊列。
下圖可以很好的描述這個場景:
在這個場景中,咱們能夠看到直連交換機 X和兩個隊列進行了綁定。第一個隊列使用orange做爲綁定鍵,第二個隊列有兩個綁定,一個使用black做爲綁定鍵,另一個使用green。
這樣以來,當路由鍵爲orange的消息發佈到交換機,就會被路由到隊列Q1。路由鍵爲black或者green的消息就會路由到Q2。其餘的全部消息都將會被丟棄。
多個綁定(Multiple bindings)
多個隊列使用相同的綁定鍵是合法的。這個例子中,咱們能夠添加一個X和Q1之間的綁定,使用black綁定鍵。這樣一來,直連交換機就和扇型交換機的行爲同樣,會將消息廣播到全部匹配的隊列。帶有black路由鍵的消息會同時發送到Q1和Q2。
發送日誌
咱們將會發送消息到一個直連交換機,把日誌級別做爲路由鍵。這樣接收日誌的腳本就能夠根據嚴重級別來選擇它想要處理的日誌。咱們先看看發送日誌。
severity = ['info','warning','error']
咱們須要建立一個交換機(exchange): channel.exchange_declare(exchange='direct_logs', type='direct') 而後咱們發送一則消息: channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) 咱們先假設「severity」的值是info、warning、error中的一個。
訂閱 處理接收消息的方式和以前差很少,只有一個例外,咱們將會爲咱們感興趣的每一個嚴重級別分別建立一個新的綁定。 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
1.生產者:
#!/bin/bin/env python # -*-coding:utf-8 -*- # Author : rain import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
2.消費者:
#!/bin/bin/env python # -*-coding:utf-8 -*- # Author : rain import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
發送到主題交換機(topic exchange)的消息不能夠攜帶隨意什麼樣子的路由鍵(routing_key),它的路由鍵必須是一個由.
分隔開的詞語列表。這些單詞隨即是什麼均可以,可是最好是跟攜帶它們的消息有關係的詞彙。如下是幾個推薦的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。詞語的個數能夠隨意,可是不要超過255字節。
綁定鍵也必須擁有一樣的格式。主題交換機背後的邏輯跟直連交換機很類似 —— 一個攜帶着特定路由鍵的消息會被主題交換機投遞給綁定鍵與之想匹配的隊列。可是它的綁定鍵和路由鍵有兩個特殊應用方式:
*
(星號) 用來表示一個單詞.#
(井號) 用來表示任意數量(零個或多個)單詞。
這個例子裏,咱們發送的全部消息都是用來描述小動物的。發送的消息所攜帶的路由鍵是由三個單詞所組成的,這三個單詞被兩個 . 分割開。路由鍵裏的第一個單詞描述的是動物的手腳的利索程度,第二個單詞是動物的顏色,第三個是動物的種類。因此它看起來是這樣的: <celerity>.<colour>.<species> 。 咱們建立了三個綁定:Q1的綁定鍵爲 *.orange.* ,Q2的綁定鍵爲 *.*.rabbit 和 lazy.# 。 這三個綁定鍵被能夠總結爲: •Q1 對全部的桔黃色動物都感興趣。 •Q2 則是對全部的兔子和全部懶惰的動物感興趣。 一個攜帶有 quick.orange.rabbit 的消息將會被分別投遞給這兩個隊列。攜帶着 lazy.orange.elephant 的消息一樣也會給兩個隊列都投遞過去。另外一方面攜帶有 quick.orange.fox 的消息會投遞給第一個隊列,攜帶有 lazy.brown.fox 的消息會投遞給第二個隊列。攜帶有 lazy.pink.rabbit 的消息只會被投遞給第二個隊列一次,即便它同時匹配第二個隊列的兩個綁定。攜帶着 quick.brown.fox 的消息不會投遞給任何一個隊列。 若是咱們違反約定,發送了一個攜帶有一個單詞或者四個單詞( "orange" or "quick.orange.male.rabbit" )的消息時,發送的消息不會投遞給任何一個隊列,並且會丟失掉。 可是另外一方面,即便 "lazy.orange.male.rabbit" 有四個單詞,他仍是會匹配最後一個綁定,而且被投遞到第二個隊列中。
主題交換機 主題交換機是很強大的,它能夠表現出跟其餘交換機相似的行爲 當一個隊列的綁定鍵爲 "#"(井號) 的時候,這個隊列將會無視消息的路由鍵,接收全部的消息。 當 * (星號) 和 # (井號) 這兩個特殊字符都未在綁定鍵中出現的時候,此時主題交換機就擁有的直連交換機的行爲。
#!/bin/bin/env python # -*-coding:utf-8 -*- # Author : rain import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
#!/bin/bin/env python # -*-coding:utf-8 -*- # Author : rain import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body,)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
執行下邊命令 接收全部日誌: python receive_logs_topic.py "#" 執行下邊命令 接收來自」kern「設備的日誌: python receive_logs_topic.py "kern.*" 執行下邊命令 只接收嚴重程度爲」critical「的日誌: python receive_logs_topic.py "*.critical" 執行下邊命令 創建多個綁定: python receive_logs_topic.py "kern.*" "*.critical" 執行下邊命令 發送路由鍵爲 "kern.critical" 的日誌: python emit_log_topic.py "kern.critical" "A critical kernel error" 執行上邊命令試試看效果吧。另外,上邊代碼不會對路由鍵和綁定鍵作任何假設,因此你能夠在命令中使用超過兩個路由鍵參數。
遠程過程調用(RPC Remote Procedure Call)
使用RabbitMQ來構建一個RPC系統:包含一個客戶端和一個RPC服務器。
AMQP協議給消息預約義了一系列的14個屬性。大多數屬性不多會用到,除了如下幾個:
關聯標識
咱們建議給每個RPC請求新建一個回調隊列。這不是一個高效的作法,幸虧這兒有一個更好的辦法 —— 咱們能夠爲每一個客戶端只創建一個獨立的回調隊列。
這就帶來一個新問題,當此隊列接收到一個響應的時候它沒法辨別出這個響應是屬於哪一個請求的。correlation_id 就是爲了解決這個問題而來的。咱們給每一個請求設置一個獨一無二的值。稍後,當咱們從回調隊列中接收到一個消息的時候,咱們就能夠查看這條屬性從而將響應和請求匹配起來。若是咱們接手到的消息的correlation_id是未知的,那就直接銷燬掉它,由於它不屬於咱們的任何一條請求。
你也許會問,爲何咱們接收到未知消息的時候不拋出一個錯誤,而是要將它忽略掉?這是爲了解決服務器端有可能發生的競爭狀況。儘管可能性不大,但RPC服務器仍是有可能在已將應答發送給咱們但還未將確認消息發送給請求的狀況下死掉。若是這種狀況發生,RPC在重啓後會從新處理請求。這就是爲何咱們必須在客戶端優雅的處理重複響應,同時RPC也須要儘量保持冪等性。
RPC工做流程:
#!/bin/bin/env python # -*-coding:utf-8 -*- # Author : rain import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): n = int(body) print("[.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties( correlation_id=props.correlation_id,), body=str(response) ) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming()
#!/bin/bin/env python # -*-coding:utf-8 -*- # Author : rain import pika import uuid class FibonacciRpcClient: def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id,), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % (response,))
此處呈現的設計並非實現RPC服務的惟一方式,可是他有一些重要的優點:
咱們的代碼依舊很是簡單,並且沒有試圖去解決一些複雜(可是重要)的問題,如:
rabbitMQ中文文檔: http://rabbitmq.mr-ping.com/tutorials_with_python/[1]Hello_World.html