flume分佈式日誌收集測試

官方參考文檔html

https://flume.apache.org/FlumeUserGuide.html#file-channel前端

Flume NG是一個分佈式、可靠、可用的系統,它可以將不一樣數據源的海量日誌數據進行高效收集、聚合、移動,最後存儲到一箇中心化數據存儲系統中。由原來的Flume OG到如今的Flume NG,進行了架構重構,而且如今NG版本徹底不兼容原來的OG版本。通過架構重構後,Flume NG更像是一個輕量的小工具,很是簡單,容易適應各類方式日誌收集,並支持failover和負載均衡。nginx

架構設計要點shell

Flume的架構主要有一下幾個核心概念:數據庫

  • Event:一個數據單元,帶有一個可選的消息頭apache

  • Flow:Event從源點到達目的點的遷移的抽象tomcat

  • Client:操做位於源點處的Event,將其發送到Flume Agentbash

  • Agent:一個獨立的Flume進程,包含組件Source、Channel、Sink服務器

  • Source:用來消費傳遞到該組件的Event架構

  • Channel:中轉Event的一個臨時存儲,保存有Source組件傳遞過來的Event

  • Sink:從Channel中讀取並移除Event,將Event傳遞到Flow Pipeline中的下一個Agent(若是有的話)

Flume NG架構,如圖所示:

flume-ng-architecture


外部系統產生日誌,直接經過Flume的Agent的Source組件將事件(如日誌行)發送到中間臨時的channel組件,最後傳遞給Sink組件,HDFS Sink組件能夠直接把數據存儲到HDFS集羣上。
一個最基本Flow的配置,格式以下:

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2> ...
<Agent>.sources.<Source2>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>


尖括號裏面的,咱們能夠根據實際需求或業務來修更名稱。下面詳細說明:
表示配置一個Agent的名稱,一個Agent確定有一個名稱。與是Agent的Source組件的名稱,消費傳遞過來的Event。與是Agent的Channel組件的名稱。與是Agent的Sink組件的名稱,從Channel中消費(移除)Event。
上面配置內容中,第一組中配置Source、Sink、Channel,它們的值能夠有1個或者多個;第二組中配置Source將把數據存儲(Put)到哪個Channel中,能夠存儲到1個或多個Channel中,同一個Source將數據存儲到多個Channel中,其實是Replication;第三組中配置Sink從哪個Channel中取(Task)數據,一個Sink只能從一個Channel中取數據。
下面,根據官網文檔,咱們展現幾種Flow Pipeline,各自適應於什麼樣的應用場景:

  • 多個Agent順序鏈接

flume-multiseq-agents
能夠將多個Agent順序鏈接起來,將最初的數據源通過收集,存儲到最終的存儲系統中。這是最簡單的狀況,通常狀況下,應該控制這種順序鏈接的Agent的數量,由於數據流經的路徑變長了,若是不考慮failover的話,出現故障將影響整個Flow上的Agent收集服務。

  • 多個Agent的數據匯聚到同一個Agent

flume-join-agent
這種狀況應用的場景比較多,好比要收集Web網站的用戶行爲日誌,Web網站爲了可用性使用的負載均衡的集羣模式,每一個節點都產生用戶行爲日誌,能夠爲每一個節點都配置一個Agent來單獨收集日誌數據,而後多個Agent將數據最終匯聚到一個用來存儲數據存儲系統,如HDFS上。

  • 多路(Multiplexing)Agent

flume-multiplexing-agent
這種模式,有兩種方式,一種是用來複制(Replication),另外一種是用來分流(Multiplexing)。Replication方式,能夠將最前端的數據源複製多份,分別傳遞到多個channel中,每一個channel接收到的數據都是相同的,配置格式,以下所示:

# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating


上面指定了selector的type的值爲replication,其餘的配置沒有指定,使用的Replication方式,Source1會將數據分別存儲到Channel1和Channel2,這兩個channel裏面存儲的數據是相同的,而後數據被傳遞到Sink1和Sink2。
Multiplexing方式,selector能夠根據header的值來肯定數據傳遞到哪個channel,配置格式,以下所示:

# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...

<Agent>.sources.<Source1>.selector.default = <Channel2>



上面selector的type的值爲multiplexing,同時配置selector的header信息,還配置了多個selector的mapping的值,即header的值:若是header的值爲Value一、Value2,數據從Source1路由到Channel1;若是header的值爲Value二、Value3,數據從Source1路由到Channel2。

  • 實現load balance功能

flume-load-balance-agents
Load balancing Sink Processor可以實現load balance功能,上圖Agent1是一個路由節點,負責將Channel暫存的Event均衡到對應的多個Sink組件上,而每一個Sink組件分別鏈接到一個獨立的Agent上,示例配置,以下所示:


a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
  • 實現failover能

Failover Sink Processor可以實現failover功能,具體流程相似load balance,可是內部處理機制與load balance徹底不一樣:Failover Sink Processor維護一個優先級Sink組件列表,只要有一個Sink組件可用,Event就被傳遞到下一個組件。若是一個Sink可以成功處理Event,則會加入到一個Pool中,不然會被移出Pool並計算失敗次數,設置一個懲罰因子,示例配置以下所示:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.priority.k3 = 6
a1.sinkgroups.g1.processor.maxpenalty = 20000

