RabbitMQ上手記錄–part 6-Shovel

上一part《RabbitMQ上手記錄–part 5-節點集羣高可用(多服務器)》講到了經過多個服務器來搭建RabbitMQ的節點集羣,示例當中提到的服務器都是在同一個局域網中的(其實是一個機器上的多個不一樣虛擬機而已),這種使用方式適用於在同一個數據中心的狀況。互聯網裏經常提到異地多活、多數據中心來實現更高級別的高可用。個人理解是當數據或者訪問量超過當個數據中心規模時,經過更多的數據中心來提供更多的訪問量支持,同時當某地數據中心出問題時,也不會讓數據由於都放在同一個數據中心而致使整個系統宕機html

 

RabbitMQ經過Shovel插件實現節點集羣跨多數據中心的需求。下面來簡單瞭解一下Shovel的一些基本概念。python

 

Shovel基本概念git

Shovel是RabbitMQ的一個插件,這個插件的功能就是將源節點的消息發佈到目標節點,這個過程當中Shovel就是一個客戶端,它負責鏈接源節點,讀取某個隊列的消息,而後將消息寫入到目標節點的exchange中。根據這麼一個概念,其實也能夠本身開發一個簡單的程序,負責從一個節點讀取數據而後發送到目標節點。github

 

使用Shovel的好處web

1.Shovel能在不一樣數據中心之間傳遞消息,源節點和目標節點可使用不一樣的用戶和vhosts,不一樣的RabbitMQ版本,而且不須要使用相同的cookie token(在上一part實現多服務器節點集羣是,咱們特地將每一個主機的cookie token都設置成同樣)ubuntu

2.客戶端的鏈接容許鏈接斷開的同時不丟失消息安全

3.支持多個版本的AMQP協議服務器

 

具體工做方式cookie

Shovel插件經過定義一個或多個shovel來實現消息的傳遞。測試

shovel實現瞭如下功能

1.鏈接源節點和目標節點

2.讀取(或者說是consume)隊列裏的消息

3.發佈消息到目標節點(經過將消息發佈到目標節點的exchange,並經過routing_key的方式發佈)

 

image(Shovel的工做過程簡要描述)

 

使用Shovel

梳理完理論以後,接下來將使用兩個不一樣地域雲主機來實踐一下(有點小成本,須要自行租用雲主機)。

 

1.準備雲主機

1.須要有兩臺雲主機,我這裏兩個雲主機分別來自vulrtr和阿里雲(來源不重要,只要是雲主機而且分佈在不一樣的地區),練習用最低配的就夠了。

雲主機信息

名稱 角色 主機提供方 操做系統 IP 端口 備註
主機1 來源節點 vultr ubuntu1604 45.32.250.47 5672  
主機2 目標節點 aliyun ubuntu1604 47.106.179.208 5672 阿里雲鬚要在安全策略組中單獨開放5672端口

 

而後在各個主機安裝好RabbitMQ,而且確認5672端口號可被外部訪問到。

注意這裏咱們並無同步兩個機器的cookie token,是爲了證實在使用shovel時不須要依賴於cookie token。

 

2.安裝Shovel

Shovel是RabbitMQ的一個插件,在已經安裝好RabbitMQ的基礎上,把相關的插件啓用便可。

咱們只須要在主機1,也就是來源節點啓用shovel插件便可。

 

執行以下命令啓用Shovel插件

rabbitmq-plugins enable rabbitmq_shovel

看到以下輸出即代表啓用成功

The following plugins have been enabled:
   amqp_client
   rabbitmq_shovel

Applying plugin configuration to rabbit@vultr... started 2 plugins.

 

3.配置和運行Shovel

shovel分紅兩種

靜態shovel:在配置文件中定了源節點和目標節點信息,修改配置後須要重啓

動態shovel:經過運行時參數指定,可在運行時建立或刪除

這裏我使用靜態shovel,在配置裏定義shovel配置。

 

首先要找到RabbitMQ使用的配置,默認狀況下是沒有建立的,咱們能夠經過啓動日誌查看目前是否有指定的配置文件。

 

