修改Flume-NG的hdfs sink解析時間戳源碼大幅提升寫入性能

  Flume-NG中的hdfs sink的路徑名(對應參數"hdfs.path",不容許爲空)以及文件前綴(對應參數"hdfs.filePrefix")支持正則解析時間戳自動按時間建立目錄及文件前綴。java

  在實際使用中發現Flume內置的基於正則的解析方式很是耗時,有很是大的提高空間。若是你不須要配置按時間戳解析時間,那這篇文章對你用處不大,hdfs sink對應的解析時間戳的代碼位於org.apache.flume.sink.hdfs.HDFSEventSink的process()方法中,涉及兩句代碼:  apache

1 // reconstruct the path name by substituting place holders
2         String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
3             timeZone, needRounding, roundUnit, roundValue, useLocalTime);
4         String realName = BucketPath.escapeString(fileName, event.getHeaders(),
5           timeZone, needRounding, roundUnit, roundValue, useLocalTime);

  其中,realPath是正則解析時間戳以後的完整路徑名,filePath參數就是配置文件中的"hdfs.path";realName就是正則解析時間戳以後的文件名前綴,fileName參數就是配置文件中的"hdfs.filePrefix"。其餘參數都相同,event.getHeaders()是一個Map裏面有時間戳(能夠經過interceptor、自定義、使用hdfs sink的useLocalTimeStamp參數三種方式來設置),其餘參數是時區、是否四捨五入以及時間單位等。maven

  BucketPath.escapeString這個方法就是正則解析時間戳所在,具體代碼咱們再也不分析,如今咱們編寫一個程序測試一下BucketPath.escapeString這個方法的性能,運行這個測試類要麼在源碼中:性能

public class Test {public static void main(String[] args) {
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("timestamp", Long.toString(System.currentTimeMillis()));
        String filePath = "hdfs://xxxx.com:8020/data/flume/%Y-%m-%d";
        String fileName = "%H-%M";
        long start = System.currentTimeMillis();
        System.out.println("start time is:" + start);
        for (int i = 0; i < 2400000; i++) {
        String realPath = BucketPath.escapeString(filePath, headers, null, false, Calendar.SECOND, 1, false);
        String realName = BucketPath.escapeString(fileName, headers, null, false, Calendar.SECOND, 1, false);
}
     long end = System.currentTimeMillis();
     System.out.println("end time is:"+ end + ".\nTotal time is:" + (end - start) + " ms.");
}
}

  這個方法後面5個參數咱們通常不須要用到,所以這裏其實都設置成在實際中沒有影響的數值了。headers參數要有「timestamp」參數,咱們這裏循環處理240W個event,看看運行結果:測試

start time is:1412853253889
end time is:1412853278210.
Total time is:24321 ms.

  我靠,竟然花了24s還多,尼瑪要知道哥目前白天的數據量也就是每秒4W個event,這還不是峯值呢。。。加上解析時間戳全量就扛不住了,怎麼辦??ui

  能怎麼辦啊?只能想辦法替換這個解析辦法了,因而,我就想到這樣了,看測試程序:spa

public class Test {

    private static SimpleDateFormat sdfYMD = null;
    private static SimpleDateFormat sdfHM = null;
    
    public static void main(String[] args) {
        
        sdfYMD = new SimpleDateFormat("yyyy-MM-dd");
        sdfHM = new SimpleDateFormat("HH-mm");
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("timestamp", Long.toString(System.currentTimeMillis()));
        String filePath = "hdfs://dm056.tj.momo.com:8020/data/flume/%Y-%m-%d";
        String fileName = "%H-%M";
        long start = System.currentTimeMillis();
        System.out.println("start time is:" + start);
        for (int i = 0; i < 2400000; i++) {
            //String realPath = BucketPath.escapeString(filePath, headers, null, false, Calendar.SECOND, 1, false);
            //String realName = BucketPath.escapeString(fileName, headers, null, false, Calendar.SECOND, 1, false);
            
            String realPath = getTime("yyyy-MM-dd",Long.parseLong(headers.get("timestamp")));
            String realName = getTime("HH-mm",Long.parseLong(headers.get("timestamp")));
        }
        long end = System.currentTimeMillis();
        System.out.println("end time is:"+ end + ".\nTotal time is:" + (end - start) + " ms.");
    }

