同步發送數據庫
異步發送session
- ProducerWindowSize 每次發送統計數量
- 開啓事務必然異步發送
JMS消息的可靠性機制異步
- JMS Session接口提供了 commit 和 rollback 方法
- 事務提交意味着生產的全部消息被髮送
- 事務回滾意味着生產的全部消息被銷燬,
- 消費的所 有消息被恢復並從新提交,除非它們已通過期。
- 關閉事務性會話將回滾其中的事務
- 在非事務型會話中
- Session.AUTO_ACKNOWLEDGE
- Session.CLIENT_ACKNOWLEDGE
- Session.DUPS_ACKNOWLEDGE
點對點消息傳遞性能
- 每一個消息只能有一個消費者
- 消息的生產者和消費者之間沒有時間上的相關性。
- 不管消費者在生產者發送消息的時候是否處於運行狀態,
- 均可以提取消息
發佈訂閱消息傳遞域fetch
- 每一個消息能夠有多個消費者
- 生產者和消費者之間有時間上的相關性。
- 訂閱一個主題的消費者只能消費自它訂閱以後發佈的消息。
- JMS 規範容許客戶建立持久訂閱,這在必定程度上下降了時間上的相關性要求。
- 持久訂閱容許消費者消費它在未處於激活狀態時發送的消息
JMS 消息由幾部分組成:優化
- 消息頭
- 包含消息頭包含消息的識別信息和路由信息
- 消息頭包含一些標準的屬性如:
- JMS Destination 消息發送的目的地, queue 或者 topic
- JMSDeliveryMode傳送模式。持久模式和非持久模式
- JMS Priority 消息優先級
- 優先級分爲 10 個級別,從 0 最低 到 9 最高
- 若是不設定優先級,默認級別是 4 。
- JMSMessage ID 惟一識別每一個消息的標識
- 屬性
- 按類型能夠分爲應用設置的屬性,標準屬性和消息中間件定義的屬性
- 消息體
持久訂閱設計
- 持久訂閱有兩個特色:
- 持久訂閱者和非持久訂閱者針對的 Domain 是 Pub/Sub 而不是 P2P
- 當 Broker 發送消息給訂閱者時,
- 若是訂閱者處於未激活狀態:
- 持久訂閱者能夠收到消息
- 而非持久訂閱者則收不到消息
- 若是持久訂閱者訂閱的消息太多則會溢出
消息的持久化策略3d
- KahaDB存儲
- 默認的存儲方式
- 可用於任何場景,提升了性能和恢復能力
- 消息存儲使用一個事務日誌和僅僅用一個索引文件來存儲它全部的地址。
- KahaDB是一個專門針對消息持久化的解決方案
- KahaDB的存儲原理
- 在data/kahadb這個目錄下,會生成四個文件
- Ø db.data 它是消息的索引文件,
- 本質上是B-Tree(B樹)
- 使用B-Tree做爲索引指向db-*.log裏面存儲的消息
- Ø db.redo 用來進行消息恢復
- Ø db-*.log 存儲消息內容。
- 新的數據以APPEND的方式追加到日誌文件末尾。
- 屬於順序寫入,所以消息存儲是比較快的。
- 默認是32M,達到閥值會自動遞增
- Ø lock文件鎖,
- JDBC存儲
- ACTIVEMQ_MSGS 消息表,queue和topic都存在這個表中
- ACTIVEMQ_ACKS 存儲持久訂閱的信息和最後一個持久訂閱接收的消息ID
- ACTIVEMQ_LOCKS 鎖表,用來確保某一時刻,只能有一個ActiveMQ broker實例來訪問數據庫
- LevelDB存儲
- LevelDB持久化性能高於KahaDB
- 官方建議使用以及再也不支持
- Memory 消息存儲
- persistent=」false」,表示不設置持久化存儲,直接存儲到內存中
- JDBC Message store with ActiveMQ Journal
- 這種方式克服了JDBC Store的不足,JDBC每次消息過來,都須要去寫庫和讀庫。
- 當消費者的消費速度可以及時跟上生產者消息的生產速度時,journal文件可以大大減小須要寫入到DB中的消息。
- 若是消費者的消費速度很慢,這個時候journal文件可使消息以批量方式寫到DB。
消費端消費消息的原理日誌
prefetch limit 規定了一次能夠向消費者Push(推送)多少條消息。中間件
- 有兩種方法能夠接收消息,
- 一種是使用同步阻塞的MessageConsumer#receive方法。
- 另外一種是使用消息監聽器MessageListener。
- 同一個session下,這二者不能同時工做,
- 也就是說不能針對不一樣消息採用不一樣的接收方式。不然會拋出異常。
- 爲何這麼作,最大的緣由仍是在事務性會話中,
- sendPullCommand
- 發送pull命令從broker上獲取消息,前提是prefetchSize=0而且unconsumedMessages爲空。
- unconsumedMessage表示未消費的消息,這裏面預讀取的消息大小爲prefetchSize的值
- clearDeliveredList
- 在上面的sendPullCommand方法中,會先調用clearDeliveredList方法,
- 主要用來清理已經分發的消息鏈表deliveredMessages
- dequeue
- 爲何要存在這樣一個消息分發通道
- 你們能夠想象一下,若是消費者每次去消費完一個消息之後再去broker拿一個消息,效率是比較低的。
- 因此經過這樣的設計能夠容許session可以一次性將多條消息分發給一個消費者。
- 默認狀況下對於queue來講,prefetchSize的值是1000
- beforeMessageIsConsumed
- afterMessageIsConsumed
- 主要做用是執行應答操做,這裏面作如下幾個操做 Ø 若是消息過時,則返回消息過時的ack Ø 若是是事務類型的會話,則不作任何處理 Ø 若是是AUTOACK或者(DUPS_OK_ACK且是隊列),而且是優化ack操做,則走批量確認ack Ø 若是是DUPS_OK_ACK,則走ackLater邏輯 Ø 若是是CLIENT_ACK,則執行ackLater