Flume Collection Examples

1. Collecting a Directory to HDFS

Collection requirement: a particular directory on a server keeps producing new files; whenever a new file appears, it must be collected into HDFS.

Based on this requirement, first define the following three key elements:

  1. Collection source (source) — monitor a file directory: spooldir
  2. Sink target (sink) — the HDFS file system: hdfs sink
  3. Channel between source and sink — either a file channel or a memory channel can be used

 

Configuration file:

# Define the names of the three components
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Configure the source component (spooldir watches the directory for new files)
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /home/hadoop/logs/
agent1.sources.source1.fileHeader = false

# Configure the interceptor: the host interceptor stamps each event header with the agent's hostname
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname

# Configure the sink component
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
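
The configuration above does not show how the agent is started. A minimal launch sketch, assuming the file is saved as conf/spooldir-hdfs.conf (a hypothetical name); the -n flag must match the agent name used in the file (agent1):

# Start the spooldir agent; -n must match the agent name defined in the configuration
bin/flume-ng agent -c conf -f conf/spooldir-hdfs.conf -n agent1 -Dflume.root.logger=INFO,console

Note that the spooling directory /home/hadoop/logs/ must exist before the agent starts, and files dropped into it must not be modified afterwards, or the spooldir source will fail.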

Explanation of the channel parameters:

capacity: the maximum number of events the channel can store

transactionCapacity: the maximum number of events taken from the source, or delivered to the sink, in a single transaction

keep-alive: the time (in seconds) allowed for adding an event to the channel or removing one from it
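
These values must be mutually consistent: the sink's hdfs.batchSize should not exceed transactionCapacity, and transactionCapacity should not exceed capacity. The settings used above already satisfy this; shown together here for clarity:

# Consistent sizing: batchSize (100) <= transactionCapacity (600) <= capacity (500000)
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
agent1.sinks.sink1.hdfs.batchSize = 100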

 

 

2. Collecting a File to HDFS

 

 

Based on the requirement, first define the following three key elements:

 

  • Collection source (source) — monitor updates to a file's content: exec 'tail -F file'
  • Sink target (sink) — the HDFS file system: hdfs sink
  • Channel between source and sink — either a file channel or a memory channel can be used

 

Configuration file:

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure tail -F source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /home/hadoop/logs/access_log
agent1.sources.source1.channels = channel1

#configure host for source
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname

# Describe sink1
agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
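
As with the first case, this agent is named agent1, so the -n flag has to match when it is started. A minimal launch sketch, assuming the configuration is saved as conf/exec-hdfs.conf (a hypothetical file name):

# Start the exec/tail agent; -n must match the agent name in the configuration (agent1)
bin/flume-ng agent -c conf -f conf/exec-hdfs.conf -n agent1 -Dflume.root.logger=INFO,console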

 

tail-hdfs.conf

Use the tail command to obtain data and sink it to HDFS.


# Create the log directory and keep appending test data to it
mkdir -p /home/admin/log

while true
do
echo 111111 >> /home/admin/log/test.log
sleep 0.5
done

# In another terminal, verify that data keeps arriving
tail -F /home/admin/log/test.log

When the data is collected into HDFS, the directories in the target path do not need to be created manually; the HDFS sink creates them.

Check whether HDFS is in safe mode (the report includes the safe mode status):
    hdfs dfsadmin -report
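
If only the safe-mode status is of interest, it can also be queried, or cleared, directly:

# Query the safe mode status explicitly
hdfs dfsadmin -safemode get
# Leave safe mode manually if HDFS is stuck in it
hdfs dfsadmin -safemode leave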

Start the agent:
bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1

Check the web UI at master:50070; the files appear under a directory such as /flume/events/16-04-20/.
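
The same check can be done from the command line; the date directory will differ depending on when the agent runs:

# List the collected files; the %y-%m-%d/%H%M directories are created by the HDFS sink
hdfs dfs -ls -R /flume/events/
# Inspect one of the rolled files (the path below is illustrative)
hdfs dfs -cat /flume/events/16-04-20/*/events-*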


The full tail-hdfs.conf used above:
################################################################

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# exec means the source runs a shell command
# Describe/configure the source
a1.sources.r1.type = exec
# tail -F follows by file name (survives log rotation); tail -f follows the file descriptor (inode)
a1.sources.r1.command = tail -F /home/admin/log/test.log
a1.sources.r1.channels = c1

# Describe the sink
# Sink target
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
# Target directory; Flume substitutes the escape sequences (%y-%m-%d/%H%M)
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
# File name prefix
a1.sinks.k1.hdfs.filePrefix = events-

# Roll to a new directory every 10 minutes
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

# Time to wait before rolling the file (seconds)
a1.sinks.k1.hdfs.rollInterval = 3

# File size limit that triggers a roll (bytes)
a1.sinks.k1.hdfs.rollSize = 500

# Number of events written before rolling the file
a1.sinks.k1.hdfs.rollCount = 20

# Flush to HDFS every 5 events
a1.sinks.k1.hdfs.batchSize = 5

# Use the local timestamp when formatting the directory path
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# File type of the generated files; the default is SequenceFile, DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 

3. Receiving Data from a Port

Start the receiving service on the first machine:

Receive data from the avro port and sink it to the logger:
bin/flume-ng agent -c conf -f conf/avro-logger.conf -n a1 -Dflume.root.logger=INFO,console
#########

The collection configuration file, avro-logger.conf:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# The avro source acts as the receiving server; bind it on this machine
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Send data from the second machine:

Send the contents of test.log to port 4141 of the first machine.

Send the data:
$ bin/flume-ng avro-client -H <ip-of-first-machine> -p 4141 -F /home/admin/log/test.log
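
The avro-client sends the file once and exits. For continuous forwarding, a second agent with an exec source and an avro sink could be used instead; a minimal sketch, assuming it is saved as conf/tail-avro.conf (a hypothetical name) and that hdp-node-01 is the hostname of the first machine:

# Second-machine agent: tail the log and forward events to the avro source on the first machine
a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = exec
a2.sources.r1.command = tail -F /home/admin/log/test.log

# avro sink: points at the first machine's avro source (bound to 0.0.0.0:4141 above)
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hdp-node-01
a2.sinks.k1.port = 4141

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

This agent would be started with bin/flume-ng agent -c conf -f conf/tail-avro.conf -n a2.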

 

More Source and Sink Components

Flume supports many other source and sink types; see the official user guide for details:

http://flume.apache.org/FlumeUserGuide.html
