2.2. Collection Examples
2.2.4. Collecting Files into HDFS
Requirement
A business system uses log4j to generate logs, and the log content grows continuously. Data appended to the log file needs to be collected into HDFS in real time.
Analysis
Based on the requirement, first define the following three key elements:
- Collection source (source) — monitors file content updates: exec 'tail -F file'
- Downstream target (sink) — the HDFS file system: hdfs sink
- Transfer conduit between source and sink (channel) — either a file channel or a memory channel can be used (see the skeleton below)
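Every Flume agent configuration follows the same skeleton: declare the three elements, configure each one, then wire the source and sink to the channel. A minimal sketch of that wiring (the full configuration appears in Step 1 below):

agent1.sources = source1      # collection source
agent1.sinks = sink1          # downstream target
agent1.channels = channel1    # transfer conduit

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1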
Step 1: Define the Flume Configuration File
cd /export/servers/apache-flume-1.8.0-bin/conf
vim tail-file.conf
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure the tail -F source
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /export/servers/taillogs/access_log
agent1.sources.source1.channels = channel1

# Describe sink1
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://node01:8020/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
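A note on the sink path: the escape sequences %y-%m-%d/%H-%M are filled in from a timestamp in each event header, which is why hdfs.useLocalTimeStamp is set to true; with round = true, roundValue = 10 and roundUnit = minute, events are bucketed into one directory per ten-minute window. It is also worth confirming that the NameNode address in hdfs.path matches the cluster's default filesystem. A quick check, assuming the HDFS client is on the PATH of the node running the agent:

# Should print hdfs://node01:8020 if the path above matches this cluster
hdfs getconf -confKey fs.defaultFS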
Step 2: Start Flume
cd /export/servers/apache-flume-1.8.0-bin
bin/flume-ng agent -c conf -f conf/tail-file.conf -n agent1
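Here -c points at the configuration directory, -f at the agent's configuration file, and -n names the agent to run; the name must match the agent1 prefix used throughout tail-file.conf. While testing, it helps to see the agent's log output directly in the terminal:

# Same launch command, with logging redirected to the console
bin/flume-ng agent -c conf -f conf/tail-file.conf -n agent1 \
  -Dflume.root.logger=INFO,console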
Step 3: Write a Shell Script to Periodically Append File Content
mkdir -p /export/servers/shells/
cd /export/servers/shells/
vim tail-file.sh
#!/bin/bash
# Append the current timestamp to the monitored log file twice per second
while true
do
  date >> /export/servers/taillogs/access_log
  sleep 0.5
done
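Step 4 below runs this script in the foreground, which is fine for a quick test. To keep the generator running after the terminal closes, one option is to start it in the background instead:

# Run the generator in the background; stop it later by killing its PID
nohup sh /export/servers/shells/tail-file.sh > /dev/null 2>&1 &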
Step 4: Run the Script
# Create the directory for the log file
mkdir -p /export/servers/taillogs
# Run the script
sh /export/servers/shells/tail-file.sh
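With both the script and the agent running, events should start landing in HDFS within a few minutes. Two quick checks (run each in its own terminal; the exact directory names under flume-collection depend on the current local time):

# Watch the local log file grow as the script appends to it
tail -f /export/servers/taillogs/access_log

# List the files the HDFS sink has written so far
hdfs dfs -ls -R /weblog/flume-collection/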