Flume Log Collection: Logger and HDFS Data Transfer Methods

    Flume is a distributed, reliable, and highly available system for collecting, aggregating, and transporting massive amounts of log data. It supports custom data senders in the logging pipeline for collecting data, and it can perform simple processing on the data and write it to a variety of data receivers (such as plain text, HDFS, HBase, and so on). Before building the environment and using it, please read up on Flume on your own, especially its core components: Source, Channel, and Sink. Below is the environment setup for a few common ways of using it.

    Main environment: virtual machines slave01, slave02, and slave03, building on the previously prepared environment including JDK, Zookeeper, and Hadoop; see the earlier blog posts for details.

    Download: http://flume.apache.org/download.html

    1. Data transfer with a netcat source + memory channel + logger sink

    (1) Configure the FLUME_HOME environment variable

    Move the extracted files to a directory such as /usr/local/, then add the environment variables with vim /etc/profile:

JAVA_HOME=/usr/java/jdk1.8.0_161
JRE_HOME=/usr/java/jdk1.8.0_161/jre
SCALA_HOME=/usr/local/scala
HADOOP_HOME=/usr/local/hadoop
SPARK_HOME=/usr/local/spark
ZOOKEEPER_HOME=/usr/local/zookeeper
HBASE_HOME=/usr/local/hbase
KAFKA_HOME=/usr/local/kafka
HIVE_HOME=/usr/local/hive
DERBY_HOME=/usr/local/derby
FLUME_HOME=/usr/local/flume
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$SCALA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$SPARK_HOME/bin:$SPARK_HOME/sbin:$ZOOKEEPER_HOME/bin:$HBASE_HOME/bin:$KAFKA_HOME/bin:$HIVE_HOME/bin:$DERBY_HOME/bin:$FLUME_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib:$DERBY_HOME/lib/derby.jar:$DERBY_HOME/lib/derbyclient.jar:$DERBY_HOME/lib/derbytools.jar:$DERBY_HOME/lib/derbynet.jar
export JAVA_HOME JRE_HOME SCALA_HOME HADOOP_HOME SPARK_HOME ZOOKEEPER_HOME HBASE_HOME KAFKA_HOME HIVE_HOME DERBY_HOME FLUME_HOME PATH CLASSPATH

    Run source /etc/profile to make the variables take effect.
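
    For example, you can confirm in the current shell that the variable is visible (the output below assumes the paths listed above):

[hadoop@slave01 ~]$ source /etc/profile
[hadoop@slave01 ~]$ echo $FLUME_HOME
/usr/local/flume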

    To verify the configuration, cd into the flume/bin directory and run: flume-ng version

[hadoop@slave01 bin]$ flume-ng version
Error: Could not find or load main class org.apache.flume.tools.GetJavaProperty
Flume 1.8.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 99f591994468633fc6f8701c5fc53e0214b6da4f
Compiled by denes on Fri Sep 15 14:58:00 CEST 2017
From source with checksum fbb44c8c8fb63a49be0a59e27316833d

    The version check succeeded, but an error message appeared. Most reports attribute it either to the JDK version (change it to below 1.8) or to the HBase configuration (comment out export HBASE_CLASSPATH in hbase/conf/hbase-env.sh). If it does not affect anything, you can safely ignore it.
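
    If you do want to try the HBase workaround mentioned above, it amounts to commenting out the HBASE_CLASSPATH line in hbase-env.sh and rerunning the version check; a rough sketch (assuming HBase is installed under /usr/local/hbase):

[hadoop@slave01 bin]$ vim /usr/local/hbase/conf/hbase-env.sh
# comment out the existing line, keeping its original value:
# export HBASE_CLASSPATH=...
[hadoop@slave01 bin]$ flume-ng version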

    (2) Modify the configuration files in the conf/ directory

    Copy flume-env.sh.template and rename it flume-env.sh:

[hadoop@slave01 conf]$ cp flume-env.sh.template flume-env.sh

    Add the JAVA_HOME setting as follows:

export JAVA_HOME=/usr/java/jdk1.8.0_161

    Copy flume-conf.properties.template and rename it flume-conf.properties:

[hadoop@slave01 conf]$ cp flume-conf.properties.template flume-conf.properties

    Change it to a minimal basic configuration, as follows:

a1.sources = so1
a1.channels = c1
a1.sinks = s1

# For each one of the sources, the type is defined
a1.sources.so1.type = netcat
a1.sources.so1.bind = slave01
a1.sources.so1.port = 8888

# The channel can be defined as follows.
a1.sources.so1.channels = c1

# Each sink's type must be defined
a1.sinks.s1.type = logger

#Specify the channel the sink should use
a1.sinks.s1.channel = c1

# Each channel's type is defined.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 100

    Where:

    a1 is the name of the agent, used by --name a1 in the startup command below;

    a1.sinks.s1.type = logger means the collected log events are printed directly by Flume's logger; feel free to try the other sink types;

    a1.sources.so1.type = netcat means the source component uses netcat;

    a1.sources.so1.bind = slave01 is the hostname or IP address the logs are sent to; the netcat source listens on this host;

    a1.sources.so1.port = 8888 is the port the logs are sent to; the netcat source listens on this port;

    Look up the other configuration parameters on your own.

    (3) Start and test

    Run the following in the bin directory to start the Flume agent a1, where flume-conf.properties is the configuration file to load (note the directory it is in):

flume-ng agent --conf conf --conf-file ../conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,console

    Look up the command-line options yourself; the output looks like this:

Info: Including Hadoop libraries found via (/usr/local/hadoop/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/usr/local/hbase/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/usr/local/hive) for Hive access
.....
.....
.....
(middle portion omitted)
.....
.....
.....
18/06/22 16:20:20 INFO node.Application: Starting new configuration:{ sourceRunners:{so1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:so1,state:IDLE} }} sinkRunners:{s1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@7060583 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
18/06/22 16:20:20 INFO node.Application: Starting Channel c1
18/06/22 16:20:20 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
18/06/22 16:20:21 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/06/22 16:20:21 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/06/22 16:20:21 INFO node.Application: Starting Sink s1
18/06/22 16:20:21 INFO node.Application: Starting Source so1
18/06/22 16:20:21 INFO source.NetcatSource: Source starting
18/06/22 16:20:21 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:8888]

    At this point Flume has started successfully. Next, the test:

    Install telnet if it is not already installed. Check whether it is installed with rpm -qa | grep telnet; no output means it is not installed. Install it with yum install telnet, then run the check again to confirm. You can refer to: https://blog.csdn.net/typa01_kk/article/details/46604967
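
    Putting those commands together, the check-and-install sequence looks roughly like this (run the install as root or via sudo):

[hadoop@slave01 ~]$ rpm -qa | grep telnet      # no output means telnet is not installed
[hadoop@slave01 ~]$ yum install telnet         # install it (as root / with sudo)
[hadoop@slave01 ~]$ rpm -qa | grep telnet      # check again to confirm the install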

    Open another terminal and run the following command to connect to the port, then type any string, such as "hello":

[hadoop@slave01 conf]$ telnet slave01 8888
Trying 127.0.0.1...
Connected to slave01.
Escape character is '^]'.
hello
OK

    In the first terminal you can see output like this:

18/06/22 16:20:36 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F 0D                               hello. }

    That completes the netcat source + memory channel + logger sink data transfer. Of course, this is single-machine Flume data transfer; data can also be transferred across multiple nodes, which you can try for yourselves.
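
    As a rough sketch of what a multi-node setup could look like (the agent names, file names, and avro port below are hypothetical, not from this article): an agent on slave02 forwards events through an avro sink to an avro source on slave01, which logs them.

# on slave02 (hypothetical file: conf/avro-forward.properties)
a2.sources = r2
a2.channels = c2
a2.sinks = k2
a2.sources.r2.type = netcat
a2.sources.r2.bind = slave02
a2.sources.r2.port = 8888
a2.sources.r2.channels = c2
a2.channels.c2.type = memory
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = slave01
a2.sinks.k2.port = 4141
a2.sinks.k2.channel = c2

# on slave01 (hypothetical file: conf/avro-collect.properties)
a3.sources = r3
a3.channels = c3
a3.sinks = k3
a3.sources.r3.type = avro
a3.sources.r3.bind = slave01
a3.sources.r3.port = 4141
a3.sources.r3.channels = c3
a3.channels.c3.type = memory
a3.sinks.k3.type = logger
a3.sinks.k3.channel = c3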

    2. Data transfer with a netcat source + file channel + HDFS sink

    (1) The environment variables stay the same as above

    (2) Modify the configuration files in the conf/ directory

      As above, only modify the configuration file flume-conf.properties, as follows:

a.sources = r1
a.sinks = k1
a.channels = c1

# Describe/configure the source
a.sources.r1.type = netcat
a.sources.r1.bind = slave01
a.sources.r1.port = 8889

# Describe the sink
a.sinks.k1.type = hdfs
# Output directory under the HDFS path
a.sinks.k1.hdfs.path = hdfs://slave01:9000/output
a.sinks.k1.hdfs.writeFormat = Text
a.sinks.k1.hdfs.fileType = DataStream
a.sinks.k1.hdfs.rollInterval = 10
a.sinks.k1.hdfs.rollSize = 0
a.sinks.k1.hdfs.rollCount = 0
a.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
a.sinks.k1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in file
a.channels.c1.type = file
a.channels.c1.checkpointDir = /usr/local/flume/checkpoint
a.channels.c1.dataDirs = /usr/local/flume/data

# Bind the source and sink to the channel
a.sources.r1.channels = c1
a.sinks.k1.channel = c1

      Where:

    a.sinks.k1.type = hdfs means the log data Flume collects is written to (and viewed in) HDFS; the two directories checkpoint and data can be created manually (a sketch follows below);

    The output directory under the HDFS path is required; it is the directory you will access after starting Hadoop.
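
    One way to prepare these directories (the local paths match the channel settings above; the HDFS directory can only be created once the cluster from step (3) below is running):

[hadoop@slave01 ~]$ mkdir -p /usr/local/flume/checkpoint /usr/local/flume/data
[hadoop@slave01 ~]$ hdfs dfs -mkdir -p /output     # requires HDFS to be up
[hadoop@slave01 ~]$ hdfs dfs -ls /                 # confirm /output exists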

    (3) Start and test

    Before this, start the Hadoop cluster. After a successful start the DataNode process must appear; make sure it does, otherwise errors will follow later. You can refer to the earlier blog post on the Hadoop cluster:

    Cluster series (2): Building the perfect Hadoop cluster

    Run the command to start the Flume agent a:

flume-ng agent --conf conf --conf-file ../conf/flume-conf.properties --name a -Dflume.root.logger=INFO,console

    Then connect to the port again with telnet and enter a string:

[hadoop@slave01 sbin]$ telnet slave01 8889
Trying 127.0.0.1...
Connected to slave01.
Escape character is '^]'.
qwer1234
OK

    In the agent console you just started, you can see output like this:

18/06/25 10:17:51 INFO hdfs.HDFSEventSink: Writer callback called.
18/06/25 10:17:51 INFO hdfs.BucketWriter: Closing hdfs://slave01:9000/output/2018-06-25-10-17-41.1529893061596.tmp
18/06/25 10:17:51 INFO hdfs.BucketWriter: Renaming hdfs://slave01:9000/output/2018-06-25-10-17-41.1529893061596.tmp to hdfs://slave01:9000/output/2018-06-25-10-17-41.1529893061596
18/06/25 10:17:51 INFO hdfs.HDFSEventSink: Writer callback called.
18/06/25 10:17:52 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
18/06/25 10:17:52 INFO hdfs.BucketWriter: Creating hdfs://slave01:9000/output/2018-06-25-10-17-52.1529893072686.tmp
18/06/25 10:18:02 INFO hdfs.BucketWriter: Closing hdfs://slave01:9000/output/2018-06-25-10-17-52.1529893072686.tmp
18/06/25 10:18:02 INFO hdfs.BucketWriter: Renaming hdfs://slave01:9000/output/2018-06-25-10-17-52.1529893072686.tmp to hdfs://slave01:9000/output/2018-06-25-10-17-52.1529893072686
18/06/25 10:18:02 INFO hdfs.HDFSEventSink: Writer callback called.
18/06/25 10:18:08 INFO file.EventQueueBackingStoreFile: Start checkpoint for /usr/local/flume/checkpoint/checkpoint, elements to sync = 3
18/06/25 10:18:08 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1529893058855, queueSize: 0, queueHead: 0
18/06/25 10:18:08 INFO file.Log: Updated checkpoint for file: /usr/local/flume/data/log-8 position: 325 logWriteOrderID: 1529893058855

    Now open slave01:50070 in a browser, open the file browser menu, and enter the storage path:

    In the HDFS output directory you can see a new file named with a timestamp, containing your test data (qwer1234). Click the file name and download it to your local machine:

    View the file content locally; you can see that the file holds exactly the data you entered:

[hadoop@slave01 下載]$ cat 2018-06-25-10-17-52.1529893072686 
qwer1234
[hadoop@slave01 下載]$
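
    Alternatively, the file can be read directly from HDFS without downloading it; with the file name from the log output above, the command would be roughly:

[hadoop@slave01 ~]$ hdfs dfs -cat /output/2018-06-25-10-17-52.1529893072686
qwer1234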

    These are two fairly common data transfer methods: printing to the console and writing to HDFS. Both are convenient for log collection; HDFS in particular is very widely used on Hadoop clusters.
