flume的memeryChannel中transactionCapacity和sink的batchsize須要注意事項

一.html

fluem中出現,transactionCapacity查詢一下,得出一下這些:java

最近在作flume的實時日誌收集,用flume默認的配置後,發現不是徹底實時的,因而看了一下,原來是memeryChannel的transactionCapacity在做怪,由於他默認是100,也就是說收集端的sink會在收集到了100條之後再去提交事務(即發送到下一個目的地),因而我修改了transactionCapacity到10,想看看是否是會更加實時一點,結果發現收集日誌的agent啓動的時候報錯了。git

16/04/29 09:36:15 ERROR sink.AbstractRpcSink: Rpc Sink avro-sink: Unable to get event from channel memoryChannel. Exception follows.
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 10 full, consider committing more frequently, increasing capacity, or increasing thread count
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.Java:96)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:354)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)github

因而很納悶,爲何默認值100能夠,而設置10就會說小了呢,因而查閱資料,發現原來是sink的batchsize參數在做怪,下面,我就來理一理這個前因後果,這個sink的batchsize是什麼意思呢,就是sink會一次從channel中取多少個event去發送,而這個發送是要最終以事務的形式去發送的,所以這個batchsize的event會傳送到一個事務的緩存隊列中(takeList),這是一個雙向隊列,這個隊列能夠在事務失敗時進行回滾(也就是把取出來的數據吐memeryChannel的queue中),它的初始大小就是transactionCapacity定義的大小,源碼中有: takeList = new LinkedBlockingDeque<Event>(transCapacity); 源碼來自https://segmentfault.com/a/1190000003586635的分享。redis

再看這個錯誤拋出的地方:apache

 if(takeList.remainingCapacity() == 0) {
        throw new ChannelException("Take list for MemoryTransaction, capacity " +
            takeList.size() + " full, consider committing more frequently, " +
            "increasing capacity, or increasing thread count");
    }json

在上面的狀況中,sink一次取100個events,塞到takelist中,在塞了10個後,就會引起上述異常,所以,這個錯誤的解決辦法就是:在sink中,channel的transactionCapacity參數不能小於sink的batchsize。ubuntu

 

二.segmentfault

Flume-ng出現HDFS IO error,Callable timed out異常

目前解決方案:

瀏覽器

記Flume-NG一些注意事項

1、關於Source:

一、spool-source:適合靜態文件,即文件自己不是動態變化的;

二、avro source能夠適當提升線程數量來提升此source性能;

三、ThriftSource在使用時有個問題須要注意,使用批量操做時出現異常並不會打印異常內容而是"Thrift source %s could not append events to the channel.",這是由於源碼中在出現異常時,它並未捕獲異常而是獲取組件名稱,這是源碼中的一個bug,也能夠說明thrift不多有人用,不然這個問題也不會存在在不少版本中;

四、若是一個source對應多個channel,默認就是每一個channel是一樣的一份數據,會把這批數據複製N份發送到N個channel中,因此若是某個channel滿了會影響總體的速度的哦;

五、ExecSource官方文檔已經說明是異步的,可能會丟數據哦,儘可能使用tail -F,注意是大寫的;

2、關於Channel:

一、採集節點建議使用新的複合類型的SpillableMemoryChannel,彙總節點建議採用memory channel,具體還要看實際的數據量,通常每分鐘數據量超過120MB大小的flume agent都建議用memory channel(本身測的file channel處理速率大概是2M/s,不一樣機器、不一樣環境可能不一樣,這裏只提供參考),由於一旦此agent的channel出現溢出狀況,將會致使大多數時間處於file channel(SpillableMemoryChannel自己是file channel的一個子類,並且複合channel會保證必定的event的順序的使得讀完內存中的數據後,再須要把溢出的拿走,可能這時內存已滿又會溢出。。。),性能大大下降,彙總一旦成爲這樣後果可想而知;

