【大數據實踐】Kafka生產者編程(4)——ProducerConfig詳解(上)

前言

前面的文章對producer流程及其可自定義的配置類作了大致介紹,本文將繼續對Kafka生成者編程相關知識點進行講解。ProducerConfig類存放着producer客戶端可配置的項以及其對應的解釋文檔,在本文中,主要根據其說明文檔,分析kafka內部的一些機制和原理。node

備註:apache

  • 本文章中針對的是kafka-clients 1.1.0版本。
  • ProducerConfig 類在包org.apache.kafka.clients.producer中。

ProducerConfig各配置項

bootstrap.servers

重要性:高
類型:List
默認值:Collections.emptyList()編程

引導producer查找Kafka集羣全部broker的引導服務地址列表bootstrap

顧名思義,該配置項是引導服務列表,即用於查找Kafka集羣中全部broker的host:port列表,producer經過這些host:port與kafka集羣創建鏈接。producer用該列表中的地址只是用於發現kafka集羣中全部的服務broker,而在kafka集羣中,broker多是動態改變的。另外,Kafka機制中,能夠經過某一個broker而查詢到全部其餘broker,因此在bootstrap.servers中,並不須要配置全部broker的host:port,理想狀況下,只須要配置其中的某一個就能夠了。但爲了提升可用性,避免因該broker掛掉而致使沒法查找,那麼能夠選擇配置多個。segmentfault

配置格式爲:安全

host1:port1,host2:port2,...

metadata.max.age.ms

重要性:高
類型:Long
默認值:300000毫秒,即5分鐘服務器

元數據最大生存時間,每隔metadata.max.age.ms時間,producer客戶端會強制刷新一遍元數據metadata,即便沒有任何partition leadership主動發現新的broker或者新的partition。網絡

元數據

元數據類org.apache.kafka.clients#Metadata中,除了記錄一些和自身更新策略有關的信息(metadata的更新策略值得另開一篇文章分析)。還保存了kafka集羣的一些信息,參見org.apache.kafka.common#Cluster類:函數

  • 集羣中全部結點broker node列表,Node結點中記錄告終點的ip、port以及機架信息(rack)。oop

    機架信息(rack):broker的機架信息,相似於Hadoop那樣,能夠更好地利用局部性原 
    理減小集羣中網絡開銷。若是指定了機架信息(brooker.rack), Kafka在爲分區作副 
    本分配時就會考慮這部分信息,儘量地爲副本挑選不一樣機架的broker。
  • 集羣中每個TopicPartition,對應的分區信息PartitionInfo。org.apache.kafka.common#PartitionInfo中主要記錄了以下信息:

    • 分區所屬的topic。
    • 分區partition編號。
    • 分區的leader所在結點。
    • 分區副本結點列表。
    • 分區副本同步結點隊列(ISR)。
    • 離線副本結點隊列。
  • 集羣中的控制結點信息。

    控制結點broker:負責管理整個集羣中分區和副本的狀態,好比partition的leader 副本故障,由controller 負責爲該partition從新選舉新的leader 副本;當檢測到ISR列表發生變化,有controller通知集羣中全部broker更新其MetadataCache信息;或者增長某個topic分區的時候也會由controller管理分區的從新分配工做
  • 集羣中每一個topic對應的全部分區列表,至關於以topic做爲索引。
  • 集羣中每一個topic對應的可用分區列表。
  • 集羣中每一個結點broker node對應的全部分區列表,至關於以broker.id做爲索引。
  • 集羣中每一個結點ID(broker.id)對應的結點信息。

batch.size

重要性:高
類型:Long
默認值:16384字節,即16K

消息記錄batch(批)大小限制。kafka producer在將消息記錄record發送到集羣時,會嘗試將一批要發送到相同partition的消息記錄壓縮在一塊兒,稱之爲batch(批)。每次request,其實不是發送一個record,而是發送若干個batch,而每一個batch裏面可能包含多個record。這樣成批成批的發送,減小了網絡請求,有助於提高producer客戶端和kafka集羣服務的性能。

batch.size就是用來設置一個batch的最大字節數byte。當設置爲0時,表示徹底禁用batch的功能。若是batch.size設置過大,那麼可能形成內存浪費,由於每一個發送到不一樣partition的record都須要預先分配一塊batch.size大小的內存。

acks

重要性:高
類型:String
默認值:"1"

應答數設置。producer只有接收到來自server的acks指定數量的應答,纔會認爲發送給server的消息記錄已送達。該配置項用於控制已發送消息記錄的持久性,有如下幾種設置值:

  • acks = 0:表示producer無需等待server端的應答消息,producer將record扔到發送緩衝區,就認爲該record已經發送,而後轉身走人。這種狀況沒法保證server端真的成功接收到該消息記錄,且此時即便retries配置項也沒法生效,由於producer沒法知道是否失敗。另外,每一個record返回的offset都被設爲-1。
  • acks = 1:表示接收該消息記錄的分區leader將消息記錄寫入到本地log文件,就返回Acknowledgement,告知producer本次發送已完成,而無需等待其餘follower分區的確認。這種狀況下,可能出現消息記錄沒有備份的狀況(follower宕機等)。
  • acks = all:表示消息記錄只有獲得分區leader以及其餘分區副本同步結點隊列(ISR)中的分區follower的確認以後,才能回覆acknowlegement,告知producer本次發送已完成。這種狀況下,只要分區副本同步結點隊列(ISR)中某一個follower存活,那麼消息記錄就不會被丟失。這種方式最安全,但效率也最低。
  • acks = -1:等同於acks = all

