結果集歸併源碼解析sql
本文轉自「天河聊技術」微信公衆號微信
找到這個方法,執行查詢的方法多線程
com.dangdang.ddframe.rdb.sharding.jdbc.core.statement.ShardingPreparedStatement#executeQueryide
@Override public ResultSet executeQuery() throws SQLException { ResultSet result; try { // 路由到預編譯對象執行單元集合 Collection<PreparedStatementUnit> preparedStatementUnits = route(); // 多線程執行sql查詢返回結果集對象集合 List<ResultSet> resultSets = new PreparedStatementExecutor( getShardingConnection().getShardingContext().getExecutorEngine(), getRouteResult().getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery(); //走結果計歸併引擎執行結果集歸併邏輯-》 result = new ShardingResultSet(resultSets, new MergeEngine( getShardingConnection().getShardingContext().getDatabaseType(), resultSets, (SelectStatement) getRouteResult().getSqlStatement()).merge()); } finally { clearBatch(); } setCurrentResultSet(result); return result; }
建立結果集歸併引擎對象,進入到構造器ui
/** * 分片結果集歸併引擎. * * @author zhangliang */ public final class MergeEngine { private final DatabaseType databaseType; private final List<ResultSet> resultSets; private final SelectStatement selectStatement; private final Map<String, Integer> columnLabelIndexMap; public MergeEngine(final DatabaseType databaseType, final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException { this.databaseType = databaseType; this.resultSets = resultSets; this.selectStatement = selectStatement; columnLabelIndexMap = getColumnLabelIndexMap(resultSets.get(0)); }
獲取結果集的源數據this
columnLabelIndexMap = getColumnLabelIndexMap(resultSets.get(0));
private Map<String, Integer> getColumnLabelIndexMap(final ResultSet resultSet) throws SQLException { // 獲取resultSet的源數據 ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); Map<String, Integer> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { result.put(SQLUtil.getExactlyValue(resultSetMetaData.getColumnLabel(i)), i); } return result; }
進入到這個方法線程
com.dangdang.ddframe.rdb.sharding.merger.MergeEngine#merge對象
/** * 合併結果集. * * @return 歸併完畢後的結果集 * @throws SQLException SQL異常 */ public ResultSetMerger merge() throws SQLException {//結果集合並業務方法 selectStatement.setIndexForItems(columnLabelIndexMap); return decorate(build()); }
進入到這個方法排序
com.dangdang.ddframe.rdb.sharding.merger.MergeEngine#build隊列
private ResultSetMerger build() throws SQLException { // 排序項不爲空或者聚合選擇項不爲空 if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) { // 若是分組項和排序項一致,走流式結果集歸併-》 if (selectStatement.isSameGroupByAndOrderByItems()) { return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType()); } else { // 不然走內存結果集歸併,要儘可能避免這種狀況,會佔用大量內存=》 return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType()); } } if (!selectStatement.getOrderByItems().isEmpty()) { return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems(), getNullOrderType()); } return new IteratorStreamResultSetMerger(resultSets); }
return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());
建立分組流式結果集歸併,進入到構造器方法
public GroupByStreamResultSetMerger( final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement, final OrderType nullOrderType) throws SQLException { super(resultSets, selectStatement.getOrderByItems(), nullOrderType); this.labelAndIndexMap = labelAndIndexMap; this.selectStatement = selectStatement; currentRow = new ArrayList<>(labelAndIndexMap.size()); currentGroupByValues = getOrderByValuesQueue().isEmpty() ? Collections.emptyList() : new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues(); }
這一行代碼
super(resultSets, selectStatement.getOrderByItems(), nullOrderType);
建立排序流式結果集歸併對象
public OrderByStreamResultSetMerger(final List<ResultSet> resultSets, final List<OrderItem> orderByItems, final OrderType nullOrderType) throws SQLException { this.orderByItems = orderByItems; //優先級隊列實現 this.orderByValuesQueue = new PriorityQueue<>(resultSets.size()); this.nullOrderType = nullOrderType; // 把要排序的結果集往隊列裏放-》 orderResultSetsToQueue(resultSets); isFirstNext = true; }
// 把要排序的結果集往隊列裏放-》 orderResultSetsToQueue(resultSets);
private void orderResultSetsToQueue(final List<ResultSet> resultSets) throws SQLException { for (ResultSet each : resultSets) { OrderByValue orderByValue = new OrderByValue(each, orderByItems, nullOrderType); if (orderByValue.next()) { orderByValuesQueue.offer(orderByValue); } } // 流式結果集歸併,設置當前的流式歸併結果集,你們看這裏存儲是當前的結果集因此不會出現內存溢出問題 setCurrentResultSet(orderByValuesQueue.isEmpty() ? resultSets.get(0) : orderByValuesQueue.peek().getResultSet()); }
返回到這個方法
private ResultSetMerger build() throws SQLException { // 排序項不爲空或者聚合選擇項不爲空 if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) { // 若是分組項和排序項一致,走流式結果集歸併-》 if (selectStatement.isSameGroupByAndOrderByItems()) { return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType()); } else { // 不然走內存結果集歸併,要儘可能避免這種狀況,會佔用大量內存=》 return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType()); } } if (!selectStatement.getOrderByItems().isEmpty()) { return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems(), getNullOrderType()); } return new IteratorStreamResultSetMerger(resultSets); }
這一行
// 不然走內存結果集歸併,要儘可能避免這種狀況,會佔用大量內存=》 return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());
建立內存分組結果集歸併對象
public GroupByMemoryResultSetMerger( final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement, final OrderType nullOrderType) throws SQLException { super(labelAndIndexMap); this.selectStatement = selectStatement; this.nullOrderType = nullOrderType; // 建立內存結果集行對象 memoryResultSetRows = init(resultSets); }
返回到這個方法
private ResultSetMerger build() throws SQLException { // 排序項不爲空或者聚合選擇項不爲空 if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) { // 若是分組項和排序項一致,走流式結果集歸併-》 if (selectStatement.isSameGroupByAndOrderByItems()) { return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType()); } else { // 不然走內存結果集歸併,要儘可能避免這種狀況,會佔用大量內存=》 return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType()); } } if (!selectStatement.getOrderByItems().isEmpty()) { return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems(), getNullOrderType()); } // 建立迭代歸併結果集歸併對象 return new IteratorStreamResultSetMerger(resultSets); }
這一行
// 建立迭代歸併結果集歸併對象 return new IteratorStreamResultSetMerger(resultSets);
public IteratorStreamResultSetMerger(final List<ResultSet> resultSets) { this.resultSets = resultSets.iterator(); setCurrentResultSet(this.resultSets.next()); }
返回到這個方法
/** * 合併結果集. * * @return 歸併完畢後的結果集 * @throws SQLException SQL異常 */ public ResultSetMerger merge() throws SQLException {//結果集合並業務方法 selectStatement.setIndexForItems(columnLabelIndexMap); return decorate(build()); }
進入這個方法com.dangdang.ddframe.rdb.sharding.merger.MergeEngine#decorate
private ResultSetMerger decorate(final ResultSetMerger resultSetMerger) throws SQLException { ResultSetMerger result = resultSetMerger; if (null != selectStatement.getLimit()) { // 裝飾器模式對分頁結果集歸併進行了進一步的封裝 result = new LimitDecoratorResultSetMerger(result, selectStatement.getLimit()); } return result; }
結果集歸併源碼解析到這裏就結束了。
說到最後
源碼解析的部份內容比較多,仍是直接看源碼比較直觀,我這裏的源碼解析只是給予你們一個閱讀源碼的思路,我已經儘可能把sharding-jdbc實現的關鍵點都介紹了下,可能還有其餘的好的實現沒有介紹出來,sharding-jdbc源碼解析系列到這裏就所有結束了,若是須要進一步深度溝通,請加我微信,我會把你拉到天河聊技術技術討論羣裏相互交流,以上源碼解析僅供參考。