在測試hdfs的sink,發現sink端的文件滾動配置項起不到任何做用,配置以下:java
a1.sinks.k1.type=hdfs a1.sinks.k1.channel=c1 a1.sinks.k1.hdfs.useLocalTimeStamp=true a1.sinks.k1.hdfs.path=hdfs://192.168.11.177:9000/flume/events/%Y/%m/%d/%H/%M a1.sinks.k1.hdfs.filePrefix=XXX a1.sinks.k1.hdfs.rollInterval=60 a1.sinks.k1.hdfs.rollSize=0 a1.sinks.k1.hdfs.rollCount=0 a1.sinks.k1.hdfs.idleTimeout=0
這裏配置的是60秒,文件滾動一次,也就每隔60秒,會新產生一個文件【前提,flume的source端有數據來】oop
可是當我啓動flume的時候,運行十幾秒,不斷寫入數據,發現hdfs端頻繁的產生文件,每隔幾秒就有新文件產生
並且在flume的日誌輸出能夠頻繁看到這句:測試
[WARN] Block Under-replication detected. Rotating file.this
只要有這句,就會產生一個新的文件spa
意思就是檢測到複製塊正在滾動文件,結合源碼看下:debug
private boolean shouldRotate() { boolean doRotate = false; if (writer.isUnderReplicated()) { this.isUnderReplicated = true; doRotate = true; } else { this.isUnderReplicated = false; } if ((rollCount > 0) && (rollCount <= eventCounter)) { LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter); doRotate = true; } if ((rollSize > 0) && (rollSize <= processSize)) { LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize); doRotate = true; } return doRotate; }
這是判斷是否滾動文件,可是這裏面的第一判斷條件是判斷是否當前的HDFSWriter正在複製塊日誌
public boolean isUnderReplicated() { try { int numBlocks = getNumCurrentReplicas(); if (numBlocks == -1) { return false; } int desiredBlocks; if (configuredMinReplicas != null) { desiredBlocks = configuredMinReplicas; } else { desiredBlocks = getFsDesiredReplication(); } return numBlocks < desiredBlocks; } catch (IllegalAccessException e) { logger.error("Unexpected error while checking replication factor", e); } catch (InvocationTargetException e) { logger.error("Unexpected error while checking replication factor", e); } catch (IllegalArgumentException e) { logger.error("Unexpected error while checking replication factor", e); } return false; }
經過讀取的配置複製塊數量和當前正在複製的塊比較,判斷是否正在被複制orm
if (shouldRotate()) { boolean doRotate = true; if (isUnderReplicated) { if (maxConsecUnderReplRotations > 0 && consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) { doRotate = false; if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) { LOG.error("Hit max consecutive under-replication rotations ({}); " + "will not continue rolling files under this path due to " + "under-replication", maxConsecUnderReplRotations); } } else { LOG.warn("Block Under-replication detected. Rotating file."); } consecutiveUnderReplRotateCount++; } else { consecutiveUnderReplRotateCount = 0; }
以上方法,入口是shouldRotate()方法,也就是若是你配置了rollcount,rollsize大於0,會按照你的配置來滾動的,可是在入口進來後,發現,又去判斷了是否有塊在複製;
裏面就讀取了一個固定變量maxConsecUnderReplRotations=30,也就是正在複製的塊,最多之能滾動出30個文件,若是超過了30次,該數據塊若是還在複製中,那麼數據也不會滾動了,doRotate=false,不會滾動了,因此有的人發現本身一旦運行一段時間,會出現30個文件blog
再結合上面的源碼看一下:事件
若是你配置了10秒滾動一次,寫了2秒,剛好這時候該文件內容所在的塊在複製中,那麼雖然沒到10秒,依然會給你滾動文件的,文件大小,事件數量的配置同理了。
爲了解決上述問題,咱們只要讓程序感知不到寫的文件所在塊正在複製就好了,怎麼作呢??
只要讓isUnderReplicated()方法始終返回false就好了
該方法是經過當前正在被複制的塊和配置中讀取的複製塊數量比較的,咱們能改的就只有配置項中複製塊的數量,而官方給出的flume配置項中有該項
hdfs.minBlockReplicas
Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.
默認讀的是hadoop中的dfs.replication屬性,該屬性默認值是3
這裏咱們也不去該hadoop中的配置,在flume中添加上述屬性爲1便可
配置以下:
a1.sinks.k1.type=hdfs a1.sinks.k1.channel=c1 a1.sinks.k1.hdfs.useLocalTimeStamp=true a1.sinks.k1.hdfs.path=hdfs://192.168.11.177:9000/flume/events/%Y/%m/%d/%H/%M a1.sinks.k1.hdfs.filePrefix=cmcc a1.sinks.k1.hdfs.minBlockReplicas=1 #a1.sinks.k1.hdfs.fileType=DataStream #a1.sinks.k1.hdfs.writeFormat=Text a1.sinks.k1.hdfs.rollInterval=60 a1.sinks.k1.hdfs.rollSize=0 a1.sinks.k1.hdfs.rollCount=0 a1.sinks.k1.hdfs.idleTimeout=0
這樣程序就永遠不會由於文件所在塊的複製而滾動文件了