一發一存一消費,沒有最好的消息隊列中間件(簡稱消息中間件),只有最合適的消息中間件。
消息隊列經常使用的使用場景:php
非實時性
:當不須要當即得到結果,可是併發量又須要進行控制的時候,差很少就是須要使用消息隊列的時候。主要解決了應用耦合、異步處理、流量削鋒等問題。應用耦合
:多應用間經過消息隊列對同一消息進行處理,避免調用接口失敗致使整個過程失敗;(如:訂單->庫存)異步處理
:多應用對消息隊列中同一消息進行處理,應用間併發處理消息,相比串行處理,減小處理時間;(點對多場景,廣播場景(註冊發短信,發郵件)等等)限流削峯
:應用於秒殺或搶購活動中,避免流量過大致使應用系統掛掉的狀況;(根據服務承受度設置隊列大小,超過了就返回活動結束了,我們常常各大商城秒殺,內心尚未點B數嗎)減小壓力,避免服務掛掉。消息驅動的系統
:系統分爲消息隊列、消息生產者、消息消費者,生產者負責產生消息,消費者(可能有多個)負責對消息進行處理;(分工處理(各自對應相應的隊列),靈活應用(收到就處理/定時處理))消息隊列是異步RPC的主要手段之一java
兩種模式:python
點對點
:每一個消息只有一個消費者(Consumer),不可重複消費(一旦被消費,消息就再也不在消息隊列中)發佈/訂閱
:微信公衆號(Topic),大夥(訂閱者)訂閱關注以後,微信公衆號運營平臺(發佈者)發佈信息後,大夥微信就都收到信息了,這裏其實還分pull/push的。一個是主動推送,一個是被動拉取基於發佈/訂閱模式作擴展就是橫向擴展,多個隊列及消費分組訂閱(提升消費能力)
pull
:主動權在於消費方,優勢是按需消費(吃自助餐,能吃多少拿多少),並且服務端隊列堆積的消息處理也相對簡單(不用記錄狀態啊,狀態都消費端);缺點就是消息延遲(不知道啥時候去拉取更新),這時候有小夥伴會問,那爲啥不叫服務端通知一下呢(有句話叫不在其位不謀其政,服務端通知必然要記錄通知狀態和增長之間的通訊帶寬;固然也能夠根據實際狀況來選擇和push組合起來用(男女搭配幹活不累嘛)來提升消息的實時性)push
:主動權就在服務方了,優勢是實時性高,服務端能夠統一管理來進行負載,不過也容易致使慢消費(就得考慮消費方受不受得了,畢竟你說你瞭解,但也只有對方纔清楚你有多瞭解);缺點就是發送消息的狀態是集中式管理,壓力大啊(要分發消息還要記錄狀態還要作備份,又當爹來又當媽,你說累不累)對於順序消息,這種場景有限且成本過高的方式就得慎重考慮了,對那種全局有序但容許出現小偏差的場景(日誌推送),pull模式就很是適合了(因此說kafka爲啥經常使用於日誌處理、大數據等方面),要問爲何?本身去領悟
實際開發中消息中間件選型基於幾個方面:c++
功能
:這個就多了,優先級隊列、延遲隊列(劃分不一樣的延遲隊列來避免從新排序消耗性能,缺點嘛本身悟)、死信隊列(放沒有推送成功的)、消費模式(pull/push)、廣播消費、消息回溯(可追溯嘛,否則被賣了都不知道是誰)、消息堆積+持久化、消息追蹤(鏈路條,方便定位)、消息過濾(根據規則過濾啊,不一樣類別消息發送到不一樣topic)、多協議支持(通用性)、跨語言支持(流行程度)、流量控制(嘿嘿嘿,上面有)、消息順序性(還要再說一遍?)、安全機制(身份認證,權限認證(讀寫))、消息冪等性(承諾知道不,答應人家的事就必定要作到)、事務性消息(不想說)等性能
:通常是指其吞吐量(統一大小的消息體和不一樣大小的消息體生產和消耗能力),性能和功能不少時候是相悖的,魚和熊掌不可兼得。高可靠、高可用
:先說可靠,主要在於消息的持久化這一塊(消息只要寫入就必定會被消費,不會由於故障致使數據丟失(這個就很好測試出來了吧))。若是是從系統的角度來看就得從總體的維度去衡量了(不能單單隻靠消息中間件自己,要從生產端、服務端、消費端三個維度去保障)。運維
:一般有審覈評估啊、監控啊、報警提醒啊、容災啊、擴容啊、升級部署等等,一方面看中間件支撐的維度,一方面就看結合自動化運維的難易度社區力度及生態發展
:這個好理解吧,使用開源框架最開始基本上愉快的奔跑,但時不時的總會掉坑裏,能不能爬出來一方面看自身的實力,一方面就看社區的力度了成本
: 儘可能貼合團隊自身的技術棧體系,讓一個C棧的團隊去深挖zeroMQ總比scala編寫kafka要容易的多先貼一個圖(網上Q來的),一些功能支不支持主要取決於它使用的模式,看完上面詳細說明應該就比較清楚
git
先從比較有表明性的兩個MQ(rabbitMQ,kafka),功能對比(圖仍是Q來的)
github
應用方面:web
架構模型方面:ajax
AMQP協議
,RabbitMQ的broker由Exchange,Binding,queue組成,其中exchange和binding組成了消息的路由鍵;客戶端Producer經過鏈接channel和server進行通訊,Consumer從queue獲取消息進行消費(長鏈接,queue有消息會推送到consumer端,consumer循環從輸入流讀取數據)。rabbitMQ以broker爲中心;有消息的確認機制。吞吐量:redis
可用性方面:shell
集羣負載均衡方面:
基於erlang開發
是採用Erlang語言實現的AMQP協議的消息中間件,最初起源於金融系統,用於在分佈式系統中存儲轉發消息。RabbitMQ發展到今天,被愈來愈多的人承認,這和它在可靠性、可用性、擴展性、功能豐富等方面的卓越表現是分不開的。
優勢:
缺點:
基於java開發
是Apache出品的、採用Java語言編寫的徹底基於JMS1.1規範的面向消息的中間件,爲應用程序提供高效的、可擴展的、穩定的和安全的企業級消息通訊。不過因爲歷史緣由包袱過重,目前市場份額沒有後面三種消息中間件多,其最新架構被命名爲Apollo,(京東的消息中間件就是基於activeMQ開發的)
優勢:
缺點:
基於C開發
號稱史上最快的消息隊列,基於C語言開發。ZeroMQ是一個消息處理隊列庫,可在多線程、多內核和主機之間彈性伸縮,雖然大多數時候咱們習慣將其納入消息隊列家族之中,可是其和前面的幾款有着本質的區別,ZeroMQ自己就不是一個消息隊列服務器,更像是一組底層網絡通信庫,對原有的Socket API上加上一層封裝而已。
優勢:
缺點:
基於java開發(阿里消息中間件)
是阿里開源的消息中間件,目前已經捐獻個Apache基金會,它是由Java語言開發的,具有高吞吐量、高可用性、適合大規模分佈式系統應用等特色,經歷過雙11的洗禮,實力不容小覷。
優勢:
缺點:
基於Scala和Java開發
起初是由LinkedIn公司採用Scala語言開發的一個分佈式、多分區、多副本且基於zookeeper協調的分佈式消息系統,現已捐獻給Apache基金會。它是一種高吞吐量的分佈式發佈訂閱消息系統,以可水平擴展和高吞吐率而被普遍使用。目前愈來愈多的開源分佈式處理系統如Cloudera、Apache Storm、Spark、Flink等都支持與Kafka集成。
優勢:
缺點:
Redis的PUB/SUB機制,即發佈-訂閱模式。利用的Redis的列表(lists)數據結構。比較好的使用模式是,生產者lpush消息,消費者brpop消息,並設定超時時間,能夠減小redis的壓力。只有在Redis宕機且數據沒有持久化的狀況下丟失數據,能夠根據業務經過AOF和縮短持久化間隔來保證很高的可靠性,並且也能夠經過多個client來提升消費速度。但相對於專業的消息隊列來講,該方案消息的狀態過於簡單(沒有狀態),且沒有ack機制,消息取出後消費失敗依賴於client記錄日誌或者從新push到隊列裏面。
redis不支持分組(這點很重要,在作負載均衡的時候劣勢就體現出來),不過能夠徹底當作一個輕量級的隊列使用,但redis他爹作了disque,能夠去試一試。
幾個重要概念:
使用過程:
單機
RabbitMQ 安裝須要依賴 Erlang 環境
因此先安裝erlang,再安裝rabbitMQ,中間確定會遇到不少問題,不一樣的環境問題不同,大多數是依賴包的問題,自行補齊就好了。
rabbitmq默認建立的用戶guest,密碼也是guest,這個用戶默認只能是本機訪問,從外部訪問須要添加上面的配置。建議刪除guest用戶
配置外部訪問
# rabbitmq.config文件默認是沒有的,這裏是直接新建
vi rabbitmq.config
# 添加下面內容
[{rabbit, [{loopback_users, []}]}].
複製代碼
刪除guest用戶
rabbitmqctl delete_user guest
複製代碼
添加用戶
rabbitmqctl add_user <username> <newpassword>
複製代碼
給新增用戶賦予超級管理員權限
rabbitmqctl set_user_tags <username> administrator
複製代碼
啓動服務(建議後臺模式運行)
service rabbitmq-server start &
複製代碼
開啓管理UI(建議後臺模式運行)
rabbitmq-plugins enable rabbitmq_management &
複製代碼
訪問:ip:15672,用新添加的用戶登錄
幾個重要概念:
單機
Kafka使用Zookeeper來維護集羣信息,因此須要先安裝zookeeper,而zookeeper是由java編寫的,因此須要先安裝jdk
下載地址:kafka.apache.org/downloads
# 下載解壓
mkdir kafka && cd kafka
wget http://mirrors.shuosc.org/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar -xzf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.1
# 啓動zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# 啓動kafka
bin/kafka-server-start.sh config/server.properties
# 建立Topic test
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 參數描述:
# create: 建立Topic
# zookeeper:zookeeper集羣信息,多個用,分開
# replication-factor:複製因子
# partitions:分區信息
# topic:主題名
# 向topic發送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> 隨便輸入
# 向topic獲取消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# 就能看到前面輸入的消息了
複製代碼
Kafka監控工具
kafka沒有自帶的web ui,這裏使用KafkaOffsetMonitor, 程序一個jar包的形式運行,部署較爲方便。只有監控功能,使用起來也較爲安全。
KafkaOffsetMonitor託管在Github上,能夠經過Github下載。 下載地址:github.com/Morningstar…
# 下載好以後cd到KafkaOffsetMonitor所在目錄,能夠直接啓動,也能夠編寫shell腳原本啓動
java -cp KafkaOffsetMonitor-assembly-0.4.1-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb
--port 8089
--zk 127.0.0.1:2181
--refresh 5.minutes
--retain 1.day
複製代碼
編寫腳本啓動
mkdir kafka-monitor.sh
chmod +x kafka-monitor.sh
vi kafka-monitor.sh
# 把以前啓動的命令複製進來,如:
#! /bin/bash
java -Xms512M -Xmx512M -Xss1024K \
-cp KafkaOffsetMonitor-assembly-0.4.1-SNAPSHOT.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk localhost:2181 \
--port 8089 \
--refresh 10.seconds \
--retain 5.days
# 保存,而後就能夠啓動腳本了
複製代碼
zk :zookeeper主機地址,若是有多個,用逗號隔開
port :應用程序端口(沒設置的話,日誌裏面會輸出隨機的端口號)
refresh :應用程序在數據庫中刷新和存儲點的頻率
retain :在db中保留多長時間
dbName :保存的數據庫文件名,默認爲offsetapp
github上詳細參數說明:
Topic:訂閱的主題
Partition:分區編號
Offest:表示該parition已經消費了多少條message
logSize:表示該partition已經寫了多少條message
Lag:表示有多少條message沒有被消費。
Owner:表示消費者
Created:該partition建立時間
Last Seen:消費狀態刷新最新時間。
複製代碼
注意
本機能訪問,但其它機器不能訪問應該就是防火牆的問題,開放對應端口,或者關閉防火牆
若是能訪問,可是沒有信息顯示出來,只顯示了黑色的屏,是由於ui頁面所依賴的ajax.googleapis.com等公共庫被牆了
解決辦法