I. 消息中間件特色:
1,異步處理模式
消息發送者能夠發送一個消息而無需等待響應,消息發送者將消息發送到一條虛擬的通道或隊列上,消息接收者則訂閱或監聽該通道,一條消息可能最終轉發給一個或多個消息接受者,這些接受者都無需對消息發送者作出同步迴應,整個過程是異步的,如:用戶註冊
2,應用程序和應用程序調用關係爲鬆耦合關係,發送者和接受者沒必要了解對方,只須要確認消息,發送者和接受者沒必要同時在線
好比在線交易系統爲了保證數據一致性,在支付系統處理完成後會把支付結果放到消息中間件裏通知訂單系統修改訂單支付狀態,兩個系統經過消息中間件解耦
消息傳遞服務模型,以下圖:
html
II. 消息中間件傳遞模型
1,點對點模型PTP
點對點模型用於消息生產者和消息消費者之間點對點的通訊,消息生產者將消息發送到由某個名字標示的特定消費者,此名字實際上對應消息服務中的一個隊列queue,在消息傳遞給消費者以前他被存儲在這個隊列中,隊列消息能夠放在內存中也能夠是持久的,以保障消息服務出現故障時仍然可以傳遞消息java
1.1 模型特性:
每一個消息只有一個消費者,發送者和消費者沒有時間依賴,接受者確認消息接受和處理成功,以下圖:
linux
2,發佈訂閱模型PUB/SUB
發佈者/訂閱者模型支持向一個特定的消息主題生產消息,0或多個訂閱者可能對接收來自特定消息主題的消息敢興趣,在這種模型下,發佈者和訂閱者彼此不知道對方,這種模式比如匿名公告,這種模式被歸納爲:多個消費者能夠得到消息,在發佈者和訂閱者之間存在時間依賴性。發佈者須要創建一個訂閱subscription,以便可以消費者訂閱。訂閱者必須保持持續的活動狀態用來接收消息,除非訂閱者創建了持久訂閱,在這種狀況下,在訂閱者未鏈接時發佈的消息在訂閱者從新鏈接時從新發布c++
2.1 發佈訂閱模型特性
1,每一個消息有多個訂閱者
2,客戶端只有訂閱後才能接受到消息
以下圖,當有消息更新後會同時更新訂閱者,而且一個消息會有多個訂閱者,以下圖:
3,持久訂閱和非持久訂閱
以下圖,若是此時持久訂閱出現故障,在故障後重啓則會從新發送一次消息,若是非持久訂閱則不會,消息則錯過,以下圖:
消費者訂閱後,中間件會投遞給消費者,消費者確認後在返回給中間件,以下圖:
1,發佈者和訂閱者是存在時間以來的,當接受者和發佈者只有創建訂閱關係才能收到消息。當訂閱關係創建後,消息就不會丟失,無論訂閱者是否都在線。
當訂閱者接受了消息,必須一直在線,當只有一個訂閱者時約等於點對點模式,如一個消息中間件分爲多個塊,以下圖:
git
III. 場景
異步處理web
1,用戶註冊
用戶註冊,成功會發生郵件或短信
當正常狀況下回發生成功,若是在期間發生網絡抖動或者服務意外中止,則可能須要消息超時機制,在或者重試機制,以下圖:
後端
2,日誌分析
使用案例,計算pv和用戶行爲分析,訂閱模式,以下圖:
緩存
3,數據複製
數據從源複製到多個目的地,通常要求保持數據一致性,順序或者保證因果序列。用於跨機房數據傳輸,搜索,離線數據計算等,以下圖:
安全
4,延遲消息發送和暫存
把消息中間件當成可靠的消息暫存地,定時進行消息投遞,好比模擬用戶秒殺等作習題壓測,以下圖:
bash
5,消息廣播
緩存數據同步更新,應用推送數據,好比更新本地緩存,以下圖:
IV. 消息中間件分類
消息中間件主要分爲push和 pull
push推消息模型,消息生產者將消息發送給消息傳遞服務,消息傳遞服務又將消息推送給消息消費者
pull拉取消息模型,消費者請求消息服務接收消息,消息生產者從消息中間件拉取消息
在通常場景中,消費者拉取更容易控制,若是是生產者推送給消費者的話,須要提供更多的處理,如負載均衡,後端訂閱者信息收集的集羣等,顯而易見,拉取則更容易控制範圍,可是,拉取一般可能一次拉取成千上百萬的數據,這時,數據存儲設備和帶寬也是必備考量的環節,#原文連接www#linu#xea#com/1506.html#,相似註冊用戶發送短信這種實時的信息則不適用於拉取,更偏向於推送,對好比下圖:
######################################################################################
metaq使用java語言編寫,能夠在多種平臺部署,客戶端支持c++和java,單臺服務可支持1w以上各消息隊列,事實上測試可能在4-6千之間,可橫向擴展,隊列持久化,隊列長度無限(取決於磁盤大小),可從隊列任何位置開始消費
一般在磁盤中的二進制文件id爲8位定長,一般狀況下還須要有int,4個字節,數據字節,此時若是有個數據爲11個字節,以下
在id,int date就看作一個消息體,以下圖:
生產者和服務器和消費者均可分佈式,消息存儲一般安裝順序寫,他的性能也很是高,吞吐量達,而且支持消息順序,客戶pull,隨機讀,批量拉數據,數據遷移,以及擴展而且對用戶同名,最後,消費狀態保持在客戶端
整體架構,以下圖:
topic:消息的主題,由用戶定義並在服務器端配置,producer發送消息到某個topic,consumer從某個topic下的消費消息
offset:消息在broker上的每一個分區都是組織成一個文件列表,消費者拉取數據須要指定數據在文件中的偏移量,這個偏移就是所謂的offset,offset是絕對偏移量,服務器會將offset轉化爲具體文件的相對偏移量
broker:meta的服務器或者書服務器,在消息中間件中一般稱爲broker
partition(分區):同一個topic下面還分爲多個分區,如meta-test這個topic咱們能夠分爲10個分區,分別有兩臺服務器提供,那麼可能每臺服務器提供5個分區,假設服務器id分別是0和1,則全部分區爲0-0,0-1,0-2,0-3,0-4,1-0,1-1,1-2,1-3,1-4
zk.zkEnable=true 是否註冊到zk,默認爲true
zk.zkConnect=localhost:2181 zk的服務器列表
zk.zkSessionTimeoutMs=30000 zk心跳超市,單位毫秒,默認30秒
zk.zkConnection TImeoutMs=30000 zk連接超時時間,單位毫秒,默認30秒
brokerld 服務器id,必須惟一,0-1024之間
serverprot 服務端口
hostName 默認將取本機IP,若是多機網卡須要指明
dataLogPath 日誌存放路徑,默認和datapath同樣
datapath 指定默認數據存儲路徑
daletepolicy=delete,168 數據刪除策略,默認超過7天則刪除,這裏是168是小時,10s表示秒,10m表示分鐘,10h表示小時,默認爲小時
deleteWhen 什麼時候執行刪除策略的cron表達式,默認是0 0 6,18 * * ?,也就是天天的遲早執行處理策略
deletewhen 刪除策略的執行時間,cron表達式
flushTxLogAtCommit=1 事務日誌的同步設置,0表示讓操做系統決定,1表示每次commit都同步,2表示每隔1秒同步一次,不過會影響事務性能,可根據須要的性能和可靠性直接權衡作出一個合理的選擇。一般設置爲2,表示每隔1秒刷一次,也就最多丟失一秒內的運行時事務,最安全的設置爲1,可是嚴重影響性能,而0的安全級別最低,安全級別上1>=2>0 而性能則是0>=2>1
unflushthreshold 每隔多少條消息作一次磁盤的sync,強制將更改的數據刷入磁盤,默認1000,也就是說在掉電的狀況下,最多容許丟失1000條消息,可設置爲0,強制每次寫如都sync,在設置0的狀況下,服務器會自動啓用group commit技術,將多個消息合併成一次sync來提升io性能,而group commit狀況下消息發送者的tps沒有收到太大影響,可是服務端的負責會上升不少
unflushinterval 間隔多少毫秒按期作一個磁盤的sync,默認是10秒,也就是說服務器掉電狀況下,最多丟失10秒內發送來的消息,不可設置爲小於或者等於0
metaq集羣:全部broker註冊到zookeeper,客戶端連接zookeeper並返回可用的broker列表,選擇一個broker併發送消息
RabbitMQ
在AMQP協議標準基礎上完整的,可複用的企業消息系統,遵循Mozilla Public License開源協議,採用Erlang實現的工業級的消息隊列MQ服務器
AMQP高級消息隊列協議,是一個一步消息傳遞所使用的應用層協議規範,做爲線路層協議,而不是API,例如JMS,AMQP客戶端可以無視消息的來源任意發送和接受信息。AMQP的原始用途只是爲金融界提供一個能夠彼此協做的消息協議,而如今的目標則是爲通用消息隊列架構提供通用構建工具。所以,面向消息的中間件MOM系統,例如發佈/訂閱隊列,沒有做爲基本元素實現,反而經過發送簡化的AMQ實體,用戶被賦予 了構建這些實體的能力,這些實體也是規範的一部分,造成了在線路層協議頂端的一個層級,AMQP模型,這個模型贊成了消息模式,例如以前提到的發佈訂閱,隊列,事務以及流數據,而且添加了額外的特性,例如更易於擴展,基於內容的路由
RabbitMQ總體架構,以下圖:
I. 運行原理圖
RabbitMQ核心組件分別是exchange和queue,以下圖:
RabbitMQ術語
Server(broker) 接收客戶端連接,實現AMQP消息隊列和路由功能的進程
Virtual Host 一個虛擬概念,相似權限控制組,一個virtual host裏面可有多個exchange和queue,可是權限控制 的最小粒度是virtual host
exchange 接收生產者發送的消息,並根據binding規則將消息路由給服務器中的隊列,exchangeType決定了exchnage路由消息的行爲,如,在RabbitMQ中,exchangeType有Direct,fanout和topic三種,不一樣類型的exchange路由的行爲是不同的
message queue 消息隊列,用於存儲還未被消費者消費的消息
message 由header和body組成,header是由生產者添加的各類屬性的集合,包括message是否被持久化,由那個message queue接受,優先級是多少等,而body是真正須要傳輸的app數據
bindingkey 所謂綁定就是將一個特定的exchange和一個特定的queue綁定起來,綁定關鍵字成爲bindingkey
Exchange分類
直接式交換器類型
Direct Exchange 直接交互式處理路由鍵,須要將一個隊列綁定到交換機上,須要該消息與一個特定的路由鍵徹底匹配,這是一個完整的匹配。若是一個隊列綁定到該交換機上要求路由鍵「dog",則只有被標記爲「dog"的消息才被轉發,不會轉發dog.puppy,也不會轉發dog.guard,只會轉發dog,以下圖:
廣播式交換器類型
Fanout Exchange 廣播式路由鍵,只須要簡單的將隊列綁定到交換機上,一個發送到交換機的消息都會被轉發到與該交換機綁定的全部隊列上,很像子網廣播,每臺子網內的主機都得到了一份複製的消息,fanout交換機轉發消息是最快,以下圖:
主題式交換器類型
Topic exchange,主題式交換器,經過消息的路由關鍵字和綁定關鍵字的模式匹配,將消息路由到被綁定的隊列中,這種路由器類型能夠被用來支持經典的發佈,訂閱消息傳輸模型,使用主題名字空間做爲消息尋址模式,將消息傳遞給那些部分或者所有匹配主題模式的多個消費者,主題交換機類型的工資方式以下:綁定關鍵字用零個或多個標記構成,每個標記之間用"."字符分隔,綁定關鍵字必須用這種形式明確說明,並支持通配符「」匹配一個詞組,「#」零個或多個詞組,所以綁定關鍵字「 *。stock.#" 匹配路由關鍵字」usd.stock「和」eur.stock.db",可是不匹配「stock.nasdaq」,以下圖:
配置
配置文件存放:/etc/rabbitmq/rabbitmq-env.conf
RABBITMQ_NODE_IP_ADDRESS:IP地址
RABBITMQ_NODE_PORT:端口號
RABBITMQ_CONFIG_FILE:默認路徑
ABBITMQ_LOG_BASE:日誌路徑
rabbitmq.config是標準的erlang配置文件,必須符合erlang配置標準,erlangtuple,結構爲{key,value},key爲atom類型,value爲一個term,其中關鍵參數爲
tcp_listerners設置rabimq監聽端口,默認5672
disk_free_limit磁盤低水位線,若磁盤容量低於指定值則中止接收數據
vm_memory_high_watermark,設置內存低水位線,則開啓流控機制,默認值是0.4,即內存總量的40%
安裝
[root@LinuxEA ~]# yum install epel-release
[root@LinuxEA ~]# yum install rabbitmq-server -y
[root@LinuxEA ~]# /usr/lib/rabbitmq/bin/rabbitmq-plugins list
[ ] amqp_client 3.3.5
[ ] cowboy 0.5.0-rmq3.3.5-git4b93c2d
[ ] eldap 3.3.5-gite309de4
[ ] mochiweb 2.7.0-rmq3.3.5-git680dba8
[ ] rabbitmq_amqp1_0 3.3.5
[ ] rabbitmq_auth_backend_ldap 3.3.5
[ ] rabbitmq_auth_mechanism_ssl 3.3.5
[ ] rabbitmq_consistent_hash_exchange 3.3.5
[ ] rabbitmq_federation 3.3.5
[ ] rabbitmq_federation_management 3.3.5
[ ] rabbitmq_management 3.3.5
[ ] rabbitmq_management_agent 3.3.5
[ ] rabbitmq_management_visualiser 3.3.5
[ ] rabbitmq_mqtt 3.3.5
[ ] rabbitmq_shovel 3.3.5
[ ] rabbitmq_shovel_management 3.3.5
[ ] rabbitmq_stomp 3.3.5
[ ] rabbitmq_test 3.3.5
[ ] rabbitmq_tracing 3.3.5
[ ] rabbitmq_web_dispatch 3.3.5
[ ] rabbitmq_web_stomp 3.3.5
[ ] rabbitmq_web_stomp_examples 3.3.5
[ ] sockjs 0.3.4-rmq3.3.5-git3132eb9
[ ] webmachine 1.10.3-rmq3.3.5-gite9359c7
[root@LinuxEA ~]#
[root@LinuxEA ~]# /usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management
The following plugins have been enabled:
mochiweb
webmachine
rabbitmq_web_dispatch
amqp_client
rabbitmq_management_agent
rabbitmq_management
Plugin configuration has changed. Restart RabbitMQ for changes to take effect.
[root@LinuxEA ~]#
[root@LinuxEA ~]# systemctl restart rabbitmq-server
建立vhost
[root@LinuxEA ~]# rabbitmqctl add_vhost linuxea
Creating vhost "linuxea" ...
...done.
[root@LinuxEA ~]#
[root@LinuxEA ~]# cat LinuxEA
#!/bin/bash
###www.#linu#xea#.com
#The source address of this article:http://www.lin#uxea.com/1514.html
查看vhosts
[root@LinuxEA ~]# rabbitmqctl list_vhosts
Listing vhosts ...
/
linuxea
...done.
[root@LinuxEA ~]#
刪除vhosts:rabbitmqctl delete_vhosts vhostname
添加用戶名和密碼:
[root@LinuxEA ~]# rabbitmqctl add_user linuxea linuxea
Creating user "linuxea" ...
...done.
[root@LinuxEA ~]#
修改用戶名密碼:rabbitmqctl change_password username newpassword
受權讀寫權限:
[root@LinuxEA ~]# rabbitmqctl set_permissions -p linuxea linuxea ".*" ".*" ".*"
Setting permissions for user "linuxea" in vhost "linuxea" ...
...done.
[root@LinuxEA ~]#
查看:-p 便可
[root@LinuxEA ~]# rabbitmqctl list_queues -p linuxea
Listing queues ...
...done.
[root@LinuxEA ~]#