說明:後端
該結果是親自測試,只提供簡單的數據分析,很簡陋,結果可能不許確。負載均衡
先說一下結果,多sink能夠直接按常規配置,這樣的話每一個sink會啓動一個sinkrunner,至關於每一個線程一個sink,互不干擾,負載均衡是經過channel實現的,效率會提升爲n倍,若是在此基礎上加入dom
sinkgroup,則sinkgroup會啓動一個sinkrunner,就是單線程,sinkgroup從channel中讀取數據,而後分發到下面掛載的sink中,效率和單sink同樣,沒有提升,可是能夠實現兩個sink的負載均衡或者熱備模式。jvm
上面的分析即參考了源碼也參考了網上的文章,不保證絕對正確,見諒。ide
我的認爲實際使用中仍是直接配置多sink,能夠提升效率,達到負載均衡,至於熱備,能夠經過其餘負載均衡軟件或者硬件提供虛IP實現。oop
貼一下測試的配置性能
配置是同樣的,用的時候打開或者關閉sinkgroup的註釋便可。測試
這是採集結點的配置
ui
#flume配置文件編碼
agent1.sources=execSource
agent1.sinks= avrosink1 avrosink2
agent1.channels=filechannel
#sink groups 很是影響性能
#agent1.sinkgroups=avroGroup
#agent1.sinkgroups.avroGroup.sinks = avrosink1 avrosink2
#sink調度模式 load_balance failover
#agent1.sinkgroups.avroGroup.processor.type=load_balance
#負載均衡模式 輪詢 random round_robin
#agent1.sinkgroups.avroGroup.processor.selector=round_robin
#失效降級
#agent1.sinkgroups.avroGroup.processor.backoff=true
#降級時間30秒
#agent1.sinkgroups.avroGroup.processor.maxTimeOut=30000
#配置execSource
#channel
agent1.sources.execSource.channels=filechannel
#souorce 類型
agent1.sources.execSource.type=exec
#監控正在寫入的日誌文件
agent1.sources.execSource.command=tail -F /home/flume/log/test.log
#若是命令死掉是否從新啓動
agent1.sources.execSource.restart=true
#從新啓動命令的間隔時間
agent1.sources.execSource.restartThrottle=2000
#記錄命令的錯誤日誌
agent1.sources.execSource.logStdErr=true
#批量提交的大小
agent1.sources.execSource.batchSize=1000
#批量提交的超時 單位毫秒
agent1.sources.execSource.batchTimeout=1000
#配置filechannel
#channel類型 file memory
agent1.channels.filechannel.type=memory
#agent1.channels.filechannel.checkpointDir=/home/flume/channel/log/ckpdir
#agent1.channels.filechannel.dataDirs=/home/flume/channel/log/data
#單個文件大小 100M
#agent1.channels.filechannel.maxFileSize=204800000
#channel的event個數
agent1.channels.filechannel.capacity=20000000
#事務event個數
agent1.channels.filechannel.transactionCapacity=10000
#內存channel佔用內存大小 默認是jvm內存的0.8
agent1.channels.filechannel.byteCapacity=1024000000
#配置avrosink1
#sink的channel
agent1.sinks.avrosink1.channel=filechannel
#sink類型 avro thrift
agent1.sinks.avrosink1.type=avro
#ip地址
agent1.sinks.avrosink1.hostname=10.8.6.161
#端口
agent1.sinks.avrosink1.port=1463
#批量提交的個數
agent1.sinks.avrosink1.batch-size=1000
#鏈接超時 毫秒
agent1.sinks.avrosink1.connect-timeout=3000
#請求超時 毫秒
agent1.sinks.avrosink1.request-timeout=20000
#從新鏈接source的時間 單位秒 用於後端負載均衡的輪詢
agent1.sinks.avrosink1.reset-connection-interval=300
#最大鏈接數 默認5
agent1.sinks.avrosink1.maxConnections=5
#配置avrosink2
#sink的channel
agent1.sinks.avrosink2.channel=filechannel
#sink類型 avro thrift
agent1.sinks.avrosink2.type=avro
#ip地址
agent1.sinks.avrosink2.hostname=10.8.6.160
#端口
agent1.sinks.avrosink2.port=1463
#批量提交的個數
agent1.sinks.avrosink2.batch-size=1000
#鏈接超時 毫秒
agent1.sinks.avrosink2.connect-timeout=3000
#請求超時 毫秒
agent1.sinks.avrosink2.request-timeout=20000
#從新鏈接source的時間 單位秒 用於後端負載均衡的輪詢
agent1.sinks.avrosink2.reset-connection-interval=300
#最大鏈接數 默認5
agent1.sinks.avrosink2.maxConnections=5
這是匯聚結點的配置
#flume配置文件
agent1.sources=avrosource
agent1.sinks=hdfssink1 hdfssink2
agent1.channels=filechannel
#sink groups 能夠用空格分開配置多個 很是影響性能關閉
#agent1.sinkgroups=hdfsGroup
#agent1.sinkgroups.hdfsGroup.sinks = hdfssink1 hdfssink2
#sink調度模式 load_balance failover
#agent1.sinkgroups.hdfsGroup.processor.type=load_balance
#負載均衡模式 輪詢 random round_robin
#agent1.sinkgroups.hdfsGroup.processor.selector=round_robin
#失效降級
#agent1.sinkgroups.hdfsGroup.processor.backoff=true
#降級時間30秒
#agent1.sinkgroups.hdfsGroup.processor.maxTimeOut=30000
#配置avrosource
#channel
agent1.sources.avrosource.channels=filechannel
#source 類型 thrift avro
agent1.sources.avrosource.type=avro
#監控正在寫入的日誌文件
agent1.sources.avrosource.bind=0.0.0.0
#端口
agent1.sources.avrosource.port=1463
#線程數
agent1.sources.avrosource.threads=24
#增長攔截器 能夠用空格分開配置多個
agent1.sources.avrosource.interceptors=i1
#攔截器類型 必須配置Builder 由Builder來建立Interceptor
agent1.sources.avrosource.interceptors.i1.type=com.cfto.flume.interceptor.TimeStampInterceptor$Builder
#配置filechannel
#channel類型 file memory
agent1.channels.filechannel.type=memory
agent1.channels.filechannel.checkpointDir=/tmp/flume1/channel/log/ckpdir
agent1.channels.filechannel.dataDirs=/tmp/flume1/channel/log/data
#單個文件大小 100M
#agent1.channels.filechannel.maxFileSize=204800000
#channel的event個數
agent1.channels.filechannel.capacity=200000000
#事務event個數
agent1.channels.filechannel.transactionCapacity=10000
#內存channel佔用內存大小 默認是jvm內存的0.8
agent1.channels.filechannel.byteCapacity=1024000000
#配置hdfssink1
#鏈接的channel
agent1.sinks.hdfssink1.channel=filechannel
#sink的類型
agent1.sinks.hdfssink1.type=hdfs
#寫入hdfs的路徑 %{}是從header裏取屬性 %是本身解析屬性 %Y/%m/%d
#最後不要有/
agent1.sinks.hdfssink1.hdfs.path = hdfs://nameservice1/flumelog/%{dateDir}
#文件名前綴
agent1.sinks.hdfssink1.hdfs.filePrefix=hostxx_1
#是不是用本地時間戳 header裏沒有timestamp屬性且須要獲取時間是必須設置爲true
agent1.sinks.hdfssink1.hdfs.useLocalTimeStamp = true
#文件類型 SequenceFile(默認) DataStream(不壓縮) CompressedStream(壓縮)
agent1.sinks.hdfssink1.hdfs.fileType=CompressedStream
#壓縮編碼
agent1.sinks.hdfssink1.hdfs.codeC=lzop
#文件寫入格式 Text Writable
agent1.sinks.hdfssink1.hdfs.writeFormat=Text
#按時間滾動文件 單位秒 默認30秒 0不滾動
agent1.sinks.hdfssink1.hdfs.rollInterval=0
#按文件大小滾動文件 單位字節 1G
agent1.sinks.hdfssink1.hdfs.rollSize=1024000000
#按event是個數滾動文件 默認10 0不滾動
agent1.sinks.hdfssink1.hdfs.rollCount=0
##批量提交大小
agent1.sinks.hdfssink1.hdfs.batchSize=1000
#HDFS IO操做的線程池大小
agent1.sinks.hdfssink1.hdfs.threadsPoolSize=10
#hdfs文件訪問超時時間 默認 100000 單位毫秒
agent1.sinks.hdfssink1.hdfs.callTimeout=30000
#文件關閉前空閒時間 默認0 不關閉 單位秒
agent1.sinks.hdfssink1.hdfs.idleTimeout=300
#寫入hdfs文件的用戶
agent1.sinks.hdfssink1.hdfs.proxyUser=hadoop
#hdfs文件操做失敗後的重試時間 單位秒 默認180
agent1.sinks.hdfssink1.hdfs.retryInterval = 3
#配置hdfssink2
#鏈接的channel
agent1.sinks.hdfssink2.channel=filechannel
#sink的類型
agent1.sinks.hdfssink2.type=hdfs
#寫入hdfs的路徑 %{}是從header裏取屬性 %是本身解析屬性 %Y/%m/%d
#最後不要有/
agent1.sinks.hdfssink2.hdfs.path = hdfs://nameservice1/flumelog/%{dateDir}
#文件名前綴
agent1.sinks.hdfssink2.hdfs.filePrefix=hostxx_2
#是不是用本地時間戳 header裏沒有timestamp屬性且須要獲取時間是必須設置爲true
agent1.sinks.hdfssink2.hdfs.useLocalTimeStamp = true
#文件類型 SequenceFile(默認) DataStream(不壓縮) CompressedStream(壓縮)
agent1.sinks.hdfssink2.hdfs.fileType=CompressedStream
#壓縮編碼
agent1.sinks.hdfssink2.hdfs.codeC=lzop
#文件寫入格式 Text Writable
agent1.sinks.hdfssink2.hdfs.writeFormat=Text
#按時間滾動文件 單位秒 默認30秒 0不滾動
agent1.sinks.hdfssink2.hdfs.rollInterval=0
#按文件大小滾動文件 單位字節 1G
agent1.sinks.hdfssink2.hdfs.rollSize=1024000000
#按event是個數滾動文件 默認10 0不滾動
agent1.sinks.hdfssink2.hdfs.rollCount=0
##批量提交大小
agent1.sinks.hdfssink2.hdfs.batchSize=1000
#HDFS IO操做的線程池大小
agent1.sinks.hdfssink2.hdfs.threadsPoolSize=10
#hdfs文件訪問超時時間 默認 100000 單位毫秒
agent1.sinks.hdfssink2.hdfs.callTimeout=30000
#文件關閉前空閒時間 默認0 不關閉 單位秒
agent1.sinks.hdfssink2.hdfs.idleTimeout=300
#寫入hdfs文件的用戶
agent1.sinks.hdfssink2.hdfs.proxyUser=hadoop
#hdfs文件操做失敗後的重試時間 單位秒 默認180
agent1.sinks.hdfssink2.hdfs.retryInterval = 3