大數據開發-Flume-頻繁產生小文件緣由和處理

1.問題背景

經過flume直接上傳實時數據到hdfs,會常遇到的一個問題就是小文件,須要調參數來設置,每每在生產環境參數大小也不一樣linux

1.flume滾動配置爲什麼不起做用?json

2.經過源碼分析得出什麼緣由?oop

3.該如何解決flume小文件?源碼分析

2. 過程分析

接着上一篇,https://blog.csdn.net/hu_lichao/article/details/110358689測試

本人在測試hdfs的sink,發現sink端的文件滾動配置項起不到任何做用,配置以下:大數據

a1.sinks.k1.type=hdfs  
a1.sinks.k1.channel=c1  
a1.sinks.k1.hdfs.useLocalTimeStamp=true  
a1.sinks.k1.hdfs.path=hdfs://linux121:9000/user/data/logs/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix=XXX  
a1.sinks.k1.hdfs.rollInterval=60  
a1.sinks.k1.hdfs.rollSize=0  
a1.sinks.k1.hdfs.rollCount=0  
a1.sinks.k1.hdfs.idleTimeout=0

這裏配置的是60秒,文件滾動一次,也就每隔60秒,會新產生一個文件【前提,flume的source端有數據來】這裏注意 useLocalTimeStamp 是使用本地時間戳來對hdfs上的目錄來命名,這個屬性的目的就是至關於時間戳的攔截器,不然%Y 等等這些東西都識別不了ui

要麼用上面這個屬性,要麼用時間戳攔截器。可是當我啓動flume的時候,運行十幾秒,不斷寫入數據,發現hdfs端頻繁的產生文件,每隔幾秒就有新文件產生並且在flume的日誌輸出能夠頻繁看到這句:this

[WARN] Block Under-replication detected. Rotating file.

只要有這句,就會產生一個新的文件,意思就是檢測到複製塊正在滾動文件,結合源碼看下:人工智能

private boolean shouldRotate() {  

    boolean doRotate = false;    
    if (writer.isUnderReplicated()) {    
        this.isUnderReplicated = true;    
        doRotate = true;  
    
    } else {  
    
        this.isUnderReplicated = false;  
    
    }  
    
    if ((rollCount > 0) && (rollCount <= eventCounter)) {  
        LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter); 
        doRotate = true;  
    
    }  
    
    if ((rollSize > 0) && (rollSize <= processSize)) {  
        LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);   
        doRotate = true;  
    }     
    return doRotate;  

}

這是判斷是否滾動文件,可是這裏面的第一判斷條件是判斷是否當前的HDFSWriter正在複製塊spa

public boolean isUnderReplicated() {  
    try {  
    
        int numBlocks = getNumCurrentReplicas();  
        if (numBlocks == -1) {  
            return false;  
        }  
    
        int desiredBlocks;  
        if (configuredMinReplicas != null) {    
            desiredBlocks = configuredMinReplicas;  
        } else {   
            desiredBlocks = getFsDesiredReplication();     
        }  
    
        return numBlocks < desiredBlocks;  
    
    } catch (IllegalAccessException e) {  
        logger.error("Unexpected error while checking replication factor", e);   
    } catch (InvocationTargetException e) {   
        logger.error("Unexpected error while checking replication factor", e);      
    } catch (IllegalArgumentException e) {     
        logger.error("Unexpected error while checking replication factor", e);     
    }  
    
    return false;  

}

經過讀取的配置複製塊數量和當前正在複製的塊比較,判斷是否正在被複制

if (shouldRotate()) {  
    boolean doRotate = true;  
    
    if (isUnderReplicated) {  
    
        if (maxConsecUnderReplRotations > 0 &&  
            consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {  
            doRotate = false;  
    
            if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {   
                LOG.error("Hit max consecutive under-replication rotations ({}); " +  
                    "will not continue rolling files under this path due to " +  
                    "under-replication", maxConsecUnderReplRotations);  
            }  
    
        } else {    
            LOG.warn("Block Under-replication detected. Rotating file.");  
        }  
    
        consecutiveUnderReplRotateCount++;  
    
    } else {  
    
        consecutiveUnderReplRotateCount = 0;  

}

以上方法,入口是shouldRotate()方法,也就是若是你配置了rollcount,rollsize大於0,會按照你的配置來滾動的,可是在入口進來後,發現,又去判斷了是否有塊在複製;裏面就讀取了一個固定變量maxConsecUnderReplRotations=30,也就是正在複製的塊,最多之能滾動出30個文件,若是超過了30次,該數據塊若是還在複製中,那麼數據也不會滾動了,doRotate=false,不會滾動了,因此有的人發現本身一旦運行一段時間,會出現30個文件,再結合上面的源碼看一下:若是你配置了10秒滾動一次,寫了2秒,剛好這時候該文件內容所在的塊在複製中,那麼雖然沒到10秒,依然會給你滾動文件的,文件大小,事件數量的配置同理了。

爲了解決上述問題,咱們只要讓程序感知不到寫的文件所在塊正在複製就好了,怎麼作呢??只要讓isUnderReplicated()方法始終返回false就好了,該方法是經過當前正在被複制的塊和配置中讀取的複製塊數量比較的,咱們能改的就只有配置項中複製塊的數量,而官方給出的flume配置項中有該項

hdfs.minBlockReplicas
Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.

默認讀的是hadoop中的dfs.replication屬性,該屬性默認值是3,這裏咱們也不去該hadoop中的配置,在flume中添加上述屬性爲1便可

完整配置以下:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile =
/data/lagoudw/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/hoult/servers/logs/start/.*log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hoult.flume.CustomerInterceptor$Builder
# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
# 配置文件滾動方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# 向hdfs上刷新的event的個數
a1.sinks.k1.hdfs.batchSize = 1000
# 使用本地時間
# a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

這樣程序就永遠不會由於文件所在塊的複製而滾動文件了,只會根據你的配置項來滾動文件了。。。。

3.總結

設置minBlockReplicas=1 的時候能夠保證會按你設置的幾個參數來達到不會產生過多的小文件,由於這個參數在讀取時候優先級較高,會首先判斷到有沒有Hdfs的副本複製,致使滾動文件的異常,另外flume接入數據時候能夠經過過濾器儘量把一些徹底用不到的數據進行過濾,清洗時候就 省事一些了。
吳邪,小三爺,混跡於後臺,大數據,人工智能領域的小菜鳥。
更多請關注
file

相關文章
相關標籤/搜索