初識kafka中的生產者與消費者

發送生產消息的大體流程:服務器

1. 建立生產者對象,生產者發送包裝消息的ProducerRecord
2. 生產者經過send方法發送消息
3. 消息被序列化
4. 消息計算出分區
5. 根據分區消息被分配到指定主題和分區的批次中
6. 批量發送到broker
7. broker判斷是否消息失敗,成功則直接返回元數據【可選】,失敗判斷是否重試,對應作相應處理

如何建立生產者對象?

必須指定3個屬性:broker地址,key的序列化方式,value的序列化方式。其它可選參數,包括重試次數,內存緩衝大小,每次發送消息的批次大小,是否壓縮等等

Avro序列化簡介

它是一種與語言無關的序列化格式。數據經過schema來定義,若是出現讀的schema與寫的shema不一致的時候,不會拋出遺產,而選擇返回默認值。使用的時候,在註冊表中註冊一個schema,消息字段schema的標識,而後存放到broker中,消費者使用標識符從註冊表中拉取schema進行解析獲得結果
網絡


如何發送消息?

1. 同步方式:構建消息的封裝ProducerRecord,經過生產者的send方法發送便可,能夠用Future的方式接收返回的RecordMetadata
2. 異步方式:同步發消息若是服務器之間通訊的時間是10ms,那麼1s只能發100個消息,所以不等待的方式(異步)能夠節省時間,增長吞吐
3. 發送並忘記:只管發送,不處理任何返回值

發送消息的過程當中出了異常怎麼辦?

kafka異常基本有兩類,一是可以重試的方式,好比網絡鏈接段了,一是不會重連,好比消息太大,會直接拋異常,對於異步來說,能夠經過使用回調函數來處理期間出現的異常


代碼上如何建立消費者並訂閱主題?

使用Propertites[包含 server,key.deserializer和value.deserializer]初始化 KafkaConsumer,經過consumer.subscribe便可訂閱主題,主題能夠是一個列表或者是一表達式

異步

代碼上消費者是如何獲取數據的?

輪詢。消費者訂閱了主題後,輪詢中處理全部細節,包括羣組協調、分區再平衡、發送心跳和獲取數據函數

如何優雅退出輪詢?添加shutdownhook,在鉤子裏頭調用消費者的wakeup方法,這樣若是讀取poll,會拋出wakeup異常,而後調用close方法,保證最後的提交都已經完成,而且告知羣組協調器,本身要離開羣組,而後就觸發了再均衡


消費者和線程之間的關係是什麼?

一個羣組裏面有多個消費者,一個消費者只有一個線程
spa


爲何kafka可以從上次斷開的地方再開始讀取消息?

kafka對每一個分區都有一個偏移量,來跟蹤當前消息消費到哪兒去了,若是配置自動提交(更新分區當前位置),默認每5s就上報一次從poll中獲取的收到的最大偏移量。可是這種自動方式若是在小於默認的時間以內發生了再均衡,會照成消息重複消費

線程

想本身提交偏移量,避免自動提交存在的問題怎麼辦? 1. 同步提交 [commitSync()],提交最後一次的偏移量。只要不是不可恢復的問題,就會一直重試,可是在broker對提交作出反應前,會一直阻塞,有可能成爲吞吐量的瓶頸  ; 2. 異步提交[commitAsync()],提交最後一次的偏移量。不重試,若是異步提交出現問題,能夠經過回調來觀察
某些操做我必定要成功,可是又不想每次阻塞,怎麼辦? 混用同步提交和異步提交。在消息處理的時候異步提交,若是出了問題就catch住,而後同步提交
同步提交和異步提交都只能對最後一次進行提交,我想更頻繁的,更自助的控制好提交的頻率,怎麼作? 用map存儲每一個分區的偏移量,而後根據本身的需求,在讀取消息後,異步提交整個map
相關文章
相關標籤/搜索