flume源碼編譯/攔截器分析(一)

flume介紹


因爲是第一次進行源碼編譯與開發,步驟有點複雜,後續再進行簡化html

Flume是Cloudera提供的一個高可用、高可靠、分佈式的海量日誌採集、聚合和傳輸的系統。Flume支持在日誌系統中定製各種數據發送方用於收集數據,同時Flume提供對數據的簡單處理,並將數據處理結果寫入各類數據接收方的能力。java


flume源碼編譯

  1. 編譯所需環境:git

    maven 3.xapache

    java sdk 1.6 以上api

    git服務器

  2. 下載源碼,本文選用的是apache-flume-1.6.0-src.tar.gzmaven

解壓至工做路徑中,在git中執行:分佈式

mvn clean
mvn package -DskipTests

結果以下圖所示即編譯成功。 圖1工具

而後在以下圖的所示的flume-ng-dist文件夾下的target下出現apche-flume-1.6.0-bin.tar.gz與apche-flume-1.6.0-src.tar.gz,這樣就能夠任意的修改/開發各類本身須要的功能了。oop

輸入圖片說明

  • 注意,在下載的源碼中有hadoop與hbase版本須要修改,不一樣版本修改不一樣,上網搜一下看看。

flume攔截器

Flume中的攔截器(interceptor),用戶Source讀取events發送到Sink的時候,在events header中加入一些有用的信息,或者對events的內容進行過濾,完成初步的數據清洗。這在實際業務場景中很是有用,Flume-ng 1.6中目前提供瞭如下攔截器:

在flume源碼編譯成功的前提下,就能夠對源碼作爲所欲爲的修改了,在本文主要對攔截器進行開發, 在org.apache.flume.interceptor下進行開發的:

具體開發原理等見博文flume攔截器分析

kafka向flume發送數據

參考:http://blog.csdn.net/high2011/article/details/53282128

  1. 首先寫生產者
  2. 搭建好flume環境並配置好配置文件 flume配置文件
tier1.sources  = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = ip
:2181,ip2:2181,ip3:2181
tier1.sources.source1.topic = kafka_topicconfig_test_1
tier1.sources.source1.groupId = kafka_topicconfig_default_group
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 1000

tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = hdfs://ip:9000/flume/%{topic}/%y-%m-%d
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1
  1. 搭建好hadoop環境 具體見hadoop基礎環境搭建,在此教程中hadoop爲僞分佈式,分佈式搭建見博客hadoop分佈式搭建
  • 注意(此處有待研究):

    1. 爲了得到更高的吞吐量,請配置多個Kafka源以從同一主題讀取。
    2. 若是配置具備相同groupID的全部源,而且主題包含多個分區,則每一個源從不一樣的分區集中讀取數據,從而提升吞吐率。
  • . 調整註釋: Kafka源覆蓋兩個Kafka使用者參數:

    1. auto.commit.enable由源設置爲false,而且每一個批處理都提交。 爲了提升性能,請使用kafka.auto.commit.enable設置將其設置爲true。 若是源在提交前失敗,則可能致使數據丟失。
    2. consumer.timeout.ms設置爲10,所以當Flume輪詢Kafka的新數據時,它等待不超過10毫秒的數據可用。 將此設置爲更高的值能夠下降CPU利用率,由於輪詢頻率較低,但在向通道寫入批處理時引入了延遲。

flume->kafka

暫尚未實驗,先將連接記下:

1.http://blog.csdn.net/high2011/article/details/53282128 2.http://blog.csdn.net/huguoping830623/article/details/48138319

kafka批量發送數據

腳本形式

#pass

api形式

見博文向kafka批量發送已存在的txt文件

壓測報告

測試目的:測試flume的過濾器功能是否能夠移植到自研軟件上使用

測試過程:

準備步驟:

  • 根據業務需求,在flume中開發過濾器KafkaInterceptor.java;
  • 從生產環境下取出大小爲4k的2290135條數據;

測試場景:

  1. 在不利用過濾器的條件下,利用kafka將數據發送給flume,而後存到hdfs中;
  2. 在利用過濾器的前提下,利用kafka將數據發送給flume,而後存到hdfs中;

測試結果(測試工具爲nomn及nomn analyser):

在tps及硬件環境一致的前提下,場景一此時服務器cpu的平均使用率爲:3.18%;而此時服務器cpu的平均使用率爲:16.34%,.

測試結論:

在測試環境下,場景2的cpu使用率是場景1的5倍多,全部在生產環境下是否使用或增長該功能,還須要繼續討論。


若是有flume問題,請加QQ羣:140467035 ,共同窗習進步

相關文章
相關標籤/搜索