原理:node
經過鏈接多個機器組成單個邏輯中間服務器,機器之間通訊藉助於erlang的消息傳輸,要求集羣中全部節點必須有相同的erlang cookie;節點間網絡必須可靠,且運行相同版本的rabbitmq和erlang。python
虛擬主機、交換機、用戶信息和權限會自動鏡像到集羣中全部節點。隊列可能位於單個節點或鏡像到多個節點。鏈接到任意節點的客戶端能看到集羣中全部隊列。git
安裝erlanggithub
wget https://github.com/rabbitmq/erlang-rpm/releases/download/v21.0.2/erlang-21.0.2-1.el6.x86_64.rpmweb
wget https://github.com/rabbitmq/erlang-rpm/releases/download/v21.0.2/erlang-21.0.2-1.el7.centos.x86_64.rpmcentos
rpm -ivh erlang-20.3.6-1.el6.x86_64.rpm服務器
安裝mqcookie
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.6/rabbitmq-server-3.7.6-1.el6.noarch.rpm網絡
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.7/rabbitmq-server-3.7.7-1.el7.noarch.rpmapp
yum install rabbitmq-server-3.7.7-1.el7.noarch.rpm
啓動mq
systemctl start rabbitmq-server
新建用戶
rabbitmqctl add_user admin Hilaw@890
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
rabbitmqctl add_user monitor law@201809
rabbitmqctl set_user_tags monitor administrator
rabbitmqctl set_permissions -p / monitor '.*' '.*' '.*'
開啓web管理頁面:
rabbitmq-plugins enable rabbitmq_management
http://IP:15672 便可用admin登陸查看
關閉web管理頁面:
rabbitmq-plugins disable rabbitmq_management
經常使用命令
rabbitmqctl list_users 查看用戶列表
rabbitmqctl add_user username password 新建一個用戶
rabbitmqctl delete_user username 刪除一個用戶
rabbitmqctl change_password username newpassword 修改用戶密碼
rabbitmqctl set_permissions -p vhost username ".*" ".*" ".*"
配置用戶對應虛擬主機配置、寫和讀的全部權限
rabbitmqctl set_permissions -p username 查看用戶權限
rabbitmqctl clear_permissions -p username vhost
移除用戶在虛擬主機上的全部權限
rabbitmqctl list_user_permissions username 查看用戶在全部虛擬主機上的權限
rabbitmqctl status 查看狀態
rabbitmqctl list_queues 查看全部隊列及其消息數
rabbitmqctl list_queues -p vhost 查看具體隊列
rabbitmqctl -p vhostpath purge_queue blue 清除隊列blue裏的消息
鏡像集羣搭建
1,MQ1上同步erlang.cookie
cd /var/lib/rabbitmq
ls -al
chmod 777 .erlang.cookie
scp .erlang.cookie root@MQ2:/var/lib/rabbitmq
chmod 400 .erlang.cookie
2,在MQ2上
MQ2上暫停服務
systemctl stop rabbitmq-server
cd /var/lib/rabbitmq
ls -al
cp .erlang.cookie .erlang.cookie.default
chown rabbitmq.rabbitmq .erlang.cookie
chmod 400 .erlang.cookie
systemctl start rabbitmq-server
加入集羣
MQ1查看集羣狀態
rabbitmqctl cluster_status
將MQ2加入MQ1
關掉MQ2的隊列
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@MQ1 MQ2上必須能ping同MQ1,添加全部節點IP HOSTNAME到/etc/hosts文件
rabbitmqctl start_app
查看集羣狀態
rabbitmqctl cluster_status
退出集羣
MQ1上
rabbitmqctl forget_cluster_note rabbit@MQ2
MQ2上
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
注意事項:
cookie在全部節點上必須徹底同樣,同步時必定要注意。
erlang是經過主機名來鏈接服務,必須保證各個主機名之間能夠ping通。能夠經過編輯/etc/hosts來手工添加主機名和IP對應關係。若是主機名ping不通,rabbitmq服務啓動會失敗。
若是queue是非持久化queue,則若是建立queue的那個節點失敗,發送方和接收方能夠建立一樣的queue繼續運做。但若是是持久化queue,則只能等建立queue的那個節點恢復後才能繼續服務。
在集羣元數據有變更的時候須要有disk node在線,可是在節點加入或退出的時候全部的disk node必須所有在線。若是沒有正確退出disk node,集羣會認爲這個節點當掉了,在這個節點恢復以前不要加入其它節點。
更改節點屬性
rabbitmqctl stop_app –中止rabbitmq服務
rabbitmqctl change_cluster_node_type disc/ram –更改節點爲磁盤或內存節點
rabbitmqctl start_app –開啓rabbitmq服務
內存節點是將交換器、用戶和vhost等元數據都保存在內存中,而磁盤節點將元數據保存在磁盤中。單節點系統只容許磁盤節點,集羣中可部分配置爲內存節點。
集羣重啓時,最後一個掛掉的節點應該第一個重啓,若是因特殊緣由(好比同時斷電),而不知道哪一個節點最後一個掛掉。可用如下方法重啓:
先在一個節點上執行
#rabbitmqctl force_boot
#service rabbitmq-server start
在其餘節點上執行
#service rabbitmq-server start
查看cluster狀態是否正常(要在全部節點上查詢)。
#rabbitmqctl cluster_status.
測試:
1,生成消息
#!/usr/bin/python3
import pika
credentials = pika.PlainCredentials('monitor','law@201809')
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1','5672','/',credentials))
channel = connection.channel() #新建一個頻道
channel.queue_declare(queue='pika') #聲明一個隊列pika
channel.basic_publish(exchange='',routing_key='pika',body='Test message') #向pika隊列發送一條消息
connection.close() #發送完成後,關閉鏈接
2,消費
#!/usr/bin/python3
import pika
credentials = pika.PlainCredentials('monitor','law@201809')
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1','5672','/',credentials))
channel = connection.channel()
channel.queue_declare(queue='pika')
for method_frame,properties,body in channel.consume('pika'):
print(method_frame,properties,body) #打印出消息
channel.basic_ack(method_frame.delivery_tag)
#消費完消息就跳出循環
if method_frame.delivery_tag == 1:
break
#取消消費者並返回任何待處理消息
requeued_messages = channel.cancel()
print('Requeued %i messages' % requeued_messages)
connection.close() #關閉鏈接