Flume系列---2.簡單示例

1.Flume 簡單案例服務器

1.1 採集目錄到 HDFSthis

採集需求:服務器的某特定目錄下,會不斷產生新的文件,每當有新文件出現,就須要把文件採集到 HDFS 中去根據需求,首先定義如下 3 大要素日誌

  • 採集源,即 source——監控文件目錄 : spooldir
  • 下沉目標,即 sink——HDFS 文件系統 : hdfs sink
  • source 和 sink 之間的傳遞通道——channel,可用 file channel 也能夠用內存 channel

配置文件編寫:code

# Name the components on this agent 
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source ##注意:不能往監控目中重複丟同名文件 
a1.sources.r1.type = spooldir 
a1.sources.r1.spoolDir = /root/logs 
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/ 
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1 a1.sinks.k1.hdfs.useLocalTimeStamp = true

#生成的文件類型,默認是 Sequencefile,可用 DataStream,則爲普通文本 
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

Channel 參數解釋:
capacity:默認該通道中最大的能夠存儲的 event 數量
trasactionCapacity:每次最大能夠從 source 中拿到或者送到 sink 中的 event
數量component

  1. 採集文件到 HDFS

    採集需求:好比業務系統使用 log4j 生成的日誌,日誌內容不斷增長,須要把追內存

加到日誌文件中的數據實時採集到 hdfs ci

根據需求,首先定義如下 3 大要素it

  • 採集源,即 source——監控文件內容更新 : exec ‘tail -F file’
  • 下沉目標,即 sink——HDFS 文件系統 : hdfs sink
  • Source 和 sink 之間的傳遞通道——channel,可用 file channel 也能夠用內存 channel

配置文件編寫:io

# Name the components on this agent 
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/test.log 
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/tailout/%y-%m-%d/%H%M/ 
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件類型,默認是 Sequencefile,可用 DataStream,則爲普通文本 
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

參數解析:event

  • rollInterval
    默認值:30

hdfs sink 間隔多長將臨時文件滾動成最終目標文件,單位:秒; 若是設置成 0,則表示不根據時間來滾動文件;
注:滾動(roll)指的是,hdfs sink 將臨時文件重命名成最終目標文 件,並新打開一個臨時文件來寫入數據;

  • rollSize
    默認值:1024

當臨時文件達到該大小(單位:bytes)時,滾動成目標文件;
若是設置成 0,則表示不根據臨時文件大小來滾動文件

  • rollCount
    默認值:10

當 events 數據達到該數量時候,將臨時文件滾動成目標文件;
若是設置成 0,則表示不根據 events 數據來滾動文件;

  • round
    默認值:false

是否啓用時間上的「捨棄」,這裏的「捨棄」,相似於「四捨五入」。

  • roundValue
    默認值:1

時間上進行「捨棄」的值;

  • roundUnit
    默認值:seconds

時間上進行「捨棄」的單位,包含:second,minute,hour

示例:a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S a1.sinks.k1.hdfs.round = truea1.sinks.k1.hdfs.roundValue = 10a1.sinks.k1.hdfs.roundUnit = minute當時間爲 2015-10-16 17:38:59 時候,hdfs.path 依然會被解析爲: /flume/events/20151016/17:30/00由於設置的是捨棄 10 分鐘內的時間,所以,該目錄每 10 分鐘新生成一 個。

相關文章
相關標籤/搜索