Flume安裝與配置詳解

Flume簡介

​ 一款分佈式的海量應用日誌採集、聚合、傳輸的框架,支持配置多種數據發送方與接收方,具備高可用、高可靠的特性。好比能夠同時從文件目錄、log4j、http、avro、kafka等渠道收集日誌,經過傳輸輸出到kafka、hdfs、MySQL、文件等。在傳輸過程當中能夠對數據作簡單處理。html

應用架構

 

單個agent正則表達式

flume的核心是agent,而agent包含source、channel、sink三個組件。apache

source:source組件是專門用來收集數據的,能夠處理各類類型、各類格式的日誌數據,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定義。數組

channel:source組件把數據收集來之後,臨時存放在channel中,即channel組件在agent中是專門用來存放臨時數據的——對採集到的數據進行簡單的緩存,能夠存放在memory、jdbc、file等等。緩存

sink:sink組件是用於把數據發送到目的地的組件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定義架構

工做流程:flume把數據從數據源(source)收集過來,再將數據送到指定的目的地(sink)。爲保證傳輸的過程必定成功,在送到目的地(sink)以前,會先緩存數據(channel),待數據真正到達目的地(sink)後,flume再刪除本身緩存的數據。 在整個數據的傳輸的過程當中,流動的是event(基本單位),所以事務保證是在event級別進行的。app

event:event將傳輸的數據進行封裝,是flume傳輸數據的基本單位,若是是文本文件,一般是一行記錄。event也是事務的基本單位。event在單個agent中經歷source—channel—sink過程,後面可能輸出到下一個agent或者flume外的系統中。event自己爲一個字節數組,其攜帶headers(頭信息)信息,消息體,消息內容負載均衡

從上面圖中能夠看出flume支持多級的成網狀數據流動,很是的靈活好用,這應該就是flume普遍使用緣由吧。好比數據扇入到同一個agent或者扇出到多個agent。框架

安裝

​ 1.安裝jdk,1.6版本以上curl

​ 2.上傳flume的安裝包

​ 3.解壓安裝

​ 4.在conf目錄下,建立一個配置文件,好比:template.conf(名字能夠不固定,後綴也能夠不固定)

使用舉例

建議多看官方文檔,並且隨着flume版本變化,下面的配置可能針對你的版本會不能生效,甚至拋異常

基本配置

spooldir source

​ source來源爲一個目錄,flume會檢測目錄下新增的文件,將文件的內容經過logger打印到控制檯上,channel爲內存

​ vi flume-dir.conf

#先配置單通道,定義source,channel,sink,它們分別均可以配置多份,好比n個channel和n個sink
#a1 是該agent的名字,在啓動的時候須要指定agent的名字
a1.sources=r1
a1.channels=c1
a1.sinks=s1

##############配置source###################
#source的類型
a1.sources.r1.type=spooldir
#spooldir類型的source監控的目錄
a1.sources.r1.spoolDir=/home/hadoop/data/flume
#0.0.0.0表示本機
a1.sources.r1.bind=0.0.0.0
#使用的端口
a1.sources.r1.port=44445

#指定sink類型
a1.sinks.s1.type=logger

#指定channel類型
a1.channels.c1.type=memory
#buffer能夠保存多少個event
a1.channels.c1.capacity=1000
#事務一次能夠處理多少個event
a1.channels.c1.transactionCapacity=100

#三個核心組件的綁定
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

 

啓動:./flume-ng agent -n a1 -c ../conf -f ../conf/flume-dir.conf -Dflume.root.logger=INFO,console

在/home/hadoop/data/flume目錄下新增文件

​ echo Hello Messi > 11.txt

​ echo Hello Havi > 22.txt

觀察結果

http source

#先配置單通道
a1.sources=r1
a1.channels=c1
a1.sinks=s1

##############配置source###################
a1.sources.r1.type=http
#0.0.0.0表示本機
a1.sources.r1.port=44445

a1.sinks.s1.type=logger

a1.channels.c1.type=memory
#buffer能夠保存多少個event
a1.channels.c1.capacity=1000
#事務一次能夠處理多少個event
a1.channels.c1.transactionCapacity=100

#三個核心組件的綁定
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

 

啓動:./flume-ng agent -n a1 -c ../conf -f ../conf/flume-http.conf -Dflume.root.logger=INFO,console

