HDFS工做機制——自開發分佈式數據採集系統

需求描述:node

在業務系統的服務器上,業務程序會不斷生成業務日誌(好比網站的頁面訪問日誌)apache

業務日誌是用log4j生成的,會不斷地切出日誌文件,須要按期(好比每小時)從業務服務器上的日誌目錄中,探測須要採集的日誌文件(access.log不能採),發往HDFS設計模式

注意點:業務服務器可能有多臺(hdfs上的文件名不能直接用日誌服務器上的文件名)安全

當天採集到的日誌要放在hdfs的當天目錄中,採集完成的日誌文件,須要移動到到日誌服務器的一個備份目錄中按期檢查(每小時檢查一下備份目錄),將備份時長超出24小時的日誌文件清除服務器

數據採集流程分析多線程

1.流程
啓動一個定時任務
    定時探測日誌源目錄
    獲取須要採集得文件
    移動這些文件到一個待上傳得臨時目錄
    遍歷待上傳目錄中得文件,逐一傳輸到HDFS得目標路徑
    同時將傳輸得文件移動到備份目錄

啓動一個定時任務:
    探測備份目錄中得備份數據,檢查是否超出最長備份時長,超出,則刪除

2.規劃各類路徑
日誌源路徑:d:/logs/accesslog/
待上傳臨時目錄:d:/logs/toupload/
備份目錄:d:/logs/backup/日期

HDFS 存儲路徑:/logs/日期
HDFS中文件的前綴:acceaa_log_
HDFS中文件的後綴:.log

準備工做app

[root@hdp-01 ~]# start-dfs.sh
Starting namenodes on [hdp-01]
hdp-01: starting namenode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-namenode-hdp-01.out
hdp-01: starting datanode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-datanode-hdp-01.out
hdp-03: starting datanode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-datanode-hdp-03.out
hdp-02: starting datanode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-datanode-hdp-02.out
hdp-04: starting datanode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-datanode-hdp-04.out
Starting secondary namenodes [hdp-02]
hdp-02: starting secondarynamenode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-secondarynamenode-hdp-02.out

代碼以下dom

DataCollection oop

public class DataCollection {
    public static void main(String[] args) {
        Timer timer = new Timer();
        //1.【定時探測日誌源目錄】,每一小時執行一次
        timer.schedule(new CollectTask(),0,60*60*1000L);
        //2.【定時刪除文件】
        timer.schedule(new BackupCleanTask(),0,60*60*1000L);
    }
}

collect.properties網站

LOG_SOURCE_DIR=d:/logs/accesslog/
LOG_TOUPLOAD_DIR=d:/logs/toupload/
LOG_BACKUP_BASE_DIR=d:/logs/backup/
LOG_LEGAL_PREFIX=access.log.
HDFS_URI=hdfs://hdp-01:9000/
HDFS_DEST_BASE_DIR=/logs/
HDFS_FILE_PREFIX=access_log_
HDFS_FILE_SUFFIX=.log

Contants 

/**
 * 日誌目錄參數key
 */
public class Contants {

    /**
     * 本地要上傳文件目錄
     */
    public static final String LOG_SOURCE_DIR="LOG_SOURCE_DIR";
    /**
     * 臨時上傳目錄中的文件
     */
    public static final String LOG_TOUPLOAD_DIR="LOG_TOUPLOAD_DIR";
    /**
     * d:/logs/backup/
     */
    public static final String LOG_BACKUP_BASE_DIR="LOG_BACKUP_BASE_DIR";
    /**
     * 須要採集得文件
     */
    public static final String LOG_LEGAL_PREFIX="LOG_LEGAL_PREFIX";
    /**
     * 上傳到 HDFS ip+port
     */
    public static final String HDFS_URI="HDFS_URI";
    /**
     * hdfs:logs/目錄
     */
    public static final String HDFS_DEST_BASE_DIR="HDFS_DEST_BASE_DIR";
    /**
     * hdfs文件前綴:access_log_
     */
    public static final String HDFS_FILE_PREFIX="HDFS_FILE_PREFIX";
    /**
     * hdfs文件後綴:.log
     */
    public static final String HDFS_FILE_SUFFIX="HDFS_FILE_SUFFIX";
}

單例設計方式一:餓漢式單例,程序啓動時建立

PropertyHolderHungery

/**
 * 單例設計方式一:餓漢式單例,程序啓動時建立
 */
public class PropertyHolderHungery {

    private static Properties prop=new Properties();

