【3.工程開發】-mq-ActiveMQ/rabbitmq

由流處理的轉發講到消息隊列,總體見:https://segmentfault.com/a/11...。本篇總結下之前的兩個消息隊列協議:JMS,AMQP以及兩個典型的實現Activemq和Rabbitmq.兩者對分佈式的支持都在主從,對分片的支持很差,擴展性差。主要關注框架組件,功能,簡單說下分佈式。更經常使用的消息隊列見:https://segmentfault.com/a/11...html

JMS模型

是一種Java消息服務協議,支持兩種模型:點對點或隊列模型,發佈/訂閱模型java

點對點

在點對點或隊列模型下,一個生產者向一個特定的隊列發佈消息,一個消費者從該隊列中讀取消息。這裏,生產者知道消費者的隊列,並直接將消息發送到消費者的隊列。這種模式被歸納爲:
1)只有一個消費者將得到消息
2)生產者不須要在接收者消費該消息期間處於運行狀態,接收者也一樣不須要在消息發送時處於運行狀態。
3)每個成功處理的消息都由接收者簽收數據庫

發佈者/訂閱者

發佈者/訂閱者模型支持向一個特定的消息主題發佈消息。0或多個訂閱者可能對接收來自特定消息主題的消息感興趣。在這種模型下,發佈者和訂閱者彼此不知道對方。這種模式比如是匿名公告板。這種模式被歸納爲:
1)多個消費者能夠得到消息
2)在發佈者和訂閱者之間存在時間依賴性。發佈者須要建立一個訂閱(subscription),以便客戶可以購訂閱。訂閱者必須保持持續的活動狀態以接收消息,除非訂閱者建立了持久的訂閱。在那種狀況下,在訂閱者未鏈接時發佈的消息將在訂閱者從新鏈接時從新發布。apache

ActiveMQ

官方:http://activemq.apache.orgsegmentfault

框架組件

  • 邏輯框架:
    clipboard.png
  • 基本組件:
    Broker,消息代理,表示消息隊列服務器實體,接受客戶端鏈接,提供消息通訊的核心服務。
    Producer,消息生產者,業務的發起方,負責生產消息並傳輸給 Broker 。
    Consumer,消息消費者,業務的處理方,負責從 Broker 獲取消息並進行業務邏輯處理。
    Topic,主題,發佈訂閱模式下的消息統一聚集地,不一樣生產者向 Topic 發送消息,由 Broker 分發到不一樣的訂閱者,實現消息的廣播。
    Queue,隊列,點對點模式下特定生產者向特定隊列發送消息,消費者訂閱特定隊列接收消息並進行業務邏輯處理。
    Message,消息體,根據不一樣通訊協議定義的固定格式進行編碼的數據包,來封裝業務 數據,實現消息的傳輸。
    connector(broker與client,broker與broker)的協議:vm,tcp,xxxx
    持久消息的存儲:KahaDB(基於文件),journal+JDBC,內存,levelDb(leveldb+zk做爲M-S複製方案)

消費/持久化/刪除

  • 消息存入
    內存+磁盤光標,若內存不夠,維護一份磁盤索引位置(對於消費速度慢的,後續還會訪問的消息),性能會受影響,主要是由於分片很差,單機內存不夠。

    clipboard.png

  • 刪除
    對於點對點的消息一旦消費者完成消費這條消息將從broker上刪除;
    對於發佈訂閱類型的消息,即便全部的訂閱者都完成了消費,Broker也不必定會立刻刪除無用消息,而是保留推送歷史,以後會異步清除無用消息。而每一個訂閱者消費到了哪條消息的offset會記錄在Broker,以避免下次重複消費。由於消息是順序消費,先進先出,因此只須要記錄上次消息消費到哪裏就能夠了。
  • 持久化
    持久化三個表:
    activemq_acks:用於存儲訂閱關係。若是是持久化Topic,訂閱者和服務器的訂閱關係在這個表保存,主要數據庫字段以下:服務器

    container:消息的destination
    sub_dest:若是是使用static集羣,這個字段會有集羣其餘系統的信息
    client_id:每一個訂閱者都必須有一個惟一的客戶端id用以區分
    sub_name:訂閱者名稱
    selector:選擇器,能夠選擇只消費知足條件的消息。條件能夠用自定義屬性實現,可支持多屬性and和or操做
    last_acked_id:記錄消費過的消息的id

    activemq_lock:在集羣環境,只有一個Master Broker能夠得到消息,用於記錄哪一個Broker是當前的Master Broker。網絡

    activemq_msgs:用於存儲消息,Queue和Topic都存儲在這個表中。主要的數據庫字段以下:session

    id:自增的數據庫主鍵
    container:消息的destination
    msgid_prod:消息發送者客戶端的主鍵
    msg_seq:是發送消息的順序,msgid_prod+msg_seq能夠組成jms的messageid
    expiration:消息的過時時間,存儲的是從1970-01-01到如今的毫秒數
    msg:消息本體的java序列化對象的二進制數據
    priority:優先級,從0-9,數值越大優先級越高
    activemq_acks用於存儲訂閱關係。若是是持久化topic,訂閱者和服務器的訂閱關係在這個表保存。

集羣

  • M-S
    數據單獨HA方案共享+鎖獲取master/slave切換。leveldb+zk/KahaDB+SAN
  • 相似分片:networkConnector。
    對於訂閱,一直在brocker中轉發,直到消費。對於queue順序保證比較困難。https://shift-alt-ctrl.iteye....

AMQP

