1. Flume Features
Flume is a distributed, reliable, and highly available system for collecting, aggregating, and moving large volumes of log data. It lets you plug custom data senders into the logging pipeline to collect data, and it can apply simple processing to the data before writing it to a variety of receivers (for example plain text files, HDFS, or HBase).

Flume's data flow is carried end to end by events. An Event is Flume's basic unit of data: it carries the log payload (as a byte array) together with header information. Events originate from data produced outside the Agent; when a Source captures an event it applies a specific format and then pushes the event into one or more Channels. You can think of a Channel as a buffer that holds each event until a Sink has finished processing it. The Sink is responsible for persisting the log or forwarding the event on to another Source.
Flume reliability:

When a node fails, logs can be delivered to other nodes without being lost. Flume offers three levels of reliability guarantee, from strongest to weakest: end-to-end (the agent receiving the data first writes the event to disk and deletes it only after the data has been delivered successfully; if delivery fails, the data can be resent), store-on-failure (the strategy Scribe also uses: when the receiver crashes, the data is written locally and sending resumes once the receiver recovers), and best-effort (data is sent to the receiver without any acknowledgment).

Flume recoverability:

Recovery again relies on the Channel. FileChannel is recommended: events are persisted on the local file system (at the cost of lower performance).
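As a minimal sketch of declaring a FileChannel, the checkpoint and data directories below are placeholders; point them at directories the agent user can write to:

a1.channels = c1
a1.channels.c1.type = file
# where the channel keeps its checkpoint metadata (assumed path)
a1.channels.c1.checkpointDir = /home/hadoop/flume-1.5.0-bin/checkpoint
# where events are persisted on local disk (assumed path)
a1.channels.c1.dataDirs = /home/hadoop/flume-1.5.0-bin/data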
Some core Flume concepts:

Agent: runs Flume inside a JVM. Each machine runs one agent, but a single agent can contain multiple sources and sinks.

Client: produces the data; runs in its own thread.

Source: collects data from the Client and passes it to the Channel.

Sink: collects data from the Channel; runs in its own thread.

Channel: connects sources and sinks, behaving somewhat like a queue.

Events: can be log records, Avro objects, and so on.

Flume's smallest independently running unit is the agent, and one agent is one JVM. A single agent is built from three major components: Source, Sink, and Channel, as shown in the figure below:

Note that Flume ships with a large number of built-in Source, Channel, and Sink types, and different types of Sources, Channels, and Sinks can be combined freely. The combinations are driven by a user-supplied configuration file, which makes the system very flexible. For example, a Channel can buffer events in memory or persist them to local disk, and a Sink can write logs to HDFS or HBase, or even forward them to another Source. Flume also supports multi-hop flows, meaning multiple agents can work together, with support for fan-in, fan-out, contextual routing, and backup routes; this is where much of its power lies. As shown in the figure below:
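As a minimal sketch of how these concepts map onto a configuration file (the agent name a1 and the port are placeholders, and a netcat source is used here purely for illustration), a single agent that wires one source to one sink through one channel looks like this:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# a source that listens for lines of text on a TCP port
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# a sink that simply logs each event
a1.sinks.k1.type = logger

# an in-memory channel between them
a1.channels.c1.type = memory

# wire the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1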
2. How to Install?

1. Download the installation package
2. Configure environment variables (a minimal sketch follows this list)
3. Edit the configuration file (shown in the examples below)
4. Start the service (shown in the examples below)
5. Verify with flume-ng version
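A minimal sketch of step 2, assuming Flume was unpacked to /home/hadoop/flume-1.5.0-bin (adjust both paths to your environment; the JAVA_HOME value is only a placeholder):

# append to ~/.bashrc (or /etc/profile), then reload with: source ~/.bashrc
export FLUME_HOME=/home/hadoop/flume-1.5.0-bin
export PATH=$PATH:$FLUME_HOME/bin
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk   # Flume needs a JDK; this path is an assumption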
3. Flume Examples

Example 1: Avro. Avro can send a given file to Flume; the Avro source uses the Avro RPC mechanism.

(a) Create the agent configuration file

vi /home/hadoop/flume-1.5.0-bin/conf/avro.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(b) Start the flume agent a1

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console

(c) Create the test file

echo "hello world" > /home/hadoop/flume-1.5.0-bin/log.00

(d) Send the file with avro-client

flume-ng avro-client -c . -H m1 -p 4141 -F /home/hadoop/flume-1.5.0-bin/log.00

(e) On the m1 console you can see the following output; note the last line: hello world
Example 2: Spool. This source watches the configured directory for new files and reads the data out of them. Two things to note:

1) A file must not be opened for editing again after it has been copied into the spool directory.
2) The spool directory must not contain subdirectories.

(a) Create the agent configuration file

vi /home/hadoop/flume-1.5.0-bin/conf/spool.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /home/hadoop/flume-1.5.0-bin/logs
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(b) Start the flume agent a1

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/spool.conf -n a1 -Dflume.root.logger=INFO,console

(c) Add a file to the /home/hadoop/flume-1.5.0-bin/logs directory

echo "spool test1" > /home/hadoop/flume-1.5.0-bin/logs/spool_text.log

(d) On the m1 console you can see the following output:

Event: { headers:{file=/home/hadoop/flume-1.5.0-bin/logs/spool_text.log} body: 73 70 6F 6F 6C 20 74 65 73 74 31 spool test1 }
Example 3: Exec. This source runs a given command and uses its output as the data. If you use the tail command, you must make the file large enough for output to show up.

(a) Create the agent configuration file

vi /home/hadoop/flume-1.5.0-bin/conf/exec_tail.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /home/hadoop/flume-1.5.0-bin/log_exec_tail

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(b) Start the flume agent a1

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console

(c) Generate enough content in the file

for i in {1..100};do echo "exec tail$i" >> /home/hadoop/flume-1.5.0-bin/log_exec_tail;echo $i;sleep 0.1;done

(d) On the m1 console you can see the following output:

Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 74 65 73 74 exec tail test }
Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 74 65 73 74 exec tail test }
Example 4: Syslogtcp. This source listens on a TCP port as its data source.

(a) Create the agent configuration file

vi /home/hadoop/flume-1.5.0-bin/conf/syslog_tcp.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(b) Start the flume agent a1

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console

(c) Produce a test syslog message

echo "hello idoall.org syslog" | nc localhost 5140

(d) On the m1 console you can see the following output:

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67 hello idoall.org }
Example 5: JSONHandler

(a) Create the agent configuration file

vi /home/hadoop/flume-1.5.0-bin/conf/post_json.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 8888
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(b) Start the flume agent a1

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/post_json.conf -n a1 -Dflume.root.logger=INFO,console

(c) Generate a JSON-format POST request

curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "idoall.org_body"}]' http://localhost:8888

(d) On the m1 console you can see the following output:

Event: { headers:{b=b1, a=a1} body: 69 64 6F 61 6C 6C 2E 6F 72 67 5F 62 6F 64 79 idoall.org_body }
Example 6: Hadoop sink

(a) Create the agent configuration file

vi /home/hadoop/flume-1.5.0-bin/conf/hdfs_sink.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://m1:9000/user/flume/syslogtcp
a1.sinks.k1.hdfs.filePrefix = Syslog
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(b) Start the flume agent a1

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/hdfs_sink.conf -n a1 -Dflume.root.logger=INFO,console

(c) Produce a test syslog message

echo "hello idoall flume -> hadoop testing one" | nc localhost 5140

(d) Open another window on m1 and check on Hadoop whether the file was created

hadoop fs -ls /user/flume/syslogtcp
hadoop fs -cat /user/flume/syslogtcp/Syslog.1407644509504
Example 7: File Roll Sink

(a) Create the agent configuration file

vi /home/hadoop/flume-1.5.0-bin/conf/file_roll.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5555
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/hadoop/flume-1.5.0-bin/logs

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(b) Start the flume agent a1

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/file_roll.conf -n a1 -Dflume.root.logger=INFO,console

(c) Produce test log messages

echo "hello idoall.org syslog" | nc localhost 5555
echo "hello idoall.org syslog 2" | nc localhost 5555

(d) Check whether files were created under /home/hadoop/flume-1.5.0-bin/logs; by default a new file is rolled every 30 seconds

ll /home/hadoop/flume-1.5.0-bin/logs
cat /home/hadoop/flume-1.5.0-bin/logs/1407646164782-1
cat /home/hadoop/flume-1.5.0-bin/logs/1407646164782-2
hello idoall.org syslog
hello idoall.org syslog 2
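The 30-second default comes from the file_roll sink's rollInterval setting; as a sketch, you could add a line like the one below to the sink section of file_roll.conf to roll, say, every 60 seconds instead (0 disables time-based rolling entirely):

# roll interval in seconds; 60 is just an illustrative value
a1.sinks.k1.sink.rollInterval = 60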
Example 8: Replicating Channel Selector. Flume supports fanning a flow out from one source to multiple channels. There are two fan-out modes: replicating and multiplexing. In the replicating case, the event is sent to every configured channel. In the multiplexing case, the event is sent to only a subset of the eligible channels. Fan-out requires rules that map the source to the fan-out channels. This example uses two machines, m1 and m2.

(a) On m1, create the replicating_Channel_Selector configuration file

vi /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = m1
a1.sinks.k1.port = 5555
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = m2
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

(b) On m1, create the replicating_Channel_Selector_avro configuration file

vi /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(c) Copy both configuration files from m1 to m2

scp -r /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf
scp -r /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf

(d) Open four windows and start two flume agents on each of m1 and m2

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console

(e) Then on either m1 or m2, produce a test syslog message

echo "hello idoall.org syslog" | nc localhost 5140

(f) In the sink windows on both m1 and m2 you can see the following output, which shows the message was replicated to both machines:

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67 hello idoall.org }
Example 9: Multiplexing Channel Selector

(a) On m1, create the Multiplexing_Channel_Selector configuration file

vi /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
# The mapping allows the channel sets for different header values to overlap.
# The default can contain any number of channels.
a1.sources.r1.selector.mapping.baidu = c1
a1.sources.r1.selector.mapping.ali = c2
a1.sources.r1.selector.default = c1

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = m1
a1.sinks.k1.port = 5555
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = m2
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

(b) On m1, create the Multiplexing_Channel_Selector_avro configuration file

vi /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(c) Copy both configuration files to m2

scp -r /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf
scp -r /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf

(d) Open four windows and start two flume agents on each of m1 and m2

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console

(e) Then on either m1 or m2, produce test events

curl -X POST -d '[{ "headers" :{"type" : "baidu"},"body" : "idoall_TEST1"}]' http://localhost:5140 && curl -X POST -d '[{ "headers" :{"type" : "ali"},"body" : "idoall_TEST2"}]' http://localhost:5140 && curl -X POST -d '[{ "headers" :{"type" : "qq"},"body" : "idoall_TEST3"}]' http://localhost:5140

(f) In the sink window on m1 you can see the following output:

Event: { headers:{type=baidu} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 31}
Event: { headers:{type=qq} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 33}

(g) In the sink window on m2 you can see the following output:

Event: { headers:{type=ali} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 32}

As you can see, events are routed to different channels according to the value of the header.
Example 10: Flume Sink Processors. With failover, events keep going to one sink; when that sink becomes unavailable, they are automatically sent to the next sink.

(a) On m1, create the Flume_Sink_Processors configuration file

vi /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# This is the key to configuring failover: you need a sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# The processor type is failover
a1.sinkgroups.g1.processor.type = failover
# Priorities: the higher the number, the higher the priority; every sink must have a different priority
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# Set to 10 seconds here; you can of course make this faster or slower to suit your situation
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = m1
a1.sinks.k1.port = 5555
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = m2
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

(b) On m1, create the Flume_Sink_Processors_avro configuration file

vi /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(c) Copy both configuration files to m2

scp -r /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf
scp -r /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf

(d) Open four windows and start two flume agents on each of m1 and m2

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console

(e) Then on either m1 or m2, produce a test log message

echo "idoall.org test1 failover" | nc localhost 5140

(f) Because m2 has the higher priority, you can see the following output in m2's sink window, while m1 shows nothing:

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }

(g) Now stop the sink on m2 (Ctrl+C) and send test data again:

echo "idoall.org test2 failover" | nc localhost 5140

(h) In m1's sink window you can then see both of the test messages that were sent:

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }
Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }

(i) Next, start the sink on m2 again:

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console

(j) Send two more batches of test data:

echo "idoall.org test3 failover" | nc localhost 5140 && echo "idoall.org test4 failover" | nc localhost 5140

(k) In m2's sink window you can see the following output; because of the priorities, the log messages land on m2 again:

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33 idoall.org test3 }
Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34 idoall.org test4 }
Example 11: Load balancing Sink Processor. Unlike failover, the load_balance type offers two selection strategies: round robin and random. In both cases, if the chosen sink is unavailable, the processor automatically tries the next available sink.

(a) On m1, create the Load_balancing_Sink_Processors configuration file

vi /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# This is the key to configuring load balancing: you need a sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = m1
a1.sinks.k1.port = 5555
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = m2
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

(b) On m1, create the Load_balancing_Sink_Processors_avro configuration file

vi /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(c) Copy both configuration files to m2

scp -r /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf
scp -r /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf

(d) Open four windows and start two flume agents on each of m1 and m2

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console

(e) Then on either m1 or m2, produce test log messages, entering them one line at a time; if you send them too quickly, they tend to all land on one machine

echo "idoall.org test1" | nc localhost 5140
echo "idoall.org test2" | nc localhost 5140
echo "idoall.org test3" | nc localhost 5140
echo "idoall.org test4" | nc localhost 5140

(f) In the sink window on m1 you can see the following output:

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }
Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34 idoall.org test4 }

(g) In the sink window on m2 you can see the following output:

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }
Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33 idoall.org test3 }

This shows that round-robin mode is working.
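To try the random strategy instead, a sketch of the only change needed in Load_balancing_Sink_Processors.conf is swapping the selector line; the rest of the configuration stays the same:

a1.sinkgroups.g1.processor.selector = random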
Example 12: HBase sink

(a) Before testing, start HBase first.

(b) Then copy the following files into Flume's lib directory:

cp /home/hadoop/hbase-0.96.2-hadoop2/lib/protobuf-java-2.5.0.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-client-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-common-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-protocol-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-server-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-hadoop2-compat-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-hadoop-compat-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/htrace-core-2.04.jar /home/hadoop/flume-1.5.0-bin/lib

(c) Make sure the test_idoall_org table already exists in HBase.
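If the table does not exist yet, a sketch of creating it in the HBase shell (assuming a single column family named name, matching the columnFamily used in the sink configuration below):

hbase shell
hbase(main):001:0> create 'test_idoall_org', 'name'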
(d) On m1, create the hbase_simple configuration file

vi /home/hadoop/flume-1.5.0-bin/conf/hbase_simple.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = hbase
a1.sinks.k1.table = test_idoall_org
a1.sinks.k1.columnFamily = name
a1.sinks.k1.column = idoall
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(e) Start the flume agent

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/hbase_simple.conf -n a1 -Dflume.root.logger=INFO,console

(f) Produce a test syslog message

echo "hello idoall.org from flume" | nc localhost 5140

(g) Now log in to HBase and you can see that the new data has been inserted

hbase shell

hbase(main):001:0> list
TABLE
hbase2hive_idoall
hive2hbase_idoall
test_idoall_org

=> ["hbase2hive_idoall","hive2hbase_idoall","test_idoall_org"]

hbase(main):002:0> scan "test_idoall_org"

hbase(main):004:0> quit
After working through all of these Flume examples, you will find that Flume really is very powerful: its components can be combined in all kinds of ways to do the work you need. As the saying goes, the teacher opens the door, but you must walk through it yourself. Now go get hands-on and work out how to apply Flume well to your own product and business.