基本功能

咱們看一下,Flume NG都支持哪些功能(目前最新版本是1.5.0.1),瞭解它的功能集合,可以讓咱們在應用中更好地選擇使用哪種方案。說明Flume NG的功能,實際仍是圍繞着Agent的三個組件Source、Channel、Sink來看它可以支持哪些技術或協議。咱們再也不對各個組件支持的協議詳細配置進行說明,經過列表的方式分別對三個組件進行概要說明:

Source類型	                說明
Avro Source	                支持Avro協議(其實是Avro RPC),內置支持
Thrift Source	                支持Thrift協議,內置支持
Exec Source	                基於Unix的command在標準輸出上生產數據
JMS Source	                從JMS系統(消息、主題)中讀取數據,ActiveMQ已經測試過
Spooling Directory Source	監控指定目錄內數據變動
Twitter 1% firehose Source	經過API持續下載Twitter數據,試驗性質
Netcat Source	                監控某個端口,將流經端口的每個文本行數據做爲Event輸入
Sequence Generator Source	序列生成器數據源,生產序列數據
Syslog Sources	                讀取syslog數據,產生Event,支持UDP和TCP兩種協議
HTTP Source	                    基於HTTP POST或GET方式的數據源,支持JSON、BLOB表示形式
Legacy Sources	                兼容老的Flume OG中Source(0.9.x版本)
Flume Channel
###############################################################
Channel類型	                說明
Memory Channel	                Event數據存儲在內存中
JDBC Channel	                Event數據存儲在持久化存儲中,當前Flume Channel內置支持Derby
File Channel	                Event數據存儲在磁盤文件中
Spillable Memory Channel	Event數據存儲在內存中和磁盤上,當內存隊列滿了,會持久化到磁盤文件(當前試驗性的,不建議生產環境使用)
Pseudo Transaction Channel	測試用途
Custom Channel	                自定義Channel實現
Flume Sink
###################################################################
Sink類型	                說明
HDFS Sink	                數據寫入HDFS
Logger Sink	                數據寫入日誌文件
Avro Sink	                數據被轉換成Avro Event,而後發送到配置的RPC端口上
Thrift Sink	                數據被轉換成Thrift Event,而後發送到配置的RPC端口上
IRC Sink	                數據在IRC上進行回放
File Roll Sink	        存儲數據到本地文件系統
Null Sink	                丟棄到全部數據
HBase Sink	                數據寫入HBase數據庫
Morphline Solr Sink	        數據發送到Solr搜索服務器(集羣)
ElasticSearch Sink	        數據發送到Elastic Search搜索服務器(集羣)
Kite Dataset Sink	        寫數據到Kite Dataset,試驗性質的
Custom Sink	                自定義Sink實現
#################################################################
另外還有Channel Selector、Sink Processor、Event Serializer、Interceptor等組件,能夠參考官網提供的用戶手冊。
  1. 安裝配置略,能夠參考網上教程


  2. 下面是測試的配置文件

    agent 配置文件以下



  3. # Name the  components on this agent
    a1.sources =  r1
    a1.sinks =  k1
    a1.channels  = c1
    
    a1.sinks.k1.type = avro 
    a1.sinks.k1.hostname = 127.0.0.1
    a1.sinks.k1.port = 44444 
    a1.sinks.k1.channel = c1
    
    # Use a  channel which buffers events in memory
    a1.channels.c1.type  = memory
    a1.channels.c1.capacity  = 1000
    a1.channels.c1.transactionCapacity  = 100
    
    a1.sources.r1.type  = exec
    a1.sources.r1.command  = tail -F  /var/log/nginx/access.log
    a1.sources.r1.channels  = c1
    a1.sources.s.deserializer.maxLineLength=65535

server端配置文件以下:  測試複製(Replication)1個source 複製到多個channels 輸出到多個sink

# Name the  components on this agent
b1.sources =  r1
b1.sinks =  k1 k2 k3 
b1.channels  = c1 c2 c3
b1.sources.r1.selector.type = replicating



b1.sources.r1.type = avro 
b1.sources.r1.channels = c1 c2 c3 
b1.sources.r1.bind = 0.0.0.0 
b1.sources.r1.port = 44444


b1.channels.c1.type = file 
b1.channels.c1.write-timeout = 10 
b1.channels.c1.keep-alive = 10 
b1.channels.c1.checkpointDir = /flume/check
b1.channels.c1.useDualCheckpoints = true 
b1.channels.c1.backupCheckpointDir = /flume/backup
b1.channels.c1.dataDirs = /flume


b1.channels.c2.type=memory  
b1.channels.c2.capacity=2000000  
b1.channels.c2.transactionCapacity=10000  