不限於java的通用消息系統協議,官方:
http://docs.oasis-open.org/am...
定義瞭如下幾層內容:
TYPE - type system and encoding
Transport - AMQP transport layer(全雙工可靠遞交對等,connection=>多channel的session=>links)
Messaging - AMQP Messaging Layer (對消息確認,拒絕,持久化等規定)
Transactions - AMQP Transactions Layer(事務提交等)
Security - AMQP Security Layers架構

rabbitmq

架構組件

clipboard.png

  • Message
    消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其餘消息的優先權)、delivery-mode(指出該消息可能須要持久性存儲)等。
  • Publisher
    消息的生產者,也是一個向交換器發佈消息的客戶端應用程序。
  • Exchange
    交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。Exchange 類型
    Exchange分發消息時根據類型的不一樣分發策略有區別,目前共四種類型:direct、fanout、topic、headers(基本不用)
    1.直連型交換機(direct exchange)是根據消息攜帶的路由鍵(routing key)將消息投遞給對應綁定鍵的隊列。直連交換機用來處理消息的單播路由(unicast routing)(儘管它也能夠處理多播路由)。下邊介紹它是如何工做的:
    1)將一個隊列(一個消費者一個隊列)綁定到某個交換機上時,賦予該綁定一個綁定鍵(Binding Key),假設爲R;
    2)當一個攜帶着路由鍵(Routing Key)爲R的消息被髮送給直連交換機時,交換機會把它路由給綁定鍵爲R的隊列(徹底匹配,有幾個R就發幾個)
    直連交換機的隊列一般是循環分發任務給多個消費者(咱們稱之爲輪詢)
    2.生產者(P)生產消息推送到 Exchange,遵循 fanout 的規則將消息推送到全部與它綁定 Queue
    3.topic
    同直連型,只不過是模糊匹配
  • Binding
    綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列鏈接起來的路由規則,因此能夠將交換器理解成一個由綁定構成的路由表。
  • Queue
    消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裏面,等待消費者鏈接到這個隊列將其取走。(多個消費者能夠訂閱同一個Queue,這時Queue中的消息會被平均分攤給多個消費者進行處理,而不是每一個消費者都收到全部的消息並處理,因此topic的每一個消費者的topic的queue應該同樣或前匹配https://juejin.im/entry/599e5...
  • Connection
    網絡鏈接,好比一個TCP鏈接。
  • Channel
    信道,多路複用鏈接中的一條獨立的雙向數據流通道。信道是創建在真實的TCP鏈接內地虛擬鏈接,AMQP 命令都是經過信道發出去的,不論是發佈消息、訂閱隊列仍是接收消息,這些動做都是經過信道完成。由於對於操做系統來講創建和銷燬 TCP 都是很是昂貴的開銷,因此引入了信道的概念,以複用一條 TCP 鏈接。
  • Consumer
    消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
  • Virtual Host
    虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每一個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有本身的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在鏈接時指定,RabbitMQ 默認的 vhost 是 / 。
  • Broker
    表示消息隊列服務器實體。

持久化、刪除、確認機制、事務

  • 持久化
    要求消息持久/內存滿了(自行開發,由於要釋放RAM,隊列有很是活躍和靜止之分不適合都快照增量的方式)
    buffer->文件->刷盤,消息+索引,當刪除Segment到閾值文件合併。須要持久化消息,exchange,隊列。
    Segment的刪除rabbit_msg_index模塊爲每個Segment維護一個unacked計數,每publish一個消息加1,每ack一個消息減1,當unacked=0時,文件刪除。
  • 消息刪除
    能夠指定noAck參數,當noAck=false時,RabbitMQ會等待消費者顯式發回ack信號後才從內存(和磁盤,若是是持久化消息的話)中移去消息。不然,RabbitMQ會在隊列中消息被消費後當即刪除它)每一個隊列單獨處理
  • ACK與事務
    默認狀況下發布操做是不會返回任何信息給生產者的,
    1.使用事務:
    client發送Tx.Select
    broker發送Tx.Select-Ok(以後publish)
    client發送Tx.Commit
    broker發送Tx.Commit-Ok
    client未收到確認會回滾並重發
    2.channel的confirm模式
    生產者將信道設置成confirm模式,一旦信道進入confirm模式,全部在該信道上面發佈的消息都會被指派一個惟一的ID(從1開始),一旦消息被投遞到全部匹配的隊列以後,broker就會發送一個確認給生產者(包含消息的惟一ID),這就使得生產者知道消息已經正確到達目的隊列了,若是消息和隊列是可持久化的,那麼確認消息會將消息寫入磁盤以後發出,broker回傳給生產者的確認消息中deliver-tag域包含了確認消息的序列號,此外broker也能夠設置basic.ack的multiple域,表示到這個序列號以前的全部消息都已經獲得了處理。

集羣

rabbitmq不支持動態擴展,erlang性能高
能夠共享 user、vhost、exchange等,全部的數據和狀態都是必須在全部節點上覆制的,隊列例外,只在單個節點而不是全部節點上建立完整的隊列信息(元數據、狀態、內容,RabbitMQ 2.6.0以後提供了鏡像隊列以免集羣節點故障致使的隊列內容不可用)(猜想:消息的持久應該包含交換機上的和隊列中的,發給全部隊列後消息應該可刪除了,消費後隊列中的可刪除了)
磁盤節點+內存節點,要求集羣中至少有一個磁盤節點,全部其餘節點能夠是內存節點,當節點加入火離開集羣時,它們必需要將該變動通知到至少一個磁盤節點,若是隻有一個磁盤節點,恰好又是該節點崩潰了,那麼集羣能夠繼續路由消息,但不能建立隊列、建立交換器、建立綁定、添加用戶、更改權限、添加或刪除集羣節點。框架

相關文章
相關標籤/搜索