測試方法與上面相似,須要構造http請求

curl -X POST -d '[{"headers":{"a":"a1","b":"b1"},"body":"hello Messi"}]' http://0.0.0.0:44445

avro source

​ avro source通常用在扇入場景,可使用./flume-ng avro-client -H 0.0.0.0 -p 44445 -F ./1.txt -c ../conf 客戶端測試

​ avro是一種序列化和rpc框架

#先配置單通道
a1.sources=r1
a1.channels=c1
a1.sinks=s1

##############配置source###################
a1.sources.r1.type=avro
#0.0.0.0表示本機
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=44445

a1.sinks.s1.type=logger

a1.channels.c1.type=memory
#buffer能夠保存多少個event
a1.channels.c1.capacity=1000
#事務一次能夠處理多少個event
a1.channels.c1.transactionCapacity=100

#三個核心組件的綁定
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

 

啓動:./flume-ng agent -n a1 -c ../conf -f ../conf/flume-avro.conf -Dflume.root.logger=INFO,console

準備測試文件:echo 111111 > 1.txt

測試:./flume-ng avro-client -H 0.0.0.0 -p 44445 -F ./1.txt -c ../conf

avro sink

#先配置單通道
a1.sources=r1
a1.channels=c1
a1.sinks=s1

##############配置source###################
a1.sources.r1.type=netcat
#0.0.0.0表示本機
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=44444

a1.sinks.s1.type=avro
a1.sinks.s1.hostname=192.168.245.142
a1.sinks.s1.port=44445

a1.channels.c1.type=memory
#buffer能夠保存多少個event
a1.channels.c1.capacity=1000
#事務一次能夠處理多少個event
a1.channels.c1.transactionCapacity=100

#三個核心組件的綁定
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

 

avro sink扇出

#先配置單通道
a1.sources=r1
a1.channels=c1 c2
a1.sinks=s1 s2

##############配置source###################
a1.sources.r1.type=netcat
#0.0.0.0表示本機
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=44444

a1.sinks.s1.type=avro
a1.sinks.s1.hostname=192.168.245.142
a1.sinks.s1.port=44445

a1.sinks.s2.type=avro
a1.sinks.s2.hostname=192.168.245.142
a1.sinks.s2.port=44446

a1.channels.c1.type=memory
#buffer能夠保存多少個event
a1.channels.c1.capacity=1000
#事務一次能夠處理多少個event
a1.channels.c1.transactionCapacity=100

a1.channels.c2.type=memory
#buffer能夠保存多少個event
a1.channels.c2.capacity=1000
#事務一次能夠處理多少個event
a1.channels.c2.transactionCapacity=100

#三個核心組件的綁定
a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2

 

輸出到hdfs

#先配置單通道
a1.sources=r1
a1.channels=c1
a1.sinks=s1

##############配置source###################
a1.sources.r1.type=netcat
#0.0.0.0表示本機
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=44444

a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://192.168.245.150:9000/flume
a1.sinks.s1.hdfs.fileType=DataStream
#在實際生產中要很是注意這三個值的設定,必定要避免生成的文件不要過小,不然hadoop的性能發揮不出來
#單位是秒,若是設置爲0,表示該配置不生效
a1.sinks.s1.rollInterval=60
#單位是字節,若是設置爲0,表示該配置不生效
a1.sinks.s1.rollSize=1024
#記錄行數,若是設置爲0,表示該配置不生效
a1.sinks.s1.rollCount=0

a1.channels.c1.type=memory
#buffer能夠保存多少個event
a1.channels.c1.capacity=1000
#事務一次能夠處理多少個event
a1.channels.c1.transactionCapacity=100

#三個核心組件的綁定
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

 

還有不少不少的source,channel,sink,他們之間能夠相互組合使用,在生產中根據實際場景進行選擇,這裏就不一一列舉了,能夠在官方網址查看,都有例子

http://flume.apache.org/FlumeUserGuide.html#flume-sources

http://flume.apache.org/FlumeUserGuide.html#flume-sinks

http://flume.apache.org/FlumeUserGuide.html#flume-channels

 

 

Channel選擇器

replicating selector

a1.sources.r1.selector.type=replicating
a1.sources.r1.selector.optional=c1

