好久以前就想寫這篇文章了,主要是介紹一下我作數據同步的過程當中遇到的一些有意思的內容,和提高效率的過程。html
當前在數據處理的過程當中,數據同步如同血液通常充滿全過程,如圖:git
DataX,是淘寶的開源項目,惋惜不支持Postgresqlgithub
Sqoop,Apache開源項目,同步過程當中字段須要嚴格一致,不方便擴展,不易於二次開發sql
使用生產者消費者模型,中間使用內存,數據不落地,直接插入目標數據shell
首先生產者經過Jdbc獲取源數據內容,放入固定大小的緩存隊列,同時消費者不斷的從緩存讀取數據,根據不一樣的數據類型分別讀取出來,並逐條插入目標數據庫。數據庫
速度每秒300條,每分鐘1.8W條。數組
這樣作表面上看起來很是美好,流水式的處理,來一條處理一下,但是發現插入的速度遠遠趕不上讀取的速度,因此爲了提高寫入的速度,決定採用批量處理的方法,事例代碼:緩存
@Override public Boolean call() { long beginTime = System.currentTimeMillis(); this.isRunning.set(true); try { cyclicBarrier.await(); int lineNum = 0; int commitCount = 0; // 緩存數量 List<RowData> tmpRowDataList = new ArrayList<RowData>();// 緩存數組 while (this.isGetDataRunning.get() || this.queue.size() > 0) { // 從隊列獲取一條數據 RowData rowData = this.queue.poll(1, TimeUnit.SECONDS); if (rowData == null) { logger.info("this.isGetDataRunning:" + this.isGetDataRunning + ";this.queue.size():" + this.queue.size()); Thread.sleep(10000); continue; } // 添加到緩存數組 tmpRowDataList.add(rowData); lineNum++; commitCount++; if (commitCount == SyncConstant.INSERT_SIZE) { this.insertContractAch(tmpRowDataList); // 批量寫入 tmpRowDataList.clear(); // 清空緩存 commitCount = 0; } if (lineNum % SyncConstant.LOGGER_SIZE == 0) { logger.info(" commit line: " + lineNum + "; queue size: " + queue.size()); } } this.insertContractAch(tmpRowDataList); // 批量寫入 tmpRowDataList.clear();// 清空緩存 logger.info(" commit line end: " + lineNum); } catch (Exception e) { logger.error(" submit data error" , e); } finally { this.isRunning.set(false); } logger.info(String.format("SubmitDataToDatabase used %s second times", (System.currentTimeMillis() - beginTime) / 1000.00)); return true; } /** * 批量插入數據 * * @param rowDatas * @return */ public int insertContractAch(List<RowData> rowDatas) { final List<RowData> tmpObjects = rowDatas; String sql = SqlService.createInsertPreparedSql(tableMetaData); // 獲取sql try { int[] index = this.jdbcTemplate.batchUpdate(sql, new PreparedStatementSetter(tmpObjects, this.columnMetaDataList)); return index.length; } catch (Exception e) { logger.error(" insertContractAch error: " , e); } return 0; } /** * 處理批量插入的回調類 */ private class PreparedStatementSetter implements BatchPreparedStatementSetter { private List<RowData> rowDatas; private List<ColumnMetaData> columnMetaDataList; /** * 經過構造函數把要插入的數據傳遞進來處理 */ public PreparedStatementSetter(List<RowData> rowDatas, List<ColumnMetaData> columnList) { this.rowDatas = rowDatas; this.columnMetaDataList = columnList; } @Override public void setValues(PreparedStatement ps, int i) throws SQLException { RowData rowData = this.rowDatas.get(i); for (int j = 0; j < rowData.getColumnObjects().length; j++) { // 類型轉換 try { ColumnAdapterService.setParameterValue(ps, j + 1, rowData.getColumnObjects()[j], this.columnMetaDataList.get(j).getType()); } catch (Exception e) { ps.setObject(j + 1, null); } } } }
我們不是須要講解代碼,因此這裏截取了代碼片斷,所有的代碼github上有,感興趣的同窗能夠看看。PreparedStatement的好處,能夠參考文章:http://www.cnblogs.com/liqiu/p/3825544.html多線程
因爲增長批量插入的功能,終於速度提高到每秒1000條ide
每秒1000條,速度依然不理想,特別是寫的速度跟不上讀取的速度,隊列是滿的,如圖:
因此只能提高消費者的數量,採用了多消費者的模式:
速度提高到每秒3000條。
這時候觀察,隨着消費者的增長,觀察緩存隊列常常有空的狀況,也就是說生產跟不上消費者速度,若是增長生產者的線程,那麼也會增長程序的複雜性,由於勢必要將讀取的數據進行分割。因此採用Pgdump的方式直接獲取數據(並非全部狀況都適用,好比數據中有特殊的分隔符與設定的分隔符同樣,或者有分號,單引號之類的)
代碼片斷以下:
/** * 將數據放入緩存隊列 */ public void putCopyData() { DataSourceMetaData dataSource = dataSourceService.getDataSource(syncOptions.getSrcDataSourceName()); String copyCommand = this.getCopyCommand(dataSource, querySql); //獲取copy命令 ShellExecuter.execute(copyCommand, queue,columnMetaDatas); } /** * 執行copy的shell命令 * @param dataSource * @param sql * @return */ public String getCopyCommand(DataSourceMetaData dataSource, String sql){ String host = dataSource.getIp(); String user = dataSource.getUserName(); String dataBaseName = dataSource.getDatabaseName(); //String psqlPath = "/Library/PostgreSQL/9.3/bin/psql"; String psqlPath = "/opt/pg93/bin/psql"; String execCopy = psqlPath + " -h " + host + " -U " + user + " " + dataBaseName +" -c \"COPY (" + sql + ") TO STDOUT WITH DELIMITER E'"+ HiveDivideConstant.COPY_COLUMN_DIVIDE+"' CSV NULL AS E'NULL'\" "; // 執行copy命令 LOGGER.info(execCopy); return execCopy; }
意思就是經過執行一個Shell程序,獲取數據,而後讀取進程的輸出流,不斷寫入緩存。這樣生產者的問題基本都解決了,速度徹底取決於消費者寫入數據庫的速度了。下面是執行Shell的Java方法代碼:
public static int execute(String shellPath, LinkedBlockingQueue<RowData> queue, List<ColumnMetaData> columnMetaDatas) { int success = -1; Process pid = null; String[] cmd; try { cmd = new String[]{"/bin/sh", "-c", shellPath}; // 執行Shell命令 pid = Runtime.getRuntime().exec(cmd); if (pid != null) { BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pid.getInputStream()), SyncConstant.SHELL_STREAM_BUFFER_SIZE); try { String line; while ((line = bufferedReader.readLine()) != null) { // LOGGER.info(String.format("shell info output [%s]", line)); String[] columnObjects = line.split(HiveDivideConstant.COPY_COLUMN_DIVIDE.toString(), -1); if (columnObjects.length != columnMetaDatas.size()) { LOGGER.error(" 待同步的表有特殊字符,不能使用copy [{}] ", line); throw new RuntimeException("待同步的表有特殊字符,不能使用copy " + line); } RowData rowData = new RowData(line.split(HiveDivideConstant.COPY_COLUMN_DIVIDE.toString(), -1)); queue.put(rowData); } } catch (Exception ioe) { LOGGER.error(" execute shell error", ioe); } finally { try { if (bufferedReader != null) { bufferedReader.close(); } } catch (Exception e) { LOGGER.error("execute shell, get system.out error", e); } } success = pid.waitFor(); if (success != 0) { LOGGER.error("execute shell error "); } } else { LOGGER.error("there is not pid "); } } catch (Exception ioe) { LOGGER.error("execute shell error", ioe); } finally { if (null != pid) { try { //關閉錯誤輸出流 pid.getErrorStream().close(); } catch (IOException e) { LOGGER.error("close error stream of process fail. ", e); } finally { try { //關閉標準輸入流 pid.getInputStream().close(); } catch (IOException e) { LOGGER.error("close input stream of process fail.", e); } finally { try { pid.getOutputStream().close(); } catch (IOException e) { LOGGER.error(String.format("close output stream of process fail.", e)); } } } } } return success; }
在上線一段時間以後,發現兩個問題:一、使用Jdbc方式獲取數據,若是這個數據表比較大,那麼獲取第一條數據的速度特別慢;二、這個進程還會佔用很是大的內存,而且GC不掉。分析緣由,是Postgresql的Jdbc獲取數據的時候,會一次將全部數據放入到內存,若是同步的數據表很是大,那麼甚至會將內存撐爆。
那麼優化的方法是設置使Jdbc不是一次所有將數據拿到內存,而是批次獲取,代碼以下:
con.setAutoCommit(false); //並非全部數據庫都適用,好比hive就不支持,orcle不須要 stmt.setFetchSize(10000); //每次獲取1萬條記錄
總體設計方案:
如今這個項目已經開源,代碼放在:https://github.com/autumn-star/synchronous