二、調整memory 佔用物理內存空間,須要兩個參數byteCapacityBufferPercentage(默認是20)和byteCapacity(默認是JVM最大可用內存的0.8)來控制,計算公式是:byteCapacity = (int)((context.getLong("byteCapacity", defaultByteCapacity).longValue() * (1 - byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize),很明顯能夠調節這兩個參數來控制,至於byteCapacitySlotSize默認是100,將物理內存轉換成槽(slot)數,這樣易於管理,可是可能會浪費空間,至少我是這樣想的。。。;

三、還有一個有用的參數"keep-alive"這個參數用來控制channel滿時影響source的發送,channel空時影響sink的消費,就是等待時間,默認是3s,超過這個時間就甩異常,通常不需配置,可是有些狀況頗有用,好比你得場景是每分鐘開頭集中發一次數據,這時每分鐘的開頭量可能比較大,後面會愈來愈小,這時你能夠調大這個參數,不至於出現channel滿了得狀況;

3、關於Sink:

一、avro sink的batch-size能夠設置大一點,默認是100,增大會減小RPC次數,提升性能;

二、內置hdfs sink的解析時間戳來設置目錄或者文件前綴很是損耗性能,由於是基於正則來匹配的,能夠經過修改源碼來替換解析時間功能來極大提高性能,稍後我會寫一篇文章來專門說明這個問題;

三、RollingFileSink文件名不能自定義,並且不能定時滾動文件,只能按時間間隔滾動,能夠本身定義sink,來作定時寫文件;

四、hdfs sink的文件名中的時間戳部分不能省去,可增長前綴、後綴以及正在寫的文件的先後綴等信息;"hdfs.idleTimeout"這個參數頗有意義,指的是正在寫的hdfs文件多長時間不更新就關閉文件,建議都配置上,好比你設置瞭解析時間戳存不一樣的目錄、文件名,並且rollInterval=0、rollCount=0、rollSize=1000000,若是這個時間內的數據量達不到rollSize的要求並且後續的寫入新的文件中了,就是一直打開,相似情景不注意的話可能不少;"hdfs.callTimeout"這個參數指的是每一個hdfs操做(讀、寫、打開、關閉等)規定的最長操做時間,每一個操做都會放入"hdfs.threadsPoolSize"指定的線程池中得一個線程來操做;

五、關於HBase sink(非異步hbase sink:AsyncHBaseSink),rowkey不能自定義,並且一個serializer只能寫一列,一個serializer按正則匹配多個列,性能可能存在問題,建議本身根據需求寫一個hbase sink;

六、avro sink能夠配置failover和loadbalance,所用的組件和sinkgroup中的是同樣的,並且也能夠在此配置壓縮選項,須要在avro source中配置解壓縮;

4、關於SinkGroup:

一、無論是loadbalance或者是failover的多個sink須要共用一個channel;

二、loadbalance的多個sink若是都是直接輸出到同一種設備,好比都是hdfs,性能並不會有明顯增長,由於sinkgroup是單線程的它的process方法會輪流調用每一個sink去channel中take數據,並確保處理正確,使得是順序操做的,可是若是是發送到下一級的flume agent就不同了,take操做是順序的,可是下一級agent的寫入操做是並行的,因此確定是快的;

三、其實用loadbalance在必定意義上能夠起到failover的做用,生產環境量大建議loadbalance;

5、關於監控monitor:

一、監控我這邊作得仍是比較少的,可是目前已知的有如下幾種吧:cloudera manager(前提是你得安裝CDH版本)、ganglia(這個天生就是支持的)、http(其實就是將統計信息jmx信息,封裝成json串,使用jetty展現在瀏覽器中而已)、再一個就是本身實現收集監控信息,本身作(能夠收集http的信息或者本身實現相應的接口實現本身的邏輯,具體能夠參考我之前的博客);

二、簡單說一下cloudera manager這種監控,最近在使用,確實很強大,能夠查看實時的channel進出數據速率、channel實時容量、sink的出速率、source的入速率等等,圖形化的東西確實很豐富很直觀,能夠提供不少flume agent總體運行狀況的信息和潛在的一些信息;

6、關於flume啓動:

一、flume組件啓動順序:channels——>sinks——>sources,關閉順序:sources——>sinks——>channels;

二、自動加載配置文件功能,會先關閉全部組件,再重啓全部組件;

三、關於AbstractConfigurationProvider中的Map<Class<? extends Channel>, Map<String, Channel>> channelCache這個對象,始終存儲着agent中得全部channel對象,由於在動態加載時,channel中可能還有未消費完的數據,可是須要對channel從新配置,因此用以來緩存channel對象的全部數據及配置信息;

四、經過在啓動命令中添加 "no-reload-conf"參數爲true來取消自動加載配置文件功能;

7、關於interceptor:

請看個人關於這個組件的博客,傳送門;

8、關於自定義組件:sink、source、channel:

一、channel不建議自定義哦,這個要求比較高,其餘倆都是框架式的開發,往指定的方法填充本身配置、啓動、關閉、業務邏輯便可,之後有機會單獨寫一篇文章來介紹;

二、關於自定義組件請相信github,上面好多好多好多,能夠直接用的自定義組件....;

9、關於Flume-NG集羣網絡拓撲方案:

一、在每臺採集節點上部署一個flume agent,而後作一到多個彙總flume agent(loadbalance),採集只負責收集數據發往彙總,彙總能夠寫HDFS、HBase、spark、本地文件、kafka等等,這樣通常修改會只在彙總,agent少,維護工做少;

二、採集節點沒有部署flume agent,可能發往mongo、redis等,這時你須要自定義source或者使用sdk來將其中的數據取出併發往flume agent,這樣agent就又能夠充當「採集節點」或者彙總節點了,可是這樣在前面至關於加了一層控制,就又多了一層風險;

三、因爲能力有限,其它未知,上面兩種,第一種好些,這裏看看美團的架構———— 傳送門

 

4、

java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null

sink是hdfs,而後使用目錄自動生成功能。出現如題的錯誤,看官網文檔說的是須要在每一個文件記錄行的開頭須要有時間戳,可是時間戳的格式可能比較難調節,因此亦可設置hdfs.useLocalTimeStamp這個參數,好比以每一個小時做爲一個文件夾,那麼配置應該是這樣:

 

[plain]  view plain  copy
 
  1. a1.sinks.k1.hdfs.path = hdfs://ubuntu:9000/flume/events/%y-%m-%d/%H  
  2. a1.sinks.k1.hdfs.filePrefix = events-  
  3. a1.sinks.k1.hdfs.round = true  
  4. a1.sinks.k1.hdfs.roundValue = 1  
  5. a1.sinks.k1.hdfs.roundUnit = hour  
  6. a1.sinks.k1.hdfs.useLocalTimeStamp = true  


或者修改hdfs.timeZone這個參數使之能夠和咱們上傳的log文件的日期格式同樣應該就能夠了,沒有測試過。

 

5、flume學習(三):flume將log4j日誌數據寫入到hdfs

、本次咱們把log4j的日誌直接採集輸出到hdfs中去。須要修改flume.conf中sink的配置:

[plain]  view plain  copy
 
  1. tier1.sources=source1  
  2. tier1.channels=channel1  
  3. tier1.sinks=sink1  
  4.   
  5. tier1.sources.source1.type=avro  
  6. tier1.sources.source1.bind=0.0.0.0  
  7. tier1.sources.source1.port=44444  
  8. tier1.sources.source1.channels=channel1  
  9.   
  10. tier1.channels.channel1.type=memory  
  11. tier1.channels.channel1.capacity=10000  
  12. tier1.channels.channel1.transactionCapacity=1000  
  13. tier1.channels.channel1.keep-alive=30  
  14.   
  15. tier1.sinks.sink1.type=hdfs  
  16. tier1.sinks.sink1.channel=channel1  
  17. tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events  
  18. tier1.sinks.sink1.hdfs.fileType=DataStream  
  19. tier1.sinks.sink1.hdfs.writeFormat=Text  
  20. tier1.sinks.sink1.hdfs.rollInterval=0  
  21. tier1.sinks.sink1.hdfs.rollSize=10240  
  22. tier1.sinks.sink1.hdfs.rollCount=0  
  23. tier1.sinks.sink1.hdfs.idleTimeout=60  

 

6、【Flume】【源碼分析】flume中sink到hdfs,文件系統頻繁產生文件,文件滾動配置不起做用?

解決方案 
緣由:flume配置問題或者說代碼問題  ,文件滾動的判斷條件存在漏洞
增長配置參數 ,便可按照參數滾動文件
flume1.sinks.sink1.hdfs.minBlockReplicas=1

參考:http://blog.csdn.net/simonchi/article/details/43231891 

 

7、查看最終配置

(來源http://www.it610.com/article/2107322.htm)

 
最終配置文件示例

```
# flume1 which ones we want to activate.
flume1.channels = ch1
flume1.sources = src1
flume1.sinks = sink1


# Define a memory channel called ch1 on flume1
flume1.channels.ch1.type = memory
flume1.channels.ch1.capacity = 100000
flume1.channels.ch1.transactionCapacity = 1000
flume1.channels.ch1.keep-alive = 30
 
# Define an Avro source called src1 on flume1 and tell it
# to bind to 0.0.0.0:8888. Connect it to channel ch1.
flume1.sources.src1.channels = ch1
flume1.sources.src1.type = avro
flume1.sources.src1.bind = 0.0.0.0
flume1.sources.src1.port = 8888
flume1.sources.src1.threads = 5
 
 
flume1.sinks.sink1.type = hdfs
flume1.sinks.sink1.channel = ch1
flume1.sinks.sink1.hdfs.path =hdfs://master:9000/ysg/flume/ysg/%Y%m 
flume1.sinks.sink1.hdfs.filePrefix = ysg
flume1.sinks.sink1.hdfs.fileSuffix = .log
flume1.sinks.sink1.hdfs.inUseSuffix = .tmp
flume1.sinks.sink1.hdfs.maxOpenFiles = 5000 
flume1.sinks.sink1.hdfs.batchSize= 1
flume1.sinks.sink1.hdfs.fileType = DataStream
flume1.sinks.sink1.hdfs.writeFormat =Text
#flume1.sinks.sink1.hdfs.rollSize =64*1024*1024
flume1.sinks.sink1.hdfs.rollSize = 67108864
flume1.sinks.sink1.hdfs.rollCount = 0
flume1.sinks.sink1.hdfs.rollInterval = 0
flume1.sinks.sink1.hdfs.minBlockReplicas=1

flume1.sinks.sink1.hdfs.useLocalTimeStamp = true
flume1.sinks.sink1.hdfs.connect-timeout=80000
flume1.sinks.sink1.hdfs.callTimeout=120000
flume1.sinks.sink1.hdfs.idleTimeout = 60

 

8、 3.1 基礎參數調優經驗  --去掉 每寫一行在行尾添加一個換行符 狀況

  • HdfsSink中默認的serializer會每寫一行在行尾添加一個換行符,咱們日誌自己帶有換行符,這樣會致使每條日誌後面多一個空行,修改配置不要自動添加換行符;      
lc.sinks.sink_hdfs.serializer.appendNewline = false
 
  • 調大MemoryChannel的capacity,儘可能利用MemoryChannel快速的處理能力;
    • 調大HdfsSink的batchSize,增長吞吐量,減小hdfs的flush次數;
    • 適當調大HdfsSink的callTimeout,避免沒必要要的超時錯誤;

  

 

感謝上面帶有鏈接的帖子。支持原創!(本帖來源於互聯網,如有侵犯,請聯繫我!)

相關文章
相關標籤/搜索