一般log文件在/var/log/rabbitmq下的rabbit@{hostname}.log文件中,打開文件發現

config file(s) : /etc/rabbitmq/rabbitmq.config (not found)

說明配置文件沒有建立。

 

從新建立一個rabbitmq.config文件太麻煩了,咱們從基於官方提供的example配置文件來修改會簡單一些。

 

打開目錄/usr/share/doc/rabbitmq-server/

cd /usr/share/doc/rabbitmq-server/

而後找到rabbitmq.config.example.gz,解壓後複製到/etc/rabbitmq目錄下

gzip -dk rabbitmq.config.example.gz

mv rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

 

這時候的rabbitmq.config文件裏全部配置都是註釋的,這裏咱們如今只關注shovel部分的配置。

 

a.準備配置信息

在主機1和主機2上分別建立一個RabbitMQ用戶,用戶名是shovel_user,密碼是123456,並設置受權

sudo rabbitmqctl add_user shovel_user 123456

sudo rabbitmqctl set_user_tags shovel_user administrator

sudo rabbitmqctl set_permissions -p / shovel_user ".*" ".*" ".*"

 

b.配置shovel

在主機1上打開rabbitmq.config文件,修改shovel部分的配置爲以下內容

{rabbitmq_shovel,
   [{shovels,
     [%% A named shovel worker.
      {my_test_shovel,
       [

    %  List the source broker(s) from which to consume.
     
        {sources,
         [%% URI(s) and pre-declarations for all source broker(s).
         {brokers, ["amqp://shovel_user:123456@45.32.250.47:5672"]},
          {declarations, [
              {'exchange.declare',
                 [ {exchange, <<"shovel_exchange">>},
                 {type, <<"direct">>},
                 durable
                 ]},
             {'queue.declare',
                 [{queue,    <<"shovel_outcome_queue">>},durable]},
             {'queue.bind',
                 [ {exchange, <<"shovel_exchange">>},
                 {queue,    <<"shovel_outcome_queue">>},
                 {routing_key, <<"shovel_key">>}
                 ]}
          ]}

         ]},

    %  List the destination broker(s) to publish to.
        {destinations,
         [%% A singular version of the 'brokers' element.
         {broker, "amqp://shovel_user:123456@47.106.179.208:5672"},
          {declarations, [{'exchange.declare',
                             [ {exchange, <<"shovel_exchange">>},
                             {type, <<"direct">>},
                             durable
                             ]},
                         {'queue.declare',
                             [{queue,    <<"shovel_income_queue">>},durable]},
                         {'queue.bind',
                             [ {exchange, <<"shovel_exchange">>},
                             {queue,    <<"shovel_income_queue">>},
                             {routing_key, <<"shovel_key">>}
                             ]}]}

         ]},

    %  Name of the queue to shovel messages from.
     
      {queue, <<"shovel_outcome_queue">>},

    %  Optional prefetch count.
     
      {prefetch_count, 10},

    %  when to acknowledge messages:
     %  - no_ack: never (auto)
     %  - on_publish: after each message is republished
     %  - on_confirm: when the destination broker confirms receipt
     
      {ack_mode, no_ack},

    %  Overwrite fields of the outbound basic.publish.
     
      {publish_fields, [{exchange,    <<"shovel_exchange">>},
                        {routing_key, <<"shovel_key">>}]},

    %  Static list of basic.properties to set on re-publication.
     
      {publish_properties, [{delivery_mode, 2}]},

    %  The number of seconds to wait before attempting to
     %  reconnect in the event of a connection failure.
     
      {reconnect_delay, 2.5}

     ]} %% End of my_first_shovel
     ]}
    %% Rather than specifying some values per-shovel, you can specify
    %% them for all shovels here.
    %%
    %% {defaults, [{prefetch_count,     0},
    %%             {ack_mode,           on_confirm},
    %%             {publish_fields,     []},
    %%             {publish_properties, [{delivery_mode, 2}]},
    %%             {reconnect_delay,    2.5}]}
   ]}

 

