【3.工程開發】-mq-kafka/rocketmq

本篇是消息隊列中的一節,爲何講到消息隊列見:https://segmentfault.com/a/11...。其中流處理的數據傳播用到消息隊列。另外消息隊列還能夠做用於異步處理,流量削峯,多系統同步等。另外一篇介紹了傳統的JMS(activemq),AMQP(rabbitmq),本篇介紹kafka,robbitmq,ddmq,另外簡單說下bridgemq以及常見mq的綜合對比。同其餘系統同樣,終點關注架構組件,功能(生產消費等),分佈式的高可用,擴展性,一致性等linux

kafka

官方:發佈訂閱,流處理管道和存儲
https://kafka.apache.org/docu...redis

組件

clipboard.png

  • broker:負責消息存儲轉發,包含topic(一個queue)=》partition(物理分佈,一個topic包含一個或多個partition,能夠分佈在不一樣的broker上)
  • producer(與broker leader直連,負載均衡指定partition,可批次發,可設置要ack的副本數)
  • consumer/consumer group.同一group中只有一個c能夠消費一個p,負載均衡,此時若這組cg的線程多於p會有空等線程。可是多個cg能夠同時消費一個p不受影響。所以在考慮爲了消費更快下,對於partition的分區和c的線程數能夠一致。
  • 其中partition的集羣和單機物理結構以下:
    clipboard.png

    clipboard.png

  • index所有映射到內存,每一個partition下自增id
  • 元數據放在zk上.分partition,每一個partition副本分散在broker上,單partition+單消費才能保證順序(rocketmq同樣)。每一個partition一個索引,順序寫一個文件。流處理+批量處理(累計部分數據才發送),實時上有取捨。

https://kafka.apache.org/docu...apache

高可用和可擴展

1) Producer端使用zookeeper用來"發現"broker列表,以及和Topic下每一個partition leader創建socket鏈接併發送消息.
2) Broker端使用zookeeper用來註冊broker信息,以及監測partition leader存活性.全部的Kafka Broker節點一塊兒去Zookeeper上註冊一個臨時節點,成功的爲Broker controller,失效後zk後發現從新註冊節點,controller負責各broker內partition的選主(ISR中,記錄replica進度,隨便選)ISR,在這個集合中的節點都是和leader保持高度一致的,任何一條消息必須被這個集合中的每一個節點讀取並追加到日誌中了,纔回通知外部這個消息已經被提交了。所以這個集合中的任何一個節點隨時均可以被選爲leader.若是ISR的大小超過某個最小值,則分區將僅接受寫入,以防止丟失僅寫入單個副本的消息(只關注ISR,而不是共識多個都寫入,多數(兩個故障須要5個副本,一個要三個)對於主數據的寫代價大)【與ES相似都使用的Microsoft的PacificA】
3) Consumer端使用zookeeper用來註冊consumer信息,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader創建socket鏈接,並獲取消息。
broker,partition,customer組內線程可擴展。json

消費

只保證一個partition被一個customer消費有序
producter推,customer拉(拉須要存日誌)
partition中的每一個message只能被組(Consumer group )中的一個consumer(consumer 線程)消費,若多個同時要配多個Consumer group。
kafka中的消息是批量(一般以消息的條數或者chunk的尺寸爲單位)發送給consumer,當消息被consumer接收以後,負責維護消息的消費記錄(JMS等都是broker維護),consumer能夠在本地保存最後消息的offset,並間歇性的向zookeeper註冊offset.也沒有ACK
消息消費的可靠性,消費者控制,最多一次,先保存offset再處理;至少一次,先處理再保存offset;只一次:最少1次+消費者的輸出中額外增長已處理消息最大編號segmentfault

日誌壓縮

確保有每一個分區數據日誌中每一個key有最後已知值,offset不能變。對同一partition的多個文件一塊兒壓縮合並。
position是文件的bytes偏移吧?壓縮過程當中要重建索引和位置?【我的理解是要重建的】
active不動(不影響寫入),對cleaner point後面的作壓縮,選擇日誌tail和header比例小的,合併壓縮每組log不超過1G,index不超過10M。
clipboard.png
對於tail的壓縮過程:【position不變???我的理解這是錯誤的,position是變得】
每一個日誌清理線程會使用一個名爲「SkimpyOffsetMap」的對象來構建key與offset的映射關係的哈希表。日誌清理須要遍歷兩第二天志文件,第一次遍歷把每一個key的哈希值和最後出現的offset都保存在SkimpyOffsetMap中,映射模型以下圖所示。第二次遍歷檢查每一個消息是否符合保留條件,若是符合就保留下來,不然就會被清理掉服務器

clipboard.png

rocketmq

