最近在作一個分佈式調用鏈跟蹤系統,java
在兩個地方採用了flume (我使用的flume版本是1.5.0-cdh5.4.4),一個是宿主系統 ,用flume agent進行日誌蒐集。 一個是從kafka拉日誌分析後寫入hbase. node
後面這個flume(從kafka拉日誌分析後寫入flume)用了3臺 , 系統上線之後 ,線上拋了一個這樣的異常:apache
Caused by: org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 100 full, consider committing more frequently, increasing capacity or increasing thread count
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doPut(MemoryChannel.java:84)
at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)多線程
從異常信息直觀理解是MemoryChannel的事務的Put隊列滿了,爲何會這樣呢?架構
咱們先從Flume的體系結構提及,Flume是apache一個負責日誌採集和傳輸的開源工具,它的特色是可以很靈活的經過配置實現不一樣數據存儲系統之間的數據交換 。分佈式
它有三個最主要的組件:ide
Source : 負責從數據源獲取數據,包含兩種類型的Source . EventDrivenSource 和 PollableSource , 前者指的是事件驅動型數據源,故名思議,就是須要外部系統主動送數據 ,好比AvroSource ,ThriftSource ; 而PollableSource 指的是須要Source主動從數據源拉取數據 ,好比KafkaSource 。Source 獲取到數據之後向Channel 寫入Event 。 工具
Sink : 負責從Channel拉取Event , 寫入下游存儲或者對接其餘Agent. google
Channel:用於實現Source和Sink之間的數據緩衝, 主要有文件通道和內存通道兩類。spa
Flume的架構圖以下:
而個人flume 配置以下:
a1.sources = kafkasource
a1.sinks = hdfssink hbasesink
a1.channels = hdfschannel hbasechannel
a1.sources.kafkasource.channels = hdfschannel hbasechannel
a1.sinks.hdfssink.channel = hdfschannel
a1.sinks.hbasesink.channel = hbasechannel
a1.sources.kafkasource.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.kafkasource.zookeeperConnect = zk1:2181,zk2:2181,zk3:2181
a1.sources.kafkasource.topic = nagual_topic
a1.sources.kafkasource.groupId = flume
a1.sources.kafkasource.kafka.consumer.timeout.ms = 500
a1.sinks.hdfssink.type = hdfs
a1.sinks.hdfssink.hdfs.path = hdfs://namenode:8020/flume/kafka_events/%y-%m-%d/%H%M
a1.sinks.hdfssink.hdfs.filePrefix = events-prefix
a1.sinks.hdfssink.hdfs.round = true
a1.sinks.hdfssink.hdfs.roundValue = 10
a1.sinks.hdfssink.hdfs.roundUnit = minute
a1.sinks.hdfssink.hdfs.fileType = SequenceFile
a1.sinks.hdfssink.hdfs.writeFormat = Writable
a1.sinks.hdfssink.hdfs.rollInterval = 60
a1.sinks.hdfssink.hdfs.rollCount = -1
a1.sinks.hdfssink.hdfs.rollSize = -1
a1.sinks.hbasesink.type = hbase
a1.sinks.hbasesink.table = htable_nagual_tracelog
a1.sinks.hbasesink.index_table = htable_nagual_tracelog_index
a1.sinks.hbasesink.serializer =NagualTraceLogEventSerializer
a1.sinks.hbasesink.columnFamily = rpcid
a1.sinks.hbasesink.zookeeperQuorum = zk1:2181,zk2:2181,zk3:2181
a1.channels.hdfschannel.type = memory
a1.channels.hdfschannel.capacity= 10000
a1.channels.hdfschannel.byteCapacityBufferPercentage = 20
a1.channels.hdfschannel.byteCapacity = 536870912
個人flume agent從kafka拉取日誌之後,轉換成hbase 的row put操做,中間採用了memchannel ,爲何會出現以前提到的異常呢? 經過通讀一遍源碼, 基本找到了問題所在:
咱們把源碼拆解成如下幾個主要步驟來分析:
1、flume 的啓動:
如上圖所示, 整個flume 啓動的主要流程是這樣的:
FLUME_HOME/bin目錄中的flume-ng啓動腳本啓動Application , Application建立一個PollingPropertiesFileConfigurationProvider, 這個Provider的做用是啓動 一個配置文件的監控線程FileWatcherRunnable ,定時監控配置文件的變動,
一旦配置文件變動,則從新獲得SinkRunner, SourceRunner以及channel的配置, 包裝成MaterialedConfiguration,經過google guava的eventbus 推送配置變動給Application ,Application啓動一個LifeCycleSupervisor,由它來負責監控
SourceRunner ,SinkRunner,Channel的運行狀況 。 SourceRunner ,SinkRunner ,Channel都繼承或實現了LifeCycleAware接口,LifeCycleSupervisor經過定時檢查這些組件的指望狀態是否和當前狀態一致, 若是不一致則調用指望狀態對應的方法,
(具體代碼能夠參考LifeCycleSupervisor的內部類MonitorRunnable) . 按照 Channel 、 SinkRunner 、 SourceRunner(能夠理解爲先接水管,再接水盆,再接水龍頭)順序進行啓動,每一個組件的啓動都作什麼呢?
以個人flume配置文件來講明:
(1) MemChannel
咱們首先先介紹一下MemChannel的幾個配置參數:
capacity :控制了MemChannel中一個LinkedBlockingDeque<Event> (咱們後面簡稱爲MemDeque)的最大event個數。
transactionCapacity: 控制了一個MemChannel的事務(MemoryTransaction)中putList 和takeList兩個 LinkedBlockingDeque 的最大長度 。
byteCapacityBufferPercentage: 控制了MemDeque中event header 的佔比, 默認是20%
byteCapacity:控制了MemDeque的最大字節數, 默認值是應用分配到的最大堆內存(Xmx參數指定)的 80% (咱們稱之爲x ),這個值x乘以 1 - byteCapacityBufferPercentage * 0.01 就獲得了MemDeque中event body的最大字節數。如何利用這個參數來進行流控, 咱們後面還有詳細說明。
keepalive : 控制了一個 MemChannel 事務從MemDeque中讀寫操做的最大阻塞時間 , 單位:秒。
啓動之後, 創建了一個LinkedBlockingDeque ,這是一個雙端隊列,能夠進行雙向讀寫,而且用capacity 參數控制了它的最大長度, 另外還建立了幾個信號量Semaphore ,
queueRemaining: 標識MemDeque的初始容量,
queueStored : 標識MemDeque寫入的 event數量,也就是待處理的event 數量。
bytesRemaing:標識MemDeque 中還能寫入多少字節的flume event body;
(2) SinkRunner:
它的機制是啓動一個所謂的PollingRunner 線程 ,經過輪詢操做,調用一個 SinkProcessor來進行實際的輪詢處理, 而這個SinkProcessor則調用 Sink的process 方法進行event處理, 在輪詢的處理上,有一個所謂的 補償機制( backoff) ,就是當sink獲取不到 event 的時候, PollingRunner 線程須要等待一段backoff時間,等channel中的數據獲得了補償再來進行pollling 操做。而hbasesink 在啓動的時候,則把hbase 操做相關的配置:htable, columnfamily ,hbase zk集羣的配置信息準備好了。也就是說SinkRunner採用的方式是Pull .
(3) SourceRunner:
SourceRunner包含兩類, 一類是對應EventDrivenSource 的 EventDrivenSourceRunner , 一個是對應PollableSource的PollableSourceRunner , 簡單的說,前者是push ,後者是pull 。
EventDrivenSource 有表明性的是thrift source , 在本地啓動java nio server之後, 從外部接收event ,交給ThriftSource 內部的ThriftSourceHandler進行處理。
然後者PollableSourceRunner ,則經過啓動一個PollingRunner線程 ,相似SinkRunner中的輪詢處理策略 ,啓動Source , 在Source內部, 使用ChannelProcessor處理events , ChannelProcessor內部會走一組過濾器構建的過濾器鏈 ,而後經過通道選擇器ChannelSelector選擇好通道之後 ,啓動事務 ,把一批event 寫入Channel .
咱們用下面這張示意圖來講明各組件啓動之後的內部運做原理:
整個圖有些複雜, 說明以下:
PollingSourceRunner經過線程啓動定時任務 ,間隔一段時間調用kafkasource 從kafka broker 拉取日誌,拉完之後,進入一個ChannelProcessor,這個通道處理器先經過一個過濾器鏈對event進行過濾 ,過濾之後,經過一個ChannelSelector通道選擇器,選擇evnet要投遞的Channel , 而後啓動Channel 下的一個事務 (注意,這個事務是用ThreadLocal維持的,也就是說一個線程對應了一個事務) , 事務啓動之後,批量向事務MemoryTransaction的一個putList的尾部寫入,putlist是一個LinkedBlockingDeque .
事務提交的時候, 把putlist中的event批量移除, 轉移到MemoryChannel的一個LinkedBlockingDeque 裏面來.
而SinkRunner則啓動PollingRunner , 也經過定時啓動任務,調用SinkProcessor,最後調用HbaseSink的process方法,這個方法也負責啓動一個事務 ,批量從MemoryChannel的LinkedBlockingDeque中拉取event , 寫入takelist ,批量作完hbase 的put操做之後,作memoryTransaction的事務提交操做。事務提交的處理邏輯前面描述過。
而負責進行通道過載保護,正是在MemoryTransaction事務的提交時刻作的 ,這個過載保護的代碼能夠參考MemoryChannel的MemoryTransaction 內部 類的doCommit方法, 它的思路是這樣的:
比較事務提交的時候,takelist和putlist的大小,若是takelist的長度比 putlist 長度要小, 則認爲sink的消費能力(takelist長度標識)要比source的生產能力(putlist)要弱, 此時,須要經過一個bytesRemaining的 Semaphore來決定是否容許把putlist中的event轉移到MemoryChannel的linkedBlockingDeque來, 若是容許 ,則操做 , 操做之後,putlist 和takelist 都被清理掉了。 bytesRemaining信號量(標示還有多少flume event body存儲空間)和queueStored(標示有多少個event能被消費)信號量都被釋放了 。
綜上所述, Flume 的memorychannel採用了兩個雙端隊列putlist和takelist ,分別表示source 的生產能力 和sink 的消費能力,source 和sink啓動一個事務 ,source 寫putlist ,提交事務之後 ,把putlist批量移動到另外一個deque .
而sink 則負責從MemoryChannel的Deque 取event, 寫入takelist(只作流控用) , 最後sink的事務提交之後,也把putlist 的event批量移動到deque 。 等於在一個事務裏面用putlist 作了寫入緩衝,用takelist作了流控, memorychannel中的 deque是多個事務共享的存儲。
至此, 咱們對memorychannel 的細節已經弄清楚了,回過頭來看以前出現的那個異常 , 就能知道爲何了?
首先, 咱們的transactionCapacity參數沒有配置,那默認就是100 ,也就是putlist和takelist 的長度只有100 ,即寫入緩衝容量只有100個event .而MemoryChannel的Deque咱們配置了10000 ,容許 flume event body的最大字節數咱們配置了
536870912 * (1 - 20 * 0.01) = 400M左右 , 問題並非出在了memorychannel的雙端隊列容量不夠用,而是下游的hbase sink由於有一個批量處理的默認值是100 ,而在這默認的100次處理中 ,每一次處理都涉及到了對象的avro反序列化 , 100次批量寫
入hbase 之後纔會清理MemoryTransaction的 putlist,而這個時候上游kafka source 再有數據寫入putlist 就出現了前文描述的那個異常。
解決辦法:從異常提示的幾個思路,咱們一一作個思考:
, consider committing more frequently, increasing capacity or increasing thread count
1)更頻繁的提交事務: 若是採用這種思路的話,好比只下降hbase sink 的批處理數量,而上游的kafka source的生產能力保持不變,能夠預見的是會形成MemoryChannel中Deque堆積的event數量會愈來愈多(由於更頻繁的把event 從 putlist轉移到了 Memory Deque) . 這種方法只是把問題從putlist 轉移到了另外一個Deque 。(要MemoryChannel的Deque更大了)。
2) 增長transactionCapacity: 即增長每個事務的寫緩衝能力(putlist長度增長) ,可是調節到多少呢?若是上游的壓力陡增 ,仍是會出現這個問題 。這種方法只能暫時緩解,不能完全解決問題。
3) 增長線程數量: 這裏我想flume做者的思路是把sink改成多線程增長消費能力來解決。 這個我認爲纔是解決問題的根本,增長下游的處理能力 。
那如何增長下游的處理能力呢,除了作flume自己的scaleout ,減小單臺flume的壓力外。 還有幾種方法供咱們思考:
A:把hbase sink擴展爲多線程 , 每個線程一個event隊列。ChannelProcessor在投遞的時候輪詢投遞到多個隊列 。------考慮用Akka ?
B: 使用Disruptor , ChannelProcessor做爲生產者,SinkProcessor做爲消費者 。
C: 直接換成 storm ,利用storm集羣的實時處理能力?
用了兩天的時間,採用了storm ,的確極大提升了吞吐能力(20多萬條消息大概在兩分多鐘處理完,QPS 達到了1600。 後面單獨再寫文章來講明storm目前在使用過程當中踩到的坑 。