Massive Log Collection with Flume (HA)
1. Introduction:
Flume is a highly available, highly reliable, distributed system from Cloudera for collecting, aggregating, and moving massive volumes of log data. Flume lets you plug custom data senders into a logging pipeline to collect data; it can also perform simple processing on that data and write it to a variety of (customizable) data receivers.
2. Log Collection
Flume monitors a given IP address and port, receives the incoming data, buffers it in memory, and can cache it on the local disk.
3. Data Processing
Flume can perform simple processing on the data and write it to a variety of (customizable) data receivers. It can collect data from sources such as the Console, RPC (Thrift-RPC), Text (files), Tail (UNIX tail), Syslog (the syslog logging system, supporting both TCP and UDP), and exec (command execution).
4. How Flume Works:
Logically, Flume has a three-layer architecture: Agent, Collector, and Storage. It can run multiple Masters and uses ZooKeeper to keep the data highly available and consistent.
Features:
· Distributed
· Three roles: agent nodes, collector nodes, and a master node.
· Source: collects the log data and pushes it into the Channel as transactions and events.
Source type | Description
Avro Source | Supports the Avro protocol (actually Avro RPC) and exposes an Avro endpoint; messages sent to the configured address and port are received by the Source, e.g. a Log4j Appender can send messages to the Agent via the Avro Source.
Thrift Source | Supports the Thrift protocol and exposes a Thrift endpoint; similar to the Avro Source.
Exec Source | On startup the Source runs a configured UNIX command (for example cat file) that keeps writing to standard output (stdout); that output is packaged into Events and processed.
JMS Source | Reads data from a JMS system (messages or topics), such as ActiveMQ.
Spooling Directory Source | Watches a directory; when new files appear, their contents are packaged into Events and processed.
Netcat Source | Listens on a port and turns each line of text passing through it into an Event.
Sequence Generator Source | Sequence generator source; produces sequence data.
Syslog Sources | Read syslog data and produce Events; both UDP and TCP are supported.
HTTP Source | Accepts data via HTTP POST or GET; supports JSON and BLOB representations.
Legacy Sources | Compatible with Sources from the old Flume OG (0.9.x) releases.
Custom Source | Users implement the interfaces Flume provides to build a Source that meets their needs.
· Channel: essentially provides a queue, giving simple buffering for the data delivered by the Source.
Channel type | Description
Memory Channel | Events are stored in memory.
JDBC Channel | Events are stored in a persistent store; currently Flume's built-in support is Derby.
File Channel | Events are stored in files on disk.
Spillable Memory Channel | Events are stored in memory and on disk; when the in-memory queue fills up, events are persisted to disk files (currently experimental, not recommended for production).
Pseudo Transaction Channel | For testing purposes only.
Custom Channel | A user-supplied Channel implementation.
· Sink: takes data out of the Channel and stores it in a filesystem or database, or delivers it to a remote server.
Sink type | Description
HDFS Sink | Writes data to HDFS.
Logger Sink | Writes data to log output.
Avro Sink | Data is converted into Avro Events and sent to the configured RPC port.
Thrift Sink | Data is converted into Thrift Events and sent to the configured RPC port.
IRC Sink | Data is replayed on IRC.
File Roll Sink | Stores data on the local filesystem.
Null Sink | Discards all data it receives.
HBase Sink | Writes data to the HBase database.
Morphline Solr Sink | Sends data to a Solr search server (or cluster).
ElasticSearch Sink | Sends data to an Elasticsearch search server (or cluster).
Kite Dataset Sink | Writes data to a Kite Dataset; experimental.
Custom Sink | A user-supplied Sink implementation.
Flume installation and usage:
Run configuration (avro.conf):
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Run:
From the /home/bigdata/flume1.6 directory, run:
flume-ng agent -n a1 -c . -f ./conf/avro.conf -Dflume.root.logger=INFO,console
source: avro
flume-ng avro-client -c /home/bigdata/flume1.6/ -H ry-hadoop1 -p 4141 -F ./avro.txt
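The avro-client reads ./avro.txt and sends its contents to the avro source above. As a minimal sketch (the file name comes from the command above, the text inside it is just an assumed test payload), you can create it first:

# Hypothetical test payload for the avro-client; any text file will do
echo "hello avro source" > ./avro.txt
echo "second line" >> ./avro.txt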
source: Exec
b1.sources = r1
b1.channels = c1
b1.sinks = k1

b1.sources.r1.type = exec
b1.sources.r1.command = tail -F /home/data/avro.txt

b1.channels.c1.type = memory
b1.channels.c1.capacity = 1000
b1.channels.c1.transactionCapacity = 100

b1.sinks.k1.type = logger

b1.sources.r1.channels = c1
b1.sinks.k1.channel = c1
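To see the exec source in action, append a line to the tailed file while the agent is running with the config above; the new line should appear on the logger sink. A quick sketch (the message text is arbitrary):

# Append a test line; tail -F in the exec source picks it up
echo "hello exec source" >> /home/data/avro.txt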
source: spooldir (it only collects from the top level of a directory, not subdirectories)
Create a directory named log on the local Linux filesystem to hold the data.
agent.sources = r1
agent.channels = c1
agent.sinks = k1

agent.sources.r1.type = spooldir
agent.sources.r1.spoolDir = /home/data/log
agent.sources.r1.fileHeader = true

agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

agent.sinks.k1.type = logger

agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1

Start:
flume-ng agent -n agent -c /home/bigdata/flume1.6/ -f /home/bigdata/flume1.6/conf/spoolDir.conf -Dflume.root.logger=INFO,console
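To test, drop a file into the spooled directory; by default Flume renames a file with a .COMPLETED suffix once it has been ingested. A minimal sketch (the file name is arbitrary):

# Copy any text file into the watched directory
cp /home/data/avro.txt /home/data/log/test1.log
# After ingestion it becomes /home/data/log/test1.log.COMPLETED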
source: TCP
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = 0.0.0.0

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
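With the agent running, you can test this syslogtcp source by piping a line to port 5140, the same pattern used in the later examples (the message text is arbitrary):

echo "hello syslogtcp" | nc localhost 5140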
source:JSONHandler
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 8888

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
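The HTTP source's default JSONHandler expects a JSON array of events, each with optional headers and a body. A sketch of posting one event with curl (host and port come from the config above; the header and body values are just examples):

curl -X POST -H "Content-Type: application/json" \
  -d '[{"headers": {"host": "test"}, "body": "hello http source"}]' \
  http://localhost:8888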
That covers five kinds of source.
Now for storage.
hdfsSink.conf
Configuration:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = 0.0.0.0

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ry-hadoop1:8020/flume
a1.sinks.k1.hdfs.filePrefix = Syslog
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 10240
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.idleTimeout = 60

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Run:
flume-ng agent -n a1 -c . -f ./conf/hdfsSink.conf -Dflume.root.logger=INFO,console
Write a shell script that loops, sending data over TCP so that it is collected into HDFS:
#!/bin/bash
int=1
while (( int <= 500000 ))
do
  # send the message to the syslogtcp source, then echo it locally
  echo "this is message"$int | nc ry-hadoop1 5140
  echo "this is message"$int
  let "int++"
done
You can also set the specific time granularity used for the collected logs.
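For example, the hdfs.path can use time escape sequences together with the round settings so files land in time-bucketed directories; a sketch (the path layout is illustrative, and time escapes need a timestamp header or hdfs.useLocalTimeStamp = true):

# Illustrative: write into per-minute directories, rounded down to 1-minute buckets
a1.sinks.k1.hdfs.path = hdfs://ry-hadoop1:8020/flume/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute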
That raises a question: when Hadoop is down for maintenance and cannot store data, where do our log files go?
Locally. Let's look at how to store them on the local filesystem.
Writing to the local filesystem as text (file_roll sink):
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = 0.0.0.0

# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/data/log/
a1.sinks.k1.sink.serializer = TEXT

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Channel type: file (File Channel):
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# For each one of the sources, the type is defined
a1.sources.s1.type = syslogtcp
a1.sources.s1.host = localhost
a1.sources.s1.port = 5180

# Each sink's type must be defined
a1.sinks.k1.type = logger

# Each channel's type is defined.
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/data/log/checkpoint
a1.channels.c1.dataDir = /home/data/log/data

# Bind the source and sinks to channels
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
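To exercise the file channel agent, send a line to the configured localhost port (5180 here); because events are checkpointed under the directories above, they survive an agent restart. A quick test line (the message text is arbitrary):

echo "file channel test" | nc localhost 5180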
Flume supports fanning out a flow from one source to multiple channels. There are two fan-out modes: replicating and multiplexing. With replicating, each event is sent to all configured channels. With multiplexing, an event is sent to only a subset of the available channels. Fan-out requires specifying the source and the fan-out rules for the channels. In plain terms: while collecting logs, a single agent can keep multiple copies of the data. Start Flume on several machines in the cluster and connect them, and each can receive the data from one of them as a backup, somewhat like ZooKeeper.
1) Replicating Channel Selector (multiple channels)
Start Flume with an avro source on the three machines, then on the master start an agent whose source selector is set to replicating:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the fan-out agent on the master:
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

# Describe the sinks
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = slave1
a1.sinks.k2.port = 5555

# Use channels which buffer events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
When you write a piece of data to the source, the other machines all react: each receives a copy of the event.
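For instance, a single line sent to the fan-out agent's syslogtcp port should show up on the logger sink of every avro receiver (a sketch; the hostname is whichever machine runs the fan-out agent, the master in this setup):

echo "replicating test" | nc master 5140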
2) Multiplexing Channel Selector (MulChnSel_a1.conf)
Events are routed by matching a mapping against a header of the input data.
a1.sources = s1
a1.channels = c1 c2
a1.sinks = k1 k2

# For each one of the sources, the type is defined
a1.sources.s1.type = org.apache.flume.source.http.HTTPSource
a1.sources.s1.port = 8887
a1.sources.s1.channels = c1 c2
a1.sources.s1.selector.type = multiplexing
a1.sources.s1.selector.header = company
a1.sources.s1.selector.mapping.ali = c1
a1.sources.s1.selector.mapping.baidu = c2
a1.sources.s1.selector.default = c2

# Each sink's type must be defined
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 5555
a1.sinks.k1.channel = c1

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = slave1
a1.sinks.k2.port = 5555
a1.sinks.k2.channel = c2

# Each channel's type is defined.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
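To see the routing, post events with different values of the company header: ali should go to c1 (the avro sink on master), while baidu and anything unmatched go to c2 (slave1, also the default route). A sketch using curl against the HTTP source above (header and body values are examples):

# Routed to channel c1 -> avro sink on master
curl -X POST -d '[{"headers": {"company": "ali"}, "body": "to ali channel"}]' http://localhost:8887
# Routed to channel c2 -> avro sink on slave1 (also the default route)
curl -X POST -d '[{"headers": {"company": "baidu"}, "body": "to baidu channel"}]' http://localhost:8887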
3)Flume Sink Processors
With failover, events are always sent to one sink; when that sink becomes unavailable, they are automatically sent to the next sink.
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# The sink group is the key to configuring failover
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# The processor type is failover
a1.sinkgroups.g1.processor.type = failover
# Priorities: a larger number means higher priority; each sink's priority must be unique
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# Set to 10 seconds; adjust faster or slower to suit your situation
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

# Describe the sinks
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = m1
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = m2
a1.sinks.k2.port = 5555

# Use channels which buffer events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

On hadoop1, create the Flume_Sink_Processors_avro.conf configuration file:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start:
flume-ng agent -c . -f /home/bigdata/flume/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
Test:
Then, on either hadoop1 or hadoop2, generate a test log entry:
# echo "idoall.org test1 failover" | nc localhost 5140
4) Load balancing Sink Processor
Unlike failover, the load_balance type offers two selectors: round robin and random. In both cases, if the selected sink is unavailable, the processor automatically tries the next available sink.
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# The sink group is the key to configuring load balancing
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

# Describe the sinks
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = m1
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = m2
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
Start:
#flume-ng agent -c . -f /home/bigdata/flume/conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
Test:
If input arrives too quickly, the resulting log entries may all end up on a single machine.
echo "idoall.org test1" | nc localhost 5140
That covers offline collection and storage of massive logs with Flume: different data sources, different ways of storing the data (locally and in HDFS), load-balanced storage, and settings such as storage time and data size.