消息中間件介紹
消息中間件的概念
消息中間件是在消息傳輸過程當中保存消息的容器。消息中間件在將消息從它的源中繼到它的目標時充當中間人的做用。隊列的主要做用是提供路由並保證消息的傳遞;若是發生消息接收者不可用,消息隊列會保留消息,直到能夠成功傳遞它爲止,固然,消息隊列保存消息也是有期限的。
類比理解記憶
使用生產者、消費者的關係進行類比
消息生產者 → 消息中間件 → 消息消費者
※ 消息是有時效性的,存在過時→消息租約的概念
消息中間件特色
1,採用異步處理模式
消息發送者能夠發送一個消息而無須等待響應。
消息發送者將消息發送到一條虛擬的通道(主題或隊列)上,消息接收者則訂閱或是監聽該通道。
一條消息可能最終轉發給一個或多個消息接收者,這些接收者都無須對消息發送者做出同步迴應。整個過程是異步的。
※ 好比用戶信息註冊,註冊完畢後過段時間發送郵件或短信。
2,應用程序和應用程序調用關係爲鬆耦合關係
發送者和接受者沒必要了解對方、只須要確認消息
發送者和接受者沒必要同時在線
※ 好比在線交易系統爲了保證數據的最終一致,在支付系統處理完成後會把支付結果放到消息中間件裏通知訂單系統修改訂單支付狀態。兩個系統經過消息中間件解耦。
消息傳遞服務模型
消息中間件的傳遞模型
1,點對點模型(PTP)
點對點模型用於消息生產者和消費者之間點到點的通訊。消息生產者將消息發送到由某個名字標識的特定消費者。這個名字實際上對應於消息服務中的一個隊列(Queue),在消息傳遞給消費者以前他被存儲在這個隊列中。隊列消息能夠放在內存中也能夠是持久的,以保證在消息服務出現故障時仍然可以傳遞消息。
特性:
1,每一個消息只有一個消費者
2,發送者和接收者沒有時間依賴
3,接受者確認消息接收和處理成功
模型簡圖:
2,發佈-訂閱模型(Pub/Sub)
發佈者/訂閱者模型支持向一個特定的消息主題生產消息。0或多個訂閱者可能對接收來自特定消息主題的消息感興趣。在這種模型下,發佈者和訂閱者彼此不知道對方。
這種模式比如是匿名公告板。這種模式被歸納爲:多個消費者能夠得到消息.在發佈者和訂閱者之間存在時間依賴性。發佈者須要創建一個訂閱(subscription),以便可以消費者訂閱。訂閱者必須保持持續的活動狀態以接收消息,除非訂閱者創建了持久的訂閱。在這種狀況下(持久訂閱),在訂閱者未鏈接時發佈的消息將在訂閱者從新鏈接時從新發布。
特性:
1,每一個消息能夠有多個訂閱者
2,發佈者和訂閱者有時間依賴,客戶端只有在訂閱以後才能收到消息
3,有持久訂閱和非持久訂閱之分
① 持久訂閱
訂閱關係創建後,消息就不會消失,無論訂閱者是否在線
② 非持久訂閱
訂閱者爲了接受消息,必須一直在線
當只有一個訂閱者的時候約等於點對點模型
模型簡圖:
互聯網消息中間件的應用場景案例
場景1:用戶註冊異步處理案例
網站用戶註冊,註冊成功後會過一下子發送郵件確認或者短信。
以下圖:
場景2:日誌分析使用案例
把日誌進行集中收集,用於計算pv、用戶行爲分析。
以下圖:
場景3:數據複製案例
1,將數據從源頭複製到多個目的地,通常是要求順序或者保證因果序列。
2,用戶跨機房數據傳輸、搜索、離線數據計算等。
以下圖:
場景4:延遲消息發送和暫存
1,把消息中間件當作可靠的消息暫存地
2,定時進行消息投遞,好比模擬用戶秒殺訪問,進行系統性能壓測。
eg: tcpcopy,accesslog等
以下圖:
場景5:消息廣播
1,緩存數據同步更新
2,往應用推送數據
eg:好比更新本地緩存
以下圖:
消息中間件分類
一、(push)推消息類型:
消息生產者將消息發送給消息傳遞服務,消息傳遞服務又將消息推給消費者。
二、(pull)拉消息類型:
消費者請求消息服務接收消息,消息生產者從消息中間件拉取該消息。
兩種類型的區別
推跟拉兩種類型的本質區別
誰保存消息的偏移量
push:生產者保存推送軌跡
pull:消費者保存pull狀態、拉取位置的偏移量
消息中間件實戰之MetaQ
介紹
METAQ是一款徹底的隊列模型消息中間件,服務器使用Java語言編寫,可在多種軟硬件平臺上部署。
客戶端支持Java、C++編程語言。
單臺服務器可支持1萬以上個消息隊列,經過擴容服務器,隊列數幾乎可任意橫向擴展。
每一個隊列都是持久化、長度無限(取決於磁盤空間大小)、而且可從隊列任意位置開始消費。
metaQ架構圖:
MetaQ特色
1,生產者、服務器和消費者均可以分佈式
2,消息存儲順序寫
3,性能極高、吞吐量大
4,支持消息順序
5,
客戶端pull,隨機讀,批量拉取數據
6,數據遷移、擴容對用戶透明
7,消費狀態保存在客戶端
metaq重要術語
一、Topic:
消息的主題,由用戶定義並在服務端配置。producer發送消息到某個topic下,consumer從某個topic下消費消息
二、Offset:
消息在broker上的每一個分區都是組織成一個文件列表,消費者拉取數據須要知道數據在文件中的偏移量,這個偏移量就是所謂offset。Offset是
絕對偏移量,服務器會將offset轉化爲具體文件的相對偏移量。
三、Broker:
就是meta的服務端或者說服務器,在消息中間件中也一般稱爲broker。
四、分區(partition):
同一個topic下面還分爲多個分區,如meta-test這個topic咱們能夠分爲10個分區,分別有兩臺服務器提供,那麼可能每臺服務器提供5個分區,假設服務器id分別爲0和1,則全部分區爲0-0、0-一、0-二、0-三、0-四、1-0、1-一、1-二、1-三、1-4。
metaq經常使用配置項
zk.zkEnable=true(是否註冊到zk,默認爲true)
zk.zkConnect=localhost:2181 (zk的服務器列表)
zk.zkSessionTimeoutMs=30000 (zk心跳超時,單位毫秒,默認30秒 )
zk.zkConnectionTimeoutMs=30000 (zk鏈接超時時間,單位毫秒,默認30秒)
brokerId:(服務器ID(必須是集羣內惟一,必須爲整型0-1024之間)
serverPort:(服務器端口)
hostName:(默認將取本機IP (多機網卡,須要指明))
dataLogPath:(日誌數據文件路徑,默認跟dataPath同樣)
dataPath: (於指定默認的數據存儲路徑 )
deletePolicy=delete,168 (數據刪除策略,默認超過7天即刪除,這裏的168是小時,10s表示10秒,10m表示10分鐘,10h表示10小時,默認爲小時)
deleteWhen: (什麼時候執行刪除策略的cron表達式,默認是0 0 6,18 * * ?,也就是天天的遲早6點執行處理策略。deleteWhen: 刪除策略的執行時間,cron表達式)
flushTxLogAtCommit=1 (事務日誌的同步設置,0表示讓操做系統決定,1表示每次commit都同步,2表示每隔1秒同步一次,此參數嚴重影響事務性能,可根據你須要的性能和可靠性之間權衡作出一個合理的選擇。一般建議設置爲2,表示每隔1秒刷盤一次,也就是最多丟失一秒內的運行時事務。這樣的可靠級別對大多數服務是足夠的。最安全的固然是設置爲1,可是將嚴重影響事務性能。而0的安全級別最低。安全級別上 1>=2>0,而性能則是0 >= 2 > 1。)
unflushThreshold: (每隔多少條消息作一次磁盤sync,強制將更改的數據刷入磁盤。默認爲1000。也就是說在掉電狀況下,最多容許丟失1000條消息。可設置爲0,強制每次寫入都sync。在設置爲0的狀況下,服務器會自動啓用
group commit技術,將多個消息合併成一次sync來提高IO性能。通過測試,group commit狀況下消息發送者的TPS沒有受到太大影響,可是服務端的負載會上升不少。)
unflushInterval: (間隔多少毫秒按期作一次磁盤sync,默認是10秒。也就是說在服務器掉電狀況下,最多丟失10秒內發送過來的消息。不可設置爲小於或者等於0)
MetaQ安裝說明
1,首先須要安裝配置zookeeper
2,安裝java運行環境
3,安裝MetaQ
metaq的集羣實現方法
step1,全部的broker註冊到zookeeper
step2,客戶端鏈接到zookeeper,爲客戶分配一個broker
消息中間件實戰之RabbitMQ
RabbitMQ介紹
rabbitMQ是一個在AMQP協議標準基礎上完整的,可複用的企業消息系統。他遵循Mozilla Public License開源協議。採用 Erlang 實現的工業級的消息隊列(MQ)服務器。
AMQP介紹
AMQP(高級消息隊列協議) 是一個異步消息傳遞所使用的應用層協議規範,做爲線路層協議,而不是API(例如JMS),AMQP 客戶端可以無視消息的來源任意發送和接受信息。AMQP的原始用途只是爲金融界提供一個能夠彼此協做的消息協議,而如今的目標則是爲通用消息隊列架構提供通用構建工具。所以,面向消息的中間件 (MOM)系統,例如發佈/訂閱隊列,沒有做爲基本元素實現。反而經過發送簡化的AMQ實體,用戶被賦予了構建例如這些實體的能力。這些實體也是規範的一 部分,造成了在線路層協議頂端的一個層級:AMQP模型。這個模型統一了消息模式,諸如以前提到的發佈/訂閱,隊列,事務以及流數據,而且添加了額外的特性,例如更易於擴展,基於內容的路由。
rabbitMQ總體架構
![](http://static.javashuo.com/static/loading.gif)
rabbitMQ運行原理
![](http://static.javashuo.com/static/loading.gif)
rabbitMQ重要術語
1. Server(broker): 接受客戶端鏈接,實現AMQP消息隊列和路由功能的進程。
2. Virtual Host:實際上是一個虛擬概念,相似於權限控制組,一個Virtual Host裏面能夠有若干個Exchange和Queue,可是權限控制的最小粒度是Virtual Host
3.Exchange:接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行爲,例如,在RabbitMQ中,ExchangeType有
direct、Fanout和Topic三種,不一樣類型的Exchange路由的行爲是不同的。
4.Message Queue:消息隊列,用於存儲還未被消費者消費的消息。
5.Message: 由Header和Body組成,Header是由生產者添加的各類屬性的集合,包括Message是否被持久化、由哪一個Message Queue接受、優先級是多少等。而Body是真正須要傳輸的APP數據。
6.BindingKey:所謂綁定就是將一個特定的Exchange 和一個特定的Queue綁定起來,綁定關鍵字成爲BindingKey。
Exchange分類--直接式交換器類型
Direct Exchange – 直接交互式處理路由鍵。
須要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵徹底匹配。這是一個完整的匹配。若是一個隊列綁定到該交換機上要求路由鍵(routing key) 「dog」,則只有被標記爲「dog」的消息才被轉發,不會轉發dog.puppy,也不會轉發dog.guard,只會轉發dog。
圖解:
Python代碼實現(待添加)
Exchange分類--廣播式交換器類型
Fanout Exchange – 廣播式路由鍵。
你只須要簡單的將隊列綁定到交換機上。
一個發送到交換機的消息都會被轉發到與該交換機綁定的全部隊列上。
很像子網廣播,每臺子網內的主機都得到了一份複製的消息。Fanout交換機轉發消息是最快的
圖解:
Python代碼實現(待添加)
Exchange分類--主題式交換器類型
Topic Exchange – 主題式交換器,經過消息的路由關鍵字和綁定關鍵字的模式匹配,將消息路由到被綁定的隊列中。
這種路由器類型能夠被用來支持經典的發佈/訂閱消息傳輸模型——使用主題名字空間做爲消息尋址模式,將消息傳遞給那些部分或者所有匹配主題模式的多個消費者。
主題交換器類型的工做方式以下: 綁定關鍵字用零個或多個標記構成,每個標記之間用「.」字符分隔。
綁定關鍵字必須用這種形式明確說明,並支持通配符:
「*」匹配一個詞組,「#」零個或多個詞組。
所以綁定關鍵字「*.stock.#」匹配路由關鍵字「usd.stock」和「eur.stock.db」,可是不匹配「stock.nasdaq」
圖解:
Python代碼實現(待添加)
RabbitMQ安裝
step1,配置epel源java
step2,yum -y install rabbitmq-server
RabbitMQ經常使用配置
通常狀況下,RabbitMQ的默認配置就足夠了。若是但願特殊設置的話,有兩個途徑:
一個是環境變量的配置文件 rabbitmq-env.conf ;
一個是配置信息的配置文件 rabbitmq.config;
注意,這兩個文件默認是沒有的,若是須要必須本身建立。
一、rabbitmq-env.conf 這個文件的位置是肯定和不能改變的,位於:/etc/rabbitmq目錄下(這個目錄須要本身建立)。
RABBITMQ_NODE_IP_ADDRESS:指定ip地址
RABBITMQ_NODE_PORT:指定端口號,默認5672
RABBITMQ_CONFIG_FILE:配置文件的路徑,注意配置文件後綴必須是.config
ABBITMQ_LOG_BASE:日誌文件路徑
二、rabbitmq.config 這是一個標準的erlang配置文件。它必須符合erlang配置文件的標準。Erlang tuple,結構爲{Key,Value}, Key爲atom類型, Value爲一個term,其中幾個關鍵參數爲:
tcp_listerners設置rabbimq的監聽端口,默認爲[5672]。
disk_free_limit 磁盤低水位線,若磁盤容量低於指定值則中止接收數據。
vm_memory_high_watermark,設置內存低水位線,若低於該水位線,則開啓流控機制,默認值是0.4,即內存總量的40%。
rabbitmq.config配置例子
[
{rabbit, [
{tcp_listeners,[{"127.0.0.1",5672}]},
{ssl_listeners, [{"127.0.1.1",5671}]},
{ssl_options, [{cacertfile,"/usr/local/etc/rabbitmq/ssl/testca/cacert.pem"},
{certfile,"/usr/local/etc/rabbitmq/ssl/server/cert.pem"},
{keyfile,"/usr/local/etc/rabbitmq/ssl/server/key.pem"},
{verify,verify_none},
{fail_if_no_peer_cert,false}]}
]}
].
RabbitMQ命令介紹
一、/etc/init.d/rabbitmq-server start | stop | restart | reload
二、rabbitmqctl add_vhost vhostname 建立Vhost:
三、rabbitmqctl delete_vhost vhostname 刪除Vhost
四、rabbitmqctl list_vhosts 遍歷全部虛擬主機信息
五、rabbitmqctl add_user username password 添加用戶及密碼
六、rabbitmqctl change_password username newpassword 修改用戶密碼
七、rabbitmqctl set_permissions -p v_host user ".*" ".*" ".*"//綁定權限,而且具有讀寫的權限。
八、rabbitmqctl list_queues //顯示全部隊列