RabbitMQ的官網的shovel配置示例不可用,這裏使用配置的是RabbitMQ在github提供的配置示例基礎上修改的(https://github.com/rabbitmq/rabbitmq-server/blob/master/docs/rabbitmq.config.example),而後結合官網文檔的說明本身摸索配置出來。

 

配置說明

簡單介紹一下上述配置中的關鍵部分

shovels以後接着可定義多個shovel,這裏只定義了一個shovel,名稱是my_test_shovel。

sources:定義了消息的來源

            brokers

             須要給出來源服務的地址,一般格式爲amqp://用戶名:密碼@主機名(IP):端口號/vhost名稱。

             以前定義的shovel_user這時候就能夠用上了,配置中咱們使用的是默認的vhost,因此沒有設置vhost名稱

            amqp://shovel_user:123456@45.32.250.47:5672

 

            declarations:

            declarations裏面的內容就是執行一些amqp的命令,這些命令跟使用API調用的過程相似,

            好比聲明隊列,Exchange和綁定信息等。

 

destinations:定義了消息的去向,裏面的內容跟sources相似,實際上就是定義接收的exchange和隊列

 

queue:這裏單獨配置了一個queue,是表示從哪一個隊列讀取消息,這裏跟sources裏聲明的隊列一致。

其餘一些可選的配置就不詳細介紹了,具體能夠查看官網文檔http://www.rabbitmq.com/configure.html

 

咱們這裏的配置表示從主機1的shovel_outcome_queue隊列獲取消息,而後轉發到主機2的shovel_income_queue隊列,兩邊使用的exchange名稱都是shovel_exchange,而且routing_key的值都是shovel_key。

完整的配置請參考

https://github.com/shenba2014/RabbitMQ/blob/master/shovel/rabbitmq.config

 

c.驗證配置

配置完成以後,重啓主機1的RabbitMQ服務

service rabbitmq-server restart

 

而後在主機1查看shovel的狀態

rabbitmqctl eval 'rabbit_shovel_status:status().'
能夠看到相似以下輸出信息,看到running字樣代表shovel服務已正常運行

[{my_test_shovel,static,
                  {running,[{src_uri,<<"amqp://45.32.250.47:5672">>},
                            {dest_uri,<<"amqp://47.106.179.208:5672">>}]},
                  {{2018,5,13},{12,4,48}}}]


輸出信息中列出了源節點和目標節點信息,最後一行是時間戳。

 

4.使用Shovel

接下來咱們使用代碼來測試Shovel是否可用,咱們的代碼跟鏈接普通的RabbitMQ服務相似,只是具體鏈接的服務器地址不一樣。

測試的代碼包含consumer和producer兩部分,consumer將鏈接主機2,producer將鏈接到主機1。

分別運行consumer和producer的代碼,producer在運行以後會發送消息到隊列shovel_outcome_queue,而後consumer會接收到消息。

 

具體consumer和producer的代碼不貼出來,完整代碼請參考

https://github.com/shenba2014/RabbitMQ/tree/master/shovel

 

咱們來直接看看運行代碼的效果

 

運行消費者的代碼,參數依次是:主機IP,端口,RabbitMQ用戶名和密碼,這裏的咱們鏈接的是主機2(目標節點)。

運行

python shovel_consumer.py 47.106.179.208 5672 shovel_user 123456

輸出

Ready for orders!

 

而後新開一個控制檯運行生產者的代碼,參數同樣,可是生產者鏈接的是主機1(源節點)。

運行

python shovel_producer.py 45.32.250.47 5672 shovel_user 123456

輸出

Sent order message.

 

而後在消費者的那個控制檯應該會看到相似以下輸出

Received order 92 for test type.

數字是隨機生成的,因此最終會結果可能不一樣。

 

好了,到目前爲止,整個演練就完成了,前面的準備工做較多,測試的代碼很簡單,主要是演示shovel的跨數據中心的消息傳遞功能。

結合實際的IP地址,exchange和queue,最後用一個圖來講明使用shovel的消息流向。

 

image

相關文章
相關標籤/搜索