- 1. RabbitMQ的安裝
- 1.1 配置好 epel
- 1.2 安裝 RPM包
- 1.3 建立用戶設置權限
- 2. RabbitMQ組件
- 3. RabbitMQ-Server 六種消息類型
- 3.1 "Hello World"
- 3.1.1 代碼整合
- 3.2 Work queues (工做隊列模式)
- 3.2.1 代碼整合
- 3.2.2 循環調度
- 3.2.3 消息確認
- 3.2.4 忘記確認
- 3.2.5 消息持久化
- 3.2.6 公平調度
- 3.2.7 代碼整合
- 3.3 Publish/Subscribe(發佈/訂閱)
- 3.3.1 交換機(Exchange)
- 3.3.2 交換器列表
- 3.3.3 匿名的交換器
- 3.3.4 臨時隊列
- 3.3.5 綁定(Bindings)
- 3.3.6 代碼整合
- 3.4 Routing (路由)
- 3.4.1 直連交換機(Direct exchange)
- 3.4.2 多個綁定(Multiple bindings)
- 3.4.3 代碼整合
- 3.5 Topics(主題交換機)
- 3.5.1 主題交換機
- 3.5.2 代碼整合
- 3.6 RPC (遠程過程調試模式)
- 3.6.1 回調隊列
- 3.6.2 消息屬性
- 3.6.3 關聯標識
- 3.6.4 整合代碼python
# For EL5: rpm -Uvh http://download.fedoraproject.org/pub/epel/5/i386/epel-release-5-4.noarch.rpm
# For EL6: rpm -Uvh http://download.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
# For EL7: rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm
[root@192.168.118.15 /opt]# ls erlang-19.0.4-1.el6.x86_64.rpm rabbitmq-server-3.6.13-1.el7.noarch.rpm [root@192.168.118.15 /opt]# yum install *.rpm -y [root@192.168.118.15 /opt]#systemctl start rabbitmq-server [root@192.168.118.15 /opt]#netstat -ntplu | egrep beam tcp 0 0 0.0.0.0:5672 0.0.0.0:* LISTEN 2130/beam.smp tcp 0 0 0.0.0.0:25672 0.0.0.0:* LISTEN 2130/beam.smp 服務啓動成功。
[root@192.168.118.15 /opt]#rabbitmqctl add_user admin admin Creating user "admin" 設置 admin 爲管理員角色 [root@192.168.118.15 /opt]#rabbitmqctl set_user_tags admin administrator Setting tags for user "admin" to [administrator] 設置權限 [root@192.168.118.15 /opt]#rabbitmqctl add_vhost /admin Creating vhost "/admin" [root@192.168.118.15 /opt]#rabbitmqctl set_permissions -p '/admin' admin '.*' '.*' '.*' Setting permissions for user "admin" in vhost "/admin" 開啓web管理後臺 [root@192.168.118.15 /opt]#rabbitmq-plugins enable rabbitmq_management The following plugins have been enabled: amqp_client cowlib cowboy rabbitmq_web_dispatch rabbitmq_management_agent rabbitmq_management Applying plugin configuration to rabbit@localhost... started 6 plugins. 後臺監聽在 15672 端口,開啓成功 [root@192.168.118.15 /opt]#netstat -ntplu | egrep beam tcp 0 0 0.0.0.0:5672 0.0.0.0:* LISTEN 2130/beam.smp tcp 0 0 0.0.0.0:25672 0.0.0.0:* LISTEN 2130/beam.smp tcp 0 0 0.0.0.0:15672 0.0.0.0:* LISTEN 2130/beam.smp
經過web頁面登陸查看用戶配置是否正確。程序員
ok,用戶配置完成。至此,RabbitMQ 安裝完畢。web
AMQP協議是一個高級抽象層消息通訊協議,RabbitMQ是AMQP協議的實現。它主要包括如下組件:算法
(1) Server(broker):接受客戶端鏈接,實現AMQP消息隊列和路由功能的進程
(2) Virtual Host:實際上是一個虛擬概念,相似於權限控制組,一個Virtual Host裏面能夠有若干個Exchange和Queue,可是權限控制的最小粒度是Virtual Host
(3) Exchange:接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行爲,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三種,不一樣類型的Exchange路由的行爲是不同的。
(4) Message Queue:消息隊列,用於存儲還未被消費者消費的消息。
(5) Message: 由Header和Body組成,Header是由生產者添加的各類屬性的集合,包括Message是否被持久化、由哪一個Message Queue接受、優先級是多少等。而Body是真正須要傳輸的APP數據。
(6) Binding:Binding聯繫了Exchange與Message Queue。Exchange在與多個Message Queue發生Binding後會生成一張路由表,路由表中存儲着Message Queue所需消息的限制條件即Binding Key。當Exchange收到Message時會解析其Header獲得Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer發送Message時指定,二者的匹配方式由Exchange Type決定。
(7) Connection:鏈接,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP鏈接。
(8) Channel:信道,僅僅建立了客戶端到Broker之間的鏈接後,客戶端仍是不能發送消息的。須要爲每個Connection建立Channel,AMQP協議規定只有經過Channel才能執行AMQP的命令。一個Connection能夠包含多個Channel。之因此須要Channel,是由於TCP鏈接的創建和釋放都是十分昂貴的,若是一個客戶端每個線程都須要與Broker交互,若是每個線程都創建一個TCP鏈接,暫且不考慮TCP鏈接是否浪費,就算操做系統也沒法承受每秒創建如此多的TCP鏈接。RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發送消息必須是串行的,可是建議儘可能共用Connection。
(9) Command:AMQP的命令,客戶端經過Command完成與AMQP服務器的交互來實現自身的邏輯。例如在RabbitMQ中,客戶端能夠經過publish命令發送消息,txSelect開啓一個事務,txCommit提交一個事務。
json
以上 6 種類型經過 python 鏈接並測試。緩存
功能:一個生產者 P 發送消息到隊列Q,一個消費者 C 接收服務器
3.1.1 代碼整合網絡
import pika # 首先要安裝 pika 模塊
credentials = pika.PlainCredentials('admin', 'admin') # 遠程rabbitmq的用戶名密碼
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.118.15', 5672, '/admin', credentials)) # 鏈接 RabbitMQ信息
channel = connection.channel() # 定義 channel 對象
channel.queue_declare(queue='hello') # 聲明一個隊列 hello
channel.basic_publish(exchange='', routing_key='hello', # 路由鍵,寫明將消息發往哪一個隊列,本例是將消息發往隊列hello
body='Hello World!') # 消息內容
print('[x]Sent "Hello World!"') connection.close() # 鏈接關閉
多運行幾回後,在RabbitMQ服務器上查看app
[root@192.168.118.15 ~]#rabbitmqctl list_queues -p '/admin' Listing queues hello 5
後臺管理查看:異步
接下來,建立一個 consumer 來消費隊列中的這些消息。
import pika import time credentials = pika.PlainCredentials('admin', 'admin') # 建立認證證書
connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.118.15', 5672, '/admin', credentials)) # 鏈接 RabbitMQ信息
channel = connection.channel() # 定義 channel 對象
channel.queue_declare(queue='hello') # 聲明一個隊列 hello
def callback(ch, method, properties, body): # 四個參數爲標準格式
print('>>>>>>> %s' % body) time.sleep(5) channel.basic_consume( # 消費消息
callback, # 若是收到消息,就調用callback函數來處理消息
queue='hello', # 你要從那個隊列裏收消息
no_ack=True # 收到消息當即返回ack確認信息
) print('[x] Waiting for messages. To exit press CTRL+C.') channel.start_consuming() # 開始消費消息
運行結果:
功能:一個生產者,多個消費者,每一個消費者獲取到的消息惟一,多個消費者只有一個隊列。
3.2.1 代碼整合
import pika # 首先要安裝 pika 模塊
credentials = pika.PlainCredentials('admin', 'admin') # 遠程rabbitmq的用戶名密碼
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.118.15', 5672, '/admin', credentials)) # 鏈接 RabbitMQ信息
channel = connection.channel() # 定義 channel 對象
channel.queue_declare(queue='hello') # 聲明一個隊列 hello
channel.basic_publish(exchange='', routing_key='hello', # 路由鍵,寫明將消息發往哪一個隊列,本例是將消息發往隊列hello
body='Hello World!') # 消息內容
print('[x]Sent "Hello World!"') connection.close() # 隊列關閉
import pika import time credentials = pika.PlainCredentials('admin', 'admin') # 建立認證證書
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.118.15', 5672, '/admin', credentials)) # 鏈接 RabbitMQ信息
channel = connection.channel() # 定義 channel 對象
channel.queue_declare(queue='hello') # 聲明一個隊列 hello
def callback(ch, method, properties, body): # 四個參數爲標準格式
# print(ch)
# print(method)
# print(properties)
print('>>>>>>> %s' % body) time.sleep(5) # ch.basic_ack(delivery_tag=method.delivery_tag) # 告訴生成者,消息處理完成
channel.basic_consume( # 消費消息
callback, # 若是收到消息,就調用callback函數來處理消息
queue='hello' # 你要從那個隊列裏收消息
# no_ack = True
) print('[x] Waiting for messages. To exit press CTRL+C.') channel.start_consuming() # 開始消費消息
運行8次producer.py 後,生成8條消息進入隊列,再次開啓2個consumer.py 進行消費,如圖:
默認來講,RabbitMQ會按順序得把消息發送給每一個消費者,平均每一個消費者都會收到同等數量得消息。這種發送消息得方式叫作——輪詢(round-robin)。
3.2.2 循環調度
經過上面的測試,咱們發現每一個消費者默認是平均分配全部消息,所以這種方式擴容起來很簡單,若是隊列中堆積了不少任務,咱們只須要添加更多的消費者就能夠了。
3.2.3 消息確認
當處理一個比較耗時得任務的時候,你也許想知道消費者(consumers)是否運行到一半就掛掉。當前的代碼中,當消息被RabbitMQ發送給消費者(consumers)以後,立刻就會在內存中移除。這種狀況,你只要把一個工做者(worker)中止,正在處理的消息就會丟失。同時,全部發送到這個工做者的尚未處理的消息都會丟失。
咱們不想丟失任何任務消息。若是一個工做者(worker)掛掉了,咱們但願任務會從新發送給其餘的工做者(worker)。
爲了防止消息丟失,RabbitMQ提供了消息響應(acknowledgments)。消費者會經過一個ack(響應),告訴RabbitMQ已經收到並處理了某條消息,而後RabbitMQ就會釋放並刪除這條消息。
若是消費者(consumer)掛掉了,沒有發送響應,RabbitMQ就會認爲消息沒有被徹底處理,而後從新發送給其餘消費者(consumer)。這樣,即便工做者(workers)偶爾的掛掉,也不會丟失消息。
消息是沒有超時這個概念的;當工做者與它斷開連的時候,RabbitMQ會從新發送消息。這樣在處理一個耗時很是長的消息任務的時候就不會出問題了。
消息響應默認是開啓的。是時候移除這個標識了,當工做者(worker)完成了任務,就發送一個響應。
import pika import time credentials = pika.PlainCredentials('admin', 'admin') # 建立認證證書
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.118.15', 5672, '/admin', credentials)) # 鏈接 RabbitMQ信息
channel = connection.channel() # 定義 channel 對象
channel.queue_declare(queue='hello') # 聲明一個隊列 hello
def callback(ch, method, properties, body): # 四個參數爲標準格式
# print(ch)
# print(method)
# print(properties)
print('>>>>>>> %s' % body) time.sleep(1) ch.basic_ack(delivery_tag=method.delivery_tag) # 當消息處理完畢後,告訴生成者,消息處理完成
channel.basic_consume( # 消費消息
callback, # 若是收到消息,就調用callback函數來處理消息
queue='hello' # 你要從那個隊列裏收消息
) print('[x] Waiting for messages. To exit press CTRL+C.') channel.start_consuming() # 開始消費消息
3.2.4 忘記確認
運行上面的代碼,咱們發現即便消費者中斷,消息也不會消失。當工做者(worker)掛掉後,全部沒有響應的消息都會被從新發送,若是它不可以釋放沒響應的消息,RabbitMQ就會佔用愈來愈多的內存。
用了排除這種錯誤,可使用 rabbitmqctl 命令,輸出 messages_unacknowledged字段:
[root@192.168.118.15 ~]#rabbitmqctl list_queues -p '/admin' messages_ready messages_unacknowledged Listing queues 0 20
綜上,在編寫消費者程序的時候建議使用第三種方式。
3.2.5 消息持久化
若是你沒有特地告訴RabbitMQ,那麼在它退出或者崩潰的時候,將會丟失全部隊列和消息。爲了確保信息不會丟失,有兩個事情是須要注意的:咱們必須把「隊列」和「消息」設爲持久化。
首先,爲了避免讓隊列消失,須要把隊列聲明爲持久化(durable):
channel.queue_declare(queue='hello', durable=True)
儘管這行代碼自己是正確的,可是仍然不會正確運行。由於咱們已經定義過一個叫hello的非持久化隊列。RabbitMq不容許你使用不一樣的參數從新定義一個隊列,它會返回一個錯誤。但咱們如今使用一個快捷的解決方法——用不一樣的名字,例如task_queue。
channel.queue_declare(queue='task_queue', durable=True)
這個queue_declare必須在生產者(producer)和消費者(consumer)對應的代碼中修改。
import pika # 首先要安裝 pika 模塊
credentials = pika.PlainCredentials('admin', 'admin') # 遠程rabbitmq的用戶名密碼
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.118.15', 5672, '/admin', credentials)) # 鏈接 RabbitMQ信息
channel = connection.channel() # 定義 channel 對象
channel.queue_declare(queue='task_queue', durable=True) # 聲明一個隊列 hello
for i in range(1, 21): channel.basic_publish(exchange='', routing_key='hello', # 路由鍵,寫明將消息發往哪一個隊列,本例是將消息發往隊列hello
body='%s' % i, properties=pika.BasicProperties( delivery_mode=2, # 讓消息持久化
)) # 消息內容
print('[x]Sent "Hello World!"') connection.close() # 隊列關閉
修改完成,運行 producer.py
import pika import time credentials = pika.PlainCredentials('admin', 'admin') # 建立認證證書
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.118.15', 5672, '/admin', credentials)) # 鏈接 RabbitMQ信息
channel = connection.channel() # 定義 channel 對象
channel.queue_declare(queue='task_queue', durable=True) # 聲明一個隊列 hello
def callback(ch, method, properties, body): # 四個參數爲標準格式
# print(ch)
# print(method)
# print(properties)
print('>>>>>>> %s' % body) time.sleep(1) ch.basic_ack(delivery_tag=method.delivery_tag) # 告訴生成者,消息處理完成
channel.basic_consume( # 消費消息
callback, # 若是收到消息,就調用callback函數來處理消息
queue='task_queue' # 你要從那個隊列裏收消息
# no_ack=True
) print('[x] Waiting for messages. To exit press CTRL+C.') channel.start_consuming() # 開始消費消息
使用 consumer.py 就能夠消費消息。
持久化注意:
將消息設爲持久化並不能徹底保證不會丟失。以上代碼只是告訴了RabbitMq要把消息存到硬盤,但從RabbitMq收到消息到保存之間仍是有一個很小的間隔時間。由於RabbitMq並非全部的消息都使用fsync(2)——它有可能只是保存到緩存中,並不必定會寫到硬盤中。並不能保證真正的持久化,但已經足夠應付咱們的簡單工做隊列。若是你必定要保證持久化,你須要改寫你的代碼來支持事務(transaction)。
3.2.6 公平調度
經過以前的驗證,咱們知道每一個消費者都是平均消費消息。好比有兩個工做者(workers),處理奇數消息的比較繁忙,處理偶數消息的比較輕鬆。然而RabbitMQ並不知道這些,它仍然一如既往的派發消息。
這是由於RabbitMQ只管分發進入隊列的消息,不會關心有多少消費者(consumer)沒有做出響應。它盲目的把第n-th條消息發給第n-th個消費者。
咱們可使用basic.qos方法,並設置prefetch_count=1。這樣是告訴RabbitMQ,再同一時刻,不要發送超過1條消息給一個工做者(worker),直到它已經處理了上一條消息而且做出了響應。這樣,RabbitMQ就會把消息分發給下一個空閒的工做者(worker)。
channel.basic_qos(prefetch_count=1)
3.2.7 代碼整合
import pika # 首先要安裝 pika 模塊
credentials = pika.PlainCredentials('admin', 'admin') # 遠程rabbitmq的用戶名密碼
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.118.15', 5672, '/admin', credentials)) # 鏈接 RabbitMQ信息
channel = connection.channel() # 定義 channel 對象
channel.queue_declare(queue='task_queue', durable=True) # 聲明一個隊列 hello
queue_list = ['1...............', '2....', '3.', '4.', '5.'] for i in queue_list: channel.basic_publish(exchange='', routing_key='task_queue', # 路由鍵,寫明將消息發往哪一個隊列,本例是將消息發往隊列hello
body='%s' % i, properties=pika.BasicProperties( delivery_mode=2, # 讓消息持久化
)) # 消息內容
print('[x]Sent "Hello World!"') connection.close() # 隊列關閉
import pika import time credentials = pika.PlainCredentials('admin', 'admin') # 建立認證證書
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.118.15', 5672, '/admin', credentials)) # 鏈接 RabbitMQ信息
channel = connection.channel() # 定義 channel 對象
channel.queue_declare(queue='task_queue', durable=True) # 聲明一個隊列 hello
def callback(ch, method, properties, body): # 四個參數爲標準格式
# print(ch)
# print(method)
# print(properties)
print('>>>>>>> %s' % body) # print(body.count(b'.'))
time.sleep(body.count(b'.')) # 經過 '.' 來表明消費消息時長
ch.basic_ack(delivery_tag=method.delivery_tag) # 告訴生成者,消息處理完成
channel.basic_qos(prefetch_count=1) channel.basic_consume( # 消費消息
callback, # 若是收到消息,就調用callback函數來處理消息
queue='task_queue' # 你要從那個隊列裏收消息
# no_ack=True
) print('[x] Waiting for messages. To exit press CTRL+C.') channel.start_consuming() # 開始消費消息
經過運行以上代碼,開啓兩個consumer.py 的時候,一個消費一個消息,另外一個消費了4個消息,這樣就達到了咱們預期效果。
說明:分發一個消息給多個消費者(consumer),這種模式被稱爲「發佈/訂閱」。
3.3.1 交換機(Exchange)
在前面的模式中,咱們發送消息到隊列並從隊列中取出消息。如今是時候介紹RabbitMQ中完整的消息模型了。
簡單的歸納一下以前的模式:
RabbitMQ消息模型的核心概念是:發佈者(producer)不會直接發送任何消息給隊列。事實上,發佈者(producer)甚至不知道消息是否已經被投遞到隊列。
發佈者(producer)只須要把消息發送給一個交換機(exchange)。交換機很是簡單,它一邊從發佈者方接收消息,一邊把消息推送到隊列。交換機必須知道如何處理它接收到的消息,是應該推送到指定的隊列仍是是多個隊列,或者是直接忽略消息。這些規則是經過交換機類型(exchange type)來定義的。
有幾個可供選擇的交換機類型:直連交換機(direct), 主題交換機(topic), (頭交換機)headers和 扇型交換機(fanout)。咱們在這裏主要說明最後一個 —— 扇型交換機(fanout)。先建立一個fanout類型的交換機,命名爲logs:
channel.exchange_declare(exchange='logs', type='fanout')
扇型交換機(fanout)很簡單,你可能從名字上就能猜想出來,它把消息發送給它所知道的全部隊列。這正是咱們的日誌系統所須要的。
3.3.2 交換器列表
Rabbitmqctl可以列出服務器上全部的交換器:
[root@192.168.118.15 ~]#rabbitmqctl list_exchanges Listing exchanges amq.direct direct amq.fanout fanout amq.headers headers amq.rabbitmq.log topic amq.match headers amq.topic topic direct amq.rabbitmq.trace topic
這個列表中有一些叫作amq.*的交換器。這些都是默認建立的,不過這時候你還不須要使用他們。
3.3.3 匿名的交換器
前面的教程中咱們對交換機一無所知,但仍然可以發送消息到隊列中。由於咱們使用了命名爲空字符串(「」)默認的交換機。
回想咱們以前是如何發佈一則消息:
channel.basic_publish(exchange='', routing_key='hello', body=message)
exchange參數就是交換機的名稱。空字符串表明默認或者匿名交換機:消息將會根據指定的routing_key分發到指定的隊列。
如今,咱們就能夠發送消息到一個具名交換機了:
channel.basic_publish(exchange='logs', routing_key='', body=message)
3.3.4 臨時隊列
你還記得以前咱們使用的隊列名嗎( hello和task_queue)?給一個隊列命名是很重要的——咱們須要把工做者(workers)指向正確的隊列。若是你打算在發佈者(producers)和消費者(consumers)之間共享同隊列的話,給隊列命名是十分重要的。
可是這並不適用於咱們的日誌系統。咱們關心的是最新的消息而不是舊的。爲了解決這個問題,咱們須要作兩件事情。
首先,當咱們鏈接上RabbitMQ的時候,咱們須要一個全新的、空的隊列。咱們能夠手動建立一個隨機的隊列名,或者讓服務器爲咱們選擇一個隨機的隊列名(推薦)。咱們只須要在調用queue_declare方法的時候,不提供queue參數就能夠了:
result = channel.queue_declare()
這時候咱們能夠經過result.method.queue得到已經生成的隨機隊列名。
第二步,當與消費者(consumer)斷開鏈接的時候,這個隊列應當被當即刪除。exclusive標識符便可達到此目的。
result = channel.queue_declare()
3.3.5 綁定(Bindings)
咱們已經建立了一個扇型交換機(fanout)和一個隊列。如今咱們須要告訴交換機如何發送消息給咱們的隊列。交換器和隊列之間的聯繫咱們稱之爲綁定(binding)。
channel.queue_bind(exchange='logs', queue=result.method.queue)
如今,logs交換機將會把消息添加到咱們的隊列中。
綁定(binding)列表
你可使用 rabbitmqctl list_bindings 列出全部現存的綁定。
3.3.6 代碼整合
import pika # 首先要安裝 pika 模塊
import time credentials = pika.PlainCredentials('admin', 'admin') # 遠程rabbitmq的用戶名密碼
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.118.15', 5672, '/admin', credentials)) # 鏈接 RabbitMQ信息
channel = connection.channel() # 定義 channel 對象
channel.exchange_declare(exchange='logs', exchange_type='fanout') # 聲明一個交換器 logs exchange類型:fanout
for i in range(1000): channel.basic_publish(exchange='logs', routing_key='', # 路由鍵,寫明將消息發往哪一個隊列,本例是將消息發往隊列hello
body='%s' % i, ) # 消息內容
print('[x]Sent "%s"' % i) time.sleep(1) connection.close() # 隊列關閉
import pika credentials = pika.PlainCredentials('admin', 'admin') # 建立認證證書
connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.118.15', 5672, '/admin', credentials)) # 鏈接 RabbitMQ信息
channel = connection.channel() # 定義 channel 對象
channel.exchange_declare(exchange='task_queue', exchange_type='fanout') # 聲明一個隊列 hello
result = channel.queue_declare(exclusive=True) # 隨機建立一個隊列,當消費者斷開時,刪除隊列
queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) def callback(ch, method, properties, body): # 四個參數爲標準格式
print('>>>>>>> %s' % body) ch.basic_ack(delivery_tag=method.delivery_tag) # 告訴生成者,消息處理完成
channel.basic_consume( # 消費消息
callback, # 若是收到消息,就調用callback函數來處理消息
queue=queue_name # 你要從那個隊列裏收消息
# no_ack=True
) print('[x] Waiting for messages. To exit press CTRL+C.') channel.start_consuming() # 開始消費消息
執行效果:
生產者不斷髮送消息到Exchange,只有當消費者鏈接的時候纔會創建隊列並接收消息,不然消息所有丟棄,只須要接收最新的消息。
上面的模式中,咱們實現了一個簡單的日誌系統。能夠把日誌消息廣播給多個接收者。
在Routing模式中,新增一個功能——使得RabbitMQ只訂閱消息的一個子集。例如:咱們只須要把嚴重的錯誤日誌信息寫入日誌文件(存儲到磁盤),但同時仍然把全部的日誌信息輸出到控制檯中。
綁定(Bindings)
前面的例子,咱們已經建立過綁定(bindings),代碼以下:
channel.queue_bind(exchange=exchange_name, queue=queue_name)
綁定(binding)是指交換機(exchange)和隊列(queue)的關係。能夠簡單理解爲:這個隊列(queue)對這個交換機(exchange)的消息感興趣。
綁定的時候能夠帶上一個額外的routing_key參數。爲了不與basic_publish的參數混淆,咱們把它叫作綁定鍵(binding key)。如下是如何建立一個帶綁定鍵的綁定。
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')
綁定鍵的意義取決於交換機(exchange)的類型。咱們以前使用過的扇型交換機(fanout exchanges)會忽略這個值。
3.4.1 直連交換機(Direct exchange)
咱們的日誌系統廣播全部的消息給全部的消費者(consumers)。咱們打算擴展它,使其基於日誌的嚴重程度進行消息過濾。例如咱們也許只是但願將比較嚴重的錯誤(error)日誌寫入磁盤,以避免在警告(warning)或者信息(info)日誌上浪費磁盤空間。
咱們使用的扇型交換機(fanout exchange)沒有足夠的靈活性 —— 它能作的僅僅是廣播。
咱們將會使用直連交換機(direct exchange)來代替。路由的算法很簡單 —— 交換機將會對綁定鍵(binding key)和路由鍵(routing key)進行精確匹配(值相等),從而肯定消息該分發到哪一個隊列。
下圖可以很好的描述這個場景:
在這個場景中,咱們能夠看到直連交換機 X和兩個隊列進行了綁定。第一個隊列使用orange做爲綁定鍵,第二個隊列有兩個綁定,一個使用black做爲綁定鍵,另一個使用green。
這樣以來,當路由鍵爲orange的消息發佈到交換機,就會被路由到隊列Q1。路由鍵爲black或者green的消息就會路由到Q2。其餘的全部消息都將會被丟棄。
3.4.2 多個綁定(Multiple bindings)
多個隊列使用相同的綁定鍵是合法的。這個例子中,咱們能夠添加一個X和Q1之間的綁定,使用black綁定鍵。這樣一來,直連交換機就和扇型交換機的行爲同樣,會將消息廣播到全部匹配的隊列。帶有black路由鍵的消息會同時發送到Q1和Q2。
發送日誌
咱們將會發送消息到一個直連交換機,把日誌級別做爲路由鍵。這樣接收日誌的腳本就能夠根據嚴重級別來選擇它想要處理的日誌。咱們先看看發送日誌。
咱們須要建立一個交換機(exchange):
channel.exchange_declare(exchange='direct_logs', type='direct')
而後咱們發送一則消息:
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
咱們先假設「severity」的值是info、warning、error中的一個。
訂閱
處理接收消息的方式和以前差很少,只有一個例外,咱們將會爲咱們感興趣的每一個嚴重級別分別建立一個新的綁定。
result = channel.queue_declare(exclusive=True) queue_name = result.method.queue for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
3.4.3 代碼整合
import pika # 首先要安裝 pika 模塊
import sys credentials = pika.PlainCredentials('admin', 'admin') # 遠程rabbitmq的用戶名密碼
connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.118.15', 5672, '/admin', credentials)) # 鏈接 RabbitMQ信息
channel = connection.channel() # 定義 channel 對象
channel.exchange_declare(exchange='direct_logs', exchange_type='direct') # 聲明一個交換器 logs exchange類型:fanout
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
print(severity) message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, # 路由鍵,寫明將消息發往哪一個隊列,本例是將消息發往隊列hello
body=message, ) # 消息內容
print(" [x] Sent %r:%r" % (severity, message)) connection.close() # 隊列關閉
import pika import sys credentials = pika.PlainCredentials('admin', 'admin') # 建立認證證書
connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.118.15', 5672, '/admin', credentials)) # 鏈接 RabbitMQ信息
channel = connection.channel() # 定義 channel 對象
channel.exchange_declare(exchange='direct_logs', exchange_type='direct') # 聲明一個隊列 hello
result = channel.queue_declare(exclusive=True) # 隨機建立一個隊列,當消費者斷開時,刪除隊列
queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) # 定義了三種接收消息方式info,warning,error
sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity ) print('[x] Waiting for messages. To exit press CTRL+C.') def callback(ch, method, properties, body): # 四個參數爲標準格式
print(" [x] %r:%r" % (method.routing_key, body)) # ch.basic_ack(delivery_tag=method.delivery_tag) # 告訴生成者,消息處理完成
channel.basic_consume( # 消費消息
callback, # 若是收到消息,就調用callback函數來處理消息
queue=queue_name, # 你要從那個隊列裏收消息
no_ack=True ) channel.start_consuming() # 開始消費消息
共開啓三個窗口:
Producer.py 生產消息
Consumer.py 打印包括 info / warning / error
Consumer.py 只打印error信息
運行結果:
經過RabbitMQ管理查看,綁定了兩個隊列。
爲何須要主題交換機?
在上面的模式中,咱們改進了日誌系統。使用直連交換機(direct)代替了扇型交換機(fanout),從只能盲目的廣播消息改進爲有可能選擇性的接收日誌。
儘管直連交換機可以改善咱們的系統,可是它也有它的限制 —— 沒辦法基於多個標準執行路由操做。
在咱們的日誌系統中,咱們不僅但願訂閱基於嚴重程度的日誌,同時還但願訂閱基於發送來源的日誌。Unix工具syslog就是同時基於嚴重程度-severity (info/warn/crit…) 和 設備-facility (auth/cron/kern…)來路由日誌的。
若是這樣的話,將會給予咱們很是大的靈活性,咱們既能夠監聽來源於「cron」的嚴重程度爲「critical errors」的日誌,也能夠監聽來源於「kern」的全部日誌。
爲了實現這個目的,接下來咱們學習如何使用另外一種更復雜的交換機 —— 主題交換機。
3.5.1 主題交換機
發送到主題交換機(topic exchange)的消息,它的路由鍵(routing_key)必須是一個由 . 分隔開的詞語列表。這些單詞隨即是什麼均可以,可是最好是跟攜帶它們的消息有關係的詞彙。如下是幾個推薦的例子:」stock.usd.nyse」, 「nyse.vmw」, 「quick.orange.rabbit」。詞語的個數能夠隨意,可是不要超過255字節。
綁定鍵也必須擁有一樣的格式。主題交換機背後的邏輯跟直連交換機很類似 —— 一個攜帶着特定路由鍵的消息會被主題交換機投遞給綁定鍵與之想匹配的隊列。可是它的綁定鍵和路由鍵有兩個特殊應用方式:
○ '*'(星號)用來表示一個單詞;
○ '#'(井號)用來表示任意數量(零個或多個)單詞。
下邊用圖說明:
這個例子裏,咱們發送的全部消息都是用來描述小動物的。發送的消息所攜帶的路由鍵是由三個單詞所組成的,這三個單詞被兩個.分割開。路由鍵裏的第一個單詞描述的是動物的手腳的利索程度,第二個單詞是動物的顏色,第三個是動物的種類。因此它看起來是這樣的: <celerity>.<colour>.<species>。
咱們建立了三個綁定:Q1的綁定鍵爲 *.orange.*,Q2的綁定鍵爲 *.*.rabbit 和 lazy.# 。
這三個綁定鍵被能夠總結爲:
○ Q1 對全部的桔黃色動物都感興趣。
○ Q2 則是對全部的兔子和全部懶惰的動物感興趣。
攜帶有 quick.orange.rabbit 的消息將會被分別投遞給這兩個隊列。
攜帶着 lazy.orange.elephant 的消息一樣也會給兩個隊列都投遞過去。
攜帶有 quick.orange.fox 的消息會投遞給第一個隊列,攜帶有 lazy.brown.fox 的消息會投遞給第二個隊列。
攜帶有 lazy.pink.rabbit 的消息只會被投遞給第二個隊列一次,即便它同時匹配第二個隊列的兩個綁定。
攜帶着 quick.brown.fox 的消息不會投遞給任何一個隊列。
若是咱們違反約定,發送了一個攜帶有一個單詞或者四個單詞("orange" or "quick.orange.male.rabbit")的消息時,發送的消息不會投遞給任何一個隊列,並且會丟失掉。
可是另外一方面,即便 "lazy.orange.male.rabbit" 有四個單詞,他仍是會匹配最後一個綁定,而且被投遞到第二個隊列中。
主題交換機
主題交換機是很強大的,它能夠表現出跟其餘交換機相似的行爲。
當一個隊列的綁定鍵爲 「#」(井號) 的時候,這個隊列將會無視消息的路由鍵,接收全部的消息(扇型交換機模式)。
當 * (星號) 和 # (井號) 這兩個特殊字符都未在綁定鍵中出現的時候,此時主題交換機就擁有的直連交換機的行爲(直連交換機模式)。
3.5.2 代碼整合
接下來咱們會將主題交換機應用到咱們的日誌系統中。在開始工做前,咱們假設日誌的路由鍵由兩個單詞組成,路由鍵看起來是這樣的:<facility>.<severity>
import pika # 首先要安裝 pika 模塊
import sys credentials = pika.PlainCredentials('admin', 'admin') # 遠程rabbitmq的用戶名密碼
connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.118.15', 5672, '/admin', credentials)) # 鏈接 RabbitMQ信息
channel = connection.channel() # 定義 channel 對象
channel.exchange_declare(exchange='topic_logs', exchange_type='topic') # 聲明一個交換器 logs exchange類型:fanout
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, # 路由鍵,寫明將消息發往哪一個隊列,本例是將消息發往隊列hello
body=message, ) # 消息內容
print(" [x] Sent %r:%r" % (routing_key, message)) connection.close() # 隊列關閉
import pika import sys credentials = pika.PlainCredentials('admin', 'admin') # 建立認證證書
connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.118.15', 5672, '/admin', credentials)) # 鏈接 RabbitMQ信息
channel = connection.channel() # 定義 channel 對象
channel.exchange_declare(exchange='topic_logs', exchange_type='topic') # 聲明一個隊列 hello
result = channel.queue_declare(exclusive=True) # 隨機建立一個隊列,當消費者斷開時,刪除隊列
queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) # 定義了三種接收消息方式info,warning,error
sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key ) print('[x] Waiting for messages. To exit press CTRL+C.') def callback(ch, method, properties, body): # 四個參數爲標準格式
print(" [x] %r:%r" % (method.routing_key, body)) # ch.basic_ack(delivery_tag=method.delivery_tag) # 告訴生成者,消息處理完成
channel.basic_consume( # 消費消息
callback, # 若是收到消息,就調用callback函數來處理消息
queue=queue_name, # 你要從那個隊列裏收消息
no_ack=True ) channel.start_consuming() # 開始消費消息
執行下邊命令 接收全部日誌:
python consumer.py "#"
執行下邊命令 接收來自」kern「設備的日誌:
python consumer.py "kern.*"
執行下邊命令 只接收嚴重程度爲」critical「的日誌:
python consumer.py "*.critical"
執行下邊命令 創建多個綁定:
python consumer.py "kern.*" "*.critical"
執行下邊命令 發送路由鍵爲 「kern.critical」 的日誌:
python producer.py "kern.critical" "A critical kernel error"
執行上邊命令試試看效果吧。另外,上邊代碼不會對路由鍵和綁定鍵作任何假設,因此你能夠在命令中使用超過兩個路由鍵參數。
• 綁定鍵爲 * 的隊列會取到一個路由鍵爲空的消息嗎?
答:不會接收爲空的消息,匹配全部消息。
• 綁定鍵爲 #.* 的隊列會獲取到一個名爲..的路由鍵的消息嗎?它會取到一個路由鍵爲單個單詞的消息嗎?
答:綁定鍵爲 #.* 的隊列會獲取到一個名爲..的路由鍵,會取到一個路由鍵爲單個單詞的消息
• a.*.# 和 a.#的區別在哪兒?
答:這裏的 '#' 能夠理解爲正則中的 '?' 問號,意思是0到無窮大。
a.*.# 表示首先必須知足 a.* 也就是 a.單詞,知足這個條件之後都會匹配到
a.# 表示只要以a開頭的routing_key 都會被匹配到
若是咱們須要將一個函數運行在遠程計算機上而且等待從那兒獲取結果時,該怎麼辦呢?這就是另外的故事了。這種模式一般被稱爲遠程過程調用(Remote Procedure Call)或者RPC。
咱們使用RabbitMQ來構建一個RPC系統:包含一個客戶端和一個RPC服務器。如今的狀況是,咱們沒有一個值得被分發的足夠耗時的任務,因此接下來,咱們會建立一個模擬RPC服務來返回斐波那契數列。
客戶端接口
爲了展現RPC服務如何使用,咱們建立了一個簡單的客戶端類。它會暴露出一個名爲「call」的方法用來發送一個RPC請求,而且在收到迴應前保持阻塞。
fibonacci_rpc = FibonacciRpcClient() result = fibonacci_rpc.call(4) print("fib(4) is %r" % (result,))
關於RPC的注意事項:
儘管RPC在計算領域是一個經常使用模式,但它也常常被詬病。當一個問題被拋出的時候,程序員每每意識不到這究竟是由本地調用仍是由較慢的RPC調用引發的。一樣的困惑還來自於系統的不可預測性和給調試工做帶來的沒必要要的複雜性。跟軟件精簡不一樣的是,濫用RPC會致使不可維護的麪條代碼.
考慮到這一點,牢記如下建議:
確保可以明確的搞清楚哪一個函數是本地調用的,哪一個函數是遠程調用的。給你的系統編寫文檔。保持各個組件間的依賴明確。處理錯誤案例。明瞭客戶端改如何處理RPC服務器的宕機和長時間無響應狀況。
當對避免使用RPC有疑問的時候。若是能夠的話,你應該儘可能使用異步管道來代替RPC類的阻塞。結果被異步地推送到下一個計算場景。
3.6.1 回調隊列
通常來講經過RabbitMQ來實現RPC是很容易的。一個客戶端發送請求信息,服務器端將其應用到一個回覆信息中。爲了接收到回覆信息,客戶端須要在發送請求的時候同時發送一個回調隊列(callback queue)的地址。咱們試試看:
result = channel.queue_declare(exclusive=True) callback_queue = result.method.queue channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = callback_queue, ), body=request)
3.6.2 消息屬性
AMQP協議給消息預約義了一系列的14個屬性。大多數屬性不多會用到,除了如下幾個:
• delivery_mode(投遞模式):將消息標記爲持久的(值爲2)或暫存的(除了2以外的其餘任何值)。第二篇教程裏接觸過這個屬性,記得吧?
• content_type(內容類型):用來描述編碼的mime-type。例如在實際使用中經常使用application/json來描述JOSN編碼類型。
• reply_to(回覆目標):一般用來命名回調隊列。
• correlation_id(關聯標識):用來將RPC的響應和請求關聯起來。
3.6.3 關聯標識
上邊介紹的方法中,咱們建議給每個RPC請求新建一個回調隊列。這不是一個高效的作法,幸虧這兒有一個更好的辦法 —— 咱們能夠爲每一個客戶端只創建一個獨立的回調隊列。
這就帶來一個新問題,當此隊列接收到一個響應的時候它沒法辨別出這個響應是屬於哪一個請求的。correlation_id 就是爲了解決這個問題而來的。咱們給每一個請求設置一個獨一無二的值。稍後,當咱們從回調隊列中接收到一個消息的時候,咱們就能夠查看這條屬性從而將響應和請求匹配起來。若是咱們接手到的消息的correlation_id是未知的,那就直接銷燬掉它,由於它不屬於咱們的任何一條請求。
你也許會問,爲何咱們接收到未知消息的時候不拋出一個錯誤,而是要將它忽略掉?這是爲了解決服務器端有可能發生的競爭狀況。儘管可能性不大,但RPC服務器仍是有可能在已將應答發送給咱們但還未將確認消息發送給請求的狀況下死掉。若是這種狀況發生,RPC在重啓後會從新處理請求。這就是爲何咱們必須在客戶端優雅的處理重複響應,同時RPC也須要儘量保持冪等性。
總結:
咱們的RPC如此工做:
• 當客戶端啓動的時候,它建立一個匿名獨享的回調隊列。
• 在RPC請求中,客戶端發送帶有兩個屬性的消息:一個是設置回調隊列的 reply_to 屬性,另外一個是設置惟一值的 correlation_id 屬性。
• 將請求發送到一個 rpc_queue 隊列中。
• RPC工做者(又名:服務器)等待請求發送到這個隊列中來。當請求出現的時候,它執行他的工做而且將帶有執行結果的消息發送給reply_to字段指定的隊列。
• 客戶端等待回調隊列裏的數據。當有消息出現的時候,它會檢查correlation_id屬性。若是此屬性的值與請求匹配,將它返回給應用。
3.6.4 整合代碼
import pika # 首先要安裝 pika 模塊
import sys credentials = pika.PlainCredentials('admin', 'admin') # 遠程rabbitmq的用戶名密碼
connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.118.15', 5672, '/admin', credentials)) # 鏈接 RabbitMQ信息
channel = connection.channel() # 定義 channel 對象
channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): n = int(body) print('[.] fib(%s)' % n) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id=props.correlation_id), body=str(response) ) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming()
服務器端代碼至關簡單:
• (4)像往常同樣,咱們創建鏈接,聲明隊列
• (11)咱們聲明咱們的fib函數,它假設只有合法的正整數看成輸入。(別期望這個函數能處理很大的數值,函數遞歸大家都懂得…)
• (19)咱們爲 basic_consume 聲明瞭一個回調函數,這是RPC服務器端的核心。它執行實際的操做而且做出響應。
• (32)或許咱們但願能在服務器上多開幾個線程。爲了能將負載平均地分攤到多個服務器,咱們須要將 prefetch_count 設置好。
rpc_client.py
#!/usr/bin/python3 # -*- coding: utf-8 -*- # Author: hkey
import pika import uuid class FibbonacciRpcClient: def __init__(self): self.connection = pika.BlockingConnection( # 鏈接 RabbitMQ信息
pika.ConnectionParameters('192.168.118.15', 5672, '/admin', pika.PlainCredentials('admin', 'admin'))) self.channel = self.connection.channel() # 定義 channel 對象
# 創建臨時隊列
result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibbonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(8) print(" [.] Got %r" % (response,))
客戶端代碼稍微有點難懂:
• (7)創建鏈接、通道而且爲回覆(replies)聲明獨享的回調隊列。
• (16)咱們訂閱這個回調隊列,以便接收RPC的響應。
• (18)「on_response」回調函數對每個響應執行一個很是簡單的操做,檢查每個響應消息的correlation_id屬性是否與咱們期待的一致,若是一致,將響應結果賦給self.response,而後跳出consuming循環。
• (23)接下來,咱們定義咱們的主要方法 call 方法。它執行真正的RPC請求。
• (24)在這個方法中,首先咱們生成一個惟一的 correlation_id 值而且保存起來,’on_response’回調函數會用它來獲取符合要求的響應。
• (25)接下來,咱們將帶有 reply_to 和 correlation_id 屬性的消息發佈出去。
• (32)如今咱們能夠坐下來,等待正確的響應到來。
• (33)最後,咱們將響應返回給用戶。
咱們的RPC服務已經準備就緒了,如今啓動服務器端:
此處呈現的設計並非實現RPC服務的惟一方式,可是他有一些重要的優點:
• 若是RPC服務器運行的過慢的時候,你能夠經過運行另一個服務器端輕鬆擴展它。試試在控制檯中運行第二個 rpc_server.py 。
• 在客戶端,RPC請求只發送或接收一條消息。不須要像 queue_declare 這樣的異步調用。因此RPC客戶端的單個請求只須要一個網絡往返。
咱們的代碼依舊很是簡單,並且沒有試圖去解決一些複雜(可是重要)的問題,如:
• 當沒有服務器運行時,客戶端如何做出反映。
• 客戶端是否須要實現相似RPC超時的東西。
• 若是服務器發生故障,而且拋出異常,應該被轉發到客戶端嗎?
• 在處理前,防止混入無效的信息(例如檢查邊界)