1、消息隊列(MQ)概述javascript
消息隊列(Message Queue),是分佈式系統中重要的組件,其通用的使用場景能夠簡單地描述爲:php
當不須要當即得到結果,可是併發量又須要進行控制的時候,差很少就是須要使用消息隊列的時候。html
消息隊列主要解決了應用耦合、異步處理、流量削鋒等問題。前端
當前使用較多的消息隊列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,而部分數據庫如Redis、Mysql以及phxsql也可實現消息隊列的功能。java
2、消息隊列使用場景node
消息隊列在實際應用中包括以下四個場景:python
應用耦合:多應用間經過消息隊列對同一消息進行處理,避免調用接口失敗致使整個過程失敗;c++
異步處理:多應用對消息隊列中同一消息進行處理,應用間併發處理消息,相比串行處理,減小處理時間;git
限流削峯:普遍應用於秒殺或搶購活動中,避免流量過大致使應用系統掛掉的狀況;github
消息驅動的系統:系統分爲消息隊列、消息生產者、消息消費者,生產者負責產生消息,消費者(可能有多個)負責對消息進行處理;
下面詳細介紹上述四個場景以及消息隊列如何在上述四個場景中使用:
2.1 異步處理
具體場景:用戶爲了使用某個應用,進行註冊,系統須要發送註冊郵件並驗證短信。對這兩個操做的處理方式有兩種:串行及並行。
(1)串行方式:新註冊信息生成後,先發送註冊郵件,再發送驗證短信;
在這種方式下,須要最終發送驗證短信後再返回給客戶端。
(2)並行處理:新註冊信息寫入後,由發短信和發郵件並行處理;
在這種方式下,發短信和發郵件 需處理完成後再返回給客戶端。
假設以上三個子系統處理的時間均爲50ms,且不考慮網絡延遲,則總的處理時間:
串行:50+50+50=150ms
並行:50+50 = 100ms
若使用消息隊列:
並在寫入消息隊列後當即返回成功給客戶端,則總的響應時間依賴於寫入消息隊列的時間,而寫入消息隊列的時間自己是能夠很快的,基本能夠忽略不計,所以總的處理時間相比串行提升了2倍,相比並行提升了一倍;
2.2 應用耦合
具體場景:用戶使用QQ相冊上傳一張圖片,人臉識別系統會對該圖片進行人臉識別,通常的作法是,服務器接收到圖片後,圖片上傳系統當即調用人臉識別系統,調用完成後再返回成功,以下圖所示:
該方法有以下缺點:
人臉識別系統被調失敗,致使圖片上傳失敗;
延遲高,須要人臉識別系統處理完成後,再返回給客戶端,即便用戶並不須要當即知道結果;
圖片上傳系統與人臉識別系統之間互相調用,須要作耦合;
若使用消息隊列:
客戶端上傳圖片後,圖片上傳系統將圖片信息如uin、批次寫入消息隊列,直接返回成功;而人臉識別系統則定時從消息隊列中取數據,完成對新增圖片的識別。
此時圖片上傳系統並不須要關心人臉識別系統是否對這些圖片信息的處理、以及什麼時候對這些圖片信息進行處理。事實上,因爲用戶並不須要當即知道人臉識別結果,人臉識別系統能夠選擇不一樣的調度策略,按照閒時、忙時、正常時間,對隊列中的圖片信息進行處理。
2.3 限流削峯
具體場景:購物網站開展秒殺活動,通常因爲瞬時訪問量過大,服務器接收過大,會致使流量暴增,相關係統沒法處理請求甚至崩潰。而加入消息隊列後,系統能夠從消息隊列中取數據,至關於消息隊列作了一次緩衝。
該方法有以下優勢:
請求先入消息隊列,而不是由業務處理系統直接處理,作了一次緩衝,極大地減小了業務處理系統的壓力;
隊列長度能夠作限制,事實上,秒殺時,後入隊列的用戶沒法秒殺到商品,這些請求能夠直接被拋棄,返回活動已結束或商品已售完信息;
2.4 消息驅動的系統
具體場景:用戶新上傳了一批照片, 人臉識別系統須要對這個用戶的全部照片進行聚類,聚類完成後由對帳系統從新生成用戶的人臉索引(加快查詢)。這三個子系統間由消息隊列鏈接起來,前一個階段的處理結果放入隊列中,後一個階段從隊列中獲取消息繼續處理。
該方法有以下優勢:
避免了直接調用下一個系統致使當前系統失敗;
每一個子系統對於消息的處理方式能夠更爲靈活,能夠選擇收到消息時就處理,能夠選擇定時處理,也能夠劃分時間段按不一樣處理速度處理;
3、消息隊列的兩種模式
消息隊列包括兩種模式,點對點模式(point to point, queue)和發佈/訂閱模式(publish/subscribe,topic)。
3.1 點對點模式
點對點模式下包括三個角色:
消息隊列
發送者 (生產者)
接收者(消費者)
消息發送者生產消息發送到queue中,而後消息接收者從queue中取出而且消費消息。消息被消費之後,queue中再也不有存儲,因此消息接收者不可能消費到已經被消費的消息。
點對點模式特色:
每一個消息只有一個接收者(Consumer)(即一旦被消費,消息就再也不在消息隊列中);
發送者和接收者間沒有依賴性,發送者發送消息以後,無論有沒有接收者在運行,都不會影響到發送者下次發送消息;
接收者在成功接收消息以後需向隊列應答成功,以便消息隊列刪除當前接收的消息;
3.2 發佈/訂閱模式
發佈/訂閱模式下包括三個角色:
角色主題(Topic)
發佈者(Publisher)
訂閱者(Subscriber)
發佈者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。
發佈/訂閱模式特色:
每一個消息能夠有多個訂閱者;
發佈者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個訂閱者以後,才能消費發佈者的消息。
爲了消費消息,訂閱者須要提早訂閱該角色主題,並保持在線運行;
4、經常使用消息隊列介紹
本部分主要介紹四種經常使用的消息隊列(RabbitMQ/ActiveMQ/RocketMQ/Kafka)的主要特性、優勢、缺點。
4.1 RabbitMQ
RabbitMQ 2007年發佈,是一個在AMQP(高級消息隊列協議)基礎上完成的,可複用的企業消息系統,是當前最主流的消息中間件之一。
主要特性:
可靠性: 提供了多種技術可讓你在性能和可靠性之間進行權衡。這些技術包括持久性機制、投遞確認、發佈者證明和高可用性機制;
靈活的路由: 消息在到達隊列前是經過交換機進行路由的。RabbitMQ爲典型的路由邏輯提供了多種內置交換機類型。若是你有更復雜的路由需求,能夠將這些交換機組合起來使用,你甚至能夠實現本身的交換機類型,而且當作RabbitMQ的插件來使用;
消息集羣:在相同局域網中的多個RabbitMQ服務器能夠聚合在一塊兒,做爲一個獨立的邏輯代理來使用;
隊列高可用:隊列能夠在集羣中的機器上進行鏡像,以確保在硬件問題下還保證消息安全;
多種協議的支持:支持多種消息隊列協議;
服務器端用Erlang語言編寫,支持只要是你能想到的全部編程語言;
管理界面: RabbitMQ有一個易用的用戶界面,使得用戶能夠監控和管理消息Broker的許多方面;
跟蹤機制:若是消息異常,RabbitMQ提供消息跟蹤機制,使用者能夠找出發生了什麼;
插件機制:提供了許多插件,來從多方面進行擴展,也能夠編寫本身的插件;
使用RabbitMQ須要:
ErLang語言包
RabbitMQ安裝包
RabbitMQ能夠運行在Erlang語言所支持的平臺之上:
Solaris
BSD
Linux
MacOSX
TRU64
Windows NT/2000/XP/Vista/Windows 7/Windows 8
Windows Server 2003/2008/2012
Windows 95, 98
VxWorks
優勢:
因爲erlang語言的特性,mq 性能較好,高併發;
健壯、穩定、易用、跨平臺、支持多種語言、文檔齊全;
有消息確認機制和持久化機制,可靠性高;
高度可定製的路由;
管理界面較豐富,在互聯網公司也有較大規模的應用;
社區活躍度高;
缺點:
儘管結合erlang語言自己的併發優點,性能較好,可是不利於作二次開發和維護;
實現了代理架構,意味着消息在發送到客戶端以前能夠在中央節點上排隊。此特性使得RabbitMQ易於使用和部署,可是使得其運行速度較慢,由於中央節點增長了延遲,消息封裝後也比較大;
須要學習比較複雜的接口和協議,學習和維護成本較高;
4.2 ActiveMQ
ActiveMQ是由Apache出品,ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現。它很是快速,支持多種語言的客戶端和協議,並且能夠很是容易的嵌入到企業的應用環境中,並有許多高級功能。
主要特性:
服從 JMS 規範:JMS 規範提供了良好的標準和保證,包括:同步或異步的消息分發,一次和僅一次的消息分發,消息接收和訂閱等等。聽從 JMS 規範的好處在於,不論使用什麼 JMS 實現提供者,這些基礎特性都是可用的;
鏈接性:ActiveMQ 提供了普遍的鏈接選項,支持的協議有:HTTP/S,IP 多播,SSL,STOMP,TCP,UDP,XMPP等等。對衆多協議的支持讓 ActiveMQ 擁有了很好的靈活性。
支持的協議種類多:OpenWire、STOMP、REST、XMPP、AMQP ;
持久化插件和安全插件:ActiveMQ 提供了多種持久化選擇。並且,ActiveMQ 的安全性也能夠徹底依據用戶需求進行自定義鑑權和受權;
支持的客戶端語言種類多:除了 Java 以外,還有:C/C++,.NET,Perl,PHP,Python,Ruby;
代理集羣:多個 ActiveMQ 代理能夠組成一個集羣來提供服務;
異常簡單的管理:ActiveMQ 是以開發者思惟被設計的。因此,它並不須要專門的管理員,由於它提供了簡單又使用的管理特性。有不少中方法能夠監控 ActiveMQ 不一樣層面的數據,包括使用在 JConsole 或者 ActiveMQ 的Web Console 中使用 JMX,經過處理 JMX 的告警消息,經過使用命令行腳本,甚至能夠經過監控各類類型的日誌。
使用ActiveMQ須要:
Java JDK
ActiveMQ安裝包
ActiveMQ能夠運行在Java語言所支持的平臺之上。
優勢:
跨平臺(JAVA編寫與平臺無關有,ActiveMQ幾乎能夠運行在任何的JVM上)
能夠用JDBC:能夠將數據持久化到數據庫。雖然使用JDBC會下降ActiveMQ的性能,可是數據庫一直都是開發人員最熟悉的存儲介質。將消息存到數據庫,看得見摸得着。並且公司有專門的DBA去對數據庫進行調優,主從分離;
支持JMS :支持JMS的統一接口;
支持自動重連;
有安全機制:支持基於shiro,jaas等多種安全配置機制,能夠對Queue/Topic進行認證和受權。
監控完善:擁有完善的監控,包括Web Console,JMX,Shell命令行,Jolokia的REST API;
界面友善:提供的Web Console能夠知足大部分狀況,還有不少第三方的組件可使用,如hawtio;
缺點:
社區活躍度不及RabbitMQ高;
根據其餘用戶反饋,會出莫名其妙的問題,會丟失消息;
目前重心放到activemq6.0產品-apollo,對5.x的維護較少;
不適合用於上千個隊列的應用場景;
4.3 RocketMQ
RocketMQ出自 阿里公司的開源產品,用 Java 語言實現,在設計時參考了 Kafka,並作出了本身的一些改進,消息可靠性上比 Kafka 更好。RocketMQ在阿里集團被普遍應用在訂單,交易,充值,流計算,消息推送,日誌流式處理,binglog分發等場景。
主要特性:
是一個隊列模型的消息中間件,具備高性能、高可靠、高實時、分佈式特色;
Producer、Consumer、隊列均可以分佈式;
Producer向一些隊列輪流發送消息,隊列集合稱爲Topic,Consumer若是作廣播消費,則一個consumer實例消費這個Topic對應的全部隊列,若是作集羣消費,則多個Consumer實例平均消費這個topic對應的隊列集合;
可以保證嚴格的消息順序;
提供豐富的消息拉取模式;
高效的訂閱者水平擴展能力;
實時的消息訂閱機制;
億級消息堆積能力;
較少的依賴;
使用RocketMQ須要:
Java JDK
安裝git、Maven
RocketMQ安裝包
RocketMQ能夠運行在Java語言所支持的平臺之上。
優勢:
單機支持 1 萬以上持久化隊列
RocketMQ 的全部消息都是持久化的,先寫入系統 PAGECACHE,而後刷盤,能夠保證內存與磁盤都有一份數據,
訪問時,直接從內存讀取。
模型簡單,接口易用(JMS 的接口不少場合並不太實用);
性能很是好,能夠大量堆積消息在broker中;
支持多種消費,包括集羣消費、廣播消費等。
各個環節分佈式擴展設計,主從HA;
開發度較活躍,版本更新很快。
缺點:
支持的客戶端語言很少,目前是java及c++,其中c++不成熟;
RocketMQ社區關注度及成熟度也不及前二者;
沒有web管理界面,提供了一個CLI(命令行界面)管理工具帶來查詢、管理和診斷各類問題;
沒有在 mq 核心中去實現JMS等接口;
4.4 Kafka
Apache Kafka是一個分佈式消息發佈訂閱系統。它最初由LinkedIn公司基於獨特的設計實現爲一個分佈式的提交日誌系統( a distributed commit log),,以後成爲Apache項目的一部分。Kafka系統快速、可擴展而且可持久化。它的分區特性,可複製和可容錯都是其不錯的特性。
主要特性:
快速持久化,能夠在O(1)的系統開銷下進行消息持久化;
高吞吐,在一臺普通的服務器上既能夠達到10W/s的吞吐速率;
.徹底的分佈式系統,Broker、Producer、Consumer都原生自動支持分佈式,自動實現負載均衡;
支持同步和異步複製兩種HA;
支持數據批量發送和拉取;
zero-copy:減小IO操做步驟;
數據遷移、擴容對用戶透明;
無需停機便可擴展機器;
其餘特性:嚴格的消息順序、豐富的消息拉取模型、高效訂閱者水平擴展、實時的消息訂閱、億級的消息堆積能力、按期刪除機制;
使用Kafka須要:
Java JDK
Kafka安裝包
優勢:
客戶端語言豐富,支持java、.net、php、ruby、python、go等多種語言;
性能卓越,單機寫入TPS約在百萬條/秒,消息大小10個字節;
提供徹底分佈式架構, 並有replica機制, 擁有較高的可用性和可靠性, 理論上支持消息無限堆積;
支持批量操做;
消費者採用Pull方式獲取消息, 消息有序, 經過控制可以保證全部消息被消費且僅被消費一次;
有優秀的第三方Kafka Web管理界面Kafka-Manager;
在日誌領域比較成熟,被多家公司和多個開源項目使用;
缺點:
Kafka單機超過64個隊列/分區,Load會發生明顯的飆高現象,隊列越多,load越高,發送消息響應時間變長
使用短輪詢方式,實時性取決於輪詢間隔時間;
消費失敗不支持重試;
支持消息順序,可是一臺代理宕機後,就會產生消息亂序;
社區更新較慢;
4.5 RabbitMQ/ActiveMQ/RocketMQ/Kafka對比
這裏列舉了上述四種消息隊列的差別對比:
結論:
Kafka在於分佈式架構,RabbitMQ基於AMQP協議來實現,RocketMQ/思路來源於kafka,改爲了主從結構,在事務性可靠性方面作了優化。普遍來講,電商、金融等對事務性要求很高的,能夠考慮RabbitMQ和RocketMQ,對性能要求高的可考慮Kafka。
5、參考資料: 5.1 消息隊列:
大型網站架構之分佈式消息隊列 http://blog.csdn.net/shaobingj126/article/details/50585035
消息隊列的使用場景 https://www.zhihu.com/question/34243607/answer/127666030
淺談異步消息隊列模型 http://www.cnblogs.com/sunkeydev/p/5248855.html
消息隊列的兩種模式 http://blog.csdn.net/heyutao007/article/details/50131089
5.2 RabbitMQ
RabbitMQ主頁 https://www.rabbitmq.com/
RabbitMQ學習教程 https://www.rabbitmq.com/getstarted.html
專欄:RabbitMQ從入門到精通 http://blog.csdn.net/column/details/rabbitmq.html
RabbitMQ能爲你作些什麼 http://rabbitmq.mr-ping.com/deion.html
RabbitMQ指南(1)-特性及功能 https://blog.zenfery.cc/archives/79.html
5.3 ActiveMQ
ActiveMQ主頁 http://activemq.apache.org/
Apache ActiveMQ介紹 http://jfires.iteye.com/blog/1187688
ActiveMQ的簡介與安裝 http://blog.csdn.net/sl1992/article/details/72824562
ActiveMQ 和消息簡介 http://www.cnblogs.com/craftsman-gao/p/7002605.html
5.4 RocketMQ
主頁 https://github.com/alibaba/RocketMQ
RocketMQ 原理簡介 http://alibaba.github.io/RocketMQ-docs/document/design/RocketMQ_design.pdf
RocketMQ與kafka對比(18項差別) http://jm.taobao.org/2016/03/24/rmq-vs-kafka/
5.5 Kafka
1.Kafka主頁: http://kafka.apache.org/
Kafka特性 http://www.cnblogs.com/lsx1993/p/4847719.html
Kafka客戶端支持語言 https://cwiki.apache.org/confluence/display/KAFKA/Clients
5.6 RabbitMQ/ActiveMQ/RocketMQ/Kafka對比
RocketMQ,隊列選型 http://www.zmannotes.com/index.php/2016/01/17/rocketmq/
RabbitMQ和Kafka http://www.dongcoder.com/detail-416804.html
即時通訊RabbitMQ二-性能測試 http://www.jianshu.com/p/d31ae9e3bfb6
RabbitMq、ActiveMq、ZeroMq、kafka之間的比較,資料彙總 http://blog.csdn.net/linsongbin1/article/details/47781187
消息隊列軟件產品大比拼 http://www.cnblogs.com/amityat/archive/2011/08/31/2160293.html
總結:
消息隊列利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成。目前業界有不少的MQ產品,例如RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用數據庫redis充當消息隊列的案例。而這些消息隊列產品,各有側重,在實際選型時,須要結合自身需求及MQ產品特徵,綜合考慮。
RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue)的開源實現。
場景說明:用戶註冊後,須要發註冊郵件和註冊短信,傳統的作法有兩種1.串行的方式;2.並行的方式
(1)串行方式:將註冊信息寫入數據庫後,發送註冊郵件,再發送註冊短信,以上三個任務所有完成後才返回給客戶端。 這有一個問題是,郵件,短信並非必須的,它只是一個通知,而這種作法讓客戶端等待沒有必要等待的東西.
(2)並行方式:將註冊信息寫入數據庫後,發送郵件的同時,發送短信,以上三個任務完成後,返回給客戶端,並行的方式能提升處理的時間。
假設三個業務節點分別使用50ms,串行方式使用時間150ms,並行使用時間100ms。雖然並性已經提升的處理時間,可是,前面說過,郵件和短信對我正常的使用網站沒有任何影響,客戶端沒有必要等着其發送完成才顯示註冊成功,英愛是寫入數據庫後就返回.
(3)消息隊列
引入消息隊列後,把發送郵件,短信不是必須的業務邏輯異步處理
由此能夠看出,引入消息隊列後,用戶的響應時間就等於寫入數據庫的時間+寫入消息隊列的時間(能夠忽略不計),引入消息隊列後處理後,響應時間是串行的3倍,是並行的2倍。
場景:雙11是購物狂節,用戶下單後,訂單系統須要通知庫存系統,傳統的作法就是訂單系統調用庫存系統的接口.
這種作法有一個缺點:
訂單系統和庫存系統高耦合.
引入消息隊列
訂單系統:用戶下單後,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。
流量削峯通常在秒殺活動中應用普遍
場景:秒殺活動,通常會由於流量過大,致使應用掛掉,爲了解決這個問題,通常在應用前端加入消息隊列。
做用:
1.能夠控制活動人數,超過此必定閥值的訂單直接丟棄(我爲何秒殺一次都沒有成功過呢^^)
2.能夠緩解短期的高流量壓垮應用(應用程序按本身的最大處理能力獲取訂單)
1.用戶的請求,服務器收到以後,首先寫入消息隊列,加入消息隊列長度超過最大值,則直接拋棄用戶請求或跳轉到錯誤頁面.
2.秒殺業務根據消息隊列中的請求信息,再作後續處理.
幾個概念說明:
Broker:它提供一種傳輸服務,它的角色就是維護一條從生產者到消費者的路線,保證數據能按照指定的方式進行傳輸,
Exchange:消息交換機,它指定消息按什麼規則,路由到哪一個隊列。
Queue:消息的載體,每一個消息都會被投到一個或多個隊列。
Binding:綁定,它的做用就是把exchange和queue按照路由規則綁定起來.
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker裏能夠有多個vhost,用做不一樣用戶的權限分離。
Producer:消息生產者,就是投遞消息的程序.
Consumer:消息消費者,就是接受消息的程序.
Channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel.
RabbbitMQ的分發機制很是適合擴展,並且它是專門爲併發程序設計的,若是如今load加劇,那麼只須要建立更多的Consumer來進行任務處理。
爲了保證數據不被丟失,RabbitMQ支持消息確認機制,爲了保證數據能被正確處理而不只僅是被Consumer收到,那麼咱們不能採用no-ack,而應該是在處理完數據以後發送ack.
在處理完數據以後發送ack,就是告訴RabbitMQ數據已經被接收,處理完成,RabbitMQ能夠安全的刪除它了.
若是Consumer退出了可是沒有發送ack,那麼RabbitMQ就會把這個Message發送到下一個Consumer,這樣就保證在Consumer異常退出狀況下數據也不會丟失.
RabbitMQ它沒有用到超時機制.RabbitMQ僅僅經過Consumer的鏈接中斷來確認該Message並無正確處理,也就是說RabbitMQ給了Consumer足夠長的時間作數據處理。
若是忘記ack,那麼當Consumer退出時,Mesage會從新分發,而後RabbitMQ會佔用愈來愈多的內存.
要持久化隊列queue的持久化須要在聲明時指定durable=True;
這裏要注意,隊列的名字必定要是Broker中不存在的,否則不能改變此隊列的任何屬性.
隊列和交換機有一個建立時候指定的標誌durable,durable的惟一含義就是具備這個標誌的隊列和交換機會在重啓以後從新創建,它不表示說在隊列中的消息會在重啓後恢復
消息持久化包括3部分
1. exchange持久化,在聲明時指定durable => true
hannel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//聲明消息隊列,且爲可持久化的
2.queue持久化,在聲明時指定durable => true
channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//聲明消息隊列,且爲可持久化的
3.消息持久化,在投遞時指定delivery_mode => 2(1是非持久化).
channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
若是exchange和queue都是持久化的,那麼它們之間的binding也是持久化的,若是exchange和queue二者之間有一個持久化,一個非持久化,則不容許創建綁定.
注意:一旦建立了隊列和交換機,就不能修改其標誌了,例如,建立了一個non-durable的隊列,而後想把它改變成durable的,惟一的辦法就是刪除這個隊列而後重現建立。
你可能也注意到了,分發機制不是那麼優雅,默認狀態下,RabbitMQ將第n個Message分發給第n個Consumer。n是取餘後的,它無論Consumer是否還有unacked Message,只是按照這個默認的機制進行分發.
那麼若是有個Consumer工做比較重,那麼就會致使有的Consumer基本沒事可作,有的Consumer卻毫無休息的機會,那麼,Rabbit是如何處理這種問題呢?
經過basic.qos方法設置prefetch_count=1,這樣RabbitMQ就會使得每一個Consumer在同一個時間點最多處理一個Message,換句話說,在接收到該Consumer的ack前,它不會將新的Message分發給它
channel.basic_qos(prefetch_count=1)
注意,這種方法可能會致使queue滿。固然,這種狀況下你可能須要添加更多的Consumer,或者建立更多的virtualHost來細化你的設計。
先來溫習如下交換機路由的幾種類型:
Direct Exchange:直接匹配,經過Exchange名稱+RountingKey來發送與接收消息.
Fanout Exchange:廣播訂閱,向全部的消費者發佈消息,可是隻有消費者將隊列綁定到該路由器才能收到消息,忽略Routing Key.
Topic Exchange:主題匹配訂閱,這裏的主題指的是RoutingKey,RoutingKey能夠採用通配符,如:*或#,RoutingKey命名採用.來分隔多個詞,只有消息這將隊列綁定到該路由器且指定RoutingKey符合匹配規則時才能收到消息;
Headers Exchange:消息頭訂閱,消息發佈前,爲消息定義一個或多個鍵值對的消息頭,而後消費者接收消息同時須要定義相似的鍵值對請求頭:(如:x-mactch=all或者x_match=any),只有請求頭與消息頭匹配,才能接收消息,忽略RoutingKey.
默認的exchange:若是用空字符串去聲明一個exchange,那麼系統就會使用」amq.direct」這個exchange,咱們建立一個queue時,默認的都會有一個和新建queue同名的routingKey綁定到這個默認的exchange上去
channel.BasicPublish("", "TaskQueue", properties, bytes);
由於在第一個參數選擇了默認的exchange,而咱們申明的隊列叫TaskQueue,因此默認的,它在新建一個也叫TaskQueue的routingKey,並綁定在默認的exchange上,致使了咱們能夠在第二個參數routingKey中寫TaskQueue,這樣它就會找到定義的同名的queue,並把消息放進去。
若是有兩個接收程序都是用了同一個的queue和相同的routingKey去綁定direct exchange的話,分發的行爲是負載均衡的,也就是說第一個是程序1收到,第二個是程序2收到,以此類推。
若是有兩個接收程序用了各自的queue,但使用相同的routingKey去綁定direct exchange的話,分發的行爲是複製的,也就是說每一個程序都會收到這個消息的副本。行爲至關於fanout類型的exchange。
下面詳細來講:
綁定其實就是關聯了exchange和queue,或者這麼說:queue對exchange的內容感興趣,exchange要把它的Message deliver到queue。
Driect exchange的路由算法很是簡單:經過bindingkey的徹底匹配,能夠用下圖來講明.
Exchange和兩個隊列綁定在一塊兒,Q1的bindingkey是orange,Q2的binding key是black和green.
當Producer publish key是orange時,exchange會把它放到Q1上,若是是black或green就會到Q2上,其他的Message被丟棄.
多個queue綁定同一個key也是能夠的,對於下圖的例子,Q1和Q2都綁定了black,對於routing key是black的Message,會被deliver到Q1和Q2,其他的Message都會被丟棄.
對於Message的routing_key是有限制的,不能使任意的。格式是以點號「.」分割的字符表。好比:」stock.usd.nyse」, 「nyse.vmw」, 「quick.orange.rabbit」。你能夠聽任意的key在routing_key中,固然最長不能超過255 bytes。
對於routing_key,有兩個特殊字符
#(hash)0個或多個單詞
Producer發送消息時須要設置routing_key,routing_key包含三個單詞和連個點號o,第一個key描述了celerity(靈巧),第二個是color(色彩),第三個是物種:
在這裏咱們建立了兩個綁定: Q1 的binding key 是」.orange.「; Q2 是 「..rabbit」 和 「lazy.#」:
RabbitMQ使用ProtoBuf序列化消息,它可做爲RabbitMQ的Message的數據格式進行傳輸,因爲是結構化的數據,這樣就極大的方便了Consumer的數據高效處理,固然也可使用XML,與XML相比,ProtoBuf有如下優點:
1.簡單
2.size小了3-10倍
3.速度快了20-100倍
4.易於編程
6.減小了語義的歧義.
,ProtoBuf具備速度和空間的優點,使得它如今應用很是普遍
什麼是MQ?
MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。MQ是消費-生產者模型的一個典型的表明,一端往消息隊列中不斷寫入消息,而另外一端則能夠讀取隊列中的消息。
RabbitMQ是MQ的一種。下面詳細介紹一下RabbitMQ的基本概念。
一、隊列、生產者、消費者
隊列是RabbitMQ的內部對象,用於存儲消息。生產者(下圖中的P)生產消息並投遞到隊列中,消費者(下圖中的C)能夠從隊列中獲取消息並消費。
多個消費者能夠訂閱同一個隊列,這時隊列中的消息會被平均分攤給多個消費者進行處理,而不是每一個消費者都收到全部的消息並處理。
二、Exchange、Binding
剛纔咱們看到生產者將消息投遞到隊列中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的狀況是,生產者將消息發送到Exchange(交換器,下圖中的X),再經過Binding將Exchange與Queue關聯起來。
三、Exchange Type、Bingding key、routing key
在綁定(Binding)Exchange與Queue的同時,通常會指定一個binding key。在綁定多個Queue到同一個Exchange的時候,這些Binding容許使用相同的binding key。
生產者在將消息發送給Exchange的時候,通常會指定一個routing key,來指定這個消息的路由規則,生產者就能夠在發送消息給Exchange時,經過指定routing key來決定消息流向哪裏。
RabbitMQ經常使用的Exchange Type有三種:fanout、direct、topic。
fanout:把全部發送到該Exchange的消息投遞到全部與它綁定的隊列中。
direct:把消息投遞到那些binding key與routing key徹底匹配的隊列中。
topic:將消息路由到binding key與routing key模式匹配的隊列中。
附上一張RabbitMQ的結構圖:
最後來具體解析一下幾個問題:
一、能夠自動建立隊列,也能夠手動建立隊列,若是自動建立隊列,那麼是誰負責建立隊列呢?是生產者?仍是消費者?
若是隊列不存在,固然消費者不會收到任何的消息。可是若是隊列不存在,那麼生產者發送的消息就會丟失。因此,爲了數據不丟失,消費者和生產者均可以建立隊列。那麼若是建立一個已經存在的隊列呢?那麼不會有任何的影響。須要注意的是沒有任何的影響,也就是說第二次建立若是參數和第一次不同,那麼該操做雖然成功,可是隊列屬性並不會改變。
隊列對於負載均衡的處理是完美的。對於多個消費者來講,RabbitMQ使用輪詢的方式均衡的發送給不一樣的消費者。
二、RabbitMQ的消息確認機制
默認狀況下,若是消息已經被某個消費者正確的接收到了,那麼該消息就會被從隊列中移除。固然也可讓同一個消息發送到不少的消費者。
若是一個隊列沒有消費者,那麼,若是這個隊列有數據到達,那麼這個數據會被緩存,不會被丟棄。當有消費者時,這個數據會被當即發送到這個消費者,這個數據被消費者正確收到時,這個數據就被從隊列中刪除。
那麼什麼是正確收到呢?經過ack。每一個消息都要被acknowledged(確認,ack)。咱們能夠顯示的在程序中去ack,也能夠自動的ack。若是有數據沒有被ack,那麼:
RabbitMQ Server會把這個信息發送到下一個消費者。
若是這個app有bug,忘記了ack,那麼RabbitMQServer不會再發送數據給它,由於Server認爲這個消費者處理能力有限。
並且ack的機制能夠起到限流的做用(Benefitto throttling):在消費者處理完成數據後發送ack,甚至在額外的延時後發送ack,將有效的均衡消費者的負載。
關於消息隊列,從前年開始斷斷續續看了些資料,想寫好久了,但一直沒騰出空,近來分別碰到幾個朋友聊這塊的技術選型,是時候把這塊的知識整理記錄一下了。
市面上的消息隊列產品有不少,好比老牌的 ActiveMQ、RabbitMQ ,目前我看最火的 Kafka ,還有 ZeroMQ ,去年末阿里巴巴捐贈給 Apache 的 RocketMQ ,連 redis 這樣的 NoSQL 數據庫也支持 MQ 功能。總之這塊知名的產品就有十幾種,就我本身的使用經驗和興趣只打算談談 RabbitMQ、Kafka 和 ActiveMQ ,本文先講 RabbitMQ ,在此以前先看下消息隊列的相關概念。
消息(Message)是指在應用間傳送的數據。消息能夠很是簡單,好比只包含文本字符串,也能夠更復雜,可能包含嵌入對象。
消息隊列(Message Queue)是一種應用間的通訊方式,消息發送後能夠當即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而無論是誰發佈的。這樣發佈者和使用者都不用知道對方的存在。
從上面的描述中能夠看出消息隊列是一種應用間的異步協做機制,那何時須要使用 MQ 呢?
以常見的訂單系統爲例,用戶點擊【下單】按鈕以後的業務邏輯可能包括:扣減庫存、生成相應單據、發紅包、發短信通知。在業務發展初期這些邏輯可能放在一塊兒同步執行,隨着業務的發展訂單量增加,須要提高系統服務的性能,這時能夠將一些不須要當即生效的操做拆分出來異步執行,好比發放紅包、發短信通知等。這種場景下就能夠用 MQ ,在下單的主流程(好比扣減庫存、生成相應單據)完成以後發送一條消息到 MQ 讓主流程快速完結,而由另外的單獨線程拉取MQ的消息(或者由 MQ 推送消息),當發現 MQ 中有發紅包或發短信之類的消息時,執行相應的業務邏輯。
以上是用於業務解耦的狀況,其它常見場景包括最終一致性、廣播、錯峯流控等等。
RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。
AMQP :Advanced Message Queue,高級消息隊列協議。它是應用層協議的一個開放標準,爲面向消息的中間件設計,基於此協議的客戶端與消息中間件可傳遞消息,並不受產品、開發語言等條件的限制。
RabbitMQ 最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。具體特色包括:
可靠性(Reliability)
RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發佈確認。
靈活的路由(Flexible Routing)
在消息進入隊列以前,經過 Exchange 來路由消息的。對於典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,能夠將多個 Exchange 綁定在一塊兒,也經過插件機制實現本身的 Exchange 。
消息集羣(Clustering)
多個 RabbitMQ 服務器能夠組成一個集羣,造成一個邏輯 Broker 。
高可用(Highly Available Queues)
隊列能夠在集羣中的機器上進行鏡像,使得在部分節點出問題的狀況下隊列仍然可用。
多種協議(Multi-protocol)
RabbitMQ 支持多種消息隊列協議,好比 STOMP、MQTT 等等。
多語言客戶端(Many Clients)
RabbitMQ 幾乎支持全部經常使用語言,好比 Java、.NET、Ruby 等等。
管理界面(Management UI)
RabbitMQ 提供了一個易用的用戶界面,使得用戶能夠監控和管理消息 Broker 的許多方面。
跟蹤機制(Tracing)
若是消息異常,RabbitMQ 提供了消息跟蹤機制,使用者能夠找出發生了什麼。
插件機制(Plugin System)
RabbitMQ 提供了許多插件,來從多方面進行擴展,也能夠編寫本身的插件。
全部 MQ 產品從模型抽象上來講都是同樣的過程:
消費者(consumer)訂閱某個隊列。生產者(producer)建立消息,而後發佈到隊列(queue)中,最後將消息發送到監聽的消費者。
上面只是最簡單抽象的描述,具體到 RabbitMQ 則有更詳細的概念須要解釋。上面介紹過 RabbitMQ 是 AMQP 協議的一個開源實現,因此其內部實際上也是 AMQP 中的基本概念:
AMQP 中消息的路由過程和 Java 開發者熟悉的 JMS 存在一些差異,AMQP 中增長了 Exchange 和 Binding 的角色。生產者把消息發佈到 Exchange 上,消息最終到達隊列並被消費者接收,而 Binding 決定交換器的消息應該發送到那個隊列。
Exchange分發消息時根據類型的不一樣分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器徹底一致,但性能差不少,目前幾乎用不到了,因此直接看另外三種類型:
direct
消息中的路由鍵(routing key)若是和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。路由鍵與隊列名徹底匹配,若是一個隊列綁定到交換機要求路由鍵爲「dog」,則只轉發 routing key 標記爲「dog」的消息,不會轉發「dog.puppy」,也不會轉發「dog.guard」等等。它是徹底匹配、單播的模式。
fanout
每一個發到 fanout 類型交換器的消息都會分到全部綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每一個發送到交換器的消息都會被轉發到與該交換器綁定的全部隊列上。很像子網廣播,每臺子網內的主機都得到了一份複製的消息。fanout 類型轉發消息是最快的。
通常來講安裝 RabbitMQ 以前要安裝 Erlang ,能夠去Erlang官網下載。接着去RabbitMQ官網下載安裝包,以後解壓縮便可。根據操做系統不一樣官網提供了相應的安裝說明:Windows、Debian / Ubuntu、RPM-based Linux、Mac
若是是Mac 用戶,我的推薦使用 HomeBrew 來安裝,安裝前要先更新 brew:
brew update
接着安裝 rabbitmq 服務器:
brew install rabbitmq
這樣 RabbitMQ 就安裝好了,安裝過程當中會自動其所依賴的 Erlang 。
./sbin/rabbitmq-server
啓動正常的話會看到一些啓動過程信息和最後的 completed with 7 plugins,這也說明啓動的時候默認加載了7個插件。
./sbin/rabbitmq-server -detached
./sbin/rabbitmqctl status
該命令將輸出服務器的不少信息,好比 RabbitMQ 和 Erlang 的版本、OS 名稱、內存等等
./sbin/rabbitmqctl stop
它會和本地節點通訊並指示其乾淨的關閉,也能夠指定關閉不一樣的節點,包括遠程節點,只須要傳入參數 -n :
./sbin/rabbitmqctl -n rabbit@server.example.com stop
-n node 默認 node 名稱是 rabbit@server ,若是你的主機名是 server.example.com ,那麼 node 名稱就是 rabbit@server.example.com 。
./sbin/rabbitmqctl stop_app
這個命令在後面要講的集羣模式中將會頗有用。
./sbin/rabbitmqctl start_app
./sbin/rabbitmqctl reset
該命令將清除全部的隊列。
./sbin/rabbitmqctl list_queues
./sbin/rabbitmqctl list_exchanges
該命令還能夠附加參數,好比列出交換器的名稱、類型、是否持久化、是否自動刪除:
./sbin/rabbitmqctl list_exchanges name type durable auto_delete
./sbin/rabbitmqctl list_bindings
RabbitMQ 支持多種語言訪問,以 Java 爲例看下通常使用 RabbitMQ 的步驟。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> </dependency>
package org.study.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //設置 RabbitMQ 地址 factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); String routingKey = "hola"; //發佈消息 byte[] messageBodyBytes = "quit".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); channel.close(); conn.close(); } }
package org.study.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); //聲明隊列 String queueName = channel.queueDeclare().getQueue(); String routingKey = "hola"; //綁定隊列,經過鍵 hola 將隊列和交換器綁定起來 channel.queueBind(queueName, exchangeName, routingKey); while(true) { //消費消息 boolean autoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消費的路由鍵:" + routingKey); System.out.println("消費的內容類型:" + contentType); long deliveryTag = envelope.getDeliveryTag(); //確認消息 channel.basicAck(deliveryTag, false); System.out.println("消費的消息體內容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr); } }); } } }
./sbin/rabbitmq-server
運行 Producer
接着運行 Producer ,發佈一條消息,在 Consumer 的控制檯能看到接收的消息:
RabbitMQ 最優秀的功能之一就是內建集羣,這個功能設計的目的是容許消費者和生產者在節點崩潰的狀況下繼續運行,以及經過添加更多的節點來線性擴展消息通訊吞吐量。RabbitMQ 內部利用 Erlang 提供的分佈式通訊框架 OTP 來知足上述需求,使客戶端在失去一個 RabbitMQ 節點鏈接的狀況下,仍是可以從新鏈接到集羣中的任何其餘節點繼續生產、消費消息。
RabbitMQ 會始終記錄如下四種類型的內部元數據:
在單一節點中,RabbitMQ 會將全部這些信息存儲在內存中,同時將標記爲可持久化的隊列、交換器、綁定存儲到硬盤上。存到硬盤上能夠確保隊列和交換器在節點重啓後可以重建。而在集羣模式下一樣也提供兩種選擇:存到硬盤上(獨立節點的默認設置),存在內存中。
若是在集羣中建立隊列,集羣只會在單個節點而不是全部節點上建立完整的隊列信息(元數據、狀態、內容)。結果是隻有隊列的全部者節點知道有關隊列的全部信息,所以當集羣節點崩潰時,該節點的隊列和綁定就消失了,而且任何匹配該隊列的綁定的新消息也丟失了。還好RabbitMQ 2.6.0以後提供了鏡像隊列以免集羣節點故障致使的隊列內容不可用。
RabbitMQ 集羣中能夠共享 user、vhost、exchange等,全部的數據和狀態都是必須在全部節點上覆制的,例外就是上面所說的消息隊列。RabbitMQ 節點能夠動態的加入到集羣中。
當在集羣中聲明隊列、交換器、綁定的時候,這些操做會直到全部集羣節點都成功提交元數據變動後才返回。集羣中有內存節點和磁盤節點兩種類型,內存節點雖然不寫入磁盤,可是它的執行比磁盤節點要好。內存節點能夠提供出色的性能,磁盤節點能保障配置信息在節點重啓後仍然可用,那集羣中如何平衡這二者呢?
RabbitMQ 只要求集羣中至少有一個磁盤節點,全部其餘節點能夠是內存節點,當節點加入火離開集羣時,它們必需要將該變動通知到至少一個磁盤節點。若是隻有一個磁盤節點,恰好又是該節點崩潰了,那麼集羣能夠繼續路由消息,但不能建立隊列、建立交換器、建立綁定、添加用戶、更改權限、添加或刪除集羣節點。換句話說集羣中的惟一磁盤節點崩潰的話,集羣仍然能夠運行,但知道該節點恢復,不然沒法更改任何東西。
若是是在一臺機器上同時啓動多個 RabbitMQ 節點來組建集羣的話,只用上面介紹的方式啓動第2、第三個節點將會由於節點名稱和端口衝突致使啓動失敗。因此在每次調用 rabbitmq-server 命令前,設置環境變量 RABBITMQ_NODENAME 和 RABBITMQ_NODE_PORT 來明確指定惟一的節點名稱和端口。下面的例子端口號從5672開始,每一個新啓動的節點都加1,節點也分別命名爲test_rabbit_一、test_rabbit_二、test_rabbit_3。
啓動第1個節點:
RABBITMQ_NODENAME=test_rabbit_1 RABBITMQ_NODE_PORT=5672 ./sbin/rabbitmq-server -detached
啓動第2個節點:
RABBITMQ_NODENAME=test_rabbit_2 RABBITMQ_NODE_PORT=5673 ./sbin/rabbitmq-server -detached
啓動第2個節點前建議將 RabbitMQ 默認激活的插件關掉,不然會存在使用了某個插件的端口號衝突,致使節點啓動不成功。
如今第2個節點和第1個節點都是獨立節點,它們並不知道其餘節點的存在。集羣中除第一個節點外後加入的節點須要獲取集羣中的元數據,因此要先中止 Erlang 節點上運行的 RabbitMQ 應用程序,並重置該節點元數據,再加入而且獲取集羣的元數據,最後從新啓動 RabbitMQ 應用程序。
中止第2個節點的應用程序:
./sbin/rabbitmqctl -n test_rabbit_2 stop_app
重置第2個節點元數據:
./sbin/rabbitmqctl -n test_rabbit_2 reset
第2節點加入第1個節點組成的集羣:
./sbin/rabbitmqctl -n test_rabbit_2 join_cluster test_rabbit_1@localhost
啓動第2個節點的應用程序
./sbin/rabbitmqctl -n test_rabbit_2 start_app
第3個節點的配置過程和第2個節點相似:
RABBITMQ_NODENAME=test_rabbit_3 RABBITMQ_NODE_PORT=5674 ./sbin/rabbitmq-server -detached ./sbin/rabbitmqctl -n test_rabbit_3 stop_app ./sbin/rabbitmqctl -n test_rabbit_3 reset ./sbin/rabbitmqctl -n test_rabbit_3 join_cluster test_rabbit_1@localhost ./sbin/rabbitmqctl -n test_rabbit_3 start_app
中止某個指定的節點,好比中止第2個節點:
RABBITMQ_NODENAME=test_rabbit_2 ./sbin/rabbitmqctl stop
查看節點3的集羣狀態:
./sbin/rabbitmqctl -n test_rabbit_3 cluster_status