消息隊列_RabbitMQ-0002.深刻MQ生產者/信道/交換機/隊列/消費者?

形象說明:python


比喻: RabbitMQ提供的消息投遞服務相似於現實生活中的快遞公司,雙11咱們可能會買不少東西,天然會陸續收到不少寄自淘寶店主由快遞公司發來的快件,可是可能不少時候買回來的東西並不合心意,天然會陸續經過快遞公司退回快件,因此迴歸到架構,這裏的快件就至關於消息,咱們至關於應用程序,淘寶店主至關於服務器,而快遞公司至關於路由器,應用程序能夠發送和接收消息,服務器也能夠發送和接收消息,因此當應用程序鏈接到RabbitMQ時,就必須作一個決定:我是發送仍是接收哪?緩存

現實: 生產者(Producer)建立消息,而後發佈(發送)到消息代理服務器(RabbitMQ),消息包含兩部份內容:有效載荷(想要傳輸的數據,支持任何內容)和標籤(描述有效載荷,最終由RabbitMQ來決定誰將得到消息的拷貝),消費者(Consumer)啓動時鏈接消息代理服務器上,並訂閱指定隊列,每當消息達到此隊列時,RabbitMQ會將其發送給訂閱的消費者,當消費者接收到消息時,它只是獲得了有效載荷,由於消息在路由的過程當中,消息的標籤並無隨着有效載荷一塊兒傳遞,RabbitMQ甚至不會告訴你生產者是誰?固然若是以爲有必要,也能夠將身份信息加入有效載荷一塊兒傳遞~安全

wKioL1g9FvqyZ9s3AAE0kFq2RfM767.png

信道鏈接:服務器


說明: 使用消息代理服務器RabbitMQ的前提是創建AMQP信道,應用程序能夠基於一條TCP鏈接快速建立銷燬無數信道來減小傳統TCP鏈接消耗,每一個信道有惟一ID(由AMQP庫維護),AMQP命令都是經過信道發送架構


消息路由:app

wKioL1g9F1LCA_7jAALTlrIRsus695.png

# 消費消息負載均衡

1. 消費者經過AMQP的basic.consume命令訂閱,這樣作會將信道置爲接收模式,訂閱消息後,消息一到達隊列時就自動接收,直到取消隊列的訂閱爲止異步

2. 消費者經過AMQP的basic.get命令訂閱,這樣作會將信道置爲接收模式,訂閱消息後,得到單條消息後,而後自動取消訂閱,千萬不要妄想放在循環裏代替basic.consume,不然沒法發揮其高吞吐量特性ide

3. 若是消息到達了無人訂閱的隊列,消息會在隊列中等待,一旦有消費者訂閱該隊列,隊列的消息會發送給消費者函數

4. 若是隊列擁有多個消費者時,隊列的消息以輪詢的方式發送給消費者,每條消息只會發送給一個訂閱的消費者,且每一個消費者接收到的每一條消息都必須進行確認,消費者必須經過AMQP的basic.ack命令顯式地向RabbitMQ發送一個確認,或者在訂閱到隊列的時候將auto_ack參數設置爲true,此時一旦消費者接收消息,RabbitMQ會自動認爲其確認了消息,一旦消息被確認,RabbitMQ纔會安全的把消息從隊列中刪除,主要是防止確認以前RabbitMQ斷開鏈接或取消訂閱或程序崩潰,RabbitMQ會認爲這條消息沒有分發,而後從新分發給下一個訂閱的消費者,RabbitMQ會認爲沒有確認的消費者並無準備好接收下一條消息,因此能夠好好利用這一點,若是處理消息內容很是耗時,則你的應用程序能夠延遲確認消息,直到消息處理完成再確認,這樣可防止RabbitMQ持續不斷的消息致使過載

5. 若是收到消息後想要明確拒絕而不是確認收到消息的話,可以使用AMQP的basic.reject,當把其basic.reject參數設置爲true時,RabbitMQ會將消息從新發送給下一個訂閱的消費者,若是設置爲false,則RabbitMQ會把消息從隊列中移除,而不會把它發送給新的消費者,固然也能夠經過對消息確認的方式來簡單地忽略該消息,如當你檢測到一條格式錯誤的消息而任何一個消費者都沒法處理的時候,此時就很是有用了.

# 隊列建立

