分佈式日誌收集框架 Flume

1 需求分析

WebServer/ApplicationServer分散在各個機器上,然而咱們依舊想在Hadoop平臺上進行統計分析,如何將日誌收集到Hadoop平臺呢?html

  • 簡單的這樣嗎?
shell cp hadoop集羣的機器上;
hadoop fs -put ... /複製代碼

顯然該法面臨着容錯、負載均衡、高延遲、數據壓縮等一系列問題這顯然已經沒法知足需求了!java

不如問問神奇的Flume呢???node

只須要配置文件,輕鬆解決以上問題!web

2 Flume概述

2.1 官網

  • Flume是一種分佈式,可靠且可用的服務,用於有效地收集,聚合和移動大量日誌數據。
  • 它具備基於流式數據流的簡單靈活的架構。
  • 它具備可靠的可靠性機制和許多故障轉移和恢復機制,具備強大的容錯性。
  • 它使用簡單的可擴展數據模型,容許在線分析應用程序。

2.2 設計目標

  • 可靠性
    當節點出現故障時,日誌可以被傳送到其餘節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別爲:end-to-end(收到數據agent首先將event寫到磁盤上,當數據傳送成功後,再刪除;若是數據發送失敗,能夠從新發送。),Store on failure(這也是scribe採用的策略,當數據接收方crash時,將數據寫到本地,待恢復後,繼續發送),Best effort(數據發送到接收方後,不會進行確認)。
  • 擴展性
    Flume採用了三層架構,分別爲agent,collector和storage,每一層都可以水平擴展。
    其中,全部agent和collector由master統一管理,這使得系統容易監控和維護,且master容許有多個(使用ZooKeeper進行管理和負載均衡),這就避免了單點故障問題。
  • 管理性
    全部agent和colletor由master統一管理,這使得系統便於維護。多master狀況,Flume利用ZooKeeper和gossip,保證動態配置數據的一致性。用戶能夠在master上查看各個數據源或者數據流執行狀況,且能夠對各個數據源配置和動態加載。Flume提供了web 和shell script command兩種形式對數據流進行管理。
  • 功能可擴展性
    用戶能夠根據須要添加本身的agent,collector或者storage。此外,Flume自帶了不少組件,包括各類agent(file, syslog等),collector和storage(file,HDFS等)。

2.3 主流競品對比

其餘的還有好比:shell

  • Logstash: ELK(ElasticsSearch, Logstash, Kibana)
  • Chukwa: Yahoo/Apache, 使用Java語言開發, 負載均衡不是很好, 已經不維護了。
  • Fluentd: 和Flume相似, Ruby開發。

2.4 發展史

  • Cloudera公司提出0.9.2,叫Flume-OG
  • 2011年Flume-728編號,重要里程碑(Flume-NG),貢獻給Apache社區
  • 2012年7月 1.0版本
  • 2015年5月 1.6版本
  • ~ 1.9版本

3 核心架構及其組件

3.1 core架構

在這裏插入圖片描述

3.2 核心的組件

順便來看看官方文檔apache

3.2.1 Source - 收集

指定數據源(Avro, Thrift, Spooling, Kafka, Exec)服務器

3.2.2 Channel - 彙集

把數據暫存(Memory, File, Kafka等用的比較多)網絡

3.2.3 Sink - 輸出

把數據寫至某處(HDFS, Hive, Logger, Avro, Thrift, File, ES, HBase, Kafka等)架構

multi-agent flow

爲了跨多個代理或跳數據流,先前代理的接收器和當前跳的源須要是avro類型,接收器指向源的主機名(或IP地址)和端口。負載均衡

Consolidation合併

日誌收集中很是常見的狀況是大量日誌生成客戶端將數據發送到鏈接到存儲子系統的少數消費者代理。 例如,從數百個Web服務器收集的日誌發送給寫入HDFS集羣的十幾個代理。這能夠經過使用avro接收器配置多個第一層代理在Flume中實現,全部這些代理都指向單個代理的avro源(一樣,您能夠在這種狀況下使用thrift源/接收器/客戶端)。 第二層代理上的此源將接收的事件合併到單個信道中,該信道由信宿器消耗到其最終目的地。

Multiplexing the flow

Flume支持將事件流多路複用到一個或多個目的地。 這是經過定義能夠複製或選擇性地將事件路由到一個或多個信道的流複用器來實現的。上面的例子顯示了來自代理「foo」的源代碼將流程擴展到三個不一樣的通道。 扇出能夠複製或多路複用。 在複製流的狀況下,每一個事件被髮送到全部三個通道。 對於多路複用狀況,當事件的屬性與預配置的值匹配時,事件將被傳遞到可用通道的子集。 例如,若是一個名爲「txnType」的事件屬性設置爲「customer」,那麼它應該轉到channel1和channel3,若是它是「vendor」,那麼它應該轉到channel2,不然轉到channel3。 能夠在代理的配置文件中設置映射。

