數據同步那些事兒(優化過程分享)

簡介

好久以前就想寫這篇文章了,主要是介紹一下我作數據同步的過程當中遇到的一些有意思的內容,和提高效率的過程。html

當前在數據處理的過程當中,數據同步如同血液通常充滿全過程,如圖:git

數據同步開源產品對比:

DataX,是淘寶的開源項目,惋惜不支持Postgresqlgithub

Sqoop,Apache開源項目,同步過程當中字段須要嚴格一致,不方便擴展,不易於二次開發sql

總體設計思路:

使用生產者消費者模型,中間使用內存,數據不落地,直接插入目標數據shell

優化過程:

一、插入數據部分:

首先生產者經過Jdbc獲取源數據內容,放入固定大小的緩存隊列,同時消費者不斷的從緩存讀取數據,根據不一樣的數據類型分別讀取出來,並逐條插入目標數據庫。數據庫

速度每秒300條,每分鐘1.8W條。數組

這樣作表面上看起來很是美好,流水式的處理,來一條處理一下,但是發現插入的速度遠遠趕不上讀取的速度,因此爲了提高寫入的速度,決定採用批量處理的方法,事例代碼:緩存

    @Override
    public Boolean call() {
        long beginTime = System.currentTimeMillis();
        this.isRunning.set(true);
        try {
            cyclicBarrier.await();
            int lineNum = 0;
            int commitCount = 0; // 緩存數量
            List<RowData> tmpRowDataList = new ArrayList<RowData>();// 緩存數組
            while (this.isGetDataRunning.get() || this.queue.size() > 0) {
                // 從隊列獲取一條數據
                RowData rowData = this.queue.poll(1, TimeUnit.SECONDS);
                if (rowData == null) {
                    logger.info("this.isGetDataRunning:" + this.isGetDataRunning + ";this.queue.size():" + this.queue.size());
                    Thread.sleep(10000);
                    continue;
                }
                // 添加到緩存數組
                tmpRowDataList.add(rowData);
                lineNum++;
                commitCount++;
                if (commitCount == SyncConstant.INSERT_SIZE) {
                    this.insertContractAch(tmpRowDataList); // 批量寫入
                    tmpRowDataList.clear(); // 清空緩存
                    commitCount = 0;
                }

                if (lineNum % SyncConstant.LOGGER_SIZE == 0) {
                    logger.info(" commit line: " + lineNum + "; queue size: " + queue.size());
                }
            }

            this.insertContractAch(tmpRowDataList); // 批量寫入
            tmpRowDataList.clear();// 清空緩存
            logger.info(" commit line end: " + lineNum);
        } catch (Exception e) {
            logger.error(" submit data error" , e);
        } finally {
            this.isRunning.set(false);
        }
        logger.info(String.format("SubmitDataToDatabase used %s second times", (System.currentTimeMillis() - beginTime) / 1000.00));
        return true;
    }

    /**
     * 批量插入數據
     *
     * @param rowDatas
     * @return
     */
    public int insertContractAch(List<RowData> rowDatas) {
        final List<RowData> tmpObjects = rowDatas;
        String sql = SqlService.createInsertPreparedSql(tableMetaData); // 獲取sql
        try {
            int[] index = this.jdbcTemplate.batchUpdate(sql, new PreparedStatementSetter(tmpObjects, this.columnMetaDataList));
            return index.length;
        } catch (Exception e) {
            logger.error(" insertContractAch error: " , e);
        }
        return 0;
    }

    /**
     * 處理批量插入的回調類
     */
    private class PreparedStatementSetter implements BatchPreparedStatementSetter {
        private List<RowData> rowDatas;
        private List<ColumnMetaData> columnMetaDataList;

        /**
         * 經過構造函數把要插入的數據傳遞進來處理
         */
        public PreparedStatementSetter(List<RowData> rowDatas, List<ColumnMetaData> columnList) {
            this.rowDatas = rowDatas;
            this.columnMetaDataList = columnList;
        }

        @Override
        public void setValues(PreparedStatement ps, int i) throws SQLException {
            RowData rowData = this.rowDatas.get(i);
            for (int j = 0; j < rowData.getColumnObjects().length; j++) {
                // 類型轉換
                try {
                    ColumnAdapterService.setParameterValue(ps, j + 1, rowData.getColumnObjects()[j], this.columnMetaDataList.get(j).getType());
                } catch (Exception e) {
                    ps.setObject(j + 1, null);
                }
            }
        }
    }

我們不是須要講解代碼,因此這裏截取了代碼片斷,所有的代碼github上有,感興趣的同窗能夠看看。PreparedStatement的好處,能夠參考文章:http://www.cnblogs.com/liqiu/p/3825544.html多線程

因爲增長批量插入的功能,終於速度提高到每秒1000條ide

二、多線程優化

每秒1000條,速度依然不理想,特別是寫的速度跟不上讀取的速度,隊列是滿的,如圖:

因此只能提高消費者的數量,採用了多消費者的模式:

速度提高到每秒3000條。

三、升級讀取方式

