Copyright notice: this is an original article by the author; do not repost without the author's permission.
Interceptors mainly operate on an event's header information: they either ignore it outright or modify its data.
Both the file_roll sink and the hdfs sink support the EventSerializer interface.
Body Text Serializer, alias: text. This serializer writes the event body to the output stream without any transformation or modification; the event headers are ignored.
The official configuration:
Property Name | Default | Description
appendNewline | true | Whether a newline will be appended to each event at write time. The default of true assumes that events do not contain newlines, for legacy reasons.
Below is the official example. appendNewline controls whether a newline is appended to each event when it is written out; the default is true (append one). Set it to false only when the event bodies already carry their own newlines.
a1.sinks=k1
a1.sinks.k1.type=file_roll
a1.sinks.k1.channel=c1
a1.sinks.k1.sink.directory=/var/log/flume
a1.sinks.k1.sink.serializer=text
a1.sinks.k1.sink.serializer.appendNewline=false
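To make the serializer's behavior concrete, here is a minimal Python model of it — an illustrative sketch, not Flume's actual Java implementation:

```python
# Illustrative model of the "text" serializer: write only the event body,
# optionally followed by a newline; headers are ignored entirely.
import io

def text_serialize(events, append_newline=True):
    """events is a list of (headers, body) pairs; body is bytes."""
    out = io.BytesIO()
    for headers, body in events:
        out.write(body)          # headers are never written
        if append_newline:
            out.write(b"\n")     # appendNewline=true (the default)
    return out.getvalue()

events = [({"looklook1": "is header"}, b"hello looklook5")]
print(text_serialize(events))                        # b'hello looklook5\n'
print(text_serialize(events, append_newline=False))  # b'hello looklook5'
```

With append_newline disabled, as in the config above, events are written back to back, so use it only when the bodies already end with newlines.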
Below is a working example.
Because the Body Text Serializer ignores header information, we use an HTTP source here so we can send events with explicitly defined headers and a body.
# Config file: body_case15.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = http
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /tmp/logs
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Run the agent
flume-ng agent -c conf -f conf/body_case15.conf -n a1 -Dflume.root.logger=INFO,console
After the agent starts successfully, open another terminal and send data to the listening port:
curl -X POST -d '[{"headers" : {"looklook1" : "looklook1 is header", "looklook2" : "looklook2 is header"}, "body" : "hello looklook5"}]' http://192.168.233.128:50000
# Check the console output on the agent's terminal
The data comes through, but only the body part, hello looklook5, is written; the headers are dropped, as expected.
Avro Event Serializer, alias: avro_event. This serializer writes events into an Avro container file, using the same schema that Flume's Avro RPC mechanism uses for Flume events. It inherits from the AbstractAvroEventSerializer class.
The official configuration:
Property Name | Default | Description
syncIntervalBytes | 2048000 | Avro sync interval, in approximate bytes.
compressionCodec | null | Avro compression codec. For supported codecs, see Avro's CodecFactory docs.
Below is the official example:
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.path=/flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer=avro_event
a1.sinks.k1.serializer.compressionCodec=snappy
We will not demonstrate this one, since it works much like the Body Text Serializer.
The documentation says Flume can modify and drop events while they are in transit, and this is done through interceptors; in practice an interceptor inserts data into the event's headers. The Timestamp Interceptor inserts a timestamp, keyed timestamp, into each event's headers.
The official configuration:
Property Name | Default | Description
type | – | The component type name, has to be timestamp or the FQCN
preserveExisting | false | If the timestamp already exists, should it be preserved - true or false
And the official example:
a1.sources=r1
a1.channels=c1
a1.sources.r1.channels= c1
a1.sources.r1.type=seq
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp
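The interceptor's logic can be modeled in a few lines of Python — a hypothetical sketch, not Flume's actual implementation:

```python
# Model of the Timestamp Interceptor: insert a "timestamp" header holding
# epoch milliseconds, unless preserveExisting protects an existing value.
import time

def timestamp_intercept(event, preserve_existing=False):
    headers, body = event
    if preserve_existing and "timestamp" in headers:
        return event                      # keep the value already there
    headers = dict(headers)               # copy, then stamp
    headers["timestamp"] = str(int(time.time() * 1000))
    return headers, body

headers, _ = timestamp_intercept(({}, b"TimestampInterceptor"))
print(headers["timestamp"])               # a millisecond epoch value
```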
Below is a test example.
# Config file: timestamp_case16.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.type = timestamp
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://carl:9000/flume/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = looklook5.
a1.sinks.k1.hdfs.fileType=DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
Here the timestamp header is used to build the directory name.
# Run the agent
flume-ng agent -c conf -f conf/timestamp_case16.conf -n a1 -Dflume.root.logger=INFO,console
After the agent starts successfully, open another terminal and send data to the listening port:
echo "TimestampInterceptor" | nc 192.168.233.128 50000
# Check the console output on the agent's terminal
Looking at the files generated on HDFS, the timestamp has been inserted into the header, directories are created according to the configured date format, and the data arrived intact.
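How the HDFS sink expands escapes such as %Y-%m-%d/%H%M from the timestamp header can be sketched as follows. This is an illustrative model only: the real sink is Java, supports many more escape sequences, and resolves them against the agent's timezone, while this sketch uses UTC for determinism.

```python
# Resolve strftime-style escapes in hdfs.path using the event's
# "timestamp" header (epoch milliseconds).
import time

def resolve_hdfs_path(path_template, headers):
    ts_ms = int(headers["timestamp"])
    return time.strftime(path_template, time.gmtime(ts_ms / 1000))

print(resolve_hdfs_path("/flume/%Y-%m-%d/%H%M",
                        {"timestamp": "1609459200000"}))
# /flume/2021-01-01/0000
```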
The Host Interceptor inserts a header, keyed host by default, containing the hostname or IP address of the machine the agent runs on (note: the agent's machine, not the original sender's).
The official configuration:
Property Name | Default | Description
type | – | The component type name, has to be host
preserveExisting | false | If the host header already exists, should it be preserved - true or false
useIP | true | Use the IP Address if true, else use hostname.
hostHeader | host | The header key to be used.
And the official example:
a1.sources=r1
a1.channels=c1
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=host
a1.sources.r1.interceptors.i1.hostHeader=hostname
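A hypothetical Python model of the logic (not Flume's actual implementation):

```python
# Model of the Host Interceptor: store the agent machine's IP address
# (useIP=true, the default) or hostname under a configurable header key.
import socket

def host_intercept(event, use_ip=True, host_header="host",
                   preserve_existing=False):
    headers, body = event
    if preserve_existing and host_header in headers:
        return event
    headers = dict(headers)
    if use_ip:
        headers[host_header] = socket.gethostbyname(socket.gethostname())
    else:
        headers[host_header] = socket.gethostname()
    return headers, body

headers, _ = host_intercept(({}, b"data"), use_ip=False,
                            host_header="hostname")
print(headers["hostname"])                # the local machine's hostname
```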
Below is a test example.
# Config file: time_host_case17.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname
a1.sources.r1.interceptors.i2.useIP = false
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://carl:9000/flume/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = %{hostname}
a1.sinks.k1.hdfs.fileType=DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
增長一個攔截器,類型是host,h將hostname做爲文件前綴。
#敲命令
flume-ng agent -c conf -f conf/time_host_case17.conf-n a1 -Dflume.root.logger=INFO,console
啓動成功後
打開另外一個終端輸入,往偵聽端口送數據
echo "Time&hostInterceptor1" | nc 192.168.233.128 50000
echo "Time&hostInterceptor2" | nc 192.168.233.128 50000
#在啓動源發送的代理終端查看console輸出
查看hdfs生成的文件,能夠看到host已經生成在header裏面,能夠根據自定義的格式生成文件夾,數據也都傳輸過來了。
The Static Interceptor lets the user add a static header with a fixed value to every event.
The official configuration:
Property Name | Default | Description
type | – | The component type name, has to be static
preserveExisting | true | If configured header already exists, should it be preserved - true or false
key | key | Name of header that should be created
value | value | Static value that should be created
The key and value parameters correspond to a "headers" : {"key" : "value"} pair in the JSON event format.
Below is the official example:
a1.sources=r1
a1.channels=c1
a1.sources.r1.channels= c1
a1.sources.r1.type=seq
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=static
a1.sources.r1.interceptors.i1.key=datacenter
a1.sources.r1.interceptors.i1.value=NEW_YORK
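The effect can be modeled with a short Python sketch (illustrative only, not Flume's implementation):

```python
# Model of the Static Interceptor: add one fixed key/value header to every
# event; note preserveExisting defaults to true for this interceptor.
def static_intercept(event, key="key", value="value",
                     preserve_existing=True):
    headers, body = event
    if preserve_existing and key in headers:
        return event                      # never overwrite when preserving
    headers = dict(headers)
    headers[key] = value
    return headers, body

print(static_intercept(({}, b"hi"), key="datacenter", value="NEW_YORK"))
# ({'datacenter': 'NEW_YORK'}, b'hi')
```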
And a working example:
# Config file: static_case18.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = looklook5
a1.sources.r1.interceptors.i1.value = looklook10
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# Run the agent
flume-ng agent -c conf -f conf/static_case18.conf -n a1 -Dflume.root.logger=INFO,console
After the agent starts successfully, open another terminal and send data to the listening port:
echo "statInterceptor1" | nc 192.168.233.128 50000
# Check the console output on the agent's terminal
The console output shows the custom header and the body both came through correctly.
The Regex Filtering Interceptor filters events by matching the event body against a configured regular expression. It can either include or exclude the matching events, and is commonly used for data cleansing.
The official configuration:
Property Name | Default | Description
type | – | The component type name, has to be regex_filter
regex | ".*" | Regular expression for matching against events
excludeEvents | false | If true, regex determines events to exclude, otherwise regex determines events to include.
When excludeEvents is true, every event that matches the regex is dropped.
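The include/exclude decision can be modeled in Python — an illustrative sketch of the logic, not Flume's implementation:

```python
# Model of the Regex Filtering Interceptor: keep events whose body matches
# the regex, or drop exactly those matches when excludeEvents is true.
import re

def regex_filter(events, regex=".*", exclude_events=False):
    pattern = re.compile(regex)
    kept = []
    for headers, body in events:
        matched = pattern.match(body) is not None
        if matched != exclude_events:     # include XOR exclude
            kept.append((headers, body))
    return kept

events = [({}, "a"), ({}, "1222"), ({}, "a222")]
print(regex_filter(events, regex=r"^[0-9]*$", exclude_events=True))
# [({}, 'a'), ({}, 'a222')]
```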
Below is a test example.
# Config file: regex_filter_case19.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^[0-9]*$
a1.sources.r1.interceptors.i1.excludeEvents = true
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
With this regex, any event whose body consists entirely of digits is filtered out.
# Run the agent
flume-ng agent -c conf -f conf/regex_filter_case19.conf -n a1 -Dflume.root.logger=INFO,console
After the agent starts successfully, open another terminal and send data to the listening port:
echo "a" | nc 192.168.233.128 50000
echo "1222" | nc 192.168.233.128 50000
echo "a222" | nc 192.168.233.128 50000
# Check the console output on the agent's terminal
As expected, 1222 matched the regex and was excluded, so it never came through.
The Regex Filtering Interceptor test succeeded.