activemq 不能分片。kafka性能(上面知道基本上partition和consumer須要配置同樣的,一個consumer group的線程數和partition數量一致,受partition限制,rocketmq多partition的擴展在於都用一個commitlog,而不是一個partition單獨一份順序log,對於磁盤多個文件是隨機寫入的,隨機高性能很差不能linux組提交,cq只存儲位置,在commitlog中找數據,一份徹底順序的寫入提升性能。對於消費順序和kafka都是同樣的保證,cq都是負載均衡,只保證一個cq順序。在消費時,須要先讀取cq上個的offset再讀commitlog。http://rocketmq.apache.org/ro...架構

組件

clipboard.png

  • broker :主從,topic,queue,tag
  • nameserver:幾乎無狀態,可集羣內部署,節點對等,不一樣步。數據是broker同步過來的
  • producer:鏈接ns,主從brokers(心跳),無狀態
  • consumer/group :鏈接ns,主從brokers(心跳)

高可用和可擴展

clipboard.png

  • 負載均衡
    Broker上存Topic信息,Topic由多個隊列組成,隊列會平均分散在多個Broker上,而Producer的發送機制保證消息儘可能平均分佈到全部隊列中,最終效果就是全部消息都平均落在每一個Broker上。
  • 主從
    機器級別,不依賴zk,元數據:在 Broker 啓動的時候,其會將本身在本地存儲的配置文件 (默認位於$HOME/store/config/topics.json 目錄) 中的全部話題加載到內存中去,而後會將這些全部的話題所有同步到全部的 Name 服務器中。與此同時,Broker 也會啓動一個定時任務,默認每隔 30 秒來執行一次話題全同步.主從寫commitlog保證持久性和同步和其餘同樣,就再也不說了。
  • Broker與Namesrv的心跳機制:
    單個Broker跟全部Namesrv保持心跳請求,心跳間隔爲30秒,心跳請求中包括當前Broker全部的Topic信息。Namesrv會反查Broer的心跳信息,若是某個Broker在2分鐘以內都沒有心跳,則認爲該Broker下線,調整Topic跟Broker的對應關係。但此時Namesrv不會主動通知Producer、Consumer有Broker宕機。
  • 消息存儲持久化
    全部broker上的全部topic都順序寫入內存文件mapedfile(1G),mapedfilelist記錄每一個mapedfile在磁盤的偏移量,新消息寫入最後一個文件。
  • 動態伸縮能力
    (非順序消息,消息分散;有序消息只能放在一個queue中,切不支持遷移,只保證一個queue內順序,但能夠多消費線程保證順序):Broker的伸縮性體如今兩個維度:Topic, Broker。
    1)Topic維度:假如一個Topic的消息量特別大,但集羣水位壓力仍是很低,就能夠擴大該Topic的隊列數,Topic的隊列數跟發送、消費速度成正比。
    2)Broker維度:若是集羣水位很高了,須要擴容,直接加機器部署Broker就能夠。Broker起來後想Namesrv註冊,Producer、Consumer經過Namesrv發現新Broker,當即跟該Broker直連,收發消息。

消費

  1. 消費者註冊,消費者上有多有topic的broker地址和隊列,消費者負載均衡選擇;
    1)廣播模式:每一個costumer全量消費,消費偏移量保存在costumer中
    2)集羣模式:constumer均勻消費部分,每一個消息只有一個costumer消費,保存在broker上
  2. 新消息發送到q:brocker上commit log和消費組信息

    clipboard.png
    每一個commmit log消息發給topic的隨機queue中(生產者的負載均衡,每一個msg只發送到一個q中),每一個queue有不少consumequeue,發給全部。廣播模式,cq會在全部q上,集羣模式cq會負載均衡到某個q上,消息根據這些配置數據落到q的全部cq上。
    clipboard.png併發

  3. 消費
    3.1)普通的併發消費:queue的全部cq都直接發,全部cq發送後刪除(q以TreeMap結構存儲)。內部RocketMQ 的消息樹是用 TreeMap 實現的,其內部基於消息偏移量維護了消息的有序性。每次消費請求都會從消息數中拿取偏移量最小的幾條消息 (默認爲 1 條)給用戶,以此來達到有序消費的目的。
    3.2)有序消費:在3.1的基礎上加兩個鎖,costumer client給消費的每一個queue會加鎖,保證同一時刻只有一個costumer client在消費queue(不然發給一個client刪除了消息,此消息在另外一個client和後面的client的消息沒法保證順序),默認20s加一次,queue檢測60s沒有就釋放,每次成功後才取下一條,反正只有一個客戶端消費。第二把鎖是在client中,將堆積的消息按照順序加鎖的寫入線程池task隊列中。

    clipboard.png

其餘

bridgequeue

內存。redis實現。適合小型系統
clipboard.png負載均衡

ddmq 對大型延時系統的支持,引入chronos

clipboard.png

這裏的kafka去掉了。普通的直接用哪一個rocketmq.延時消息和事務消息異步

  • 延時消息
    放入rocketmq一個內部的消費topic中,消費入chronos中(存RocksDB,seektimestamp, while從leveldb中取符合時間的再放入rocketmq中)
  • 事務消息
    A執行後要發送消息給B,由於ddmq一旦接收是保證被消費的,因此增長髮送方事務回查。
    clipboard.png

對比

clipboard.png分析:少topic時kafka性能好,rockemq須要讀mq後去讀一個大的cl。多topic是rockemq好,處理線程多。

相關文章
相關標籤/搜索