一.html
fluem中出現,transactionCapacity查詢一下,得出一下這些:java
最近在作flume的實時日誌收集,用flume默認的配置後,發現不是徹底實時的,因而看了一下,原來是memeryChannel的transactionCapacity在做怪,由於他默認是100,也就是說收集端的sink會在收集到了100條之後再去提交事務(即發送到下一個目的地),因而我修改了transactionCapacity到10,想看看是否是會更加實時一點,結果發現收集日誌的agent啓動的時候報錯了。git
16/04/29 09:36:15 ERROR sink.AbstractRpcSink: Rpc Sink avro-sink: Unable to get event from channel memoryChannel. Exception follows.
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 10 full, consider committing more frequently, increasing capacity, or increasing thread count
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.Java:96)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:354)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)github
因而很納悶,爲何默認值100能夠,而設置10就會說小了呢,因而查閱資料,發現原來是sink的batchsize參數在做怪,下面,我就來理一理這個前因後果,這個sink的batchsize是什麼意思呢,就是sink會一次從channel中取多少個event去發送,而這個發送是要最終以事務的形式去發送的,所以這個batchsize的event會傳送到一個事務的緩存隊列中(takeList),這是一個雙向隊列,這個隊列能夠在事務失敗時進行回滾(也就是把取出來的數據吐memeryChannel的queue中),它的初始大小就是transactionCapacity定義的大小,源碼中有: takeList = new LinkedBlockingDeque<Event>(transCapacity); 源碼來自https://segmentfault.com/a/1190000003586635的分享。redis
再看這個錯誤拋出的地方:apache
if(takeList.remainingCapacity() == 0) {
throw new ChannelException("Take list for MemoryTransaction, capacity " +
takeList.size() + " full, consider committing more frequently, " +
"increasing capacity, or increasing thread count");
}json
在上面的狀況中,sink一次取100個events,塞到takelist中,在塞了10個後,就會引起上述異常,所以,這個錯誤的解決辦法就是:在sink中,channel的transactionCapacity參數不能小於sink的batchsize。ubuntu
二.segmentfault
三瀏覽器
1、關於Source:
一、spool-source:適合靜態文件,即文件自己不是動態變化的;
二、avro source能夠適當提升線程數量來提升此source性能;
三、ThriftSource在使用時有個問題須要注意,使用批量操做時出現異常並不會打印異常內容而是"Thrift source %s could not append events to the channel.",這是由於源碼中在出現異常時,它並未捕獲異常而是獲取組件名稱,這是源碼中的一個bug,也能夠說明thrift不多有人用,不然這個問題也不會存在在不少版本中;
四、若是一個source對應多個channel,默認就是每一個channel是一樣的一份數據,會把這批數據複製N份發送到N個channel中,因此若是某個channel滿了會影響總體的速度的哦;
五、ExecSource官方文檔已經說明是異步的,可能會丟數據哦,儘可能使用tail -F,注意是大寫的;
2、關於Channel:
一、採集節點建議使用新的複合類型的SpillableMemoryChannel,彙總節點建議採用memory channel,具體還要看實際的數據量,通常每分鐘數據量超過120MB大小的flume agent都建議用memory channel(本身測的file channel處理速率大概是2M/s,不一樣機器、不一樣環境可能不一樣,這裏只提供參考),由於一旦此agent的channel出現溢出狀況,將會致使大多數時間處於file channel(SpillableMemoryChannel自己是file channel的一個子類,並且複合channel會保證必定的event的順序的使得讀完內存中的數據後,再須要把溢出的拿走,可能這時內存已滿又會溢出。。。),性能大大下降,彙總一旦成爲這樣後果可想而知;
二、調整memory 佔用物理內存空間,須要兩個參數byteCapacityBufferPercentage(默認是20)和byteCapacity(默認是JVM最大可用內存的0.8)來控制,計算公式是:byteCapacity = (int)((context.getLong("byteCapacity", defaultByteCapacity).longValue() * (1 - byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize),很明顯能夠調節這兩個參數來控制,至於byteCapacitySlotSize默認是100,將物理內存轉換成槽(slot)數,這樣易於管理,可是可能會浪費空間,至少我是這樣想的。。。;
三、還有一個有用的參數"keep-alive"這個參數用來控制channel滿時影響source的發送,channel空時影響sink的消費,就是等待時間,默認是3s,超過這個時間就甩異常,通常不需配置,可是有些狀況頗有用,好比你得場景是每分鐘開頭集中發一次數據,這時每分鐘的開頭量可能比較大,後面會愈來愈小,這時你能夠調大這個參數,不至於出現channel滿了得狀況;
3、關於Sink:
一、avro sink的batch-size能夠設置大一點,默認是100,增大會減小RPC次數,提升性能;
二、內置hdfs sink的解析時間戳來設置目錄或者文件前綴很是損耗性能,由於是基於正則來匹配的,能夠經過修改源碼來替換解析時間功能來極大提高性能,稍後我會寫一篇文章來專門說明這個問題;
三、RollingFileSink文件名不能自定義,並且不能定時滾動文件,只能按時間間隔滾動,能夠本身定義sink,來作定時寫文件;
四、hdfs sink的文件名中的時間戳部分不能省去,可增長前綴、後綴以及正在寫的文件的先後綴等信息;"hdfs.idleTimeout"這個參數頗有意義,指的是正在寫的hdfs文件多長時間不更新就關閉文件,建議都配置上,好比你設置瞭解析時間戳存不一樣的目錄、文件名,並且rollInterval=0、rollCount=0、rollSize=1000000,若是這個時間內的數據量達不到rollSize的要求並且後續的寫入新的文件中了,就是一直打開,相似情景不注意的話可能不少;"hdfs.callTimeout"這個參數指的是每一個hdfs操做(讀、寫、打開、關閉等)規定的最長操做時間,每一個操做都會放入"hdfs.threadsPoolSize"指定的線程池中得一個線程來操做;
五、關於HBase sink(非異步hbase sink:AsyncHBaseSink),rowkey不能自定義,並且一個serializer只能寫一列,一個serializer按正則匹配多個列,性能可能存在問題,建議本身根據需求寫一個hbase sink;
六、avro sink能夠配置failover和loadbalance,所用的組件和sinkgroup中的是同樣的,並且也能夠在此配置壓縮選項,須要在avro source中配置解壓縮;
4、關於SinkGroup:
一、無論是loadbalance或者是failover的多個sink須要共用一個channel;
二、loadbalance的多個sink若是都是直接輸出到同一種設備,好比都是hdfs,性能並不會有明顯增長,由於sinkgroup是單線程的它的process方法會輪流調用每一個sink去channel中take數據,並確保處理正確,使得是順序操做的,可是若是是發送到下一級的flume agent就不同了,take操做是順序的,可是下一級agent的寫入操做是並行的,因此確定是快的;
三、其實用loadbalance在必定意義上能夠起到failover的做用,生產環境量大建議loadbalance;
5、關於監控monitor:
一、監控我這邊作得仍是比較少的,可是目前已知的有如下幾種吧:cloudera manager(前提是你得安裝CDH版本)、ganglia(這個天生就是支持的)、http(其實就是將統計信息jmx信息,封裝成json串,使用jetty展現在瀏覽器中而已)、再一個就是本身實現收集監控信息,本身作(能夠收集http的信息或者本身實現相應的接口實現本身的邏輯,具體能夠參考我之前的博客);
二、簡單說一下cloudera manager這種監控,最近在使用,確實很強大,能夠查看實時的channel進出數據速率、channel實時容量、sink的出速率、source的入速率等等,圖形化的東西確實很豐富很直觀,能夠提供不少flume agent總體運行狀況的信息和潛在的一些信息;
6、關於flume啓動:
一、flume組件啓動順序:channels——>sinks——>sources,關閉順序:sources——>sinks——>channels;
二、自動加載配置文件功能,會先關閉全部組件,再重啓全部組件;
三、關於AbstractConfigurationProvider中的Map<Class<? extends Channel>, Map<String, Channel>> channelCache這個對象,始終存儲着agent中得全部channel對象,由於在動態加載時,channel中可能還有未消費完的數據,可是須要對channel從新配置,因此用以來緩存channel對象的全部數據及配置信息;
四、經過在啓動命令中添加 "no-reload-conf"參數爲true來取消自動加載配置文件功能;
7、關於interceptor:
請看個人關於這個組件的博客,傳送門;
8、關於自定義組件:sink、source、channel:
一、channel不建議自定義哦,這個要求比較高,其餘倆都是框架式的開發,往指定的方法填充本身配置、啓動、關閉、業務邏輯便可,之後有機會單獨寫一篇文章來介紹;
二、關於自定義組件請相信github,上面好多好多好多,能夠直接用的自定義組件....;
9、關於Flume-NG集羣網絡拓撲方案:
一、在每臺採集節點上部署一個flume agent,而後作一到多個彙總flume agent(loadbalance),採集只負責收集數據發往彙總,彙總能夠寫HDFS、HBase、spark、本地文件、kafka等等,這樣通常修改會只在彙總,agent少,維護工做少;
二、採集節點沒有部署flume agent,可能發往mongo、redis等,這時你須要自定義source或者使用sdk來將其中的數據取出併發往flume agent,這樣agent就又能夠充當「採集節點」或者彙總節點了,可是這樣在前面至關於加了一層控制,就又多了一層風險;
三、因爲能力有限,其它未知,上面兩種,第一種好些,這裏看看美團的架構———— 傳送門;
4、
java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
sink是hdfs,而後使用目錄自動生成功能。出現如題的錯誤,看官網文檔說的是須要在每一個文件記錄行的開頭須要有時間戳,可是時間戳的格式可能比較難調節,因此亦可設置hdfs.useLocalTimeStamp這個參數,好比以每一個小時做爲一個文件夾,那麼配置應該是這樣:
或者修改hdfs.timeZone這個參數使之能夠和咱們上傳的log文件的日期格式同樣應該就能夠了,沒有測試過。
5、flume學習(三):flume將log4j日誌數據寫入到hdfs
、本次咱們把log4j的日誌直接採集輸出到hdfs中去。須要修改flume.conf中sink的配置:
6、【Flume】【源碼分析】flume中sink到hdfs,文件系統頻繁產生文件,文件滾動配置不起做用?
參考:http://blog.csdn.net/simonchi/article/details/43231891
7、查看最終配置
(來源http://www.it610.com/article/2107322.htm)
最終配置文件示例 ``` # flume1 which ones we want to activate. flume1.channels = ch1 flume1.sources = src1 flume1.sinks = sink1 # Define a memory channel called ch1 on flume1 flume1.channels.ch1.type = memory flume1.channels.ch1.capacity = 100000 flume1.channels.ch1.transactionCapacity = 1000 flume1.channels.ch1.keep-alive = 30 # Define an Avro source called src1 on flume1 and tell it # to bind to 0.0.0.0:8888. Connect it to channel ch1. flume1.sources.src1.channels = ch1 flume1.sources.src1.type = avro flume1.sources.src1.bind = 0.0.0.0 flume1.sources.src1.port = 8888 flume1.sources.src1.threads = 5 flume1.sinks.sink1.type = hdfs flume1.sinks.sink1.channel = ch1 flume1.sinks.sink1.hdfs.path =hdfs://master:9000/ysg/flume/ysg/%Y%m flume1.sinks.sink1.hdfs.filePrefix = ysg flume1.sinks.sink1.hdfs.fileSuffix = .log flume1.sinks.sink1.hdfs.inUseSuffix = .tmp flume1.sinks.sink1.hdfs.maxOpenFiles = 5000 flume1.sinks.sink1.hdfs.batchSize= 1 flume1.sinks.sink1.hdfs.fileType = DataStream flume1.sinks.sink1.hdfs.writeFormat =Text #flume1.sinks.sink1.hdfs.rollSize =64*1024*1024 flume1.sinks.sink1.hdfs.rollSize = 67108864 flume1.sinks.sink1.hdfs.rollCount = 0 flume1.sinks.sink1.hdfs.rollInterval = 0 flume1.sinks.sink1.hdfs.minBlockReplicas=1 flume1.sinks.sink1.hdfs.useLocalTimeStamp = true flume1.sinks.sink1.hdfs.connect-timeout=80000 flume1.sinks.sink1.hdfs.callTimeout=120000 flume1.sinks.sink1.hdfs.idleTimeout = 60
8、 3.1 基礎參數調優經驗 --去掉 每寫一行在行尾添加一個換行符 狀況
lc.sinks.sink_hdfs.serializer.appendNewline = false
感謝上面帶有鏈接的帖子。支持原創!(本帖來源於互聯網,如有侵犯,請聯繫我!)