Flume原理分析與使用案例

一、flume的特色:java

  flume是一個分佈式、可靠、和高可用的海量日誌採集、聚合和傳輸的系統。支持在日誌系統中定製各種數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各類數據接受方(好比文本、HDFS、Hbase等)的能力 。apache

flume的數據流由事件(Event)貫穿始終。Event是Flume的基本數據單位,它攜帶日誌數據(字節數組形式)而且攜帶有頭信息,這些Event由Agent外部的Source生成,當Source捕獲事件後會進行特定的格式化,而後Source會把事件推入(單個或多個)Channel中。你能夠把Channel看做是一個緩衝區,它將保存事件直到Sink處理完該事件。Sink負責持久化日誌或者把事件推向另外一個Source。數組

   flume的可靠性 :app

  當節點出現故障時,日誌可以被傳送到其餘節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別爲:end-to-end(收到數據agent首先將event寫到磁盤上,當數據傳送成功後,再刪除;若是數據發送失敗,能夠從新發送。),Store on failure(這也是scribe採用的策略,當數據接收方crash時,將數據寫到本地,待恢復後,繼續發送),Besteffort(數據發送到接收方後,不會進行確認)。dom

  flume的可恢復性:eclipse

  仍是靠Channel。推薦使用FileChannel,事件Event持久化在本地文件系統裏(性能較差)。 分佈式

 

  flume的一些核心概念:oop

  Agent使用JVM 運行Flume。每臺機器運行一個agent,可是能夠在一個agent中包含多個sources和sinks。性能

  Client生產數據,運行在一個獨立的線程。ui

  Source從Client收集數據,傳遞給Channel。

  Sink從Channel收集數據,運行在一個獨立線程。

  Channel鏈接 sources 和 sinks ,這個有點像一個隊列。

  Events能夠是日誌記錄、 avro 對象等。

  Flume以agent爲最小的獨立運行單位。一個agent就是一個JVM。單agent由Source、Sink和Channel三大組件構成,以下圖:

 

  值得注意的是,Flume提供了大量內置的Source、Channel和Sink類型。不一樣類型的Source、Channel和Sink能夠自由組合。組合方式基於用戶設置的配置文件,很是靈活。好比:Channel能夠把事件暫存在內存裏,也能夠持久化到本地硬盤上。Sink能夠把日誌寫入HDFS, HBase,甚至是另一個Source等等。Flume支持用戶創建多級流,也就是說,多個agent能夠協同工做,而且支持Fan-in、Fan-out、Contextual Routing、Backup Routes,這也正是NB之處。以下圖所示:

 

 二、flume的案例

  Spool 監測配置的目錄下新增的文件,並將文件中的數據讀取出來。須要注意兩點:

  1) 拷貝到spool目錄下的文件不能夠再打開編輯。
  2) spool目錄下不可包含相應的子目錄。

############################################

(a)log4j配置:

   我使用log4j的DailyRollingFileAppender去每分鐘生成一個日誌到配置的目錄下,代碼以下:

#輸出信息到文件
log4j.appender.file = org.apache.log4j.DailyRollingFileAppender
#這個是生成日誌文件的目錄及文件名
log4j.appender.file.File = /Users/jsj/eclipse-workspace/log4j/src/main/java/testlog.log
log4j.appender.file.Append = true #每分鐘產生一個日誌文件 #當前的文件名是testlog.log,前面分鐘產生的文件是這種命名形式testlog.log.2018-08-20-18-16。 log4j.appender.file.DatePattern = '.'yyyy-MM-dd-HH-mm log4j.appender.file.layout = org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern = [%-5p] %-d{yyyy-MM-dd HH:mm:ss} %m%n

(b)模擬產生日誌:

日誌的內容(不含log4j中的配置)爲:0a58f82b-ff6f-4feb-abe2-7c6ac9a0c24d####ERH####qhp####6677062格式爲:用戶ID--縣號--鎮號--收入

 

public class Main {

    public static void main(String[] args) throws Exception {

        Thread thread = new Thread(new GenerateRecord());
        thread.start();
    }
}

class GenerateRecord extends Thread {

    private final Logger log = Logger.getLogger(GenerateRecord.class);

    public void run() {
        while (true) {
            // 隨機產生一個用戶uuid
            UUID userId = UUID.randomUUID();
            System.out.println(userId.toString().length());
            // 產生一個隨機的用戶總資產
            int num = (int) (Math.random() * 10000000) + 100000;
            // 產生一個隨意的縣名
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < 3; i++) {
                char a = (char) (Math.random() * (90 - 65) + 65);
                sb.append(a);
            }
            String county = sb.toString();
            // 產生一個隨機的鎮名
            StringBuilder sb1 = new StringBuilder();
            for (int i = 0; i < 3; i++) {
                char a = (char) (Math.random() * (122 - 97) + 97);
                sb1.append(a);
            }
            String town = sb1.toString();
            // 生成日誌
            log.info(userId + "####" + county + "####" + town + "####" + num);
            // 停1秒鐘
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

 

在幾分鐘後停掉程序,在終端輸入cd /Users/jsj/eclipse-workspace/log4j/src/main/java/查看生成的文件 ls -1 ,以下:

 

 

(c)建立agent配置文件:

在flume安裝目錄的conf/flume.conf下加入以下代碼:

----------------------------------------------------------------

# my application flume configuration
#agent2是咱們給agent起的名字
agent2.sources=source2
agent2.sinks=sink2
agent2.channels=channel2

#Spooling Directory
#set source2
#設置type爲spooldir,這個值是flume給定的alias
agent2.sources.source2.type=spooldir
#設置監控目錄,注意和前面log4j的目錄不一樣
agent2.sources.source2.spoolDir=/Users/jsj/eclipse-workspace/logs

agent2.sources.source2.channels=channel2
agent2.sources.source2.fileHeader = false #set sink2 agent2.sinks.sink2.type=hdfs agent2.sinks.sink2.hdfs.path=hdfs://localhost:9000/flume agent2.sinks.sink2.hdfs.fileType=DataStream agent2.sinks.sink2.hdfs.writeFormat=TEXT agent2.sinks.sink2.hdfs.rollInterval=60 agent2.sinks.sink2.channel=channel2 #設置存儲到HDFS後文件的前綴 agent2.sinks.sink2.hdfs.filePrefix=%Y-%m-%d #set channel2 #設置內存通道 agent2.channels.channel2.type=memory agent2.channels.channel2.capacity=10000 agent2.channels.channel2.transactionCapacity=1000 agent2.channels.channel2.keep-alive=30

 ----------------------------------------------------------------

啓動服務:

 

./flume-ng agent -c ../conf -f ../conf/flume.conf -Dflume.root.logger=INFO,console  -n agent2

觀察日誌:

此時flume的終端會嗖嗖嗖的刷日誌,我截下來幾條,主要是打開文件,對正在處理的文件更名爲.tmp後綴,上傳到HDFS後把HDFS上文件的.tmp刪掉,本地的監控目錄下文件加.COMPLETED後綴。

 

觀察HDFS:

這時候咱們去HDFS上檢查一下:新開個終端輸入hadoop fs -ls /flume,發現生成了比咱們文件數多的多的文件,原來只有11個,如今有62個文件。

相關文章
相關標籤/搜索