發送生產消息的大體流程:服務器
1. 建立生產者對象,生產者發送包裝消息的ProducerRecord它是一種與語言無關的序列化格式。數據經過schema來定義,若是出現讀的schema與寫的shema不一致的時候,不會拋出遺產,而選擇返回默認值。使用的時候,在註冊表中註冊一個schema,消息字段schema的標識,而後存放到broker中,消費者使用標識符從註冊表中拉取schema進行解析獲得結果
網絡
使用Propertites[包含 server,key.deserializer和value.deserializer]初始化 KafkaConsumer,經過consumer.subscribe便可訂閱主題,主題能夠是一個列表或者是一表達式
異步
輪詢。消費者訂閱了主題後,輪詢中處理全部細節,包括羣組協調、分區再平衡、發送心跳和獲取數據函數
如何優雅退出輪詢?添加shutdownhook,在鉤子裏頭調用消費者的wakeup方法,這樣若是讀取poll,會拋出wakeup異常,而後調用close方法,保證最後的提交都已經完成,而且告知羣組協調器,本身要離開羣組,而後就觸發了再均衡
一個羣組裏面有多個消費者,一個消費者只有一個線程
spa
kafka對每一個分區都有一個偏移量,來跟蹤當前消息消費到哪兒去了,若是配置自動提交(更新分區當前位置),默認每5s就上報一次從poll中獲取的收到的最大偏移量。可是這種自動方式若是在小於默認的時間以內發生了再均衡,會照成消息重複消費
線程
想本身提交偏移量,避免自動提交存在的問題怎麼辦?
1. 同步提交 [commitSync()],提交最後一次的偏移量。只要不是不可恢復的問題,就會一直重試,可是在broker對提交作出反應前,會一直阻塞,有可能成爲吞吐量的瓶頸 ;
2. 異步提交[commitAsync()],提交最後一次的偏移量。不重試,若是異步提交出現問題,能夠經過回調來觀察
某些操做我必定要成功,可是又不想每次阻塞,怎麼辦?
混用同步提交和異步提交。在消息處理的時候異步提交,若是出了問題就catch住,而後同步提交
同步提交和異步提交都只能對最後一次進行提交,我想更頻繁的,更自助的控制好提交的頻率,怎麼作?
用map存儲每一個分區的偏移量,而後根據本身的需求,在讀取消息後,異步提交整個map