    public static String getTime(String format,long timestamp) {
        String time="";
        if(format.equals("HH-mm"))
            time=sdfHM.format(timestamp);
        else if(format.equals("yyyy-MM-dd"))
            time=sdfYMD.format(timestamp);        
        return time;
    }
}

  咱們使用java本身的SimpleDateFormat來完成按指定格式的解析,這樣就不能將整個path或者name傳進去了,看看運行結果:code

start time is:1412853670246
end time is:1412853672204.
Total time is:1958 ms.

  尼瑪!!!不是吧,不到2s。。。我這是在個人MBP上測試的,i5+8G+128G SSD,騷年你還猶豫什麼呢?orm

  來開始改動源碼吧。。。對象

  咱們最好把解析格式作成可配置的,而且最好還保留原來的能夠加前綴名的方式,由於有可能須要加入主機名啊什麼的,可是能夠把這個前綴做爲中綴,解析時間戳的結果做爲前綴。。。

  一、咱們須要兩個SimpleDateFormat來分別實現對path和name的格式化,並在配置時完成實例化,這樣能夠建立一次對象就Ok,還須要path和name的格式化串,這個能夠作成全局的或者局部的,咱們這是全局的(其實沒有必要,是否是?哈哈),變量聲明階段代碼:

   private SimpleDateFormat sdfPath = null;        //for file in hdfs path
    private SimpleDateFormat sdfName = null;        //for file name prefix

    private String filePathFormat;
    private String fileNameFormat;

  二、configure(Context context)方法中須要對上述對象進行配置了,很簡單,很明顯,相關代碼以下:

1      filePath = Preconditions.checkNotNull(
2                 context.getString("hdfs.path"), "hdfs.path is required");
3         filePathFormat =  context.getString("hdfs.path.format", "yyyy/MM/dd");        //time's format ps:"yyyy-MM-dd"
4         sdfPath = new SimpleDateFormat(filePathFormat);
5         fileName = context.getString("hdfs.filePrefix", defaultFileName);
6         fileNameFormat = context.getString("hdfs.filePrefix.format", "HHmm");
7         sdfName = new SimpleDateFormat(fileNameFormat);

  增長的是上面的三、四、六、7四行代碼,解析格式串是在"hdfs.path.format"和"hdfs.filePrefix.format"中進行配置,其它的地方不要存在時間戳格式串了,也不要出現原來內置的那些%H、%mm等等格式了。上面兩個format配置有默認格式串,本身作決定就好。

  三、增長解析時間戳方法:

1     public String getTime(String type,long timestamp) {
2         String time="";
3         if(type.equals("name"))
4             time=sdfName.format(timestamp);
5         else if(type.equals("path"))
6             time=sdfPath.format(timestamp);
7         return time;
8     }

  參數type用來指定是文件名的仍是路徑名的,用來調用相應地格式化對象。

  四、下面是重點了,上面幾步即便配置了,不在這修改也不會起任何做用,修改process()方法,用如下代碼替換最上面提到的兩行代碼:

1                 String realPath = filePath;
2                 String realName = fileName;
3                 if(realName.equals("%host") && event.getHeaders().get("host") != null)
4                     realName = event.getHeaders().get("host").toString();
5                 if(event.getHeaders().get("timestamp") != null){
6                     long time = Long.parseLong(event.getHeaders().get("timestamp"));
7                     realPath += DIRECTORY_DELIMITER + getTime("path",time);
8                     realName = getTime("name",time) + "." + realName;
9                 }        

  這幾行的邏輯其實有:A、能夠自定義中綴("hdfs.filePrefix",能夠是常量或者是"%host",後者用來獲取主機名,前提是要設置hostinterceptor);B、默認中綴就是默認的"FlumeData";C、若是headers中存在時間戳,調用getTime方法解析時間戳。

  五、編譯&打包&替換&運行。。。

   哥打包比較原始,由於只修改了一個類,就把編譯後的class文件以HDFSEventSink開頭的幾個class文件替換了原來flume-hdfs-sink的jar包中的對應的class文件。。。尼瑪,原始吧。。。會maven,直接上maven吧。。。

 

  我這邊的測試結果是若是沒有配置壓縮功能,性能提高超過70%,若是配置上壓縮功能(gzip)性能提高超過50%,這數值僅供參考,不一樣環境不一樣主機不一樣人品可能不盡相同。。

  期待大夥的測試結果。。。

相關文章
相關標籤/搜索