(轉載)RabbitMQ在分佈式系統的應用

轉載自:http://geek.csdn.net/news/detail/74184html

因爲以前作的項目中須要在多個節點之間可靠地通訊,因此廢棄了以前使用的Redis pub/sub(由於集羣有單點問題,且有諸多限制),改用了RabbitMQ。使用期間獲得很多收穫,也踩了很多坑,因此在此分享下心得。java

怎麼保證可靠性的?

RabbitMQ提供了幾種特性,犧牲了一點性能代價,提供了可靠性的保證。node

  • 持久化

當RabbitMQ退出時,默認會將消息和隊列都清除,因此須要在第一次聲明隊列和發送消息時指定其持久化屬性爲true,這樣RabbitMQ會將隊列、消息和狀態存到RabbitMQ本地的數據庫,重啓後會恢復。git

durable=true channel.queueDeclare("task_queue", durable, false, false, null);//隊列 channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());//消息
注:當聲明的隊列已經存在時,嘗試從新定義它的durable是不生效的。
  • 接收應答

客戶端接收消息的模式默認是自動應答,可是經過設置autoAck爲false可讓客戶端主動應答消息。當客戶端拒絕此消息或者未應答便斷開鏈接時,就會使得此消息從新入隊(在版本2.7.0之前是到從新加入到隊尾,2.7.0及之後是保留消息在隊列中的原來位置)。github

autoAck = false; requeue = true; channel.basicConsume(queue, autoAck, callback); channel.basicAck();//應答 channel.basicReject(deliveryTag, requeue);//拒絕 channel.basicRecover(requeue);//恢復
  • 發送確認

默認狀況下,發送端不關注發出去的消息是否被消費掉了。可設置channel爲confirm模式,全部發送的消息都會被確認一次,用戶能夠自行根據server發回的確認消息查看狀態。詳細介紹見:confirmsdocker

channel.confirmSelect(); // 進入confirm模式 do publish messages... // 每一個消息都會被編號,從1開始 channel.getNextPublishSeqNo() // 查看下一個要發送的消息的序號 channel.waitForConfirms(); // 等待全部消息發送並確認
  • 事務:和confirm模式不能同時使用,並且會帶來大量的多餘開銷,致使吞吐量降低不少,故而不推薦。
channel.txSelect(); try { do something... channel.txCommit(); } catch (e){ channel.txRollback(); }
  • 消息隊列的高可用(主備模式)

相比於路由和綁定,能夠視爲是共享於全部的節點的,消息隊列默認只存在於第一次聲明它的節點上,這樣一旦這個節點掛了,這個隊列中未處理的消息就沒有了。 幸虧,RabbitMQ提供了將它備份到其餘節點的機制,任什麼時候候都有一個master負責處理請求,其餘slaves負責備份,當master掛掉,會將最先建立的那個slave提高爲master。數據庫

命令:rabbitmqctl set_policy ha-all 「^ha\.」 ‘{「ha-mode」:」all」}’:設置全部以’ha’開頭的queue在全部節點上擁有備份。詳細語法點這裏;也能夠在界面上配置。緩存

圖片描述

注:因爲exclusive類型的隊列會在client和server鏈接斷開時被刪掉,因此對它設置持久化屬性和備份都是沒有意義的。
  • 順序保證

直接上圖好了:ruby

圖片描述

一些須要注意的地方

  • 集羣配置:

一個集羣中多個節點共享一份.erlang.cookie文件;如果沒有啓用RABBITMQ_USE_LONGNAME,須要在每一個節點的hosts文件中指定其餘節點的地址,否則會找不到其餘集羣中的節點。服務器

  • 腦裂:

RabbitMQ集羣對於網絡分區的處理和忍受能力不太好,推薦使用federation或者shovel插件去解決。federation詳見高級->Federation。可是,狀況已經發生了,怎麼去解決呢?放心,仍是有辦法恢復的。當網絡斷斷續續時,會使得節點之間的通訊斷掉,進而形成集羣被分隔開的狀況。這樣,每一個小集羣以後便只處理各自本地的鏈接和消息,從而致使數據不一樣步。當從新恢復網絡鏈接時,它們彼此都認爲是對方掛了-_-||,即可以判斷出有網絡分區出現了。可是RabbitMQ默認是忽略掉不處理的,形成兩個節點繼續各自爲政(路由,綁定關係,隊列等能夠獨立地建立刪除,甚至主備隊列也會每一方擁有本身的master)。能夠更改配置使得鏈接恢復時,會根據配置自動恢復。

  • ignore:默認,不作任何處理

  • pause-minority:斷開鏈接時,判斷當前節點是否屬於少數派(節點數少於或者等於一半),若是是,則暫停直到恢復鏈接。

  • {pause_if_all_down, [nodes], ignore | autoheal}:斷開鏈接時,判斷當前集羣中節點是否有節點在nodes中,若是有,則繼續運行,不然暫停直到恢復鏈接。這種策略下,當恢復鏈接時,可能會有多個分區存活,因此,最後一個參數決定它們怎麼合併。

  • autoheal:當恢復鏈接時,選擇客戶端鏈接數最多的節點狀態爲主,重啓其餘節點。

配置:**【詳見下文:集羣配置】

  • 屢次ack:客戶端屢次應答同一條消息,會使得該客戶端收不到後續消息。

結合Docker使用

集羣版本的實現:詳見我本身寫的一個例子rabbitmq-server-cluster

消息隊列中間件的比較

  • RabbitMQ: 
    • 優勢:支持不少協議如:AMQP,XMPP,STMP,STOMP;靈活的路由;成熟穩定的集羣方案;負載均衡;數據持久化等。
    • 缺點:速度較慢;比較重量級,安裝須要依賴Erlang環境。
  • Redis: 
    • 優勢:比較輕量級,易上手
    • 缺點:單點問題,功能單一
  • Kafka: 
    • 優勢:高吞吐;分佈式;快速持久化;負載均衡;輕量級
    • 缺點:極端狀況下會丟消息

最後附一張網上截取的測試結果:

圖片描述

更多性能參數見: RabbitMQ Performance Measurements

若是有興趣簡單瞭解下RabbitMQ,能夠繼續往下看~

幾個重要的概念

  • Virtual Host:包含若干個Exchange和Queue,表示一個節點;
  • Exchange:接受客戶端發送的消息,並根據Binding將消息路由給服務器中的隊列,Exchange分爲direct、fanout、topic三種;
  • Binding:鏈接Exchange和Queue,包含路由規則;
  • Queue:消息隊列,存儲還未被消費的消息;
  • Message:Header+Body;
  • Channel:通道,執行AMQP的命令;一個鏈接可建立多個通道以節省資源。

Client

RabbitMQ官方實現了不少熱門語言的客戶端,就不一一列舉啦,以java爲例,直接開始正題:

  • 創建鏈接:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost");
  • 能夠加上斷開重試機制:
factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(10000);
  • 建立鏈接和通道:
Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
  • 一對一:一個生產者,一個消費者

圖片描述

生產者:

channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

消費者:

Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, autoAck, consumer);
  • 一對多:一個生產者,多個消費者

圖片描述

代碼同上,只不過會有多個消費者,消息會輪序發給各個消費者。

若是設置了autoAck=false,那麼能夠實現公平分發(即對於某個特定的消費者,每次最多隻發送指定條數的消息,直到其中一條消息應答後,再發送下一條)。須要在消費者中加上:

int prefetchCount = 1; channel.basicQos(prefetchCount);

其餘同上。

  • 廣播

圖片描述

生產者:

channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

消費者同上。

  • Routing:指定路由規則

圖片描述

生產者:

String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, routingKey); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());

消費者同上。

  • Topics:支持通配符的Routing

