Flume NG 學習筆記(二)單機與集羣Flume 配置

下面的內容基原本自官網:http://flume.apache.org/FlumeUserGuide.htmlhtml

本文使用的是最新版本的apache flume 1.5,安裝完Flume而後測試下Flume是否能夠用,在Flume目錄下用如下語句測試:java

bin/flume-ng agent -n$agent_name -c conf -f conf/flume-conf.properties.templatenode

結果如圖顯示:apache



Ok,咱們接下去看下面經常使用架構、功能配置示例架構

1、最簡單的單一代理Flume 配置

 

下面是配置文件:app


[html] view plain copyide

  1. #文件名:single_case1.conf.conf  測試

  2. #配置內容:  網站

  3. #single_case1.conf.conf: A single-node Flume configuration  ui

  4. #Name the components on this agent  

  5. a1.sourcesr1  

  6. a1.sinksk1  

  7. a1.channelsc1  

  8.    

  9. #Describe/configure the source  

  10. a1.sources.r1.typenetcat  

  11. a1.sources.r1.bindlocalhost  

  12. a1.sources.r1.port44444  

  13.    

  14. #Describe the sink  

  15. a1.sinks.k1.typelogger  

  16.    

  17. #Use a channel which buffers events in memory  

  18. a1.channels.c1.typememory  

  19. a1.channels.c1.capacity1000  

  20. a1.channels.c1.transactionCapacity100  

  21.    

  22. #Bind the source and sink to the channel  

  23. a1.sources.r1.channelsc1  

  24. a1.sinks.k1.channelc1  




說明下,這裏全部的例子都是將配置文件放到 $FLUME_HOME/conf 目錄下,後面就不贅述了。

 

#敲命令

flume-ng agent -cconf -f conf/single_case1.conf -n a1 -Dflume.root.logger=INFO,console

 

#參數命令

-c conf 指定配置目錄爲conf

-f conf/single_case1.conf指定配置文件爲conf/single_case1.conf

-n a1 指定agent名字爲a1,須要與case1_example.conf中的一致

-Dflume.root.logger=INFO,console指定DEBUF模式在console輸出INFO信息

具體參數命令請經過flume-nghelp查看

 

#而後在另外一個終端進行測試

telnet 127.0.0.1 44444



而後會看在以前啓動的終端查看console輸出到以下:



這裏會發現消息hello world! 輸出了,而hello world! hello world!hello world!則被攔截了。由於在配置文件中,咱們選擇的輸出方式爲:a1.sinks.k1.type= logger

,即console輸出,flume-ng針對logger是隻顯示16個字節的,剩下的都被sink截了。下面是源碼
在LoggerSink.Java中:


[java] view plain copy

  1. if(event != null) {  

  2.        if (logger.isInfoEnabled()) {  

  3.          logger.info("Event: " + EventHelper.dumpEvent(event));  

  4.        }  

  5. }  



咱們去看EventHelper.java的dumpEvent方法:


[java] view plain copy

  1. privatestatic final int DEFAULT_MAX_BYTES = 16;  

  2. publicstatic String dumpEvent(Event event) {  

  3.    return dumpEvent(event, DEFAULT_MAX_BYTES);  

  4. }  

  5.    

  6. publicstatic String dumpEvent(Event event, int maxBytes) {  

  7.    StringBuilder buffer = new StringBuilder();  

  8.    if (event == null || event.getBody() == null) {  

  9.      buffer.append("null");  

  10.    } else if (event.getBody().length == 0) {  

  11.      // do nothing... in this case, HexDump.dump() will throw anexception  

  12.    } else {  

  13.      byte[] body = event.getBody();  

  14.      byte[] data = Arrays.copyOf(body, Math.min(body.length,maxBytes));  

  15.      ByteArrayOutputStream out = new ByteArrayOutputStream();  

  16.      try {  

  17.        HexDump.dump(data, 0, out, 0);  

  18.        String hexDump = new String(out.toByteArray());  

  19.        // remove offset since it's not relevant for such a smalldataset  

  20.        if(hexDump.startsWith(HEXDUMP_OFFSET)) {  

  21.          hexDump =hexDump.substring(HEXDUMP_OFFSET.length());  

  22.        }  

  23.        buffer.append(hexDump);  

  24.      } catch (Exception e) {  

  25.       if(LOGGER.isInfoEnabled()) {  

  26.         LOGGER.info("Exception while dumpingevent", e);  

  27.       }  

  28.        buffer.append("...Exception while dumping:").append(e.getMessage());  

  29.      }  

  30.      String result = buffer.toString();  

  31.      if(result.endsWith(EOL) && buffer.length() >EOL.length()) {  

  32.        buffer.delete(buffer.length() - EOL.length(),buffer.length()).toString();  

  33.      }  

  34.    }  

  35.    return "{ headers:" + event.getHeaders() + " body:"+ buffer + " }";  

  36.  }  



