Flume實戰案例 -- 採集某個目錄到HDFS

需求分析
  • 採集需求:某服務器的某特定目錄下,會不斷產生新的文件,每當有新文件出現,就須要把文件採集到HDFS中去html

  • 結構示意圖:
    web

  • 根據需求,首先定義如下3大要素shell

    • 數據源組件,即source ——監控文件目錄 : spooldirapache

      spooldir特性:vim

      一、監視一個目錄,只要目錄中出現新文件,就會採集文件中的內容服務器

      二、採集完成的文件,會被agent自動添加一個後綴:COMPLETEDide

      三、此source可靠,不會丟失數據;即便flume重啓或被killoop

      注意:ui

      所監視的目錄中不容許有同名的文件;且文件被放入spooldir後,就不能修改this

      ①若是文件放入spooldir後,又向文件寫入數據,會打印錯誤及中止

      ②若是有同名的文件出如今spooldir,也會打印錯誤及中止

    • 下沉組件,即sink——HDFS文件系統 : hdfs sink

    • 通道組件,即channel——可用file channel 也能夠用內存channel

flume配置文件開發
  • 配置文件編寫:

    cd /bigdata/install/flume-1.9.0/conf/
    mkdir -p /bigdata/install/mydata/flume/dirfile
    vim spooldir.conf
  • 內容以下

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    # 注意:不能往監控目中重複丟同名文件
    a1.sources.r1.type = spooldir
    # 監控的路徑
    a1.sources.r1.spoolDir = /bigdata/install/mydata/flume/dirfile
    # Whether to add a header storing the absolute path filename
    #文件絕對路徑放到header
    a1.sources.r1.fileHeader = true
    
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    #採集到的數據寫入到次路徑
    a1.sinks.k1.hdfs.path = hdfs://hadoop01:8020/spooldir/files/%y-%m-%d/%H%M/    
    # 指定在hdfs上生成的文件名前綴
    a1.sinks.k1.hdfs.filePrefix = events-
    # timestamp向下舍round down
    a1.sinks.k1.hdfs.round = true
    # 按10分鐘,爲單位向下取整;如55分,舍成50;38 -> 30
    a1.sinks.k1.hdfs.roundValue = 10
    # round的單位
    a1.sinks.k1.hdfs.roundUnit = minute
    # 每3秒滾動生成一個文件;默認30;(0 = never roll based on time interval)
    a1.sinks.k1.hdfs.rollInterval = 3
    # 每x字節,滾動生成一個文件;默認1024;(0: never roll based on file size)
    a1.sinks.k1.hdfs.rollSize = 20
    # 每x個event,滾動生成一個文件;默認10; (0 = never roll based on number of events)
    a1.sinks.k1.hdfs.rollCount = 5
    # 每x個event,flush到hdfs
    a1.sinks.k1.hdfs.batchSize = 1
    # 使用本地時間
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    #生成的文件類型,默認是Sequencefile;可選DataStream,則爲普通文本;可選CompressedStream壓縮數據
    a1.sinks.k1.hdfs.fileType = DataStream
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    # channel中存儲的event的最大數目
    a1.channels.c1.capacity = 1000
    # 每次傳輸數據,從source最多得到event的數目或向sink發送的event的最大的數目
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    組件官網地址:

    spooling directory source

    hdfs sink

    memory channel

  • Channel參數解釋:

    • capacity:默認該通道中最大的能夠存儲的event數量
    • trasactionCapacity:每次最大能夠從source中拿到或者送到sink中的event數量
    • keep-alive:event添加到通道中或者移出的容許時間
啓動flume
cd /bigdata/install/flume-1.9.0
bin/flume-ng agent -c ./conf -f ./conf/spooldir.conf -n a1 -Dflume.root.logger=INFO,console
上傳文件到指定目錄
  • 將不一樣的文件上傳到下面目錄裏面去,注意文件不能重名

    mkdir -p /home/hadoop/datas
    cd /home/hadoop/datas
    vim a.txt
    
    # 加入以下內容
    ab cd ef
    english math
    hadoop alibaba
  • 再執行;

    cp a.txt /bigdata/install/mydata/flume/dirfile
  • 而後觀察flume的console動靜、hdfs webui生成的文件

  • 觀察spooldir的目標目錄

  • 將同名文件再次放到/bigdata/install/mydata/flume/dirfile觀察現象:

    cp a.txt /bigdata/install/mydata/flume/dirfile
  • flume控制檯報錯

相關文章
相關標籤/搜索