[kafka] 001_kafka起步

 

 

1、簡介html

  Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.

 Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者規模的網站中的全部動做流數據。

  這種動做(網頁瀏覽,搜索和其餘用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據一般是因爲吞吐量的要求而經過處理日誌和日誌聚合來解決。

  對於像Hadoop的同樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。

  Kafka的目的是經過Hadoop的並行加載機制來統一線上和離線的消息處理,也是爲了經過集羣機來提供實時的消費。

 

2、特色node

Kafka maintains feeds of messages in categories called topics.

We'll call processes that publish messages to a Kafka topic producers.

We'll call processes that subscribe to topics and process the feed of published messages consumers.

Kafka is run as a cluster comprised of one or more servers each of which is called a broker.

at a high level, producers send messages over the network to the Kafka cluster which in turn serves them up to consumers like this:apache

 

Communication between the clients and the servers is done with a simple, high-performance, language agnostic TCP protocol. We provide a Java client for Kafka, but clients are available in many languages.(客戶端和服務器之間的溝通採用了TCP協議,kafka提供了基於Java的客戶端,可是理論上客戶端能夠用任何語言編碼實現)緩存

 

3、相關術語服務器

Broker
Kafka集羣包含一個或多個服務器,這種服務器被稱爲broker.

Topic
每條發佈到Kafka集羣的消息都有一個類別,這個類別被稱爲Topic. 物理上不一樣Topic的消息分開存儲,邏輯上一個Topic 的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic便可生產或消費數據而沒必要關心數據存於何處.

Partition
Partition是物理上的概念,每一個Topic包含一個或多個Partition.

Producer
負責發佈消息到Kafka broker

Consumer
消息消費者,向Kafka broker讀取(pull)消息的客戶端.

Consumer Group
每一個Consumer屬於一個特定的Consumer Group(可爲每一個Consumer指定group name,若不指定group name則屬於默認的group).

 

4、設計原理網絡

kafka的設計初衷是但願做爲一個統一的信息收集平臺,可以實時的收集反饋信息,並須要可以支撐較大的數據量,且具有良好的容錯能力.

一、持久性併發

kafka使用文件存儲消息,這就直接決定kafka在性能上嚴重依賴文件系統的自己特性.且不管任何OS下,對文件系統自己的優化幾乎沒有可能.

文件緩存/直接內存映射等是經常使用的手段.

由於kafka是對日誌文件進行append操做,所以磁盤檢索的開支是較小的;同時爲了減小磁盤寫入的次數,broker會將消息暫時buffer起來,當消息的個數(或尺寸)達到必定閥值時,再flush到磁盤,這樣減小了磁盤IO調用的次數.

二、性能app

須要考慮的影響性能點不少,除磁盤IO以外,咱們還須要考慮網絡IO,這直接關係到kafka的吞吐量問題.kafka並無提供太多高超的技巧;

對於producer端,能夠將消息buffer起來,當消息的條數達到必定閥值時,批量發送給broker;

對於consumer端也是同樣,批量fetch多條消息.不過消息量的大小能夠經過配置文件來指定.

對於kafka broker端,彷佛有個sendfile系統調用能夠潛在的提高網絡IO的性能:將文件的數據映射到系統內存中,socket直接讀取相應的內存區域便可,而無需進程再次copy和交換.

其實對於producer/consumer/broker三者而言,CPU的開支應該都不大,所以啓用消息壓縮機制是一個良好的策略;壓縮須要消耗少許的CPU資源,不過對於kafka而言,網絡IO更應該須要考慮.能夠將任何在網絡上傳輸的消息都通過壓縮.kafka支持gzip/snappy等多種壓縮方式.

三、生產者負載均衡

負載均衡: 
    producer將會和Topic下全部partition leader保持socket鏈接;消息由producer直接經過socket發送到broker,中間不會通過任何"路由層".事實上,消息被路由到哪一個partition上,有producer客戶端決定.好比能夠採用"random""key-hash""輪詢"等,若是一個topic中有多個partitions,那麼在producer端實現"消息均衡分發"是必要的.
 
    其中partition leader的位置(host:port)註冊在zookeeper中,producer做爲zookeeper client,已經註冊了watch用來監聽partition leader的變動事件.

    異步發送:將多條消息暫且在客戶端buffer起來,並將他們批量的發送到broker,小數據IO太多,會拖慢總體的網絡延遲,批量延遲發送事實上提高了網絡效率。不過這也有必定的隱患,好比說當producer失效時,那些還沒有發送的消息將會丟失。

 

  四、消費者dom

    consumer端向broker發送"fetch"請求,並告知其獲取消息的offset;此後consumer將會得到必定條數的消息;consumer端也能夠重置offset來從新消費消息.
 
    在JMS實現中,Topic模型基於push方式,即broker將消息推送給consumer端.不過在kafka中,採用了pull方式,即consumer在和broker創建鏈接以後,主動去pull(或者說fetch)消息;這中模式有些優勢,首先consumer端能夠根據本身的消費能力適時的去fetch消息並處理,且能夠控制消息消費的進度(offset);此外,消費者能夠良好的控制消息消費的數量,batch fetch.
 
    其餘JMS實現,消息消費的位置是有prodiver保留,以便避免重複發送消息或者將沒有消費成功的消息重發等,同時還要控制消息的狀態.這就要求JMS broker須要太多額外的工做.在kafka中,partition中的消息只有一個consumer在消費,且不存在消息狀態的控制,也沒有複雜的消息確認機制,可見kafka broker端是至關輕量級的.當消息被consumer接收以後,consumer能夠在本地保存最後消息的offset,並間歇性的向zookeeper註冊offset.因而可知,consumer客戶端也很輕量級.

 

 

  五、消息傳送機制

對於JMS實現,消息傳輸擔保很是直接:有且只有一次(exactly once).在kafka中稍有不一樣,有三種方式:

1) at most once: 最多一次,這個和JMS中"非持久化"消息相似.發送一次,不管成敗,將不會重發. 2) at least once: 消息至少發送一次,若是消息未能接受成功,可能會重發,直到接收成功. 3) exactly once: 消息只會發送一次.
at most once: 消費者fetch消息,而後保存offset,而後處理消息;當client保存offset以後,可是在消息處理過程當中出現了異常,致使部分消息未能繼續處理.那麼此後
"未處理"的消息將不能被fetch到,這就是"at most once".
at least once: 消費者fetch消息,而後處理消息,而後保存offset.若是消息處理成功以後,可是在保存offset階段zookeeper異常致使保存操做未能執行成功,這就致使接下來再次fetch時可能得到上次已經處理過的消息,這就是
"at least once",緣由offset沒有及時的提交給zookeeper,zookeeper恢復正常仍是以前offset狀態.
exactly once: kafka中並無嚴格的去實現(基於2階段提交,事務),咱們認爲這種策略在kafka中是沒有必要的.
一般狀況下
"at-least-once"是咱們首選.(相比at most once而言,重複接收數據總比丟失數據要好).

 

  六、複製備份

    kafka將每一個partition數據複製到多個server上,任何一個partition有一個leader和多個follower(能夠沒有);

    備份的個數能夠經過broker配置文件來設定.

    leader處理全部的read-write請求,follower須要和leader保持同步.Follower和consumer同樣,消費消息並保存在本地日誌中;leader負責跟蹤全部的follower狀態,若是follower"落後"太多或者失效,leader將會把它從replicas同步列表中刪除.當全部的follower都將一條消息保存成功,此消息才被認爲是"committed",那麼此時consumer才能消費它.

    即便只有一個replicas實例存活,仍然能夠保證消息的正常發送和接收,只要zookeeper集羣存活便可.(不一樣於其餘分佈式存儲,好比hbase須要"多數派"存活才行)

    當leader失效時,需在followers中選取出新的leader,可能此時follower落後於leader,所以須要選擇一個"up-to-date"的follower.選擇follower時須要兼顧一個問題,就是新leader-server上所已經承載的partition leader的個數,若是一個server上有過多的partition leader,意味着此server將承受着更多的IO壓力.在選舉新leader,須要考慮到"負載均衡".

 

  7.日誌

    若是一個topic的名稱爲"my_topic",它有2個partitions,那麼日誌將會保存在my_topic_0和my_topic_1兩個目錄中;

    日誌文件中保存了一序列"log entries"(日誌條目),每一個log entry格式爲"4個字節的數字N表示消息的長度" + "N個字節的消息內容";

    每一個日誌都有一個offset來惟一的標記一條消息,offset的值爲8個字節的數字,表示此消息在此partition中所處的起始位置.每一個partition在物理存儲層面,由多個log file組成(稱爲segment).

    segmentfile的命名爲"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.

     其中每一個partiton中所持有的segments列表信息會存儲在zookeeper中.
    
    當segment文件尺寸達到必定閥值時(能夠經過配置文件設定,默認1G),將會建立一個新的文件;當buffer中消息的條數達到閥值時將會觸發日誌信息flush到日誌文件中,同時若是"距離最近一次flush的時間差"達到閥值時,也會觸發flush到日誌文件.若是broker失效,極有可能會丟失那些還沒有flush到文件的消息.由於server意外實現,仍然會致使log文件格式的破壞(文件尾部),那麼就要求當server啓東是須要檢測最後一個segment的文件結構是否合法並進行必要的修復.

    獲取消息時,須要指定offset和最大chunk尺寸,offset用來表示消息的起始位置,chunk size用來表示最大獲取消息的總長度(間接的表示消息的條數).根據offset,能夠找到此消息所在segment文件,而後根據segment的最小offset取差值,獲得它在file中的相對位置,直接讀取輸出便可.
  
    日誌文件的刪除策略很是簡單:啓動一個後臺線程按期掃描log file列表,把保存時間超過閥值的文件直接刪除(根據文件的建立時間).爲了不刪除文件時仍然有read操做(consumer消費),採起copy-on-write方式.

 

