一款分佈式的海量應用日誌採集、聚合、傳輸的框架,支持配置多種數據發送方與接收方,具備高可用、高可靠的特性。好比能夠同時從文件目錄、log4j、http、avro、kafka等渠道收集日誌,經過傳輸輸出到kafka、hdfs、MySQL、文件等。在傳輸過程當中能夠對數據作簡單處理。html
單個agent正則表達式
flume的核心是agent,而agent包含source、channel、sink三個組件。apache
source:source組件是專門用來收集數據的,能夠處理各類類型、各類格式的日誌數據,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定義。數組
channel:source組件把數據收集來之後,臨時存放在channel中,即channel組件在agent中是專門用來存放臨時數據的——對採集到的數據進行簡單的緩存,能夠存放在memory、jdbc、file等等。緩存
sink:sink組件是用於把數據發送到目的地的組件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定義架構
工做流程:flume把數據從數據源(source)收集過來,再將數據送到指定的目的地(sink)。爲保證傳輸的過程必定成功,在送到目的地(sink)以前,會先緩存數據(channel),待數據真正到達目的地(sink)後,flume再刪除本身緩存的數據。 在整個數據的傳輸的過程當中,流動的是event(基本單位),所以事務保證是在event級別進行的。app
event:event將傳輸的數據進行封裝,是flume傳輸數據的基本單位,若是是文本文件,一般是一行記錄。event也是事務的基本單位。event在單個agent中經歷source—channel—sink過程,後面可能輸出到下一個agent或者flume外的系統中。event自己爲一個字節數組,其攜帶headers(頭信息)信息,消息體,消息內容負載均衡
從上面圖中能夠看出flume支持多級的成網狀數據流動,很是的靈活好用,這應該就是flume普遍使用緣由吧。好比數據扇入到同一個agent或者扇出到多個agent。框架
1.安裝jdk,1.6版本以上curl
2.上傳flume的安裝包
3.解壓安裝
4.在conf目錄下,建立一個配置文件,好比:template.conf(名字能夠不固定,後綴也能夠不固定)
建議多看官方文檔,並且隨着flume版本變化,下面的配置可能針對你的版本會不能生效,甚至拋異常
source來源爲一個目錄,flume會檢測目錄下新增的文件,將文件的內容經過logger打印到控制檯上,channel爲內存
vi flume-dir.conf
#先配置單通道,定義source,channel,sink,它們分別均可以配置多份,好比n個channel和n個sink #a1 是該agent的名字,在啓動的時候須要指定agent的名字 a1.sources=r1 a1.channels=c1 a1.sinks=s1 ##############配置source################### #source的類型 a1.sources.r1.type=spooldir #spooldir類型的source監控的目錄 a1.sources.r1.spoolDir=/home/hadoop/data/flume #0.0.0.0表示本機 a1.sources.r1.bind=0.0.0.0 #使用的端口 a1.sources.r1.port=44445 #指定sink類型 a1.sinks.s1.type=logger #指定channel類型 a1.channels.c1.type=memory #buffer能夠保存多少個event a1.channels.c1.capacity=1000 #事務一次能夠處理多少個event a1.channels.c1.transactionCapacity=100 #三個核心組件的綁定 a1.sources.r1.channels=c1 a1.sinks.s1.channel=c1
啓動:./flume-ng agent -n a1 -c ../conf -f ../conf/flume-dir.conf -Dflume.root.logger=INFO,console
在/home/hadoop/data/flume目錄下新增文件
echo Hello Messi > 11.txt
echo Hello Havi > 22.txt
觀察結果
#先配置單通道 a1.sources=r1 a1.channels=c1 a1.sinks=s1 ##############配置source################### a1.sources.r1.type=http #0.0.0.0表示本機 a1.sources.r1.port=44445 a1.sinks.s1.type=logger a1.channels.c1.type=memory #buffer能夠保存多少個event a1.channels.c1.capacity=1000 #事務一次能夠處理多少個event a1.channels.c1.transactionCapacity=100 #三個核心組件的綁定 a1.sources.r1.channels=c1 a1.sinks.s1.channel=c1
啓動:./flume-ng agent -n a1 -c ../conf -f ../conf/flume-http.conf -Dflume.root.logger=INFO,console
測試方法與上面相似,須要構造http請求
curl -X POST -d '[{"headers":{"a":"a1","b":"b1"},"body":"hello Messi"}]' http://0.0.0.0:44445
avro source通常用在扇入場景,可使用./flume-ng avro-client -H 0.0.0.0 -p 44445 -F ./1.txt -c ../conf 客戶端測試
avro是一種序列化和rpc框架
#先配置單通道 a1.sources=r1 a1.channels=c1 a1.sinks=s1 ##############配置source################### a1.sources.r1.type=avro #0.0.0.0表示本機 a1.sources.r1.bind=0.0.0.0 a1.sources.r1.port=44445 a1.sinks.s1.type=logger a1.channels.c1.type=memory #buffer能夠保存多少個event a1.channels.c1.capacity=1000 #事務一次能夠處理多少個event a1.channels.c1.transactionCapacity=100 #三個核心組件的綁定 a1.sources.r1.channels=c1 a1.sinks.s1.channel=c1
啓動:./flume-ng agent -n a1 -c ../conf -f ../conf/flume-avro.conf -Dflume.root.logger=INFO,console
準備測試文件:echo 111111 > 1.txt
測試:./flume-ng avro-client -H 0.0.0.0 -p 44445 -F ./1.txt -c ../conf
#先配置單通道 a1.sources=r1 a1.channels=c1 a1.sinks=s1 ##############配置source################### a1.sources.r1.type=netcat #0.0.0.0表示本機 a1.sources.r1.bind=0.0.0.0 a1.sources.r1.port=44444 a1.sinks.s1.type=avro a1.sinks.s1.hostname=192.168.245.142 a1.sinks.s1.port=44445 a1.channels.c1.type=memory #buffer能夠保存多少個event a1.channels.c1.capacity=1000 #事務一次能夠處理多少個event a1.channels.c1.transactionCapacity=100 #三個核心組件的綁定 a1.sources.r1.channels=c1 a1.sinks.s1.channel=c1
#先配置單通道 a1.sources=r1 a1.channels=c1 c2 a1.sinks=s1 s2 ##############配置source################### a1.sources.r1.type=netcat #0.0.0.0表示本機 a1.sources.r1.bind=0.0.0.0 a1.sources.r1.port=44444 a1.sinks.s1.type=avro a1.sinks.s1.hostname=192.168.245.142 a1.sinks.s1.port=44445 a1.sinks.s2.type=avro a1.sinks.s2.hostname=192.168.245.142 a1.sinks.s2.port=44446 a1.channels.c1.type=memory #buffer能夠保存多少個event a1.channels.c1.capacity=1000 #事務一次能夠處理多少個event a1.channels.c1.transactionCapacity=100 a1.channels.c2.type=memory #buffer能夠保存多少個event a1.channels.c2.capacity=1000 #事務一次能夠處理多少個event a1.channels.c2.transactionCapacity=100 #三個核心組件的綁定 a1.sources.r1.channels=c1 c2 a1.sinks.s1.channel=c1 a1.sinks.s2.channel=c2
#先配置單通道 a1.sources=r1 a1.channels=c1 a1.sinks=s1 ##############配置source################### a1.sources.r1.type=netcat #0.0.0.0表示本機 a1.sources.r1.bind=0.0.0.0 a1.sources.r1.port=44444 a1.sinks.s1.type=hdfs a1.sinks.s1.hdfs.path=hdfs://192.168.245.150:9000/flume a1.sinks.s1.hdfs.fileType=DataStream #在實際生產中要很是注意這三個值的設定,必定要避免生成的文件不要過小,不然hadoop的性能發揮不出來 #單位是秒,若是設置爲0,表示該配置不生效 a1.sinks.s1.rollInterval=60 #單位是字節,若是設置爲0,表示該配置不生效 a1.sinks.s1.rollSize=1024 #記錄行數,若是設置爲0,表示該配置不生效 a1.sinks.s1.rollCount=0 a1.channels.c1.type=memory #buffer能夠保存多少個event a1.channels.c1.capacity=1000 #事務一次能夠處理多少個event a1.channels.c1.transactionCapacity=100 #三個核心組件的綁定 a1.sources.r1.channels=c1 a1.sinks.s1.channel=c1
還有不少不少的source,channel,sink,他們之間能夠相互組合使用,在生產中根據實際場景進行選擇,這裏就不一一列舉了,能夠在官方網址查看,都有例子
http://flume.apache.org/FlumeUserGuide.html#flume-sources
http://flume.apache.org/FlumeUserGuide.html#flume-sinks
http://flume.apache.org/FlumeUserGuide.html#flume-channels
a1.sources.r1.selector.type=replicating a1.sources.r1.selector.optional=c1
若是沒有配置selector,flume默認配置的selector就是replicating
replicating表示同一個數據源的數據每一個channel都會發送一份,若是要忽略某個發送失敗的channel,能夠經過a1.sources.r1.selector.optional指定。好比當接入flume中的某一個分支對數據要求沒有那麼嚴格,就能夠將其配置到optional當中。還可使用這種方式+監控來作高可用
#先配置單通道 a1.sources=r1 a1.channels=c1 c2 c3 a1.sinks=s1 s2 s3 ##############配置source################### a1.sources.r1.type=spooldir a1.sources.r1.spoolDir=/home/hadoop/data/flume #0.0.0.0表示本機 a1.sources.r1.bind=0.0.0.0 a1.sources.r1.port=44445 a1.sources.r1.selector.type=replicating a1.sources.r1.selector.optional=c1 a1.sinks.s1.type=logger a1.sinks.s2.type=logger a1.sinks.s3.type=logger a1.channels.c1.type=memory a1.channels.c2.type=memory a1.channels.c3.type=memory #buffer能夠保存多少個event a1.channels.c1.capacity=1000 #事務一次能夠處理多少個event a1.channels.c1.transactionCapacity=100 #三個核心組件的綁定 a1.sources.r1.channels=c1 c2 c3 a1.sinks.s1.channel=c1 a1.sinks.s2.channel=c2 a1.sinks.s3.channel=c3
啓動:
../bin/flume-ng agent -n a1 -c ./ -f ./flume-dir-replicate-selector.conf -Dflume.root.logger=INFO,console
建立新文件
echo 123abchahah > 3.txt
根據頭信息中的字段作匹配,能夠將指定內容發送到指定通道對應的sink上,實現數據的路由發送,就像rabbitMQ中的路由模式或者主題模式
#配置Agent a1 的組件 a1.sources=r1 a1.sinks=s1 s2 a1.channels=c1 c2 #描述/配置a1的source1 a1.sources.r1.type=http a1.sources.r1.port=8888 a1.sources.r1.selector.type=multiplexing a1.sources.r1.selector.header=state a1.sources.r1.selector.mapping.cn=c1 a1.sources.r1.selector.mapping.us=c2 a1.sources.r1.selector.default=c2 #描述sink a1.sinks.s1.type=logger a1.sinks.s2.type=logger #描述內存channel a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=100 a1.channels.c2.type=memory a1.channels.c2.capacity=1000 a1.channels.c2.transactionCapacity=100 #位channel 綁定 source和sink a1.sources.r1.channels=c1 c2 a1.sinks.s1.channel=c1 a1.sinks.s2.channel=c2
啓動:
../bin/flume-ng agent -n a1 -c ./ -f ./flume-dir-multi-selector.conf -Dflume.root.logger=INFO,console
測試:
curl -X POST -d '[{"headers":{"cn":"c1"},"body":"hello Messi"}]' http://0.0.0.0:8888
若是想數據輸出到c2,那麼執行
curl -X POST -d '[{"headers":{"us":"c2"},"body":"hello Messi"}]' http://0.0.0.0:8888
輸出結果:只會有一個通道會輸出數據
爲了更好的觀測結果,能夠配置avro sink將數據發送給flume中的下一個agent
通常不須要設置,都使用默認配置,可是在生產當中爲了解決數據格式與默認的序列化器不匹配問題,能夠採用兩種方式: 1.在輸入flume採集以前,將數據格式調整爲與序列化器匹配的格式
2.自定義序列化器,在flume採集的時候就自動作格式轉換,參考Flume自定義Hbase Sink的EventSerializer序列化類
在頭信息裏面包含timestamp鍵值對
在multiplexing selector對應的配置文件中增長以下配置
#配置攔截器 a1.sources.r1.interceptors=i1 a1.sources.r1.interceptors.i1.type=timestamp
啓動與測試與multiplexing selector同樣
輸出:
在頭信息裏面包含host鍵值對
按照以下的配置對timestamp的配置文件作相應修改
#配置攔截器 a1.sources.r1.interceptors=i1 i2 a1.sources.r1.interceptors.i1.type=timestamp a1.sources.r1.interceptors.i2.type=host #true表示用ip,不然是host a1.sources.r1.interceptors.i2.useIP=true
啓動與測試與multiplexing selector同樣
輸出:
在頭信息中增長一靜態的鍵值對,鍵和值都是在配置文件中指定
#配置攔截器 a1.sources.r1.interceptors=i1 i2 i3 a1.sources.r1.interceptors.i1.type=timestamp a1.sources.r1.interceptors.i2.type=host #true表示用ip,不然是host a1.sources.r1.interceptors.i2.useIP=true a1.sources.r1.interceptors.i3.type=static a1.sources.r1.interceptors.i3.key=mykey a1.sources.r1.interceptors.i3.value=myvalue
配置中我增長了一對mykey:myvalue的健值,在輸出結果裏面即爲:
Event: { headers:{host=192.168.245.141, mykey=myvalue, us=c2, timestamp=1527748087208} body: 68 65 6C 6C 6F 20 4D 65 73 73 69 hello Messi }
是flume1.7後增長的特性,這裏只是列一下配置,沒有作測試,功能是把前面增長的static攔截器輸出的mykey:myvalue去掉
#配置攔截器 a1.sources.r1.interceptors=i1 i2 i3 i4 a1.sources.r1.interceptors.i1.type=timestamp a1.sources.r1.interceptors.i2.type=host #true表示用ip,不然是host a1.sources.r1.interceptors.i2.useIP=true a1.sources.r1.interceptors.i3.type=static a1.sources.r1.interceptors.i3.key=mykey a1.sources.r1.interceptors.i3.value=myvalue a1.sources.r1.interceptors.i4.type=remove_header a1.sources.r1.interceptors.i4.withName=mykey
在頭信息中添加一個uuid(全局惟一的ID)
#配置攔截器 #a1.sources.r1.interceptors=i1 i2 i3 i4 a1.sources.r1.interceptors=i1 i2 i3 i4 a1.sources.r1.interceptors.i1.type=timestamp a1.sources.r1.interceptors.i2.type=host #true表示用ip,不然是host a1.sources.r1.interceptors.i2.useIP=true a1.sources.r1.interceptors.i3.type=static a1.sources.r1.interceptors.i3.key=mykey a1.sources.r1.interceptors.i3.value=myvalue #a1.sources.r1.interceptors.i4.type=remove_header #a1.sources.r1.interceptors.i4.withName=mykey a1.sources.r1.interceptors.i4.type=org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder a1.sources.r1.interceptors.i4.headerName=acctId
啓動與測試與上面相同
輸出:
Event: { headers:{host=192.168.245.141, acctId=9ae14360-f8f8-480c-98a1-32afac593a8a, mykey=myvalue, us=c2, timestamp=1527749151344} body: 68 65 6C 6C 6F 20 4D 65 73 73 69 hello Messi }
功能與String類中的replace相似,支持正則表達式作匹配,將匹配到的內容替換爲replace_string
a1.sources.avroSrc.interceptors = search-replace a1.sources.avroSrc.interceptors.search-replace.type = search_replace # Remove leading alphanumeric characters in an event body. a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+ a1.sources.avroSrc.interceptors.search-replace.replaceString = replace_string
使用正則表達式在消息內容中作匹配,若是匹配成功,能夠將event排除掉,也能夠包含進來
使用正則表達式從消息內容中匹配到字符串,並將匹配到的字符串做爲頭信息key-value中的value,key在配置文件中指定
a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = regex_extractor a1.sources.r1.interceptors.i1.regex = ^(?:[^\\|]*\\|){14}\\d+_\\d+_(\\d+)\\|.*$ a1.sources.r1.interceptors.i1.serializers = s1 a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
將匹配到的結果做爲timestamp的值
配合上面的regex extractor配置,能夠將log4j中輸出的日誌直接傳給flume。log4j.properties中須要以下配置
log4j.rootLogger = info,stdout,flume log4j.appender.stdout = org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target = System.out log4j.appender.stdout.layout = org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern = %m%n log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender log4j.appender.flume.Hostname = hadoop01 log4j.appender.flume.Port = 44444 #表示若是flume響應失敗,應用不出現異常 log4j.appender.flume.UnsafeMode = true log4j.appender.flume.layout = org.apache.log4j.PatternLayout #能夠將%n去掉,不然結果會出現兩個換行,由於flume後面會對每一個event自動換行 log4j.appender.flume.layout.ConversionPattern = %m%n
該appender還有負載均衡功能,均可以參考apache flume官方文檔輕鬆搞定,有興趣的同窗多研究研究