1. 消費者和生產者都能使用AMQP的queue.declare命令來建立隊列,可是若是消費者在同一條信道上訂閱了另外一個隊列的話,就沒法再聲明隊列,必須首先取消訂閱,將信道設置爲"傳輸"模式,

2. 建立隊列時,最好指定一個隊列名稱,消費者訂閱隊列時須要隊列名稱,並在建立綁定時也須要隊列名稱,若是不指定,RabbitMQ會隨機分配一個名稱做爲queue.declare的返回值(經常使用於構建在AMQP上的RPC應用,此時零時匿名隊列頗有用),建立隊列時exclusive爲true時,隊列會變爲私有,此時只有你的應用程序才能消費隊列消息,當你想要限制一個隊列只有一個消費者時頗有有,auto-delete爲true時,當最後一個消費者取消訂閱的時候,隊列就會自動移除,當你須要零時隊列只爲一個消費者服務的話,可結合auto-delete和exclusive,當消費者斷開鏈接時,隊列就被移除了.

3. 若是嘗試聲明一個已經存在的隊列時,RabbitMQ就什麼都不作,併成功返回,若是你只是爲了檢測隊列是否存在,可設置queue.declare的passive爲true,若是存在會成功返回,不然會直接返回一個錯誤

4. 因爲生產者和消費者均可以經過queue.declare建立隊列,可是因爲若是消息路由到了不存在的隊列RabbitMQ會直接忽略它們,因此最好是生產者和消費者都建隊列

#交換綁定

1. 若是你想要將消息投遞到隊列時,首先得把消息發送給交換機,而後根據肯定的規則,RabbitMQ會將決定消息該投遞到哪一個隊列,這些規則被稱爲路由鍵(Routing Key),隊列經過路由鍵綁定到交換機,當你把消息發送到消息代理服務器時,消息將擁有一個路由鍵,即使爲空,RabbitMQ也會將其和綁定使用的路由鍵進行匹配,若是匹配成功,消息會被投遞到該隊列,若是不匹配將進入"黑洞"

wKiom1g9F3HCPie-AACJHeYI8wM145.png

2. Direct直接交換機(channel->basic_publish(message, exchange, routingkey)),很是簡單,若是路由鍵匹配的話,消息就被投遞到對應的隊列,當聲明隊列時,會自動綁定到默認交換機,並以隊列名稱做爲路由鍵,因此發送消息時exchange爲空則會發送到默認交換機,routingkey直接填寫對應的隊列名便可,若是默認交換機沒法知足應用程序需求時,可經過exchange.declare建立其它交換機

wKioL1g9GDzjYG3IAAB6vj0PX48379.png

3. Fanout扇形交換機,很是簡單,當你發送一條消息到fanout交換機時,它會把消息投遞給全部附加在此交換機上的隊列,這容許你對單條消息作不一樣方式的反應,如一個WEB應用程序可能須要在用戶上傳新的圖片時,用戶相冊必須清除緩存,同時用戶應該獲得些積分獎勵,你能夠將兩個隊列綁定到圖片上傳交換機上,一個用於清除緩存,另外一個用於增長用戶積分,後期若是有其它需求只須要爲新的消費者寫段代碼,而後聲明新的隊列並將其綁定到fanout交換機上,這樣就能夠實現生產者和消費者徹底解耦,容許你垂手可得的添加應用程序的功能.

wKioL1g9GF6CR2mTAAB5dxEM-KA426.png

4. Topic主題交換機,很是簡單,當你發送一條消息到topic交換機時,它會把消息投遞給以點號分割的路由鍵,匹配模式中*匹配特定位置的任意文本,"#"匹配全部的規則,是沒有相似"*"以點號特定塊兒匹配的概念的,它匹配包括點號在內的全部規則.


總結: 從上面幾種模式能夠看出其實RabbitMQ在開發中的角色能夠很是靈活,既能夠做爲隊列服務器使用,也能夠做爲RPC服務器使用,徹底取決於你如何組織這些功能.


虛機隔離:

說明: RabbitMQ還支持Vhost"虛擬主機",每一個Vhost本質上是一個迷你版擁有本身的隊列/交換機/綁定以及權限機制的RabbitMQ服務器,這樣就能夠經過一個RabbitMQ服務衆多應用程序,Vhost之間相互隔離,有效的避免了隊列/交換機的命名衝突,不然你不得不運行多個RabbitMQ,默認Vhost爲vhost: "/"可經過guest/guest訪問,可是爲了安全起見,應該及時更改