若是沒有配置selector,flume默認配置的selector就是replicating

replicating表示同一個數據源的數據每一個channel都會發送一份,若是要忽略某個發送失敗的channel,能夠經過a1.sources.r1.selector.optional指定。好比當接入flume中的某一個分支對數據要求沒有那麼嚴格,就能夠將其配置到optional當中。還可使用這種方式+監控來作高可用

#先配置單通道
a1.sources=r1
a1.channels=c1 c2 c3
a1.sinks=s1 s2 s3

##############配置source###################
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/home/hadoop/data/flume
#0.0.0.0表示本機
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=44445
a1.sources.r1.selector.type=replicating
a1.sources.r1.selector.optional=c1

a1.sinks.s1.type=logger
a1.sinks.s2.type=logger
a1.sinks.s3.type=logger

a1.channels.c1.type=memory
a1.channels.c2.type=memory
a1.channels.c3.type=memory
#buffer能夠保存多少個event
a1.channels.c1.capacity=1000
#事務一次能夠處理多少個event
a1.channels.c1.transactionCapacity=100

#三個核心組件的綁定
a1.sources.r1.channels=c1 c2 c3
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
a1.sinks.s3.channel=c3

啓動:

../bin/flume-ng agent -n a1 -c ./ -f ./flume-dir-replicate-selector.conf -Dflume.root.logger=INFO,console

建立新文件

echo 123abchahah > 3.txt

multiplexing selector

根據頭信息中的字段作匹配,能夠將指定內容發送到指定通道對應的sink上,實現數據的路由發送,就像rabbitMQ中的路由模式或者主題模式

#配置Agent a1 的組件
a1.sources=r1
a1.sinks=s1 s2
a1.channels=c1 c2
 
#描述/配置a1的source1
a1.sources.r1.type=http
a1.sources.r1.port=8888
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=state
a1.sources.r1.selector.mapping.cn=c1
a1.sources.r1.selector.mapping.us=c2
a1.sources.r1.selector.default=c2

#描述sink
a1.sinks.s1.type=logger
a1.sinks.s2.type=logger

#描述內存channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
 
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100
 
#位channel 綁定 source和sink
a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2

啓動:

../bin/flume-ng agent -n a1 -c ./ -f ./flume-dir-multi-selector.conf -Dflume.root.logger=INFO,console

測試:

curl -X POST -d '[{"headers":{"cn":"c1"},"body":"hello Messi"}]' http://0.0.0.0:8888

若是想數據輸出到c2,那麼執行

curl -X POST -d '[{"headers":{"us":"c2"},"body":"hello Messi"}]' http://0.0.0.0:8888

輸出結果:只會有一個通道會輸出數據

爲了更好的觀測結果,能夠配置avro sink將數據發送給flume中的下一個agent

序列化器

通常不須要設置,都使用默認配置,可是在生產當中爲了解決數據格式與默認的序列化器不匹配問題,能夠採用兩種方式: 1.在輸入flume採集以前,將數據格式調整爲與序列化器匹配的格式

2.自定義序列化器,在flume採集的時候就自動作格式轉換,參考Flume自定義Hbase Sink的EventSerializer序列化類

攔截器配置

Timestamp

在頭信息裏面包含timestamp鍵值對

在multiplexing selector對應的配置文件中增長以下配置

#配置攔截器
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp

啓動與測試與multiplexing selector同樣

輸出:

host

在頭信息裏面包含host鍵值對

按照以下的配置對timestamp的配置文件作相應修改

#配置攔截器
a1.sources.r1.interceptors=i1 i2
a1.sources.r1.interceptors.i1.type=timestamp
a1.sources.r1.interceptors.i2.type=host
#true表示用ip,不然是host
a1.sources.r1.interceptors.i2.useIP=true

啓動與測試與multiplexing selector同樣

輸出:

static

在頭信息中增長一靜態的鍵值對,鍵和值都是在配置文件中指定

#配置攔截器
a1.sources.r1.interceptors=i1 i2 i3
a1.sources.r1.interceptors.i1.type=timestamp
a1.sources.r1.interceptors.i2.type=host
#true表示用ip,不然是host
a1.sources.r1.interceptors.i2.useIP=true
a1.sources.r1.interceptors.i3.type=static
a1.sources.r1.interceptors.i3.key=mykey
a1.sources.r1.interceptors.i3.value=myvalue

