

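Flume is a distributed, reliable, and highly available system for collecting, aggregating, and transporting massive volumes of log data. It supports customizing various data senders within a logging system in order to collect data. Flume can also do simple processing on the data and write it to various data receivers (for example text files, HDFS, HBase, and so on).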



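Flume reliability:

When a node fails, logs can be forwarded to other nodes without being lost. Flume provides three levels of reliability guarantee, from strongest to weakest: end-to-end (the agent that receives the data first writes the event to disk, deletes it once the data has been delivered successfully, and resends it if delivery fails), store on failure (the strategy Scribe also uses: when the receiver crashes, the data is written locally and sending resumes once the receiver recovers), and best effort (after the data is sent to the receiver, no acknowledgement is performed).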



This again relies on the Channel. FileChannel is recommended: events are persisted to the local file system, at the cost of lower performance.



An agent runs Flume inside a JVM. Each machine runs one agent, but a single agent can contain multiple sources and sinks.





A Channel connects sources and sinks; it behaves somewhat like a queue.

Events can be log records, Avro objects, and so on.
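To make the queue analogy concrete, here is a toy Python model of a source filling a bounded channel while a sink drains it in batches. It is only a sketch of the idea, not Flume's implementation: `run_pipeline` is a hypothetical helper, and its `capacity` and `transaction_capacity` parameters merely borrow the names of the memory channel settings used in the configs later in this post.

```python
import queue

def run_pipeline(lines, capacity=1000, transaction_capacity=100):
    """Toy model: a source puts events on a bounded channel, a sink drains them in batches."""
    channel = queue.Queue(maxsize=capacity)  # the channel buffers events between source and sink
    delivered = []
    pending = list(lines)
    while pending or not channel.empty():
        # Source side: keep putting events until the channel is full.
        while pending and not channel.full():
            channel.put_nowait(pending.pop(0))
        # Sink side: take at most one batch of events per round.
        batch = []
        while len(batch) < transaction_capacity and not channel.empty():
            batch.append(channel.get_nowait())
        delivered.extend(batch)
    return delivered

events = [f"log line {i}" for i in range(250)]
out = run_pipeline(events, capacity=100, transaction_capacity=20)
print(len(out), out[0], out[-1])
```

Because the channel is first-in, first-out, the sink sees the events in the order the source produced them, even when the source is faster than the sink.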



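It is worth noting that Flume ships with a large number of built-in Source, Channel, and Sink types, and that different types of Source, Channel, and Sink can be combined freely. The combination is driven by a user-supplied configuration file, which makes it very flexible. For example, a Channel can hold events temporarily in memory or persist them to the local disk, and a Sink can write logs to HDFS, HBase, or even to another Source. Flume also lets users build multi-hop flows, meaning several agents can work together, with support for fan-in, fan-out, contextual routing, and backup routes, which is where it really shines. As shown in the figure below: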







5. Verify:   flume-ng version

Case 1: Avro. The Avro client can send a given file to Flume; the Avro source uses the Avro RPC mechanism.




vi /home/hadoop/flume-1.5.0-bin/conf/avro.conf


a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= avro

a1.sources.r1.channels = c1

a1.sources.r1.bind =

a1.sources.r1.port = 4141

# Describe the sink

a1.sinks.k1.type= logger

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

(b) Start flume agent a1


flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console



echo "hello world" > /home/hadoop/flume-1.5.0-bin/log.00




flume-ng avro-client -c . -H m1 -p 4141 -F /home/hadoop/flume-1.5.0-bin/log.00


(f) On the m1 console you can see the following output; note the last line:  hello world



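Case 2: Spool. The spooling directory source watches the configured directory for newly added files and reads the data out of them. Two points to note:

1) A file copied into the spool directory must not be opened and edited afterwards.
2) The spool directory must not contain subdirectories.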
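One way to respect the first point is to finish writing a file somewhere else and only then move it into the spool directory in a single step. The Python sketch below shows that idea; `drop_into_spool` is a hypothetical helper, and it assumes the staging directory sits on the same file system as the spool directory so that the rename is atomic.

```python
import os
import tempfile

def drop_into_spool(spool_dir, name, text):
    """Write the file outside the spool directory, then move it in with a single rename."""
    parent = os.path.dirname(os.path.abspath(spool_dir))
    staging_dir = tempfile.mkdtemp(dir=parent)  # sibling of the spool directory
    staged = os.path.join(staging_dir, name)
    with open(staged, "w", encoding="utf-8") as f:
        f.write(text)
    final = os.path.join(spool_dir, name)
    os.replace(staged, final)  # atomic when both paths are on the same file system
    os.rmdir(staging_dir)      # clean up the now-empty staging directory
    return final

# Demo in a throwaway directory instead of the real Flume install path.
base = tempfile.mkdtemp()
spool = os.path.join(base, "logs")
os.mkdir(spool)
path = drop_into_spool(spool, "spool_text.log", "spool test1\n")
print(path)
```

The staging directory is created next to the spool directory rather than inside it, so the second point (no subdirectories) is not violated either.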



vi /home/hadoop/flume-1.5.0-bin/conf/spool.conf


a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= spooldir

a1.sources.r1.channels = c1

a1.sources.r1.spoolDir = /home/hadoop/flume-1.5.0-bin/logs

a1.sources.r1.fileHeader = true

# Describe the sink

a1.sinks.k1.type= logger

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1


(b) Start flume agent a1


flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/spool.conf -n a1 -Dflume.root.logger=INFO,console



echo "spool test1" > /home/hadoop/flume-1.5.0-bin/logs/spool_text.log


 Event: { headers:{file=/home/hadoop/flume-1.5.0-bin/logs/spool_text.log} body: 73 70 6F 6F 6C 20 74 65 73 74 31        spool test1 }
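The logger sink prints the event body as hex bytes followed by the text. If you only have the hex part, a couple of lines of Python turn it back into a string; `decode_body` is just an illustrative helper name, and it assumes the body is UTF-8 text.

```python
def decode_body(hex_dump):
    """Turn the space-separated hex bytes printed by the logger sink back into text."""
    return bytes.fromhex(hex_dump).decode("utf-8")

body = "73 70 6F 6F 6C 20 74 65 73 74 31"
print(decode_body(body))  # prints: spool test1
```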


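Case 3: Exec. The exec source runs a given command and uses its output as the data source. If you use the tail command, the file must be large enough for any output to show up.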



vi /home/hadoop/flume-1.5.0-bin/conf/exec_tail.conf


a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= exec

a1.sources.r1.channels = c1

a1.sources.r1.command= tail -F /home/hadoop/flume-1.5.0-bin/log_exec_tail

# Describe the sink

a1.sinks.k1.type= logger

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

(b) Start flume agent a1


flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console



for i in {1..100};do echo "exec tail$i" >> /home/hadoop/flume-1.5.0-bin/log_exec_tail;echo $i;sleep 0.1;done



Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 74 65 73 74 exec tail test }

Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 74 65 73 74 exec tail test }


Case 4: Syslogtcp. The syslogtcp source listens on a TCP port and uses it as the data source.


vi /home/hadoop/flume-1.5.0-bin/conf/syslog_tcp.conf


a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1

# Describe the sink

a1.sinks.k1.type= logger

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1


(b) Start flume agent a1


flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console



echo "hello idoall.org syslog" | nc localhost 5140



Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67 hello idoall.org }
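The `flume.syslog.status=Invalid` header is most likely there because the test string is plain text rather than a syslog-formatted message. As a rough sketch, a BSD-style (RFC 3164) line starts with a priority field computed as facility * 8 + severity, followed by a timestamp and host name. The helper below builds such a line; `syslog_line` is a hypothetical name, and whether Flume 1.5.0 then reports the event as valid is an assumption that has not been tested here.

```python
from datetime import datetime

MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
          "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")

def syslog_line(facility, severity, host, message, when):
    """Build an RFC 3164 style line: <PRI>Mmm dd hh:mm:ss host message."""
    pri = facility * 8 + severity  # priority value defined by the syslog protocol
    stamp = f"{MONTHS[when.month - 1]} {when.day:2d} {when:%H:%M:%S}"
    return f"<{pri}>{stamp} {host} {message}\n"

line = syslog_line(1, 5, "m1", "hello idoall.org syslog", datetime(2014, 8, 10, 12, 0, 0))
print(line, end="")
```

The resulting line could be piped to the port with nc in the same way as the plain string above.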






Case 5: HTTP source (JSON POST)


vi /home/hadoop/flume-1.5.0-bin/conf/post_json.conf


a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= org.apache.flume.source.http.HTTPSource

a1.sources.r1.port = 8888

a1.sources.r1.channels = c1

# Describe the sink

a1.sinks.k1.type= logger

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

(b) Start flume agent a1



flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/post_json.conf -n a1 -Dflume.root.logger=INFO,console

(c) Generate a JSON-format POST request


curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "idoall.org_body"}]' http://localhost:8888



Event: { headers:{b=b1, a=a1}

body: 69 64 6F 61 6C 6C 2E 6F 72 67 5F 62 6F 64 79  idoall.org_body }
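The same request can be put together from a script. The sketch below builds the JSON array of events (each with `headers` and `body`, as in the curl call) using only the Python standard library. `build_post` is a hypothetical helper, and the `Content-Type` header is my own addition rather than something the curl example sends.

```python
import json
import urllib.request

def build_post(url, events):
    """Serialize a list of events to JSON and wrap it in a POST request."""
    data = json.dumps(events).encode("utf-8")
    return urllib.request.Request(
        url, data=data, method="POST",
        headers={"Content-Type": "application/json"},
    )

events = [{"headers": {"a": "a1", "b": "b1"}, "body": "idoall.org_body"}]
req = build_post("http://localhost:8888", events)
print(req.get_method(), req.full_url)
print(req.data.decode("utf-8"))
# To actually send it (the agent from this case must be running):
# urllib.request.urlopen(req)
```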


Case 6: Hadoop sink


vi /home/hadoop/flume-1.5.0-bin/conf/hdfs_sink.conf


a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1

# Describe the sink

a1.sinks.k1.type= hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = hdfs://m1:9000/user/flume/syslogtcp

a1.sinks.k1.hdfs.filePrefix = Syslog

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

(b) Start flume agent a1


flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/hdfs_sink.conf -n a1 -Dflume.root.logger=INFO,console



echo "hello idoall flume -> hadoop testing one" | nc localhost 5140

(d) Open another window on m1 and check on Hadoop whether the file was created


hadoop fs -ls /user/flume/syslogtcp

hadoop fs -cat /user/flume/syslogtcp/Syslog.1407644509504
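A note on the `hdfs.round`, `hdfs.roundValue`, and `hdfs.roundUnit` settings in the config above: they round the event timestamp down, and this only matters when `hdfs.path` contains time-based escape sequences, which the path in this case does not. What "round down to 10 minutes" means can be sketched in Python as follows; `round_down` is an illustrative helper, not Flume code.

```python
from datetime import datetime

def round_down(ts, round_value=10):
    """Round a timestamp down to a multiple of round_value minutes."""
    minute = ts.minute - ts.minute % round_value
    return ts.replace(minute=minute, second=0, microsecond=0)

print(round_down(datetime(2014, 8, 10, 12, 27, 45)))  # prints: 2014-08-10 12:20:00
```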


Case 7: File Roll Sink


vi /home/hadoop/flume-1.5.0-bin/conf/file_roll.conf


a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= syslogtcp

a1.sources.r1.port = 5555

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1

# Describe the sink

a1.sinks.k1.type= file_roll

a1.sinks.k1.sink.directory = /home/hadoop/flume-1.5.0-bin/logs

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

(b) Start flume agent a1


flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/file_roll.conf -n a1 -Dflume.root.logger=INFO,console



echo "hello idoall.org syslog" | nc localhost 5555

echo "hello idoall.org syslog 2" | nc localhost 5555



ll /home/hadoop/flume-1.5.0-bin/logs

cat /home/hadoop/flume-1.5.0-bin/logs/1407646164782-1

cat /home/hadoop/flume-1.5.0-bin/logs/1407646164782-2

hello idoall.org syslog

hello idoall.org syslog 2



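Case 8: Replicating Channel Selector. Flume supports fanning out the flow from one source to multiple channels. There are two fan-out modes: replicating and multiplexing. In replicating mode, each event is sent to all configured channels. In multiplexing mode, an event is sent to only a subset of the eligible channels. A fan-out flow requires specifying the source and the rules for fanning out to the channels. This time we need two machines, m1 and m2.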


vi /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf


a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2

# Describe/configure the source

a1.sources.r1.type= syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1 c2

a1.sources.r1.selector.type= replicating

# Describe the sink

a1.sinks.k1.type= avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname= m1

a1.sinks.k1.port = 5555

a1.sinks.k2.type= avro

a1.sinks.k2.channel = c2

a1.sinks.k2.hostname= m2

a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type= memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100



vi /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf


a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= avro

a1.sources.r1.channels = c1

a1.sources.r1.bind =

a1.sources.r1.port = 5555

# Describe the sink

a1.sinks.k1.type= logger

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1



scp -r /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf

 scp -r /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf

(d) Open four windows and start both flume agents on m1 and m2 at the same time


flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console



echo "hello idoall.org syslog" | nc localhost 5140



Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67 hello idoall.org }


Case 9: Multiplexing Channel Selector



vi /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf


a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2

# Describe/configure the source

a1.sources.r1.type= org.apache.flume.source.http.HTTPSource

a1.sources.r1.port = 5140

a1.sources.r1.channels = c1 c2

a1.sources.r1.selector.type= multiplexing

a1.sources.r1.selector.header = type


a1.sources.r1.selector.mapping.baidu = c1

a1.sources.r1.selector.mapping.ali = c2

a1.sources.r1.selector.default = c1

# Describe the sink

a1.sinks.k1.type= avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname= m1

a1.sinks.k1.port = 5555

a1.sinks.k2.type= avro

a1.sinks.k2.channel = c2

a1.sinks.k2.hostname= m2

a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type= memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100



vi /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= avro

a1.sources.r1.channels = c1

a1.sources.r1.bind =

a1.sources.r1.port = 5555

# Describe the sink

a1.sinks.k1.type= logger

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1



scp -r /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf

scp -r /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf

(d) Open four windows and start both flume agents on m1 and m2 at the same time


flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console


flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console



curl -X POST -d '[{ "headers" :{"type" : "baidu"},"body" : "idoall_TEST1"}]' http://localhost:5140 &&

curl -X POST -d '[{ "headers" :{"type" : "ali"},"body" : "idoall_TEST2"}]' http://localhost:5140 &&

curl -X POST -d '[{ "headers" :{"type" : "qq"},"body" : "idoall_TEST3"}]' http://localhost:5140



Event: { headers:{type=baidu} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 31} 

Event: { headers:{type=qq} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 33}



Event: { headers:{type=ali} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 32}
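The routing in this case boils down to a lookup on the `type` header with a fallback. Here is a minimal Python sketch of that rule, using the mapping from the config above (`baidu` to c1, `ali` to c2, default c1). `select_channels` is a hypothetical function that only models the selection, not Flume's own selector class.

```python
def select_channels(headers, header="type", mapping=None, default=("c1",)):
    """Pick the channels for an event from one header value, falling back to a default."""
    mapping = mapping or {}
    value = headers.get(header)
    return list(mapping.get(value, default))

mapping = {"baidu": ["c1"], "ali": ["c2"]}
for event_type in ("baidu", "ali", "qq"):
    print(event_type, "->", select_channels({"type": event_type}, mapping=mapping))
```

This is why the `qq` event, which has no mapping, arrives on the same side as the `baidu` event: both end up in c1.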



Case 10: Flume Sink Processors. The failover mechanism keeps sending to one sink; when that sink becomes unavailable, events are automatically sent to the next sink.



vi /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf


a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2

# This is the key to configuring failover: a sink group is required

a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2


a1.sinkgroups.g1.processor.type= failover


a1.sinkgroups.g1.processor.priority.k1 = 5

a1.sinkgroups.g1.processor.priority.k2 = 10


a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe/configure the source

a1.sources.r1.type= syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.channels = c1 c2

a1.sources.r1.selector.type= replicating

# Describe the sink

a1.sinks.k1.type= avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname= m1

a1.sinks.k1.port = 5555

a1.sinks.k2.type= avro

a1.sinks.k2.channel = c2

a1.sinks.k2.hostname= m2

a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type= memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100



vi /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf


a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= avro

a1.sources.r1.channels = c1

a1.sources.r1.bind =

a1.sources.r1.port = 5555

# Describe the sink

a1.sinks.k1.type= logger

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1



scp -r /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf


scp -r /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf

(d) Open four windows and start both flume agents on m1 and m2 at the same time


flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console



echo "idoall.org test1 failover" | nc localhost 5140



Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }



echo "idoall.org test2 failover" | nc localhost 5140



Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }



flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console



echo "idoall.org test3 failover" | nc localhost 5140 && echo "idoall.org test4 failover" | nc localhost 5140



Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33 idoall.org test3 }

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34 idoall.org test4 }
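The selection rule behind failover can be sketched in a few lines of Python: among the sinks that are currently reachable, the one with the largest priority value wins. With the priorities from the config above (k1 = 5, k2 = 10), k2, which points at m2, is preferred while it is up. `pick_sink` is a hypothetical helper and leaves out the penalty and back-off handling controlled by `maxpenalty`.

```python
def pick_sink(priorities, alive):
    """Return the reachable sink with the highest priority value, or None if none is up."""
    candidates = [name for name in priorities if name in alive]
    if not candidates:
        return None
    return max(candidates, key=lambda name: priorities[name])

priorities = {"k1": 5, "k2": 10}
print(pick_sink(priorities, {"k1", "k2"}))  # prints: k2
print(pick_sink(priorities, {"k1"}))        # prints: k1
```

Once the higher-priority sink is reachable again, the same rule selects it again, which matches restarting the avro agent and seeing test3 and test4 arrive in one window.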



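Case 11: Load balancing Sink Processor. The load balance type differs from failover in that it offers two selection modes, round robin and random. In both modes, if the selected sink is unavailable, the processor automatically tries the next available sink.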



vi /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf


a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1

# This is the key to configuring load balancing: a sink group is required

a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

a1.sinkgroups.g1.processor.type= load_balance

a1.sinkgroups.g1.processor.backoff = true

a1.sinkgroups.g1.processor.selector = round_robin

# Describe/configure the source

a1.sources.r1.type= syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.channels = c1

# Describe the sink

a1.sinks.k1.type= avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname= m1

a1.sinks.k1.port = 5555

a1.sinks.k2.type= avro

a1.sinks.k2.channel = c1

a1.sinks.k2.hostname= m2

a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



vi /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf


a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= avro

a1.sources.r1.channels = c1

a1.sources.r1.bind =

a1.sources.r1.port = 5555

# Describe the sink

a1.sinks.k1.type= logger

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1



scp -r /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf


scp -r /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf

(d) Open four windows and start both flume agents on m1 and m2 at the same time


flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console



echo "idoall.org test1" | nc localhost 5140

echo "idoall.org test2" | nc localhost 5140

echo "idoall.org test3" | nc localhost 5140

echo "idoall.org test4" | nc localhost 5140



Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34 idoall.org test4 }



Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33 idoall.org test3 }
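The alternating output above is what round robin selection looks like. As a rough Python sketch of that mode, including the "try the next sink if this one is down" behaviour described at the start of this case: `round_robin` is a hypothetical helper, and the timed back-off enabled by `processor.backoff` is not modelled.

```python
from itertools import cycle

def round_robin(events, sinks, alive):
    """Assign each event to the next sink in turn, skipping sinks that are not reachable."""
    order = cycle(sinks)
    routed = []
    for event in events:
        for _ in range(len(sinks)):  # try each sink at most once per event
            sink = next(order)
            if sink in alive:
                routed.append((event, sink))
                break
        else:
            raise RuntimeError("no sink available")
    return routed

events = ["test1", "test2", "test3", "test4"]
print(round_robin(events, ["k1", "k2"], alive={"k1", "k2"}))
print(round_robin(events, ["k1", "k2"], alive={"k2"}))
```

With both sinks up, the events alternate between k1 and k2; with only k2 up, every event ends up on k2.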




Case 12: HBase sink



cp /home/hadoop/hbase-0.96.2-hadoop2/lib/protobuf-java-2.5.0.jar /home/hadoop/flume-1.5.0-bin/lib

cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-client-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib

cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-common-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib

cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-protocol-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib

cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-server-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib

cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-hadoop2-compat-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib

cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-hadoop-compat-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib

cp /home/hadoop/hbase-0.96.2-hadoop2/lib/htrace-core-2.04.jar /home/hadoop/flume-1.5.0-bin/lib




vi /home/hadoop/flume-1.5.0-bin/conf/hbase_simple.conf


a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1

# Describe the sink

a1.sinks.k1.type= hbase

a1.sinks.k1.table = test_idoall_org

a1.sinks.k1.columnFamily = name

a1.sinks.k1.column = idoall

a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

(e) Start the flume agent


flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/hbase_simple.conf -n a1 -Dflume.root.logger=INFO,console



echo "hello idoall.org from flume" | nc localhost 5140



hbase shell


hbase(main):001:0> list






=> ["hbase2hive_idoall","hive2hbase_idoall","test_idoall_org"]


hbase(main):002:0> scan "test_idoall_org"


hbase(main):004:0> quit