添加虛機: /xm-workspace/xm-apps/rabbitmq/sbin/rabbitmqctl add_vhost xmzoomeye

查看虛機: /xm-workspace/xm-apps/rabbitmq/sbin/rabbitmqctl list_vhosts

刪除虛機: /xm-workspace/xm-apps/rabbitmq/sbin/rabbitmqctl delete_vhost xmzoomeye


說明: 一旦Vhost建立成功以後,就能夠鏈接上去開始添加隊列和交換機,若是想鏈接遠程RabbitMQ節點可經過rabbitmqctl -n rabbit@hostname list_vhosts,須要注意的是rabbit@hostname中rabbit@是固定的,而hostname必須正確的是遠程主機名


持久存儲:


1. 默認重啓RabbitMQ後,以前定義的交換機/隊列都會消失,可是若是設置隊列和交換機的durable屬性爲true,則在崩潰重啓以後會重建隊列和交換機,可是消息並不會重建,若是要實現持久化消息,則須要首先將"投遞模式"設置爲2將消息標記成持久化,而後發佈到持久化的交換機併到達持久化的隊列,這樣才能夠保證消息的持久化.

2. RabbitMQ確保持久化消息能從服務器重啓中恢復實際上是將它們寫入磁盤上的一個持久化日誌文件,當發佈一條持久化消息到持久化交換機時,RabbitMQ會在消息提交到日誌文件後才發送響應,若是消息後來被路由到非持久化隊列,它會自動從持久化日誌中刪除,而且沒法從服務器重啓中恢復,若是消息後來被路由到持久化隊列且被消費者消費並確認,則RabbitMQ會在持久化日誌中把這條消息標記爲等待垃圾收集,可是並非全部的消息都須要啓用持久化,否則會嚴重影響RabbitMQ每秒處理的消息總數

3. 從業務分析性能需求,若是要單臺RabbitMQ服務器每秒處理10萬條消息則[能夠考慮更快的存儲系統]或[經過在生產者單獨信道上監聽應答隊列,發送消息時有效載荷帶上此隊列名,消費者就能夠回答應答確認接收返回給生產者]或[分開創建持久化熱備非集羣負載均衡和非持久化集羣],這樣持久化消息通訊負載不會減慢非持久化消息的處理.

4. AMQP中,一旦把信道設置成事務模式後,經過信道發送須要確認的消息,若是第一個消息失敗則後續命令會忽略,雖然能夠藉助它確認消息是否持久化到磁盤,可是事務不但會下降消息吞吐量,並且會使生產者應用程序產生同步,而你使用消息通訊就是想要避免同步,其實還有另外一種發送確認模式和事務相仿,只須要將信道設置爲confirm模式,全部信道上發佈的消息都會被指派一個惟一的ID,一旦消息被投遞給匹配的隊列後,信道會發送一個發送方確認模式給生產者應用程序(包含惟一ID),使得生產者知道消息已經安全到達目的隊列,若是消息和隊列是可持久化的,那麼確認消息只會在隊列將消息寫入磁盤後纔會發出,相比於事務來講,最大的好處在於都是異步的,一旦發佈了一條消息,生產者應用程序就能夠在等待確認的同時繼續發送下一條,當確認消息最終收到的時候,生產者應用的回調方法就會觸發來處理該確認消息,若是RabbitMQ發生內部錯誤而致使消息丟失,會發送一條nack未確認消息,只是此次說明消息確實丟失了,此方式更加輕量級對於RabbitMQ消息代理服務器的性能影響幾乎不記.


貫穿實例:

wKioL1g9GIqS9VB6AAFC8FC6gQ8839.png

說明: 如上講述了RabbitMQ的全部組件以及架構,但要結合起來理解一條真實消息的生命週期的最好方法是實踐出真知,下面會使用PY的pika模塊來演示Hello Word消息傳遞過程.

發佈: 鏈接RabbitMQ->獲取信道->聲明交換機->建立消息->發佈消息->關閉信道->關閉鏈接

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 說明: 導入公共模塊
import sys
import pika
# 說明: 導入其它模塊


