本文源碼:GitHub·點這裏 || GitEE·點這裏java
若是常常接觸數據開發,會有這樣一個場景,服務A提供一個數據源,假設稱爲動態數據源A,須要讀取該數據源下的數據;服務B提供一個數據源,假設稱爲動態數據源B,須要寫入數據到該數據源。這個場景一般描述爲數據同步,或者數據搬運。mysql
基於上述流程圖,總體步驟以下:git
Java中JDBC下執行數據庫操做的一個重要接口,在已經創建數據庫鏈接的基礎上,向數據庫發送要執行的SQL語句。github
繼承Statement接口,且實現SQL預編譯,能夠提升批量處理效率。常應用於批量數據寫入場景。sql
存儲JDBC查詢結果集的對象,ResultSet接口提供從當前行檢索列值的方法。數據庫
提供一個數據源管理的Factory,當前場景下主要管理一個讀庫即數據源A,和一個寫庫即數據源B,數據源鏈接驗證經過,放入容器中。app
@Component public class ConnectionFactory { private volatile Map<String, Connection> connectionMap = new HashMap<>(); @Resource private JdbcConfig jdbcConfig ; @PostConstruct public void init (){ ConnectionEntity read = new ConnectionEntity( "MySql","jdbc:mysql://localhost:3306/data_read","user01","123"); if (jdbcConfig.getConnection(read) != null){ connectionMap.put(JdbcConstant.READ,jdbcConfig.getConnection(read)); } ConnectionEntity write = new ConnectionEntity( "MySql","jdbc:mysql://localhost:3306/data_write","user01","123"); if (jdbcConfig.getConnection(write) != null){ connectionMap.put(JdbcConstant.WRITE,jdbcConfig.getConnection(write)); } } public Connection getByKey (final String key){ return connectionMap.get(key) ; } }
基礎SQL管理ide
主要提供SQL的基礎模板,例如全表查,分頁查,表結構查詢。工具
public class BaseSql { public static String READ_SQL = "SELECT * FROM %s LIMIT 1"; public static String WRITE_SQL = "INSERT INTO %s (SELECT * FROM %s WHERE 1=0)" ; public static String CREATE_SQL = "SHOW CREATE TABLE %s" ; public static String SELECT_SQL = "SELECT * FROM %s" ; public static String COUNT_SQL = "SELECT COUNT(1) countNum FROM %s" ; public static String PAGE_SQL = "SELECT * FROM %s LIMIT %s,%s" ; public static String STRUCT_SQL (){ StringBuffer sql = new StringBuffer() ; sql.append(" SELECT "); sql.append(" COLUMN_NAME, "); sql.append(" IS_NULLABLE, "); sql.append(" COLUMN_TYPE, "); sql.append(" COLUMN_KEY, "); sql.append(" COLUMN_COMMENT "); sql.append(" FROM "); sql.append(" information_schema.COLUMNS "); sql.append(" WHERE "); sql.append(" table_schema = '%s' "); sql.append(" AND table_name = '%s' "); return String.valueOf(sql) ; } }
SQL參數拼接測試
根據SQL模板中缺失的參數,進行動態補全,生成完成SQL語句。
public class BuildSql { /** * 讀權限SQL */ public static String buildReadSql(String table) { String readSql = null ; if (StringUtils.isNotEmpty(table)){ readSql = String.format(BaseSql.READ_SQL, table); } return readSql; } /** * 讀權限SQL */ public static String buildWriteSql(String table){ String writeSql = null ; if (StringUtils.isNotEmpty(table)){ writeSql = String.format(BaseSql.WRITE_SQL, table,table); } return writeSql ; } /** * 表建立SQL */ public static String buildStructSql (String table){ String structSql = null ; if (StringUtils.isNotEmpty(table)){ structSql = String.format(BaseSql.CREATE_SQL, table); } return structSql ; } /** * 表結構SQL */ public static String buildTableSql (String schema,String table){ String structSql = null ; if (StringUtils.isNotEmpty(table)){ structSql = String.format(BaseSql.STRUCT_SQL(), schema,table); } return structSql ; } /** * 全表查詢SQL */ public static String buildSelectSql (String table){ String selectSql = null ; if (StringUtils.isNotEmpty(table)){ selectSql = String.format(BaseSql.SELECT_SQL,table); } return selectSql ; } /** * 總數查詢SQL */ public static String buildCountSql (String table){ String countSql = null ; if (StringUtils.isNotEmpty(table)){ countSql = String.format(BaseSql.COUNT_SQL,table); } return countSql ; } /** * 分頁查詢SQL */ public static String buildPageSql (String table,int offset,int size){ String pageSql = null ; if (StringUtils.isNotEmpty(table)){ pageSql = String.format(BaseSql.PAGE_SQL,table,offset,size); } return pageSql ; } }
讀庫嘗試一次單條數據讀取,寫庫嘗試一次不成立條件的寫入,若是沒有權限,會拋出相應異常。
@RestController public class CheckController { @Resource private ConnectionFactory connectionFactory ; // MySQLSyntaxErrorException: SELECT command denied to user @GetMapping("/checkRead") public String checkRead (){ try { String sql = BuildSql.buildReadSql("rw_read") ; ExecuteSqlUtil.query(connectionFactory.getByKey(JdbcConstant.READ),sql) ; return "success" ; } catch (SQLException e) { e.printStackTrace(); } return "fail" ; } // MySQLSyntaxErrorException: INSERT command denied to user @GetMapping("/checkWrite") public String checkWrite (){ try { String sql = BuildSql.buildWriteSql("rw_read") ; ExecuteSqlUtil.update(connectionFactory.getByKey(JdbcConstant.WRITE),sql) ; return "success" ; } catch (SQLException e) { e.printStackTrace(); } return "fail" ; } }
這裏執行最簡單操做,把讀庫表建立語句查詢出來,丟到寫庫中執行。
@RestController public class StructController { @Resource private ConnectionFactory connectionFactory ; @GetMapping("/syncStruct") public String syncStruct (){ try { String sql = BuildSql.buildStructSql("rw_read") ; ResultSet resultSet = ExecuteSqlUtil.query(connectionFactory.getByKey(JdbcConstant.READ),sql) ; String createTableSql = null ; while (resultSet.next()){ createTableSql = resultSet.getString("Create Table") ; } if (StringUtils.isNotEmpty(createTableSql)){ ExecuteSqlUtil.update(connectionFactory.getByKey(JdbcConstant.WRITE),createTableSql) ; } return "success" ; } catch (SQLException e) { e.printStackTrace(); } return "fail" ; } }
讀庫的表數據讀取,批量放入寫庫中。這裏特別說一個方法:statement.setObject();在不知道參數個數和類型時,自動適配數據類型。
@RestController public class DataSyncController { @Resource private ConnectionFactory connectionFactory ; @GetMapping("/dataSync") public List<RwReadEntity> dataSync (){ List<RwReadEntity> rwReadEntities = new ArrayList<>() ; try { Connection readConnection = connectionFactory.getByKey(JdbcConstant.READ) ; String sql = BuildSql.buildSelectSql("rw_read") ; ResultSet resultSet = ExecuteSqlUtil.query(readConnection,sql) ; while (resultSet.next()){ RwReadEntity rwReadEntity = new RwReadEntity() ; rwReadEntity.setId(resultSet.getInt("id")); rwReadEntity.setSign(resultSet.getString("sign")); rwReadEntities.add(rwReadEntity) ; } if (rwReadEntities.size() > 0){ Connection writeConnection = connectionFactory.getByKey(JdbcConstant.WRITE) ; writeConnection.setAutoCommit(false); PreparedStatement statement = writeConnection.prepareStatement("INSERT INTO rw_read VALUES(?,?)"); // 基於動態獲取列,和statement.setObject();自動適配數據類型 for (int i = 0 ; i < rwReadEntities.size() ; i++){ RwReadEntity rwReadEntity = rwReadEntities.get(i) ; statement.setInt(1,rwReadEntity.getId()) ; statement.setString(2,rwReadEntity.getSign()) ; statement.addBatch(); if (i>0 && i%2==0){ statement.executeBatch() ; } } // 處理最後一批數據 statement.executeBatch(); writeConnection.commit(); } return rwReadEntities ; } catch (SQLException e) { e.printStackTrace(); } return null ; } }
提供一個分頁查詢工具,在數據量大的狀況下不能一次性讀取大量的數據,避免資源佔用太高。
public class PageUtilEntity { /** * 分頁生成方法 */ public static PageHelperEntity<Object> pageResult (int total, int pageSize,int currentPage, List dataList){ PageHelperEntity<Object> pageBean = new PageHelperEntity<Object>(); // 總頁數 int totalPage = PageHelperEntity.countTotalPage(pageSize,total) ; // 分頁列表 List<Integer> pageList = PageHelperEntity.pageList(currentPage,pageSize,total) ; // 上一頁 int prevPage = 0 ; if (currentPage==1){ prevPage = currentPage ; } else if (currentPage>1&¤tPage<=totalPage){ prevPage = currentPage -1 ; } // 下一頁 int nextPage =0 ; if (totalPage==1){ nextPage = currentPage ; } else if (currentPage<=totalPage-1){ nextPage = currentPage+1 ; } pageBean.setDataList(dataList); pageBean.setTotal(total); pageBean.setPageSize(pageSize); pageBean.setCurrentPage(currentPage); pageBean.setTotalPage(totalPage); pageBean.setPageList(pageList); pageBean.setPrevPage(prevPage); pageBean.setNextPage(nextPage); pageBean.initjudge(); return pageBean ; } }
不少複雜度偏高的業務,越是須要藉助基礎API解決,由於複雜度高,不容易抽象化統一封裝,若是數據同步這塊業務,能夠適配多種數據庫,徹底能夠獨立封裝爲中間件,開源項目中關於多方數據同步或計算的中間件也有好多,能夠自行了解下,增加眼界開闊思路。
GitHub·地址 https://github.com/cicadasmile/data-manage-parent GitEE·地址 https://gitee.com/cicadasmile/data-manage-parent
推薦相關閱讀 |
---|
數據源管理:主從庫動態路由,AOP模式讀寫分離 |
數據源管理:基於JDBC模式,適配和管理動態數據源 |