RabbitMQ 是一個由 erlang 開發的 AMQP(Advanced Message Queuing Protocol)的開源實現。編程
AMQP:高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。 AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。 RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。安全
RabbitMQ 最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。具體特色包括:服務器
# 可靠性(Reliability) RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發佈確認。 # 靈活的路由(Flexible Routing) 在消息進入隊列以前,經過 Exchange 來路由消息的。對於典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,能夠將多個 Exchange 綁定在一塊兒,也經過插件機制實現本身的 Exchange 。 # 消息集羣(Clustering) 多個 RabbitMQ 服務器能夠組成一個集羣,造成一個邏輯 Broker 。 # 高可用(Highly Available Queues) 隊列能夠在集羣中的機器上進行鏡像,使得在部分節點出問題的狀況下隊列仍然可用。 # 多種協議(Multi-protocol) RabbitMQ 支持多種消息隊列協議,好比 STOMP、MQTT 等等。 # 多語言客戶端(Many Clients) RabbitMQ 幾乎支持全部經常使用語言,好比 Java、.NET、Ruby 等等。 # 管理界面(Management UI) RabbitMQ 提供了一個易用的用戶界面,使得用戶能夠監控和管理消息 Broker 的許多方面。 # 跟蹤機制(Tracing) 若是消息異常,RabbitMQ 提供了消息跟蹤機制,使用者能夠找出發生了什麼。 # 插件機制(Plugin System) RabbitMQ 提供了許多插件,來從多方面進行擴展,也能夠編寫本身的插件。
3、RabbitMQ 基本概念網絡
# Message 消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其餘消息的優先權)、delivery-mode(指出該消息可能須要持久性存儲)等。 # Publisher 消息的生產者,也是一個向交換器發佈消息的客戶端應用程序。 # Exchange 交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。 # Routing Key 路由關鍵字,exchange根據這個關鍵字進行消息投遞。 # Binding 綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列鏈接起來的路由規則,因此能夠將交換器理解成一個由綁定構成的路由表。 # Queue 消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裏面,等待消費者鏈接到這個隊列將其取走。 # Connection 網絡鏈接,好比一個TCP鏈接。 # Channel 信道,多路複用鏈接中的一條獨立的雙向數據流通道。信道是創建在真實的TCP鏈接內地虛擬鏈接,AMQP 命令都是經過信道發出去的,無論是發佈消息、訂閱隊列仍是接收消息,這些動做都是經過信道完成。由於對於操做系統來講創建和銷燬 TCP 都是很是昂貴的開銷,因此引入了信道的概念,以複用一條 TCP 鏈接。 # Consumer 消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。 # Virtual Host 虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每一個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有本身的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在鏈接時指定,RabbitMQ 默認的 vhost 是 / 。 # Broker
Exchange分發消息時根據類型的不一樣分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器徹底一致,但性能差不少,目前幾乎用不到了,因此直接看另外三種類型:併發
消息中的路由鍵(routing key)若是和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。路由鍵與隊列名徹底匹配,若是一個隊列綁定到交換機要求路由鍵爲「dog」,則只轉發 routing key 標記爲「dog」的消息,不會轉發「dog.puppy」,也不會轉發「dog.guard」等等。它是徹底匹配、單播的模式。異步
每一個發到 fanout 類型交換器的消息都會分到全部綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每一個發送到交換器的消息都會被轉發到與該交換器綁定的全部隊列上。很像子網廣播,每臺子網內的主機都得到了一份複製的消息。fanout 類型轉發消息是最快的。socket
topic 交換器經過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列須要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分紅單詞,這些單詞之間用點隔開。它一樣也會識別兩個通配符:符號「#」和符號「」。#匹配0個或多個單詞,匹配很少很多一個單詞。分佈式
ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。高併發
RabbbitMQ的分發機制很是適合擴展,並且它是專門爲併發程序設計的,若是如今load加劇,那麼只須要建立更多的Consumer來進行任務處理工具
在實際應用中,可能會發生消費者收到 Queue 中的消息,但沒有處理完成就宕機(或出現其餘意外)的狀況,這種狀況下就可能會致使消息丟失。爲了不這種狀況發生,咱們能夠要求消費者在消費完消息後發送一個回執給 RabbitMQ,RabbitMQ 收到消息回執(Message acknowledgment)後纔將該消息從Queue中移除;若是 RabbitMQ 沒有收到回執並檢測到消費者的 RabbitMQ鏈接斷開,則RabbitMQ會將該消息發送給其餘消費者(若是存在多個消費者)進行處理。這裏不存在timeout概念,一個消費者處理消息時間再長也不會致使該消息被髮送給其餘消費者,除非它的RabbitMQ鏈接斷開。 這裏會產生另一個問題,若是咱們的開發人員在處理完業務邏輯後,忘記發送回執給RabbitMQ,這將會致使嚴重的bug——Queue中堆積的消息會愈來愈多;消費者重啓後會重複消費這些消息並重復執行業務邏輯…
另外pub message是沒有ack的。
若是咱們但願即便在RabbitMQ服務重啓的狀況下,也不會丟失消息,咱們能夠將Queue與Message都設置爲可持久化的(durable),這樣能夠保證絕大部分狀況下咱們的RabbitMQ消息不會丟失。但依然解決不了小几率丟失事件的發生(好比RabbitMQ服務器已經接收到生產者的消息,但還沒來得及持久化該消息時RabbitMQ服務器就斷電了),若是咱們須要對這種小几率事件也要管理起來,那麼咱們要用到事務。因爲這裏僅爲RabbitMQ的簡單介紹,因此這裏將不講解RabbitMQ相關的事務。
要持久化隊列queue的持久化須要在聲明時指定durable=True;
這裏要注意,隊列的名字必定要是Broker中不存在的,否則不能改變此隊列的任何屬性.
隊列和交換機有一個建立時候指定的標誌durable,durable的惟一含義就是具備這個標誌的隊列和交換機會在重啓以後從新創建,它不表示說在隊列中的消息會在重啓後恢復
消息持久化包括3部分
# 1.exchange持久化,在聲明時指定durable => true hannel.ExchangeDeclare(ExchangeName,"direct",durable:true,autoDelete:false,arguments:null);//聲明消息隊列,且爲可持久化的 # 2.queue持久化,在聲明時指定durable => true channel.QueueDeclare(QueueName,durable:true,exclusive:false,autoDelete:false,arguments:null);//聲明消息隊列,且爲可持久化的 # 3.消息持久化,在投遞時指定delivery_mode => 2(1是非持久化). channel.basic_publish(exchange='',routing_key="task_queue",body=message,properties=pika.BasicProperties(delivery_mode = 2, # make message persistent))
若是 exchange 和 queue 都是持久化的,那麼它們之間的binding 也是持久化的,若是 exchange 和 queue 二者之間有一個持久化,一個非持久化,則不容許創建綁定.
注意:一旦建立了隊列和交換機,就不能修改其標誌了,例如,建立了一個non-durable的隊列,而後想把它改變成durable的,惟一的辦法就是刪除這個隊列而後重現建立。
關於持久化的進一步討論:
爲了數據不丟失,咱們採用了:
在數據處理結束後發送ack,這樣RabbitMQ Server會認爲Message Deliver 成功。
持久化queue,能夠防止RabbitMQ Server 重啓或者crash引發的數據丟失。
持久化Message,理由同上。
可是這樣能保證數據100%不丟失嗎?答案是否認的。問題就在與RabbitMQ 須要時間去把這些信息存到磁盤上,這個time window 雖然短,可是它的確仍是有。在這個時間窗口內若是數據沒有保存,數據還會丟失。還有另外一個緣由就是 RabbitMQ 並非爲每一個 Message 都作 fsync:它可能僅僅是把它保存到Cache 裏,還沒來得及保存到物理磁盤上。所以這個持久化仍是有問題。可是對於大多數應用來講,這已經足夠了。固然爲了保持一致性,你能夠把每次的publish放到一個transaction中。這個transaction的實現須要user defined codes。那麼商業系統會作什麼呢?一種可能的方案是在系統異常重啓時或者斷電時,應該給各個應用留出時間去flash cache,保證每一個應用都能 exit gracefully。
你可能也注意到了,分發機制不是那麼優雅,默認狀態下,RabbitMQ將第n個Message分發給第n個Consumer。n是取餘後的,它無論Consumer是否還有unacked Message,只是按照這個默認的機制進行分發.
那麼若是有個Consumer工做比較重,那麼就會致使有的Consumer基本沒事可作,有的Consumer卻毫無休息的機會,那麼,Rabbit是如何處理這種問題呢?
前面咱們講到若是有多個消費者同時訂閱同一個Queue中的消息,Queue中的消息會被平攤給多個消費者。這時若是每一個消息的處理時間不一樣,就有可能會致使某些消費者一直在忙,而另一些消費者很快就處理完手頭工做並一直空閒的狀況。咱們能夠經過設置prefetchCount來限制Queue每次發送給每一個消費者的消息數,好比咱們設置prefetchCount=1,則Queue每次給每一個消費者發送一條消息;消費者處理完這條消息後Queue會再給該消費者發送一條消息。
經過basic.qos方法設置prefetch_count=1,這樣RabbitMQ就會使得每一個Consumer在同一個時間點最多處理一個Message,換句話說,在接收到該Consumer的ack前,它不會將新的Message分發給它
channel.basic_qos(prefetch_count=1)
注意,這種方法可能會致使queue滿。固然,這種狀況下你可能須要添加更多的Consumer,或者建立更多的virtual Host來細化你的設計。
RabbitMQ使用ProtoBuf序列化消息,它可做爲RabbitMQ的Message的數據格式進行傳輸,因爲是結構化的數據,這樣就極大的方便了Consumer的數據高效處理,固然也可使用XML,與XML相比, ProtoBuf有如下優點:
1.簡單
2.size小了3-10倍
3.速度快了20-100倍
4.易於編程
6.減小了語義的歧義.
,ProtoBuf具備速度和空間的優點,使得它如今應用很是普遍
MQ 自己是基於異步的消息處理,前面的示例中全部的生產者(P)將消息發送到 RabbitMQ 後不會知道消費者(C)處理成功或者失敗(甚至連有沒有消費者來處理這條消息都不知道)。
但實際的應用場景中,咱們極可能須要一些同步處理,須要同步等待服務端將個人消息處理完成後再進行下一步處理。這至關於RPC(Remote Procedure Call,遠程過程調用)。
RabbitMQ 中也支持 RPC,RabbitMQ 中實現 RPC 的機制是:
客戶端發送請求(消息)時,在消息的屬性(MessageProperties ,在 AMQP 協議中定義了14種 properties ,這些屬性會隨着消息一塊兒發送)中設置兩個值 replyTo (一個 Queue 名稱,用於告訴服務器處理完成後將通知個人消息發送到這個 Queue 中)和 correlationId (這次請求的標識號,服務器處理完成後須要將此屬性返還,客戶端將根據這個id瞭解哪條請求被成功執行了或執行失敗)
服務器端收到消息並處理,處理完消息後,將生成一條應答消息到replyTo 指定的 Queue ,同時帶上 correlationId 屬性
客戶端以前已訂閱 replyTo 指定的 Queue ,從中收到服務器的應答消息後,根據其中的correlationId 屬性分析哪條請求被執行了,根據執行結果進行後續業務處理
按照目前網絡上的資料,RabbitMQ 、activeM 、ZeroMQ 三者中,綜合來看,RabbitMQ 是首選。
ZeroMq 不支持,ActiveMq 和RabbitMq 都支持。持久化消息主要是指咱們機器在不可抗力因素等狀況下掛掉了,消息不會丟失的機制。
可靠性、靈活的路由、集羣、事務、高可用的隊列、消息排序、問題追蹤、可視化管理工具、插件系統等等。
RabbitMq / Kafka 最好,ActiveMq 次之,ZeroMq 最差。固然ZeroMq 也能夠作到,不過本身必須手動寫代碼實現,代碼量不小。尤爲是可靠性中的:持久性、投遞確認、發佈者證明和高可用性。
毋庸置疑,RabbitMQ 最高,緣由是它的實現語言是天生具有高併發高可用的erlang 語言。
RabbitMq 比 Kafka 成熟,在可用性上,穩定性上,可靠性上,RabbitMq 勝於 Kafka(理論上)。另外,Kafka 的定位主要在日誌等方面, 由於Kafka 設計的初衷就是處理日誌的,能夠看作是一個日誌(消息)系統一個重要組件,針對性很強,因此 若是業務方面仍是建議選擇 RabbitMq 。還有就是,Kafka 的性能(吞吐量、TPS )比RabbitMq 要高出來不少。選型最後總結:若是咱們系統中已經有選擇 Kafka ,或者 RabbitMq ,而且徹底能夠知足如今的業務,建議就不用重複去增長和造輪子。能夠在 Kafka 和 RabbitMq 中選擇一個適合本身團隊和業務的,這個纔是最重要的。可是毋庸置疑現階段,綜合考慮沒有第三選擇。