因爲是第一次進行源碼編譯與開發,步驟有點複雜,後續再進行簡化html
Flume是Cloudera提供的一個高可用、高可靠、分佈式的海量日誌採集、聚合和傳輸的系統。Flume支持在日誌系統中定製各種數據發送方用於收集數據,同時Flume提供對數據的簡單處理,並將數據處理結果寫入各類數據接收方的能力。java
編譯所需環境:git
maven 3.xapache
java sdk 1.6 以上api
git服務器
下載源碼,本文選用的是apache-flume-1.6.0-src.tar.gzmaven
解壓至工做路徑中,在git中執行:分佈式
mvn clean mvn package -DskipTests
結果以下圖所示即編譯成功。 工具
而後在以下圖的所示的flume-ng-dist文件夾下的target下出現apche-flume-1.6.0-bin.tar.gz與apche-flume-1.6.0-src.tar.gz,這樣就能夠任意的修改/開發各類本身須要的功能了。oop
Flume中的攔截器(interceptor),用戶Source讀取events發送到Sink的時候,在events header中加入一些有用的信息,或者對events的內容進行過濾,完成初步的數據清洗。這在實際業務場景中很是有用,Flume-ng 1.6中目前提供瞭如下攔截器:
在flume源碼編譯成功的前提下,就能夠對源碼作爲所欲爲的修改了,在本文主要對攔截器進行開發, 在org.apache.flume.interceptor下進行開發的:
具體開發原理等見博文flume攔截器分析
參考:http://blog.csdn.net/high2011/article/details/53282128
tier1.sources = source1 tier1.channels = channel1 tier1.sinks = sink1 tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource tier1.sources.source1.zookeeperConnect = ip :2181,ip2:2181,ip3:2181 tier1.sources.source1.topic = kafka_topicconfig_test_1 tier1.sources.source1.groupId = kafka_topicconfig_default_group tier1.sources.source1.channels = channel1 tier1.sources.source1.interceptors = i1 tier1.sources.source1.interceptors.i1.type = timestamp tier1.sources.source1.kafka.consumer.timeout.ms = 1000 tier1.channels.channel1.type = memory tier1.channels.channel1.capacity = 10000 tier1.channels.channel1.transactionCapacity = 1000 tier1.sinks.sink1.type = hdfs tier1.sinks.sink1.hdfs.path = hdfs://ip:9000/flume/%{topic}/%y-%m-%d tier1.sinks.sink1.hdfs.rollInterval = 5 tier1.sinks.sink1.hdfs.rollSize = 0 tier1.sinks.sink1.hdfs.rollCount = 0 tier1.sinks.sink1.hdfs.fileType = DataStream tier1.sinks.sink1.channel = channel1
注意(此處有待研究):
. 調整註釋: Kafka源覆蓋兩個Kafka使用者參數:
暫尚未實驗,先將連接記下:
1.http://blog.csdn.net/high2011/article/details/53282128 2.http://blog.csdn.net/huguoping830623/article/details/48138319
#pass
準備步驟:
測試場景:
在tps及硬件環境一致的前提下,場景一此時服務器cpu的平均使用率爲:3.18%;而此時服務器cpu的平均使用率爲:16.34%,.
在測試環境下,場景2的cpu使用率是場景1的5倍多,全部在生產環境下是否使用或增長該功能,還須要繼續討論。