Massive Log Collection with Flume (HA)


1. Introduction:

    Flume is a highly available, highly reliable, distributed system for collecting, aggregating, and transporting massive amounts of log data, originally provided by Cloudera. Flume lets you plug custom data senders into a logging system to collect data, and it can also do simple processing on that data and write it to a variety of (customizable) data receivers.


2. Log Collection

    Flume monitors a given IP address and port, receives the data arriving there, and buffers it in memory or in a cache on the local disk before delivering it.

3. Data Processing

  Flume can do simple processing on the data and write it to a variety of (customizable) data receivers. It can collect data from sources such as Console, RPC (Thrift-RPC), Text (files), Tail (UNIX tail), Syslog (the syslog logging system, supporting both TCP and UDP), and exec (command execution).


4. How Flume Works:

        Flume OG:

            

    

            Logically, Flume OG has a three-tier architecture: Agent, Collector, and Storage. It uses multiple Masters and relies on ZooKeeper to keep the configuration data consistent and highly available.

            Characteristics:

              ·  Three roles: agent nodes (agent), collector nodes (collector), and master nodes (master).

            ·   An agent collects log data from the various data sources and forwards it to a Collector; the collector nodes then aggregate the data and write it into HDFS. The master manages the activity of the agents and collectors.

            ·   Both agent and collector consist of a source and a sink, meaning that on the current node data flows from the source to the sink.

        Flume NG

            
            NG has only one kind of node: the agent node. A Flume NG agent consists of a source, a sink, and a channel, related as follows:

· Source: collects the log data, packages it into transactions and events, and pushes them into the Channel.

Source type: description

Avro Source: Supports the Avro protocol (really Avro RPC). It exposes an Avro endpoint; any Avro message sent to the configured address and port is received by the source. For example, a Log4j Appender can ship messages to the Agent through an Avro Source.

Thrift Source: Supports the Thrift protocol; exposes a Thrift endpoint, similar to the Avro Source.

Exec Source: When the source starts, it runs a configured UNIX command (for example cat file) that keeps writing to standard output (stdout); that output is packaged into Events and processed.

JMS Source: Reads data from a JMS system (messages, topics), such as ActiveMQ.

Spooling Directory Source: Watches a directory; when new files appear there, their contents are packaged into Events and processed.

Netcat Source: Listens on a port and turns every line of text that flows through it into an Event.

Sequence Generator Source: A sequence generator source; produces sequence data.

Syslog Sources: Read syslog data and produce Events; both UDP and TCP are supported.

HTTP Source: Accepts data via HTTP POST and GET; JSON and BLOB representations are supported.

Legacy Sources: Compatible with Sources from the old Flume OG (version 0.9.x).

Custom Source: Users implement the interfaces Flume provides to build a Source that meets their own needs.
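For example, an application that logs through Log4j can ship its events to an Avro Source using the Flume Log4j appender. A minimal log4j.properties sketch, assuming the agent's Avro Source listens on ry-hadoop1:4141 (the hostname and port are assumptions chosen to match the avro.conf example further below):

log4j.rootLogger = INFO, flume
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = ry-hadoop1
log4j.appender.flume.Port = 4141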


· Channel: essentially a queue that provides simple buffering for the data delivered by the source.

Channel type: description

Memory Channel: Events are stored in memory.

JDBC Channel: Events are stored in a persistent store; the built-in JDBC Channel currently supports Derby.

File Channel: Events are stored in files on disk.

Spillable Memory Channel: Events are stored in memory and on disk; when the in-memory queue is full, events spill over to disk files (currently experimental; not recommended for production use).

Pseudo Transaction Channel: For testing only.

Custom Channel: A user-defined Channel implementation.

· Sink: takes data out of the Channel and writes it to the appropriate destination: a file system, a database, or a remote server.

Sink type: description

HDFS Sink: Writes data to HDFS.

Logger Sink: Writes data to the log file.

Avro Sink: Data is converted into Avro Events and sent to the configured RPC port.

Thrift Sink: Data is converted into Thrift Events and sent to the configured RPC port.

IRC Sink: Data is replayed on IRC.

File Roll Sink: Stores data on the local file system.

Null Sink: Discards all data it receives.

HBase Sink: Writes data to the HBase database.

Morphline Solr Sink: Sends data to a Solr search server (or cluster).

ElasticSearch Sink: Sends data to an Elasticsearch search server (or cluster).

Kite Dataset Sink: Writes data to a Kite Dataset (experimental).

Custom Sink: A user-defined Sink implementation.


A sink and a channel each handle only one type of log data, but an agent can have multiple sinks and channels; a source, on the other hand, can accept multiple types of log data, as shown below:



Installing and using Flume:

    

    Installation





Run configuration:



a1.sources = r1
a1.sinks = k1
a1.channels = c1


# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141


# Describe the sink
a1.sinks.k1.type = logger


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Run:

Run the following from the /home/bigdata/flume1.6 directory:

 flume-ng agent -n a1 -c . -f ./conf/avro.conf -Dflume.root.logger=INFO,console


Now that Flume runs, let's try collecting logs from different sources and storing them in HDFS.

source:  avro

flume-ng avro-client -c /home/bigdata/flume1.6/ -H ry-hadoop1 -p 4141 -F ./avro.txt
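The file passed with -F has to exist before running avro-client; a quick way to create a small test file (the contents here are just an example):

echo "hello flume avro" > ./avro.txt
echo "a second test line" >> ./avro.txt

Each line of avro.txt is delivered as one event and shows up in the agent's logger output.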



source:  Exec

b1.sources=r1
b1.channels=c1
b1.sinks=k1

b1.sources.r1.type=exec
b1.sources.r1.command=tail -F /home/data/avro.txt

b1.channels.c1.type=memory
b1.channels.c1.capacity=1000
b1.channels.c1.transactionCapacity=100

b1.sinks.k1.type=logger

b1.sources.r1.channels=c1
b1.sinks.k1.channel=c1
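Assuming this configuration is saved as conf/exec.conf (the file name is an assumption), it is started the same way as the avro example, and appending to the tailed file should produce events in the console:

flume-ng agent -n b1 -c . -f ./conf/exec.conf -Dflume.root.logger=INFO,console
echo "exec source test" >> /home/data/avro.txt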



source:   spooldir (it only collects files from a single directory level; subdirectories are not scanned)

Create a directory named log under /home/data on the local Linux machine:

agent.sources=r1
agent.channels=c1
agent.sinks=k1

agent.sources.r1.type=spooldir
agent.sources.r1.spoolDir=/home/data/log
agent.sources.r1.fileHeader=true

agent.channels.c1.type=memory
agent.channels.c1.capacity=1000
agent.channels.c1.transactionCapacity=100

agent.sinks.k1.type=logger

agent.sources.r1.channels=c1
agent.sinks.k1.channel=c1
Start:
flume-ng agent -n agent -c /home/bigdata/flume1.6/ -f /home/bigdata/flume1.6/conf/spoolDir.conf -Dflume.root.logger=INFO,console
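To test it, drop a new file into the spooled directory; after Flume ingests the file it is renamed with a .COMPLETED suffix (the default behaviour of the spooling directory source):

echo "spooldir test line" > /home/data/log/test1.log
ls /home/data/log    # test1.log becomes test1.log.COMPLETED once it has been consumed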



source: TCP

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = 0.0.0.0

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
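With this agent running, a quick way to test the syslog TCP source is to push a line at the port with netcat; it should appear as an event in the logger sink:

echo "hello syslog tcp" | nc localhost 5140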

    




source: JSONHandler

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 8888

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
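The default handler of the HTTP source (JSONHandler) expects a JSON array of events, each with optional headers and a body; a test with curl against the port configured above might look like this:

curl -X POST -d '[{"headers":{"host":"test"},"body":"hello http source"}]' http://localhost:8888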


That covers five kinds of sources.

Next, storage.

hdfsSink.conf

Configuration:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = 0.0.0.0

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ry-hadoop1:8020/flume
a1.sinks.k1.hdfs.filePrefix = Syslog
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.k1.hdfs.rollSize=10240
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=60

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Run:

flume-ng agent -n a1 -c . -f ./conf/hdfsSink.conf -Dflume.root.logger=INFO,console


Write a shell script that loops and keeps sending data over TCP, so that it gets collected into HDFS:

#!/bin/bash
# Push 500,000 numbered messages into the syslogtcp source on port 5140.
# bash is needed here: (( )) and let are bashisms.
int=1
while (( $int <= 500000 ))
do
	# send one line to the agent, then echo it locally for progress
	echo "this is message"$int | nc ry-hadoop1 5140
	echo "this is message"$int
	let "int++"
done
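While the script runs, the HDFS sink rolls a new file roughly every 10 KB (rollSize=10240) under /flume; the results can be checked with the usual HDFS commands (paths follow the config above):

hdfs dfs -ls /flume
hdfs dfs -cat /flume/Syslog*.log | head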


You can also set the specific time granularity used when the logs are collected into HDFS.
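A common way to do this is to put time escape sequences in hdfs.path and let the round settings decide the bucket granularity; a sketch of the relevant sink properties (the values are examples, and useLocalTimeStamp makes the sink use local time when events carry no timestamp header):

a1.sinks.k1.hdfs.path = hdfs://ry-hadoop1:8020/flume/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute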








That raises a question: while Hadoop is down for maintenance and cannot store data, where do our log files go?

Locally. Let's look at how to store them on the local filesystem.

Storing events as text files on the local filesystem (file_roll sink):

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = 0.0.0.0

# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/data/log/
a1.sinks.k1.sink.serializer=TEXT

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
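By default the file_roll sink starts a new output file every 30 seconds; this can be tuned (or disabled with 0) through sink.rollInterval, for example:

a1.sinks.k1.sink.rollInterval = 60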



Using the file type for the channel (File Channel):

a1.sources = s1
a1.channels = c1
a1.sinks = k1

# For each one of the sources, the type is defined
a1.sources.s1.type = syslogtcp
a1.sources.s1.host = localhost
a1.sources.s1.port = 5180

# Each sink's type must be defined
a1.sinks.k1.type = logger

# Each channel's type is defined.
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/data/log/checkpoint
a1.channels.c1.dataDir = /home/data/log/data

#Bind the source and sinks to channels
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1


Flume HA:

        Flume supports fanning a flow out from one source to multiple channels. There are two fan-out modes: replicating and multiplexing. In replicating mode, an event is sent to all configured channels. In multiplexing mode, an event is sent to only a subset of the eligible channels. Fan-out requires rules that map the source onto its fan-out channels. Put plainly, a single agent can save multiple copies of the logs it collects: start Flume on several machines in the cluster and connect them together, and the data received by one machine can be replicated to the others as backups, somewhat like ZooKeeper.

   1) Replicating Channel Selector (multiple channels)
Start the Flume avro listener agent below on the three machines, then on the master start an agent whose source uses the replicating selector and connects to them.

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

On the master, start the agent that connects to them:

a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = slave1
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
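Assuming the avro listener config above is saved as avro_5555.conf on master and slave1, and this replicating config as replicating.conf on the master (both file names are assumptions), one possible start order is:

# on master and slave1: start the avro listeners first
flume-ng agent -n a1 -c . -f ./conf/avro_5555.conf -Dflume.root.logger=INFO,console
# on master: start the replicating agent
flume-ng agent -n a1 -c . -f ./conf/replicating.conf -Dflume.root.logger=INFO,console
# send a test event into the replicating agent's syslogtcp source
echo "replicating test" | nc localhost 5140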

When you write a single piece of data into the log, the other three machines will all react to it.


2) Multiplexing Channel Selector (MulChnSel_a1.conf)

    Incoming events are routed by matching a header value against a configured mapping.


a1.sources = s1
a1.channels = c1 c2
a1.sinks = k1 k2

# For each one of the sources, the type is defined
a1.sources.s1.type = org.apache.flume.source.http.HTTPSource
a1.sources.s1.port = 8887
a1.sources.s1.channels = c1 c2
a1.sources.s1.selector.type = multiplexing

a1.sources.s1.selector.header = company
a1.sources.s1.selector.mapping.ali = c1
a1.sources.s1.selector.mapping.baidu = c2
a1.sources.s1.selector.default = c2

# Each sink's type must be defined
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 5555
a1.sinks.k1.channel = c1

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = slave1
a1.sinks.k2.port = 5555
a1.sinks.k2.channel = c2

# Each channel's type is defined.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
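With the multiplexing selector, the value of the company header decides the channel: ali goes to c1 (and on to master), baidu or anything else goes to c2 (slave1). A test with curl against the port configured above:

curl -X POST -d '[{"headers":{"company":"ali"},"body":"event for ali"}]' http://localhost:8887
curl -X POST -d '[{"headers":{"company":"baidu"},"body":"event for baidu"}]' http://localhost:8887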


3) Flume Sink Processors

With failover, events are always delivered to one of the sinks; only when that sink becomes unavailable are they automatically sent to the next sink.

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
  
# This is the key to configuring failover: there must be a sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# The processor type is failover
a1.sinkgroups.g1.processor.type = failover
# Priority: higher numbers mean higher priority, and every sink must have a different priority
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# Set to 10 seconds here; you can of course make it faster or slower to match your situation
a1.sinkgroups.g1.processor.maxpenalty = 10000
  
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating
  
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = m1
a1.sinks.k1.port = 5555
  
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = m2
a1.sinks.k2.port = 5555
  
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
  
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
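Once the avro listener agents described below are running, this failover configuration can be started on the sending machine; assuming it is saved as Flume_Sink_Processors.conf (the file name is an assumption, chosen to match the avro config below), that would be:

flume-ng agent -c . -f /home/bigdata/flume/conf/Flume_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console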
 
Create the Flume_Sink_Processors_avro.conf configuration file on hadoop1:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
  
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555
  
# Describe the sink
a1.sinks.k1.type = logger
  
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
  
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start:

flume-ng agent -c . -f /home/bigdata/flume/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console

Test:

Then, on either hadoop1 or hadoop2, generate a test log message:

# echo "idoall.org test1 failover" | nc localhost 5140


4) Load balancing Sink Processor

        The load_balance type differs from failover in that load balancing offers two selection strategies: round-robin and random. In either case, if the selected sink is unavailable, the processor automatically tries to send to the next available sink.

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
  
# This is the key to configuring load balancing: there must be a sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
  
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
  
  
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = m1
a1.sinks.k1.port = 5555
  
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = m2
a1.sinks.k2.port = 5555
  
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

Start:

#flume-ng agent -c . -f /home/bigdata/flume/conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console

Test:

If messages are sent too quickly, the logs may all end up on a single machine.

echo "idoall.org test1" | nc localhost 5140



This covers offline collection and storage of massive logs with Flume: different data sources, different storage targets (local and HDFS), load-balanced storage, and settings such as storage time and data size.
