Official reference documentation:
https://flume.apache.org/FlumeUserGuide.html#file-channel
Flume NG is a distributed, reliable and highly available system that efficiently collects, aggregates and moves large volumes of log data from many different sources into a centralized data store. In going from the original Flume OG to today's Flume NG, the architecture was completely redesigned, and the NG version is entirely incompatible with the old OG version. After the redesign, Flume NG is more like a lightweight tool: very simple, easy to adapt to all kinds of log-collection setups, and with support for failover and load balancing.
Architecture design highlights
Flume's architecture is built around the following core concepts:
Event: a unit of data, carrying an optional set of message headers
Flow: an abstraction of the movement of Events from a point of origin to their destination
Client: operates at the point of origin and sends Events to a Flume Agent
Agent: an independent Flume process, containing the Source, Channel and Sink components
Source: consumes the Events delivered to it
Channel: a transient store for Events in transit, holding the Events handed over by the Source
Sink: reads and removes Events from the Channel, passing them to the next Agent in the Flow pipeline (if there is one)
Flume NG architecture: an external system produces log data, and the Agent's Source component receives it as events (e.g., log lines) and puts them into an intermediate, transient Channel component; finally a Sink component takes them out, and a sink such as the HDFS Sink can write the data directly to an HDFS cluster.
The most basic Flow configuration has the following format:
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2> ...
<Agent>.sources.<Source2>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>
The names in angle brackets can be changed to suit your actual requirements or business. In detail:
<Agent> is the name given to the Agent; every Agent must have a name. <Source1> and <Source2> are the names of the Agent's Source components, which consume the Events delivered to them. <Channel1> and <Channel2> are the names of the Agent's Channel components. <Sink1> and <Sink2> are the names of the Agent's Sink components, which consume (remove) Events from a Channel.
In the configuration above, the first group declares the Sources, Sinks and Channels; each may have one or more values. The second group configures which Channel(s) a Source puts its data into; a Source can write to one or more Channels, and a single Source writing to multiple Channels is in fact replication. The third group configures which Channel a Sink takes its data from; a Sink can only take data from a single Channel.
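For instance, a minimal, runnable instance of this template might look like the sketch below; all names here (agent1, src1, ch1, sink1) and the port are hypothetical, chosen only to make the example concrete:

# a minimal single-agent flow (hypothetical names)
agent1.sources = src1
agent1.sinks = sink1
agent1.channels = ch1

# netcat source: turns each text line arriving on the port into an Event
agent1.sources.src1.type = netcat
agent1.sources.src1.bind = 0.0.0.0
agent1.sources.src1.port = 5140

# memory channel buffering the Events in transit
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000

# logger sink: writes Events to the Flume log
agent1.sinks.sink1.type = logger

# wire the flow: the source puts into ch1, the sink takes from ch1
agent1.sources.src1.channels = ch1
agent1.sinks.sink1.channel = ch1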
Below, following the official documentation, we show several Flow pipelines and the application scenarios each is suited to.
Multiple Agents chained in sequence
Multiple Agents can be chained one after another, so that data from the original source is collected and eventually stored in the final storage system. This is the simplest case. In general, the number of Agents chained this way should be kept under control: the path the data travels becomes longer, and without failover a fault anywhere will affect collection for every Agent on the Flow. A minimal two-agent chain is sketched below.
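As a hedged sketch (agent names, the host name collector.example.com, the log path and the port are all hypothetical), the first agent ships events over Avro RPC to the second, whose Avro source listens on the same port:

# ---- agent "front" on the source host: tail a log and forward over Avro ----
front.sources = r1
front.channels = c1
front.sinks = k1

front.sources.r1.type = exec
front.sources.r1.command = tail -F /var/log/app.log
front.sources.r1.channels = c1

front.channels.c1.type = memory

front.sinks.k1.type = avro
front.sinks.k1.hostname = collector.example.com
front.sinks.k1.port = 44444
front.sinks.k1.channel = c1

# ---- agent "collect" on the collector host: receive Avro, roll to local files ----
collect.sources = r1
collect.channels = c1
collect.sinks = k1

collect.sources.r1.type = avro
collect.sources.r1.bind = 0.0.0.0
collect.sources.r1.port = 44444
collect.sources.r1.channels = c1

collect.channels.c1.type = memory

collect.sinks.k1.type = file_roll
collect.sinks.k1.sink.directory = /var/log/flume
collect.sinks.k1.channel = c1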
Data from multiple Agents converging on a single Agent
This pattern fits many scenarios. For example, to collect user-behavior logs from a website: for availability, the site runs as a load-balanced cluster, and every node produces user-behavior logs. Each node can run its own Agent to collect its log data, and the Agents then funnel all the data into one Agent that writes to the storage system, such as HDFS. A sketch of this fan-in follows.
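A hedged sketch of the fan-in (agent names, host name and paths are hypothetical): every web node runs the same sender configuration pointing at one collector endpoint, so only the collector side differs from the chain example above:

# ---- identical agent "node" on every web server; all send to one collector ----
node.sources = r1
node.channels = c1
node.sinks = k1

node.sources.r1.type = exec
node.sources.r1.command = tail -F /var/log/nginx/access.log
node.sources.r1.channels = c1

node.channels.c1.type = memory

node.sinks.k1.type = avro
node.sinks.k1.hostname = collector.example.com
node.sinks.k1.port = 44444
node.sinks.k1.channel = c1

# ---- agent "hub" on the collector: one Avro source receives from all nodes ----
hub.sources = r1
hub.channels = c1
hub.sinks = k1

hub.sources.r1.type = avro
hub.sources.r1.bind = 0.0.0.0
hub.sources.r1.port = 44444
hub.sources.r1.channels = c1

hub.channels.c1.type = memory

hub.sinks.k1.type = hdfs
hub.sinks.k1.hdfs.path = hdfs://namenode:9000/flume/weblogs
hub.sinks.k1.channel = c1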
Multi-way (Multiplexing) Agent
This pattern comes in two flavors: one replicates the stream (Replication), the other splits it (Multiplexing). In replicating mode, the data from the front-most source is copied and delivered to multiple channels, with every channel receiving identical data. The configuration format is as follows:
# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating
Here the selector's type is set to replicating, and no other selector options are given; in replicating mode, Source1 writes the data to both Channel1 and Channel2, the two channels hold identical data, and the data is then delivered to Sink1 and Sink2 respectively.
In multiplexing mode, the selector decides which channel the data is delivered to based on the value of a header. The configuration format is as follows:
# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...
<Agent>.sources.<Source1>.selector.default = <Channel2>
Here the selector's type is multiplexing, the header it inspects is configured, and several mapping entries are configured for the header's values: if the header value is Value1 or Value2, the data is routed from Source1 to Channel1; if the header value is Value2 or Value3, the data is routed from Source1 to Channel2 (so Value2 goes to both channels); any other value falls through to the default, Channel2.
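The header that the multiplexing selector inspects has to be set somewhere upstream. One common way, also used in the test in section 6 below, is a static interceptor on the sending agent. A minimal sketch (agent, source and header names are hypothetical, matching the template above):

# tag every Event from this source with someHeader=Value1,
# so the multiplexing selector downstream can route it
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = someHeader
a1.sources.r1.interceptors.i1.value = Value1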
Implementing load balancing
The Load balancing Sink Processor implements load balancing. In this topology, Agent1 acts as a routing node, balancing the Events buffered in its Channel across multiple Sink components, each of which connects to a separate downstream Agent. An example configuration is shown below:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut = 10000
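With selector = round_robin, Events are spread evenly across k1, k2 and k3; backoff = true means a Sink that fails is temporarily blacklisted, with the blacklist period growing exponentially up to selector.maxTimeOut (here 10 seconds) before the Sink is retried.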
Implementing failover
The Failover Sink Processor implements failover. The flow looks similar to load balancing, but the internal mechanism is completely different: the Failover Sink Processor maintains a prioritized list of Sinks, and as long as one Sink is available, Events are guaranteed to be delivered. Sinks that fail to deliver an Event are demoted to a pool, where they are assigned a cooldown (penalty) period that grows with consecutive failures before they are retried. An example configuration is shown below:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.priority.k3 = 6
a1.sinkgroups.g1.processor.maxpenalty = 20000
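With these priorities, k2 (priority 7) receives all Events first; if it fails, k3 (priority 6) takes over, then k1 (priority 5); maxpenalty caps a failed Sink's cooldown at 20 seconds.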
Basic features
Let's look at what Flume NG supports (the latest version at the time of writing is 1.5.0.1); knowing its feature set helps us choose the right approach for our applications. Flume NG's features are best described around the Agent's three components, Source, Channel and Sink, and the technologies or protocols each can speak. We won't go into the detailed configuration of every protocol; the three components are summarized in the lists below.
Source types:
Avro Source: supports the Avro protocol (actually Avro RPC); built-in support
Thrift Source: supports the Thrift protocol; built-in support
Exec Source: produces data from the standard output of a Unix command
JMS Source: reads data from a JMS system (queues, topics); tested against ActiveMQ
Spooling Directory Source: watches a specified directory for new data
Twitter 1% firehose Source: continuously downloads Twitter data through the API; experimental
Netcat Source: listens on a port and takes each line of text flowing through it as an Event
Sequence Generator Source: a sequence-generator source that produces sequence data
Syslog Sources: read syslog data and produce Events; both UDP and TCP are supported
HTTP Source: accepts data via HTTP POST or GET; JSON and BLOB representations are supported
Legacy Sources: compatible with Sources from the old Flume OG (0.9.x)

Channel types:
Memory Channel: Events are stored in memory
JDBC Channel: Events are stored in a persistent store; the Channel currently ships with built-in support for Derby
File Channel: Events are stored in files on disk
Spillable Memory Channel: Events are stored in memory and on disk; when the in-memory queue fills up, Events are persisted to disk files (currently experimental; not recommended for production)
Pseudo Transaction Channel: for testing purposes only
Custom Channel: a custom Channel implementation

Sink types:
HDFS Sink: writes data to HDFS
Logger Sink: writes data to log files
Avro Sink: data is converted into Avro Events and sent to the configured RPC port
Thrift Sink: data is converted into Thrift Events and sent to the configured RPC port
IRC Sink: data is replayed on IRC
File Roll Sink: stores data on the local filesystem
Null Sink: discards all data it receives
HBase Sink: writes data to HBase
Morphline Solr Sink: sends data to a Solr search server (or cluster)
ElasticSearch Sink: sends data to an ElasticSearch search server (or cluster)
Kite Dataset Sink: writes data to a Kite Dataset; experimental
Custom Sink: a custom Sink implementation

There are also Channel Selector, Sink Processor, Event Serializer, Interceptor and other components; see the user guide on the official site.
Installation and configuration are omitted here; online tutorials cover them.
Below are the configuration files used for testing.
The agent configuration file is as follows:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 127.0.0.1
a1.sinks.k1.port = 44444
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/nginx/access.log
a1.sources.r1.channels = c1
a1.sources.r1.deserializer.maxLineLength = 65535
The server-side configuration file is as follows. It tests replication: one source replicated into multiple channels and written out through multiple sinks.
# Name the components on this agent
b1.sources = r1
b1.sinks = k1 k2 k3
b1.channels = c1 c2 c3

b1.sources.r1.selector.type = replicating
b1.sources.r1.type = avro
b1.sources.r1.channels = c1 c2 c3
b1.sources.r1.bind = 0.0.0.0
b1.sources.r1.port = 44444

b1.channels.c1.type = file
b1.channels.c1.write-timeout = 10
b1.channels.c1.keep-alive = 10
b1.channels.c1.checkpointDir = /flume/check
b1.channels.c1.useDualCheckpoints = true
b1.channels.c1.backupCheckpointDir = /flume/backup
b1.channels.c1.dataDirs = /flume

b1.channels.c2.type = memory
b1.channels.c2.capacity = 2000000
b1.channels.c2.transactionCapacity = 10000

b1.channels.c3.type = memory
b1.channels.c3.capacity = 2000000
b1.channels.c3.transactionCapacity = 10000

# Describe the sinks
b1.sinks.k1.type = hdfs
b1.sinks.k1.channel = c1
b1.sinks.k1.hdfs.path = hdfs://localhost:9000/user/hadoop/flume/collected/
b1.sinks.k1.hdfs.filePrefix = chen_test
b1.sinks.k1.hdfs.round = true
b1.sinks.k1.hdfs.roundValue = 10
b1.sinks.k1.hdfs.roundUnit = minute

b1.sinks.k2.channel = c2
b1.sinks.k2.type = file_roll
b1.sinks.k2.batchSize = 100000000
b1.sinks.k2.rollInterval = 1000000
b1.sinks.k2.serializer = TEXT
b1.sinks.k2.sink.directory = /var/log/flume

b1.sinks.k3.channel = c3
b1.sinks.k3.type = logger
Command to start the test:
flume-ng agent -c . -f test.conf -n b1 -Dflume.root.logger=INFO,console
-c is the configuration directory, -f the configuration file, -n the agent name (it must match the name used in the configuration file), and -Dflume.root.logger=INFO,console prints the logs to the terminal.
5. Building a flume-ng cluster with load balancing (load-balance) and failover
Reference link:
http://blog.csdn.net/lskyne/article/details/37662835
6. Test: telling apart logs that Flume merges together
a1 configuration:
[root@host_12 test]# cat a1.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# static interceptor: tag every Event with the header nginx=nginx_1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = nginx
a1.sources.r1.interceptors.i1.value = nginx_1
a1.sources.r1.interceptors.i1.preserveExisting = false

#a1.sources.r1.interceptors = i1
#a1.sources.r1.interceptors.i1.type = host
#a1.sources.r1.interceptors.i1.hostHeader = hostname

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 127.0.0.1
a1.sinks.k1.port = 44444
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.type = exec
# match logs named in the tomcat yyyy:mm:dd:hh style; the shell expands the date
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.command = tail -f /var/log/nginx_1/access_`date +%Y%m%d%H`.log
a1.sources.r1.channels = c1

# search-and-replace interceptor: rewrite matching text inside each Event body
#a1.sources.r1.interceptors = i1
#a1.sources.r1.interceptors.i1.type = search_replace
#a1.sources.r1.interceptors.i1.searchPattern = [0-9]+
#a1.sources.r1.interceptors.i1.replaceString = lxw1234
#a1.sources.r1.interceptors.i1.charset = UTF-8

a2 configuration:
[root@host_12 test]# cat a2.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# static interceptor: tag every Event with the header nginx=nginx_2
a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = static
a2.sources.r1.interceptors.i1.key = nginx
a2.sources.r1.interceptors.i1.value = nginx_2
a2.sources.r1.interceptors.i1.preserveExisting = false

a2.sinks.k1.type = avro
a2.sinks.k1.hostname = 127.0.0.1
a2.sinks.k1.port = 44444
a2.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.sources.r1.type = exec
a2.sources.r1.shell = /bin/bash -c
a2.sources.r1.command = tail -f /var/log/nginx_2/access_`date +%Y%m%d%H`.log
a2.sources.r1.channels = c1

server configuration:
[root@host_12 test]# cat h1.conf
# Name the components on this agent
serv_1.sources = r1
serv_1.sinks = k2 k3
serv_1.channels = c2 c3

# route on the "nginx" header set by the senders' static interceptors
#serv_1.sources.r1.selector.type = replicating
serv_1.sources.r1.selector.type = multiplexing
serv_1.sources.r1.selector.header = nginx
serv_1.sources.r1.selector.mapping.nginx_1 = c2
serv_1.sources.r1.selector.mapping.nginx_2 = c3

serv_1.sources.r1.type = avro
serv_1.sources.r1.channels = c2 c3
serv_1.sources.r1.bind = 0.0.0.0
serv_1.sources.r1.port = 44444

serv_1.channels.c2.type = memory
serv_1.channels.c2.capacity = 2000000
serv_1.channels.c2.transactionCapacity = 10000

serv_1.channels.c3.type = memory
serv_1.channels.c3.capacity = 2000000
serv_1.channels.c3.transactionCapacity = 10000

serv_1.sinks.k2.channel = c2
serv_1.sinks.k2.type = file_roll
serv_1.sinks.k2.batchSize = 100000000
serv_1.sinks.k2.rollInterval = 1000000
serv_1.sinks.k2.serializer = TEXT
serv_1.sinks.k2.sink.directory = /var/log/flume/

serv_1.sinks.k3.channel = c3
serv_1.sinks.k3.type = logger
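To run this test, start the server agent first so its Avro source is listening, then start the two senders; the commands follow the same pattern as the launch command shown earlier, assuming the config files above sit in the current directory:

flume-ng agent -c . -f h1.conf -n serv_1 -Dflume.root.logger=INFO,console
flume-ng agent -c . -f a1.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c . -f a2.conf -n a2 -Dflume.root.logger=INFO,console

Given this configuration, Events tagged nginx=nginx_1 should land in the file_roll output under /var/log/flume/, while Events tagged nginx=nginx_2 appear in the server agent's console log, confirming that the multiplexing selector has separated the two merged log streams.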