kafka producer參數詳解

注:此文並不是官方文檔的翻譯java

kafka的producer默認是異步的方式,在大數據量的狀況下可能會出現丟失數據的狀況.可是同步的方式又比較低效,所以合理設置異步producer下的kafka參數既能夠提升效率又能夠不丟失數據.只是要對各參數有一個比較深刻的瞭解.下面是我總結的對於處理安全外幾乎全部producer參數的理解:python

以python客戶端爲例子,java的參數名可能稍有不一樣可是含義是同樣的算法

例子:
producer = KafkaProducer(bootstrap_servers='xxx1:9092,xxx2:9092',
        acks=1,retries =3,
        batch_size=524288,
        reconnect_backoff_max_ms=3000,
        buffer_memory=536870912
        )
  
// producer默認是異步的
f = producer.send("topic_name","hello")

//f.get(timeout=3) 若是加了get就變成了同步,也就是說要等待get到服務端返回的結果後再往下執行

bootstrap_servers

格式爲host[:port]例如localhost:9092,是kafka鏈接的broker地址列表,能夠是多臺,用逗號分隔bootstrap

client_id (str)

客戶端名稱,用來追查日誌的,默認是kafka-python-producer-# (#是個惟一編號)安全

key_serializer (callable)

key序列化函數. 默認值: None.服務器

value_serializer (callable)

值序列化函數默認值: None.網絡

acks (0, 1, 'all')

表明kafka收到消息的答覆數,0就是不要答覆,愛收到沒收到.1就是有一個leader broker答覆就行,all是全部broker都要收到才行app

  • 0: Producer不等待kafka服務器的答覆,消息馬上發往socket buffer,這種方式不能保證kafka收到消息,設置成這個值的時候retries參數就失效了,由於producer不知道kafka收沒收到消息,因此所謂的重試就沒有意義了,發送返回值的offset全默認是-1.異步

  • 1: 等待leader記錄數據到broker本地log便可.不等待leader同步到其餘followers,那麼假如此時恰好leader收到消息並答覆後,leader忽然掛了,其餘fowller還沒來得及複製消息呢,那麼這條消息就會丟失了.socket

  • all:等待全部broker記錄消息.保證消息不會丟失(只要從節點沒全掛),這種方式是最高可用的 acks默認值是1.

compression_type (str)

發消息時候的壓縮類型能夠是gzip,snappy,lz4,None,壓縮是針對batches的,因此batches的大小會影響壓縮效率,大一點的壓縮比例可能好些,要是過小的話壓縮就沒有意義了,好比你就發個幾個字節的數據那壓完沒準更大了.至於何時啓用壓縮,要看應用場景,啓用後producer會變慢,但網絡傳輸帶寬佔用會減小,帶寬緊缺建議開啓壓縮,帶寬充足就不用開了 默認值:None

retries (int)

重試發送次數,有時候網絡出現短暫的問題的時候,會自動重發消息,前面提到了這個值是須要在acks=1或all時候纔有效.若是設置了該參數,可是setting max_in_flight_requests_per_connection沒有設置爲1的話,可能形成消息順序的改變,由於若是2個batches發到同一個partition,可是第一個失敗重發了,那麼就會形成第二個batches跑到前面去了. Default: 0.

batch_size (int)

批處理消息字節數,發往broker的消息會包含多個batches,每一個分區對應一個batch,batch小了會減少響吞吐量,batch爲0的話就禁用了batch發送,默認值:16384(16kb)

linger_ms (int)

逗留時間,這個逗留指的是消息不當即發送,而是逗留這個時間後一塊發送,這個設置是比較有用的,有時候消息產生的要比可以發送的要快,這個參數完美的實現了一我的工的延遲,使得大批量能夠聚合到一個batch裏一塊發送.當batch慢了的話,會忽略這個參數當即發送,這個參數有點相似TCP協議中的Nagle算法. Default: 0.

partitioner (callable)

分區函數,用來人工干預消息發到到哪一個分區,這個函數會在key serialization後調用,函數原型:partitioner(key_bytes, all_partitions, available_partitions), 默認是對key作murmur2算法的hash(跟java客戶端算法相同),hash值相同的到一個分區,沒有key的話就是隨機分區.

buffer_memory (int)

當消息發送速度大於kafka服務器接收的速度,producer會阻塞max_block_ms,超時會報異常,buffer_memory用來保存等待發送的消息,默認33554432(32MB)

max_block_ms (int)

當buffer滿了或者metadata獲取不到(好比leader掛了),或者序列化沒完成分區函數沒計算完等等狀況下的最大阻塞時間,默認60000ms (60秒)

max_request_size (int)

消息的最大大小限制,也就是說send的消息大小不能超過這個限制,這個限制只改一個地方是不行的producer, broker, consumer都要改才行. Default: 1048576.(1MB)

metadata_max_age_ms (int)

metadata的刷新時間,每通過這個時間就刷新下metadata來發現新的分區或broker,無論有沒有都會刷新下看看 Default: 300000(5分鐘)

retry_backoff_ms (int)

– 重試發送若是仍是錯誤要等待下次重試的時間,單位毫秒 Default: 100.

request_timeout_ms (int)

發送請求的超時時間 Default: 30000.(30秒)

receive_buffer_bytes (int)

TCP receiver buffer(SO_RCVBUF)大小,就是接收數據的緩衝區大小 默認:None(根據操做系統設置).Java Client默認是32768

send_buffer_bytes (int)

TCP send buffer (SO_SNDBUF) 發送數據的緩衝區大小 默認:None(根據操做系統設置).Java Client默認是131072.

socket_options (list)

socket選項 默認: [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]

reconnect_backoff_ms (int)

嘗試從新鏈接broker的時間間隔 Default: 50.

reconnect_backoff_max_ms (int)

reconnect_backoff_ms每次再次鏈接失敗會以指數增加,增加到的最大限度就是這個參數,爲了不鏈接風暴,鏈接重試的時間間隔會在一個範圍內隨機調整,上浮或下調20%,也就是說每次重連的時間間隔不必定就是這個值自己,而是上下浮動20%. 另外這裏解釋下鏈接風暴,當咱們的kafka集羣出現問題後,全部的producer和consumer都會嘗試重連,重連的間隔就會達到這個參數所設置的最大值,好比你們都是每秒嘗試重連,這時候若是集羣回覆了,那麼在同一秒可能就會有大量的鏈接打到kafka集羣上,這就形成了鏈接風暴,可是若是隨機上下浮動就可能把重連時間給錯開,不會形成同事的大量鏈接 Default: 1000.

max_in_flight_requests_per_connection (int)

發送多少條消息後,接收服務端確認,好比設置爲1,就是每發一條就要確認一條,設置爲5就是,發送5條消息等待一次確認 ,若是大於1,好比5,這種狀況下是會有一個順序問題的,就是這5條消息其中的一條發送失敗了,若是進行重試,那麼重發的消息實際上是在下個批次的,這就會形成消息順序的錯亂,因此若是須要保證消息的順序,建議設置此參數爲1 Default: 5.

相關文章
相關標籤/搜索