這時候觀察,隨着消費者的增長,觀察緩存隊列常常有空的狀況,也就是說生產跟不上消費者速度,若是增長生產者的線程,那麼也會增長程序的複雜性,由於勢必要將讀取的數據進行分割。因此採用Pgdump的方式直接獲取數據(並非全部狀況都適用,好比數據中有特殊的分隔符與設定的分隔符同樣,或者有分號,單引號之類的)

代碼片斷以下:

    /**
     * 將數據放入緩存隊列
     */
    public void putCopyData() {
        DataSourceMetaData dataSource = dataSourceService.getDataSource(syncOptions.getSrcDataSourceName());
        String copyCommand = this.getCopyCommand(dataSource, querySql); //獲取copy命令
        ShellExecuter.execute(copyCommand, queue,columnMetaDatas);
    }

    /**
     * 執行copy的shell命令
     * @param dataSource
     * @param sql
     * @return
     */
    public String getCopyCommand(DataSourceMetaData dataSource, String sql){
        String host = dataSource.getIp();
        String user = dataSource.getUserName();
        String dataBaseName = dataSource.getDatabaseName();
        //String psqlPath = "/Library/PostgreSQL/9.3/bin/psql";
        String psqlPath = "/opt/pg93/bin/psql";
        String execCopy = psqlPath + " -h " + host + " -U " + user + " " + dataBaseName +" -c \"COPY (" + sql + ") TO STDOUT WITH DELIMITER E'"+ HiveDivideConstant.COPY_COLUMN_DIVIDE+"' CSV NULL AS E'NULL'\" "; // 執行copy命令
        LOGGER.info(execCopy);
        return execCopy;
    }

意思就是經過執行一個Shell程序,獲取數據,而後讀取進程的輸出流,不斷寫入緩存。這樣生產者的問題基本都解決了,速度徹底取決於消費者寫入數據庫的速度了。下面是執行Shell的Java方法代碼:

    public static int execute(String shellPath, LinkedBlockingQueue<RowData> queue, List<ColumnMetaData> columnMetaDatas) {

        int success = -1;
        Process pid = null;
        String[] cmd;

        try {
            cmd = new String[]{"/bin/sh", "-c", shellPath};
            // 執行Shell命令
            pid = Runtime.getRuntime().exec(cmd);
            if (pid != null) {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pid.getInputStream()), SyncConstant.SHELL_STREAM_BUFFER_SIZE);
                try {
                    String line;
                    while ((line = bufferedReader.readLine()) != null) {
                        // LOGGER.info(String.format("shell info output [%s]", line));
                        String[] columnObjects = line.split(HiveDivideConstant.COPY_COLUMN_DIVIDE.toString(), -1);
                        if (columnObjects.length != columnMetaDatas.size()) {
                            LOGGER.error(" 待同步的表有特殊字符,不能使用copy [{}] ", line);
                            throw new RuntimeException("待同步的表有特殊字符,不能使用copy " + line);
                        }
                        RowData rowData = new RowData(line.split(HiveDivideConstant.COPY_COLUMN_DIVIDE.toString(), -1));
                        queue.put(rowData);
                    }
                } catch (Exception ioe) {
                    LOGGER.error(" execute shell error", ioe);
                } finally {
                    try {
                        if (bufferedReader != null) {
                            bufferedReader.close();
                        }
                    } catch (Exception e) {
                        LOGGER.error("execute shell, get system.out error", e);
                    }
                }
                success = pid.waitFor();
                if (success != 0) {
                    LOGGER.error("execute shell error ");
                }
            } else {
                LOGGER.error("there is not pid ");
            }
        } catch (Exception ioe) {
            LOGGER.error("execute shell error", ioe);
        } finally {
            if (null != pid) {
                try {
                    //關閉錯誤輸出流
                    pid.getErrorStream().close();
                } catch (IOException e) {
                    LOGGER.error("close error stream of process fail. ", e);
                } finally {
                    try {
                        //關閉標準輸入流
                        pid.getInputStream().close();
                    } catch (IOException e) {
                        LOGGER.error("close input stream of process fail.", e);
                    } finally {
                        try {
                            pid.getOutputStream().close();
                        } catch (IOException e) {
                            LOGGER.error(String.format("close output stream of process fail.", e));
                        }
                    }
                }
            }
        }

        return success;
    }

四、內存優化

在上線一段時間以後,發現兩個問題:一、使用Jdbc方式獲取數據,若是這個數據表比較大,那麼獲取第一條數據的速度特別慢;二、這個進程還會佔用很是大的內存,而且GC不掉。分析緣由,是Postgresql的Jdbc獲取數據的時候,會一次將全部數據放入到內存,若是同步的數據表很是大,那麼甚至會將內存撐爆。

那麼優化的方法是設置使Jdbc不是一次所有將數據拿到內存,而是批次獲取,代碼以下:

con.setAutoCommit(false); //並非全部數據庫都適用,好比hive就不支持,orcle不須要
stmt.setFetchSize(10000); //每次獲取1萬條記錄

總體設計方案:

如今這個項目已經開源,代碼放在:https://github.com/autumn-star/synchronous

相關文章
相關標籤/搜索