其餘更多java基礎文章:
java基礎學習(目錄)html
Flume是一個分佈式、可靠、和高可用的海量日誌採集、聚合和傳輸的系統。
支持在日誌系統中定製各種數據發送方,用於收集數據;
同時,Flume提供對數據進行簡單處理,並寫到各類數據接受方(好比文本、HDFS、Hbase等)的能力。名詞介紹:
Flume OG:Flume original generation,即Flume0.9x版本
Flume NG:Flume next generation,即Flume1.x版本
官網:flume.apache.orgFlume體系結構java
目前,flume-ng處理數據有兩種方式:avro-client、agent
avro-client:一次性將數據傳輸到指定的avro服務的客戶端
agent:一個持續傳輸數據的服務數據庫
Agent主要的組件包括:Source、Channel、Sink
Source:完成對日誌數據的手機,分紅transtion和event打入到channel之中 Channel:主要提供一個隊列的功能,對source提供的數據進行簡單的緩存。
Sink:取出Channel中的數據,進行相應的存儲文件系統,數據庫或是提交到遠程服務器。
數據在組件傳輸的單位是Event。apache
source意爲來源、源頭。vim
從外界採集各類類型的數據,將數據傳遞給Channel。
好比:監控某個文件只要增長數據就當即採集新增的數據、監控某個目錄一旦有新文件產生就採集新文件的內容、監控某個端口等等。緩存
Exec Source、Avro Source、NetCat Source、Spooling Directory Source等安全
詳細查看:
flume.apache.org/FlumeUserGu… 或者自帶的文檔查看。bash
一個數據的存儲池,中間通道。服務器
接受source傳出的數據,向sink指定的目的地傳輸。Channel中的數據直到進入到下一個channel中或者進入終端纔會被刪除。當sink寫入失敗後,能夠自動重寫,不會形成數據丟失,所以很可靠。網絡
channel的類型不少好比:內存中、jdbc數據源中、文件形式存儲等。
Memory Channel
File Channel
Spillable Memory Channel等
詳細查看: flume.apache.org/FlumeUserGu…
主要做用:接受channel寫入的數據以指定的形式表現出來(或存儲或展現)。
sink的表現形式不少好比:打印到控制檯、hdfs上、avro服務中、文件中等。
HDFS Sink、Hive Sink、Logger Sink、Avro Sink、Thrift Sink、File Roll Sink、HBaseSink、Kafka Sink等
詳細查看:
flume.apache.org/FlumeUserGu…
HDFSSink須要有hdfs的配置文件和類庫。通常採起多個sink匯聚到一臺採集機器負責推送到hdfs。
event是Flume NG傳輸的數據的基本單位,也是事務的基本單位。
在文本文件,一般是一行記錄就是一個event。
網絡消息傳輸系統中,一條消息就是一個event。
event裏有header、body
Event裏面的header類型:Map<String, String>
咱們能夠在source中自定義header的key:value,在某些channel和sink中使用header。
解壓縮:[uplooking@uplooking01 ~]$ tar -zxvf soft/apache-flume-1.8.0-bin.tar.gz -C app/
重命名:[uplooking@uplooking01 ~]$ mv app/apache-flume-1.8.0-bin/ app/flume
添加到環境變量中
vim ~/.bash_profile
export FLUME_HOME=/home/uplooking/app/flume
export PATH=$PATH:$FLUME_HOME/bin
修改配置文件,conf目錄下
cp flume-env.sh.template flume-env.sh
添加JAVA_HOME
export JAVA_HOME=/opt/jdk
複製代碼
#####################################################################
## this's flume log purpose is listenning a socket port which product
## data of stream
## this agent is consists of source which is r1 , sinks which is k1,
## channel which is c1
##
## 這裏面的a1 是flume一個實例agent的名字
######################################################################定義了當前agent的名字叫作a1
a1.sources = r1 ##定了該agent中的sources組件叫作r1
a1.sinks = k1 ##定了該agent中的sinks組件叫作k1
a1.channels = c1 ##定了該agent中的channels組件叫作c1
# 監聽數據源的方式,這裏採用監聽網絡端口
a1.sources.r1.type = netcat #source的類型爲網絡字節流
a1.sources.r1.bind = uplooking01 #source監聽的網絡的hostname
a1.sources.r1.port = 52019 #source監聽的網絡的port
# 採集的數據的下沉(落地)方式 經過日誌
a1.sinks.k1.type = logger #sink的類型爲logger日誌方式,log4j的級別有INFO、Console、file。。。
# 描述channel的部分,使用內存作數據的臨時存儲
a1.channels.c1.type = memory #channel的類型使用內存進行數據緩存,這是最多見的一種channel
a1.channels.c1.capacity = 1000 #定義了channel對的容量
a1.channels.c1.transactionCapacity = 100 #定義channel的最大的事務容量
# 使用channel將source和sink鏈接起來# 須要將source和sink使用channel鏈接起來,組成一個相似流水管道
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
複製代碼
flume-ng agent -c conf -n a1 -f conf/flume-nc.conf -Dflume.root.logger=INFO,console
-c conf:使用配置文件的方式-n a1:指定agent的名稱爲a1-f:指定配置文件
由於數據落地是經過日誌,因此後面須要指定日誌的相關配置選項。
複製代碼
yum isntall -y telent
yum install -y nc
複製代碼
向端口發送數據:
# 使用telnet
[uplooking@uplooking01 ~]$ telnet uplooking01 52019
Trying 192.168.43.101...
Connected to uplooking01.
Escape character is '^]'.
wo ai ni
OK
sai bei de xue
OK
# 使用nc
[uplooking@uplooking01 ~]$ nc uplooking01 52019
heihei
OK
xpleaf
OK
複製代碼
此時能夠查看flume agent啓動終端的輸出:
2018-03-24 20:09:34,390 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/192.168.43.101:52019]
2018-03-24 20:10:13,022 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 77 6F 20 61 69 20 6E 69 0D wo ai ni. }
2018-03-24 20:10:24,139 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 73 61 69 20 62 65 69 20 64 65 20 78 75 65 0D sai bei de xue. }
2018-03-24 20:13:26,190 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 69 68 65 69 heihei }
2018-03-24 20:13:26,463 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 78 70 6C 65 61 66 xpleaf }
2018-03-24 20:17:01,694 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F hello }
複製代碼
配置文件以下:
####################################################################### 監聽目錄中的新增文件## this agent is consists of source which is r1 , sinks which is k1,## channel which is c1## ## 這裏面的a1 是flume一個實例agent的名字#####################################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 監聽數據源的方式,這裏採用監聽目錄中的新增文件
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/uplooking/data/flume
a1.sources.r1.fileSuffix = .ok
# a1.sources.r1.deletePolicy = immediate
a1.sources.r1.deletePolicy = never
a1.sources.r1.fileHeader = true
# 採集的數據的下沉(落地)方式 經過日誌
a1.sinks.k1.type = logger
# 描述channel的部分,使用內存作數據的臨時存儲
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 使用channel將source和sink鏈接起來
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
複製代碼
啓動flume agent:
flume-ng agent -c conf -n a1 -f conf/flume-dir.conf -Dflume.root.logger=INFO,console
複製代碼
在監聽目錄下新增文件,內容以下:
hello you
hello he
hello me
複製代碼
能夠看到flume agent終端輸出:
2018-03-24 21:23:59,182 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{file=/home/uplooking/data/flume/hello.txt} body: 68 65 6C 6C 6F 20 79 6F 75 hello you }
2018-03-24 21:23:59,182 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{file=/home/uplooking/data/flume/hello.txt} body: 68 65 6C 6C 6F 20 68 65 hello he }
2018-03-24 21:23:59,182 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{file=/home/uplooking/data/flume/hello.txt} body: 68 65 6C 6C 6F 20 6D 65 hello me }
2018-03-24 21:23:59,184 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:324)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2018-03-24 21:23:59,184 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:433)] Preparing to move file /home/uplooking/data/flume/hello.txt to /home/uplooking/data/flume/hello.txt.ok
複製代碼
能夠看到提示說,原來的文本文件已經被重命名爲.ok,查看數據目錄中的文件:
[uplooking@uplooking01 flume]$ ls
hello.txt.ok
複製代碼
tail -f與tail -F的說明:
在生產環境中,爲了防止日誌文件過大,一般會天天生成一個新的日誌文件, 這是經過重命名原來的日誌文件,而後touch一個原來的日誌文件的方式來實現的。
http-xxx.log
http-xxx.log.2017-03-15
http-xxx.log.2017-03-16
-f不會監聽分割以後的文件,而-F則會繼續監聽。
配置文件:
####################################################################### 監聽文件中的新增數據## ## this agent is consists of source which is r1 , sinks which is k1,## channel which is c1## ## 這裏面的a1 是flume一個實例agent的名字#####################################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 監聽數據源的方式,這裏監聽文件中的新增數據
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/uplooking/data/flume/http-flume.log
# 採集的數據的下沉(落地)方式 經過日誌
a1.sinks.k1.type = logger
# 描述channel的部分,使用內存作數據的臨時存儲
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000000
a1.channels.c1.transactionCapacity = 1000000
# 使用channel將source和sink鏈接起來
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
複製代碼
啓動flume agent:
flume-ng agent -c conf -n a1 -f conf/flume-data.conf -Dflume.root.logger=INFO,console
複製代碼
向監聽文件中添加數據:
cat hello.txt.ok > http-flume.log
複製代碼
查看flume agent終端的輸出:
2018-03-25 01:28:39,359 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 79 6F 75 hello you }
2018-03-25 01:28:40,465 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 68 65 hello he }
2018-03-25 01:28:40,465 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 6D 65 hello me }
複製代碼