linger.ms

重要性:中
類型:Long
默認值:0毫秒,表示無延時,當即發送。

延遲發送消息記錄的時間,上面及前面文章中也已經提到過,producer在發送消息記錄record的時候,會將發送到同一個partition的records壓縮在batch中。但一般這隻發生在records到達速度快於records發送速度的狀況下,很容易理解:若是發送速度大於record到達速度,則每來一個record都會被當即發送出去,根本不存在將多個records壓縮爲一個的可能。

但不少時候,即使是發送速度大於到達速度,咱們也不但願每一個record就發送一次,仍是但願分批次發送,以減小發送次數,提高producer客戶端和服務器端的性能。爲此,咱們須要人爲地加一個發送延遲限制,即每次發送之間,存在必定的時間間隔linger.ms,在這段時間內,可能有多個records到達,此時就能夠對他們分組壓縮,成批次發送。這相似於TCP的擁塞控制方法。

注意:

  • linger.ms設置了發送延遲的最高時間上限,另外一個配置項batch.size也同時控制着發送的時機。若是爲某個partition壓縮的batch字節數已經達到了batch.size設置的字節數,那麼該batch將被當即發送到指定的partition,即便此時延遲時間還沒達到linger.ms的設置。
  • 一樣的,若是延遲的時間已經達到了linger.ms的設置,那麼即便壓縮累積的batch沒有達到batch.size設置的字節數,也會被髮送到指定的partition。
  • linger.ms是針對每個發送到partition的request。即不一樣partition的request並非同時發送的。
  • 延遲覺得這性能下降,須要在延遲和性能之間進行平衡,找到一個合適的linger.ms值。

client.id

重要程度:中
類型:String
默認值:""

producer 客戶端ID,在建立request時,會傳送到kafka服務。其目的是爲了跟蹤記錄請求的來源,雖然服務端能夠經過ip/port來追蹤請求的來源,但ip/port沒法表達業務語義,因此,能夠經過client.id來設置一個富有業務邏輯語義的名字(如PDK遊戲),有助於後續的分析和記錄。

send.buffer.bytes

重要程度:中
類型:int
默認值:131072字節,即128K。

TCP發送緩衝區(SO_SNDBUF)的大小,若send.buffer.bytes設爲-1,則使用操做系統的默認值。

receive.buffer.bytes

重要程度:中
類型:int
默認值:32768字節,即32K。

TCP接收緩衝區(SO_RCVBUF)大小,當receive.buffer.bytes設置爲-1,則使用操做系統默認的大小。

max.request.size

重要程度:中
類型:String
默認值:1048576字節,即1M。

一個請求request中最大字節數,用於限制producer發送的單個請求request中,record batches的最大數量,以免單個請求數據過於巨大。

max.request.size & batch.size

  • 一個請求request中,可能包含多個record batch。
  • max.request.size可能影響record batch的大小上限,即當batch.size 大於 max.request.size時,batch的上限就變成了max.request.size設置的大小。

reconnect.backoff.ms

重要性:低
類型:Long
默認值:50毫秒。

重連間隔時間,避免producer客戶端過於緊密循環地重連kafka服務broker。該值針對的是全部client到broker的鏈接。

reconnect.backoff.max.ms

重要性:低
類型:Long
默認值:1000毫秒

producer客戶端鏈接一個kafka服務(broker)失敗重連的總時間,每次鏈接失敗,重連時間都會指數級增長,每次增長的時間會存在20%的隨機抖動,以免鏈接風暴

鏈接風暴

應用啓動的時候,常常可能發生各應用服務器的鏈接數異常飆升的狀況。假設鏈接數的設置爲:min值3,max值10,正常的業務使用鏈接數在5個左右,當重啓應用時,各應用鏈接數可能會飆升到10個,瞬間甚至還有可能部分應用會報取不到鏈接。啓動完成後接下來的時間內,鏈接開始慢慢返回到業務的正常值。這就是所謂的鏈接風暴。

max.block.ms

重要性:低
類型:Long
默認值:1000毫秒

該配置值控制着KafkaProducer.send()函數以及KafkaProducer.partitionsFor()函數將阻塞的最大時間。另外當發送緩衝區滿或者metadata不可用時,這兩個方法也會被阻塞。若是阻塞發生在用戶提供的自定義序列化類serializers或者是自定義的分區類partitioner,那麼這些阻塞的時間不會被計算在該配置值之類。

小結

上面總結了ProducerConfig類中部分配置項,限於篇幅已經較長,剩餘部分的配置項將在後面另起一篇再作介紹。另外,在這篇文章中,本身有一個疑惑:

producer發送消息記錄到broker的時機,究竟是個什麼機制?從上述配置項的介紹中, batch.sizemax.request.sizelinger.ms這幾個配置項都會影響其發送時機。

先在此記錄,後續搞明白了再更新吧。若是有大牛可以幫我回答這個問題,能夠在評論中幫我解答。

相關文章
相關標籤/搜索