http://flume.apache.org/FlumeUserGuide.html#avro-source
A source and a sink are wired together through a channel. You need to list the sources, sinks and channels for a given agent, and then point each source and sink at a channel. A source instance can specify multiple channels, but a sink instance can only specify one channel. The format is as follows:
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>
Worked example: an agent named agent_foo reads data from an external Avro client and sends it through a memory channel to HDFS. The configuration file foo.config might look like this:
# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1

# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1

# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1
This makes events flow from avro-appserver-src-1 to hdfs-sink-1 through the memory channel mem-channel-1. When the agent is started with foo.config as its configuration file, it instantiates that flow.
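A minimal sketch of starting this agent; the location of foo.config under ./conf and the --conf directory are assumptions, not given in the original:

$ ./bin/flume-ng agent --conf conf --conf-file ./conf/foo.config --name agent_foo -Dflume.root.logger=INFO,console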
Configuring individual components
After defining the flow, you need to set the properties of each source, sink and channel. Each component's property values are set separately:
# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue>

# properties for channels
<Agent>.channels.<Channel>.<someProperty> = <someValue>

# properties for sinks
<Agent>.sinks.<Sink>.<someProperty> = <someValue>
The "type" property must be set for every component so that Flume knows what kind of object to build. Each source, sink and channel type has its own set of properties required for its intended function, and all of them must be set as needed. In the previous example, the flow runs from the avro-appserver-src-1 source through the memory channel mem-channel-1 to hdfs-sink-1. The following example shows how each of these components is configured:
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1

# set channel for sources, sinks

# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000

# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100

# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata

#...
Case 1: use Flume to monitor a directory and, whenever a new file appears in it, print the file's contents to the console.
Create a file named test01.conf:
# Configure an agent; the agent name can be anything you like (here a1).
# Name the agent's sources (s1), sinks (k1) and channels (c1);
# the names themselves are arbitrary.
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# Describe the source: a spooling-directory source
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /opt/flume/logs
a1.sources.s1.fileHeader = true
a1.sources.s1.channels = c1

# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

# Describe the channel (memory-backed buffer)
a1.channels.c1.type = memory
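The spooling directory named in spoolDir must exist before the agent starts. A minimal way to create it and prepare a sample file for the test below (the file name matches the later test; its content is arbitrary):

$ mkdir -p /opt/flume/logs
$ echo "hello spooldir source" > test.log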
Launch command:
./bin/flume-ng agent --conf conf --conf-file ./conf/test01.conf --name a1 -Dflume.root.logger=INFO,console
Testing Flume
Open another terminal and copy test.log into the logs directory:
$ cp test.log logs/
The original Flume terminal will print the event in its log messages:
2018-11-03 03:54:54,207 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:324)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2018-11-03 03:54:54,207 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:433)] Preparing to move file /opt/flume/logs/test.log to /opt/flume/logs/test.log.COMPLETED

2.6 NetCat Source
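A NetCat source listens on a TCP port and turns each line of text it receives into a Flume event. A minimal sketch of such a source feeding a logger sink (the agent name, bind address and port here are illustrative, not taken from the original):

a1.sources = s1
a1.sinks = k1
a1.channels = c1

a1.sources.s1.type = netcat
a1.sources.s1.bind = localhost
a1.sources.s1.port = 44444
a1.sources.s1.channels = c1

a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

a1.channels.c1.type = memory

Lines sent with, for example, nc localhost 44444 would then show up on the logger sink.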
Case 2: simulate reading data from a web server into HDFS in real time.
This case uses an exec source; for details, see the description of 2.3 Exec Source in the previous section.
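As a minimal sketch of what an exec-source-to-HDFS flow for this case might look like (the agent name, the tailed log path and the HDFS URL are assumptions, not from the original):

# assumed agent name: a1
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# exec source tailing a (hypothetical) web server access log
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /var/log/httpd/access_log
a1.sources.s1.channels = c1

# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# HDFS sink (path is an assumption)
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://namenode/flume/webdata
a1.sinks.k1.hdfs.fileType = DataStream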
A single Flume agent can contain several independent flows. You can list multiple sources, sinks and channels in one configuration file, and these components can be wired together to form multiple flows.
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>
You can then connect the sources and sinks to their corresponding channels to set up two different flows. For example, if you need to configure two flows in an agent named agent_foo, one from an external Avro client to HDFS and another from the output of a tail command to an Avro sink, the configuration would look like this:
# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2

# flow #1 configuration
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1

# flow #2 configuration
agent_foo.sources.exec-tail-source2.channels = file-channel-2
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2
To set up a multi-tier flow, the Avro sink of the first hop must point at the Avro source of the next hop. This makes the first Flume agent forward events to the next Flume agent. For example, if you periodically send files (one event per file) from an Avro client to a local Flume agent, that local agent can forward the events to another agent that has the storage attached.
The configuration is as follows:
Weblog agent config:
# list sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = avro-forward-sink
agent_foo.channels = file-channel

# define the flow
agent_foo.sources.avro-AppSrv-source.channels = file-channel
agent_foo.sinks.avro-forward-sink.channel = file-channel

# avro sink properties
agent_foo.sinks.avro-forward-sink.type = avro
agent_foo.sinks.avro-forward-sink.hostname = 10.1.1.100
agent_foo.sinks.avro-forward-sink.port = 10000

# configure other pieces
#...
HDFS agent config:
# list sources, sinks and channels in the agent
agent_foo.sources = avro-collection-source
agent_foo.sinks = hdfs-sink
agent_foo.channels = mem-channel

# define the flow
agent_foo.sources.avro-collection-source.channels = mem-channel
agent_foo.sinks.hdfs-sink.channel = mem-channel

# avro source properties
agent_foo.sources.avro-collection-source.type = avro
agent_foo.sources.avro-collection-source.bind = 10.1.1.100
agent_foo.sources.avro-collection-source.port = 10000

# configure other pieces
#...
This connects the avro-forward-sink of the weblog agent to the avro-collection-source of the HDFS agent. The end result is that events coming from the external app-server source end up stored in HDFS.
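A minimal sketch of starting the two tiers; the config file names weblog.config and hdfs.config and the --conf directory are assumptions. The downstream (HDFS) agent should be started first so that its Avro source is already listening:

# on the HDFS/collector host (10.1.1.100)
$ ./bin/flume-ng agent --conf conf --conf-file ./conf/hdfs.config --name agent_foo -Dflume.root.logger=INFO,console

# on the weblog host
$ ./bin/flume-ng agent --conf conf --conf-file ./conf/weblog.config --name agent_foo -Dflume.root.logger=INFO,console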
Create a file named case_avro.conf:
a1.sources = s1
a1.sinks = k1
a1.channels = c1

a1.sources.s1.type = avro
a1.sources.s1.channels = c1
a1.sources.s1.bind = localhost
a1.sources.s1.port = 22222

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
Create a file named case_avro_sink.conf:
a2.sources = s1
a2.sinks = k1
a2.channels = c1

a2.sources.s1.type = syslogtcp
a2.sources.s1.channels = c1
a2.sources.s1.host = 192.168.1.102
a2.sources.s1.port = 33333

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.sinks.k1.type = avro
a2.sinks.k1.hostname = 192.168.1.102
a2.sinks.k1.port = 22222
a2.sinks.k1.channel = c1
Note: case_avro_sink.conf defines the upstream (first-hop) agent, and case_avro.conf defines the downstream (second-hop) agent.
First start the agent with the Avro source so that it is listening on its port:
$ ./bin/flume-ng agent --conf conf --conf-file ./conf/case_avro.conf --name a1 -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true
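At this point the a1 Avro source can be smoke-tested directly with the flume-ng avro-client; the host and port match the bind settings in case_avro.conf, and the message text is arbitrary:

$ echo "avro source smoke test" | ./bin/flume-ng avro-client --conf conf -H localhost -p 22222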
Then start the agent with the Avro sink:
$ ./bin/flume-ng agent --conf conf --conf-file ./conf/case_avro_sink.conf --name a2 -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true
You should see that the connection has been established.
Generate a test log entry on the Avro sink side:
$ echo "hello flume avro sink" | nc 192.168.1.102 33333
Check the result: the event should appear on the logger sink of agent a1.
Flume supports fanning out a flow from one source to multiple channels. There are two fan-out modes: replicating and multiplexing. In a replicating flow, each event is sent to all configured channels. In the multiplexing case, an event is sent only to a qualifying subset of the channels. To fan out a flow, you specify the source and the rules for fanning out to the channels. This is done by adding a channel "selector" to the source, which can be replicating or multiplexing; if it is multiplexing, you then specify the selection rules. If you do not specify a selector, it defaults to replicating.
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>
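For the replicating case, a minimal sketch of the selector settings (component names are placeholders); Flume's replicating selector also accepts an "optional" list of channels whose write failures should not fail the source transaction:

<Agent>.sources.<Source>.selector.type = replicating
# channels listed as optional are written on a best-effort basis
<Agent>.sources.<Source>.selector.optional = <Channel2>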
The multiplexing selector has a further set of properties to bifurcate the flow. It requires a mapping from values of an event header to sets of channels. The selector checks the configured header on each event; if its value matches one of the mapped values, the event is sent to all channels mapped to that value. If there is no match, the event is sent to the channels configured as the default.
# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...
<Agent>.sources.<Source1>.selector.default = <Channel2>
The mapping allows the channel sets for different values to overlap, and the default may contain any number of channels. The following example has a single flow multiplexed along two paths: the agent has a single Avro source and two channels connected to two sinks.
# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2

# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2

# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
The selector checks a header called "State". If its value is "CA", the event is sent to mem-channel-1; if it is "AZ", to file-channel-2; and if it is "NY", to both. If the "State" header is not set or does not match any of the three values, the event goes to the default channel, mem-channel-1.
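To exercise this routing you need events that carry a State header; one way is the flume-ng avro-client with a header file. A minimal sketch, where the header file name, the message text, and the host/port of the Avro source are assumptions:

$ echo "State=CA" > headers.txt
$ echo "hello multiplexing" | ./bin/flume-ng avro-client --conf conf -H localhost -p 10000 --headerFile headers.txt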
case_replicate_sink.conf
a1.sources = s1
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sources.s1.type = syslogtcp
a1.sources.s1.channels = c1 c2
a1.sources.s1.host = 192.168.1.102
a1.sources.s1.port = 6666
a1.sources.s1.selector.type = replicating

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.1.102
a1.sinks.k1.port = 7777
a1.sinks.k1.channel = c1

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.1.102
a1.sinks.k2.port = 7778
a1.sinks.k2.channel = c2
case_replicate_s1.conf
a2.sources = s1
a2.sinks = k1
a2.channels = c1

a2.sources.s1.type = avro
a2.sources.s1.channels = c1
a2.sources.s1.bind = 192.168.1.102
a2.sources.s1.port = 7777

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.sinks.k1.type = logger
a2.sinks.k1.channel = c1
case_replicate_s2.conf
a3.sources = s1
a3.sinks = k1
a3.channels = c1

a3.sources.s1.type = avro
a3.sources.s1.channels = c1
a3.sources.s1.bind = 192.168.1.102
a3.sources.s1.port = 7778

a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

a3.sinks.k1.type = logger
a3.sinks.k1.channel = c1
First start the two agents with the Avro sources so they are listening on their ports:
$ ./bin/flume-ng agent --conf conf --conf-file ./conf/case_replicate_s1.conf --name a2 -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true
$ ./bin/flume-ng agent --conf conf --conf-file ./conf/case_replicate_s2.conf --name a3 -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true
Then start the agent with the Avro sinks:
$ ./bin/flume-ng agent --conf conf --conf-file ./conf/case_replicate_sink.conf --name a1 -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true
Generate a test log entry; since the selector is replicating, the event should appear on both a2 and a3:
$ echo "hello via channel selector" | nc 192.168.1.102 6666
case_multi_sink.conf
# Configuration with two channels and two sinks
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c1

# Describe the sinks
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 192.168.1.102
a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 192.168.1.102
a1.sinks.k2.port = 4546

# Use channels which buffer events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
case_multi_s1.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.channels = c1
a2.sources.r1.bind = 192.168.1.102
a2.sources.r1.port = 4545

# Describe the sink
a2.sinks.k1.type = logger
a2.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
case_multi_s2.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.channels = c1
a3.sources.r1.bind = 192.168.1.102
a3.sources.r1.port = 4546

# Describe the sink
a3.sinks.k1.type = logger
a3.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
First start the two agents with the Avro sources so they are listening on their ports:
$ ./bin/flume-ng agent -c . -f ./conf/case_multi_s1.conf -n a2 -Dflume.root.logger=INFO,console
$ ./bin/flume-ng agent -c . -f ./conf/case_multi_s2.conf -n a3 -Dflume.root.logger=INFO,console
Then start the agent with the Avro sinks:
$ ./bin/flume-ng agent -c . -f ./conf/case_multi_sink.conf -n a1 -Dflume.root.logger=INFO,console
Generate test POST requests carrying a "state" header, as defined in the configuration. Given the mapping above, TEST1 (CZ) and TEST3 (unmapped SH, so it falls through to the default channel c1) should appear on agent a2, while TEST2 (US) should appear on agent a3:
$ curl -X POST -d '[{ "headers" :{"state" : "CZ"},"body" : "TEST1"}]' http://localhost:5140
$ curl -X POST -d '[{ "headers" :{"state" : "US"},"body" : "TEST2"}]' http://localhost:5140
$ curl -X POST -d '[{ "headers" :{"state" : "SH"},"body" : "TEST3"}]' http://localhost:5140