if __name__ == '__main__':
    # 建立憑證對象
    credentials = pika.PlainCredentials('guest', 'guest')
    # 建立參數對象
    conn_params = pika.ConnectionParameters(
        # RabbitMQ服務地址
        host='127.0.0.1',
        # RabbitMQ服務端口
        port=5672,
        # RabbitMQ登陸憑證
        credentials=credentials,
        # RabbitMQ虛擬主機
        virtual_host='/'
    )
    # 建立鏈接對象
    conn_broker = pika.BlockingConnection(conn_params)
    # 獲取信道對象
    channel = conn_broker.channel()
    # 建立交換機
    channel.exchange_declare(
        # 交換機名稱
        exchange="salt-exchange",
        # 交換機類型
        type="direct",
        # 若是同名交換機已存在依然返回成功,不然建立
        passive=False,
        # 聲明爲非持久化交換機
        durable=False,
        # 交換機閒置也不會自動刪除
        auto_delete=False
    )
    msg = sys.argv[1]
    # 建立配置對象
    msg_props = pika.BasicProperties()
    # 設置內容類型
    msg_props.content_type = 'text/plain'
    # 嘗試發佈消息
    channel.basic_publish(
        # 發佈消息內容
        body=msg,
        # 發佈到交換機
        exchange='salt-exchange',
        # 發佈信息屬性
        properties=msg_props,
        # 發佈信息時攜帶的路由鍵
        routing_key='salt'
    )

說明: 首先用使用默認賬號密碼guest,默認端口5672,默認虛擬主機/鏈接RabbitMQ Vhost,而後創建信道,利用信道和rabbitMQ進行通訊,而後聲明交換機,須要指定交換機名稱,交換機類型,是否passive模式,若是非passive模式則表示想要聲明交換機而非獲取交換機信息,還能夠指定是否持久化以及是否刪除,最後經過命令行建立一條攜帶salt路由鍵類型爲text/plain的消息經過basic_publish發送到salt-exchange交換機,可是此時因爲並無任何隊列綁定在此交換機,因此消息必然會進入"黑洞"丟失掉.~

接收: 鏈接RabbitMQ->得到信道->聲明交換機->聲明隊列->綁定隊列到交換機->消費信息->關閉信道->關閉鏈接

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 說明: 導入公共模塊
import pika
# 說明: 導入其它模塊


if __name__ == '__main__':
    # 建立憑證對象
    credentials = pika.PlainCredentials('guest', 'guest')
    # 建立參數對象
    conn_params = pika.ConnectionParameters(
        # RabbitMQ服務地址
        host='127.0.0.1',
        # RabbitMQ服務端口
        port=5672,
        # RabbitMQ服務憑證
        credentials=credentials,
        # RabbitMQ虛擬主機
        virtual_host='/'
    )
    # 建立鏈接對象
    conn_broker = pika.BlockingConnection(conn_params)
    # 獲取信道對象
    channel = conn_broker.channel()
    # 建立交換機
    channel.exchange_declare(
        # 交換機名稱
        exchange="salt-exchange",
        # 交換機類型
        type="direct",
        # 若是同名交換機已存在依然返回成功
        passive=False,
        # 聲明爲持久化交換機
        durable=False,
        # 交換機閒置也不會自動刪除
        auto_delete=False
    )
    # 建立隊列
    channel.queue_declare(queue="salt")
    # 綁定隊列
    channel.queue_bind(
        # 隊列名稱
        queue="salt",
        # 交換機名稱
        exchange="salt-exchange",
        # 路由鍵名稱
        routing_key="salt"
    )

    # 消息回調處理函數
    def msg_consumer(channel, method, header, body):
        # 發送消息確認
        channel.basic_ack(delivery_tag=method.delivery_tag)
        # 退出監聽循環
        if body == 'exit':
            channel.basic_cancel(consumer_tag="salt-consumer")
            channel.stop_consuming()
        else:
            print 'found notice: recive queue message {0}'.format(body)
        return

    # 做爲指定隊列消費者
    channel.basic_consume(msg_consumer, queue="salt", consumer_tag="salt-consumer")
    # 循環調用回調函數接收處理消息
    channel.start_consuming()

