Data Warehouse: Fast Export of Large Volumes of Data from MySQL

Background

This article records some of the interesting problems I ran into while building the ETL sync for a data warehouse, and how the program's throughput was improved step by step.

The relational database:

  Early in the project: the game's operational data was lightweight. The relevant metrics were produced by a Java backend running aggregate queries against the relational database MySQL, which was entirely sufficient; a scheduled job computed the statistics each day, ready for the operations staff to query.

  Mid-to-late project: as more game servers were opened and the player base grew, the database kept getting larger and the admin backend's queries kept getting slower. For an ordinary relational database such as MySQL, once a single table exceeds roughly 5 million rows, query performance degrades badly, and we rarely query a single table anyway: there are multi-table joins as well. Suppose there are 100 game servers, each with 20 tables, and each table holds 5 million rows. Then:

  Total rows = 100 × 20 × 5 million = 10 billion. With the table schema at the time, that translated into roughly 100 GB of disk space.

Good grief. No single machine today has enough memory to load 100 GB at once.

https://www.zhihu.com/question/19719997

  So, with that in mind, Hive was brought in to solve the problem!

 

Data Warehouse

Hive is well suited as a tool for a data warehouse of massive data, because warehouse data has two characteristics: it is the complete historical record (massive), and it is relatively stable. "Relatively stable" means that, unlike an operational database where rows are updated all the time, data that has entered the warehouse is rarely updated or deleted; it is mostly just queried, in bulk. Hive has exactly these two characteristics.

2. Project Architecture Design

 First, a word on the early architectural exploration. The data flow is ultimately just MySQL ———> Hive, and I used plain JDBC. Why not one of the following tools?

  • Sqoop: each server of this game has close to 80 tables, there are many servers, and there will be more. Every server's schema is exactly the same; only the IP address differs. With Sqoop we would have to maintain an ever-growing pile of scripts, and Sqoop also cannot deal with raw data that contains the row/column delimiters used in the Hive table definitions.
  • DataX: Alibaba's open-source data sync middleware; I have not studied it in detail.
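The delimiter problem mentioned for Sqoop deserves a concrete illustration: Hive's default text format separates fields with \001 and rows with \n, so any raw value containing those characters corrupts the exported file. A minimal sketch of the kind of sanitizing step a hand-rolled exporter needs (the delimiter choices and class name are illustrative assumptions, not the project's actual code):

```java
// Hypothetical helper: strips characters that would break a Hive text-format row.
// Assumes '\001' as the Hive field delimiter and '\n' as the row delimiter.
public class HiveFieldSanitizer {

    /** Replace row/field delimiter bytes inside a raw column value with spaces. */
    public static String sanitize(String rawValue) {
        if (rawValue == null) {
            return "\\N"; // Hive's default NULL marker in text files
        }
        return rawValue
                .replace('\u0001', ' ') // field delimiter
                .replace('\n', ' ')     // row delimiter
                .replace('\r', ' ');    // avoid CR/LF surprises on reload
    }

    public static void main(String[] args) {
        System.out.println(sanitize("chat\nmessage\u0001x")); // prints "chat message x"
    }
}
```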

1. A global cache queue

A producer-consumer model with an in-memory buffer in the middle; the data lands on disk as txt files.

 

 

The producer fetches the source rows over JDBC and puts them into a fixed-size cache queue; meanwhile the consumer continuously reads from the queue, dispatches each record by data type, and writes it line by line into the corresponding txt file.

Throughput: about 8,000 rows per second.

On the surface this looks lovely: a streaming pipeline that handles each record as it arrives. In practice, the consumer falls far behind the producer and rows pile up in the queue; if the queue were unbounded, this would also burn a lot of memory. So, to speed up the writes, we moved to the next scheme.
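The single-queue scheme above can be sketched roughly as follows; the class name, queue capacity, and sentinel value are illustrative, not taken from the project:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of scheme 1: one producer filling a bounded queue, one
// consumer draining it. put() blocks when the queue is full, which is exactly
// how the fast producer ends up waiting on the slow consumer.
public class SingleQueueDump {

    private static final String POISON = "__END__"; // sentinel to stop the consumer

    public static int run(int rows) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024); // fixed capacity
        AtomicInteger written = new AtomicInteger();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < rows; i++) {
                    queue.put("row-" + i); // blocks when the buffer is full
                }
                queue.put(POISON);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String line = queue.take();
                    if (POISON.equals(line)) break;
                    written.incrementAndGet(); // stand-in for "write one line to txt"
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return written.get();
    }

    public static void main(String[] args) {
        System.out.println(run(100000)); // prints 100000
    }
}
```

Because `put()` blocks on a full queue, a bounded queue caps memory use at the price of stalling the producer, which is exactly the backpressure problem described above.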

 

2. One cache queue and one writer per table

Each table gets its own producer-consumer pair; the consumer initializes its writer when it starts. The design is as follows:

 

table1's producer fetches the source rows over JDBC and puts them into the table's own fixed-size cache queue; meanwhile table1's consumer continuously reads from that queue, dispatches each record by data type, and writes it line by line into the corresponding txt file.

Throughput: about 20,000 rows per second.

 Producer threads can now run concurrently, and by controlling the number of producer threads the overall throughput improves a great deal. The key project code is as follows:

1) Thread pools

/***
 * @description Task thread pools.
 */
public class DumpExecuteService {

    private static ExecutorService dumpServerWorkerService; // game-server tasks
    private static ExecutorService dumpTableWorkerService; // per-table tasks
    private static ExecutorService dumpReaderWorkerService; // data read tasks
    private static ExecutorService dumpWriterWorkerService; // data write tasks

    /***
     * Initialize the task thread pools.
     * @param concurrencyDBCount number of database dump tasks allowed to run concurrently
     */
    public synchronized static void startup(int concurrencyDBCount) {

        if (dumpServerWorkerService != null)
            return;

        if (concurrencyDBCount > 2)
            concurrencyDBCount = 2; // at most two database tasks may run concurrently

        if (concurrencyDBCount < 1)
            concurrencyDBCount = 1;

        dumpServerWorkerService = Executors.newFixedThreadPool(concurrencyDBCount, new NamedThreadFactory(
                "DumpExecuteService.dumpServerWorkerService" + System.currentTimeMillis()));
        dumpTableWorkerService = Executors.newFixedThreadPool(2, new NamedThreadFactory("DumpExecuteService.dumpTableWorkerService"
                + System.currentTimeMillis()));
        dumpWriterWorkerService = Executors.newFixedThreadPool(8, new NamedThreadFactory("DumpExecuteService.dumpWriterWorkerService"
                + System.currentTimeMillis()));
        dumpReaderWorkerService = Executors.newFixedThreadPool(2, new NamedThreadFactory("DumpExecuteService.dumpReaderWorkerService"
                + System.currentTimeMillis()));
    }

    public static Future<Integer> submitDumpServerWorker(DumpServerWorkerLogic worker) {
        return dumpServerWorkerService.submit(worker);
    }

    public static Future<Integer> submitDumpWriteWorker(DumpWriteWorkerLogic worker) {
        return dumpWriterWorkerService.submit(worker);
    }

    public static Future<Integer> submitDumpReadWorker(DumpReadWorkerLogic worker) {
        return dumpReaderWorkerService.submit(worker);
    }

    public static Future<Integer> submitDumpTableWorker(DumpTableWorkerLogic worker) {
        return dumpTableWorkerService.submit(worker);
    }

    /***
     * Shut down the thread pools.
     */
    public synchronized static void shutdown() {

        // shut down the executors here...
    }
}

Note: this class defines four thread pools, each executing a different kind of task.
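`NamedThreadFactory` is used above but not shown. A minimal sketch of what such a factory typically looks like (an assumed implementation, not necessarily the project's):

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the NamedThreadFactory used above: it gives each
// pool a recognizable thread-name prefix, so threads from the four pools are
// easy to tell apart in logs and thread dumps.
public class NamedThreadFactory implements ThreadFactory {

    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    public NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // e.g. "DumpExecuteService.dumpWriterWorkerService1498...-3"
        return new Thread(r, prefix + "-" + counter.getAndIncrement());
    }
}
```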

2) The game-server task

/**
 * 1) Obtain a connection to the game server's log database.
 * 2) Process the tables one by one.
 */
public class DumpServerWorkerLogic extends AbstractLogic implements Callable<Integer> {

    private static Logger logger = LoggerFactory.getLogger(DumpServerWorkerLogic.class);

    private final ServerPO server;// the database (game server)
    private final String startDate;// start date
    private SourceType sourceType;// data source type
    private Map<String, Integer> resultDBMap;// per-table record counts
    private GameType gameType;

    public DumpServerWorkerLogic(ServerPO server, String startDate, SourceType sourceType,
            Map<String, Integer> resultDBMap, GameType gameType) {
        CheckUtil.checkNotNull("DumpServerWorkerLogic.server", server);
        CheckUtil.checkNotNull("DumpServerWorkerLogic.startDate", startDate);
        CheckUtil.checkNotNull("DumpServerWorkerLogic.sourceType", sourceType);
        CheckUtil.checkNotNull("DumpServerWorkerLogic.resultDBMap", resultDBMap);
        CheckUtil.checkNotNull("DumpServerWorkerLogic.gameType", gameType);

        this.server = server;
        this.startDate = startDate;
        this.sourceType = sourceType;
        this.resultDBMap = resultDBMap;
        this.gameType = gameType;
    }

    @Override
    public Integer call() {
        // get a connection and list all tables of the database
        Connection conn = null;
        try {
            conn = JdbcUtils.getDbConnection(server);
        } catch (Exception e) {
            throw new GameRuntimeException(e.getMessage(), e);
        }

        List<String> tableNames = null;
        DumpDbInfoBO dumpDbInfoBO = DumpConfig.getDumpDbInfoBO();
        int totalRecordCount = 0;
        try {
            switch (this.sourceType) {
            case GAME_LOG:
                tableNames = JdbcUtils.getAllTableNames(conn);
                break;
            case INFOCENTER:
                tableNames = dumpDbInfoBO.getIncludeInfoTables();
                tableNames.add("pay_action");
                break;
            case EVENT_LOG:
                tableNames = new ArrayList<String>();
                Date date = DateTimeUtil.string2Date(startDate, "yyyy-MM-dd");
                String sdate = DateTimeUtil.date2String(date, "yyyyMMdd");
                String smonth = DateTimeUtil.date2String(date, "yyyyMM");
                tableNames.add("log_device_startup" + "_" + smonth);
                tableNames.add("log_device" + "_" + sdate);
                break;
            }

            // iterate over the tables
            for (String tableName : tableNames) {
                // skip excluded tables
                if (dumpDbInfoBO.getExcludeTables().contains(tableName))
                    continue;

                DumpTableWorkerLogic tableTask = new DumpTableWorkerLogic(conn, server, tableName, startDate,
                        resultDBMap, gameType, sourceType);
                Future<Integer> tableFuture = DumpExecuteService.submitDumpTableWorker(tableTask);
                int count = tableFuture.get();
                totalRecordCount += count;
                logger.info(String.format("DumpServerWorkerLogic %s-%s.%s be done", startDate, server.getLogDbName(), tableName));
            }
            return totalRecordCount;
        } catch (Exception e) {
            throw new GameRuntimeException(e, "DumpServerWorkerLogic fail. server={%s}, errorMsg={%s} ", server.getId(), e.getMessage());
        } finally {
            JdbcUtils.closeConnection(conn);
        }
    }
}

 

 3) The per-table task (one per table)

 

/***
 * @description A write task for one table's query results (one per table).
 */
public class DumpTableWorkerLogic implements Callable<Integer> {
    private static Logger logger = LoggerFactory.getLogger(DumpTableWorkerLogic.class);

    private final String tableName;
    private final Connection conn;

    private ServerPO server;

    private String startDate;

    private Map<String, Integer> resultDBMap;// per-table record counts

    private GameType gameType;

    private SourceType sourceType;// data source type

    public DumpTableWorkerLogic(Connection conn, ServerPO server, String tableName, String startDate, Map<String, Integer> resultDBMap,
            GameType gameType, SourceType sourceType) {
        CheckUtil.checkNotNull("DumpTableWorkerLogic.conn", conn);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.tableName", tableName);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.server", server);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.startDate", startDate);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.resultDBMap", resultDBMap);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.gameType", gameType);
        CheckUtil.checkNotNull("DumpServerWorkerLogic.sourceType", sourceType);

        this.conn = conn;
        this.tableName = tableName;
        this.server = server;
        this.startDate = startDate;
        this.resultDBMap = resultDBMap;
        this.gameType = gameType;
        this.sourceType = sourceType;

        logger.info("DumpTableWorkerLogic[{}] Reg", tableName);
    }

    @Override
    public Integer call() {
        logger.info("DumpTableWorkerLogic[{}] Start", tableName);

        // the write task for this table's results
        DumpWriteWorkerLogic writerWorker = new DumpWriteWorkerLogic(server, tableName, startDate, resultDBMap, gameType,
                sourceType);
        Future<Integer> writeFuture = DumpExecuteService.submitDumpWriteWorker(writerWorker);
        logger.info("DumpTableWorkerLogic[{}] writer submitted", tableName);

        // the data read task
        DumpReadWorkerLogic readerWorker = new DumpReadWorkerLogic(conn, tableName, writerWorker, startDate);
        DumpExecuteService.submitDumpReadWorker(readerWorker);
        logger.info("DumpTableWorkerLogic[{}] reader submitted", tableName);

        try {
            int writeCount = writeFuture.get();
            logger.info("DumpTableWorkerLogic --- " + startDate + " --- " + server.getId() + " --- " + tableName + " --- rows exported: "
                    + writeCount);
            return writeCount;
        }  catch (Exception e) {
            throw new GameRuntimeException(e, "DumpTableWorkerLogic fail. tableName={%s}, errorMsg={%s} ",tableName, e.getMessage());
        }
    }

}

 

 

4) The per-table read task

/***
 * MySQL data read task.
 */
public class DumpReadWorkerLogic implements Callable<Integer> {

    private static Logger logger = LoggerFactory.getLogger(DumpReadWorkerLogic.class);

    private String tableName;

    private final Connection conn;

    private DumpWriteWorkerLogic writerWorker; // the write task this reader feeds

    private String startDate;// export start date

    private static final int LIMIT = 50000;// max rows fetched per query

    public DumpReadWorkerLogic(Connection conn, String tableName, DumpWriteWorkerLogic writerWorker, String startDate) {
        CheckUtil.checkNotNull("MysqlDataReadWorker.conn", conn);
        CheckUtil.checkNotNull("MysqlDataReadWorker.tableName", tableName);
        CheckUtil.checkNotNull("MysqlDataReadWorker.startDate", startDate);

        this.conn = conn;
        this.tableName = tableName;
        this.writerWorker = writerWorker;
        this.startDate = startDate;

        logger.info("DumpReadWorkerLogic Reg. tableName={}", this.tableName);
    }

    @Override
    public Integer call() {
        try {
            List<Map<String, Object>> result = JdbcUtils.queryForList(conn, "show full fields from " + tableName);

            int index = 0;
            String querySql = "";

            int totalCount = 0;
            while (true) {
                int offset = index * LIMIT;
                querySql = DumpLogic.getTableQuerySql(result, tableName, true, startDate) + " limit " + offset + "," + LIMIT;
                int row = DumpLogic.query(conn, querySql, writerWorker);
                totalCount += row;
                logger.info("tableName=" + tableName + ", offset=" + offset + ", index=" + index + ", row=" + row + ", limit=" + LIMIT);
                if (row < LIMIT)
                    break;
                index++;
            }
            writerWorker.prepareClose();
            logger.info(startDate + "---" + tableName + "---Read.End");
            return totalCount;
        }
        catch (Exception e) {
            throw new GameRuntimeException(e, "MysqlDataReadWorker fail. tableName={%s}, errorMsg={%s} ",tableName, e.getMessage());
        }
    }

}

 

5) The per-table write task

/***
 * @description MySQL data export (write) task.
 */
public class DumpWriteWorkerLogic implements Callable<Integer> {

    private static final Logger logger = LoggerFactory.getLogger(DumpWriteWorkerLogic.class);
    private String tableName;// table name

    private AtomicBoolean alive; // whether the producer is still running

    private BufferedWriter writer;

    private ArrayBlockingQueue<String> queue; // message queue

    private ServerPO server;// the game server

    private String startDate;// start date

    private Map<String, Integer> resultDBMap;// per-server, per-table daily record counts

    private GameType gameType;

    private SourceType sourceType;// data source type

    public DumpWriteWorkerLogic(ServerPO server, String tableName, String startDate, Map<String, Integer> resultDBMap, GameType gameType,
            SourceType sourceType) {
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.tableName", tableName);
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.server", server);
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.startDate", startDate);
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.resultDBMap", resultDBMap);
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.gameType", gameType);
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.sourceType", sourceType);

        this.tableName = tableName;
        this.server = server;
        this.startDate = startDate;
        this.queue = new ArrayBlockingQueue<>(65536);
        this.alive = new AtomicBoolean(true);
        this.gameType = gameType;
        this.sourceType = sourceType;
        this.writer = createWriter();
        this.resultDBMap = resultDBMap;

        logger.info("DumpWriteWorkerLogic Reg. tableName={}", this.tableName);
    }

    /***
     * Create the writer; if the target file does not exist, it is created first.
     */
    private BufferedWriter createWriter() {
        try {
            File toFile = FileUtils.getFilenameOfDumpTable(sourceType, tableName, startDate, gameType, ".txt");
            if (!toFile.exists()) {
                FileUtils.createFile(sourceType, tableName, startDate, gameType);
            }
            return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(toFile, true), Charsets.UTF_8), 5 * 1024 * 1024);
        } catch (Exception e) {
            throw new GameRuntimeException(e, "DumpWriteWorkerLogic createWriter fail. server={%s}, errorMsg={%s} ",server.getId(), e.getMessage());
        }
    }

    /***
     * Write one record to the file.
     * 
     * @param line
     *            one record
     */
    private void writeToFile(String line) {
        try {
            this.writer.write(line + "\n");
        } catch (Exception e) {
            throw new GameRuntimeException(e, "DumpWriteWorkerLogic writeToFile fail. errorMsg={%s} ", e.getMessage());
        }
    }

    /**
     * Put a record onto the message queue; if the queue is full, blocks until the put succeeds.
     * 
     * @param line
     */
    public void putToWriterQueue(String line) {

        CheckUtil.checkNotNull("DumpWriteWorkerLogic putToWriterQueue", line);

        try {
            queue.put(line);
        } catch (InterruptedException e) {
            throw new GameRuntimeException(e, "DumpWriteWorkerLogic putToWriterQueue fail. errorMsg={%s} ", e.getMessage());
        }
    }

    /**
     * Prepare to close (signals that all the data to be processed has been produced;
     * once the task finishes writing, it can exit).
     */
    public void prepareClose() {
        alive.set(false);
    }

    @Override
    public Integer call() {
        logger.info("DumpWriteWorkerLogic Start. tableName={}", this.tableName);
        try {
            int totalCount = 0;
            while (alive.get() || !queue.isEmpty()) {
                List<String> dataList = new ArrayList<String>();
                // Block briefly for the first element instead of busy-spinning when
                // the queue is momentarily empty, then drain whatever else arrived.
                String first = queue.poll(200, TimeUnit.MILLISECONDS);
                if (first != null)
                    dataList.add(first);
                queue.drainTo(dataList);
                int count = processDataList(dataList);
                totalCount += count;
            }
            logger.info("DumpWriteWorkerLogic ---" + startDate + "---" + tableName + "---Writer.End");
            return totalCount;
        } catch (Exception exp) {
            throw new GameRuntimeException(exp, "DumpWriteWorkerLogic call() fail. errorMsg={%s} ", exp.getMessage());
        } finally {
            FileUtil.close(this.writer);
        }
    }

    /***
     * Process a batch: write it to the local file and update the count map.
     * 
     * @param dataList
     *            the batch of records
     * @return
     */
    private int processDataList(List<String> dataList) {
        int totalCount = 0;

        // key used to count all records of this server/table
        String key = server.getId() + "#" + tableName + "#" + sourceType.getIndex();
        if (dataList != null && dataList.size() > 0) {

            for (String line : dataList) {

                // write the line to the file
                writeToFile(line);

                // update the count for result_data_record_count
                if (resultDBMap.get(key) != null) {
                    resultDBMap.put(key, resultDBMap.get(key) + 1);
                }
                else {
                    resultDBMap.put(key, 1);
                }

                totalCount++;
            }
        }

        return totalCount;
    }

}

Memory Optimization

1. With plain JDBC, if the table is large, fetching the data is extremely slow.

2. The process also uses an enormous amount of memory that GC cannot reclaim. The reason: by default, JDBC loads the entire result set into memory at once, so syncing a very large table can even blow the heap.

The fix is to stop JDBC from pulling everything at once and page through the data instead, with the per-query LIMIT capped at 50,000; see the read task above.
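Two further refinements are worth noting, though neither is in the project code above. First, `LIMIT offset, n` itself degrades on deep pages, because MySQL still scans and discards all the skipped rows; if the table has an auto-increment primary key, keyset pagination (`WHERE id > lastId ORDER BY id LIMIT n`) keeps every page equally cheap. Second, MySQL Connector/J can stream a result set row by row instead of materializing it, by creating a forward-only, read-only statement and setting `fetchSize` to `Integer.MIN_VALUE`. A sketch of the keyset variant (the table and the `id` column are hypothetical):

```java
// Sketch: keyset pagination instead of "LIMIT offset, n". Assumes the table
// has an auto-increment primary key named `id`; names are illustrative only.
public class KeysetPager {

    /** Build the query for one page, anchored at the largest id already read. */
    public static String pageQuery(String table, long lastId, int limit) {
        return "SELECT * FROM " + table
                + " WHERE id > " + lastId
                + " ORDER BY id ASC LIMIT " + limit;
    }

    public static void main(String[] args) {
        System.out.println(pageQuery("log_device", 0L, 50000));
    }
}
```

In the read worker this would mean tracking the largest `id` seen in each batch instead of incrementing an offset (and, in real code, binding `lastId` through a `PreparedStatement` rather than concatenating it).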

 

After this architectural optimization, exporting 50 million rows takes roughly 40 minutes.

 

Notes:

This article only records the design process; the full code will be open-sourced later.
