Explaining selector.header in the Flume ChannelSelector

Flume has two built-in ChannelSelector types: Replicating and Multiplexing.

A Replicating ChannelSelector copies every Event to all of the Channels. This is the default ChannelSelector.

An example of a replicating ChannelSelector:

a1.sources = r1
# With 100 Events, c1 and c2 each receive all 100 of them
a1.channels = c1 c2

# Replicating is the default, so the selector.type line is optional
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

 

A Multiplexing ChannelSelector decides which Channel receives an Event based on the value of one attribute in the Event's headers.

An example of a multiplexing ChannelSelector:

a1.sources = r1
a1.channels = c1 c2

a1.sources.r1.selector.type = multiplexing
# Route on the value of the "validation" header
a1.sources.r1.selector.header = validation
# If validation is SUCCESS, use channel c2
a1.sources.r1.selector.mapping.SUCCESS = c2
# If validation is FAIL, use channel c1
a1.sources.r1.selector.mapping.FAIL = c1
# Otherwise fall back to channel c1
a1.sources.r1.selector.default = c1
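
The routing rule in this example can be sketched in a few lines of Python. This is only an illustrative model, not Flume's implementation: the function names `replicating_select` and `multiplexing_select` are hypothetical, and the sketch assumes that an event with a missing or unmapped header value goes to the default channel.

```python
from typing import Dict, List, Optional


def replicating_select(all_channels: List[str]) -> List[str]:
    """Replicating: every event is copied to every configured channel."""
    return list(all_channels)


def multiplexing_select(
    headers: Dict[str, str],
    header_name: str,
    mapping: Dict[str, List[str]],
    default: List[str],
) -> List[str]:
    """Multiplexing: read one header and route on its value."""
    value: Optional[str] = headers.get(header_name)
    if value is None or value not in mapping:
        # Assumption: missing or unmapped values use the default channel.
        return list(default)
    return list(mapping[value])


# Mirrors selector.mapping.SUCCESS = c2, selector.mapping.FAIL = c1,
# selector.default = c1 from the configuration above.
mapping = {"SUCCESS": ["c2"], "FAIL": ["c1"]}
default = ["c1"]

ok = multiplexing_select({"validation": "SUCCESS"}, "validation", mapping, default)
failed = multiplexing_select({"validation": "FAIL"}, "validation", mapping, default)
missing = multiplexing_select({}, "validation", mapping, default)
copied = replicating_select(["c1", "c2"])
print(ok, failed, missing, copied)
```

The point of the sketch is that selector.header names which header to read, while the selector.mapping.* keys are compared against that header's value.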

Similarly, consider the following conf file:
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/lib/flume-ng/test.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = (\\w+):(\\w+):(\\w+)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = ip
a1.sources.r1.interceptors.i1.serializers.s2.name = domain
a1.sources.r1.interceptors.i1.serializers.s3.name = course

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = course
a1.sources.r1.selector.mapping.hadoop = c1
a1.sources.r1.selector.default = c2

# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /tmp/multiplexing/flume_sink1

a1.sinks.k2.type = file_roll
a1.sinks.k2.sink.directory = /tmp/multiplexing/flume_sink2

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

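The interceptor only changes the event headers; the event body is left untouched.

After the interceptor runs, each event from source r1 carries three headers: ip, domain and course. Because r1 sets selector.header = course, the selector looks only at the course header, ignoring ip and domain, and then picks the channel: events whose course is hadoop go to c1, and all others fall through to the default, c2.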
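
To make the header extraction concrete, here is a small Python sketch of what the regex_extractor step does with this configuration. It is a model under stated assumptions, not Flume code: the helper names `extract_headers` and `select_channel` and the sample log lines are hypothetical, and it assumes the interceptor searches the body for the first match and adds no headers when nothing matches.

```python
import re
from typing import Dict

# Same pattern as interceptors.i1.regex; the properties file writes \\w
# because a backslash must be escaped there. re.ASCII keeps \w close to
# Java's default ASCII-only behaviour.
PATTERN = re.compile(r"(\w+):(\w+):(\w+)", re.ASCII)

# serializers s1, s2, s3 name the headers for groups 1, 2, 3.
HEADER_NAMES = ["ip", "domain", "course"]


def extract_headers(body: str) -> Dict[str, str]:
    """Return the headers the three capture groups would produce."""
    match = PATTERN.search(body)
    if match is None:
        # Assumption: a non-matching body yields no extra headers.
        return {}
    return dict(zip(HEADER_NAMES, match.groups()))


def select_channel(headers: Dict[str, str]) -> str:
    """selector.header = course, mapping.hadoop = c1, default = c2."""
    return "c1" if headers.get("course") == "hadoop" else "c2"


for line in ["node1:example:hadoop", "node2:example:spark", "no match here"]:
    headers = extract_headers(line)
    print(line, headers, select_channel(headers))
```

Only the course header influences routing here; ip and domain are extracted but never consulted by the selector.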