4 環境配置與部署

4.1 系統需求

  • 系統
    macOS 10.14.14
  • Java運行時環境
    Java 1.8或更高版本
  • 內存源
    通道或接收器使用的配置的足夠內存
  • 磁盤空間
    通道或接收器使用的配置的足夠磁盤空間
  • 目錄權限
    代理使用的目錄的讀/寫權限

4.2 下載與安裝

4.3 配置

  • 查看安裝路徑
  • 系統配置文件
export FLUME_VERSION=1.9.0
export FLUME_HOME=/usr/local/Cellar/flume/1.9.0/libexec
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=$FLUME_HOME/bin:$PATH複製代碼

  • flume配置文件
    配置JAVA_HOME
  • 驗證
    bin下的命令執行文件

安裝成功

5 實戰

使用Flume的核心就在於配置文件

  • 配置Source
  • 配置Channel
  • 配置Sink
  • 組織在一塊兒

5.1 場景1 - 從指定網絡端口收集數據輸出到控制檯

看看官網的第一個案例

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1複製代碼

a1:agent名稱r1:Source名稱k1:Sink名稱c1:Channel名稱

看看其中的

Sources : netcat

相似於netcat的源,它偵聽給定端口並將每行文本轉換爲事件。 像nc -k -l [host] [port]這樣的行爲。 換句話說,它打開一個指定的端口並偵聽數據。 指望是提供的數據是換行符分隔的文本。 每行文本都轉換爲Flume事件,並經過鏈接的通道發送。

必需屬性以粗體顯示。

Sinks:logger

在INFO級別記錄事件。 一般用於測試/調試目的。 必需屬性以粗體顯示。 此接收器是惟一的例外,它不須要在「記錄原始數據」部分中說明的額外配置。

channel:memor

事件存儲在具備可配置最大大小的內存中隊列中。 它很是適用於須要更高吞吐量的流量,而且在代理髮生故障時準備丟失分階段數據。 必需屬性以粗體顯示。

實戰

新建example.conf配置

在conf目錄下

啓動一個agent

使用名爲flume-ng的shell腳本啓動代理程序,該腳本位於Flume發行版的bin目錄中。 您須要在命令行上指定代理名稱,config目錄和配置文件:

bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template複製代碼
  • 回顧命令參數的意義
bin/flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/example.conf \
-Dflume.root.logger=INFO,console複製代碼

如今,代理將開始運行在給定屬性文件中配置的源和接收器。

使用telnet進行測試驗證

  • 注意
telnet 127.0.0.1 44444複製代碼
  • 發送了兩條數據
  • 這邊接收到了數據

    讓咱們詳細分析下上圖中的數據信息
2019-06-12 17:52:39,711 (SinkRunner-PollingRunner-DefaultSinkProcessor)
[INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] 
Event: { headers:{} body: 4A 61 76 61 45 64 67 65 0D                      JavaEdge. }複製代碼

其中的Event是Fluem數據傳輸的基本單元Event = 可選的header + byte array

5.2 場景2 - 監控一個文件實時採集新增的數據輸出到控制檯

Exec Source

Exec源在啓動時運行給定的Unix命令,並指望該進程在標準輸出上連續生成數據(stderr被簡單地丟棄,除非屬性logStdErr設置爲true)。 若是進程因任何緣由退出,則源也會退出而且不會生成其餘數據。 這意味着諸如cat [named pipe]或tail -F [file]之類的配置將產生所需的結果,而日期可能不會 - 前兩個命令產生數據流,然後者產生單個事件並退出

Agent 選型

exec source + memory channel + logger sink

配置文件

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /Volumes/doc/data/data.log
a1.sources.r1.shell = /bin/sh -c

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1複製代碼

在conf下新建配置文件以下:

  • data.log文件內容
  • 成功接收
    在這裏插入圖片描述

5.3 應用場景3 - 將A服務器上的日誌實時採集到B服務器

技術選型

exec s + memory c + avro savro s + memory c + loger s

配置文件

exec-memory-avro.conf

# Name the components on this agent
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

# Describe/configure the source
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /Volumes/doc/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

# Describe the sink
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = localhost
exec-memory-avro.sinks.avro-sink.port = 44444

# Use a channel which buffers events in memory
exec-memory-avro.channels.memory-channel.type = memory
exec-memory-avro.channels.memory-channel.capacity = 1000
exec-memory-avro.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel複製代碼

# Name the components on this agent
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

# Describe/configure the source
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /Volumes/doc/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

# Describe the sink
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = localhost
exec-memory-avro.sinks.avro-sink.port = 44444

# Use a channel which buffers events in memory
exec-memory-avro.channels.memory-channel.type = memory
exec-memory-avro.channels.memory-channel.capacity = 1000
exec-memory-avro.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel複製代碼

參考

https://tech.meituan.com/2013/12/09/meituan-flume-log-system-architecture-and-design.html

相關文章
相關標籤/搜索