RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。node
AMQP,即Advanced message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。python
AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。web
RabbitMQ Server: 也叫broker server,是一種傳輸服務,負責維護一條從Producer到consumer的路線,保證數據可以按照指定的方式進行傳輸。正則表達式
Producer,數據的發送方。緩存
Consumer,數據的接收方。安全
Exchanges 接收消息,轉發消息到綁定的隊列。主要使用3種類型:direct, topic, fanout。bash
Queue RabbitMQ內部存儲消息的對象。相同屬性的queue能夠重複定義,但只有第一次定義的有效。服務器
Bindings 綁定Exchanges和Queue之間的路由。cookie
Connection: 就是一個TCP的鏈接。Producer和consumer都是經過TCP鏈接到RabbitMQ Server的。架構
Channel:虛擬鏈接。它創建在上述的TCP鏈接中。數據流動都是在Channel中進行的。也就是說,通常狀況是程序起始創建TCP鏈接,第二步就是創建這個Channel。
Ubuntu 安裝
編輯 /etc/apt/sources.list 文件添加以下行
deb http://www.rabbitmq.com/debian/ testing main
執行更新軟件源命令
#apt-get update.
安裝rabbitmq
#apt-get install rabbitmq-server
啓動rabbitmq
# service rabbitmq-server start
Centos 安裝
從官網下載erlang和rabbitmq 的rpm包
安裝erlang
#rpm –i erlang-17.4-1.el6.x86_64.rpm
安裝rabbitmq
#rpm –i rabbitmq-server-3.5.3-1.noarch.rpm
rabbitmq的rpm安裝包不能指定安裝路徑
RABBITMQ配置文件位置:
啓用web插件
#rabbitmq-plugins enable rabbitmq_management #啓用
關閉web插件
# rabbitmq-plugins disable rabbitmq_management
能夠用默認帳號guest,guest登錄http://主機IP:15672,若是要遠程登陸,須要先建立賬戶,可查看下一節。
若是是集羣的話,只要在一臺主機設置便可,其它會自動同步。
#rabbitmqctl add_user iom 123456 –iom爲新建的用戶,123456爲密碼
#rabbitmqctl set_user_tags iom administrator –將用戶設置爲管理員角色
#rabbitmqctl set_permissions -p / iom 「.*」 「.*」 「.*」
–在 / 虛擬主機裏設置iom用戶配置權限,寫權限,讀權限。.*是正則表達式裏用法。rabbitmq的權限是根據不一樣的虛擬主機(virtual hosts)配置的,同用戶在不一樣的虛擬主機(virtual hosts)裏可能不同。
若是採用標準的 AMQP 協議,則惟一可以保證消息不會丟失的方式是利用事務機制 — 令 channel 處於 transactional 模式、向其 publish 消息、執行 commit 動做。在這種方式下,事務機制會帶來大量的多餘開銷,並會致使吞吐量降低 250% 。爲了補救事務帶來的問題,引入了 confirmation 機制(即 Publisher Confirm)。
confirm 機制是在channel上使用 confirm.select方法,處於 transactional 模式的 channel 不能再被設置成 confirm 模式,反之亦然。
在 channel 被設置成 confirm 模式以後,全部被 publish 的後續消息都將被 confirm(即 ack) 或者被 nack 一次。可是沒有對消息被 confirm 的快慢作任何保證,而且同一條消息不會既被 confirm 又被 nack 。
RabbitMQ 將在下面的狀況中對消息進行 confirm :
RabbitMQ發現當前消息沒法被路由到指定的 queues 中;
非持久屬性的消息到達了其所應該到達的全部 queue 中(和鏡像 queue 中);
持久消息到達了其所應該到達的全部 queue 中(和鏡像 queue 中),並被持久化到了磁盤(被 fsync);
持久消息從其所在的全部 queue 中被 consume 了(若是必要則會被 acknowledge)。
爲了保證數據不被丟失,RabbitMQ支持消息確認機制,即acknowledgments。
若是沒啓動消息確認機制,RabbitMQ在consumer收到消息後就會把消息刪除。
啓用消息確認後,consumer在處理數據後應經過回調函數顯示發送ack, RabbitMQ收到ack後纔會刪掉數據。若是consumer一段時間內不回饋,RabbitMQ會將該消息從新分配給另一個綁定在該隊列上的consumer。另外一種狀況是consumer斷開鏈接,可是獲取到的消息沒有回饋,則RabbitMQ一樣從新分配。
注意:若是consumer 沒調用basic.qos 方法設置prefetch_count=1,那即便該consumer有未ack的messages,RabbitMQ仍會繼續發messages給它。
消息確認機制確保了consumer退出時消息不會丟失,但若是是RabbitMQ自己因故障退出,消息仍是會丟失。爲了保證在RabbitMQ出現意外狀況時數據仍沒有丟失,須要將queue和message都要持久化。
queue持久化:channel.queue_declare(queue=’hello’, durable=True)
message持久化:channel.basic_publish(exchange=」,
routing_key=」task_queue」,
body=message,
properties=pika.BasicProperties(
delivery_mode = 2,) #消息持久化
)
即便有消息持久化,數據也有可能丟失,由於rabbitmq是先將數據緩存起來,到必定條件才保存到硬盤上,這期間rabbitmq出現意外數據有可能丟失。
網上有測試代表:持久化會對RabbitMQ的性能形成比較大的影響,可能會降低10倍不止。
一個RABBITMQ集 羣中能夠共享user,virtualhosts,queues(開啓Highly Available Queues),exchanges等。但message只會在建立的節點上傳輸。當message進入A節點的queue中後,consumer從B節點拉取時,RabbitMQ會臨時在A、B間進行消息傳輸,把A中的消息實體取出並通過B發送給consumer。因此consumer應儘可能鏈接每個節點,從中取消息。
RABBITMQ的集羣節點包括內存節點、磁盤節點。內存節點的元數據僅放在內存中,性能比磁盤節點會有所提高。不過,若是在投遞message時,打開了message的持久化,那麼內存節點的性能只能體如今資源管理上,好比增長或刪除隊列(queue),虛擬主機(vrtual hosts),交換機(exchange)等,發送和接受message速度同磁盤節點同樣。一個集羣至少要有一個磁盤節點。
環境:有三臺主機,主機名和IP以下,rabbitmq的執行用戶爲rabbitmq,所屬組爲rabbitmq。
主機名 IP
rabbitmq1 192.168.10.2
rabbitmq2 192.168.10.3
rabbitmq3 192.168.10.4
殺掉rabbitmq2和rabbitmq3的rabbitmq進程:
#ps –ef|grep rab|awk ‘{print $2}’|xargs kill -9。–用service rabbitmq-servier stop停會有遺留進程。
登錄rabbitmq1(rabbitmq1上的rabbitmq服務不能關),執行
#cd /var/lib/rabbitmq –進入erlang.cookie所在目錄,只有ls –al能看見此文件
#chmod 777 .erlang* –該文件默認爲400權限,爲方便傳輸,先修改權限,非必須操做
#scp .erlang.cookie rabbitmq@192.168.10.3:/var/lib/rabbitmq –將此文件傳給另外兩條主機
#scp .erlang.cookie rabbitmq@192.168.10.4:/var/lib/rabbitmq
#chmod 400 .er* –恢復文件權限
分別在rabbitmq2和rabbitmq3 上執行
#chown rabbitmq:rabbitmq .er* –修改文件所屬用戶和所屬組
#chmod 400 .er* –修改文件權限
#service rabbitmq-server start
查詢rabbitmq1節點名稱
#rabbitmqctl cluster_status
Cluster status of node rabbit@rabbitmq1 …
[{nodes,[{disc,[rabbit@rabbitmq1]}]},{running_nodes,[rabbit@ rabbitmq1]}]
…done.
rabbitmq2 加入rabbitmq1 節點.
# rabbitmqctl stop_app –關掉rabbitmq2服務
# rabbitmqctl join_cluster rabbit@rabbitmq1 — rabbitmq2加入rabbitmq1, rabbitmq2必須能經過rabbitmq1的主機名ping通rabbitmq1。
# rabbitmqctl start_app –啓動rabbitmq2服務
查看集羣信息
# rabbitmqctl cluster_status –此時裏面就應該能看見兩個節點。集羣名字爲rabbit@rabbitmq。
用相同的方法把rabbitmq3也加入rabbitmq1。
#rabbitmqctl stop_app –中止rabbitmq服務
#rabbitmqctl change_cluster_node_type disc/ram –更改節點爲磁盤或內存節點
#rabbitmqctl start_app –開啓rabbitmq服務
#rabbitmqctl cluster_status
[{nodes,[{disc,[rabbit@rabbitmq1,rabbit@rabbitmq2,rabbit@rabbitmq3]}]}, {running_nodes,[rabbit@rabbitmq1,rabbit@rabbitmq2, rabbit@rabbitmq3]}]…done.
–第一行是集羣中的節點成員,disc表示這些都是磁盤節點。
–第二行是正在運行的節點成員
假設要把rabbitmq2退出集羣
在rabbitmq2上執行
#rabbitmqctl stop_app
#rabbitmqctl reset
#rabbitmqctl start_app
在集羣主節點上執行
# rabbitmqctl forget_cluster_node rabbit@rabbitmq2
集羣重啓時,最後一個掛掉的節點應該第一個重啓,若是因特殊緣由(好比同時斷電),而不知道哪一個節點最後一個掛掉。可用如下方法重啓:
先在一個節點上執行
#rabbitmqctl force_boot
#service rabbitmq-server start
在其餘節點上執行
#service rabbitmq-server start
查看cluster狀態是否正常(要在全部節點上查詢)。
#rabbitmqctl cluster_status
若是有節點沒加入集羣,能夠先退出集羣,而後再從新加入集羣。
上述方法不適合內存節點重啓,內存節點重啓的時候是會去磁盤節點同步數據,若是磁盤節點沒起來,內存節點一直失敗。
鏡像隊列能夠同步queue和message,當主queue掛掉,從queue中會有一個變爲主queue來接替工做。
鏡像隊列是基於普通的集羣模式的,因此你仍是得先配置普通集羣,而後才能設置鏡像隊列。
鏡像隊列設置後,會分一個主節點和多個從節點,若是主節點宕機,從節點會有一個選爲主節點,原先的主節點起來後會變爲從節點。
queue和message雖然會存在全部鏡像隊列中,但客戶端讀取時不論物理面鏈接的主節點仍是從節點,都是從主節點讀取數據,而後主節點再將queue和message的狀態同步給從節點,所以多個客戶端鏈接不一樣的鏡像隊列不會產生同一message被屢次接受的狀況。
沿用3.2的環境,如今咱們把名爲「hello」的隊列設置爲同步給全部節點
#rabbitmqctl set_policy ha-all 「hello」 ‘{「ha-mode」:」all」}’
ha-all 是同步模式,指同步給全部節點,還有另外兩種模式ha-exactly表示在指定個數的節點上進行鏡像,節點的個數由ha-params指定,ha-nodes表示在指定的節點上進行鏡像,節點名稱經過ha-params指定;
hello 是同步的隊列名,能夠用正則表達式匹配;
{「ha-mode」:」all」} 表示同步給全部,同步模式的不一樣,此參數也不一樣。
執行上面命令後,能夠在web管理界面查看queue 頁面,裏面hello隊列的node節點後會出現+2標籤,表示有2個從節點,而主節點則是當前顯示的node(xf7021是測試用的名字,按4-2應該爲rabbitmq(1-3))。
紅字爲手工加的備註,原文件裏沒有。
vi /etc/keepalived/keepalived.cnf文件
global_defs {
router_id LVS_MASTER }
vrrp_instance VI_1 {
state MASTER
interface eth0
virtual_router_id 51
priority 100
advert_int 1
authentication {
auth_type PASS
auth_pass 1111
}
virtual_ipaddress {
192.168.10.251/24 #rabbitmq
}
}
virtual_server 192.168.10.251 5672 {
delay_loop 6
lb_algo rr
lb_kind DR
protocol TCP
real_server 192.168.10.3 5672 {
weight 3
TCP_CHECK {
connect_timeout 3
nb_get_retry 3
delay_before_retry 3
connect_port 5672
}
}
real_server 192.168.10.4 5672 {
weight 3
TCP_CHECK {
connect_timeout 3
nb_get_retry 3
delay_before_retry 3
connect_port 5672
}
}
}
lvs_rabbitmq.sh腳本內容:
#!/bin/bash
VIP=192.168.10.251
case 「$1」 in
start)
ifconfig lo:0 $VIP netmask 255.255.255.255 broadcast $VIP
/sbin/route add -host $VIP dev lo:0
echo 「1」 >/proc/sys/net/ipv4/conf/lo/arp_ignore
echo 「2」 >/proc/sys/net/ipv4/conf/lo/arp_announce
echo 「1」 >/proc/sys/net/ipv4/conf/all/arp_ignore
echo 「2」 >/proc/sys/net/ipv4/conf/all/arp_announce
sysctl -p >/dev/null 2>&1
echo 「lvs_vip server start ok!」;;
stop)
ifconfig lo:0 down
/sbin/route del $VIP >/dev/null 2>&1
echo 「0」 >/proc/sys/net/ipv4/conf/lo/arp_ignore
echo 「0」 >/proc/sys/net/ipv4/conf/lo/arp_announce
echo 「0」 >/proc/sys/net/ipv4/conf/all/arp_ignore
echo 「0」 >/proc/sys/net/ipv4/conf/all/arp_announce
echo 「lvs_vip server stoped.」;;
*)
echo 「arg start|stop.」
exit 1
esac
exit 0
RabbitMQ監控項目不少,可經過web管理界面監控。
OVERVIEW頁面下有4個標籤。主要關注totals和nodes兩個。
ready爲待處理消息量,total爲總消息量。
publish爲每秒發送消息量,deliver爲每秒接受消息量。
下面5個灰色長方塊分別表明對應的模塊鏈接數。
name爲節點名稱,後面5個藍色方塊分別表明文件打開數,socket鏈接數,erlang processes(暫時未知),內存佔用兩,磁盤空餘量;info裏顯示節點屬性,將鼠標放在內容上會顯示對應的統計內容。
主機名 IP
VIP: 192.168.10.251
client 192.168.10.2 –本測試發送(producer)和接收(consumer)在同一臺機器
rabbitmq1 192.168.10.3
rabbitmq2 192.168.10.4
負載機啓動keepalived
# service keepalived start
rabbitmq1和rabbitmq2執行5-2的腳本
#./lvs_rabbitmq.sh start
按第3和第4章的方法組建集羣,配置鏡像隊列,節點類型最好都設置爲磁盤節點。
按第1-5建立用戶。
測試用RabbitMQ的python語言客戶端,注意python是靠縮進量來區分語句塊。紅色部分爲註釋,源碼上沒有。
發送源碼:
#vi send.py
#!/usr/bin/env python
import pika
import time
credentials=pika.PlainCredentials(‘iom’,’123456′) –配置鏈接的用戶名和密碼
parameters=pika.ConnectionParameters(‘192.168.10.251′,5672,’/’,credentials)
connection=pika.BlockingConnection(parameters)
channel=connection.channel()
channel.queue_declare(queue=’hello’)
count=0
while count<9999:
message=’Hello World’+str(count)
count=count+1
channel.basic_publish(exchange=」,routing_key=’hello’,body=message)
print 「Sent %s」 %(message)
time.sleep(1)
connection.close()
接收源碼
#vi receive.py
#!/usr/bin/env python
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host=’xf7027′))
channel=connection.channel()
channel.queue_declare(queue=’hello’)
print ‘[*] Waiting for message.To exit press CTRL+C’
def callback(ch,method,properties,body):
print 「[x] Received %r」 %(body,)
channel.basic_consume(callback,queue=’hello’,no_ack=True)
channel.start_consuming()
在客戶機上執行
#python send.py
在負載機上執行
#watch ipvsadm –Ln
能夠看到rabbitmq1或rabbitmq2的activeconn列數值爲1。
客戶機從新執行發送程序
#python send.py
在負載機上能夠看到另外一個rabbitmq服務的activeconn 列數值也變爲1。
測試容災性:
在客戶機上分別執行發送和接受程序。
#python send.py
#python receive.py
而後關掉一個rabbitmq節點,若是關掉的正好是客戶機連的那個節點的話,客戶機發送和接收程序會報錯退出(程序自己若是有錯誤重發機制則不受任何影響)。若是關掉的是另外的節點,程序不受任何影響。