配置中我增長了一對mykey:myvalue的健值,在輸出結果裏面即爲:

Event: { headers:{host=192.168.245.141, mykey=myvalue, us=c2, timestamp=1527748087208} body: 68 65 6C 6C 6F 20 4D 65 73 73 69                hello Messi }

remove

是flume1.7後增長的特性,這裏只是列一下配置,沒有作測試,功能是把前面增長的static攔截器輸出的mykey:myvalue去掉

#配置攔截器
a1.sources.r1.interceptors=i1 i2 i3 i4
a1.sources.r1.interceptors.i1.type=timestamp
a1.sources.r1.interceptors.i2.type=host
#true表示用ip,不然是host
a1.sources.r1.interceptors.i2.useIP=true
a1.sources.r1.interceptors.i3.type=static
a1.sources.r1.interceptors.i3.key=mykey
a1.sources.r1.interceptors.i3.value=myvalue
a1.sources.r1.interceptors.i4.type=remove_header
a1.sources.r1.interceptors.i4.withName=mykey

uuid

在頭信息中添加一個uuid(全局惟一的ID)

#配置攔截器
#a1.sources.r1.interceptors=i1 i2 i3 i4
a1.sources.r1.interceptors=i1 i2 i3 i4
a1.sources.r1.interceptors.i1.type=timestamp
a1.sources.r1.interceptors.i2.type=host
#true表示用ip,不然是host
a1.sources.r1.interceptors.i2.useIP=true
a1.sources.r1.interceptors.i3.type=static
a1.sources.r1.interceptors.i3.key=mykey
a1.sources.r1.interceptors.i3.value=myvalue
#a1.sources.r1.interceptors.i4.type=remove_header
#a1.sources.r1.interceptors.i4.withName=mykey
a1.sources.r1.interceptors.i4.type=org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.r1.interceptors.i4.headerName=acctId

啓動與測試與上面相同

輸出:

Event: { headers:{host=192.168.245.141, acctId=9ae14360-f8f8-480c-98a1-32afac593a8a, mykey=myvalue, us=c2, timestamp=1527749151344} body: 68 65 6C 6C 6F 20 4D 65 73 73 69                hello Messi }

 

search and replace

功能與String類中的replace相似,支持正則表達式作匹配,將匹配到的內容替換爲replace_string

a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace

# Remove leading alphanumeric characters in an event body.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+
a1.sources.avroSrc.interceptors.search-replace.replaceString = replace_string

regex filter

使用正則表達式在消息內容中作匹配,若是匹配成功,能夠將event排除掉,也能夠包含進來

regex extractor

使用正則表達式從消息內容中匹配到字符串,並將匹配到的字符串做爲頭信息key-value中的value,key在配置文件中指定

a1.sources.r1.interceptors = i1
		a1.sources.r1.interceptors.i1.type = regex_extractor
		a1.sources.r1.interceptors.i1.regex = ^(?:[^\\|]*\\|){14}\\d+_\\d+_(\\d+)\\|.*$
		a1.sources.r1.interceptors.i1.serializers = s1
		a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp

將匹配到的結果做爲timestamp的值

logger-4j appender

配合上面的regex extractor配置,能夠將log4j中輸出的日誌直接傳給flume。log4j.properties中須要以下配置

log4j.rootLogger = info,stdout,flume

		log4j.appender.stdout = org.apache.log4j.ConsoleAppender
		log4j.appender.stdout.Target = System.out
		log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
		log4j.appender.stdout.layout.ConversionPattern = %m%n

		log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
		log4j.appender.flume.Hostname = hadoop01
		log4j.appender.flume.Port = 44444
		#表示若是flume響應失敗,應用不出現異常
		log4j.appender.flume.UnsafeMode = true
		log4j.appender.flume.layout = org.apache.log4j.PatternLayout
		#能夠將%n去掉,不然結果會出現兩個換行,由於flume後面會對每一個event自動換行
		log4j.appender.flume.layout.ConversionPattern = %m%n

該appender還有負載均衡功能,均可以參考apache flume官方文檔輕鬆搞定,有興趣的同窗多研究研究

相關文章
相關標籤/搜索