最近開發一cdc應用,爲了測試極端狀況,須要kafka傳遞100萬條數據過去,1個G左右,因爲其餘環節限制,不便進行拆包(注:測下來,大包走kafka不必定性能更好,甚至可能更低)。html
測試百萬以上的變動數據時,報消息超過kafka broker容許的最大值,所以須要修改以下參數,保證包可以正常發送:算法
socket.request.max.bytes=2147483647 # 設置了socket server接收的最大請求大小
log.segment.bytes=2147483647 # kafka數據文件的大小,確保這個數值大於一個消息的長度。通常說來使用默認值便可(通常一個消息很難大於1G,由於這是一個消息系統,而不是文件系統)。
message.max.bytes=2147483647 # 設置了kafka server接收的最大消息大小,應小於等於socket.request.max.bytes
replica.fetch.max.bytes=2147483647 #每一個分區試圖獲取的消息字節數。要大於等於message.max.bytes,不然broker會接收此消息,但沒法將此消息複製出去,從而形成數據丟失。
fetch.message.max.bytes=2147483647 #每一個提取請求中爲每一個主題分區提取的消息字節數。要大於等於message.max.bytes,不然broker就會由於消費端沒法使用這個消息而掛起。apache
生產者能夠以下設定:服務器
kafkaProps.put("max.request.size", 2147483647); # 要小於 message.max.bytes,也能夠設置在producer.properties配置文件中
kafkaProps.put("buffer.memory", 2147483647);
kafkaProps.put("timeout.ms", 3000000);
kafkaProps.put("request.timeout.ms", 30000000);
消費者設定以下:session
props.put("request.timeout.ms", 30000000);
props.put("session.timeout.ms", "3000000");
props.put("fetch.max.wait.ms", "3000000");
各參數的含義能夠參考kafka官方文檔https://kafka.apache.org/documentation/#configuration。app
kafka基礎知識體系,請參考LZ學習筆記kafka學習指南(總結版)。socket
注,各參數對內存的影響以下:Brokers會爲每一個分區分配replica.fetch.max.bytes參數指定的內存空間,假設replica.fetch.max.bytes=1M,且有1000個分區,則須要差很少1G的內存,確保 分區數*最大的消息不會超過服務器的內存,不然會報OOM錯誤。一樣地,消費端的fetch.message.max.bytes指定了最大消息須要的內存空間,一樣,分區數*最大須要內存空間 不能超過服務器的內存。因此,若是你有大的消息要傳送,則在內存必定的狀況下,只能使用較少的分區數或者使用更大內存的服務器。性能
雖然上面的方法能夠奏效,可是並不推薦。Kafka設計的初衷是迅速處理短小的消息,通常10K大小的消息吞吐性能最好(可參見LinkedIn的kafka性能測試)。但有時候,咱們須要處理更大的消息,好比XML文檔或JSON內容,一個消息差很少有10-100M,這種狀況下,Kakfa應該如何處理?學習
針對這個問題,有如下幾個建議:測試
上面這些值太大還會形成一個問題,就是消息沒有在指定時間內(max.poll.interval.ms(默認300秒))消費完,致使被rebalance,而kafka自己有個bug(服務器端的rebalance.timeout.ms(默認60秒)不生效),這會致使消費者組的rebalance時間比較長,因此這是須要注意的,參見https://blog.csdn.net/u013200380/article/details/87868696。