圖片描述

*能夠表示一個單詞 
#能夠表示一個或多個單詞

生產者:

channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

消費者同上。

  • RPC

圖片描述

其實就是一對一模式的一種用法:

首先,客戶端發送一條消息到服務端聲明的隊列,消息屬性中包含reply_tocorrelation_id

- reply_to 是客戶端建立的消息的隊列,用來接收遠程調用結果 - correlation_id 是消息的標識,服務端迴應的消息屬性中會帶上以便知道是哪條消息的結果。

而後,服務端接收到消息,處理,並返回一條結果到reply_to隊列中,最終,客戶端接收到返回消息,繼續向下處理。

Server

支持各大主流操做系統,這裏以Unix爲例介紹下經常使用配置和命令:

安裝

因爲RabbitMQ是依賴於Erlang的,因此得首先安裝最近版本的Erlang。 
單點的安裝比較簡單,下載解壓便可。

下載地址:http://www.rabbitmq.com/download.html

  • 配置:(通常的,用默認的便可。)
  • $RABBITMQ_HOME/etc/rabbitmq/rabbitmq-env.conf: 環境變量默認配置(也可在啓動腳本中設置,且以啓動命令中的配置爲準)。經常使用的有: 
    • RABBITMQ_NODENAME:節點名稱,默認是rabbit@$HOSTNAME。
    • RABBITMQ_NODE_PORT:協議端口號,默認5672。
    • RABBITMQ_SERVER_START_ARGS:覆蓋rabbitmq.config中的一些配置。
  • $RABBITMQ_HOME/etc/rabbitmq/rabbitmq.config: 核心組件,插件,erlang服務等配置,經常使用的有: 
    • disk_free_limit:隊列持久化等信息都是存到RabbitMQ本地的數據庫中的,默認限制50000000(也就是最多隻讓它使用50M空間啦,不夠能夠上調,也支持空閒空間百分比的配置)。要是超標了,它就罷工了……
    • vm_memory_high_watermark:內存使用,默認0.4(最多讓它使用40%的內存,超標罷工)

(注:若啓動失敗了,能夠在啓動日誌中查看到具體的錯誤信息。)

  • 命令:
$RABBITMQ_HOME/sbin/rabbitmq-server:啓動腳本,會打印出配置文件,插件,集羣等信息;加上-detached爲後臺啓動; /sbin/rabbitmqctl status:查看啓動狀態 /sbin/rabbitmqctl add_user admin admin:添加新用戶admin,密碼admin;默認只有一個guest用戶,但只限本機訪問。 /sbin/rabbitmqctl set_user_tags admin administrator:將admin設置爲管理員權限 /sbin/rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" 賦予admin全部權限 /sbin/rabbitmqctl stop:關閉

集羣

集羣節點共享全部的狀態和數據,如:用戶、路由、綁定等信息(隊列有點特殊,雖然從全部節點均可達,可是隻存在於第一次聲明它的那個節點上,解決方案——詳見上文:消息隊列的高可用);每一個節點均可以接收鏈接,處理數據。

集羣節點有兩種,disc:默認,信息存在本地數據庫;ram:加入集羣時,添加–ram參數,信息存在內存,可提升性能。

  • 配置:(通常的,用默認的便可。)
  • $RABBITMQ_HOME/etc/rabbitmq/rabbitmq-env.conf: 
    • RABBITMQ_USE_LONGNAME:默認false,(默Rene的,RABBITMQ_NODENAME中@後面的\$HOSTNAME是主機名,因此須要集羣中每一個節點的hosts文件包含其餘節點主機名到地址的映射。可是若是設置爲true,就能夠定義RABBITMQ_NODENAME中的$HOSTNAME爲域名了)
    • RABBITMQ_DIST_PORT:集羣端口號,默認RABBITMQ_NODE_PORT + 20000
  • $RABBITMQ_HOME/etc/rabbitmq/rabbitmq.config: 
    • cluster_nodes:設置後,在啓動時會嘗試自動鏈接加入的節點並組成集羣。
    • cluster_partition_handling:【詳見上文:網絡分區的處理】

