RabbitMQ cluster high availability
RabbitMQ is written in Erlang, which makes clustering very convenient, because Erlang is by nature a distributed language; RabbitMQ itself, however, does not provide load balancing.
RabbitMQ deployments roughly fall into three modes: standalone mode, normal mode, and mirrored mode.
Standalone mode: the simplest case, non-clustered; that is, a single-instance service.
Normal mode: the default cluster mode.
queue建立以後,若是沒有其它policy,則queue就會按照普通模式集羣。對於Queue來講,消息實體只存在於其中一個節點,A、B兩個節點僅有相同的元數據,即隊列結構,但隊列的元數據僅保存有一份,即建立該隊列的rabbitmq節點(A節點),當A節點宕機,你能夠去其B節點查看,./rabbitmqctl list_queues 發現該隊列已經丟失,但聲明的exchange還存在。redis
當消息進入A節點的Queue中後,consumer從B節點拉取時,RabbitMQ會臨時在A、B間進行消息傳輸,把A中的消息實體取出並通過B發送給consumer。json
因此consumer應儘可能鏈接每個節點,從中取消息。即對於同一個邏輯隊列,要在多個節點創建物理Queue。不然不管consumer連A或B,出口總在A,會產生瓶頸。vim
該模式存在一個問題就是當A節點故障後,B節點沒法取到A節點中還未消費的消息實體。api
若是作了消息持久化,那麼得等A節點恢復,而後纔可被消費;若是沒有持久化的話,隊列數據就丟失了。
Mirrored mode: the queues that need HA are turned into mirrored queues that live on multiple nodes. This is RabbitMQ's HA solution.
This mode solves the problem described above. The essential difference from the normal mode is that message bodies are actively synchronized between the mirror nodes, rather than being pulled across temporarily when a consumer fetches them.
The side effects are equally obvious: besides lowering overall performance, if there are many mirrored queues and a large volume of incoming messages, the cluster's internal network bandwidth gets eaten up by this synchronization traffic.
It is therefore suited to scenarios with high reliability requirements. To make a queue a mirrored queue you first set up a policy; when a client later declares the queue, the RabbitMQ cluster decides, based on the queue name, whether it becomes a normal queue or a mirrored one. Details follow:
Queues have mirroring enabled through policies. A policy can be changed at any time, and RabbitMQ moves queues to match the new policy as closely as it can. Non-mirrored queues differ from mirrored queues in that they lack the extra mirroring infrastructure and have no slaves, so they run faster.
To make a queue mirrored, create a policy that matches it. A policy has two keys, ha-mode and ha-params (optional); ha-params takes different values depending on ha-mode, as summarized in the table below.
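The standard combinations, as documented for RabbitMQ mirrored queues, are:

ha-mode    ha-params     behaviour
all        (absent)      mirror the queue across every node in the cluster
exactly    count         mirror the queue across exactly that many nodes
nodes      node names    mirror the queue only on the listed nodes

A policy can be created with rabbitmqctl set_policy or through the management API that we enable during installation. The sketch below uses the management API; the vhost web and the admin/password user are the ones created further down, while the policy name ha-all and the queue-name pattern ^ha\. are only illustrative values.

# Minimal sketch: create a mirroring policy through the RabbitMQ management API.
import json
import requests
from requests.auth import HTTPBasicAuth

node = "node1"                         # any cluster node with rabbitmq_management enabled
vhost = "web"                          # vhost created during installation
policy = {
    "pattern": "^ha\\.",               # queues whose names start with "ha." become mirrored
    "definition": {"ha-mode": "all"},  # or "exactly"/"nodes" together with "ha-params"
    "apply-to": "queues",
    "priority": 0,
}

r = requests.put("http://%s:15672/api/policies/%s/ha-all" % (node, vhost),
                 auth=HTTPBasicAuth("admin", "password"),
                 headers={"content-type": "application/json"},
                 data=json.dumps(policy))
print(r.status_code)                   # 201/204 means the policy was accepted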
Basic concepts in a cluster:
RabbitMQ cluster nodes come in two flavors, RAM nodes and disk nodes. As the names suggest, a RAM node keeps all of its data in memory, while a disk node keeps it on disk. As noted earlier, though, if messages are published as persistent, they are safely written to disk even on a RAM node.
A RabbitMQ cluster shares users, vhosts, queues, exchanges and so on, and all of this data and state must be replicated on every node. The one exception is message queues, which currently belong only to the node that created them, even though they are visible to and reachable from every node. RabbitMQ nodes can join a cluster dynamically, and a node can also leave the cluster again; the cluster performs a basic form of load balancing.
There are two kinds of nodes in a cluster:
1. RAM node: keeps its state only in memory (with one exception: the persistent contents of durable queues are still written to disk).
2. Disk node: keeps its state both in memory and on disk.
Although a RAM node does not write to disk, it performs better than a disk node. Within a cluster, a single disk node to persist the state is enough.
If a cluster contains only RAM nodes, you must not stop all of them, or all state, messages and so on will be lost.
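As a quick way to check which members are RAM nodes and which are disk nodes, the management API (enabled during the installation below) exposes /api/nodes. A minimal sketch, assuming the admin/password user created in the setup:

# Minimal sketch: list each cluster node's type (disc or ram) and whether it is
# running, using the rabbitmq_management HTTP API.
import requests
from requests.auth import HTTPBasicAuth

r = requests.get("http://node1:15672/api/nodes",
                 auth=HTTPBasicAuth("admin", "password"))
for n in r.json():
    print("%s  type=%s  running=%s" % (n["name"], n["type"], n["running"]))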
Approach:
So how do we actually make RabbitMQ highly available? We first build a normal cluster, then configure mirrored queues on top of it, and finally put a reverse proxy in front of the RabbitMQ cluster; producers and consumers access the cluster through that reverse proxy.
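A minimal client sketch of that setup, assuming the pika library is installed and that a DNS name (here the placeholder rmq.example.com) points at the keepalived VIP, with HAProxy listening on port 27010 as configured below; the admin/password user and the web vhost come from the installation steps:

# Minimal sketch: a producer that reaches the cluster only through the
# keepalived VIP / HAProxy front end, never a broker node directly.
import pika

credentials = pika.PlainCredentials("admin", "password")
params = pika.ConnectionParameters(host="rmq.example.com",  # placeholder DNS name for the keepalived VIP
                                   port=27010,              # HAProxy frontend port
                                   virtual_host="web",
                                   credentials=credentials)

connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue="ha.test", durable=True)  # name chosen to match a "^ha\." mirroring policy
channel.basic_publish(exchange="",
                      routing_key="ha.test",
                      body="hello",
                      properties=pika.BasicProperties(delivery_mode=2))  # persistent message
connection.close()

If HAProxy on one machine becomes unreachable, keepalived moves the VIP to the other, so clients keep a single connection endpoint no matter which node fails.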
RabbitMQ installation

1. Configure the EPEL repo on node1 and node2

rpm -ivh http://download.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
#http://mirrors.yun-idc.com/epel/5/x86_64/epel-release-5-4.noarch.rpm
wget -O /etc/yum.repos.d/epel-erlang.repo
#yum install erlang xmlto
#wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.3/rabbitmq-server-3.5.3-1.noarch.rpm
#rpm -ivh rabbitmq-server-3.5.3-1.noarch.rpm
#/etc/init.d/rabbitmq-server restart

#node1
# rabbitmqctl delete_user guest
#rabbitmqctl add_user admin password
#rabbitmqctl set_user_tags admin administrator
#rabbitmqctl add_vhost web
#rabbitmqctl set_permissions -p web admin ".*" ".*" ".*"
#/etc/init.d/rabbitmq-server restart
#sudo rabbitmq-plugins enable rabbitmq_management

#node2
# rabbitmqctl delete_user guest
#rabbitmqctl add_user admin password
#rabbitmqctl set_user_tags admin administrator
#rabbitmqctl add_vhost web
#rabbitmqctl set_permissions -p web admin ".*" ".*" ".*"
#/etc/init.d/rabbitmq-server restart

#sudo vim /var/lib/rabbitmq/.erlang.cookie
# Keep this file identical on both nodes, with the same ownership and permissions:
-r-------- 1 rabbitmq rabbitmq 21 Aug 14 10:21 /var/lib/rabbitmq/.erlang.cookie

Join node1 and node2 into one cluster; run the following on node2:

#rabbitmqctl stop_app
#rabbitmqctl join_cluster --disc rabbit@node1
#rabbitmqctl start_app
#rabbitmqctl cluster_status
Cluster status of node 'rabbit@node2' ...
[{nodes,[{disc,['rabbit@node1']},
         {ram,['rabbit@node2']}]},
 {running_nodes,['rabbit@node1','rabbit@node2']},
 {cluster_name,<<"rabbit@node1">>},
 {partitions,[]}]
#sudo rabbitmq-plugins enable rabbitmq_management

To turn node2 into a RAM node:
#sudo rabbitmqctl stop_app
sudo rabbitmqctl change_cluster_node_type ram
sudo rabbitmqctl start_app

2. Configure the keepalived cluster

node1:
! Configuration File for keepalived
global_defs {
    router_id LVS_TALARIS_RMQ_SVR
}
vrrp_instance VI_1 {
    state BACKUP
    interface eth0
    virtual_router_id 47
    priority 160
    advert_int 1
    nopreempt
#   lvs_sync_daemon_interface eth0
    dont_track_primary
    garp_master_delay 5
    authentication {
        auth_type PASS
        auth_pass jianzhong.xu@aa.com
    }
    virtual_ipaddress {
        1.1.1.1
    }
}

node2:
! Configuration File for keepalived
global_defs {
    router_id LVS_TALARIS_RMQ_SVR
}
vrrp_instance VI_1 {
    state MASTER
    interface eth0
    virtual_router_id 47
    priority 150
    advert_int 1
    nopreempt
#   lvs_sync_daemon_interface eth0
    dont_track_primary
    garp_master_delay 5
    authentication {
        auth_type PASS
        auth_pass qc_rmq2015@ele.me
    }
    virtual_ipaddress {
        1.1.1.1
    }
}

node1 haproxy:
global
    maxconn 100000
    log /dev/log local0 notice
defaults
    mode tcp
    log global
    option redispatch
    retries 3
    timeout connect 5000ms
    timeout client 50000ms
    timeout server 50000ms
listen stats :4966
    mode http
    stats enable
    stats uri /
frontend rmq_lb
    bind :27010
    default_backend rmq_lb
backend rmq_lb
    balance roundrobin
    server node1_5672 node1:5672 weight 1 maxconn 4096 check inter 1000
    server node2_5672 node2:5672 weight 1 maxconn 4096 check inter 1000

node2 haproxy:
global
    maxconn 100000
    log /dev/log local0 notice
defaults
    mode tcp
    log global
    option redispatch
    retries 3
    timeout connect 5000ms
    timeout client 50000ms
    timeout server 50000ms
listen stats :4966
    mode http
    stats enable
    stats uri /
frontend rmq_lb
    bind :27010
    default_backend rmq_lb
backend rmq_lb
    balance roundrobin
    server node1_5672 node1:5672 weight 1 maxconn 4096 check inter 1000
    server node2_5672 node2:5672 weight 1 maxconn 4096 check inter 1000

#node1 supervisor
[program:rmq_lb]
command=/opt/sbin/haproxy -f /opt/etc/haproxy/rmq_lb.cfg
autostart=true
autorestart=unexpected
startsecs=3
startretries=3
stopsignal=TERM
stopwaitsecs=5
user=nobody
stopasgroup=true
killasgroup=true
stdout_logfile=syslog
stderr_logfile=syslog

#node2 supervisor
[program:rmq_lb]
command=/opt/sbin/haproxy -f /opt/etc/haproxy/rmq_lb.cfg
autostart=true
autorestart=unexpected
startsecs=3
startretries=3
stopsignal=TERM
stopwaitsecs=5
user=nobody
stopasgroup=true
killasgroup=true
stdout_logfile=syslog
stderr_logfile=syslog

#supervisor monitor
[program:rmqmonitor]
command=ruby /opt/sbin/rmq.watcher.rb -h statsd
directory=/opt/
autostart=true
user=root
redirect_stderr=true
stdout_logfile=/data/log/rmqmonitor/rmqmonitor.log

#cat rmq.watcher.rb
#!/usr/bin/env ruby
#
# Author: Jeff Vier <jeff@jeffvier.com>

require 'rubygems'
require 'digest'
require 'find'
require 'fileutils'
require 'json'
require 'socket'
require 'resolv'
require 'optparse'

options = { :prefix => Socket.gethostname, :interval => 10, :host => '127.0.0.1', :port => 8125, :queues => false }

OptionParser.new do |opts|
  opts.banner = "Usage: #{$0} [options]"
  opts.on('-P', '--prefix [STATSD_PREFIX]', "metric prefix (default: #{options[:prefix]})") { |prefix| options[:prefix] = "#{prefix}" }
  opts.on('-i', '--interval [SEC]', "reporting interval (default: #{options[:interval]})") { |interval| options[:interval] = interval }
  opts.on('-h', '--host [HOST]', "statsd host (default: #{options[:host]})") { |host| options[:host] = host }
  opts.on('-p', '--port [PORT]', "statsd port (default: #{options[:port]})") { |port| options[:port] = port }
  opts.on('-q', '--[no-]queues', "report queue metrics (default: #{options[:queues]})") { |queues| options[:queues] = queues }
end.parse!

###############################################################
# Typical StatsD class, pasted to avoid an external dependency:
# Stolen from https://github.com/bvandenbos/statsd-client

class Statsd
  Version = '0.0.8'

  class << self
    attr_accessor :host, :port

    def host_ip_addr
      @host_ip_addr ||= Resolv.getaddress(host)
    end

    def host=(h)
      @host_ip_addr = nil
      @host = h
    end

    # +stat+ to log timing for
    # +time+ is the time to log in ms
    def timing(stat, time = nil, sample_rate = 1)
      if block_given?
        start_time = Time.now.to_f
        yield
        time = ((Time.now.to_f - start_time) * 1000).floor
      end
      send_stats("#{stat}:#{time}|ms", sample_rate)
    end

    def gauge(stat, value, sample_rate = 1)
      send_stats("#{stat}:#{value}|g", sample_rate)
    end

    # +stats+ can be a string or an array of strings
    def increment(stats, sample_rate = 1)
      update_counter stats, 1, sample_rate
    end

    # +stats+ can be a string or an array of strings
    def decrement(stats, sample_rate = 1)
      update_counter stats, -1, sample_rate
    end

    # +stats+ can be a string or array of strings
    def update_counter(stats, delta = 1, sample_rate = 1)
      stats = Array(stats)
      send_stats(stats.map { |s| "#{s}:#{delta}|c" }, sample_rate)
    end

    private

    def send_stats(data, sample_rate = 1)
      data = Array(data)
      sampled_data = []
      # Apply sample rate if less than one
      if sample_rate < 1
        data.each do |d|
          if rand <= sample_rate
            sampled_data << "#{d}|@#{sample_rate}"
          end
        end
        data = sampled_data
      end
      return if data.empty?
      raise "host and port must be set" unless host && port
      begin
        sock = UDPSocket.new
        data.each do |d|
          sock.send(d, 0, host_ip_addr, port)
        end
      rescue => e
        puts "UDPSocket error: #{e}"
      ensure
        sock.close
      end
      true
    end
  end
end

################################################################################

include FileUtils # allows use of FileUtils methods without the FileUtils:: prefix ie: mv_f(file, file2) or rm_rf(dir)

STDOUT.sync = true # don't buffer STDOUT

Statsd.host = options[:host]
Statsd.port = options[:port]

unless system 'which rabbitmqadmin'
  raise "unable to locate the rabbitmqadmin command"
end

loop do
  overview = JSON.parse(`rabbitmqadmin --username=user_monitor --password=Monitor_2015 show overview -f raw_json`)
  prefix = "rabbitmq.#{options[:prefix]}.overview.object_totals"
  Statsd.gauge("#{prefix}.channels", overview[0]['object_totals']['channels'])
  Statsd.gauge("#{prefix}.connections", overview[0]['object_totals']['connections'])
  Statsd.gauge("#{prefix}.consumers", overview[0]['object_totals']['consumers'])
  Statsd.gauge("#{prefix}.exchanges", overview[0]['object_totals']['exchanges'])
  Statsd.gauge("#{prefix}.messages", overview[0]['queue_totals']['messages'])
  Statsd.gauge("#{prefix}.queues", overview[0]['object_totals']['queues'])

  if options[:queues]
    queues = JSON.parse(`rabbitmqadmin --username=user_monitor --password=Monitor_2015 list queues -f raw_json`)
    queues.each do |queue|
      if queue.key?('name')
        prefix = "#{options[:prefix]}.queues.#{queue['name']}"
        Statsd.gauge("#{prefix}.active_consumers", queue['active_consumers'])
        Statsd.gauge("#{prefix}.consumers", queue['consumers'])
        Statsd.gauge("#{prefix}.memory", queue['memory'])
        Statsd.gauge("#{prefix}.messages", queue['messages'])
        Statsd.gauge("#{prefix}.messages_ready", queue['messages_ready'])
        Statsd.gauge("#{prefix}.messages_unacknowledged", queue['messages_unacknowledged'])
        Statsd.gauge("#{prefix}.avg_egress_rate", queue['backing_queue_status']['avg_egress_rate']) if queue['backing_queue_status']
        Statsd.gauge("#{prefix}.avg_ingress_rate", queue['backing_queue_status']['avg_ingress_rate']) if queue['backing_queue_status']
        if queue.key?('message_stats')
          Statsd.gauge("#{prefix}.ack_rate", queue['message_stats']['ack_details']['rate']) if queue['message_stats']['ack_details']
          Statsd.gauge("#{prefix}.deliver_rate", queue['message_stats']['deliver_details']['rate']) if queue['message_stats']['deliver_details']
          Statsd.gauge("#{prefix}.deliver_get_rate", queue['message_stats']['deliver_get_details']['rate']) if queue['message_stats']['deliver_get_details']
          Statsd.gauge("#{prefix}.publish_rate",
                       queue['message_stats']['publish_details']['rate']) if queue['message_stats']['publish_details']
        end
      end
    end
  end

  sleep options[:interval]
end

# Client side: configure the keepalived VIP as a domain name, then clients connect to that domain on the HAProxy port.

Zabbix monitoring of RabbitMQ:
UserParameter=rmq.messageTotal[*],/etc/zabbix/scripts/rmqmessage.py -u "$1" -p "$2" -H "$3"

cat rmqmessage.py
#!/usr/bin/env python
#coding:utf-8
import requests,re
from requests.auth import HTTPBasicAuth
from optparse import OptionParser

def RmqMessage():
    usage="usage: %prog [options] arg"
    parser=OptionParser(usage)
    parser.add_option("-u", "--user",action="store",type="string",help="USERNAME")
    parser.add_option("-p", "--passwd",action="store",type="string",help="PASSWORD")
    parser.add_option("-H", "--hostname",action="store",type="string",help="HOSTNAME")
    (options,args) = parser.parse_args()
    user=options.user
    passwd=options.passwd
    hostname=options.hostname
    url="http://%s.elenet.me:15672/api/overview" %(hostname)
    if re.match(r"^\d+\.",hostname):
        # hostname is already an IP address, use it directly
        url="http://%s:15672/api/overview" %(hostname)
    try:
        r = requests.get(url, auth=HTTPBasicAuth(user, passwd))
        result=r.json()
        QmessageTotal=result['queue_totals']['messages']
        print QmessageTotal
        return QmessageTotal
    except Exception, e:
        return -1

RmqMessage()