Kafka consumer處理大消息數據問題

案例分析

處理kafka consumer的程序的時候,發現以下錯誤:java

ERROR [2017-01-12 07:16:02,466] com.flow.kafka.consumer.main.KafkaConsumer: Unexpected Error Occurred
! kafka.common.MessageSizeTooLargeException: Found a message larger than the maximum fetch size of this consumer on topic codeTopic partition 3 at fetch offset 94. Increase the fetch size, or decrease the maximum message size the broker will allow.
! at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:91) ~[pip-kafka-consumer.jar:na]
! at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) ~[pip-kafka-consumer.jar:na]
! at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) ~[pip-kafka-consumer.jar:na]
! at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) ~[pip-kafka-consumer.jar:na]
! at com.flow.kafka.consumer.main.KafkaConsumer$KafkaRiverFetcher.run(KafkaConsumer.java:291) ~[original-pip-kafka-consumer.jar:na]
! at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_51]
! at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_51]
! at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]

如上log能夠看出,問題就是有一個較大的消息數據在codeTopic的partition 3上,而後consumer未能消費,提示我能夠減少broker容許進入的消息數據的大小,或者增大consumer程序消費數據的大小。算法

從log上來看一目瞭然,若是要解決當前問題的話,shell

  1. 減少broker消息體大小(設置message.max.bytes參數);
  2. 增大consumer獲取數據信息大小(設置fetch.message.max.bytes參數)。默認broker消息體大小爲1000000字節即爲1M大小。

消費者方面:fetch.message.max.bytes——>這將決定消費者能夠獲取的數據大小。
broker方面:replica.fetch.max.bytes——>這將容許broker的副本發送消息在集羣並確保消息被正確地複製。若是這是過小,則消息不會被複制,所以,消費者永遠不會看到的消息,由於消息永遠不會承諾(徹底複製)。
broker方面:message.max.bytes——>能夠接受數據生產者最大消息數據大小。
服務器

由個人場景來看較大的消息體已經進入到了kafka,我這裏要解決這個問題,只須要增長consumer的fetch.message.max.bytes數值就好。我單獨把那條數據消費出來,寫到一個文件中發現那條消息大小爲1.5M左右,爲了不再次發生這種問題我把consumer程序的fetch.message.max.bytes參數調節爲了3072000即爲3M,重啓consumer程序,查看log一切正常,解決這個消費錯誤到此結束,下面介紹一下kafka針對大數據處理的思考。session

kafka的設計初衷

Kafka設計的初衷是迅速處理小量的消息,通常10K大小的消息吞吐性能最好(可參見LinkedIn的kafka性能測試)。但有時候,咱們須要處理更大的消息,好比XML文檔或JSON內容,一個消息差很少有10-100M,這種狀況下,Kakfa應該如何處理?app

針對這個問題,能夠參考以下建議:

  • 最好的方法是不直接傳送這些大的數據。若是有共享存儲,如NAS, HDFS, S3等,能夠把這些大的文件存放到共享存儲,而後使用Kafka來傳送文件的位置信息。性能

  • 第二個方法是,將大的消息數據切片或切塊,在生產端將數據切片爲10K大小,使用分區主鍵確保一個大消息的全部部分會被髮送到同一個kafka分區(這樣每一部分的拆分順序得以保留),如此以來,當消費端使用時會將這些部分從新還原爲原始的消息。測試

  • 第三,Kafka的生產端能夠壓縮消息,若是原始消息是XML,當經過壓縮以後,消息可能會變得不那麼大。在生產端的配置參數中使用compression.codeccommpressed.topics能夠開啓壓縮功能,壓縮算法可使用GZipSnappyfetch

不過若是上述方法都不是你須要的,而你最終仍是但願傳送大的消息,那麼,則能夠在kafka中設置下面一些參數:大數據

broker 配置

message.max.bytes (默認:1000000) – broker能接收消息的最大字節數,這個值應該比消費端的fetch.message.max.bytes更小纔對,不然broker就會由於消費端沒法使用這個消息而掛起。
log.segment.bytes (默認: 1GB) – kafka數據文件的大小,確保這個數值大於一個消息的長度。通常說來使用默認值便可(通常一個消息很難大於1G,由於這是一個消息系統,而不是文件系統)。
replica.fetch.max.bytes (默認: 1MB) – broker可複製的消息的最大字節數。這個值應該比message.max.bytes大,不然broker會接收此消息,但沒法將此消息複製出去,從而形成數據丟失。

Consumer 配置

fetch.message.max.bytes (默認 1MB) – 消費者能讀取的最大消息。這個值應該大於或等於message.max.bytes。因此,若是你必定要選擇kafka來傳送大的消息,還有些事項須要考慮。要傳送大的消息,不是當出現問題以後再來考慮如何解決,而是在一開始設計的時候,就要考慮到大消息對集羣和主題的影響。

性能: 根據前面提到的性能測試,kafka在消息爲10K時吞吐量達到最大,更大的消息會下降吞吐量,在設計集羣的容量時,尤爲要考慮這點。
可用的內存和分區數:Brokers會爲每一個分區分配replica.fetch.max.bytes參數指定的內存空間,假設replica.fetch.max.bytes=1M,且有1000個分區,則須要差很少1G的內存,確保 分區數最大的消息不會超過服務器的內存,不然會報OOM錯誤。一樣地,消費端的fetch.message.max.bytes指定了最大消息須要的內存空間,一樣,分區數最大須要內存空間 不能超過服務器的內存。因此,若是你有大的消息要傳送,則在內存必定的狀況下,只能使用較少的分區數或者使用更大內存的服務器。
垃圾回收:到如今爲止,我在kafka的使用中還沒發現過此問題,但這應該是一個須要考慮的潛在問題。更大的消息會讓GC的時間更長(由於broker須要分配更大的塊),隨時關注GC的日誌和服務器的日誌信息。若是長時間的GC致使kafka丟失了zookeeper的會話,則須要配置zookeeper.session.timeout.ms參數爲更大的超時時間。

相關文章
相關標籤/搜索