    //靜態代碼塊
    static {
        try {
            prop.load(PropertyHolderHungery.class.getClassLoader()
                    .getResourceAsStream("collect.properties"));
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    public static Properties getProps()throws Exception{
        return prop;
    }
}

單例設計模式二:懶漢式,使用的時候建立,還考慮線程安全

PropertyHolderLazy 

/**
 * 單例設計模式二:懶漢式,使用的時候建立,還考慮線程安全
 */
public class PropertyHolderLazy {
     //默認構造器私有化
    private PropertyHolderLazy(){

    }
    //禁止指令重排序
    private volatile static Properties prop=null;

    public static Properties getProps()throws Exception{
        if(prop==null){
            //加鎖,保證多線程場景下線程安全問題
            synchronized (PropertyHolderLazy.class){
                //防止再次new
                if(prop==null){
                    prop=new Properties();
                    prop.load(PropertyHolderLazy.class.getClassLoader()
                            .getResourceAsStream("collect.properties"));
                }
            }
        }
        return prop;
    }
}

CollectTask 

public class CollectTask extends TimerTask {
    //構造一個log4j日誌對象
    public static Log log = LogFactory.getLog(CollectTask.class);
    public void run() {
        /**
         *     1.定時探測日誌源目錄
         *     2.獲取須要採集得文件
         *     3.移動這些文件到一個待上傳得臨時目錄
         *     4.遍歷待上傳目錄中得文件,逐一傳輸到HDFS得目標路徑
         *     5.同時將傳輸得文件移動到備份目錄
         */
        try{
            //獲取配置參數
            final Properties props = PropertyHolderLazy.getProps();
            //獲取本次採集時的日期
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");
            String day = sdf.format(new Date());
            //獲取本地要上傳文件目錄
            File srcDir = new File(props.getProperty(Contants.LOG_SOURCE_DIR));
            //2.【獲取須要採集得文件】
            File[] listFiles = srcDir.listFiles(new FilenameFilter() {
            public boolean accept(File dir, String name) {
                if(name.startsWith(props.getProperty(Contants.LOG_LEGAL_PREFIX))){
                    return true;
                }
                return false;
            }
        });
        //記錄日誌
        log.info("探測到以下文件須要採集:"+Arrays.toString(listFiles));
        //獲取臨時上傳目錄中的文件
        File toUploadDir = new File(props.getProperty(Contants.LOG_TOUPLOAD_DIR));
        //3.【移動這些文件到一個待上傳得臨時目錄】
        for (File file:listFiles) {
           //將採集的文件移到臨時上傳目錄,將源目錄中須要上傳的文件移動到臨時上傳目錄中
            FileUtils.moveFileToDirectory(file,toUploadDir,true);
        }
            //記錄日誌
            log.info("上述文件移動到待上傳目錄"+toUploadDir.getAbsolutePath());
            //構造一個HDFS的客戶端對象
            FileSystem fs = FileSystem.get(new URI(props.getProperty(Contants.HDFS_URI)), new Configuration(), "root");
            //從臨時上傳目錄中列出全部文件
            File[] toUploadFiles = toUploadDir.listFiles();
            //1.檢查hdfs 中的日期是否存在
            Path hdfsDestPath = new Path(props.getProperty(Contants.HDFS_DEST_BASE_DIR) + day);
            if(!fs.exists(hdfsDestPath)){
                fs.mkdirs(hdfsDestPath);
            }
            //2.檢查本地的備份目錄是否存在
            File backupDir = new File(props.getProperty(Contants.LOG_BACKUP_BASE_DIR) + day);
            if(!backupDir.exists()){
                backupDir.mkdirs();
            }
            //4.【遍歷待上傳目錄中得文件,逐一傳輸到HDFS得目標路徑】
            for (File file:toUploadFiles) {
                //傳輸文件到HDFS並更名
                Path destPath = new Path(hdfsDestPath +"/"+props.getProperty(Contants.HDFS_FILE_PREFIX)
                        + UUID.randomUUID() +props.getProperty(Contants.HDFS_FILE_SUFFIX));
                //將臨時上傳目錄中的文件上傳到hdfs中
                fs.copyFromLocalFile(new Path(file.getAbsolutePath()),destPath);
                //記錄日誌
                log.info("文件傳輸到hdfs完成:"+file.getAbsolutePath() +"-->"+destPath);
                //5.【同時將傳輸得文件移動到備份目錄】
                FileUtils.moveFileToDirectory(file,backupDir,true);
                //記錄日誌
                log.info("文件備份完成:"+file.getAbsolutePath() +"-->"+backupDir);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

BackupCleanTask 

public class BackupCleanTask extends TimerTask {

    public void run() {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");
        long now = new Date().getTime();
        //探測備份目錄
        try {
            //獲取配置參數
            final Properties props = PropertyHolderLazy.getProps();
            File backupBaseDir = new File(props.getProperty(Contants.LOG_BACKUP_BASE_DIR));
            File[] dayBackDir = backupBaseDir.listFiles();
            //判斷備份目錄是否已經超過24h
            for (File dir : dayBackDir) {
                long time = sdf.parse(dir.getName()).getTime();
                if(now-time>24*60*60*1000L){
                    //遞歸刪除目錄
                    FileUtils.deleteDirectory(dir);
                }
            }
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

日誌配置

log4j.rootLogger=CONSOLE,stdout,logfile
#stdout控制器
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
#輸出格式
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c]:%L - %m%n
#文件路徑輸出
log4j.appender.logfile=org.apache.log4j.RollingFileAppender
log4j.appender.logfile.File=d:/logs/collect/collect.log 
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

pom依賴

<dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>

    </dependencies>

控制檯輸出

...
47 - 探測到以下文件須要採集:[d:\logs\accesslog\access.log.1, d:\logs\accesslog\access.log.2, d:\logs\accesslog\access.log.3]
57 - 上述文件移動到待上傳目錄d:\logs\toupload
79 - 文件傳輸到hdfs完成:d:\logs\toupload\access.log.1-->/logs/2019-05-25-12/access_log_9dc0542e-0153-4bb3-b804-d85115c20153.log
83 - 文件備份完成:d:\logs\toupload\access.log.1-->d:\logs\backup\2019-05-25-12
79 - 文件傳輸到hdfs完成:d:\logs\toupload\access.log.2-->/logs/2019-05-25-12/access_log_5ce3450d-8874-4dd7-a23e-8d6a7a52c6d9.log
83 - 文件備份完成:d:\logs\toupload\access.log.2-->d:\logs\backup\2019-05-25-12
79 - 文件傳輸到hdfs完成:d:\logs\toupload\access.log.3-->/logs/2019-05-25-12/access_log_f7b7c741-bb87-4778-98ad-e33f06501441.log
83 - 文件備份完成:d:\logs\toupload\access.log.3-->d:\logs\backup\2019-05-25-12
...

效果圖

版權@須臾之餘https://my.oschina.net/u/3995125

相關文章
相關標籤/搜索