大數據學習——採集目錄到HDFS

採集需求:某服務器的某特定目錄下,會不斷產生新的文件,每當有新文件出現,就須要把文件採集到HDFS中去web

根據需求,首先定義如下3大要素服務器

l  採集源,即source——監控文件目錄 :  spooldirspa

l  下沉目標,即sink——HDFS文件系統  :  hdfs sink3d

l  source和sink之間的傳遞通道——channel,可用file channel 也能夠用內存memory channel日誌

配置文件編寫:code

vi spooldir-hdfs-sink.conform

 
 

#定義三大組件的名稱blog

agent1.sources = source1內存

agent1.sinks = sink1ci

agent1.channels = channel1

 

# 配置source組件

agent1.sources.source1.type = spooldir

agent1.sources.source1.spoolDir = /root/data/

agent1.sources.source1.fileHeader = false

 

#配置攔截器

agent1.sources.source1.interceptors = i1

agent1.sources.source1.interceptors.i1.type = timestamp

# 配置sink組件

agent1.sinks.sink1.type = hdfs

agent1.sinks.sink1.hdfs.path =/weblog/flume-collection/%y-%m-%d/%H-%M

agent1.sinks.sink1.hdfs.filePrefix = access_log

agent1.sinks.sink1.hdfs.maxOpenFiles = 5000

agent1.sinks.sink1.hdfs.batchSize= 100

agent1.sinks.sink1.hdfs.fileType = DataStream

agent1.sinks.sink1.hdfs.writeFormat =Text

#滾動生成的文件按大小生成

agent1.sinks.sink1.hdfs.rollSize = 102400

#滾動生成的文件按行數生成

agent1.sinks.sink1.hdfs.rollCount = 1000000

#滾動生成的文件按時間生成

agent1.sinks.sink1.hdfs.rollInterval = 60

#開啓滾動生成目錄

agent1.sinks.sink1.hdfs.round = true

#以10爲一梯度滾動生成

agent1.sinks.sink1.hdfs.roundValue = 10

#單位爲分鐘

agent1.sinks.sink1.hdfs.roundUnit = minute

 

# Use a channel which buffers events in memory

agent1.channels.channel1.type = memory

agent1.channels.channel1.capacity = 500000

agent1.channels.channel1.transactionCapacity = 600

agent1.channels.channel1.keep-alive = 120

 

# Bind the source and sink to the channel

agent1.sources.source1.channels = channel1

agent1.sinks.sink1.channel = channel1

添加數據

aaa.txt

13601249301     100     200     300     400     500     600     700
13601249302     100     200     300     400     500     600     700
13601249303     100     200     300     400     500     600     700
13601249304     100     200     300     400     500     600     700
13601249305     100     200     300     400     500     600     700

執行命令

 bin/flume-ng agent -c conf -f conf/spooldir-hdfs-sink.conf -n agent1  -Dflume.root.logger=INFO,console   

 

 

flume的source採用spoodir時! 目錄下面不容許存放同名的文件,不然報錯!

Channel參數解釋:

capacity:默認該通道中最大的能夠存儲的event數量

trasactionCapacity:每次最大能夠從source中拿到或者送到sink中的event數量

keep-alive:event添加到通道中或者移出的容許時間

其餘組件:Interceptor(攔截器)

用於Source的一組Interceptor,按照預設的順序在必要地方裝飾和過濾events。

內建的Interceptors容許增長event的headers好比:時間戳、主機名、靜態標記等等

定製的interceptors能夠經過內省event payload(讀取原始日誌),實現本身的業務邏輯(很強大)

相關文章
相關標籤/搜索