b1.channels.c3.type=memory  
b1.channels.c3.capacity=2000000  
b1.channels.c3.transactionCapacity=10000  
# Describe the sink 
b1.sinks.k1.type  = hdfs
b1.sinks.k1.channel  = c1
b1.sinks.k1.hdfs.path  = hdfs://localhost:9000/user/hadoop/flume/collected/
b1.sinks.k1.hdfs.filePrefix  = chen_test
b1.sinks.k1.hdfs.round  = true
b1.sinks.k1.hdfs.roundValue  = 10
b1.sinks.k1.hdfs.roundUnit  = minute


b1.sinks.k2.channel = c2
b1.sinks.k2.type = file_roll
b1.sinks.k2.batchSize = 100000000
b1.sinks.k2.rollInterval = 1000000
b1.sinks.k2.serializer = TEXT
b1.sinks.k2.sink.directory = /var/log/flume


b1.sinks.k3.channel = c3 
b1.sinks.k3.type = logger

啓動測試命令

flume-ng agent -c . -f test.conf   -n b1  -Dflume.root.logger=INFO,console

-c  配置文件目錄  -f配置文件  -n  節點名字  和配置文件對應     console打到終端



5.flume-ng負載均衡load-balance、failover集羣搭建

參考連接

http://blog.csdn.net/lskyne/article/details/37662835


6. 測試   區分 flume日誌合併在一塊兒的日誌

a1配置
[root@host_12 test]# cat  a1.conf 
# Name the  components on this agent
a1.sources =  r1
a1.sinks =  k1
a1.channels  = c1

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type=static
a1.sources.r1.interceptors.i1.key=nginx
a1.sources.r1.interceptors.i1.value=nginx_1
a1.sources.r1.interceptors.i1.preserveExisting=false


#a1.sources.r1.interceptors = i1
#a1.sources.r1.interceptors.i1.type = host
#a1.sources.r1.interceptors.i1.hostHeader = hostname

a1.sinks.k1.type = avro 
a1.sinks.k1.hostname = 127.0.0.1
a1.sinks.k1.port = 44444 
a1.sinks.k1.channel = c1

# Use a  channel which buffers events in memory
a1.channels.c1.type  = memory
a1.channels.c1.capacity  = 1000
a1.channels.c1.transactionCapacity  = 100

a1.sources.r1.type  = exec
###匹配shell  tomcat yyyy:mm:dd:hh格式的日誌
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.command  = tail -f  /var/log/nginx_1/access_`date +%Y%m%d%H`.log
a1.sources.r1.channels  = c1

#########匹配替換行裏的文本的內容
#a1.sources.r1.interceptors = i1
#a1.sources.r1.interceptors.i1.type = search_replace
#a1.sources.r1.interceptors.i1.searchPattern = [0-9]+
#a1.sources.r1.interceptors.i1.replaceString = lxw1234
#a1.sources.r1.interceptors.i1.charset = UTF-8
###########################################
a2配置

[root@host_12 test]# cat  a2.conf 
# Name the  components on this agent
a2.sources =  r1
a2.sinks =  k1
a2.channels  = c1

a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type=static
a2.sources.r1.interceptors.i1.key=nginx
a2.sources.r1.interceptors.i1.value=nginx_2
a2.sources.r1.interceptors.i1.preserveExisting=false


a2.sinks.k1.type = avro 
a2.sinks.k1.hostname = 127.0.0.1
a2.sinks.k1.port = 44444 
a2.sinks.k1.channel = c1

# Use a  channel which buffers events in memory
a2.channels.c1.type  = memory
a2.channels.c1.capacity  = 1000
a2.channels.c1.transactionCapacity  = 100

a2.sources.r1.type  = exec
a2.sources.r1.shell = /bin/bash -c
a2.sources.r1.command  = tail -f  /var/log/nginx_2/access_`date +%Y%m%d%H`.log
a2.sources.r1.channels  = c1

####################################
server配置
[root@host_12 test]# cat   h1.conf
# Name the  components on this agent
serv_1.sources =  r1
serv_1.sinks =   k2 k3 
serv_1.channels  =  c2 c3
#serv_1.sources.r1.selector.type = replicating

serv_1.sources.r1.selector.type = multiplexing
serv_1.sources.r1.selector.header = nginx
serv_1.sources.r1.selector.mapping.nginx_1 = c2
serv_1.sources.r1.selector.mapping.nginx_2 = c3
 
 
 
serv_1.sources.r1.type = avro 
serv_1.sources.r1.channels =  c2 c3 
serv_1.sources.r1.bind = 0.0.0.0 
serv_1.sources.r1.port = 44444
 
 
 
 
serv_1.channels.c2.type=memory  
serv_1.channels.c2.capacity=2000000  
serv_1.channels.c2.transactionCapacity=10000  
 
 
serv_1.channels.c3.type=memory  
serv_1.channels.c3.capacity=2000000  
serv_1.channels.c3.transactionCapacity=10000  
 
 
serv_1.sinks.k2.channel = c2
serv_1.sinks.k2.type = file_roll
serv_1.sinks.k2.batchSize = 100000000
serv_1.sinks.k2.rollInterval = 1000000
serv_1.sinks.k2.serializer = TEXT
serv_1.sinks.k2.sink.directory = /var/log/flume/
 
 
serv_1.sinks.k3.channel = c3 
serv_1.sinks.k3.type = logger
相關文章
相關標籤/搜索