更多詳細的配置見:配置

  • 命令:
rabbitmqctl stop_app rabbitmqctl join_cluster [--ram] nodename@hostname:將當前節點加入到集羣中;默認是以disc節點加入集羣,加上--ram爲ram節點。 rabbitmqctl start_app rabbitmqctl cluster_status:查看集羣狀態

(注:若是加入集羣失敗,可先查看)

  • 每一個節點的$HOME/.erlang.cookie內容一致;
  • 若是hostname是主機名,那麼此hostname和地址的映射須要加入hosts文件中;
  • 若是使用的是域名,那麼須要設置RABBITMQ_USE_LONGNAME爲true。 
    (注:docker版集羣的見:rabbitmq-server-cluster)

高級

AMQP協議簡介

RabbitMQ原生支持AMQP 0-9-1並擴展實現了了一些經常使用的功能:AMQP 0-9-1

包含三層:

  • 模型層: 最高層,提供了客戶端調用的命令,如:queue.declare,basic.ack,consume等。
  • 會話層:將命令從客戶端傳遞給服務器,再將服務器的應答傳遞給客戶端,會話層爲這個傳遞過程提供可靠性、同步機制和錯誤處理。
  • 傳輸層:主要傳輸二進制數據流,提供幀的處理、信道複用、錯誤檢測和數據表示。

圖片描述

注:其餘協議的支持見:RabbitMQ支持的協議

經常使用插件

管理界面(神器)

啓動後,執行rabbitmq-plugins enable rabbitmq_management→訪問http://localhost:15672→查看節點狀態,隊列信息等等,甚至能夠動態配置消息隊列的主備策略,以下圖:

圖片描述

Federation

啓用Federation插件,使得不一樣集羣的節點之間能夠傳遞消息,從而模擬出相似集羣的效果。這樣能夠有幾點好處:

  • 鬆耦合:聯合在一塊兒的不一樣集羣能夠有各自的用戶,權限等信息,無需一致;此外,這些集羣的RabbitMQ和Erlang的版本能夠不一致。
  • 遠程網絡鏈接友好:因爲通訊是遵循AMQP協議的,故而對斷斷續續的網絡鏈接容忍度高。
  • 自定義:能夠自主選擇哪些組件啓用federation。

幾個概念:

  • Upstreams:定義上游節點信息,以下:
rabbitmqctl set_parameter federation-upstream my-upstream '{"uri":"amqp://server-name","expires":3600000}' 定義一個my-upstream
  • uri是其上游節點的地址,多個upstream的節點無需在同一集羣中。
  • expires表示斷開鏈接3600000ms後其上游節點會緩存消息。
  • Upstream sets:多個Upstream的集合;默認有個all,會將全部的Upstream加進去。
  • Policies:定義哪些exchanges,queues關聯到哪一個Upstream或者Upstream set,如:
rabbitmqctl set_policy --apply-to exchanges federate-me "^amq\." '{"federation-upstream-set":"all"}'
  • 將此節點全部以amq.開頭的exchange聯合到上游節點的同名exchange。 
    注: 
    • 因爲下游節點的exchange能夠繼續做爲其餘節點的上游,故可設置成循環,廣播等形式。
    • 經過max_hops參數控制傳遞層數。
    • 模擬集羣,能夠將多個節點兩兩互連,並設置max_hops=1。

圖片描述

rabbitmq-plugins enable rabbitmq_federation

若是啓用了管理界面,能夠添加:

rabbitmq-plugins enable rabbitmq_federation_management

這樣就能夠在界面配置Upstream和Policy了。

注:若是在一個集羣中使用federation,須要該集羣每一個節點都啓用Federation插件; 
注:更多插件請見:插件

相關文章
相關標籤/搜索