下面的內容基原本自官網:http://flume.apache.org/FlumeUserGuide.htmlhtml
本文使用的是最新版本的apache flume 1.5,安裝完Flume而後測試下Flume是否能夠用,在Flume目錄下用如下語句測試:java
bin/flume-ng agent -n$agent_name -c conf -f conf/flume-conf.properties.templatenode
結果如圖顯示:apache
Ok,咱們接下去看下面經常使用架構、功能配置示例架構
下面是配置文件:app
[html] view plain copyide
#文件名:single_case1.conf.conf 測試
#配置內容: 網站
#single_case1.conf.conf: A single-node Flume configuration ui
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type= netcat
a1.sources.r1.bind= localhost
a1.sources.r1.port= 44444
#Describe the sink
a1.sinks.k1.type= logger
#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
#Bind the source and sink to the channel
a1.sources.r1.channels= c1
a1.sinks.k1.channel= c1
說明下,這裏全部的例子都是將配置文件放到 $FLUME_HOME/conf 目錄下,後面就不贅述了。
#敲命令
flume-ng agent -cconf -f conf/single_case1.conf -n a1 -Dflume.root.logger=INFO,console
#參數命令
-c conf 指定配置目錄爲conf
-f conf/single_case1.conf指定配置文件爲conf/single_case1.conf
-n a1 指定agent名字爲a1,須要與case1_example.conf中的一致
-Dflume.root.logger=INFO,console指定DEBUF模式在console輸出INFO信息
具體參數命令請經過flume-nghelp查看
#而後在另外一個終端進行測試
telnet 127.0.0.1 44444
而後會看在以前啓動的終端查看console輸出到以下:
這裏會發現消息hello world! 輸出了,而hello world! hello world!hello world!則被攔截了。由於在配置文件中,咱們選擇的輸出方式爲:a1.sinks.k1.type= logger
,即console輸出,flume-ng針對logger是隻顯示16個字節的,剩下的都被sink截了。下面是源碼
在LoggerSink.Java中:
[java] view plain copy
if(event != null) {
if (logger.isInfoEnabled()) {
logger.info("Event: " + EventHelper.dumpEvent(event));
}
}
咱們去看EventHelper.java的dumpEvent方法:
[java] view plain copy
privatestatic final int DEFAULT_MAX_BYTES = 16;
publicstatic String dumpEvent(Event event) {
return dumpEvent(event, DEFAULT_MAX_BYTES);
}
publicstatic String dumpEvent(Event event, int maxBytes) {
StringBuilder buffer = new StringBuilder();
if (event == null || event.getBody() == null) {
buffer.append("null");
} else if (event.getBody().length == 0) {
// do nothing... in this case, HexDump.dump() will throw anexception
} else {
byte[] body = event.getBody();
byte[] data = Arrays.copyOf(body, Math.min(body.length,maxBytes));
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
HexDump.dump(data, 0, out, 0);
String hexDump = new String(out.toByteArray());
// remove offset since it's not relevant for such a smalldataset
if(hexDump.startsWith(HEXDUMP_OFFSET)) {
hexDump =hexDump.substring(HEXDUMP_OFFSET.length());
}
buffer.append(hexDump);
} catch (Exception e) {
if(LOGGER.isInfoEnabled()) {
LOGGER.info("Exception while dumpingevent", e);
}
buffer.append("...Exception while dumping:").append(e.getMessage());
}
String result = buffer.toString();
if(result.endsWith(EOL) && buffer.length() >EOL.length()) {
buffer.delete(buffer.length() - EOL.length(),buffer.length()).toString();
}
}
return "{ headers:" + event.getHeaders() + " body:"+ buffer + " }";
}
不難看出,在event處理過程當中,發生了數據截取操做。
Ok,進入下一個環節。
這裏集羣的概念是多臺機器的管理,最簡單的就是兩臺機器一臺代理主機從數據源獲取數據,而後將數據在傳送到另外一臺主機上,進行輸出。這樣作的意義是,一個業務多數據源的時候,咱們能夠對每一個數據源設置代理,而後將它們彙總到一臺代理主機上進行輸出。
下面實現最簡單的集羣配置,即兩個代理,一臺接受數據源數據的代理將數據推送到彙總的代理,而彙總的代理再將數據輸出。所以這兩臺主機分別是push,pull
根據上圖須要用AVRO RPC通訊,所以推數據sinks類型與拉數據的sources的類型都是avro 。而拉數據代理的數據源,咱們用前文講的Spool Source 形式來處理,這裏咱們預先建好目錄與文件,test.log
下面設置推代理主機的flume配置文件:
[html] view plain copy
#推數據代理的配置文件push.conf
#Name the components on this agent
a2.sources= r1
a2.sinks= k1
a2.channels= c1
#Describe/configure the source
a2.sources.r1.type= spooldir
a2.sources.r1.spoolDir= /tmp/logs
a2.sources.r1.channels= c1
#Use a channel which buffers events in memory
a2.channels.c1.type= memory
a2.channels.c1.keep-alive= 10
a2.channels.c1.capacity= 100000
a2.channels.c1.transactionCapacity= 100000
#Describe/configure the source
a2.sinks.k1.type= avro
a2.sinks.k1.channel= c1
a2.sinks.k1.hostname= pull
a2.sinks.k1.port= 4444
下面設置彙總代理主機的flume配置文件:
[html] view plain copy
#彙總數據代理的配置文件pull.conf
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type= avro
a1.sources.r1.channels= c1
a1.sources.r1.bind= pull
a1.sources.r1.port= 44444
#Describe the sink
a1.sinks.k1.type= logger
a1.sinks.k1.channel = c1
#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.keep-alive= 10
a1.channels.c1.capacity= 100000
a1.channels.c1.transactionCapacity= 100000
雖然Spool Source是非實時的,但因爲數據量少,處理仍是很快的,所以咱們只能先啓動pull代理。
#敲命令
flume-ng agent -c conf -f conf/pull.conf -n a1 -Dflume.root.logger=INFO,console
上圖顯示成功。
前後去啓動push主機的flume
#敲命令
flume-ng agent -n a2 -c conf -f conf/push.conf -Dflume.root.logger=INFO,console
查看pull主機的狀態,發現數據已經傳過來了。
而後會過去看push主機的文件
已經加上後綴名.COMPLETED。這與前文說的是一致的。
下面只要將新數據存入到目錄/tmp/logs,push主機就會將數據發送到pull主機輸出,並修改新數據文件的文件名。