不難看出,在event處理過程當中,發生了數據截取操做。

Ok,進入下一個環節。

 

2、「集羣」代理Flume 配置



這裏集羣的概念是多臺機器的管理,最簡單的就是兩臺機器一臺代理主機從數據源獲取數據,而後將數據在傳送到另外一臺主機上,進行輸出。這樣作的意義是,一個業務多數據源的時候,咱們能夠對每一個數據源設置代理,而後將它們彙總到一臺代理主機上進行輸出。

下面實現最簡單的集羣配置,即兩個代理,一臺接受數據源數據的代理將數據推送到彙總的代理,而彙總的代理再將數據輸出。所以這兩臺主機分別是push,pull

根據上圖須要用AVRO RPC通訊,所以推數據sinks類型與拉數據的sources的類型都是avro 。而拉數據代理的數據源,咱們用前文講的Spool Source 形式來處理,這裏咱們預先建好目錄與文件,test.log



下面設置推代理主機的flume配置文件:


[html] view plain copy

  1. #推數據代理的配置文件push.conf  

  2. #Name the components on this agent  

  3. a2.sourcesr1  

  4. a2.sinksk1  

  5. a2.channelsc1  

  6.    

  7. #Describe/configure the source  

  8. a2.sources.r1.typespooldir  

  9. a2.sources.r1.spoolDir= /tmp/logs  

  10. a2.sources.r1.channelsc1  

  11.    

  12. #Use a channel which buffers events in memory  

  13. a2.channels.c1.typememory  

  14. a2.channels.c1.keep-alive10  

  15. a2.channels.c1.capacity100000  

  16. a2.channels.c1.transactionCapacity100000  

  17.    

  18. #Describe/configure the source  

  19. a2.sinks.k1.typeavro  

  20. a2.sinks.k1.channelc1  

  21. a2.sinks.k1.hostnamepull  

  22. a2.sinks.k1.port4444  




下面設置彙總代理主機的flume配置文件:


[html] view plain copy

  1. #彙總數據代理的配置文件pull.conf  

  2. #Name the components on this agent  

  3. a1.sourcesr1  

  4. a1.sinksk1  

  5. a1.channelsc1  

  6.    

  7. #Describe/configure the source  

  8. a1.sources.r1.typeavro  

  9. a1.sources.r1.channelsc1  

  10. a1.sources.r1.bindpull  

  11. a1.sources.r1.port44444  

  12.    

  13. #Describe the sink  

  14. a1.sinks.k1.typelogger  

  15.  a1.sinks.k1.channel = c1  

  16.    

  17. #Use a channel which buffers events in memory  

  18. a1.channels.c1.typememory  

  19. a1.channels.c1.keep-alive10  

  20. a1.channels.c1.capacity100000  

  21. a1.channels.c1.transactionCapacity100000  




雖然Spool Source是非實時的,但因爲數據量少,處理仍是很快的,所以咱們只能先啓動pull代理。

#敲命令

flume-ng agent -c conf -f conf/pull.conf -n a1 -Dflume.root.logger=INFO,console


上圖顯示成功。

前後去啓動push主機的flume

#敲命令

flume-ng agent -n a2 -c conf -f conf/push.conf -Dflume.root.logger=INFO,console




查看pull主機的狀態,發現數據已經傳過來了。

而後會過去看push主機的文件


已經加上後綴名.COMPLETED。這與前文說的是一致的。

 

下面只要將新數據存入到目錄/tmp/logs,push主機就會將數據發送到pull主機輸出,並修改新數據文件的文件名。

相關文章
相關標籤/搜索