Apache Flume 是一個分佈式,高可用的數據收集系統。它能夠從不一樣的數據源收集數據,通過聚合後發送到存儲系統中,一般用於日誌數據的收集。Flume 分爲 NG 和 OG (1.0 以前) 兩個版本,NG 在 OG 的基礎上進行了徹底的重構,是目前使用最爲普遍的版本。下面的介紹均以 NG 爲基礎。html
下圖爲 Flume 的基本架構圖:git
外部數據源以特定格式向 Flume 發送 events
(事件),當 source
接收到 events
時,它將其存儲到一個或多個 channel
,channe
會一直保存 events
直到它被 sink
所消費。sink
的主要功能從 channel
中讀取 events
,並將其存入外部存儲系統或轉發到下一個 source
,成功後再從 channel
中移除 events
。github
1. Eventweb
Evnet
是 Flume NG 數據傳輸的基本單元。相似於 JMS 和消息系統中的消息。一個 Evnet
由標題和正文組成:前者是鍵/值映射,後者是任意字節數組。shell
2. Sourceapache
數據收集組件,從外部數據源收集數據,並存儲到 Channel 中。數組
3. Channelbash
Channel
是源和接收器之間的管道,用於臨時存儲數據。能夠是內存或持久化的文件系統:服務器
Memory Channel
: 使用內存,優勢是速度快,但數據可能會丟失 (如忽然宕機);File Channel
: 使用持久化的文件系統,優勢是能保證數據不丟失,可是速度慢。4. Sink架構
Sink
的主要功能從 Channel
中讀取 Evnet
,並將其存入外部存儲系統或將其轉發到下一個 Source
,成功後再從 Channel
中移除 Event
。
5. Agent
是一個獨立的 (JVM) 進程,包含 Source
、 Channel
、 Sink
等組件。
Flume 中的每個組件都提供了豐富的類型,適用於不一樣場景:
Source 類型 :內置了幾十種類型,如 Avro Source
,Thrift Source
,Kafka Source
,JMS Source
;
Sink 類型 :HDFS Sink
,Hive Sink
,HBaseSinks
,Avro Sink
等;
Channel 類型 :Memory Channel
,JDBC Channel
,Kafka Channel
,File Channel
等。
對於 Flume 的使用,除非有特別的需求,不然經過組合內置的各類類型的 Source,Sink 和 Channel 就能知足大多數的需求。在 Flume 官網 上對全部類型組件的配置參數均以表格的方式作了詳盡的介紹,並附有配置樣例;同時不一樣版本的參數可能略有所不一樣,因此使用時建議選取官網對應版本的 User Guide 做爲主要參考資料。
Flume 支持多種架構模式,分別介紹以下
Flume 支持跨越多個 Agent 的數據傳遞,這要求前一個 Agent 的 Sink 和下一個 Agent 的 Source 都必須是 Avro
類型,Sink 指向 Source 所在主機名 (或 IP 地址) 和端口(詳細配置見下文案例三)。
日誌收集中經常存在大量的客戶端(好比分佈式 web 服務),Flume 支持使用多個 Agent 分別收集日誌,而後經過一個或者多個 Agent 聚合後再存儲到文件系統中。
Flume 支持從一個 Source 向多個 Channel,也就是向多個 Sink 傳遞事件,這個操做稱之爲 Fan Out
(扇出)。默認狀況下 Fan Out
是向全部的 Channel 複製 Event
,即全部 Channel 收到的數據都是相同的。同時 Flume 也支持在 Source
上自定義一個複用選擇器 (multiplexing selector) 來實現自定義的路由規則。
Flume 配置一般須要如下兩個步驟:
<Agent>.sources = <Source> <Agent>.sinks = <Sink> <Agent>.channels = <Channel1> <Channel2> # set channel for source <Agent>.sources.<Source>.channels = <Channel1> <Channel2> ... # set channel for sink <Agent>.sinks.<Sink>.channel = <Channel1>
<Agent>.sources.<Source>.<someProperty> = <someValue> # properties for channels <Agent>.channel.<Channel>.<someProperty> = <someValue> # properties for sinks <Agent>.sources.<Sink>.<someProperty> = <someValue>
爲方便你們後期查閱,本倉庫中全部軟件的安裝均單獨成篇,Flume 的安裝見:
介紹幾個 Flume 的使用案例:
需求: 監聽文件內容變更,將新增長的內容輸出到控制檯。
實現: 主要使用 Exec Source
配合 tail
命令實現。
新建配置文件 exec-memory-logger.properties
,其內容以下:
#指定agent的sources,sinks,channels a1.sources = s1 a1.sinks = k1 a1.channels = c1 #配置sources屬性 a1.sources.s1.type = exec a1.sources.s1.command = tail -F /tmp/log.txt a1.sources.s1.shell = /bin/bash -c #將sources與channels進行綁定 a1.sources.s1.channels = c1 #配置sink a1.sinks.k1.type = logger #將sinks與channels進行綁定 a1.sinks.k1.channel = c1 #配置channel類型 a1.channels.c1.type = memory
flume-ng agent \ --conf conf \ --conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/exec-memory-logger.properties \ --name a1 \ -Dflume.root.logger=INFO,console
向文件中追加數據:
控制檯的顯示:
需求: 監聽指定目錄,將目錄下新增長的文件存儲到 HDFS。
實現:使用 Spooling Directory Source
和 HDFS Sink
。
#指定agent的sources,sinks,channels a1.sources = s1 a1.sinks = k1 a1.channels = c1 #配置sources屬性 a1.sources.s1.type =spooldir a1.sources.s1.spoolDir =/tmp/logs a1.sources.s1.basenameHeader = true a1.sources.s1.basenameHeaderKey = fileName #將sources與channels進行綁定 a1.sources.s1.channels =c1 #配置sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/ a1.sinks.k1.hdfs.filePrefix = %{fileName} #生成的文件類型,默認是Sequencefile,可用DataStream,則爲普通文本 a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.useLocalTimeStamp = true #將sinks與channels進行綁定 a1.sinks.k1.channel = c1 #配置channel類型 a1.channels.c1.type = memory
flume-ng agent \ --conf conf \ --conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/spooling-memory-hdfs.properties \ --name a1 -Dflume.root.logger=INFO,console
拷貝任意文件到監聽目錄下,能夠從日誌看到文件上傳到 HDFS 的路徑:
# cp log.txt logs/
查看上傳到 HDFS 上的文件內容與本地是否一致:
# hdfs dfs -cat /flume/events/19-04-09/13/log.txt.1554788567801
需求: 將本服務器收集到的數據發送到另一臺服務器。
實現:使用 avro sources
和 avro Sink
實現。
新建配置 netcat-memory-avro.properties
,監聽文件內容變化,而後將新的文件內容經過 avro sink
發送到 hadoop001 這臺服務器的 8888 端口:
#指定agent的sources,sinks,channels a1.sources = s1 a1.sinks = k1 a1.channels = c1 #配置sources屬性 a1.sources.s1.type = exec a1.sources.s1.command = tail -F /tmp/log.txt a1.sources.s1.shell = /bin/bash -c a1.sources.s1.channels = c1 #配置sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop001 a1.sinks.k1.port = 8888 a1.sinks.k1.batch-size = 1 a1.sinks.k1.channel = c1 #配置channel類型 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100
使用 avro source
監聽 hadoop001 服務器的 8888 端口,將獲取到內容輸出到控制檯:
#指定agent的sources,sinks,channels a2.sources = s2 a2.sinks = k2 a2.channels = c2 #配置sources屬性 a2.sources.s2.type = avro a2.sources.s2.bind = hadoop001 a2.sources.s2.port = 8888 #將sources與channels進行綁定 a2.sources.s2.channels = c2 #配置sink a2.sinks.k2.type = logger #將sinks與channels進行綁定 a2.sinks.k2.channel = c2 #配置channel類型 a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100
啓動日誌彙集 Flume:
flume-ng agent \ --conf conf \ --conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/avro-memory-logger.properties \ --name a2 -Dflume.root.logger=INFO,console
在啓動日誌收集 Flume:
flume-ng agent \ --conf conf \ --conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/netcat-memory-avro.properties \ --name a1 -Dflume.root.logger=INFO,console
這裏建議按以上順序啓動,緣由是 avro.source
會先與端口進行綁定,這樣 avro sink
鏈接時纔不會報沒法鏈接的異常。可是即便不按順序啓動也是不要緊的,sink
會一直重試,直至創建好鏈接。
向文件 tmp/log.txt
中追加內容:
能夠看到已經從 8888 端口監聽到內容,併成功輸出到控制檯:
更多大數據系列文章能夠參見 GitHub 開源項目: 大數據入門指南