1、回顧kafka的知識點和問題陳述java
前面幾篇kafka文章,介紹從搭建到優化。 redis
Kafka消息隊列學習進階(四)--優化(配置/代碼/集羣);服務器
以上都是在test和uat環境進行測試和遷移數據的,最近遷移項目正式上線,可是上線當晚就出現嚴重的bug,現象是:session
1.一執行遷移程序,數據庫鏈接就超時。app
2.kafka鏈接不上,同時查看error.log日誌,kafka一直報數據大小超過kafka最大的發送size(kafka max.request.size)。也就是那天根本沒有上線成功是失敗的。那麼咱們是怎麼處理的呢?異步
2、解決問題的思路--優化配置性能
首先解決的是數據庫鏈接不上的問題,一是檢查數據庫是否啓動成功,檢查完畢,啓動成功。是啓動程序致使的,並非一開始就鏈接不上。繼續排查,看info.log日誌,看看程序執行到哪裏,判斷哪裏在調用數據庫,最後根據sql反查生產庫,發現是這條語句查詢的數據有35W,可是數據庫根本承受不住那麼大返回數據。最後更改查詢區間大小,勝利解決這個問題。
如今纔開始來解決kafka的問題,因爲本人,對kafka配置信息的不熟悉,或者說是生產庫和uat庫/test庫的數據根本不是一個檔次的狀況不清楚,沒有預估到致使的。建議對數據遷移的項目,最好想預估其有多大的數據,而後採用不一樣的方式。
因爲第一步已解決數據庫的鏈接問題,已經減少了數據庫大小,同時返回的數據也減小了。kafka已經在發送數據,日誌一直在打印"數據發送成功!",但是沒想到的是,kafka根本沒有消費,由於消費日誌沒有打印啊。info.log日誌打印的是發送成功,可是數據又沒有寫入到數據庫,很奇怪啊,當時心情真的是.....而後又去查看error.log日誌,發現一直報:kafka max.request.size的問題,就是發送的數據大小已經超過kafka的配置發送數據大小,致使一直髮送不過去。因爲沒有配置過,只能百度。下面以yml配置爲例:
消費數據:
properties:
max.partition.fetch.bytes: 15000000
生產數據:
properties:
max.request.size: 15000000
以上,算是從新配置了,OK,從新啓動服務器,再來一次遷移程序的執行。但是新的問題又出現了,生產數據(發送)和消費數據沒有問題,可是很慢,沒有測試那麼快。同時,越日後執行,越慢,最終線程池報錯,超過線程大小(解釋一下:發送數據是異步的)。因此說優化配置參數是失敗的。那隻能另外想辦法了。
代碼優化
根據問題,我想到的是,以前在插入數據庫的時候,若是數據過於大,也會致使數據庫插入失敗或者說很慢,是分批次提交插入數據到數據庫的。因此最終咱們的想法也是:查詢出來的數據,分批次發送到kafka,順利解決消費慢和發送數據過於大的問題。具體代碼以下:
executorService.submit(() -> { //查詢會員的數據 List<Class> dataSize = mapper.get(); if (StringUtils.isObjNotEmpty(dataSize) && dataSize.size() > 0) { //限制條數 int pointsDataLimit = sendSize; Integer size = dataSize.size(); //判斷是否有必要分批 if (pointsDataLimit < size) { //分批數 int part = size / pointsDataLimit; for (int i = 0; i < part; i++) { //10000條 List<Class> listPage = dataSize.subList(0, pointsDataLimit); //發送數據 sendMessage(integer,sizePage,time,endTime,accountType,listPage); //剔除 dataSize.subList(0, pointsDataLimit).clear(); } if (!dataSize.isEmpty()) { sendMessage(integer,sizePage,time,endTime,accountType,dataSize); } } else { sendMessage(integer,sizePage,time,endTime,accountType,dataSize); } }else{ log.info("數據爲空的數據段:{}",integer+":"+sizePage); } }); private void sendMessage(Integer integer, Integer sizePage, String time, String endTime, String accountType, List<Class> listPage) { //查詢 Message message = new Message(); message.setId(String.valueOf(integer+":"+sizePage+":"+time+":"+endTime+":"+accountType)); message.setMsg(JSON.toJSONString(listPage)); message.setSendTime(new Date()); log.info("數據發送成功的數據段:{}",integer+":"+sizePage); try { kafkaTemplate.send("userAndAccount", JSONUtil.toJsonStr(message)); }catch (Exception e){ redisTemplate.opsForList().leftPush(RedisKeyConstsnts.RECORD_ERROR_KEY,"數據遷移遷移數據失敗,發送消息失敗,該批次信息:"+integer+":"+endTime); log.info("數據遷移遷移數據失敗,發送消息失敗,該批次信息,data:{}",integer+":"+endTime); e.printStackTrace(); } }
3、總結
整理問題:
1.熟悉線上環境數據有多大。
2.認真熟悉kafka的配置文件。同時配置越大並不必定是萬能的,須要配置加上代碼相互。
補充知識點:
Kafka設計的初衷是迅速處理短小的消息,通常10K大小的消息吞吐性能最好(可參見LinkedIn的kafka性能測試)。但有時候,咱們須要處理更大的消息,好比XML文檔或JSON內容,一個消息差很少有10-100M,這種狀況下,Kakfa應該如何處理?
針對這個問題,有如下幾個建議:
最好的方法是不直接傳送這些大的數據。若是有共享存儲,如NAS, HDFS, S3等,能夠把這些大的文件存放到共享存儲,而後使用Kafka來傳送文件的位置信息。
第二個方法是,將大的消息數據切片或切塊,在生產端將數據切片爲10K大小,使用分區主鍵確保一個大消息的全部部分會被髮送到同一個kafka分區(這樣每一部分的拆分順序得以保留),如此以來,當消費端使用時會將這些部分從新還原爲原始的消息。
第三,Kafka的生產端能夠壓縮消息,若是原始消息是XML,當經過壓縮以後,消息可能會變得不那麼大。在生產端的配置參數中使用compression.codec和commpressed.topics能夠開啓壓縮功能,壓縮算法可使用GZip或Snappy。
不過若是上述方法都不是你須要的,而你最終仍是但願傳送大的消息,那麼,則能夠在kafka中設置下面一些參數:
broker 配置:
message.max.bytes (默認:1000000) – broker能接收消息的最大字節數,這個值應該比消費端的fetch.message.max.bytes更小纔對,不然broker就會由於消費端沒法使用這個消息而掛起。
log.segment.bytes (默認: 1GB) – kafka數據文件的大小,確保這個數值大於一個消息的長度。通常說來使用默認值便可(通常一個消息很難大於1G,由於這是一個消息系統,而不是文件系統)。
replica.fetch.max.bytes (默認: 1MB) – broker可複製的消息的最大字節數。這個值應該比message.max.bytes大,不然broker會接收此消息,但沒法將此消息複製出去,從而形成數據丟失。
consumer 配置:
fetch.message.max.bytes (默認 1MB) – 消費者能讀取的最大消息。這個值應該大於或等於message.max.bytes。
因此,若是你必定要選擇kafka來傳送大的消息,還有些事項須要考慮。要傳送大的消息,不是當出現問題以後再來考慮如何解決,而是在一開始設計的時候,就要考慮到大消息對集羣和主題的影響。
性能: 根據前面提到的性能測試,kafka在消息爲10K時吞吐量達到最大,更大的消息會下降吞吐量,在設計集羣的容量時,尤爲要考慮這點。
可用的內存和分區數:Brokers會爲每一個分區分配replica.fetch.max.bytes參數指定的內存空間,假設replica.fetch.max.bytes=1M,且有1000個分區,則須要差很少1G的內存,確保 分區數*最大的消息不會超過服務器的內存,不然會報OOM錯誤。一樣地,消費端的fetch.message.max.bytes指定了最大消息須要的內存空間,一樣,分區數*最大須要內存空間 不能超過服務器的內存。因此,若是你有大的消息要傳送,則在內存必定的狀況下,只能使用較少的分區數或者使用更大內存的服務器。
垃圾回收:到如今爲止,我在kafka的使用中還沒發現過此問題,但這應該是一個須要考慮的潛在問題。更大的消息會讓GC的時間更長(由於broker須要分配更大的塊),隨時關注GC的日誌和服務器的日誌信息。若是長時間的GC致使kafka丟失了zookeeper的會話,則須要配置zookeeper.session.timeout.ms參數爲更大的超時時間。
一切的一切,都須要在權衡利弊以後,再決定選用哪一個最合適的方案。
繁榮Aaron
沒時間解釋了,快長按左邊二維碼關注咱們~~