前面的文章對producer流程及其可自定義的配置類作了大致介紹,本文將繼續對Kafka生成者編程相關知識點進行講解。ProducerConfig
類存放着producer客戶端可配置的項以及其對應的解釋文檔,在本文中,主要根據其說明文檔,分析kafka內部的一些機制和原理。node
備註:apache
kafka-clients 1.1.0
版本。org.apache.kafka.clients.producer
中。重要性:高
類型:List
默認值:Collections.emptyList()編程
引導producer查找Kafka集羣全部broker的引導服務地址列表。bootstrap
顧名思義,該配置項是引導服務列表,即用於查找Kafka集羣中全部broker的host:port
列表,producer經過這些host:port
與kafka集羣創建鏈接。producer用該列表中的地址只是用於發現kafka集羣中全部的服務broker,而在kafka集羣中,broker多是動態改變的。另外,Kafka機制中,能夠經過某一個broker而查詢到全部其餘broker,因此在bootstrap.servers
中,並不須要配置全部broker的host:port
,理想狀況下,只須要配置其中的某一個就能夠了。但爲了提升可用性,避免因該broker掛掉而致使沒法查找,那麼能夠選擇配置多個。segmentfault
配置格式爲:安全
host1:port1,host2:port2,...
重要性:高
類型:Long
默認值:300000毫秒,即5分鐘服務器
元數據最大生存時間,每隔metadata.max.age.ms
時間,producer客戶端會強制刷新一遍元數據metadata,即便沒有任何partition leadership主動發現新的broker或者新的partition。網絡
元數據類org.apache.kafka.clients#Metadata
中,除了記錄一些和自身更新策略有關的信息(metadata的更新策略值得另開一篇文章分析)。還保存了kafka集羣的一些信息,參見org.apache.kafka.common#Cluster
類:函數
集羣中全部結點broker node列表,Node結點中記錄告終點的ip、port以及機架信息(rack)。oop
機架信息(rack):broker的機架信息,相似於Hadoop那樣,能夠更好地利用局部性原 理減小集羣中網絡開銷。若是指定了機架信息(brooker.rack), Kafka在爲分區作副 本分配時就會考慮這部分信息,儘量地爲副本挑選不一樣機架的broker。
集羣中每個TopicPartition,對應的分區信息PartitionInfo。org.apache.kafka.common#PartitionInfo
中主要記錄了以下信息:
集羣中的控制結點信息。
控制結點broker:負責管理整個集羣中分區和副本的狀態,好比partition的leader 副本故障,由controller 負責爲該partition從新選舉新的leader 副本;當檢測到ISR列表發生變化,有controller通知集羣中全部broker更新其MetadataCache信息;或者增長某個topic分區的時候也會由controller管理分區的從新分配工做
重要性:高
類型:Long
默認值:16384字節,即16K
消息記錄batch(批)大小限制。kafka producer在將消息記錄record發送到集羣時,會嘗試將一批要發送到相同partition的消息記錄壓縮在一塊兒,稱之爲batch(批)。每次request,其實不是發送一個record,而是發送若干個batch,而每一個batch裏面可能包含多個record。這樣成批成批的發送,減小了網絡請求,有助於提高producer客戶端和kafka集羣服務的性能。
batch.size
就是用來設置一個batch的最大字節數byte。當設置爲0時,表示徹底禁用batch的功能。若是batch.size
設置過大,那麼可能形成內存浪費,由於每一個發送到不一樣partition的record都須要預先分配一塊batch.size大小的內存。
重要性:高
類型:String
默認值:"1"
應答數設置。producer只有接收到來自server的acks指定數量的應答,纔會認爲發送給server的消息記錄已送達。該配置項用於控制已發送消息記錄的持久性,有如下幾種設置值:
acks = 0
:表示producer無需等待server端的應答消息,producer將record扔到發送緩衝區,就認爲該record已經發送,而後轉身走人。這種狀況沒法保證server端真的成功接收到該消息記錄,且此時即便retries
配置項也沒法生效,由於producer沒法知道是否失敗。另外,每一個record返回的offset都被設爲-1。acks = 1
:表示接收該消息記錄的分區leader將消息記錄寫入到本地log文件,就返回Acknowledgement,告知producer本次發送已完成,而無需等待其餘follower分區的確認。這種狀況下,可能出現消息記錄沒有備份的狀況(follower宕機等)。acks = all
:表示消息記錄只有獲得分區leader以及其餘分區副本同步結點隊列(ISR)中的分區follower的確認以後,才能回覆acknowlegement,告知producer本次發送已完成。這種狀況下,只要分區副本同步結點隊列(ISR)中某一個follower存活,那麼消息記錄就不會被丟失。這種方式最安全,但效率也最低。acks = -1
:等同於acks = all
。重要性:中
類型:Long
默認值:0毫秒,表示無延時,當即發送。
延遲發送消息記錄的時間,上面及前面文章中也已經提到過,producer在發送消息記錄record的時候,會將發送到同一個partition的records壓縮在batch中。但一般這隻發生在records到達速度快於records發送速度的狀況下,很容易理解:若是發送速度大於record到達速度,則每來一個record都會被當即發送出去,根本不存在將多個records壓縮爲一個的可能。
但不少時候,即使是發送速度大於到達速度,咱們也不但願每一個record就發送一次,仍是但願分批次發送,以減小發送次數,提高producer客戶端和服務器端的性能。爲此,咱們須要人爲地加一個發送延遲限制,即每次發送之間,存在必定的時間間隔linger.ms
,在這段時間內,可能有多個records到達,此時就能夠對他們分組壓縮,成批次發送。這相似於TCP的擁塞控制方法。
注意:
linger.ms
設置了發送延遲的最高時間上限,另外一個配置項batch.size
也同時控制着發送的時機。若是爲某個partition壓縮的batch字節數已經達到了batch.size
設置的字節數,那麼該batch將被當即發送到指定的partition,即便此時延遲時間還沒達到linger.ms
的設置。linger.ms
的設置,那麼即便壓縮累積的batch沒有達到batch.size
設置的字節數,也會被髮送到指定的partition。linger.ms
是針對每個發送到partition的request。即不一樣partition的request並非同時發送的。linger.ms
值。重要程度:中
類型:String
默認值:""
producer 客戶端ID,在建立request時,會傳送到kafka服務。其目的是爲了跟蹤記錄請求的來源,雖然服務端能夠經過ip/port來追蹤請求的來源,但ip/port沒法表達業務語義,因此,能夠經過client.id
來設置一個富有業務邏輯語義的名字(如PDK遊戲),有助於後續的分析和記錄。
重要程度:中
類型:int
默認值:131072字節,即128K。
TCP發送緩衝區(SO_SNDBUF)的大小,若send.buffer.bytes
設爲-1,則使用操做系統的默認值。
重要程度:中
類型:int
默認值:32768字節,即32K。
TCP接收緩衝區(SO_RCVBUF)大小,當receive.buffer.bytes
設置爲-1,則使用操做系統默認的大小。
重要程度:中
類型:String
默認值:1048576字節,即1M。
一個請求request中最大字節數,用於限制producer發送的單個請求request中,record batches的最大數量,以免單個請求數據過於巨大。
重要性:低
類型:Long
默認值:50毫秒。
重連間隔時間,避免producer客戶端過於緊密循環地重連kafka服務broker。該值針對的是全部client到broker的鏈接。
重要性:低
類型:Long
默認值:1000毫秒
producer客戶端鏈接一個kafka服務(broker)失敗重連的總時間,每次鏈接失敗,重連時間都會指數級增長,每次增長的時間會存在20%的隨機抖動,以免鏈接風暴
。
應用啓動的時候,常常可能發生各應用服務器的鏈接數異常飆升的狀況。假設鏈接數的設置爲:min值3,max值10,正常的業務使用鏈接數在5個左右,當重啓應用時,各應用鏈接數可能會飆升到10個,瞬間甚至還有可能部分應用會報取不到鏈接。啓動完成後接下來的時間內,鏈接開始慢慢返回到業務的正常值。這就是所謂的鏈接風暴。
重要性:低
類型:Long
默認值:1000毫秒
該配置值控制着KafkaProducer.send()
函數以及KafkaProducer.partitionsFor()
函數將阻塞的最大時間。另外當發送緩衝區滿或者metadata不可用時,這兩個方法也會被阻塞。若是阻塞發生在用戶提供的自定義序列化類serializers或者是自定義的分區類partitioner,那麼這些阻塞的時間不會被計算在該配置值之類。
上面總結了ProducerConfig
類中部分配置項,限於篇幅已經較長,剩餘部分的配置項將在後面另起一篇再作介紹。另外,在這篇文章中,本身有一個疑惑:
producer發送消息記錄到broker的時機,究竟是個什麼機制?從上述配置項的介紹中,batch.size
,max.request.size
,linger.ms
這幾個配置項都會影響其發送時機。
先在此記錄,後續搞明白了再更新吧。若是有大牛可以幫我回答這個問題,能夠在評論中幫我解答。