注:此文並不是官方文檔的翻譯java
kafka的producer默認是異步的方式,在大數據量的狀況下可能會出現丟失數據的狀況.可是同步的方式又比較低效,所以合理設置異步producer下的kafka參數既能夠提升效率又能夠不丟失數據.只是要對各參數有一個比較深刻的瞭解.下面是我總結的對於處理安全外幾乎全部producer參數的理解:python
以python客戶端爲例子,java的參數名可能稍有不一樣可是含義是同樣的算法
例子: producer = KafkaProducer(bootstrap_servers='xxx1:9092,xxx2:9092', acks=1,retries =3, batch_size=524288, reconnect_backoff_max_ms=3000, buffer_memory=536870912 ) // producer默認是異步的 f = producer.send("topic_name","hello") //f.get(timeout=3) 若是加了get就變成了同步,也就是說要等待get到服務端返回的結果後再往下執行
格式爲host[:port]例如localhost:9092,是kafka鏈接的broker地址列表,能夠是多臺,用逗號分隔bootstrap
客戶端名稱,用來追查日誌的,默認是kafka-python-producer-# (#是個惟一編號)安全
key序列化函數. 默認值: None.服務器
值序列化函數默認值: None.網絡
表明kafka收到消息的答覆數,0就是不要答覆,愛收到沒收到.1就是有一個leader broker答覆就行,all是全部broker都要收到才行app
0: Producer不等待kafka服務器的答覆,消息馬上發往socket buffer,這種方式不能保證kafka收到消息,設置成這個值的時候retries參數就失效了,由於producer不知道kafka收沒收到消息,因此所謂的重試就沒有意義了,發送返回值的offset全默認是-1.異步
1: 等待leader記錄數據到broker本地log便可.不等待leader同步到其餘followers,那麼假如此時恰好leader收到消息並答覆後,leader忽然掛了,其餘fowller還沒來得及複製消息呢,那麼這條消息就會丟失了.socket
all:等待全部broker記錄消息.保證消息不會丟失(只要從節點沒全掛),這種方式是最高可用的 acks默認值是1.
發消息時候的壓縮類型能夠是gzip,snappy,lz4,None,壓縮是針對batches的,因此batches的大小會影響壓縮效率,大一點的壓縮比例可能好些,要是過小的話壓縮就沒有意義了,好比你就發個幾個字節的數據那壓完沒準更大了.至於何時啓用壓縮,要看應用場景,啓用後producer會變慢,但網絡傳輸帶寬佔用會減小,帶寬緊缺建議開啓壓縮,帶寬充足就不用開了 默認值:None
重試發送次數,有時候網絡出現短暫的問題的時候,會自動重發消息,前面提到了這個值是須要在acks=1或all時候纔有效.若是設置了該參數,可是setting max_in_flight_requests_per_connection沒有設置爲1的話,可能形成消息順序的改變,由於若是2個batches發到同一個partition,可是第一個失敗重發了,那麼就會形成第二個batches跑到前面去了. Default: 0.
批處理消息字節數,發往broker的消息會包含多個batches,每一個分區對應一個batch,batch小了會減少響吞吐量,batch爲0的話就禁用了batch發送,默認值:16384(16kb)
逗留時間,這個逗留指的是消息不當即發送,而是逗留這個時間後一塊發送,這個設置是比較有用的,有時候消息產生的要比可以發送的要快,這個參數完美的實現了一我的工的延遲,使得大批量能夠聚合到一個batch裏一塊發送.當batch慢了的話,會忽略這個參數當即發送,這個參數有點相似TCP協議中的Nagle算法. Default: 0.
分區函數,用來人工干預消息發到到哪一個分區,這個函數會在key serialization後調用,函數原型:partitioner(key_bytes, all_partitions, available_partitions), 默認是對key作murmur2算法的hash(跟java客戶端算法相同),hash值相同的到一個分區,沒有key的話就是隨機分區.
當消息發送速度大於kafka服務器接收的速度,producer會阻塞max_block_ms,超時會報異常,buffer_memory用來保存等待發送的消息,默認33554432(32MB)
當buffer滿了或者metadata獲取不到(好比leader掛了),或者序列化沒完成分區函數沒計算完等等狀況下的最大阻塞時間,默認60000ms (60秒)
消息的最大大小限制,也就是說send的消息大小不能超過這個限制,這個限制只改一個地方是不行的producer, broker, consumer都要改才行. Default: 1048576.(1MB)
metadata的刷新時間,每通過這個時間就刷新下metadata來發現新的分區或broker,無論有沒有都會刷新下看看 Default: 300000(5分鐘)
– 重試發送若是仍是錯誤要等待下次重試的時間,單位毫秒 Default: 100.
發送請求的超時時間 Default: 30000.(30秒)
TCP receiver buffer(SO_RCVBUF)大小,就是接收數據的緩衝區大小 默認:None(根據操做系統設置).Java Client默認是32768
TCP send buffer (SO_SNDBUF) 發送數據的緩衝區大小 默認:None(根據操做系統設置).Java Client默認是131072.
socket選項 默認: [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
嘗試從新鏈接broker的時間間隔 Default: 50.
reconnect_backoff_ms每次再次鏈接失敗會以指數增加,增加到的最大限度就是這個參數,爲了不鏈接風暴,鏈接重試的時間間隔會在一個範圍內隨機調整,上浮或下調20%,也就是說每次重連的時間間隔不必定就是這個值自己,而是上下浮動20%. 另外這裏解釋下鏈接風暴,當咱們的kafka集羣出現問題後,全部的producer和consumer都會嘗試重連,重連的間隔就會達到這個參數所設置的最大值,好比你們都是每秒嘗試重連,這時候若是集羣回覆了,那麼在同一秒可能就會有大量的鏈接打到kafka集羣上,這就形成了鏈接風暴,可是若是隨機上下浮動就可能把重連時間給錯開,不會形成同事的大量鏈接 Default: 1000.
發送多少條消息後,接收服務端確認,好比設置爲1,就是每發一條就要確認一條,設置爲5就是,發送5條消息等待一次確認 ,若是大於1,好比5,這種狀況下是會有一個順序問題的,就是這5條消息其中的一條發送失敗了,若是進行重試,那麼重發的消息實際上是在下個批次的,這就會形成消息順序的錯亂,因此若是須要保證消息的順序,建議設置此參數爲1 Default: 5.