Apache Kafka 源碼剖析
Getting Start
下載
優勢和應用場景
- Kafka消息驅動,符合發佈-訂閱模式,優勢和應用範圍都共通
- 發佈-訂閱模式優勢
- 解耦合 : 兩個應用不須要相互調用
- 可擴展性 : 消費者的個數可實時擴展
- 實時性 : 消費者能實時的獲取生產者發佈的事件
- 高效 :減小因爲多個消費者請求數據形成的數據計算帶來的資源消耗
- 異步通信 :發佈-訂閱模式是天生的異步通信
- Kafka其餘優勢
- 持久化 : 消息丟失的可控性極高
- 高性能 : 磁盤順序讀寫性能比內存隨機讀寫還高,每秒10萬條消息
- 高吞吐量 :每秒上百MB的吞吐量
- 順序性
- 發佈-訂閱模式應用範圍
- 適合數據一被生產,就須要被處理的狀況
- 適合數據具備潛在消費者的狀況
- 適合不管有沒有消費者,數據都在生產的狀況
- 不適合對數據的處理時間有特殊限定的狀況
- 應用場景
- 最爲消息中間件,實現消息隊列和消息的發佈-訂閱,消息驅動的服務
- 數據總線,一對多的模式
- 日誌收集,消息中間件的一種應用
- 數據庫主從同步
核心概念
- Broker
- 一個Kafka server就是一個Broker
- 通常狀況下,一個Broker獨佔一臺服務器,發揮微服務的優點
- 服務器資源有限的狀況下,須要設計出Broker/Topic/Partition/Replica的最優分配策略,從而充分利用服務器資源
- 一個broker能夠有多個topic
- Topic
- 存儲消息的邏輯上的消息集合
- 每一個Topic有多個分區
- 分區 Partition
- 同一個Topic的不一樣分區分配在不一樣Broker上,也就是一個分區一個服務器
- 不一樣Topic的分區能夠共享一個服務器
- 同一個分區的消息是有序的,經過維護offset實現
- 相同key的消息會被髮布到同一個分區
- 同一個分區的消息會被一個消費組裏固定的一個消費者獨佔消費
- 經過增長分區來增長並行處理能力
- 每一個分區能夠有多個副本
- 消費組 Consumer Group
- 實現一個消息只被同組的一個消費者獨佔消費
- 消費組裏的消費者有變化的時候會觸發Rebalance操做從新分配分區與消費者的對應關係
- Rebalance操做實現了分區消費的故障轉移
- 經過增長分區和消費組裏的消費者數量來水平擴展,理想狀況一對一,也能夠一對多,最好不要多對一,形成浪費
- 副本 Replica
- 同一個分區的不一樣副本分配在不一樣Broker上,可是這些Broker能夠是在同一臺服務器上,也能夠不是
- 副本是一個熱備份設計,會選舉一個做爲Leader,提供對外服務
- Fllower副本批量的從Leader副本同步消息
- HW & LEO
- HW是全部ISR副本都有的最新offset,HW以前的消息在全部副本中都存在,HW由Leader副本維護
- 全部消費者只能獲取HW以前的消息,這樣保證了Leader副本不可用的狀況下,全部消費者的狀態是一致的
- LEO是每一個副本各自的最新offset
- ISR集合
- 知足兩個條件的副本會被選入ISR可用副本集合
- 副本與Zookeeper鏈接
- 副本的LEO與Leader副本的LEO差值不超過閾值
- ISR集合保證了Kafka不會被故障副本拖累,也保證了Leader的HW與LEO的差值在閾值內
- 生產者
- 異步提交
- acks=0 : 生產者只管提交,不會等待Leader副本返回,不保證數據不丟失
- 同步提交
- acks=1 : 默認設置,生產者等待Leader副本返回成功,保證數據在Leader中部丟失,可是不保證從新選舉後數據不丟失
- 同步複製
- acks=all : 生產者等待全部副本同步消息後纔算提交成功,保證數據不丟失,性能低
- Log
- 一個副本對應一個Log,用於持久化數據,Kafka採用順序讀寫的方式,性能高
- 一個Log裏有多個Segment,每一個Segment有一個日誌文件和一個索引文件
- 日誌文件的大小有限制,超出後會生成新的Segment
- 日誌消息保留策略有兩種
- 消息的保留時間超過指定時間,能夠被刪除
- Topic的存儲滿,能夠開始刪除最舊的消息
- 保留策略能夠全局配置,也能夠按Topic配置
- 日誌壓縮
- 開啓日誌壓縮後,相同的key會被按期合併,只保留最新的value
Kafka/zookeeper 命令
- 啓動Zookeeper
- ./zookeeper-server-start.sh ../config/zookeeper.properties
- 啓動Kafka
- ./kafka-server-start.sh ../config/server.properties
- 查看Topic
- ./kafka-topics.sh --list --zookeeper localhost:9860
- 刪除Topic
- ./kafka-topics.sh --delete --zookeeper localhost:9860 /kafka --topic test
- 不會立馬刪除topic
- 查看Topic的詳細信息
- ./kafka-topics.sh --zookeeper localhost:9860 --topic test--describe
- 查看zk信息
- ./zookeeper-shell.sh 127.0.0.1:9860
- 生產數據
- ./kafka-console-producer.sh --broker-list cvatap3d.nam.nsroot.net:9801 --topic midcurve-ds
- 消費數據
- ./kafka-console-consumer.sh --zookeeper localhost:9860 --topic midcurve-ds-subscribe --from-beginning
Kafka集羣
zookeeper集羣配置 : zookeeper.properties
- clientPort=2180
- 端口號
- dataDir=/tmp/zookeeper
- 集羣信息記錄目錄,清空目錄能夠重置zookeeper
- 若是須要在同一臺server上啓動多個node,這個路徑必須不一樣
- tickTime=2000
- zookeeper副本與leader之間維護心跳的頻率
- initLimit=5
- zookeeper的leader初始化鏈接follower時等待多少個tickTime時間的心跳,超時副本鏈接失敗
- syncLimit=2
- leader與follower之間發送消息,請求和應答超時是多少個tickTime
- server.0=cvatap3d.nam.nsroot.net:2888:3888
- server.1=cvatap3d.nam.nsroot.net:2889:3889
- server.2=cvatap3d.nam.nsroot.net:2890:3890
- 第一個啓動的爲leader
- zookeeper集羣數量必須是基數3,5,7...
- 0,1,2是服務id,須要在對應的dataDir=/tmp/zookeeper下面建立myid文件,內容就是服務id,好比0
- ip或者host均可以
- 後面兩個端口是zookeeper內部通信使用
- 第一個端口是用於副本與Leader創建TCP鏈接
- 第二個端口是用於Leader選舉的TCP端口
Kafka配置 : server.properties
- broker.id=0
- 同一個zookeeper集羣下的broker的id必須惟一
- log.dirs=/tmp/kafka-logs
- 啓動kafka會從zookeeper下載配置到log目錄
- 若是修改了server.properties可能由於配置與存儲的配置不匹配致使啓動失敗,這時候能夠刪除這個目錄
- 若是須要在同一臺server上啓動多個broker,這個路徑必須不一樣
- zookeeper.connect=localhost:2181
- zookeeper集羣,以逗號隔開
- listeners=PLAINTEXT://cvatap3d.nam.nsroot.net:9093
- broker的host:port
zookeeper與kafka
- Kafka將Broker信息註冊到zookeeper
- zookeeper會維護topic與broker的關係,選舉Leader
- 監控partition leader存活性,發現Leader異常會從新選舉Leader
- 當異常Broker恢復後,會在一段時間後從新分配Leader
- Broker從zookeeper獲取集羣中其它Broker信息
- Consumer端將本身註冊到zookeeper
- 用來獲取broker列表
- 並和partition leader創建socket鏈接
- 在Consumer Group發生變化時進行rebalance
- Zookeeper管理consumer的offset跟蹤當前消費的offset。
- Producer端將本身註冊到zookeeper
- 用來獲取broker列表和分區狀態,從而將消息發佈到正確的Broker
- Zookeeper無論理producer
Kafka使用經驗總結
Consumer
- Consumer默認自動提交Offset,而且是一獲取便提交,默認間隔5秒
- 當發生錯誤重啓,若是你當消費能力強,可能形成重複消費5秒內當Offset
- 若是消費能力比較弱,也可能提交的Offset沒有消費完,形成Offset丟失
- 消費者使用pull的方式去拿消息
- 簡化kafka實現,消費者本身控制消費進度,不會有消息積壓的壓力
- kafka經過長輪詢/長鏈接來提升pull的實時性
- 能夠設置消費端緩存消息大小:queue.buffering.max.meesages ,在自動提交模式下緩存大小須要適當控制
Kafka強一致性保證
- Producer同步複製,性能降低
- Kafka冪等設置
- Kafka使用Produce Id和sequence number實現冪等,判斷一次提交的全部消息的seq num同樣,Produce Id由zookeeper隨機生成,每次不同
- 單分區冪等,不支持分區冪等,也就是當從新分配key與分區關係當時候不支持冪等
- 單會話冪等,不提供重啓Prodicer後單冪等
- Kafka當冪等大部分狀況下有效,單不能徹底信任
- enabled.idempotence=true, 此時就會默認把acks設置爲all,因此不須要再設置acks屬性了。也就是說冪等自動開啓同步複製?
- 消費者手動提交offset
- 分區事物管理
- 安全關閉
- producer.close():優先把消息處理完畢,優雅退出。
- producer.close(timeout): 超時時,強制關閉。
- 可重試/不可重試異常區別對待
Spring Cloud Stream 使用Kafka
Producer 生產數據
- 隨機發送
- spring.cloud.stream.bindings.[channelName].producer.partitionKeyExpression不設置
- spring.cloud.stream.bindings.[channelName].producer.partitionCount=10
- 若是partitionCount > Partition數量, 會報錯,但若是autoAddPartitions=true,則不報錯而自動添加Partition,與instanceCount無關
- 若是若是partitionCount < Partition數量,則會被Partition數量覆蓋
- 定向發送
- spring.cloud.stream.bindings.[channelName].producer.partitionKeyExpression=payload.currency
- 當Partition變多後,重啓會從新分配,可是不重啓的狀況下仍是保持不變,也就是說消費者自動添加Partiton後,還須要重啓生產者
Consumer Offset管理
- Offset的起駛位置
- spring.cloud.stream.kafka.bindings.channelName.consumer.resetOffsets
- true : 每次從startOffset開始
- false:從Consumer當前位置開始
- spring.cloud.stream.kafka.bindings.channelName.consumer.startOffset
- 新消費組都起駛位置,latest=0, earliest=lastOffset+1
- 不設置組名:groupId=anonymous
- resetOffsets 默認true, startOffset 默認latest, 因此默認每次都從最新都消息消費
- 沒法設置resetOffsets=false,由於沒法知道它當前Offset,可是能夠改變startOffset
- 設置組名:
- spring.cloud.stream.bindings.<channelName>.group
- resetOffsets 默認false, startOffset 默認earliest, 因此新消費組默認從0開始消費,而且擁有記錄Offset能力
- Offset 提交
- spring.cloud.stream.kafka.bindings.channelName.consumer.autoCommitOffset
- 默認true, 自動提交Offset
- false會在Message對象都header字段裏添加一個kafka_acknowledgment對象,能夠用來手動提交offset
- 可是它的AckMode.MANUAL並非馬上提交的,而是全部pull到的Offset都處理後批量自動提交,因此只能控制哪些Offset須要提交,不能控制何時提交
- spring.cloud.stream.kafka.bindings.channelName.consumer.ackEachRecord
- 默認false:當全部一次pull到的Offset都消費完裏以後(@StreamListener都方法執行完),纔會自動提交Offset
- true: 每個消息消費完都提交Offset
- Offset管理的是最後提交的Offset,而不是處理好的Offset的list
Consumer Partition分配
- 自動分配
- spring.cloud.stream.kafka.bindings.channelName.consumer.autoRebalanceEnabled=true
- 自動分配Partition給消費組成員,而且會在當前pull的消息被處理完後才分配,有效避免消息被重複消費,可是也不能徹底信任,好比消息處理緩慢形成心跳失敗或者pull輪詢使得它直接認爲成員丟失而進行rebalance,可是數據其實還在處理
- 若是Comsumer數量 > Partition數量, Consumer會閒置, 但若是autoAddPartitions=true,會根據 max(instanceCount*concurrency,minPartitionCount)自動添加,並在一段時間後自動分配
- 自動分配與instanceCount/instanceIndex。對concurrency的處理和手動分配同樣
- 手動分配
- autoRebalanceEnabled=false
- spring.cloud.stream.instanceCount : 根據Partition數量/(instanceCountconcurrency)數量來決定分配到這個instance的Partition。若是instanceCountconcurrency > Partition數量, 會報錯, 但若是autoAddPartitions=true,則不報錯而自動添加Partition
- spring.cloud.stream.instanceIndex : iinstanceIndex必須在indexCount範圍內。若是index同樣,將消費相同Partition的消息,這樣就違反了一個Partition只能被一個Comsumer Member消費的原則,形成消息的重複消費
- spring.cloud.stream.bindings.channelName.consumer.partitioned????????
- Instance併發消費
- spring.cloud.stream.bindings.channelName.consumer.concurrency=10
- 同一個instance的同一個channel的多個消費者Listener也會消費相同Partition的消息,而且是同步處理,形成低性能的消息的重複消費,與concurrency無關,也就是說spring的一個instance的一個channel就只能有一個consumer的@StreamListener。是否能夠定義多個channel監聽同一個topic來實現instance級別的concurrency?
- concurrency=10表明你想在一個instance中啓動10consumer線程去處理10個partition的message
數據強一致性
- 保證消息至少被髮送一次
- 保證消息只被發送一次
- 保證消息至少被消費一次
- ackEachRecord=false保證消息至少被消費一次,可是能夠有一整批消息會被消費屢次
- ackEachRecord=true保證消息至少被消費一次,並且保證Consumer故障減小後?消息不會被重複消費,可是不保證增長Consumer
- 保證消息只被消費一次
性能優化
在線水平伸縮
歡迎關注本站公眾號,獲取更多信息