說明: 首先用使用默認賬號密碼guest,默認端口5672,默認虛擬主機/鏈接RabbitMQ Vhost,而後創建信道,利用信道和rabbitMQ進行通訊,而後再次聲明交換機,防止因爲生產者沒有聲明交換機致使後面綁定隊列失敗,而後就是建立隊列,建立隊列時須要指定隊列名稱,而後就是綁定交換機,綁定的時候須要指定隊列名稱,交換機名稱,綁定路由鍵,最後就是訂閱指定隊列,訂閱時須要傳遞一個回調函數來處理消息,一個隊列名稱來指明要訂閱的隊列,一個標識進程的消費者標記,一旦開始讀取消息則會開始一個阻塞的循環等待從信道進來的數據,若是要中止,則須要先使用basic_cancel結束消費(關閉信道和鏈接),注意須要提供進程標識,而後再stop_consuming中止消費者

確認: 鏈接RabbitMQ->獲取信道->設置確認模式->聲明交換機->建立消息->發佈消息->關閉信道->關閉鏈接

wKioL1g9GSbjoUozAACEXLxs2hg797.png

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 說明: 導入公共模塊
import sys
import pika
# 說明: 導入其它模塊


if __name__ == '__main__':
    # 建立憑證對象
    credentials = pika.PlainCredentials('guest', 'guest')
    # 建立參數對象
    conn_params = pika.ConnectionParameters(
        # RabbitMQ服務地址
        host='127.0.0.1',
        # RabbitMQ服務端口
        port=5672,
        # RabbitMQ登陸憑證
        credentials=credentials,
        # RabbitMQ虛擬主機
        virtual_host='/'
    )
    # 建立鏈接對象
    conn_broker = pika.BlockingConnection(conn_params)
    # 獲取信道對象
    channel = conn_broker.channel()
    msg_ids = []
    msg_ids.append(len(msg_ids)+1)
    # 確認模式回調函數
    def confirm_handler(frame):
        # 第一次信道被設置爲確認模式時會觸發一次確認回調
        if type(frame.method) == pika.spec.Confirm.SelectOk:
            print 'found notice: channel in confirm mode'
        # 若是發送的消息達到隊列後沒有迴應則說明消息丟失,須要重發
        elif type(frame.method) == pika.spec.Basic.Nack:
            # 若是丟的消息確實是msg_ids裏面的,則說明剛剛發的消息確實是丟失了~
            if frame.method.delivery_tag in msg_ids:
                print 'found errors: message may be lost'
        # 若是發送的消息到達隊列後發回響應
        elif type(frame.method) == pika.spec.Basic.Ack:
            # 若是確認消息id確實是msg_ids裏面的,則從msg_ids裏面刪除
            if frame.method.delivery_tag in msg_ids:
                print 'found notice: message confirm received'
                # 刪除已經確認的消息
                msg_ids.remove(frame.method.delivery_tag)
    # 設置信道爲確認模式
    channel.confirm_delivery(callback=confirm_handler)
    # 建立交換機
    channel.exchange_declare(
        # 交換機名稱
        exchange="salt-exchange",
        # 交換機類型
        type="direct",
        # 若是同名交換機已存在依然返回成功,不然建立
        passive=False,
        # 聲明爲非持久化交換機
        durable=False,
        # 交換機閒置也不會自動刪除
        auto_delete=False
    )
    msg = sys.argv[1]
    # 建立配置對象
    msg_props = pika.BasicProperties()
    # 設置內容類型
    msg_props.content_type = 'text/plain'
    # 嘗試發佈消息
    channel.basic_publish(
        # 發佈消息內容
        body=msg,
        # 發佈到交換機
        exchange='salt-exchange',
        # 發佈信息屬性
        properties=msg_props,
        # 發佈信息時攜帶的路由鍵
        routing_key='salt'
    )
    channel.close()

說明: RabbitMQ任何一個信道上發佈的第一條消息都將得到ID1,而且信道上接下來的每一條消息的ID都會步進1,對於信道來講,消息ID是惟一的,因此一旦信道關閉,你將沒法追蹤發佈在該信道上任何未完成的發送方確認消息狀態,因此RabbitMQ並不會在發佈消息時返回消息對應的ID,而須要咱們本身爲每一個信道單獨維護一個消息計數器,在幾乎不影響RabbitMQ性能的前提下在生產者端用回調來處理消息確認.

相關文章
相關標籤/搜索