一、flume的特色:java
flume是一個分佈式、可靠、和高可用的海量日誌採集、聚合和傳輸的系統。支持在日誌系統中定製各種數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各類數據接受方(好比文本、HDFS、Hbase等)的能力 。apache
flume的數據流由事件(Event)貫穿始終。Event是Flume的基本數據單位,它攜帶日誌數據(字節數組形式)而且攜帶有頭信息,這些Event由Agent外部的Source生成,當Source捕獲事件後會進行特定的格式化,而後Source會把事件推入(單個或多個)Channel中。你能夠把Channel看做是一個緩衝區,它將保存事件直到Sink處理完該事件。Sink負責持久化日誌或者把事件推向另外一個Source。數組
flume的可靠性 :app
當節點出現故障時,日誌可以被傳送到其餘節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別爲:end-to-end(收到數據agent首先將event寫到磁盤上,當數據傳送成功後,再刪除;若是數據發送失敗,能夠從新發送。),Store on failure(這也是scribe採用的策略,當數據接收方crash時,將數據寫到本地,待恢復後,繼續發送),Besteffort(數據發送到接收方後,不會進行確認)。dom
flume的可恢復性:eclipse
仍是靠Channel。推薦使用FileChannel,事件Event持久化在本地文件系統裏(性能較差)。 分佈式
flume的一些核心概念:oop
Agent使用JVM 運行Flume。每臺機器運行一個agent,可是能夠在一個agent中包含多個sources和sinks。性能
Client生產數據,運行在一個獨立的線程。ui
Source從Client收集數據,傳遞給Channel。
Sink從Channel收集數據,運行在一個獨立線程。
Channel鏈接 sources 和 sinks ,這個有點像一個隊列。
Events能夠是日誌記錄、 avro 對象等。
Flume以agent爲最小的獨立運行單位。一個agent就是一個JVM。單agent由Source、Sink和Channel三大組件構成,以下圖:
值得注意的是,Flume提供了大量內置的Source、Channel和Sink類型。不一樣類型的Source、Channel和Sink能夠自由組合。組合方式基於用戶設置的配置文件,很是靈活。好比:Channel能夠把事件暫存在內存裏,也能夠持久化到本地硬盤上。Sink能夠把日誌寫入HDFS, HBase,甚至是另一個Source等等。Flume支持用戶創建多級流,也就是說,多個agent能夠協同工做,而且支持Fan-in、Fan-out、Contextual Routing、Backup Routes,這也正是NB之處。以下圖所示:
二、flume的案例
Spool 監測配置的目錄下新增的文件,並將文件中的數據讀取出來。須要注意兩點:
1) 拷貝到spool目錄下的文件不能夠再打開編輯。
2) spool目錄下不可包含相應的子目錄。
############################################
(a)log4j配置:
我使用log4j的DailyRollingFileAppender去每分鐘生成一個日誌到配置的目錄下,代碼以下:
#輸出信息到文件
log4j.appender.file = org.apache.log4j.DailyRollingFileAppender
#這個是生成日誌文件的目錄及文件名
log4j.appender.file.File = /Users/jsj/eclipse-workspace/log4j/src/main/java/testlog.log
log4j.appender.file.Append = true #每分鐘產生一個日誌文件 #當前的文件名是testlog.log,前面分鐘產生的文件是這種命名形式testlog.log.2018-08-20-18-16。 log4j.appender.file.DatePattern = '.'yyyy-MM-dd-HH-mm log4j.appender.file.layout = org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern = [%-5p] %-d{yyyy-MM-dd HH:mm:ss} %m%n
(b)模擬產生日誌:
日誌的內容(不含log4j中的配置)爲:0a58f82b-ff6f-4feb-abe2-7c6ac9a0c24d####ERH####qhp####6677062格式爲:用戶ID--縣號--鎮號--收入
public class Main { public static void main(String[] args) throws Exception { Thread thread = new Thread(new GenerateRecord()); thread.start(); } } class GenerateRecord extends Thread { private final Logger log = Logger.getLogger(GenerateRecord.class); public void run() { while (true) { // 隨機產生一個用戶uuid UUID userId = UUID.randomUUID(); System.out.println(userId.toString().length()); // 產生一個隨機的用戶總資產 int num = (int) (Math.random() * 10000000) + 100000; // 產生一個隨意的縣名 StringBuilder sb = new StringBuilder(); for (int i = 0; i < 3; i++) { char a = (char) (Math.random() * (90 - 65) + 65); sb.append(a); } String county = sb.toString(); // 產生一個隨機的鎮名 StringBuilder sb1 = new StringBuilder(); for (int i = 0; i < 3; i++) { char a = (char) (Math.random() * (122 - 97) + 97); sb1.append(a); } String town = sb1.toString(); // 生成日誌 log.info(userId + "####" + county + "####" + town + "####" + num); // 停1秒鐘 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
在幾分鐘後停掉程序,在終端輸入cd /Users/jsj/eclipse-workspace/log4j/src/main/java/
查看生成的文件 ls -1
,以下:
(c)建立agent配置文件:
在flume安裝目錄的conf/flume.conf
下加入以下代碼:
----------------------------------------------------------------
# my application flume configuration
#agent2是咱們給agent起的名字
agent2.sources=source2
agent2.sinks=sink2
agent2.channels=channel2
#Spooling Directory
#set source2
#設置type爲spooldir,這個值是flume給定的alias
agent2.sources.source2.type=spooldir
#設置監控目錄,注意和前面log4j的目錄不一樣
agent2.sources.source2.spoolDir=/Users/jsj/eclipse-workspace/logs
agent2.sources.source2.channels=channel2
agent2.sources.source2.fileHeader = false #set sink2 agent2.sinks.sink2.type=hdfs agent2.sinks.sink2.hdfs.path=hdfs://localhost:9000/flume agent2.sinks.sink2.hdfs.fileType=DataStream agent2.sinks.sink2.hdfs.writeFormat=TEXT agent2.sinks.sink2.hdfs.rollInterval=60 agent2.sinks.sink2.channel=channel2 #設置存儲到HDFS後文件的前綴 agent2.sinks.sink2.hdfs.filePrefix=%Y-%m-%d #set channel2 #設置內存通道 agent2.channels.channel2.type=memory agent2.channels.channel2.capacity=10000 agent2.channels.channel2.transactionCapacity=1000 agent2.channels.channel2.keep-alive=30
----------------------------------------------------------------
啓動服務:
./flume-ng agent -c ../conf -f ../conf/flume.conf -Dflume.root.logger=INFO,console -n agent2 |
觀察日誌:
此時flume的終端會嗖嗖嗖的刷日誌,我截下來幾條,主要是打開文件,對正在處理的文件更名爲.tmp後綴,上傳到HDFS後把HDFS上文件的.tmp刪掉,本地的監控目錄下文件加.COMPLETED後綴。
觀察HDFS:
這時候咱們去HDFS上檢查一下:新開個終端輸入hadoop fs -ls /flume,
發現生成了比咱們文件數多的多的文件,原來只有11個,如今有62個文件。