本文系微博運維數據平臺(DIP)在Flume方面的優化擴展經驗總結,在使用Flume FileChannel的場景下將吞吐率由10M/s~20M/s提高至80M/s~90M/s,分爲四個部分進行介紹:
- 應用場景
- Flume實例架構
- Flume調試及優化擴展
- Flume Todo
- 生產環境部署
1. 應用場景
咱們的應用場景是一個典型的實時數據傳輸(接收)過程,架構圖以下:
包括三個組件:
(1)ServiceServer ScribeClient:業務產生的日誌以「Log」的形式寫入業務部署服務器的本地磁盤,而後經過ScribeClient傳輸至咱們的Flume集羣;
(2)Flume:使用多個Flume實例構建Flume集羣,經過動態域名、VIP對外提供服務;其中,每個Flume實例使用ScribeSource接收ServcieServer ScribeClient傳輸過來的日誌數據,而後使用FileChannel將ScribeSource接收過來的數據以「事務」的形式持久化至本地磁盤,最近經過KafkaSink將FileChannle中的數據輸出至Kafka集羣;
(3)Kakfa:Kafka集羣接收Flume集羣傳輸過來的日誌數據,用於後續的實時計算;
能夠看出,以上整個過程就是日誌實時寫入Kafka集羣的過程,有幾點須要特殊說明:
(1)既然是實時數據傳輸,爲何不直接經過Kafka Producer API(或基於此實現的開源組件)將日誌數據直接寫入Kafka集羣,而是使用Scribe間接傳輸數據?
假設咱們有一個Web服務,須要將Web的訪問日誌實時寫入Kafka集羣,這個能夠經過Log4j擴展實現(不肯定是否已有開源組件支持),這種方式數據實時性較強,可是Kafka集羣運行過程當中一旦出現異常(如:網絡流量波動)會直接影響該Web服務的運行狀態,進而影響線上業務,所以不能使用這種直接傳輸的方式;
Scribe能夠在數據接收服務(這裏特指Flume集羣,也能夠是Kafka)出現異常或不可用的狀況下,暫時將數據緩存至本地磁盤,待數據接收服務恢復以後,繼續數據傳輸;雖然數據傳輸的實時性有所損耗,但整個數據傳輸過程更加可靠,並且避免了數據傳輸對線上服務的影響,所以使用這種間接傳輸的方式。
(2)Flume爲何使用FileChannel,而不使用吞吐率更高的MemoryChannel?
MemoryChannel使用內存存儲事務,吞吐率極高,但基於內存的事務實現模式在Flume部署服務器宕機或Flume實例異常終止的狀況下,全部存儲在內存中的日誌數據將所有丟失;另外,內存空間受限於RAM和JVM的約束,數據傳輸量波動(如數據量猛增)的狀況下可能會引起異常;
FileChannel使用基於本地磁盤的事務實現模式,即便出現Flume部署服務器宕機或Flume實例異常終止的狀況,由於接收到的日誌數據都以事務的形式持久化至本地磁盤,能夠在Flume實例恢復正常以後繼續數據傳輸,不會有數據丟失的狀況;並且本地磁盤相對於內存而言,存儲空間比較富餘,數據可靠性較強,所以使用FileChannel。
2. Flume實例架構
在咱們的應用場景中,對於單獨一個Flume實例而言,架構以下:
宏觀上看,Flume實例內部僅有三個組件:ScribeSource、FileChannel、KafkaSink,實際上內部的結構仍是比較複雜的,以下圖所示:
這裏先介紹兩個比較重要的實例:
Receiver:Receiver是一個線程,對於Flume ScribeSource而言能夠設置多個Receiver線程(經過指定ScribeSource workerThreads數值實現),它不斷地將Flume ScribeSource接收到的數據以「事務」的形式寫入FileChannel;
PollingRunner:PollingRunner也是一個線程,它不斷地將FileChannel中的數據以「事務」的形式讀取出來並寫入Kafka;
對應的Flume配置文件:
myagent.sources = scribe_source
myagent.channels = file_channel
myagent.sinks = kafka_sink
# define scribe source
myagent.sources.scribe_source.type = org.apache.flume.source.scribe.ScribeSource
myagent.sources.scribe_source.port = 1466
myagent.sources.scribe_source.workerThreads = 5
# define file channel
myagent.channels.file_channel.type = file
myagent.channels.file_channel.checkpointDir = /data0/flume/checkpoint
myagent.channels.file_channel.dataDirs = /data0/flume/data
# define kafka sink
myagent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
myagent.sinks.kafka_sink.topic = mytopic
myagent.sinks.kafka_sink.brokerList = kafkahost:9092
myagent.sinks.kafka_sink.requiredAcks = 1
myagent.sinks.kafka_sink.batchSize = 1000
# Bind the source and sink to the channel
myagent.sources.scribe_source.channels = file_channel
myagent.sinks.kafka_sink.channel = file_channel
3. Flume調試及優化擴展
爲了方便Flume的調試,咱們本身開發模擬了一個Scrbie Client Simulator實例,能夠兼容Scribe通訊協議,以每秒大約90M/s的速率輸出數據至Flume(這裏特指單實例Flume),其中模擬的日誌數據來源於咱們平臺常見的業務數據,後續的討論均創建在這個Scribe Client Simulator實例的基礎上。
3.1 ScribeSource
ScribeSource中有一個很是重要的配置屬性「workerThreads」,如上所述,它的值被設定爲5,那麼這個值是如何得出的呢,它又會產生什麼樣的做用?
ScribeSource中的每個WorkerThread就是一個Receiver實例,即「workerThreads」的值決定着ScribeSource中有幾個Receiver實例,有多少個Receiver實例直接影響着ScribeSource接收數據的速率,調試過程以下:
(1)爲了不Channel自身的性能瓶頸對ScribeSource的影響,咱們這裏使用吞吐率極高的MemoryChannel;
(2)爲了不Sink自身的性能瓶頸對ScribeSource、MemoryChannel的影響,咱們這裏使用NullSink,它會將消費到的數據直接丟棄;
通過上述兩步,咱們能夠認爲Flume ScribeSource的調試過程當中徹底能夠忽略MemoryChannel、NullSink的影響。
(3)啓動Scrbie Client Simulator實例,使它不斷地往咱們的Flume實例寫入數據,觀察Flume實例部署機器的網絡寫入流量狀況,進而調整「workerThreads」值的大小(建議數值從1開始,逐漸增大),使該機器的網絡寫入流量達到業務需求;
根據咱們場景的具體狀況,通過上述三步的測試,最終將ScribeSource workerThreads的值選定爲5,吞吐率大體爲80~90M/s,這是咱們認爲的一個理想峯值。
Flume配置以下:
myagent.sources = scribe_source
myagent.channels = memory_channel
myagent.sinks = null_sink
# define scribe source
myagent.sources.scribe_source.type = org.apache.flume.source.scribe.ScribeSource
myagent.sources.scribe_source.port = 1466
myagent.sources.scribe_source.workerThreads = 5
# define memory channel
myagent.channels.memory_channel.type = memory
myagent.channels.memory_channel.capacity = 10000
myagent.channels.memory_channel.transactionCapacity = 10000
myagent.channels.memory_channel.byteCapacityBufferPercentage = 20
myagent.channels.memory_channel.byteCapacity = 800000
# define null sink
myagent.sinks.null_sink.type = null
# Bind the source and sink to the channel
myagent.sources.scribe_source.channels = memory_channel
myagent.sinks.null_sink.channel = memory_channel
3.2 FileChannel
3.2.1 默認FileChannel
通過3.1的測試以後,咱們能夠認爲Flume ScribeSource不存在接收數據的性能瓶頸,接下來開始調試FileChannel,關於使用FileChannel的緣由能夠參考1.(2)。
在3.1Flume配置的基礎之上,修改成FileChannel,其他配置保持不變,以下:
myagent.sources = scribe_source
myagent.channels = file_channel
myagent.sinks = null_sink
# define scribe source
myagent.sources.scribe_source.type = org.apache.flume.source.scribe.ScribeSource
myagent.sources.scribe_source.port = 1466
myagent.sources.scribe_source.workerThreads = 5
# define file channel
myagent.channels.file_channel.type = file
myagent.channels.file_channel.checkpointDir = /data0/flume/checkpoint
myagent.channels.file_channel.dataDirs = /data0/flume/data
# define null sink
myagent.sinks.null_sink.type = null
# Bind the source and sink to the channel
myagent.sources.scribe_source.channels = memory_channel
myagent.sinks.null_sink.channel = memory_channel
再次重複3.1的調試過程,使用Scrbie Client Simulator實例進行數據寫入測試時, 咱們發現Flume實例部署機器的網絡寫入流量降低不少,大約只有10M/s~20M/s。能夠看出,在吞吐率方面,FileChannel與MemoryChannel之間有很大的差距。咱們來分析一下具體的緣由。
根據2.中的Flume實例架構圖,咱們能夠大體得出ScribeSource中的某一個Receiver與FileChannel的交互流程,以下圖:
Receiver的工做實際是一個將數據循環寫入FileChannel的過程,每一次的循環能夠理解爲一個指處理(批量寫入),每一次的批處理都須要通過如下幾個步驟:
(1)獲取FileChannel的事務——getTransaction;
(2)打開事務——begin;
(3)批量寫入數據——put;
(4)提交或回滾事務——commit or rollback;
(5)關閉事務——close;
通過對Flume FileChannel相關源碼的分析,致使FileChannel吞吐率降低的主要緣由集中於事務的提交過程——commit,有如下兩點:
(1)鎖競爭,從上圖中能夠看出,Receiver的每一次批量寫入過程當中都會涉及到事務提交(不考慮異常回滾的狀況),事務提交的內部過程涉及到讀鎖或寫鎖的「加鎖」操做,多個Receiver(WorkerThread線程)共存的狀況下鎖競爭的狀況就會比較嚴重;
(2)Writer sync,FileChanel是基於本地磁盤實現的事務模式,每一次事務的提交都會伴隨着一次「sync」,衆所周知,「sync」是一種系統性能開銷比較大的操做;
綜合上述兩點,咱們能夠得出,多個Receiver的存在致使FileChannel存在資源競爭的問題(多個Receiver之間沒法安全的共享一個FileChannel的事務),由於須要加鎖,必然帶來相互之間鎖的競爭;某一個Receiver得到鎖以後,又須要進行系統性能開銷比較大的「sync」操做,且耗時相對較長,這就意味着該Receiver從獲取鎖到釋放鎖的過程會花費比較長的時間,在這段時間內該Receiver獨佔FileChannel,其它Receiver只能處於阻塞狀態,直至能夠獲取到鎖;基於上述兩個緣由,致使FileChannel在多Receiver的環境下吞吐率嚴重降低。
3.2.2 擴展FileChannel
FileChannel的實現過程是比較複雜的,直接優化FileChannel的代碼不太現實,那麼是否能夠經過多個FileChannel的方式來解決吞吐率嚴重降低的問題呢?若是FileChannel的數目大於或等於ScribeSource Receiver的數目,ScribeSource Receiver使用「哈希」(Hash)的方式來選取FileChannel,就能夠避免ScribeSource Receiver之間相互競爭FileChannel資源,以下圖所示:
雖然對於某一個FileChannel來講,與它交互的Receiver依然要通過獲取鎖——sync——釋放鎖的過程,但多個Receiver之間是並行的,整體上吞吐率獲得提高。
那麼如何實現這個方案呢?這裏咱們須要用到Flume提供的「Custom Channel Selector」機制,即實現咱們本身的「Channel Selector」,代碼以下:
這裏有兩個關鍵點:
(1)隨機code的生成,目前代碼實現提供兩種選擇:event.getBody().hashCode()或者System.currentTimeMillis();
(2)根據隨機code的值對FileChannel的數目取餘(哈希),從而選取出一個FileChannel並返回;
那麼如何使用上述方案及本身的擴展呢?Flume配置文件以下:
myagent.sources = scribe_source
myagent.channels = file_channel file_channel2 file_channel3
myagent.sinks = kafka_sink
# define scribe source
myagent.sources.scribe_source.type = org.apache.flume.source.scribe.ScribeSource
myagent.sources.scribe_source.port = 1466
myagent.sources.scribe_source.workerThreads = 5
myagent.sources.scribe_source.selector.type = com.weibo.dip.flume.extension.channel.selector.HashChannelSelector
# define file channel
myagent.channels.file_channel.type = file
myagent.channels.file_channel.checkpointDir = /data0/flume/checkpoint
myagent.channels.file_channel.dataDirs = /data0/flume/data
# define file channel2
myagent.channels.file_channel2.type = file
myagent.channels.file_channel2.checkpointDir = /data0/flume/checkpoint2
myagent.channels.file_channel2.dataDirs = /data0/flume/data2
# define file channel3
myagent.channels.file_channel3.type = file
myagent.channels.file_channel3.checkpointDir = /data0/flume/checkpoint3
myagent.channels.file_channel3.dataDirs = /data0/flume/data3
# define kafka sink
myagent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
myagent.sinks.kafka_sink.topic = mytopic
myagent.sinks.kafka_sink.brokerList = kafkahost:9092
myagent.sinks.kafka_sink.requiredAcks = 1
myagent.sinks.kafka_sink.batchSize = 1000
# Bind the source and sink to the channel
myagent.sources.scribe_source.channels = file_channel file_channel2 file_channel3
myagent.sinks.kafka_sink.channel = file_channel
配置中須要顯示指定使用咱們本身擴展的「Channel Selector」:myagent.sources.scribe_source.selector.type = com.weibo.dip.flume.extension.channel.selector.HashChannelSelector;而後指定三個FileChannel,並分別關聯至ScribeSource和NullSink。
通過咱們的測試大概須要10-12個FileChannel(注意:這裏的數值考慮了後續的KafkaSink,詳情見後),吞吐率便可達到80~90M/s。
FileChannel的吞吐率雖然獲得提高,可是這麼多的FileChannel使用上述逐個配置FileChannel的方式是極其不方便維護的,應該只使用一個「FileChannel」,以下圖:
咱們應該利用Flume提供的「Custom Channel」機制,本身擴展一個「FileChannel」,取名爲MultithreadingFileChannel,使其內部包含多個FileChannel,從而達到簡化配置的目的,核心源碼以下:
MultithreadingFileChannel再也不須要「Channel Selector」的參與,自身內部封裝了FileChannel之間的「哈希」處理邏輯,具體體如今建立事務(createTransaction)的過程當中。
使用MultithreadingFileChannel的Flume配置以下:
myagent.sources = scribe_source
myagent.channels = file_channel
myagent.sinks = null_sink
# define scribe source
myagent.sources.scribe_source.type = org.apache.flume.source.scribe.ScribeSource
myagent.sources.scribe_source.port = 1466
myagent.sources.scribe_source.workerThreads = 5
# define file channel
myagent.channels.file_channel.type = com.weibo.dip.flume.extension.channel.MultithreadingFileChannel
amyagent.channels.file_channel.channels = 12
myagent.channels.file_channel.checkpointDir = /data0/flume/checkpoint
myagent.channels.file_channel.dataDir = /data0/flume/data
# define null sink
myagent.sinks.null_sink.type = null
# Bind the source and sink to the channel
myagent.sources.scribe_source.channels = file_channel
myagent.sinks.null_sink.channel = file_channel
3.2.3 KafkaSink
通過3.一、3.2的調試過程以後,咱們能夠認爲Flume ScribeSource、MultithreadingFileChannel不存在性能瓶頸,接下來開始調試KafkaSink。
咱們將3.2.2中的NullSink替換爲KafkaSink,以下圖:
Flume配置文件以下:
myagent.sources = scribe_source
myagent.channels = file_channel
myagent.sinks = kafka_sink
# define scribe source
myagent.sources.scribe_source.type = org.apache.flume.source.scribe.ScribeSource
myagent.sources.scribe_source.port = 1466
myagent.sources.scribe_source.workerThreads = 5
# define file channel
myagent.channels.file_channel.type = com.weibo.dip.flume.extension.channel.MultithreadingFileChannel
amyagent.channels.file_channel.channels = 12
myagent.channels.file_channel.checkpointDir = /data0/flume/checkpoint
myagent.channels.file_channel.dataDir = /data0/flume/data
# define kafka sink
myagent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
myagent.sinks.kafka_sink.topic = mytopic
myagent.sinks.kafka_sink.brokerList = kafkahost:9092
myagent.sinks.kafka_sink.requiredAcks = 1
myagent.sinks.kafka_sink.batchSize = 1000
# Bind the source and sink to the channel
myagent.sources.scribe_source.channels = file_channel
myagent.sinks.null_sink.channel = file_channel
啓動調試過程後咱們發現,Flume實例部署機器的網絡寫入流量大約爲90M/s左右,而Kakfa實例(單機版)的網絡寫入流量(即Flume實例部署機器的網絡寫出流量)僅爲60M左右。熟悉Kafka的同窗可能知道,這並非由於Kafka單機版實例致使的,究其緣由,主要有如下幾點:
(1)KafkaSink是一個單實例,它從MultithreadingFileChannel中讀取數據時也須要事務的參與(實際上它是與MultithreadingFileChannel中的某一個FileChannel創建事務);雖然ScribeSource與MultithreadingFileChannel FileChannels之間、MultithreadingFileChannel FileChannels與KafkaSink之間使用了「Channels Hash」機制,但不能徹底排除「碰撞」發生的可能性;一旦「碰撞」發生,則表示「碰撞」發生期間,KafkaSink從MultithreadingFileChannel中讀取不到任何數據;這也是爲何MultithreadingFileChannel中的FileChannels數目須要明顯大於ScribeSource Receiver數目的緣由;
(2)KafkaSink Producer也是一個單實例,也就是說只有一個Producer在寫出數據,對吞吐率也會帶來必定的影響;
參考3.2.2的方案,咱們嘗試使用多個KafkaSink實例來解決這個問題,以下圖:
Flume配置文件以下:
myagent.sources = scribe_source
myagent.channels = file_channel
myagent.sinks = kafka_sink kafka_sink2 kafka_sink3
# define scribe source
myagent.sources.scribe_source.type = org.apache.flume.source.scribe.ScribeSource
myagent.sources.scribe_source.port = 1466
myagent.sources.scribe_source.workerThreads = 5
# define file channel
myagent.channels.file_channel.type = com.weibo.dip.flume.extension.channel.MultithreadingFileChannel
amyagent.channels.file_channel.channels = 12
myagent.channels.file_channel.checkpointDir = /data0/flume/checkpoint
myagent.channels.file_channel.dataDir = /data0/flume/data
# define kafka sink
myagent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
myagent.sinks.kafka_sink.topic = mytopic
myagent.sinks.kafka_sink.brokerList = kafkahost:9092
myagent.sinks.kafka_sink.requiredAcks = 1
myagent.sinks.kafka_sink.batchSize = 1000
# define kafka sink2
myagent.sinks.kafka_sink2.type = org.apache.flume.sink.kafka.KafkaSink
myagent.sinks.kafka_sink2.topic = mytopic
myagent.sinks.kafka_sink2.brokerList = kafkahost:9092
myagent.sinks.kafka_sink2.requiredAcks = 1
myagent.sinks.kafka_sink2.batchSize = 1000
# define kafka sink3
myagent.sinks.kafka_sink3.type = org.apache.flume.sink.kafka.KafkaSink
myagent.sinks.kafka_sink3.topic = mytopic
myagent.sinks.kafka_sink3.brokerList = kafkahost:9092
myagent.sinks.kafka_sink3.requiredAcks = 1
myagent.sinks.kafka_sink3.batchSize = 1000
# Bind the source and sink to the channel
myagent.sources.scribe_source.channels = file_channel
myagent.sinks.kafka_sink.channel = file_channel
myagent.sinks.kafka_sink2.channel = file_channel
myagent.sinks.kafka_sink3.channel = file_channel
通過咱們的測試大概須要8-10個KafkaSink,吞吐率便可達到80~90M/s,這麼多的KafkaSink使用上述逐個配置KafkaSink的方式是極其不方便維護的,應該只使用一個「KafkaSink」,以下圖:
MultithreadingKafkaSink與MultithreadingFileChannel不一樣,內部並不會包含多個KafkaSink,而是包含多個ChannelConsumer;每個ChannelConsumer都從MultithreadingFileChannel讀取數據並經過自身內部的Kafka Producer實例(也就是說,每個ChannelConsumer實例都包含一個Kafka Producer實例)將數據寫入Kakfa。
Flume配置文件以下:
myagent.sources = scribe_source
myagent.channels = file_channel
myagent.sinks = kafka_sink
# define scribe source
myagent.sources.scribe_source.type = org.apache.flume.source.scribe.ScribeSource
myagent.sources.scribe_source.port = 1466
myagent.sources.scribe_source.workerThreads = 5
# define file channel
myagent.channels.file_channel.type = com.weibo.dip.flume.extension.channel.MultithreadingFileChannel
amyagent.channels.file_channel.channels = 12
myagent.channels.file_channel.checkpointDir = /data0/flume/checkpoint
myagent.channels.file_channel.dataDir = /data0/flume/data
# define kafka sink
myagent.sinks.kafka_sink.type = com.weibo.dip.flume.extension.sink.MultithreadingKafkaSink
myagent.sinks.kafka_sink.topicHeaderName = category
myagent.sinks.kafka_sink.consumers = 10
myagent.sinks.kafka_sink.brokerList = kafkahost:9092
myagent.sinks.kafka_sink.batchSize = 1000
# Bind the source and sink to the channel
myagent.sources.scribe_source.channels = file_channel
myagent.sinks.null_sink.channel = file_channel
綜上所述,經過咱們的優化擴展和相應的參數調優,咱們將ScribeClient、Flume(使用FileChannel)、Kafka之間的數據傳輸速率提高至80~90M/s。
4. Flume Todo
雖然ScribeClient、Flume、Kafka之間的數據傳輸速率通過咱們的擴展優化以後達到咱們的預設值,但擴展過程當中引入的ScribeSource.Receivers、MultithreadingFileChannel、MultithreadingKafkaSink是否也會對Flume實例或Flume實例部署服務器帶來一些問題,這裏僅僅闡述一些可能出現的問題。
(1)ScribeSource.Receivers多個線程實例、MultithreadingKafkaSink.ChannelConsumers多個線程實例是否會致使Flume實例或Flume實例部署服務器CPU使用率或負載太高?
(2)MultithreadingFileChannel多個FileChannel的使用,是否會致使Flume實例部署服務器帶來過多的磁盤開銷?
5. 生產環境部署
(1)假設Flume集羣的域名爲flume.dip.weibo.com,端口爲1466,ScribeClient經過該域名和端口發送數據;
(2)flume.dip.weibo.com指向若干個動態域名,這些動態域名依據不一樣的機房進行劃分,如flume.cluster.dip.weibo.com、flume.cluster2.dip.weibo.com、flume.cluster3.dip.weibo.com;動態域名在這裏的做用:不一樣的機房的ScribeClient在向flume.dip.weibo.com寫入數據時,網絡層面會自動根據ScribeClient所在的機房將數據導入至該機房對應的Flume動態域名,即:機房內數據傳輸;
(3)每個動態域名被映射至一個VIP;
(4)每個VIP被映射至多個Flume實例;(3)和(4)的做用體如今Flume故障轉換和負載均衡。
備註:調試過程當中咱們發現,數據吞吐率達到80~90M/s時,JVM大體須要15G MEM。