基本概念
整個Kafka體系結構中引入瞭如下3個術語。apache
(1)Producer:生產者,也就是發送消息的一方。生產者負責建立消息,而後將其投遞到Kafka中。數組
(2)Consumer:消費者,也就是接收消息的一方。消費者鏈接到Kafka上並接收消息,進而進行相應的業務邏輯處理。緩存
<!--more-->服務器
(3)Broker:服務代理節點。對於Kafka而言,Broker能夠簡單地看做一個獨立的Kafka服務節點或Kafka服務實例。大多數狀況下也能夠將Broker看做一臺Kafka服務器,前提是這臺服務器上只部署了一個Kafka實例。一個或多個Broker組成了一個Kafka集羣。通常而言,咱們更習慣使用首字母小寫的broker來表示服務代理節點。網絡
在Kafka中還有兩個特別重要的概念—主題(Topic)與分區(Partition)。Kafka中的消息以主題爲單位進行歸類,生產者負責將消息發送到特定的主題(發送到Kafka集羣中的每一條消息都要指定一個主題),而消費者負責訂閱主題並進行消費。架構
主題是一個邏輯上的概念,它還能夠細分爲多個分區,一個分區只屬於單個主題,不少時候也會把分區稱爲主題分區(Topic-Partition)。同一主題下的不一樣分區包含的消息是不一樣的,分區在存儲層面能夠看做一個可追加的日誌(Log)文件,消息在被追加到分區日誌文件的時候都會分配一個特定的偏移量(offset)。offset是消息在分區中的惟一標識,Kafka經過它來保證消息在分區內的順序性,不過offset並不跨越分區,也就是說,Kafka保證的是分區有序而不是主題有序。如圖所示,主題中有 4 個分區,消息被順序追加到每一個分區日誌文件的尾部。Kafka中的分區能夠分佈在不一樣的服務器(broker)上,也就是說,一個主題能夠橫跨多個broker,以此來提供比單個broker更強大的性能。性能
生產者
消息在真正發往Kafka以前,有可能須要經歷攔截器(Interceptor)、序列化器(Serializer)和分區器(Partitioner)等一系列的做用,那麼在此以後又會發生什麼呢?下面咱們來看一下生產者客戶端的總體架構.線程
producer 流程:
整個生產者客戶端由兩個線程協調運行,這兩個線程分別爲主線程和Sender線程(發送線程)。代理
在主線程中由KafkaProducer建立消息,而後經過可能的攔截器、序列化器和分區器的做用以後緩存到消息累加器(RecordAccumulator,也稱爲消息收集器)中。日誌
Sender 線程負責從RecordAccumulator中獲取消息並將其發送到Kafka中。
主線程
<font color=#C7063 size=3>生產者攔截器</font> 能夠用來在消息發送前作一些準備工做,好比按照某個規則過濾不符合要求的消息、修改消息的內容等,也能夠用來在發送回調邏輯前作一些定製化的需求,好比統計類工做。生產者攔截器的使用也很方便,主要是自定義實現 org.apache.kafka.clients.producer.ProducerInterceptor接口。ProducerInterceptor接口中包含3個方法
KafkaProducer() 在將消息序列化和計算分區以前會調用生產者攔截器的onSend()方法來對消息進行相應的定製化操做。
KafkaProducer() 在消息被應答(Acknowledgement)以前或消息發送失敗時調用生產者攔截器的onAcknowledgement()方法,優先於用戶設定的 Callback 以前執行。
close() 主要用於在關閉攔截器時執行一些資源的清理工做。在這 3 個方法中拋出的異常都會被捕獲並記錄到日誌中,但並不會再向上傳遞。
<font color=#C7063 size=3>序列化器</font> 生產者須要用序列化器(Serializer) 把對象轉換成字節數組才能經過網絡發送給Kafka,消費者須要用反序列化器(Deserializer)把從 Kafka 中收到的字節數組轉換成相應的對象,Serializer接口中包含3個方法
configure() 方法用來配置當前類
serialize() 方法用來執行序列化操做
close() 方法用來關閉當前的序列化器,通常狀況下 close()是一個空方法,若是實現了此方法,則必須確保此方法的冪等性,由於這個方法極可能會被KafkaProducer調用屢次。
<font color=#C7063 size=3>分區器</font> 分區器的做用就是爲消息分配分區。序列化器是必需的。消息通過序列化以後就須要肯定它發往的分區,若是消息ProducerRecord中指定了partition字段,那麼就不須要分區器的做用,由於partition表明的就是所要發往的分區號。
消息累加器 RecordAccumulator
RecordAccumulator 主要用來緩存消息以便 Sender 線程能夠批量發送,進而減小網絡傳輸的資源消耗以提高性能。RecordAccumulator 緩存的大小能夠經過生產者客戶端參數buffer.memory 配置,默認值爲 32MB。 若是生產者發送消息的速度超過發送到服務器的速度,則會致使生產者空間不足,這個時候KafkaProducer的send()方法調用要麼被阻塞,要麼拋出異常,這個取決於參數max.block.ms的配置,此參數的默認值爲60000,即60秒。
主線程中發送過來的消息都會被追加到RecordAccumulator的某個雙端隊列(Deque)中,在RecordAccumulator 的內部爲每一個分區都維護了一個雙端隊列,隊列中的內容就是ProducerBatch,即 Deque<ProducerBatch>。消息寫入緩存時,追加到雙端隊列的尾部;Sender讀取消息時,從雙端隊列的頭部讀取。注意ProducerBatch不是ProducerRecord,ProducerBatch中能夠包含一至多個 ProducerRecord。
通俗地說,ProducerRecord 是生產者中建立的消息,而ProducerBatch是指一個消息批次,ProducerRecord會被包含在ProducerBatch中,這樣可使字節的使用更加緊湊。與此同時,將較小的ProducerRecord拼湊成一個較大的ProducerBatch,也能夠減小網絡請求的次數以提高總體的吞吐量
sender 線程
Sender 從 RecordAccumulator 中獲取緩存的消息以後,會進一步將本來<分區,Deque<ProducerBatch>>的保存形式轉變成<Node,List< ProducerBatch>的形式,其中Node表示Kafka集羣的broker節點,Sender 還會進一步封裝成<Node,Request>的形式,這樣就能夠將Request請求發往各個Node了,這裏的Request是指Kafka的各類協議請求,請求在從Sender線程發往Kafka以前還會保存到InFlightRequests中,InFlightRequests保存對象的具體形式爲Map<NodeId,Deque<Request>> 它的主要做用是緩存了已經發出去但尚未收到響應的請求(NodeId 是一個 String 類型,表示節點的 id 編號)。
原創不易,若是以爲有點用的話,請絕不留情點個贊,轉發一下,這將是我持續輸出優質文章的最強動力。