八、分配

  kafka使用zookeeper來存儲一些meta信息,並使用了zookeeper watch機制來發現meta信息的變動並做出相應的動做(好比consumer失效,觸發負載均衡等)
1) Broker node registry: 當一個kafka-broker啓動後,首先會向zookeeper註冊本身的節點信息(臨時znode),同時當broker和zookeeper斷開鏈接時,此znode也會被刪除.
格式:
/broker/ids/[0...N] -->host:port;其中[0..N]表示broker id,每一個broker的配置文件中都須要指定一個數字類型的id(全局不可重複),znode的值爲此broker的host:port信息.
2) Broker Topic Registry: 當一個broker啓動時,會向zookeeper註冊本身持有的topic和partitions信息,仍然是一個臨時znode.
  格式:
/broker/topics/[topic]/[0...N] 其中[0..N]表示partition索引號.
  
3) Consumer and Consumer group: 每一個consumer客戶端被建立時,會向 zookeeper 註冊本身的信息;此做用主要是爲了"負載均衡".
一個group中的多個consumer能夠交錯的消費一個topic的全部partitions;簡而言之,保證此topic的全部partitions都能被此group所消費,且消費時爲了性能考慮,讓partition相對均衡的分散到每一個consumer上.
4) Consumer id Registry: 每一個consumer都有一個惟一的ID(host:uuid,能夠經過配置文件指定,也能夠由系統生成),此id用來標記消費者信息.
格式:
/consumers/[group_id]/ids/[consumer_id]
仍然是一個臨時的znode,此節點的值爲{
"topic_name":#streams...},即表示此consumer目前所消費的topic + partitions列表.
5) Consumer offset Tracking: 用來跟蹤每一個consumer目前所消費的partition中最大的offset.
格式:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]-->offset_value 此znode爲持久節點,能夠看出offset跟group_id有關,以代表當group中一個消費者失效,其餘consumer能夠繼續消費.
6) Partition Owner registry: 用來標記partition被哪一個consumer消費.臨時znode
格式:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]-->consumer_node_id當consumer啓動時,所觸發的操做:
A) 首先進行
"Consumer id Registry";
B) 而後在
"Consumer id Registry"節點下注冊一個watch用來監聽當前group中其餘consumer的"leave""join";只要此znode path下節點列表變動,都會觸發此group下consumer的負載均衡.(好比一個consumer失效,那麼其餘consumer接管partitions).
C) 在
"Broker id registry"節點下,註冊一個watch用來監聽broker的存活狀況;若是broker列表變動,將會觸發全部的groups下的consumer從新balance.

 

 

  1) Producer端使用zookeeper用來"發現"broker列表,以及和Topic下每一個partition leader創建socket鏈接併發送消息.
2) Broker端使用zookeeper用來註冊broker信息,已經監測partition-leader存活性.
3) Consumer端使用zookeeper用來註冊consumer信息,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader創建socket鏈接,並獲取消息.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

5、參考資料

http://kafka.apache.org/documentation.html#gettingStarted

http://www.cnblogs.com/likehua/p/3999538.html

相關文章
相關標籤/搜索